我正在尝试使用boost::thread在C++上的线程上实现Actor计算模型。
但是程序在执行过程中会引发奇怪的异常。异常不稳定,有时程序会以正确的方式工作。
有我的代码:
actor.hpp
class Actor {
public:
typedef boost::function<int()> Job;
private:
std::queue<Job> d_jobQueue;
boost::mutex d_jobQueueMutex;
boost::condition_variable d_hasJob;
boost::atomic<bool> d_keepWorkerRunning;
boost::thread d_worker;
void workerThread();
public:
Actor();
virtual ~Actor();
void execJobAsync(const Job& job);
int execJobSync(const Job& job);
};
actor.cpp
namespace {
int executeJobSync(std::string *error,
boost::promise<int> *promise,
const Actor::Job *job)
{
int rc = (*job)();
promise->set_value(rc);
return 0;
}
}
void Actor::workerThread()
{
while (d_keepWorkerRunning) try {
Job job;
{
boost::unique_lock<boost::mutex> g(d_jobQueueMutex);
while (d_jobQueue.empty()) {
d_hasJob.wait(g);
}
job = d_jobQueue.front();
d_jobQueue.pop();
}
job();
}
catch (...) {
// Log error
}
}
void Actor::execJobAsync(const Job& job)
{
boost::mutex::scoped_lock g(d_jobQueueMutex);
d_jobQueue.push(job);
d_hasJob.notify_one();
}
int Actor::execJobSync(const Job& job)
{
std::string error;
boost::promise<int> promise;
boost::unique_future<int> future = promise.get_future();
{
boost::mutex::scoped_lock g(d_jobQueueMutex);
d_jobQueue.push(boost::bind(executeJobSync, &error, &promise, &job));
d_hasJob.notify_one();
}
int rc = future.get();
if (rc) {
ErrorUtil::setLastError(rc, error.c_str());
}
return rc;
}
Actor::Actor()
: d_keepWorkerRunning(true)
, d_worker(&Actor::workerThread, this)
{
}
Actor::~Actor()
{
d_keepWorkerRunning = false;
{
boost::mutex::scoped_lock g(d_jobQueueMutex);
d_hasJob.notify_one();
}
d_worker.join();
}
实际上引发的异常是
int rc = future.get();
行中的boost::thread_interrupted。但是形式助推文档,我无法解释这个异常。 Docs说但是我的工作线程不能处于中断状态。
当我使用gdb并设置“catch throw”时,我看到回溯跟踪看起来像
我调查了Boost源,但无法理解为什么interrupt_checker决定工作线程被中断。
所以有人C++大师,请帮助我。我需要怎么做才能获得正确的代码?
我正在使用:
boost 1_53
Linux版本2.6.18-194.32.1.el5 Red Hat 4.1.2-48
gcc 4.7
。
最佳答案
我发现了几个错误:Actor::workerThread
函数对d_jobQueueMutex
进行双重解锁。第一个解锁是手动d_jobQueueMutex.unlock();
,第二个是boost::unique_lock<boost::mutex>
的析构函数。
您应该防止解锁之一,例如unique_lock
和mutex
之间的release关联:
g.release(); // <------------ PATCH
d_jobQueueMutex.unlock();
或添加其他代码块+默认构造的
Job
。workerThread
可能永远不会离开以下循环:while (d_jobQueue.empty()) {
d_hasJob.wait(g);
}
想象以下情况:
d_jobQueue
为空,Actor::~Actor()
被调用,它设置标志并通知工作线程:d_keepWorkerRunning = false;
d_hasJob.notify_one();
workerThread
在while循环中唤醒,看到队列为空并再次休眠。发送特殊的最终作业以停止工作线程是一种常见的做法:
~Actor()
{
execJobSync([this]()->int
{
d_keepWorkerRunning = false;
return 0;
});
d_worker.join();
}
在这种情况下,
d_keepWorkerRunning
不需要是原子的。LIVE DEMO on Coliru
编辑:
您在
EventQueueImpl
和Actor
中都有并发队列,但类型不同。可以将公共(public)部分提取到适用于任何类型的单独实体concurrent_queue<T>
中。与捕获散布在不同类中的错误相比,在一处调试和测试队列要容易得多。因此,您可以尝试使用此
concurrent_queue<T>
(on Coliru)关于c++ - 使用boost::thread的Actor计算模型,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/19593427/