线程同步

线程仅仅互斥,是可以保证线程安全的。但是,这不合理!如果一个线程竞争到了锁,那么再它释放后它依然可以竞争个锁。因为CPU此时正在执行当前线程,所以该线程又可以继续竞争锁。 这样就会造成一个问题,有5个线程抢10000张票,可是线程1就抢了9000张,线程2抢了1000张。这就会造成其他另外三个线程一直申请锁却申请不到的情况。这个问题也被称为饥饿问题。

举个例子:

一个自习室只有一把钥匙,而每次这个自习室只能待一个人,且这个人持有钥匙。有一天你凌晨2点就起来去抢自习室,你抢到了并把自习室钥匙放口袋里进去自习,这时外面逐渐有人来自习,但是他们没有这个自习室的钥匙。所以他们只能在外面等待,这时你突然想出去吃个饭,上个厕所。那么你带着钥匙出去,并在外面把门反锁。这种行为就是线程持有锁挂起了。 而不久后你回到自习室,不太想学了,所以你就走到门口,把钥匙挂到墙上。可是你突然转念一想 : “算了,还是再学会吧。”。因为此时你离这个钥匙最近,所以你竞争的能力非常强。你就又拿到钥匙,进去自习了一份,你又想走了。于是又来到门口把钥匙挂门口隔壁的墙上,然后刚挂上去,你又想继续学了。又把钥匙拿下来继续进去学习… 如此反复。就造成了门外等待自习室的人的饥饿问题。而当你学完时真正准备走时,你把钥匙挂墙上后。结果外面的人争先恐后,毫无秩序的冲过来抢夺钥匙。

这里的你和外面自习室的人都是一个个线程,而钥匙就是这把锁,自习室就是临界资源。只有持有锁才能访问临界资源。可是因为锁刚从你当前的线程中释放,那么竞争锁能力最强的也是你当前的线程。 那么这样就很容易会造成其他线程的饥饿问题。 但是这个线程它有错吗??? 它没错!但是这不合理!!

那么怎么让这合理起来呢?

要合理起来,那么就让释放锁的线程,跑到最后面去排队。 还是上面那个例子,一旦你把钥匙挂墙上。那么你就必须老老实实到后面排队。这样可以确保一定的顺序时,但是还会面临一个问题。 有人插队怎么办???

从线程的角度来说,线程A已经把锁释放了。随后线程A跑到了最后的位置等待调度。这样可以保证一定的顺序的问题。接下来CPU准备调度线程B了,可是线程C它想配合,直接插队到了线程B前面。所以CPU就先调度线程C了。

CPU,线程A,B,C,它们有错吗? CPU只负责调度,谁来了调度谁,CPU没错。A,B,C线程都在尽心尽力的竞争锁。因为锁本来就是临界资源,它们也没错。但是,这不合理!!

于是就有了条件变量,可以保证线程同步。

条件变量

条件变量如何保证线程同步呢?还是自习室的例子,当你在自习室。外面等待的人都在睡觉(线程挂起),当你要走了挂回钥匙时(释放锁),外面的人就全部醒过来(线程唤醒)竞争锁。那么我们不要让他们全部醒过来,那么我们加一个管理员,让管理员每次只喊醒队伍最前面的那个,然后再让刚刚退出自习室的人到最后面去排队。 这样依次下去,是不是就能保证一定的顺序性了?

同步的情况下,想要每个线程先竞争锁 -> 竞争锁后检测是否满足访问临界资源的条件 -> 满足则访问 -> 不满足就在条件变量下等待 -> 等待之前释放锁 -> 唤醒回来重新获得锁 -> 访问临界资源

而让线程等待和唤醒线程。我们都需要用到条件变量。

条件变量和锁一样是一个变量,我们可以用条件变量让线程在该条件变量下等待。然后让主线程去唤醒条件变量。

条件变量相关函数:

pthread_cond_t cond = PTHREAD_COND_INITIALIZER; //如果条件变量是全局的,可以这样初始化

//初始化全局变量,第一个参数是条件变量的地址,第二参数是条件变量的属性
int pthread_cond_init(pthread_cond_t *restrict cond,
                      const pthread_condattr_t *restrict attr);

//销毁条件变量,传条件变量地址
int pthread_cond_destroy(pthread_cond_t *cond);

//在条件变量下等待,第一个参数是条件变量地址,第二个参数是锁的地址,第三个参数是要等待的时间
int pthread_cond_timedwait(pthread_cond_t *restrict cond,
              pthread_mutex_t *restrict mutex,
              const struct timespec *restrict abstime);

//在条件变量下等待,第一个参数是要等待的条件变量地址,第二个参数是锁
int pthread_cond_wait(pthread_cond_t *restrict cond,
              pthread_mutex_t *restrict mutex);

//依次唤醒所有在条件变量下等待的线程
int pthread_cond_broadcast(pthread_cond_t *cond);

//唤醒单个在条件变量下等待的线程
int pthread_cond_signal(pthread_cond_t *cond);

代码测试:

#include <iostream>
#include <pthread.h>
#include <unistd.h>
#include <string>

#define TNUM 4

typedef void (*func_t)(const std::string &name, pthread_mutex_t *mutex, pthread_cond_t *cond);

int tickets = 5;
bool flag = true;

class ThreadData
{
public:
    ThreadData(const std::string &name, func_t func, pthread_mutex_t *mutex, pthread_cond_t *cond) : _name(name), _func(func), _mutex(mutex), _cond(cond) {}

public:
    std::string _name;
    func_t _func;
    pthread_mutex_t *_mutex;
    pthread_cond_t *_cond;
};

void func1(const std::string &name, pthread_mutex_t *mutex, pthread_cond_t *cond)
{
    while (flag)
    {
        pthread_mutex_lock(mutex);
        pthread_cond_wait(cond, mutex);  //在条件变量下等待
        std::cout<< name  << " runing ....下载" << std::endl; 
        pthread_mutex_unlock(mutex);
    }
}
void func2(const std::string &name, pthread_mutex_t *mutex, pthread_cond_t *cond)
{
    while (flag)
    {
        pthread_mutex_lock(mutex);
        pthread_cond_wait(cond, mutex);  //在条件变量下等待
        std::cout<< name  << " runing ....查看用户" << std::endl; 
        pthread_mutex_unlock(mutex);
    }
}
void func3(const std::string &name, pthread_mutex_t *mutex, pthread_cond_t *cond)
{
    while (flag)
    {
        pthread_mutex_lock(mutex);
        pthread_cond_wait(cond, mutex);  //在条件变量下等待
        std::cout<< name  << " runing ....扫描" << std::endl; 
        pthread_mutex_unlock(mutex);
    }
}
void func4(const std::string &name, pthread_mutex_t *mutex, pthread_cond_t *cond)
{
    while (flag)
    {
        pthread_mutex_lock(mutex);
        pthread_cond_wait(cond, mutex); //在条件变量下等待
        std::cout<< name  << " runing ....广播" << std::endl; 
        pthread_mutex_unlock(mutex);
    }
}

void *Entry(void *args)
{
    ThreadData *td = (ThreadData *)args;
    td->_func(td->_name, td->_mutex, td->_cond); //调用线程绑定的函数
    delete td;
    return nullptr;
}

int main()
{
    pthread_mutex_t mtx; // 互斥锁
    pthread_cond_t cond; // 条件变量

    pthread_mutex_init(&mtx, nullptr);
    pthread_cond_init(&cond, nullptr);

    func_t funs[4] = {func1, func2, func3, func4}; //函数指针数组,存储上面4个函数
    pthread_t tids[4];

    // 创建线程
    for (int i = 0; i < TNUM; i++)
    {
        std::string name = "thread ";
        name += std::to_string(i + 1);
        ThreadData *td = new ThreadData(name, funs[i], &mtx, &cond); //创建线程数据对象,存储线程的数据以及锁,条件变量信息
        pthread_create(tids + i, nullptr, Entry, (void *)td);
    }
    sleep(5);
    int cnt = 10;
    while (cnt)
    {
        //每隔一秒唤醒一个线程
        std::cout << "wakeup thread ......  " << cnt--  << std::endl; 
        pthread_cond_signal(&cond); //唤醒一个线程
        sleep(1);
    }   
    std::cout << "ctrl done" << std::endl;
    flag = false; //结束线程内的循环
    //走到这里,所有线程依旧处于wait状态,在这里需要再唤醒一次
    pthread_cond_broadcast(&cond); //唤醒所有线程
    
    for (int i = 0; i < TNUM; i++)
    {
        pthread_join(tids[i], nullptr);
        std::cout << "thread " << i + 1 << "   quit....." << std::endl;
    }

    pthread_mutex_destroy(&mtx);
    pthread_cond_destroy(&cond);

    return 0;
}

这个代码的逻辑是1个主线程负责唤醒条件变量下等待的线程。其他四个线程负责输出一条打印语句,然后进入等待。

运行结果:

【Linux】线程同步-LMLPHP

我们可以发现明显的顺序性,但第一次的顺序是无法确定的。因为CPU先调度的线程会先等待,先等待的会被先唤醒。但是后面的次序都和第一次的次序一样。这就保证了线程的同步。

注意!!!

pthread_cond_wait 必须在加锁和解锁之间等待!!因为pthread_cond_wait函数会让线程在等待之前释放锁,其而让其他线程进入临界资源。等到被唤醒时,又会重新获取锁。这也就是为什么 要在加锁和解锁之间wait。如果不在加锁和解锁之间wait,那么在最后想要在唤醒所有线程的时候就会产生死锁!!因为pthread_cond_wait的第二个参数就是一把锁,wait后会释放锁,被唤醒后重新获得锁。所以当最后一次唤醒时,被唤醒的线程就持有锁结束了。而其他线程就会在条件变量下等待锁,但是持有锁的线程已经释放了。所以就产生了死锁。

错误代码代表:

#include <iostream>
#include <pthread.h>
#include <unistd.h>
#include <string>

#define TNUM 2

typedef void (*func_t)(const std::string &name, pthread_mutex_t *mutex, pthread_cond_t *cond);

int tickets = 5;
bool flag = true;

class ThreadData
{
public:
    ThreadData(const std::string &name, func_t func, pthread_mutex_t *mutex, pthread_cond_t *cond) : _name(name), _func(func), _mutex(mutex), _cond(cond) {}

public:
    std::string _name;
    func_t _func;
    pthread_mutex_t *_mutex;
    pthread_cond_t *_cond;
};

void func1(const std::string &name, pthread_mutex_t *mutex, pthread_cond_t *cond)
{
    while (flag)
    {
        //wait不加锁
        pthread_cond_wait(cond, mutex);  //在条件变量下等待
        std::cout<< name  << " runing ....下载" << std::endl; 
    }
}
void func2(const std::string &name, pthread_mutex_t *mutex, pthread_cond_t *cond)
{
    while (flag)
    {
        //wait不加锁
        pthread_cond_wait(cond, mutex);  //在条件变量下等待
        std::cout<< name  << " runing ....查看用户" << std::endl; 
    }
}

void *Entry(void *args)
{
    ThreadData *td = (ThreadData *)args;
    td->_func(td->_name, td->_mutex, td->_cond); //调用线程绑定的函数
    delete td;
    return nullptr;
}

int main()
{
    pthread_mutex_t mtx; // 互斥锁
    pthread_cond_t cond; // 条件变量

    pthread_mutex_init(&mtx, nullptr);
    pthread_cond_init(&cond, nullptr);

    pthread_t tids[TNUM];
    func_t funs[TNUM] = {func1,func2};

    // 创建线程
    for (int i = 0; i < TNUM; i++)
    {
        std::string name = "thread ";
        name += std::to_string(i + 1);
        ThreadData *td = new ThreadData(name, funs[i], &mtx, &cond); //创建线程数据对象,存储线程的数据以及锁,条件变量信息
        pthread_create(tids + i, nullptr, Entry, (void *)td);
    }
    sleep(5);
    int cnt = 10;
    while (cnt)
    {
        //每隔一秒唤醒一个线程
        std::cout << "wakeup thread ......  " << cnt--  << std::endl; 
        pthread_cond_signal(&cond); //唤醒一个线程
        sleep(1);
    }   
    std::cout << "ctrl done" << std::endl;
    flag = false; //结束线程内的循环
    //走到这里,所有线程依旧处于wait状态,在这里需要再唤醒一次
    pthread_cond_broadcast(&cond); //唤醒所有线程
    // pthread_cond_signal(&cond); //唤醒一个线程
    //pthread_cond_signal(&cond); //唤醒一个线程
    
    std::cout << "--------------------------" << std::endl;
    for (int i = 0; i < TNUM; i++)
    {
        pthread_join(tids[i], nullptr);
        std::cout << "thread " << i + 1 << "   quit....." << std::endl;
    }

    pthread_mutex_destroy(&mtx);
    pthread_cond_destroy(&cond);

    return 0;
}

最后的运行结果就是10秒后最后一次唤醒时,线程1持有锁结束了。而线程2还在等待锁被唤醒,所以就变成了死锁。这时候就无法跳出来了。

【Linux】线程同步-LMLPHP

而在循环内之所可以那是因为在singal之后。wait又重新获得了锁,然后经过了一次循环又来到wait,wait等待前会先释放锁。所以这时候其他线程又可以争夺锁,但是在最后一次的时候,线程1singal后不再wait。那么也就是在前一次wait之后获得了锁,随后线程结束时还持有锁,而线程2还在等待锁被唤醒。

而在线程结束前释放锁,又可以正常结束了:

#include <iostream>
#include <pthread.h>
#include <unistd.h>
#include <string>

#define TNUM 2

typedef void (*func_t)(const std::string &name, pthread_mutex_t *mutex, pthread_cond_t *cond);

int tickets = 5;
bool flag = true;

class ThreadData
{
public:
    ThreadData(const std::string &name, func_t func, pthread_mutex_t *mutex, pthread_cond_t *cond) : _name(name), _func(func), _mutex(mutex), _cond(cond) {}

public:
    std::string _name;
    func_t _func;
    pthread_mutex_t *_mutex;
    pthread_cond_t *_cond;
};

void func1(const std::string &name, pthread_mutex_t *mutex, pthread_cond_t *cond)
{
    while (flag)
    {
        pthread_cond_wait(cond, mutex);  //在条件变量下等待
        std::cout<< name  << " runing ....下载" << std::endl; 
        pthread_mutex_unlock(mutex); //临走前释放锁
    }
}
void func2(const std::string &name, pthread_mutex_t *mutex, pthread_cond_t *cond)
{
    while (flag)
    {
        pthread_cond_wait(cond, mutex);  //在条件变量下等待
        std::cout<< name  << " runing ....查看用户" << std::endl; 
        pthread_mutex_unlock(mutex);//临走前释放锁

    }
}

void *Entry(void *args)
{
    ThreadData *td = (ThreadData *)args;
    td->_func(td->_name, td->_mutex, td->_cond); //调用线程绑定的函数
    delete td;
    return nullptr;
}

int main()
{
    pthread_mutex_t mtx; // 互斥锁
    pthread_cond_t cond; // 条件变量

    pthread_mutex_init(&mtx, nullptr);
    pthread_cond_init(&cond, nullptr);

    pthread_t tids[TNUM];
    func_t funs[TNUM] = {func1,func2};

    // 创建线程
    for (int i = 0; i < TNUM; i++)
    {
        std::string name = "thread ";
        name += std::to_string(i + 1);
        ThreadData *td = new ThreadData(name, funs[i], &mtx, &cond); //创建线程数据对象,存储线程的数据以及锁,条件变量信息
        pthread_create(tids + i, nullptr, Entry, (void *)td);
    }
    sleep(5);
    int cnt = 10;
    while (cnt)
    {
        //每隔一秒唤醒一个线程
        std::cout << "wakeup thread ......  " << cnt--  << std::endl; 
        pthread_cond_signal(&cond); //唤醒一个线程
        sleep(1);
    }   
    std::cout << "ctrl done" << std::endl;
    flag = false; //结束线程内的循环
    //走到这里,所有线程依旧处于wait状态,在这里需要再唤醒一次
    pthread_cond_broadcast(&cond); //唤醒所有线程
    //pthread_cond_signal(&cond); //唤醒一个线程
    //pthread_cond_signal(&cond); //唤醒一个线程
    
    std::cout << "--------------------------" << std::endl;
    for (int i = 0; i < TNUM; i++)
    {
        pthread_join(tids[i], nullptr);
        std::cout << "thread " << i + 1 << "   quit....." << std::endl;
    }

    pthread_mutex_destroy(&mtx);
    pthread_cond_destroy(&cond);

    return 0;
}

【Linux】线程同步-LMLPHP

但是!!这是一种错误的写法!!请务必哪里有加锁,就哪里有解锁,wait在加锁和解锁之间完成。因为wait是对临界资源的条件检测,所以wait本身也应该在临界区之内。

10-26 22:40