Linux 线程同步
之前讲过线程互斥,互斥解决了临界资源的问题,但是又会引入新的问题,这时候同步机制就诞生出来了。所以,为什么会有新的问题?同步又是什么呢?怎么去操作呢?今天一起学习这原理吧~目录为什么要有同步机制条件变量函数接口pthread_cond_initpthread_cond_destroypthread_cond_t cond = PTHREAD_COND_INITIALIZERpthread_cond
之前讲过线程互斥,互斥解决了临界资源的问题,但是又会引入新的问题,这时候同步机制就诞生出来了。所以,为什么会有新的问题?同步又是什么呢?怎么去操作呢?今天一起学习这原理吧~
目录
pthread_cond_t cond = PTHREAD_COND_INITIALIZER
为什么要有同步机制
互斥锁解决了访问临界资源的问题,但是如果一个线程竞争能力很强,它就会一直竞争锁、释放锁。这样的话其它线程就没能力竞争到锁,也就长时间不能不能执行任务,一直被挂起等待,这样就造成了"饥饿问题"。
基于上面的问题,所以要引入同步机制,在保证数据安全的前提下(通常是互斥和原子性),让线程能够按照某种特定的顺序访问临界资源,从而有效避免饥饿问题。
条件变量
函数接口
pthread_cond_init
pthread_cond_t* cond:定义的的条件变量的地址,进行初始化
const pthread_condattr_t* restrict attr:相关属性,一般设置成nullptr
pthread_cond_destroy
pthread_cond_t* cond:传条件变量的地址,进行销毁
pthread_cond_t cond = PTHREAD_COND_INITIALIZER
定义静态、全局的条件变量,不用手动初始化和销毁。
pthread_cond_wait
pthread_cond_t* restrict cond:传条件变量的地址,让该条件变量进行等待
pthread_mutex_t* restrict mutex:传入互斥锁的地址
调用该函数后,该条件变量进行等待,同时把锁给释放掉,当该条件变量被唤醒后,会主动竞争锁然后返回。
pthread_cond_signal
pthread_cond_t* cond:传该条件变量的地址,唤醒该条件变量下的等待的线程。
pthread_cond_broadcast
pthread_cond_t* cond:唤醒该条件变量下的所有线程
代码练习:
#include<iostream>
#include<string>
#include<unistd.h>
#include<pthread.h>
using namespace std;
pthread_mutex_t mtx; //互斥锁
pthread_cond_t cond; //条件变量
void* ctrl(void* args)
{
string name = (char*)args;
while(true)
{
cout << "master say: begin work" << endl;
pthread_cond_signal(&cond);
sleep(3);
}
}
void* work(void* args)
{
int number = *(int*)args;
delete (int*)args;
while(true)
{
pthread_cond_wait(&cond, &mtx);
cout << "work: " << number << " is working..." << endl;
}
}
int main()
{
#define NUM 3
pthread_mutex_init(&mtx, nullptr); //初始化锁
pthread_cond_init(&cond, nullptr); //初始化条件变量
pthread_t master;
pthread_t worker[NUM];
pthread_create(&master, nullptr, ctrl, (void*)"master");
for(int i = 0; i < NUM; i++)
{
int* number = new int(i);
pthread_create(worker+i ,nullptr, work, (void*)number);
}
pthread_join(master, nullptr);
for(int i = 0; i < NUM; i++)
{
pthread_join(worker[i], nullptr);
}
pthread_mutex_destroy(&mtx); //释放锁
pthread_cond_destroy(&cond); //释放条件变量
return 0;
}
运行结果:
我们发现work工作是按照一定的规律(不同环境下次序不同),说明每当一个线程等待时,它是放在队列中。 这个队列也就是接下来要讲的阻塞队列~
我们把ctrl中的pthread_cond_signal换成pthread_cond_broadcast试试:
运行结果:
我们发现broadcast这样就是一次唤醒该条件变量下等待的所有线程了。
生产者消费者模型
第一层理解
第二层理解
第三层理解
超市收集需求,减少消费者的购买成本,同时将生产环节和消费环节进行"解耦"。
总结
生产者消费者模型:
1、将消费环节和生产环节进行了"解耦",提高效率。
2、"321原则",3种关系,2种角色,1个交易场所。
3、优势在于多生产和多消费的获取和处理任务,而不是放任务和拿任务
基于阻塞队列的多生产和多消费者模型
之前讲过的进程间通信:
生产者消费者模式就是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。这个阻塞队列就是用来给生产者和消费者解耦的。
BlockQueue.hpp
#pragma once
#include<iostream>
#include<queue>
#include<pthread.h>
using namespace std;
//.h .cc .cpp
//.hpp -> 一般在开源软件使用 -> 声明和定义放在一个文件里
namespace ns_blockqueue
{
const int default_cap = 5;
template<class T>
class BlockQueue
{
private:
bool IsFull()
{
return _bq.size() == _cap;
}
bool IsEmpty()
{
return _bq.size() == 0;
}
void LockQueue()
{
pthread_mutex_lock(&_mtx);
}
void UnlockQueue()
{
pthread_mutex_unlock(&_mtx);
}
void ProducterWait()
{
//pthread_cond_wait 为什么要传&_mtx(锁)?
//1、调用的时候,会首先自动释放_mtx!,然后挂起自己(不能抱着锁挂起,这样就成死锁了)
//2、返回的时候,会首先自动竞争锁,获取到锁之后,才能返回!
pthread_cond_wait(&_is_empty,&_mtx);
}
void ConsumerWait()
{
pthread_cond_wait(&_is_full,&_mtx);
}
void WakeupComsumer()
{
pthread_cond_signal(&_is_full);
}
void WakeupProducter()
{
pthread_cond_signal(&_is_empty); //把空条件变量唤醒,
}
public:
BlockQueue(int cap = default_cap)
:_cap(cap)
{
pthread_mutex_init(&_mtx,nullptr);
pthread_cond_init(&_is_empty,nullptr);
pthread_cond_init(&_is_full,nullptr);
}
~BlockQueue()
{
pthread_mutex_destroy(&_mtx);
pthread_cond_destroy(&_is_empty);
pthread_cond_destroy(&_is_full);
}
//const &: 输入
//*: 输出
//&: 输入输出
void Push(const T& in)
{
LockQueue();
//临界区
//if(IsFull()) //bug? 满了,就不放数据,该线程进行等待~
/*我们需要进行条件检测的的时候,这里需要使用循环的方式
来保证退出循环—定是因为条件不满足导致的!*/
while(IsFull()) //bug? 满了,就不放数据,该线程进行等待~
{
//等待的,把线程挂起,我们当前是持有锁的!!
ProducterWait(); //不能用break
}
//不满时,向队列中放数据,生产函数
_bq.push(in);
//生产者知道什么时候消费
if(_bq.size() > _cap/2)//如果当前队列数据超过一半,就唤醒消费者的线程
{
WakeupComsumer();
}
UnlockQueue();
}
void Pop(T* out) //删除同时,拿到队头数据
{
LockQueue();
//从队列中拿数据,消费函数
//if(IsEmpty())//bug? 空了,就不拿数据,该线程进行等待
/*我们需要进行条件检测的的时候,这里需要使用循环的方式
来保证退出循环—定是因为条件不满足导致的!*/
while(IsEmpty())//bug? 空了,就不拿数据,该线程进行等待
{
ConsumerWait();
}
*out = _bq.front();
_bq.pop();
//消费者知道什么时候生产
if(_bq.size() < _cap/2)//如果当前队列中数据小于一半,就唤醒生产者
{
WakeupProducter();
}
UnlockQueue();
}
private:
queue<T> _bq; //我们的阻塞队列
int _cap; //队列的元素上限
pthread_mutex_t _mtx;//保护临界资源的锁
//1、当生产者满了的时候,就应该不要生产了(不要竞争锁了),而应该让消费者来消费
//2、当消费空了,就不应该消费(不要竞争锁了),应该让生产者来进行生产
pthread_cond_t _is_full; //_bq满的,消费者在该条件变量下等待
pthread_cond_t _is_empty; //_bq空的,生产者在该条件变量下等待
};
}
cp_test.cc
#include "BlockQueue.hpp"
#include<stdlib.h>
#include<unistd.h>
#include<time.h>
using namespace std;
using namespace ns_blockqueue;
void* consumer(void* args)
{
BlockQueue<int>* bq = (BlockQueue<int>*)args;
while(true)
{
sleep(2);
int data = 0;
bq->Pop(&data);
cout << "消费者消费了一个数据:" << data << endl;
}
}
void* producter(void* args)
{
BlockQueue<int>* bq = (BlockQueue<int>*)args;
while(true)
{
//sleep(2);
//1、制造数据,生产者的数据(task)从哪里来?
int data = rand()%20 + 1;
cout << "生产者生产数据:" << data << endl;
bq->Push(data);
}
}
int main()
{
srand((unsigned int)time(NULL));
BlockQueue<int>* bq = new BlockQueue<int>();
pthread_t c,p;
pthread_create(&c, nullptr, consumer, (void*)bq);
pthread_create(&p, nullptr, producter, (void*)bq);
pthread_join(c,nullptr);
pthread_join(p,nullptr);
return 0;
}
运行结果:
bug??
再来谈pthread_condwait
pthread_cond_wait 为什么要传&_mtx(锁)?
1、调用的时候,会首先自动释放_mtx!,然后挂起自己(不能抱着锁挂起,这样就成死锁了)
2、返回的时候,会首先自动竞争锁,获取到锁之后,才能返回!
注意
生产者消费者模型的矛盾是生产者生产数据的时间,消费者处理数据的时间。消费者处理数据同时,生产者生产数据,放到运行队列中,本质上这体现了并发性。进而提高了效率!
信号量
概念
POSIX信号量和SystemV信号量作用相同,都是用于同步操作,达到无冲突的访问共享资源目的。 但POSIX可以用于线程间同步。
实际上对一块临界资源我们可以进行细分,分成不同的小块,这时候就有可能让不同的执行流就可以访问不同区域从而实现并发,而信号量就可以用来描述分成小块的个数,从而控制执行流进入临界资源的数目。
信号量函数接口
sem_init
sem_t* sem:传信号量的地址
int pshared:表示是否进程间共享信号量,我们一般设置成0,表示不共享。
unsigned int value:定义信号量的初始值
sem_destroy
sem_t* sem:传信号量的地址,进行释放该信号量
sem_wait
sem_t* sem:P()操作,对信号量原子性的进行减减
sem_post
sem_t* sem:对该信号量进行原子性的加加
基于环形队列的生产者消费者模型
ring_cp.cc
#include"ring_queue.hpp"
#include"task.hpp"
#include<pthread.h>
#include<time.h>
#include<unistd.h>
using namespace ns_ring_queue;
using namespace ns_task;
void* consumer(void* args)
{
RingQueue<Task>* rq = (RingQueue<Task>*)args;
while(true)
{
Task t;
rq->Pop(&t);
t(); //比较耗时
//sleep(1);
}
}
void* producter(void* args)
{
RingQueue<Task>* rq = (RingQueue<Task>*)args;
string ops = "+-*/%";
while(true)
{
int x = rand()%20 + 1;
int y = rand()%10 + 1;
char op = ops[rand()%5];
Task t(x, y, op);
rq->Push(t);
cout << "生产数据是: " << t.Show() << "我是: " << pthread_self() << endl;
sleep(1);
}
}
int main()
{
srand((unsigned int)time(nullptr));
RingQueue<Task>* rq = new RingQueue<Task>;
pthread_t c0,c1,c2,c3,p0,p1,p2;
pthread_create(&c0, nullptr, consumer, (void*)rq);
pthread_create(&c1, nullptr, consumer, (void*)rq);
pthread_create(&c2, nullptr, consumer, (void*)rq);
pthread_create(&c3, nullptr, consumer, (void*)rq);
pthread_create(&p0, nullptr, producter, (void*)rq);
pthread_create(&p1, nullptr, producter, (void*)rq);
pthread_create(&p2, nullptr, producter, (void*)rq);
pthread_join(c0, nullptr);
pthread_join(c1, nullptr);
pthread_join(c2, nullptr);
pthread_join(c3, nullptr);
pthread_join(p0,nullptr);
pthread_join(p1,nullptr);
pthread_join(p2,nullptr);
return 0;
}
ring_queue.hpp
#pragma once
#include<iostream>
#include<vector>
#include<semaphore.h>
#include<pthread.h>
using namespace std;
namespace ns_ring_queue
{
const int g_cap_default = 10;
template<class T>
class RingQueue
{
public:
RingQueue(int cap = g_cap_default)
:_ring_queue(cap) //初始化cap个T()
,_cap(cap)
{
sem_init(&_blank_sem,0,cap);
sem_init(&_data_sem,0,0);
_c_step = _p_step = 0;
pthread_mutex_init(&_c_mtx, nullptr);
pthread_mutex_init(&_p_mtx, nullptr);
}
~RingQueue()
{
sem_destroy(&_blank_sem);
sem_destroy(&_data_sem);
pthread_mutex_destroy(&_c_mtx);
pthread_mutex_destroy(&_p_mtx);
}
//目前高优先级的先实现单生产和单消费
void Push(const T& in)
{
//生产接口
//Lock();// 方法一 放在它前面效率太低
/*如果申请锁成功,而申请信号量失败时,那么该线程就抱着锁
挂起了,只有消费者消费才会申请信号量成功,实际上,一次只有
一个执行流过来,就算写成多执行流其实和单执行流没有差别*/
sem_wait(&_blank_sem); //P(空位置) --
pthread_mutex_lock(&_p_mtx);
//Lock(); //方法二
/* 对于多执行流情况,多个执行流可以同时申请信号量(原子性的,没有问题)
也就是提前准备信号量,但是只能有一个执行流申请锁进入临界资源,当它释放锁
后其他执行流可以立马申请锁了。效率高
*/
//可以生产了,可是往哪个位置生产呢?
_ring_queue[_p_step] = in;
_p_step++;
_p_step %= _cap; //防止越界
pthread_mutex_unlock(&_p_mtx);
sem_post(&_data_sem); //V(数据) ++
}
void Pop(T* out)
{
//消费者接口
sem_wait(&_data_sem); //P(数据) --
pthread_mutex_lock(&_c_mtx);
*out = _ring_queue[_c_step]; //取队头数据
_c_step++;
_c_step %= _cap; //防止越界
pthread_mutex_unlock(&_c_mtx);
sem_post(&_blank_sem); //V(空位置) ++
}
private:
vector<T> _ring_queue;
int _cap;
sem_t _blank_sem; //生产者关心空位置资源
sem_t _data_sem; //消费者关心数据资源
//记录消费者和生产者各自的下标位置
int _c_step; //消费者
int _p_step; //生产者
pthread_mutex_t _c_mtx;
pthread_mutex_t _p_mtx;
};
}
task.hpp
#pragma once
#include <iostream>
#include <pthread.h>
using namespace std;
namespace ns_task
{
class Task
{
public:
Task()
{}
Task(int x, int y, char op)
: _x(x), _y(y), _op(op)
{}
string Show()
{
string message = to_string(_x);
message += _op;
message += to_string(_y);
message += "=?";
return message;
}
int Run()
{
int res = 0;
switch (_op)
{
case '+':
res = _x + _y;
break;
case '-':
res = _x - _y;
break;
case '*':
res = _x * _y;
break;
case '/':
res = _x / _y;
break;
case '%':
res = _x % _y;
break;
default:
cout << "bug??" << endl;
break;
}
cout << "当前任务正在被: " << pthread_self() << " 处理: " << _x << _op << _y << "=" << res << endl;
return res;
}
int operator()()
{
return Run();
}
private:
int _x;
int _y;
char _op; //+-*/%
};
}
运行结果:
这时候就可以让生产者消费者进行任务处理。
处理数据本身是比较耗时的,但是在处理数据的同时生产者也在生产数据。在多生产和多消费情况下,多个消费者拿到数据(拿数据的过程一次只能有一个线程进入临界区)后可以一起处理数据。多生产者也是可以共同生产任务,这样实现了多生产和多消费的并发性。
放数据和拿数据的过程,生产者和消费者两方各自一次只允许一个执行流操作,这是互斥实现的。
多生产者多消费者生产和处理任务是同步机制实现的。
总结
条件变量(一把锁)解决了互斥条件下,一个执行流不断竞争锁造成其它线程"饥饿问题"的不合理情况。让每一个执行流按照一种顺序(比如队列性质的顺序)来竞争锁资源。
信号量(两把锁)实现了多线程的并发性。要访问临界资源,就必须要申请信号量,信号量的计算(++、--)是原子性的!
注意:
信号量本身的PV操作就可以进行原子性的加加、减减计算,这时候多个执行流可以共同进行PV操作加减计数器。而对于互斥来说加加、减减计算就需要控制一次只有一个执行流访问临界资源(计数器)。上面代码信号量加锁是为了控制queue的插入删除的非原子性操作。
看到这里,支持博主一下吧~
更多推荐
所有评论(0)