线程池设计

TaskQueue

#pragma once
#include <queue>
#include <pthread.h>

using callback = void (*)(void *arg);
// 任务结构体
template <typename T>
struct Task
{
    Task()
    {
        function = nullptr;
        arg = nullptr;
    }
    Task(callback f, void *arg)
    {
        this->arg = static_cast<T *>(arg);
        function = f;
    }
    callback function;
    T *arg;
};

template <typename T>
class TaskQueue
{
public:
    TaskQueue();
    ~TaskQueue();

    // 添加任务
    void addTask(Task<T> task);
    void addTask(callback f, void *arg);
    // 取出任务
    Task<T> takeTask();
    // 获取当前任务的个数
    inline size_t TaskNumber()
    {
        return m_TaskQ.size();
    }

private:
    pthread_mutex_t m_mutex;
    std::queue<Task<T>> m_TaskQ;
};

template <typename T>
TaskQueue<T>::TaskQueue()
{
    pthread_mutex_init(&m_mutex, NULL);
}

template <typename T>
TaskQueue<T>::~TaskQueue()
{
    pthread_mutex_destroy(&m_mutex);
}

template <typename T>
void TaskQueue<T>::addTask(Task<T> Task)
{
    pthread_mutex_lock(&m_mutex);
    m_TaskQ.push(Task);
    pthread_mutex_unlock(&m_mutex);
}

template <typename T>
void TaskQueue<T>::addTask(callback f, void *arg)
{
    pthread_mutex_lock(&m_mutex);
    m_TaskQ.push(Task<T>(f, arg));
    pthread_mutex_unlock(&m_mutex);
}

template <typename T>
Task<T> TaskQueue<T>::takeTask()
{
    Task<T> tmp;
    pthread_mutex_lock(&m_mutex);
    if (!m_TaskQ.empty())
    {
        tmp = m_TaskQ.front();
        m_TaskQ.pop();
    }
    pthread_mutex_unlock(&m_mutex);
    return tmp;
}

这是一个C++模板类的实现,用于存储和管理任务队列。任务队列中的每个任务都是一个Task结构体,包含一个回调函数和一个指向参数的指针。TaskQueue类提供了添加任务、获取任务和获取任务数量等功能。

以下是TaskQueue类的详细解释:

  1. Task结构体:包含一个回调函数function和一个指向参数的指针arg。回调函数的类型为void (*)(void *arg),即接受一个void *类型的参数并返回void的函数指针。
  2. TaskQueue类:包含一个互斥锁m_mutex和一个任务队列m_TaskQ。互斥锁用于保护任务队列的访问,确保在多线程环境下的安全性。
  3. 构造函数TaskQueue():初始化互斥锁m_mutex
  4. 析构函数~TaskQueue():销毁互斥锁m_mutex
  5. addTask(Task task):添加一个任务到任务队列。首先锁定互斥锁,然后将任务添加到队列,最后解锁互斥锁。
  6. addTask(callback f, void *arg):创建一个新的Task结构体,并将其添加到任务队列。首先锁定互斥锁,然后创建一个新的Task结构体并将其添加到队列,最后解锁互斥锁。
  7. takeTask():从任务队列中取出一个任务。首先锁定互斥锁,然后检查队列是否为空,如果不为空,则取出队列中的第一个任务并将其从队列中移除,最后解锁互斥锁。返回取出的任务。
  8. TaskNumber():获取任务队列中的任务数量。直接返回队列的大小。

这个TaskQueue类可以线程池的任务调度和管理。通过将任务添加到任务队列,线程可以从队列中获取任务并执行。

ThreadPool

#ifndef __THREADPOOL_H__
#define __THREADPOOL_H__
#include "TaskQueue.hpp"
#include <iostream>
#include <string.h>
#include <unistd.h>

template <class T>
class ThreadPool
{
public:
    // 创建线程池并进行初始化
    ThreadPool(int min, int max);

    // 销毁线程池
    ~ThreadPool();

    // 给线程池添加任务
    void addTask(Task<T> Task);

    // 获取到线程池中工作线程的个数
    int getBusyNum();

    // 获取到线程池中活着的线程个数
    int getLiveNum();

private:
    // 工作的线程(消费者线程)任务函数
    static void *worker(void *arg);

    // 管理者线程任务函数
    static void *manager(void *arg);

    // 单个线程退出
    void threadExit();

private:
    // 任务队列
    TaskQueue<T> *TaskQ;

    pthread_t managerID;  // 管理线程的ID
    pthread_t *threadIDs; // 线程数组的ID
    int minNum;           // 最小线程的个数
    int maxNum;           // 最大线程的个数
    int liveNum;          // 存活的线程个数
    int busyNum;          // 忙碌的线程个数
    int exitNum;          // 销毁的线程个数

    pthread_mutex_t mutexPool; // 锁整个线程池
    pthread_cond_t notEmpty;   // 判断队列是否为空

    bool shutdown; // 是否要销毁线程池,销毁为ture,不销毁为false

    static const int NUMBER = 2; // 添加线程的个数
};

template <class T>
ThreadPool<T>::ThreadPool(int min, int max)
{
    // 实例化任务队列
    TaskQ = new TaskQueue<T>;
    do
    {
        if (nullptr == TaskQ)
        {
            std::cout << "TaskQ malloc failed...\n";
            break;
        }
        threadIDs = new pthread_t[max];
        if (nullptr == threadIDs)
        {
            std::cout << "threadIDs malloc failed..." << '\n';
            break;
        }
        memset(threadIDs, 0, sizeof(pthread_t) * max);
        minNum = min;
        maxNum = max;
        busyNum = 0;
        liveNum = min;
        exitNum = 0;

        if (pthread_mutex_init(&mutexPool, NULL) != 0 ||
            pthread_cond_init(&notEmpty, NULL) != 0)
        {
            std::cout << "mutex or condition init failed...\n";
            break;
        }

        shutdown = false;

        // 创建线程
        pthread_create(&managerID, NULL, manager, this);
        for (int i = 0; i < min; ++i)
        {
            pthread_create(&threadIDs[i], NULL, worker, this);
        }
        return;
    } while (0);
    // 申请失败释放资源
    if (threadIDs)
        delete[] threadIDs;
    if (TaskQ)
        delete TaskQ;
}
template <class T>
ThreadPool<T>::~ThreadPool()
{
    // 关闭线程池
    std::cout << __FILE__ << ":" << __func__ << ":" << __LINE__ << std::endl;
    shutdown = true;
    // 阻塞回收管理者线程
    pthread_join(managerID, NULL);
    // 唤醒阻塞的消费者线程
    for (int i = 0; i < liveNum; ++i)
    {
        pthread_cond_signal(&notEmpty);
    }
    // 释放内存
    if (TaskQ)
    {
        delete TaskQ;
    }
    if (threadIDs)
    {
        delete[] threadIDs;
    }
    pthread_mutex_destroy(&mutexPool);
    pthread_cond_destroy(&notEmpty);
}

template <class T>
void ThreadPool<T>::addTask(Task<T> Task)
{
    if (shutdown)
        return;
    // 添加任务
    TaskQ->addTask(Task);
    pthread_cond_signal(&notEmpty);
}

template <class T>
int ThreadPool<T>::getBusyNum()
{
    pthread_mutex_lock(&mutexPool);
    int busyNum = this->busyNum;
    pthread_mutex_unlock(&mutexPool);
    return busyNum;
}

template <class T>
int ThreadPool<T>::getLiveNum()
{
    pthread_mutex_lock(&mutexPool);
    int liveNum = this->liveNum;
    pthread_mutex_unlock(&mutexPool);
    return liveNum;
}

template <class T>
void *ThreadPool<T>::worker(void *arg)
{
    ThreadPool *pool = static_cast<ThreadPool *>(arg);
    while (true)
    {
        pthread_mutex_lock(&pool->mutexPool);
        // 当前任务队列是否为空
        if (0 == pool->TaskQ->TaskNumber() && !pool->shutdown)
        {
            // 阻塞工作线程
            pthread_cond_wait(&pool->notEmpty, &pool->mutexPool);

            // 判断是不是要销毁线程
            if (0 < pool->exitNum)
            {
                pool->exitNum--;
                if (pool->liveNum > pool->minNum)
                {
                    pool->liveNum--;
                    pthread_mutex_unlock(&pool->mutexPool);
                    pool->threadExit();
                }
            }
        }

        // 判断线程池是否被关闭了
        if (pool->shutdown)
        {
            pthread_mutex_unlock(&pool->mutexPool);
            pool->threadExit();
        }

        // 从任务队列中取出一个任务
        Task<T> Task = pool->TaskQ->takeTask();

        pool->busyNum++;

        // 解锁
        pthread_mutex_unlock(&pool->mutexPool);

        std::cout << "thread" << std::to_string(pthread_self()) << "start working...\n";

        Task.function(Task.arg);
        delete Task.arg;
        Task.arg = nullptr;

        std::cout << "thread" << std::to_string(pthread_self()) << "end working...\n";
        pthread_mutex_lock(&pool->mutexPool);
        pool->busyNum--;
        pthread_mutex_unlock(&pool->mutexPool);
    }
    return nullptr;
}

template <class T>
void *ThreadPool<T>::manager(void *arg)
{
    ThreadPool *pool = static_cast<ThreadPool *>(arg);
    while (!pool->shutdown)
    {
        // 每三秒检测一次
        sleep(3);

        // 取出线程池中任务的数量和当前线程的数量和工作线程的数量
        pthread_mutex_lock(&pool->mutexPool);
        int queueSize = pool->TaskQ->TaskNumber();
        int liveNum = pool->liveNum;
        int busyNum = pool->busyNum;
        pthread_mutex_unlock(&pool->mutexPool);

        // 添加线程
        // 任务个数 > 存活的线程个数 && 存活的线程个数 < 最大线程数
        if (queueSize > liveNum && liveNum < pool->maxNum)
        {
            pthread_mutex_lock(&pool->mutexPool);
            int counter = 0;
            for (int i = 0; i < pool->maxNum && counter < pool->NUMBER && pool->liveNum < pool->maxNum; ++i)
            {
                if (0 == pool->threadIDs[i])
                {
                    pthread_create(&pool->threadIDs[i], NULL, worker, pool);
                    counter++;
                    pool->liveNum++;
                }
            }
            pthread_mutex_unlock(&pool->mutexPool);
        }

        // 销毁线程
        // 忙的线程*2 < 存活的线程数 && 存活的线程数 > 最小的线程数
        if (busyNum * 2 < liveNum && liveNum > pool->minNum)
        {
            pthread_mutex_lock(&pool->mutexPool);
            pool->exitNum = NUMBER;
            pthread_mutex_unlock(&pool->mutexPool);
            for (int i = 0; i < NUMBER; ++i)
            {
                pthread_cond_signal(&pool->notEmpty);
            }
        }
    }
    return nullptr;
}

template <class T>
void ThreadPool<T>::threadExit()
{
    pthread_t tid = pthread_self();
    pthread_mutex_lock(&mutexPool);
    for (int i = 0; i < maxNum; ++i)
    {
        if (tid == threadIDs[i])
        {
            threadIDs[i] == 0;
            std::cout << "threadExit() called , " << std::to_string(tid) << "    exiting... \n";
            break;
        }
    }
    pthread_mutex_unlock(&mutexPool);
    pthread_exit(NULL);
}

#endif

这是一个C++模板类的实现,用于创建和管理一个线程池。线程池是一种多线程处理形式,处理过程中将任务添加到队列,然后在创建线程后自动启动这些任务。线程池类似于一个任务处理的工厂,将任务添加到队列,然后创建线程来处理这些任务。线程池包含工作线程(worker threads)和一个管理线程(manager thread)。工作线程负责执行任务队列中的任务,管理线程负责根据任务队列的大小动态调整线程池中的线程数量,可以有效地控制线程的数量,避免大量线程之间的竞争导致系统性能下降。

以下是ThreadPool类的详细解释:

  1. 构造函数ThreadPool(int min, int max):创建一个线程池并进行初始化。参数minmax分别表示线程池中的最小线程数和最大线程数。构造函数中完成任务队列的实例化、线程ID数组的分配、互斥锁和条件变量的初始化、管理线程的创建以及最小数量的工作线程的创建。
  2. 析构函数~ThreadPool():销毁线程池。设置线程池关闭标志,回收管理线程,唤醒阻塞的工作线程,释放任务队列和线程ID数组的内存,销毁互斥锁和条件变量。
  3. addTask(Task Task):向线程池中添加任务。将任务添加到任务队列,并发出条件变量信号以唤醒阻塞的工作线程。
  4. getBusyNum():获取线程池中忙碌线程的数量。
  5. getLiveNum():获取线程池中存活线程的数量。
  6. worker(void *arg):工作线程的任务函数。循环执行以下操作:锁定线程池互斥锁,检查任务队列是否为空,如果为空且线程池未关闭,则阻塞等待任务到来;检查是否需要销毁线程,如果需要且当前存活线程数大于最小线程数,则销毁线程;从任务队列中取出任务并执行,执行完毕后更新忙碌线程数。
  7. manager(void *arg):管理线程的任务函数。循环执行以下操作:每隔3秒检查一次任务队列的大小、存活线程数和忙碌线程数;根据任务队列的大小和存活线程数判断是否需要添加线程,如果需要则创建新的工作线程;根据忙碌线程数和存活线程数判断是否需要销毁线程,如果需要则设置退出线程的数量,并发出条件变量信号以唤醒阻塞的工作线程。
  8. threadExit():单个线程退出函数。获取当前线程ID,遍历线程ID数组,找到对应的线程并将其设置为0,然后退出线程。

线程池类还包含以下私有成员变量:

  1. TaskQueue:任务队列。
  2. managerID:管理线程的ID。
  3. threadIDs:线程数组的ID。
  4. minNum:最小线程的个数。
  5. maxNum:最大线程的个数。
  6. liveNum:存活的线程个数。
  7. busyNum:忙碌的线程个数。
  8. exitNum:销毁的线程个数。
  9. mutexPool:锁整个线程池。
  10. notEmpty:判断队列是否为空的条件变量。
  11. shutdown:是否要销毁线程池,销毁为true,不销毁为false。
  12. NUMBER:添加线程的个数,常量。

线程池的工作原理:

  1. 创建线程池时,会创建一个管理者线程和一定数量的工作线程。
  2. 管理者线程负责监控任务队列和工作线程的数量,根据任务数量动态地创建或销毁工作线程。
  3. 工作线程从任务队列中取出任务并执行,执行完毕后继续等待新任务。
  4. 当线程池被销毁时,管理者线程会通知所有工作线程退出,并等待它们退出后释放资源。

main.cpp

#include "ThreadPool.hpp"

void TaskFunc(void* arg)
{
    int num = *(int*)arg;
    std::cout << "thread" << std::to_string(pthread_self()) << "is working, number = "<< num << '\n';
    sleep(1);
}

int main()
{
    // 创建线程池
    ThreadPool<int>* pool =new ThreadPool<int>(3, 5);
    for (int i = 0; i < 20; ++i)
    {
        int* num = new int(i+100);
        pool->addTask(Task<int>(TaskFunc, num));
    }
    sleep(10);
    delete pool;
    return 0;
}

解决线程处理函数传参问题

使用POSIX线程库(pthread)时,线程处理函数只能有一个参数:

#include <stdio.h>
#include <pthread.h>

typedef struct {
    int a;
    int b;
} thread_args;

void *thread_function(void *arg) {
    thread_args *args = (thread_args *)arg;
    printf("Thread function called with %d and %d\n", args->a, args->b);
    return NULL;
}

int main() {
    pthread_t t;
    thread_args args = {42, 24};
    pthread_create(&t, NULL, thread_function, (void *)&args);
    pthread_join(t, NULL);
    return 0;
}

在这个例子中,线程处理函数thread_function只能有一个参数,因此需要将多个参数封装到一个结构体中,并将结构体指针传递给线程处理函数。

线程处理函数的参数数量取决于所使用的编程语言和线程库。在某些情况下,线程处理函数只能有一个参数,但在其他情况下,线程处理函数可以有多个参数。如果线程处理函数只能有一个参数,可以通过将多个参数封装到一个结构体或类中来实现传递多个参数。

06-30 13:29