我正在尝试使用boost::thread在C++上的线程上实现Actor计算模型。
但是程序在执行过程中会引发奇怪的异常。异常不稳定,有时程序会以正确的方式工作。

有我的代码:

actor.hpp

class Actor {

  public:
    typedef boost::function<int()> Job;

  private:
    std::queue<Job>             d_jobQueue;
    boost::mutex                d_jobQueueMutex;
    boost::condition_variable   d_hasJob;
    boost::atomic<bool>         d_keepWorkerRunning;
    boost::thread               d_worker;

    void workerThread();

  public:
    Actor();
    virtual ~Actor();

    void execJobAsync(const Job& job);

    int execJobSync(const Job& job);
};

actor.cpp
namespace {

int executeJobSync(std::string          *error,
                   boost::promise<int> *promise,
                   const Actor::Job     *job)
{
    int rc = (*job)();

    promise->set_value(rc);
    return 0;
}

}

void Actor::workerThread()
{
    while (d_keepWorkerRunning) try {
        Job job;
        {
            boost::unique_lock<boost::mutex> g(d_jobQueueMutex);

            while (d_jobQueue.empty()) {
                d_hasJob.wait(g);
            }

            job = d_jobQueue.front();
            d_jobQueue.pop();
        }

        job();
    }
    catch (...) {
        // Log error
    }
}

void Actor::execJobAsync(const Job& job)
{
    boost::mutex::scoped_lock g(d_jobQueueMutex);
    d_jobQueue.push(job);
    d_hasJob.notify_one();
}

int Actor::execJobSync(const Job& job)
{
    std::string error;
    boost::promise<int> promise;
    boost::unique_future<int> future = promise.get_future();

    {
        boost::mutex::scoped_lock g(d_jobQueueMutex);
        d_jobQueue.push(boost::bind(executeJobSync, &error, &promise, &job));
        d_hasJob.notify_one();
    }

    int rc = future.get();

    if (rc) {
        ErrorUtil::setLastError(rc, error.c_str());
    }

    return rc;
}

Actor::Actor()
: d_keepWorkerRunning(true)
, d_worker(&Actor::workerThread, this)
{
}

Actor::~Actor()
{
    d_keepWorkerRunning = false;
    {
        boost::mutex::scoped_lock g(d_jobQueueMutex);
        d_hasJob.notify_one();
    }
    d_worker.join();
}

实际上引发的异常是int rc = future.get();行中的boost::thread_interrupted。但是形式助推文档,我无法解释这个异常。 Docs说



但是我的工作线程不能处于中断状态。

当我使用gdb并设置“catch throw”时,我看到回溯跟踪看起来像



我调查了Boost源,但无法理解为什么interrupt_checker决定工作线程被中断。

所以有人C++大师,请帮助我。我需要怎么做才能获得正确的代码?
我正在使用:

boost 1_53

Linux版本2.6.18-194.32.1.el5 Red Hat 4.1.2-48

gcc 4.7





最佳答案

我发现了几个错误:
Actor::workerThread函数对d_jobQueueMutex进行双重解锁。第一个解锁是手动d_jobQueueMutex.unlock();,第二个是boost::unique_lock<boost::mutex>的析构函数。

您应该防止解锁之一,例如unique_lockmutex之间的release关联:

g.release(); // <------------ PATCH
d_jobQueueMutex.unlock();

或添加其他代码块+默认构造的Job
workerThread可能永远不会离开以下循环:
while (d_jobQueue.empty()) {
    d_hasJob.wait(g);
}

想象以下情况:d_jobQueue为空,Actor::~Actor()被调用,它设置标志并通知工作线程:
d_keepWorkerRunning = false;
d_hasJob.notify_one();
workerThread在while循环中唤醒,看到队列为空并再次休眠。

发送特殊的最终作业以停止工作线程是一种常见的做法:
~Actor()
{
    execJobSync([this]()->int
    {
        d_keepWorkerRunning = false;
        return 0;
    });
    d_worker.join();
}

在这种情况下,d_keepWorkerRunning不需要是原子的。

LIVE DEMO on Coliru

编辑:



您在EventQueueImplActor中都有并发队列,但类型不同。可以将公共(public)部分提取到适用于任何类型的单独实体concurrent_queue<T>中。与捕获散布在不同类中的错误相比,在一处调试和测试队列要容易得多。

因此,您可以尝试使用此 concurrent_queue<T> (on Coliru)

关于c++ - 使用boost::thread的Actor计算模型,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/19593427/

10-14 04:34