This article covers how to handle a lifecycle issue with asio::io_service and thread_group. It may be a useful reference for anyone facing the same problem.

Problem description


Looking at answers like this one, we can do stuff like:

boost::asio::io_service ioService;
boost::thread_group threadpool;
{
    boost::asio::io_service::work work(ioService);
    threadpool.create_thread(boost::bind(&boost::asio::io_service::run, &ioService));
    threadpool.create_thread(boost::bind(&boost::asio::io_service::run, &ioService));
    ioService.post(boost::bind(...));
    ioService.post(boost::bind(...));
    ioService.post(boost::bind(...));
}
threadpool.join_all();

However, in my case I want to do something like:

while (condition)
{
    ioService.post(boost::bind(...));
    ioService.post(boost::bind(...));
    ioService.post(boost::bind(...));
    threadpool.join_all();

    // DO SOMETHING WITH RESULTS
}

However, the boost::asio::io_service::work work(ioService) line is out of place, and as far as I can see, I cannot recreate it without needing to create every thread in the pool again.

In my code, the thread creation overhead seems negligible (performance is actually better than with my previous mutex-based code), but is there a cleaner way to do this?

Solution

while (condition)
{
    //... stuff
    threadpool.join_all();

    //... 
}

This doesn't make any sense, because you can only join threads once. Once joined, they are gone. You don't want to be starting new threads all the time (use a thread pool + task queue¹).
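
To illustrate the point about joining, here is a minimal sketch. It uses std::thread rather than boost::thread_group to stay self-contained, and the function name joinable_after_join is made up for this example.

```cpp
#include <thread>

// Hypothetical helper: starts a thread, joins it once, and reports
// whether the thread object can still be joined afterwards.
bool joinable_after_join()
{
    std::thread t([] { /* the task body would go here */ });
    t.join();             // waits for the thread to finish
    return t.joinable();  // false: the thread of execution is gone
}
```

After the join there is nothing left to wait for, so a loop that joins on every iteration only works if it also starts new threads on every iteration.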

Since you don't actually want to stop the threads, you probably don't want to destruct the work. If you insist, a shared_ptr<work> or optional<work> works nicely (just my_work.reset() it).

¹ Update Suggestion:

  • a simple thread_pool with a task queue

  • a queue based on io_service itself (using work)

UPDATE

A simple extension to "SOLUTION #2" would make it possible to wait for all tasks to have been completed, without joining the workers/destroying the pool:

  void drain() {
      unique_lock<mutex> lk(mx);
      namespace phx = boost::phoenix;
      cv.wait(lk, phx::empty(phx::ref(_queue)));
  }

Note that for reliable operation, one needs to signal the condition variable on de-queue as well:

      cv.notify_all(); // in order to signal drain

CAVEATS

  1. It's an interface that invites race conditions (the queue could accept jobs from many threads, so once drain() returns, another thread could already have posted a new task).

  2. This signals when the queue is empty, not when the tasks are completed. The queue cannot know about that; if you need it, use a barrier or signal a condition from within the task (the_work in this example). The mechanism for queuing/scheduling is not relevant there.
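
To make the second caveat concrete, here is a minimal sketch of signalling completion from within the tasks themselves. It is independent of the pool below and uses plain std::thread for brevity; pending_counter and run_and_wait are hypothetical names introduced for this example.

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <thread>
#include <vector>

// Hypothetical helper: counts outstanding tasks and lets a caller
// block until every one of them has reported completion.
class pending_counter
{
    std::mutex mx;
    std::condition_variable cv;
    int pending = 0;

  public:
    void add(int n) {
        std::lock_guard<std::mutex> lk(mx);
        pending += n;
    }

    // Called by a task as its very last step.
    void done() {
        std::lock_guard<std::mutex> lk(mx);
        if (--pending == 0)
            cv.notify_all();
    }

    // Blocks until all registered tasks have called done().
    void wait() {
        std::unique_lock<std::mutex> lk(mx);
        cv.wait(lk, [this] { return pending == 0; });
    }
};

// Runs n tasks and sums their results only after all have completed.
int run_and_wait(int n)
{
    assert(n >= 0);

    pending_counter tasks;
    std::vector<int> results(n, 0);
    std::vector<std::thread> threads;

    tasks.add(n);
    for (int i = 0; i < n; ++i)
        threads.emplace_back([&tasks, &results, i] {
            results[i] = i * i;  // the actual work
            tasks.done();        // signal completion from inside the task
        });

    tasks.wait();  // every task has finished, not merely been dequeued

    int sum = 0;
    for (int r : results)
        sum += r;
    for (auto& t : threads)
        t.join();
    return sum;
}
```

The same counter could be shared with jobs enqueued on a thread pool; the point is only that the tasks, not the queue, announce when they are done.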

DEMO

#include <boost/thread.hpp>
#include <boost/phoenix.hpp>
#include <boost/optional.hpp>
#include <boost/function.hpp>
#include <boost/atomic.hpp>
#include <deque>
#include <iostream>

using namespace boost;
using namespace boost::phoenix::arg_names;

class thread_pool
{
  private:
      mutex mx;
      condition_variable cv;

      typedef function<void()> job_t;
      std::deque<job_t> _queue;

      thread_group pool;

      boost::atomic_bool shutdown;
      static void worker_thread(thread_pool& q)
      {
          while (auto job = q.dequeue())
              (*job)();
      }

  public:
      thread_pool() : shutdown(false) {
          for (unsigned i = 0; i < boost::thread::hardware_concurrency(); ++i)
              pool.create_thread(bind(worker_thread, ref(*this)));
      }

      void enqueue(job_t job) 
      {
          lock_guard<mutex> lk(mx);
          _queue.push_back(std::move(job));

          cv.notify_one();
      }

      void drain() {
          unique_lock<mutex> lk(mx);
          namespace phx = boost::phoenix;
          cv.wait(lk, phx::empty(phx::ref(_queue)));
      }

      optional<job_t> dequeue() 
      {
          unique_lock<mutex> lk(mx);
          namespace phx = boost::phoenix;

          cv.wait(lk, phx::ref(shutdown) || !phx::empty(phx::ref(_queue)));

          if (_queue.empty())
              return none;

          auto job = std::move(_queue.front());
          _queue.pop_front();

          cv.notify_all(); // in order to signal drain

          return std::move(job);
      }

      ~thread_pool()
      {
          shutdown = true;
          {
              lock_guard<mutex> lk(mx);
              cv.notify_all();
          }

          pool.join_all();
      }
};

void the_work(int id)
{
    std::cout << "worker " << id << " entered\n";

    // no more synchronization; the pool size determines max concurrency
    std::cout << "worker " << id << " start work\n";
    this_thread::sleep_for(chrono::milliseconds(2));
    std::cout << "worker " << id << " done\n";
}

int main()
{
    thread_pool pool; // uses 1 thread per core

    for (int round = 0; round < 20; ++round) { // renamed so the inner loop no longer shadows it
        for (int i = 0; i < 10; ++i)
            pool.enqueue(bind(the_work, i));

        pool.drain(); // make the queue empty, leave the threads
        std::cout << "Queue empty\n";
    }

    // destructing pool joins the worker threads
}


10-27 20:47