1.线程同步的概念

同步即协同步调,按预定的先后次序运行。

线程同步,指一个线程发出某一功能调用时,在没有得到结果之前,该调用不返回。同时其它线程为保证数据一致性,不能调用该功能。

假设有 4 个线程 A、B、C、D,当前一个线程 A 对内存中的共享资源进行访问的时候,其他线程 B, C, D 都不可以对这块内存进行操作,直到线程 A 对这块内存访问完毕为止,B,C,D 中的一个才能访问这块内存,剩余的两个需要继续阻塞等待,以此类推,直至所有的线程都对这块内存操作完毕。 线程对内存的这种访问方式就称之为线程同步,通过对概念的介绍,我们可以了解到==所谓的同步并不是多个线程同时对内存进行访问,而是按照先后顺序依次进行的==。如果不按照这种规则,就会产生数据混乱。

2.数据混乱原因

  1. 资源共享(独享资源则不会)
  2. 调度随机(意味着数据访问会出现竞争)
  3. 线程间缺乏必要的同步机制。

以上 3 点中,前两点不能改变,欲提高效率,传递数据,资源必须共享。只要共享资源,就一定会出现竞争。只要存在竞争关系,数据就很容易出现混乱。
所以只能从第三点着手解决。使多个线程在访问共享资源的时候,出现互斥。

3.互斥量mutex

Linux 中提供一把互斥锁mutex(也称之为互斥量)。 ==每个线程在对资源操作前都尝试先加锁,成功加锁才能操作==,操作结束解锁。 资源还是共享的,线程间也还是竞争的,但通过“锁”就将资源的访问变成互斥操作,而后与时间有关的错误也不会再产生了。但应注意:==同一时刻,只能有一个线程持有该锁。== 当 A 线程对某个全局变量加锁访问,B 在访问前尝试加锁,拿不到锁,B 阻塞。C 线程不去加锁,而直接访问该全局变量,依然能够访问,但会出现数据混乱。 所以,互斥锁实质上是操作系统提供的一把“建议锁”(又称“协同锁”),建议程序中有多线程访问共享资源的时候使用该机制。但,并没有强制限定。因此,即使有了mutex,如果有线程不按规则来访问数据,依然会造成数据混乱。

01.png

主要应用的函数

  • pthread_mutex_init 函数
  • pthread_mutex_destroy 函数
  • pthread_mutex_lock 函数
  • pthread_mutex_trylock 函数
  • pthread_mutex_unlock 函数

以上 5 个函数的返回值都是:成功返回 0, 失败返回错误号。 pthread_mutex_t 类型,其本质是一个结构体。为简化理解,应用时可忽略其实现细节,简单当成整数看待。下面具体介绍这些函数:

在 Linux 中互斥锁的类型为 pthread_mutex_t,创建一个这种类型的变量就得到了一把互斥锁:

pthread_mutex_t mutex;

变量mutex 只有两种取值 1、0

3.1 初始化和销毁函数

1
2
3
4
5
6
// 初始化互斥锁
// restrict: 是一个关键字, 用来修饰指针, 只有这个关键字修饰的指针可以访问指向的内存地址, 其他指针是不行的
int pthread_mutex_init(pthread_mutex_t *restrict mutex,
const pthread_mutexattr_t *restrict attr);
// 释放互斥锁资源
int pthread_mutex_destroy(pthread_mutex_t *mutex);
  • 参数:
    • mutex: 传出参数,互斥锁变量的地址,调用时应传 &mutex
    • restrict 关键字:是一个关键字, 用来修饰指针,告诉编译器,所有修改该指针指向内存中内容的操作,只能通过本指针完成。 不能通过除本指针以外的其他变量或指针修改
    • attr: 互斥锁的属性,一般使用默认属性即可,这个参数指定为 NULL
  1. 静态初始化:如果互斥锁 mutex 是静态分配的(定义在全局,或加了 static 关键字修饰),可以直接使用宏进行初始化。

    pthead_mutex_t muetx = PTHREAD_MUTEX_INITIALIZER;

  2. 动态初始化:局部变量应采用动态初始化。

    pthread_mutex_init(&mutex, NULL)

3.1 pthread_mutex_lock()函数

1
2
// 修改互斥锁的状态, 将其设定为锁定状态, 这个状态被写入到参数 mutex 中
int pthread_mutex_lock(pthread_mutex_t *mutex);

这个函数被调用,首先会判断参数 mutex 互斥锁中的状态是不是锁定状态:

  • 没被锁定,是打开的,这个线程可以加锁成功,这个这个锁中会记录是哪个线程加锁成功了
  • 如果被锁定了,其他线程加锁就失败了,这些线程都会阻塞在这把锁上
  • 当这把锁被解开之后,这些阻塞在锁上的线程就解除阻塞了,并且这些线程是通过竞争的方式对这把锁加锁,没抢到锁的线程继续阻塞

3.2 pthread_mutex_trylock()函数

1
2
// 尝试加锁
int pthread_mutex_trylock(pthread_mutex_t *mutex);

调用这个函数对互斥锁变量加锁还是有两种情况:

  • 如果这把锁没有被锁定是打开的,线程加锁成功
  • 如果锁变量被锁住了,调用这个函数加锁的线程,不会被阻塞,加锁失败直接返回错误号

3.3 pthread_mutex_unlock()函数

1
2
// 对互斥锁解锁
int pthread_mutex_unlock(pthread_mutex_t *mutex);

不是所有的线程都可以对互斥锁解锁,哪个线程加的锁,哪个线程才能解锁成功。

3.4 互斥锁使用

案例:两个线程交替数数(每个线程数 50 个数,交替数到 100)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
#include <stdio.h>
#include <unistd.h>
#include <stdlib.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <string.h>
#include <pthread.h>

#define MAX 100
// 全局变量
int number;

// 创建一把互斥锁
// 全局变量, 多个线程共享
pthread_mutex_t mutex;

// 线程处理函数
void* funcA_num(void* arg)
{
for(int i=0; i<MAX; ++i)
{
// 如果线程A加锁成功, 不阻塞
// 如果B加锁成功, 线程A阻塞
pthread_mutex_lock(&mutex);
int cur = number;
cur++;
usleep(10);
number = cur;
pthread_mutex_unlock(&mutex);
printf("Thread A, id = %lu, number = %d\n", pthread_self(), number);
}

return NULL;
}

void* funcB_num(void* arg)
{
for(int i=0; i<MAX; ++i)
{
// a加锁成功, b线程访问这把锁的时候是锁定的
// 线程B先阻塞, a线程解锁之后阻塞解除
// 线程B加锁成功了
pthread_mutex_lock(&mutex);
int cur = number;
cur++;
number = cur;
pthread_mutex_unlock(&mutex);
printf("Thread B, id = %lu, number = %d\n", pthread_self(), number);
usleep(5);
}

return NULL;
}

int main(int argc, const char* argv[])
{
pthread_t p1, p2;

// 初始化互斥锁
pthread_mutex_init(&mutex, NULL);

// 创建两个子线程
pthread_create(&p1, NULL, funcA_num, NULL);
pthread_create(&p2, NULL, funcB_num, NULL);

// 阻塞,资源回收
pthread_join(p1, NULL);
pthread_join(p2, NULL);

// 销毁互斥锁
// 线程销毁之后, 再去释放互斥锁
pthread_mutex_destroy(&mutex);

return 0;
}

互斥锁使用技巧:

注意事项:

​ 尽量保证锁的粒度, 越小越好。(访问共享数据前,加锁。访问结束立即解锁。)

​ 互斥锁,本质是结构体。 我们可以看成整数。 初值为 1。(pthread_mutex_init() 函数调用成功。)

​ 加锁: –操作, 小于0,阻塞线程。

​ 解锁: ++操作, 唤醒阻塞在锁上的线程。

​ try锁:尝试加锁,成功–。失败,返回直接返回错误号

4. 死锁

当多个线程访问共享资源,需要加锁,如果锁使用不当,就会造成死锁这种现象。如果线程死锁造成的后果是:所有的线程都被阻塞,并且线程的阻塞是无法解开的(因为可以解锁的线程也被阻塞了)。

造成死锁的场景有如下几种:

  1. 线程试图对同一个互斥量 A 加锁两次。
  2. 线程 1 拥有 A 锁,请求获得 B 锁;线程 2 拥有 B 锁,请求获得 A 锁
  3. 加锁之后忘记解锁

02.png

5.读写锁

与互斥量类似,但读写锁允许更高的并行性。其特性为:==写独占,读共享。==

读写锁的状态:

特别强调:==读写锁只有一把==,但其具备两种状态:

  1. 读模式下加锁状态 (读锁)

  2. 写模式下加锁状态 (写锁)

读写锁的特性

  1. 读写锁是“写模式加锁”时, 解锁前,所有对该锁加锁的线程都会被阻塞。
  2. 读写锁是“读模式加锁”时, 如果线程以读模式对其加锁会成功;如果线程以写模式加锁会阻塞。
  3. 读写锁是“读模式加锁”时, 既有试图以写模式加锁的线程,也有试图以读模式加锁的线程。那么读写锁会阻塞随后的读模式锁请求。优先满足写模式锁。==读锁、写锁并行阻塞,写锁优先级高==

读写锁也叫共享-独占锁。当读写锁以读模式锁住时,它是以共享模式锁住的;当它以写模式锁住时,它是以独 占模式锁住的。写独占、读共享。
读写锁非常适合于对数据结构读的次数远大于写的情况。

主要应用的函数

  • pthread_rwlock_init 函数
  • pthread_rwlock_destroy 函数
  • pthread_rwlock_rdlock 函数
  • pthread_rwlock_wrlock 函数
  • pthread_rwlock_tryrdlock 函数
  • pthread_rwlock_trywrlock 函数
  • pthread_rwlock_unlock 函数

以上 7 个函数的返回值都是:成功返回 0, 失败直接返回错误号。下面具体介绍这些函数

读写锁是一把锁,锁的类型为 pthread_rwlock_t,有了类型之后就可以创建一把互斥锁了:

1
pthread_rwlock_t rwlock;

5.1 初始化和销毁函数

1
2
3
4
5
6
7
#include <pthread.h>
pthread_rwlock_t rwlock;
// 初始化读写锁
int pthread_rwlock_init(pthread_rwlock_t *restrict rwlock,
const pthread_rwlockattr_t *restrict attr);
// 释放读写锁占用的系统资源
int pthread_rwlock_destroy(pthread_rwlock_t *rwlock);
  • 参数:
    • rwlock: 读写锁的地址,传出参数
    • attr: 读写锁属性,一般使用默认属性,指定为 NULL

5.2 pthread_rwlock_rdlock()函数

1
2
// 在程序中对读写锁加读锁, 锁定的是读操作
int pthread_rwlock_rdlock(pthread_rwlock_t *rwlock);

调用这个函数,如果读写锁是打开的,那么加锁成功;如果读写锁已经锁定了读操作,调用这个函数依然可以加锁成功,因为读锁是共享的;如果读写锁已经锁定了写操作,调用这个函数的线程会被阻塞。

5.3 pthread_rwlock_tryrdlock()函数

1
2
3
// 这个函数可以有效的避免死锁
// 如果加读锁失败, 不会阻塞当前线程, 直接返回错误号
int pthread_rwlock_tryrdlock(pthread_rwlock_t *rwlock);

调用这个函数,如果读写锁是打开的,那么加锁成功;如果读写锁已经锁定了读操作,调用这个函数依然可以加锁成功,因为读锁是共享的;如果读写锁已经锁定了写操作,调用这个函数加锁失败,对应的线程不会被阻塞,可以在程序中对函数返回值进行判断,添加加锁失败之后的处理动作。

5.4 pthread_rwlock_wrlock()函数

1
2
// 在程序中对读写锁加写锁, 锁定的是写操作
int pthread_rwlock_wrlock(pthread_rwlock_t *rwlock);

调用这个函数,如果读写锁是打开的,那么加锁成功;如果读写锁已经锁定了读操作或者锁定了写操作,调用这个函数的线程会被阻塞。

5.5 pthread_rwlock_trywrlock()函数

1
2
3
// 这个函数可以有效的避免死锁
// 如果加写锁失败, 不会阻塞当前线程, 直接返回错误号
int pthread_rwlock_trywrlock(pthread_rwlock_t *rwlock);

调用这个函数,如果读写锁是打开的,那么加锁成功;如果读写锁已经锁定了读操作或者锁定了写操作,调用这个函数加锁失败,但是线程不会阻塞,可以在程序中对函数返回值进行判断,添加加锁失败之后的处理动作。

5.6 pthread_rwlock_unlock()函数

1
2
// 解锁, 不管锁定了读还是写都可用解锁
int pthread_rwlock_unlock(pthread_rwlock_t *rwlock);

5.7 读写锁的使用

8 个线程操作同一个全局变量,3 个线程不定时写同一全局资源,5 个线程不定时读同一全局资源

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <string.h>
#include <pthread.h>

// 全局变量
int number = 0;

// 定义读写锁
pthread_rwlock_t rwlock;

// 写的线程的处理函数
void* writeNum(void* arg)
{
while(1)
{
pthread_rwlock_wrlock(&rwlock);
int cur = number;
cur ++;
number = cur;
printf("++写操作完毕, number : %d, tid = %ld\n", number, pthread_self());
pthread_rwlock_unlock(&rwlock);
// 添加sleep目的是要看到多个线程交替工作
usleep(rand() % 100);
}

return NULL;
}

// 读线程的处理函数
// 多个线程可以如果处理动作相同, 可以使用相同的处理函数
// 每个线程中的栈资源是独享
void* readNum(void* arg)
{
while(1)
{
pthread_rwlock_rdlock(&rwlock);
printf("--全局变量number = %d, tid = %ld\n", number, pthread_self());
pthread_rwlock_unlock(&rwlock);
usleep(rand() % 100);
}
return NULL;
}

int main()
{
// 初始化读写锁
pthread_rwlock_init(&rwlock, NULL);

// 3个写线程, 5个读的线程
pthread_t wtid[3];
pthread_t rtid[5];
for(int i=0; i<3; ++i)
{
pthread_create(&wtid[i], NULL, writeNum, NULL);
}

for(int i=0; i<5; ++i)
{
pthread_create(&rtid[i], NULL, readNum, NULL);
}

// 释放资源
for(int i=0; i<3; ++i)
{
pthread_join(wtid[i], NULL);
}

for(int i=0; i<5; ++i)
{
pthread_join(rtid[i], NULL);
}

// 销毁读写锁
pthread_rwlock_destroy(&rwlock);

return 0;
}

6.条件变量

条件变量本身不是锁!但它也可以造成线程阻塞。通常与互斥锁配合使用。给多线程提供一个会合的场所。

主要应用的函数

  • pthread_cond_init 函数
  • pthread_cond_destroy 函数
  • pthread_cond_wait 函数
  • pthread_cond_timedwait 函数
  • pthread_cond_signal 函数
  • pthread_cond_broadcast 函数

以上 6 个函数的返回值都是:成功返回 0, 失败直接返回错误号。下面具体介绍这些函数

条件变量类型对应的类型为 pthread_cond_t,这样就可以定义一个条件变量类型的变量了:

1
pthread_cond_t cond;

被条件变量阻塞的线程的线程信息会被记录到这个变量中,以便在解除阻塞的时候使用。

6.1 初始化和销毁函数

1
2
3
4
5
6
7
#include <pthread.h>
pthread_cond_t cond;
// 初始化
int pthread_cond_init(pthread_cond_t *restrict cond,
const pthread_condattr_t *restrict attr);
// 销毁释放资源
int pthread_cond_destroy(pthread_cond_t *cond);
  • 参数:
    • cond: 条件变量的地址
    • attr: 条件变量属性,一般使用默认属性,指定为 NULL
  1. 静态初始化:可以直接使用宏进行初始化。pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
  2. 动态初始化:局部变量应采用动态初始化。pthread_mutex_init(&cond, NULL)

6.2 pthread_cond_wait()函数

1
2
// 线程阻塞函数, 哪个线程调用这个函数, 哪个线程就会被阻塞
int pthread_cond_wait(pthread_cond_t *restrict cond, pthread_mutex_t *restrict mutex);

函数作用:

  1. 阻塞等待条件变量 cond(参 1)满足
  2. 释放已掌握的互斥锁(解锁互斥量)相当于 pthread_mutex_unlock(&mutex); 1.2.两步为一个原子操作。
  3. 当被唤醒,pthread_cond_wait 函数返回时,解除阻塞并重新申请获取互斥锁 pthread_mutex_lock(&mutex);

03.png

6.3 pthread_cond_timedwait()函数

1
2
3
4
5
6
7
8
// 表示的时间是从1971.1.1到某个时间点的时间, 总长度使用秒/纳秒表示
struct timespec {
time_t tv_sec; /* Seconds 秒*/
long tv_nsec; /* Nanoseconds 纳秒 */
};
// 将线程阻塞一定的时间长度, 时间到达之后, 线程就解除阻塞了
int pthread_cond_timedwait(pthread_cond_t *restrict cond,
pthread_mutex_t *restrict mutex, const struct timespec *restrict abstime);

这个函数的前两个参数和 pthread_cond_wait 函数是一样的,第三个参数表示线程阻塞的时长,但是需要额外注意一点:struct timespec 这个结构体中记录的时间是从1971.1.1到某个时间点的时间,总长度使用秒/纳秒表示。因此赋值方式相对要麻烦一点:

1
2
3
4
5
6
7
8
如:time(NULL)返回的就是绝对时间。而 alarm(1)是相对时间,相对当前时间定时 1 秒钟。 
struct timespec t = {1, 0};
pthread_cond_timedwait (&cond, &mutex, &t); 只能定时到 19701100:00:01 秒(早已经过去)
正确用法:
time_t cur = time(NULL); 获取当前时间。
struct timespec t; 定义 timespec 结构体变量t
t.tv_sec = cur+1; 定时 1
pthread_cond_timedwait (&cond, &mutex, &t); 传参

6.4 唤醒函数

1
2
3
4
// 唤醒阻塞在条件变量上的线程, 至少有一个被解除阻塞
int pthread_cond_signal(pthread_cond_t *cond);
// 唤醒阻塞在条件变量上的线程, 被阻塞的线程全部解除阻塞
int pthread_cond_broadcast(pthread_cond_t *cond);

调用上面两个函数中的任意一个,都可以唤醒被 pthread_cond_wait 或者 pthread_cond_timedwait 阻塞的线程,区别就在于 pthread_cond_signal 是唤醒至少一个被阻塞的线程(总个数不定),pthread_cond_broadcast 是唤醒所有被阻塞的线程。

6.5 生产者消费者条件变量模型

线程同步典型的案例即为生产者消费者模型,而借助条件变量来实现这一模型,是比较常见的一种方法。假定 有两个线程,一个模拟生产者行为,一个模拟消费者行为。两个线程同时操作一个共享资源(一般称之为汇聚),生产向其中添加产品,消费者从中消费掉产品。

04.png

小案例:使用条件变量实现生产者和消费者模型,生产者有 5 个,往链表头部添加节点,消费者也有 5 个,删除链表头部的节点。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <string.h>
#include <pthread.h>

// 链表的节点
struct Node
{
int number;
struct Node* next;
};

// 定义条件变量, 控制消费者线程
pthread_cond_t cond;
// 互斥锁变量
pthread_mutex_t mutex;
// 指向头结点的指针
struct Node * head = NULL;

// 生产者的回调函数
void* producer(void* arg)
{
// 一直生产
while(1)
{
pthread_mutex_lock(&mutex);
// 创建一个链表的新节点
struct Node* pnew = (struct Node*)malloc(sizeof(struct Node));
// 节点初始化
pnew->number = rand() % 1000;
// 节点的连接, 添加到链表的头部, 新节点就新的头结点
pnew->next = head;
// head指针前移
head = pnew;
printf("+++producer, number = %d, tid = %ld\n", pnew->number, pthread_self());
pthread_mutex_unlock(&mutex);

// 生产了任务, 通知消费者消费
pthread_cond_broadcast(&cond);

// 生产慢一点
sleep(rand() % 3);
}
return NULL;
}

// 消费者的回调函数
void* consumer(void* arg)
{
while(1)
{
pthread_mutex_lock(&mutex);
// 一直消费, 删除链表中的一个节点
// if(head == NULL) // 这样写有bug
while(head == NULL)
{
// 任务队列, 也就是链表中已经没有节点可以消费了
// 消费者线程需要阻塞
// 线程加互斥锁成功, 但是线程阻塞在这行代码上, 锁还没解开
// 其他线程在访问这把锁的时候也会阻塞, 生产者也会阻塞 ==> 死锁
// 这函数会自动将线程拥有的锁解开
pthread_cond_wait(&cond, &mutex);
// 当消费者线程解除阻塞之后, 会自动将这把锁锁上
// 这时候当前这个线程又重新拥有了这把互斥锁
}
// 取出链表的头结点, 将其删除
struct Node* pnode = head;
printf("--consumer: number: %d, tid = %ld\n", pnode->number, pthread_self());
head = pnode->next;
free(pnode);
pthread_mutex_unlock(&mutex);

sleep(rand() % 3);
}
return NULL;
}

int main()
{
// 初始化条件变量
pthread_cond_init(&cond, NULL);
pthread_mutex_init(&mutex, NULL);

// 创建5个生产者, 5个消费者
pthread_t ptid[5];
pthread_t ctid[5];
for(int i=0; i<5; ++i)
{
pthread_create(&ptid[i], NULL, producer, NULL);
}

for(int i=0; i<5; ++i)
{
pthread_create(&ctid[i], NULL, consumer, NULL);
}

// 释放资源
for(int i=0; i<5; ++i)
{
// 阻塞等待子线程退出
pthread_join(ptid[i], NULL);
}

for(int i=0; i<5; ++i)
{
pthread_join(ctid[i], NULL);
}

// 销毁条件变量
pthread_cond_destroy(&cond);
pthread_mutex_destroy(&mutex);

return 0;
}

6.6 条件变量的优点:

相较于mutex 而言,条件变量可以减少竞争。 如直接使用mutex,除了生产者、消费者之间要竞争互斥量以外,消费者之间也需要竞争互斥量,但如果汇聚(链表)中没有数据,消费者之间竞争互斥锁是无意义的。有了条件变量机制以后,只有生产者完成生产,才会引
起消费者之间的竞争。提高了程序效率。

7.信号量

进化版的互斥锁(1 –> N) 由于互斥锁的粒度比较大,如果我们希望在多个线程间对某一对象的部分数据进行共享,使用互斥锁是没有办法实现的,只能将整个数据对象锁住。这样虽然达到了多线程操作共享数据时保证数据正确性的目的,却无形中导致线程的并发性下降。线程从并行执行,变成了串行执行。与直接使用单进程无异。 信号量,是相对折中的一种处理方式,既能保证同步,数据不混乱,又能提高线程并发。

主要应用函数

  • sem_init 函数
  • sem_destroy 函数
  • sem_wait 函数
  • sem_trywait 函数
  • sem_timedwait 函数
  • sem_post 函数

以上 6 个函数的返回值都是:成功返回 0, 失败返回-1,同时设置 errno。(注意,它们没有 pthread 前缀)

信号量(信号灯)与互斥锁和条件变量的主要不同在于” 灯” 的概念,灯亮则意味着资源可用,灯灭则意味着不可用。信号量主要阻塞线程,不能完全保证线程安全,如果要保证线程安全,需要信号量和互斥锁一起使用。

信号量和条件变量一样用于处理生产者和消费者模型,用于阻塞生产者线程或者消费者线程的运行。信号的类型为sem_t对应的头文件为 <semaphore.h>

1
2
#include <semaphore.h>
sem_t sem;

7.1 初始化和销毁函数

1
2
3
4
5
6
#include <semaphore.h>
// 初始化信号量/信号灯
int sem_init(sem_t *sem, int pshared, unsigned int value);
// 资源释放, 线程销毁之后调用这个函数即可
// 参数 sem 就是 sem_init() 的第一个参数
int sem_destroy(sem_t *sem);
  • 参数:
    • sem:信号量变量地址
    • pshared
      • 0:线程同步
      • 非 0:进程同步
    • value:初始化当前信号量拥有的资源数(>=0),如果资源数为 0,线程就会被阻塞了。

7.2 sem_wait()函数

1
2
3
// 参数 sem 就是 sem_init() 的第一个参数  
// 函数被调用sem中的资源就会被消耗1个, 资源数-1
int sem_wait(sem_t *sem);

当线程调用这个函数,并且 sem 中的资源数 >0,线程不会阻塞,线程会占用 sem 中的一个资源,因此资源数 - 1,直到 sem 中的资源数减为 0 时,资源被耗尽,因此线程也就被阻塞了。

7.3 sem_trywait()函数

1
2
3
// 参数 sem 就是 sem_init() 的第一个参数  
// 函数被调用sem中的资源就会被消耗1个, 资源数-1
int sem_trywait(sem_t *sem);

当线程调用这个函数,并且 sem 中的资源数 >0,线程不会阻塞,线程会占用 sem 中的一个资源,因此资源数 - 1,直到sem中的资源数减为 0 时,资源被耗尽,但是线程不会被阻塞,直接返回错误号,因此可以在程序中添加判断分支,用于处理获取资源失败之后的情况。

7.4 sem_timedwait()函数

1
2
3
4
5
6
7
8
// 表示的时间是从1971.1.1到某个时间点的时间, 总长度使用秒/纳秒表示
struct timespec {
time_t tv_sec; /* Seconds */
long tv_nsec; /* Nanoseconds [0 .. 999999999] */
};
// 调用该函数线程获取sem中的一个资源,当资源数为0时,线程阻塞,在阻塞abs_timeout对应的时长之后,解除阻塞。
// abs_timeout: 阻塞的时间长度, 单位是s, 是从1970.1.1开始计算的
int sem_timedwait(sem_t *sem, const struct timespec *abs_timeout);

该函数的参数 abs_timeout pthread_cond_timedwait 的最后一个参数是一样的,使用方法不再过多赘述。当线程调用这个函数,并且 sem 中的资源数 >0,线程不会阻塞,线程会占用 sem 中的一个资源,因此资源数 - 1,直到 sem 中的资源数减为 0 时,资源被耗尽,线程被阻塞,当阻塞指定的时长之后,线程解除阻塞。

7.5 sem_post()函数

1
2
// 调用该函数给sem中的资源数+1
int sem_post(sem_t *sem);

调用该函数会将 sem 中的资源数 +1,如果有线程在调用 sem_waitsem_trywaitsem_timedwait 时因为 sem 中的资源数为 0 被阻塞了,这时这些线程会解除阻塞,获取到资源之后继续向下运行。

7.6 生产者消费者信号量模型

小案例:使用信号量实现生产者和消费者模型,生产者有 5 个,往链表头部添加节点,消费者也有 5 个,删除链表头部的节点。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <string.h>
#include <semaphore.h>
#include <pthread.h>

// 链表的节点
struct Node
{
int number;
struct Node* next;
};

// 生产者线程信号量
sem_t psem;
// 消费者线程信号量
sem_t csem;

// 互斥锁变量
pthread_mutex_t mutex;
// 指向头结点的指针
struct Node * head = NULL;

// 生产者的回调函数
void* producer(void* arg)
{
// 一直生产
while(1)
{
// 生产者拿一个信号灯
sem_wait(&psem);
// 加锁, 这句代码放到 sem_wait()上边, 有可能会造成死锁
pthread_mutex_lock(&mutex);
// 创建一个链表的新节点
struct Node* pnew = (struct Node*)malloc(sizeof(struct Node));
// 节点初始化
pnew->number = rand() % 1000;
// 节点的连接, 添加到链表的头部, 新节点就新的头结点
pnew->next = head;
// head指针前移
head = pnew;
printf("+++producer, number = %d, tid = %ld\n", pnew->number, pthread_self());
pthread_mutex_unlock(&mutex);

// 通知消费者消费
sem_post(&csem);

// 生产慢一点
sleep(rand() % 3);
}
return NULL;
}

// 消费者的回调函数
void* consumer(void* arg)
{
while(1)
{
sem_wait(&csem);
pthread_mutex_lock(&mutex);
struct Node* pnode = head;
printf("--consumer: number: %d, tid = %ld\n", pnode->number, pthread_self());
head = pnode->next;
// 取出链表的头结点, 将其删除
free(pnode);
pthread_mutex_unlock(&mutex);
// 通知生产者生成, 给生产者加信号灯
sem_post(&psem);

sleep(rand() % 3);
}
return NULL;
}

int main()
{
// 初始化信号量
sem_init(&psem, 0, 5); // 生成者线程一共有5个信号灯
sem_init(&csem, 0, 0); // 消费者线程一共有0个信号灯
// 初始化互斥锁
pthread_mutex_init(&mutex, NULL);

// 创建5个生产者, 5个消费者
pthread_t ptid[5];
pthread_t ctid[5];
for(int i=0; i<5; ++i)
{
pthread_create(&ptid[i], NULL, producer, NULL);
}

for(int i=0; i<5; ++i)
{
pthread_create(&ctid[i], NULL, consumer, NULL);
}

// 释放资源
for(int i=0; i<5; ++i)
{
pthread_join(ptid[i], NULL);
}

for(int i=0; i<5; ++i)
{
pthread_join(ctid[i], NULL);
}

sem_destroy(&psem);
sem_destroy(&csem);
pthread_mutex_destroy(&mutex);

return 0;
}