This article looks at boost::asio, thread pools, and thread monitoring; the question and answer below should be a useful reference for anyone facing the same problem.

Problem Description

I've implemented a thread pool using boost::asio, with some number of boost::thread objects calling boost::asio::io_service::run(). However, a requirement that I've been given is to have a way to monitor all threads for "health". My intent is to make a simple sentinel object that can be passed through the thread pool -- if it makes it through, then we can assume that the thread is still processing work.

However, given my implementation, I'm not sure how (or if) I can reliably monitor all the threads in the pool. I've simply delegated the thread function to boost::asio::io_service::run(), so posting a sentinel object into the io_service instance won't guarantee which thread will actually get that sentinel and do the work.

One option may be to just periodically insert the sentinel and hope that each thread picks it up at least once within some reasonable amount of time (a rough sketch of this idea follows the example below), but that obviously isn't ideal.

Take the following example. Due to the way the handler is coded, in this instance we can see that each thread will do the same amount of work. In reality, though, I will not have control over the handler implementations; some may be long-running while others will complete almost immediately.

#include <iostream>
#include <memory>
#include <vector>
#include <boost/asio.hpp>
#include <boost/thread.hpp>
#include <boost/bind.hpp>

void handler()
{
   std::cout << boost::this_thread::get_id() << "\n";
   boost::this_thread::sleep(boost::posix_time::milliseconds(100));
}

int main(int argc, char **argv)
{
   boost::asio::io_service svc(3);

   std::unique_ptr<boost::asio::io_service::work> work(new boost::asio::io_service::work(svc));

   boost::thread one(boost::bind(&boost::asio::io_service::run, &svc));
   boost::thread two(boost::bind(&boost::asio::io_service::run, &svc));
   boost::thread three(boost::bind(&boost::asio::io_service::run, &svc));

   svc.post(handler);
   svc.post(handler);
   svc.post(handler);
   svc.post(handler);
   svc.post(handler);
   svc.post(handler);
   svc.post(handler);
   svc.post(handler);
   svc.post(handler);
   svc.post(handler);

   work.reset();

   three.join();
   two.join();
   one.join();

   return 0;
}
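
For illustration only, the kind of sentinel I had in mind might look like the sketch below. The heartbeat type and last_seen map are hypothetical names, not part of the code above, and the sketch also shows the core problem: posting one heartbeat only proves that some thread is alive, since nothing controls which of the pool's threads dequeues it.

#include <map>
#include <boost/asio.hpp>
#include <boost/date_time/posix_time/posix_time.hpp>
#include <boost/thread.hpp>

// Hypothetical sentinel handler: records which thread ran it and when.
// If the map were read from a separate monitoring thread it would also
// need synchronization; this is only meant to illustrate the idea.
struct heartbeat
{
   explicit heartbeat(std::map<boost::thread::id, boost::posix_time::ptime>& last_seen)
      : last_seen_(last_seen)
   {
   }

   void operator()()
   {
      // whichever thread happens to dequeue this handler stamps itself
      last_seen_[boost::this_thread::get_id()] =
         boost::posix_time::microsec_clock::universal_time();
   }

   std::map<boost::thread::id, boost::posix_time::ptime>& last_seen_;
};

// usage with the example above (no guarantee which thread gets it):
//    std::map<boost::thread::id, boost::posix_time::ptime> last_seen;
//    svc.post(heartbeat(last_seen));
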
Solution

The solution that I used relies on the fact that I own the implementation of the thread pool objects. I created a wrapper type that updates statistics and copies the user-defined handlers that are posted to the thread pool. Only this wrapper type is ever posted to the underlying io_service. This approach lets me keep track of the handlers that are posted and executed without being intrusive into the user code.

Here's a stripped down and simplified example:

#include <algorithm>
#include <functional>
#include <iostream>
#include <map>
#include <memory>
#include <vector>
#include <boost/thread.hpp>
#include <boost/asio.hpp>

// Supports scheduling anonymous jobs that are
// executable as returning nothing and taking
// no arguments
typedef std::function<void(void)> functor_type;

// some way to store per-thread statistics
typedef std::map<boost::thread::id, int> thread_jobcount_map;

// only this type is actually posted to
// the asio proactor, this delegates to
// the user functor in operator()
struct handler_wrapper
{
   handler_wrapper(const functor_type& user_functor, thread_jobcount_map& statistics)
      : user_functor_(user_functor)
      , statistics_(statistics)
   {
   }

   void operator()()
   {
      user_functor_();

      // just for illustration purposes, assume a long running job
      boost::this_thread::sleep(boost::posix_time::milliseconds(100));

      // increment executed jobs
      ++statistics_[boost::this_thread::get_id()];
   }

   functor_type         user_functor_;
   thread_jobcount_map& statistics_;
};

// anonymous thread function, just runs the proactor
void thread_func(boost::asio::io_service& proactor)
{
   proactor.run();
}

class ThreadPool
{
public:
   ThreadPool(size_t thread_count)
   {
      threads_.reserve(thread_count);

      work_.reset(new boost::asio::io_service::work(proactor_));

      for(size_t curr = 0; curr < thread_count; ++curr)
      {
         boost::thread th(thread_func, boost::ref(proactor_));

         // inserting into this map before any work can be scheduled
         // means that we don't have to lock it for lookups,
         // since we don't dynamically add threads
         thread_jobcount_.insert(std::make_pair(th.get_id(), 0));

         threads_.emplace_back(std::move(th));
      }
   }

   // the only way for a user to get work into
   // the pool is to use this function, which ensures
   // that the handler_wrapper type is used
   void schedule(const functor_type& user_functor)
   {
      handler_wrapper to_execute(user_functor, thread_jobcount_);
      proactor_.post(to_execute);
   }

   void join()
   {
      // join all threads in pool:
      work_.reset();
      proactor_.stop();

      std::for_each(
         threads_.begin(),
         threads_.end(),
         [] (boost::thread& t)
      {
         t.join();
      });
   }

   // just an example showing statistics
   void log()
   {
      std::for_each(
         thread_jobcount_.begin(),
         thread_jobcount_.end(),
         [] (const thread_jobcount_map::value_type& it)
      {
         std::cout << "Thread: " << it.first << " executed " << it.second << " jobs\n";
      });
   }

private:
   std::vector<boost::thread> threads_;
   std::unique_ptr<boost::asio::io_service::work> work_;
   boost::asio::io_service    proactor_;
   thread_jobcount_map        thread_jobcount_;
};

struct add
{
   add(int lhs, int rhs, int* result)
      : lhs_(lhs)
      , rhs_(rhs)
      , result_(result)
   {
   }

   void operator()()
   {
      *result_ = lhs_ + rhs_;
   }

   int lhs_,rhs_;
   int* result_;
};

int main(int argc, char **argv)
{
   // some "state objects" that are
   // manipulated by the user functors
   int x = 0, y = 0, z = 0;

   // pool of three threads
   ThreadPool pool(3);

   // schedule some handlers to do some work
   pool.schedule(add(5, 4, &x));
   pool.schedule(add(2, 2, &y));
   pool.schedule(add(7, 8, &z));

   // give all the handlers time to execute
   boost::this_thread::sleep(boost::posix_time::milliseconds(1000));

   std::cout
      << "x = " << x << "\n"
      << "y = " << y << "\n"
      << "z = " << z << "\n";

   pool.join();

   pool.log();
}

Output:

x = 9
y = 4
z = 15
Thread: 0000000000B25430 executed 1 jobs
Thread: 0000000000B274F0 executed 1 jobs
Thread: 0000000000B27990 executed 1 jobs
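
Building on the per-thread job counts, the "health" requirement from the question could be layered on top of the pool. The following is a rough sketch of my own, not part of the answer above: it assumes the pool exposes copies of thread_jobcount_ (in real code the counters would need a lock or atomics before another thread may read them). A monitor takes two snapshots some interval apart; any thread whose count did not advance while work (or a periodic no-op job) was being scheduled is suspect.

#include <iostream>
#include <map>
#include <boost/thread.hpp>

typedef std::map<boost::thread::id, int> thread_jobcount_map;

// Hypothetical watchdog check: compares two snapshots of the pool's
// per-thread job counts and reports threads that made no progress.
// The snapshots stand in for an accessor the pool would have to expose,
// e.g. a copy of thread_jobcount_ taken under a lock.
void report_stalled_threads(const thread_jobcount_map& previous,
                            const thread_jobcount_map& current)
{
   for(thread_jobcount_map::const_iterator it = current.begin();
       it != current.end(); ++it)
   {
      thread_jobcount_map::const_iterator prev = previous.find(it->first);
      if(prev != previous.end() && prev->second == it->second)
      {
         // the thread executed no jobs between the two samples; if work
         // (or a no-op sentinel) was scheduled in that window, this
         // thread is either stuck or starved
         std::cout << "Thread " << it->first
                   << " made no progress since the last check\n";
      }
   }
}
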

That concludes this article on boost::asio, thread pools, and thread monitoring; hopefully the answer above is helpful.
