一.条件变量

当一个线程互斥地访问某个变量时,它可能发现在其它线程改变状态之前,它什么也做不了。

例如一个线程访问队列时,发现队列为空,它只能等待,只到其它线程将一个节点添加到队列中。这种情况就需要用到条件变量。

创建条件变量

【Linux】:线程(三)同步和消费者模型-LMLPHP

cond:要初始化的条件变量。
attr:NULL。

等待条件满足

【Linux】:线程(三)同步和消费者模型-LMLPHP

唤醒等待

【Linux】:线程(三)同步和消费者模型-LMLPHP

一个示例

我想让每一个线程依次对一个全局变量cnt做++操作。

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <pthread.h>
#include <sched.h>
#include <iostream>
using namespace std;
pthread_mutex_t mutex=PTHREAD_ADAPTIVE_MUTEX_INITIALIZER_NP;//初始化锁
pthread_cond_t cond=PTHREAD_COND_INITIALIZER;//初始化条件变量

int cnt=0;

void *route(void *arg)
{
  char *id =(char*)arg;
  while(1) 
  {
    pthread_mutex_lock(&mutex);//上锁
    pthread_cond_wait(&cond,&mutex);//为什么在这等待?因为它在让线程等待时会自动释放锁

    cout<<id<<",cnt:"<<cnt++<<endl;

    pthread_mutex_unlock(&mutex);//解锁
  }
} 
int main()
{
  pthread_t t1, t2, t3, t4;
  pthread_create(&t1, NULL, route, (void*)"thread 1");
  pthread_create(&t2, NULL, route, (void*)"thread 2");
  pthread_create(&t3, NULL, route, (void*)"thread 3");
  pthread_create(&t4, NULL, route, (void*)"thread 4");
  sleep(1);
  cout<<"main thread begin:"<<endl;

  while(1)
  {
    sleep(1);
    pthread_cond_signal(&cond);//唤醒正在等待的一个线程,默认是第一个
    //pthread_cond_broadcast(&cond);//唤醒所有线程
    cout<<"signal one pthread...."<<endl;
  }

  pthread_join(t1, NULL);
  pthread_join(t2, NULL);
  pthread_join(t3, NULL);
  pthread_join(t4, NULL);

  pthread_mutex_destroy(&mutex);//销毁

  return 0;
}

【Linux】:线程(三)同步和消费者模型-LMLPHP

可以看到到此,所创建的线程均以一定顺序对cnt进行了++操作。注意这里的顺序不一定是1234,也有可能是2341…但它一定呈现周期性。

二.生产者和消费者模型

1.概念和特点

【Linux】:线程(三)同步和消费者模型-LMLPHP

模型特点:

3种关系:消费者和消费者,生产者和生产者,生产者和消费者。

2种角色:消费者和生产者。

1个交易场所:特定的内存空间。

【Linux】:线程(三)同步和消费者模型-LMLPHP

2.实现基于阻塞队列的生产者消费者模型

【Linux】:线程(三)同步和消费者模型-LMLPHP

实现一个cp模型

设置水平线,当产品数量低于水平线时开始生产,当产品数量高于水平线时开始消费。

BlockQueue.hpp

#include <iostream>
#include <queue>
#include <pthread.h>
#include <unistd.h>

template<class T>
class Blockqueue
{
public:
  Blockqueue(int capacity=20)
    :capacity_(capacity)//默认容量是20
  {
    pthread_mutex_init(&mutex_, nullptr);//初始化锁
    pthread_cond_init(&c_cond_, nullptr);//初始化消费者条件变量
    pthread_cond_init(&p_cond_, nullptr);//初始化生产者条件变量
    water_=capacity_/3;//水平线
  }

  T pop()
  {
    pthread_mutex_lock(&mutex_);//上锁
    if(q_.size()==0)
    {
      //如果没有产品,消费者开始等待
      pthread_cond_wait(&c_cond_,&mutex_);
    }
    T out=q_.front();
    q_.pop();
    if(q_.size()<water_)
    {
      //如果数量小于水平线,唤醒生产者
      pthread_cond_signal(&p_cond_);
    }

    pthread_mutex_unlock(&mutex_);//解锁
    return out;
  }

  void push(const T&in)
  {
    pthread_mutex_lock(&mutex_);//上锁
    if(q_.size()>capacity_)
    { 
      //如果数量高于容量,生产者开始等待
      pthread_cond_wait(&p_cond_,&mutex_); 
    }
    q_.push(in);
    if(q_.size()>water_)
    {
      //如果数量高于水平线,唤醒消费者
      pthread_cond_signal(&c_cond_);
    }

    pthread_mutex_unlock(&mutex_);//解锁
  }

  ~Blockqueue()
  {
    pthread_mutex_destroy(&mutex_);
    pthread_cond_destroy(&c_cond_);
    pthread_cond_destroy(&p_cond_);
  }

private:
  std::queue<T> q_; // 共享资源
  int capacity_;     //容量
  pthread_mutex_t mutex_;
  pthread_cond_t c_cond_;
  pthread_cond_t p_cond_;

  int water_;
};

main.cc

#include"Blockqueue.hpp"

void*Constumer(void*args)//消费者
{
  Blockqueue<int> *bq=static_cast<Blockqueue<int>*>(args);//强转

  while(1)
  {
    int out=bq->pop();
    std::cout<<"消费了一个数据:"<<out<<std::endl;
  }
}

void*Producer(void*args)//生产者
{
  Blockqueue<int> *bq=static_cast<Blockqueue<int>*>(args);
  int data=0;
  while(1)
  {
    sleep(1);
    bq->push(data);
    std::cout<<"生产了一个数据:"<<data++<<std::endl;
  }
}

int main()
{
  Blockqueue<int> *dp=new Blockqueue<int>();
  pthread_t t,c;
  pthread_create(&t,nullptr,Constumer,(void*)dp);
  pthread_create(&c,nullptr,Producer,(void*)dp);

  pthread_join(c,nullptr);
  pthread_join(t,nullptr);
  delete dp;
  return 0;
}


【Linux】:线程(三)同步和消费者模型-LMLPHP

12-14 09:44