#include <iostream>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <queue>

class ThreadPool {
public:
    ThreadPool(size_t num_threads) : stop(false) {
        for (size_t i = 0; i < num_threads; ++i) {
            workers.emplace_back(
                [this] {
                    for (;;) {
                        std::unique_lock<std::mutex> lock(this->queue_mutex);
                        this->condition.wait(lock, [this] { return this->stop || !this->tasks.empty(); });
                        if (this->stop && this->tasks.empty()) {
                            return;
                        }
                        auto task = std::move(this->tasks.front());
                        this->tasks.pop();
                        lock.unlock();
                        task();
                    }
                }
            );
        }
    }

    template <typename Func, typename... Args>
    auto enqueue(Func&& func, Args&&... args) -> std::future<typename std::result_of<Func(Args...)>::type> {
        using return_type = typename std::result_of<Func(Args...)>::type;
        auto task = std::make_shared<std::packaged_task<return_type()>>(
            std::bind(std::forward<Func>(func), std::forward<Args>(args)...)
        );
        std::future<return_type> res = task->get_future();
        {
            std::unique_lock<std::mutex> lock(queue_mutex);
            if (stop) {
                throw std::runtime_error("enqueue on stopped threadpool");
            }
            tasks.emplace([task](){ (*task)(); });
        }
        condition.notify_one();
        return res;
    }

    ~ThreadPool() {
        {
            std::unique_lock<std::mutex> lock(queue_mutex);
            stop = true;
        }
        condition.notify_all();
        for (std::thread& worker : workers) {
            worker.join();
        }
    }

private:
    std::vector<std::thread> workers;
    std::queue<std::function<void()>> tasks;
    std::mutex queue_mutex;
    std::condition_variable condition;
    bool stop;
};

该线程池类使用了C++11的多线程库,包括std::thread、std::mutex、std::condition_variable和std::future。该类使用一个vector存储工作线程,一个队列存储任务。

在构造函数中,创建了指定数量的工作线程,并将每个线程的执行函数设置为一个无限循环,等待任务的到来。在任务队列中有任务时,线程会从队列中获取任务并执行。

在enqueue函数中,将任务包装成一个std::packaged_task对象,并将其加入到任务队列中。std::packaged_task是一个可以异步执行的函数或函数对象的封装器,它可以与std::future一起使用,使得可以获取函数的返回值。

在析构函数中,将stop标志设置为true,以通知所有工作线程停止执行。然后,通过调用notify_all()函数,通知所有正在等待的线程,以便它们可以终止运行。

使用线程池时,可以使用enqueue函数添加任务,该函数会将任务添加到任务队列中,并通知等待的线程开始执行任务。可以使用std::future对象获取函数的返回值。

线程池的工作原理

线程池是一种常见的多线程编程技术,它可以有效地管理和复用线程资源,从而提高程序的性能和响应能力。

线程池通常包括一个固定数量的工作线程,每个线程都会从任务队列中获取任务并执行。当线程完成任务时,它将返回到线程池中,等待下一个任务的到来。这样,线程池可以避免频繁地创建和销毁线程,从而提高程序的性能和可维护性。

线程池的工作原理可以简单地描述为以下几个步骤:

  1. 创建一个固定数量的工作线程,并将它们设置为等待任务的到来。

  2. 将任务加入到任务队列中。

  3. 当有任务到来时,线程池中的线程会从任务队列中获取任务并执行。

  4. 线程执行完任务后,它将返回到线程池中,等待下一个任务的到来。

  5. 当线程池不再需要使用时,将任务队列中的所有任务都完成执行后,关闭线程池,并销毁所有线程。

在线程池中,任务队列通常使用线程安全的队列来实现。在添加任务时,需要先将任务包装成可执行的函数或函数对象,然后将其加入到任务队列中。在线程池中,每个工作线程会不断地从任务队列中获取任务并执行,直到线程池关闭或任务队列为空为止。

线程池的好处在于它可以帮助开发人员管理线程资源,减少线程创建和销毁的开销,并提高程序的性能和可维护性。

04-09 23:23