C++实现简单的线程池

线程池编程简介:

在我们的服务端的程序中运用了大量关于池的概念,线程池、连接池、内存池、对象池等等。使用池的概念后可以高效利用服务器端的资源,比如没有大量的线程在系统中进行上下文的切换,一个数据库连接池,也只需要维护一定里的连接,而不是占用很多数据库连接资源。同时它们也避免了一些耗时的操作,比如创建一个线程,申请一个数据库连接,而且可能就只使用那么一次,然后就立刻释放刚申请的资源,效率很低。

在我的上一篇blog中已经实现一个线程基类了,在这里我们只需要实现一个线程池类ThreadPool和该线程池调度的工作线程类WorkThread即可,而且WorkThread是继承自Thread类的。

实现思路:

一个简单的线程池的实现思路一般如下:

  1. 在ThreadPool中创建多个线程(WorkThreadk对象),每个线程均处于阻塞状态,等待任务的到来
  1. ThreadPool提供一个提交任务的接口,如post_job(ProcCallback
    func, void* data); post_job后会立即返回,不会阻塞
  2. ThreadPool维护一个空闲线程队列,当客户程序调用post_job()后,如果空闲队列中有空闲线程,则取出一个线程句柄,并设置任务再给出新任务通知事件即可,处理等待的线程捕捉到事件信号后便开始执行任务,执行完后将该线程句柄重新push到空闲线程队列中
  3. 该线程池采用回调函数方式

首先我们实现一个WorkThread类:

         cb_func_    = NULL;
         param_        = NULL;
     }
               {
         cb_func_    = func;
         param_        = param;
         notify();                        {
                      cb_func_(param_);
     
                   param_        = NULL;
     
               }
     
      ThreadPool*    thr_pool_;           };

该WorkThread中,有一个回调函数指针和参数,当有新任务时,会在run()中被调用,执行完后会将该线程移动到空闲线程队列,等待下一次任务的提交。

ThreadPool类定义如下:

        WorkThread*    get_idle_thread();                                       size_t                thr_num_;                                     ThreadPool&     };
      线程池实现的关键是如何创建多个线程,并且当任务来临时可以从线程池中取一个线程(也就是去得到其中一个线程的指针),然后提交任务并执行。还有一点就是当任务执行完后,应该将该线程句柄重新加入到空闲线程队列,所以我们将ThreadPool的指针传入给了WorkThread,thr_pool_最后可以调用move_to_idle_que(this)来将该线程句柄移到空闲队列中。
ThreadPool中一些关键代码的实现:
                  {
             printf("start_thread_pool: failed when create a work thread: %d\n", i);
             delete pthr;
                      }
         append_idle_thread(pthr);        
     }
      }
  {
          {
         WorkThread* pthr = thr_vec_[i];
         pthr->join();
         delete pthr;
     }
     thr_vec_.clear();
     idle_que_.clear();
      }
  {
     stop_thread_pool();
 }
  {
     thr_vec_.push_back(pthread);
     idle_que_.push(pthread);
 }
  {
     idle_que_.push(idlethread);
 }
 WorkThread* ThreadPool::get_idle_thread()
 {
     WorkThread*    pthr = NULL;
              pthr = idle_que_.take();
      }
  {
     assert(func != NULL);
     WorkThread* pthr = get_idle_thread();
          {
         apr_sleep(500000);
         pthr = get_idle_thread();
     }
     pthr->set_job(func, data);
 }
ThreadPool中的BlockQueue<WorkThread*> 也就是一个线程安全的队列,即对std::deque做了一个包装,在插入和取出元素时加了一个读写锁。
使用示例:
//任务执行函数,必须是ProcCallback类型
void count(void* param)
{
// do some your work, like: 
int* pi = static_cast<int*>(param);
int val = *pi + 1;
printf("val=%d\n", val);
pelete pi;
}
//程序中使用如下:
ThreadPool* ptp = new ThreadPool();
ptp->start_thread_pool(3);
//启动3 个线程
ptp->post_job(count, new int(1));
//提交任务
ptp->post_job(count, new int(2));
ptp->post_job(count, new int(3));
//程序线束时
ptp->stop_thread_pool();
其实count()函数就是我们的业务实现代码,有任务时,可以提交给线程池去执行。
结尾:
    其实实现一个线程池或其它什么池并不难,当然线程安全和效率还是要从多写代码的经验中获取。像这个线程池也就是基于预创多个建线程,保保存好它们的线程句柄,当有新任务时取一个线程执行即可,执行完后一定要归还到空闲线程队列中,当然我们可以在线程池中增加一个任务队列,因为当post_job()时,若当时没有空闲线程,有两种方案,一是等待有空闲线程,二是加入到任务队列,当WorkThread线程执行完一个任务后,从任务队列中取一个任务继续执行即可,不会阻塞在post_job()中。
  另外,我们可以封装一些线程安全的队列和map什么的,这样在程序中就不用担心创建一个多线程共享的队列时,还必须创建一个锁,挺麻烦的,比如上面的BlockQueue<Type>直接拿来用就行了。

05-11 13:49