This article shows how to use the same thread pool batch by batch; it may serve as a useful reference for anyone solving the same problem.

Problem description

I found a good implementation of a boost-based thread pool which is an improvement over this and this. It is very easy to understand and test. It looks like this:

#include <iostream>
#include <boost/thread/thread.hpp>
#include <boost/asio.hpp>
#include <boost/bind.hpp>
#include <boost/shared_ptr.hpp>

// the actual thread pool
struct ThreadPool {
    ThreadPool(std::size_t);
    template<class F> void enqueue(F f);
    ~ThreadPool();

    // the io_service we are wrapping
    boost::asio::io_service io_service;
    // don't let io_service stop
    boost::shared_ptr<boost::asio::io_service::work> work;
    // the threads
    boost::thread_group threads;
};

// the constructor just launches some amount of workers
ThreadPool::ThreadPool(std::size_t nThreads)
    : io_service()
    , work(new boost::asio::io_service::work(io_service))
{
    for (std::size_t i = 0; i < nThreads; ++i) {
        threads.create_thread(boost::bind(&boost::asio::io_service::run, &io_service));
    }
}

// add new work item to the pool
template<class F>
void ThreadPool::enqueue(F f) {
    io_service.post(f);
}

// the destructor joins all threads
ThreadPool::~ThreadPool() {
    work.reset();
    io_service.run();
}

// tester:
void f(int i) {
    std::cout << "hello " << i << std::endl;
    boost::this_thread::sleep(boost::posix_time::milliseconds(300));
    std::cout << "world " << i << std::endl;
}

// it can be tested via:
int main() {
    // create a thread pool of 4 worker threads
    ThreadPool pool(4);

    // queue a bunch of "work items"
    for (int i = 0; i < 8; ++i) {
        std::cout << "task " << i << " created" << std::endl;
        pool.enqueue(boost::bind(&f, i));
    }
}

Build it with:

g++ ThreadPool-4.cpp -lboost_system -lboost_thread

Now the question:

I need to know how I can modify the implementation to be able to use this thread pool batch by batch: only when the first set of my work is fully completed by the thread pool do I want to supply the second set, and so on. I tried to play with .run() and .reset() (found in the destructor) between the batch jobs, but no luck:

// adding methods to the thread pool:

// reset the asio work and threads
void ThreadPool::reset(std::size_t nThreads) {
    work.reset(new boost::asio::io_service::work(io_service));
    for (std::size_t i = 0; i < nThreads; ++i) {
        threads.create_thread(boost::bind(&boost::asio::io_service::run, &io_service));
    }
    std::cout << "group size : " << threads.size() << std::endl;
}

// join, and even, interrupt
void ThreadPool::joinAll() {
    threads.join_all();
    threads.interrupt_all();
}

// tester
int main() {
    // create a thread pool of 4 worker threads
    ThreadPool pool(4);

    // queue a bunch of "work items"
    for (int i = 0; i < 20; ++i) {
        std::cout << "task " << i << " created" << std::endl;
        pool.enqueue(boost::bind(&f, i));
    }

    // here I play with the asio work, the io_service and the thread group
    pool.work.reset();
    pool.io_service.run();
    std::cout << "after run" << std::endl;
    pool.joinAll();
    std::cout << "after join all" << std::endl;
    pool.reset(4);
    std::cout << "new thread group size: " << pool.threads.size() << std::endl;
    // btw: the new thread group size is 8. I expected 4!

    // second batch... never completes
    for (int i = 20; i < 30; ++i) {
        pool.enqueue(boost::bind(&f, i));
    }
}

The second batch never completes. I would appreciate your help fixing this. Thank you.
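A detail worth knowing here: once the work object is destroyed and io_service::run() returns because it has run out of handlers, the io_service is left in a stopped state, and any further run() call returns immediately until io_service::reset() is called. That is presumably why the threads created by reset(4) exit at once and the second batch never runs. Below is a minimal sketch of a batch boundary built on that idea; the method name waitForBatch and its placement are assumptions for illustration, not code from the original posts, and it would need a matching declaration inside the struct.

// A minimal sketch (an assumption added for illustration) of a batch boundary
// for the ThreadPool above.
void ThreadPool::waitForBatch(std::size_t nThreads)
{
    work.reset();            // let io_service::run() return once the queued handlers finish
    threads.join_all();      // wait until every worker thread has drained the queue and exited
    io_service.reset();      // clear the "stopped" flag; otherwise later run() calls return at once
    work.reset(new boost::asio::io_service::work(io_service));
    // start a fresh set of workers for the next batch; note that the threads that were
    // just joined stay inside the thread_group, which is why its size keeps growing
    for (std::size_t i = 0; i < nThreads; ++i)
        threads.create_thread(boost::bind(&boost::asio::io_service::run, &io_service));
}

With something along these lines, the tester could call pool.waitForBatch(4) between the two loops instead of poking at work, io_service and joinAll() directly.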
UPDATE - solution:

Based on a solution by Nik, I developed a solution using a condition variable. Just add the following code to the original class:

// New members are needed in the ThreadPool struct for this to work:
// boost::mutex mutex_; boost::condition_variable cond; and a task counter
// std::size_t nTasks; (initialised to 0 in the constructor), plus declarations
// for wrapper() and wait().

// add new work item to the pool
template<class F>
void ThreadPool::enqueue(F f) {
    {
        boost::unique_lock<boost::mutex> lock(mutex_);
        nTasks++;
    }
    // forwarding the job to wrapper()
    void (ThreadPool::*ff)(boost::tuple<F>) = &ThreadPool::wrapper<F>;
    io_service.post(boost::bind(ff, this, boost::make_tuple(f)));
    // using a tuple seems to be the only practical way; it is mentioned in the boost examples
}

// run + notify
template<class F>
void ThreadPool::wrapper(boost::tuple<F> f) {
    boost::get<0>(f)(); // this is the task (function and its argument) that has to be executed by a thread
    {
        boost::unique_lock<boost::mutex> lock(mutex_);
        nTasks--;
        cond.notify_one();
    }
}

void ThreadPool::wait() {
    boost::unique_lock<boost::mutex> lock(mutex_);
    while (nTasks) {
        cond.wait(lock);
    }
}

Now you can call the wait() method between batches of work.

One problem, however: even after the last batch I have to call pool.wait(), because the thread pool's scope ends right after it and the thread pool's destructor is invoked. During destruction some of the jobs are still finishing, and that is when they call .notify(). Since ThreadPool::mutex_ has already been invalidated during destruction, exceptions occur while locking. Your suggestions would be appreciated.
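One straightforward way to handle that destructor problem, sketched below as an assumption rather than part of the original answer, is to drain the pool inside the destructor itself, so that no worker thread can touch mutex_ or cond after the members start being destroyed. It assumes the wait()/wrapper()/nTasks machinery from the update above is in place.

// Sketch of a destructor that avoids notifying on an already-destroyed mutex.
ThreadPool::~ThreadPool()
{
    wait();               // block until nTasks reaches zero; mutex_ and cond are still valid here
    work.reset();         // allow io_service::run() to return in the worker threads
    threads.join_all();   // make sure every worker has exited before the io_service is destroyed
}

With a destructor like this, the explicit pool.wait() after the final batch should no longer be necessary.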
Recommended answer

A condition variable could be used to achieve the desired result.

Implement a function responsible for enqueueing the tasks and then waiting on a condition variable. The condition variable is notified when all tasks assigned to the pool are complete. Every thread checks whether the jobs are complete or not; once all the jobs are complete, the condition variable is notified.

// An example of what you could try; this is just a hint of what could be explored.
void jobScheduler()
{
    int jobs = numberOfJobs; // this could vary and can be made shared memory

    // queue a bunch of "work items"
    for (int i = 0; i < jobs; ++i)
    {
        std::cout << "task " << i << " created" << std::endl;
        pool.enqueue(boost::bind(&f, i));
    }

    // wait on a condition variable
    boost::mutex::scoped_lock lock(the_mutex);
    conditionVariable.wait(lock); // have this variable notified from any thread which realizes that all jobs are complete
}

Solution 2

I have a new working solution, with some assumptions about the syntax of the functions being called back, but that could be changed as per requirement.

Continuing along the lines above, I use a condition variable for managing my tasks, but with a difference:

- Create a queue of jobs.
- A Manager waits for new jobs in the queue.
- Once a job is received, a notification is sent to the waiting Manager.
- Each Worker keeps a handle to the Manager. When all the tasks assigned to it are complete, it informs the Manager.
- On getting the end call, the Manager stops waiting for new jobs in the queue and exits.

#include <iostream>
#include <queue>
#include <boost/thread/thread.hpp>
#include <boost/thread/condition_variable.hpp>
#include <boost/asio.hpp>
#include <boost/bind.hpp>
#include <boost/tuple/tuple.hpp>
#include <boost/tuple/tuple_io.hpp>
#include <boost/function.hpp>

/// JOB queue holds all jobs required to be executed
template<typename Job>
class JobQueue
{
  private:
    std::queue<Job> _queue;
    mutable boost::mutex _mutex;
    boost::condition_variable _conditionVariable;

  public:
    void push(Job const& job)
    {
        boost::mutex::scoped_lock lock(_mutex);
        _queue.push(job);
        lock.unlock();
        _conditionVariable.notify_one();
    }

    bool empty() const
    {
        boost::mutex::scoped_lock lock(_mutex);
        return _queue.empty();
    }

    bool tryPop(Job& poppedValue)
    {
        boost::mutex::scoped_lock lock(_mutex);
        if (_queue.empty())
        {
            return false;
        }
        poppedValue = _queue.front();
        _queue.pop();
        return true;
    }

    void waitAndPop(Job& poppedValue)
    {
        boost::mutex::scoped_lock lock(_mutex);
        while (_queue.empty())
        {
            _conditionVariable.wait(lock);
        }
        poppedValue = _queue.front();
        _queue.pop();
    }
};

/// Thread pool for posting jobs to the io service
class ThreadPool
{
  public:
    ThreadPool(int noOfThreads = 1);
    ~ThreadPool();
    template<class func>
    void post(func f);
    boost::asio::io_service& getIoService();

  private:
    boost::asio::io_service _ioService;
    boost::asio::io_service::work _work;
    boost::thread_group _threads;
};

inline ThreadPool::ThreadPool(int noOfThreads)
    : _work(_ioService)
{
    for (int i = 0; i < noOfThreads; ++i) // 4
        _threads.create_thread(boost::bind(&boost::asio::io_service::run, &_ioService));
}

inline ThreadPool::~ThreadPool()
{
    _ioService.stop();
    _threads.join_all();
}

inline boost::asio::io_service& ThreadPool::getIoService()
{
    return _ioService;
}

template<class func>
void ThreadPool::post(func f)
{
    _ioService.post(f);
}

template<typename T>
class Manager;

/// Worker doing some work.
template<typename T>
class Worker
{
    T _data;
    int _taskList;
    boost::mutex _mutex;
    Manager<T>* _hndl;

  public:
    Worker(T data, int task, Manager<T>* hndle)
        : _data(data),
          _taskList(task),
          _hndl(hndle)
    {
    }

    bool job()
    {
        boost::mutex::scoped_lock lock(_mutex);
        std::cout << "...Men at work..." << ++_data << std::endl;
        --_taskList;
        if (taskDone())
            _hndl->end();
        return true; // the original snippet was missing this return statement
    }

    bool taskDone()
    {
        std::cout << "Tasks " << _taskList << std::endl << std::endl;
        if (_taskList == 0)
        {
            std::cout << "Tasks done " << std::endl;
            return true;
        }
        else
            return false; // the original snippet had "else false;" without the return
    }
};

/// Job handler waits for new jobs and executes them as and when a new job is
/// received, using the thread pool. Once all jobs are done the handler exits.
template<typename T>
class Manager
{
  public:
    typedef boost::function<bool (Worker<T>*)> Func;

    Manager(int threadCount)
        : _threadCount(threadCount),
          _isWorkCompleted(false)
    {
        _pool = new ThreadPool(_threadCount);
        boost::thread jobRunner(&Manager::execute, this);
    }

    void add(Func f, Worker<T>* instance)
    {
        Job job(instance, f);
        _jobQueue.push(job);
    }

    void end()
    {
        boost::mutex::scoped_lock lock(_mutex);
        _isWorkCompleted = true;
        // send a dummy job
        add(NULL, NULL);
    }

    void workComplete()
    {
        std::cout << "Job well done." << std::endl;
    }

    bool isWorkDone()
    {
        boost::mutex::scoped_lock lock(_mutex);
        if (_isWorkCompleted)
            return true;
        return false;
    }

    void execute()
    {
        Job job;

        while (!isWorkDone())
        {
            _jobQueue.waitAndPop(job);

            Func f = boost::get<1>(job);
            Worker<T>* ptr = boost::get<0>(job);

            if (f)
            {
                _pool->post(boost::bind(f, ptr));
            }
            else
                break;
        }
        std::cout << "Complete" << std::endl;
    }

  private:
    ThreadPool* _pool;
    int _threadCount;
    typedef boost::tuple<Worker<T>*, Func> Job;
    JobQueue<Job> _jobQueue;
    bool _isWorkCompleted;
    boost::mutex _mutex;
};

typedef boost::function<bool (Worker<int>*)> IntFunc;
typedef boost::function<bool (Worker<char>*)> CharFunc;
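// How the pieces above fit together (summary added for readability): each Manager
// owns its own ThreadPool plus a detached jobRunner thread executing Manager::execute().
// add() pushes a (Worker*, callback) tuple into the JobQueue; execute() pops it and
// posts the callback to the pool. Worker::job() decrements its task counter and, when
// taskDone() reports zero, calls Manager::end(), which pushes a dummy (NULL) job so
// that execute() stops waiting on the queue and exits.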
int main()
{
    boost::asio::io_service ioService;

    Manager<int> jobHndl(2);
    Worker<int> wrk1(0, 4, &jobHndl);

    IntFunc f = &Worker<int>::job;

    jobHndl.add(f, &wrk1);
    jobHndl.add(f, &wrk1);
    jobHndl.add(f, &wrk1);
    jobHndl.add(f, &wrk1);

    Manager<char> jobHndl2(2);
    Worker<char> wrk2(0, 'a', &jobHndl2);

    CharFunc f2 = &Worker<char>::job;

    jobHndl2.add(f2, &wrk2);
    jobHndl2.add(f2, &wrk2);
    jobHndl2.add(f2, &wrk2);
    jobHndl2.add(f2, &wrk2);

    ioService.run(); // this io_service has no work of its own, so run() returns immediately
    while (1) {}     // busy-wait so main never exits and the detached manager threads keep running
    return 0;
}

This concludes this article on how to use the same thread pool batch by batch. Hopefully the recommended answers are helpful.