本文介绍了生产者-消费者场景和“优雅"的正确实现线程池终止的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在做我的第一个多线程项目,因此有一些我不确定的事情.关于我的设置的详细信息在 上一个问题,简而言之:我有一个由 Executors.newFixedThreadPool(N) 实现的线程池.一个线程被赋予一个动作,它对本地和远程资源进行一系列查询,并迭代地填充 ArrayBlockingQueue,而其余线程调用 take() 方法排队并处理队列中的对象.

I am working on my first multi-threaded project and thus have a couple of things that I am unsure of. Details on my setup was on a previous question, in short: I have a thread pool implemented by Executors.newFixedThreadPool(N). One thread is given an action which does a series of queries to local and remote resources and iteratively populates an ArrayBlockingQueue, while the rest of the threads invoke take() method on the queue and process the objects in the queue.

尽管小型和有监督的测试似乎运行良好,但我不确定如何处理特殊情况,例如开始(队列还没有项目)、结束(队列已清空)以及任何最终的 InterruptedExceptions.我在这里做了一些关于 SO 的阅读,然后我读到了 GoetzKabutz.共识似乎是人们不应忽视这些例外.但是我不确定所提供的示例如何与我的情况相关,我没有在代码中的任何地方调用 thread.interrupt() ......说到这里,我不确定我是否应该这样做所以……

Even though small and supervised tests seem to run OK, I am unsure about how I handle special scenarios such as the beginning (the queue has no items yet), the end (the queue is emptied), and any eventual InterruptedExceptions. I have done some reading here on SO, which then led me to two really nice articles by Goetz and Kabutz. The consensus seems to be that one should not ignore these exceptions. However I am unsure how the examples supplied relates to my situation, I have not invoked thread.interrupt() anywhere in my code... Speaking of which, I'm getting unsure if I should have done so...

总而言之,鉴于下面的代码,我如何最好地处理特殊情况,例如终止条件和 InterrruptedExceptions?希望这些问题有意义,否则我会尽力进一步描述.

To sum it up, given the code below, how do I best handle the special cases, such as termination criteria and the InterrruptedExceptions? Hope the questions make sense, otherwise I'll do my best to describe it further.

提前致谢,

我一直致力于实施一段时间,但我遇到了一个新的小问题,所以我想我会更新情况.我不幸遇到了 ConcurrentModificationException,这很可能是由于线程池的不完整关闭/终止.一旦我发现我可以使用 isTerminated(),我就尝试了,然后由于 wait() 不同步,我得到了 IllegalMonitorStateException.代码的当前状态如下:

edit: I have been working on the implementation for a while now, and I have come across a new hiccup so I figured I'd update the situation. I have had the misfortune of coming across ConcurrentModificationException which was most likely due to incomplete shutdown/termination of the thread pool. As soon as I figured out I could use isTerminated() I tried that, then I got a IllegalMonitorStateException due to an unsynchronized wait(). The current state of the code is below:

我遵循了@Jonathan 的回答中的一些建议,但是我认为他的建议并不像我需要/想要的那样.背景故事和我上面说的一样,相关代码如下:

I have followed some of the advices from @Jonathan's answer, however I don't think his proposal works quite like what I need/want. The background story is the same as I have mentioned above, and relevant bits of code are as follows:

类持有/管理池,并提交可运行的:

Class holding/managing the pool, and submission of runnables:

public void serve() {
    try {
        this.started = true;
        pool.execute(new QueryingAction(pcqs));
        for(;;){
            PathwayImpl p = bq.take();

            if (p.getId().equals("0")){
                System.out.println("--DEBUG: Termination criteria found, shutdown initiated..");
                pool.shutdown();
                            // give 3 minutes per item in queue to finish up
                pool.awaitTermination(3 * bq.size(), TimeUnit.MINUTES);
                break;
            }
            int sortMethod = AnalysisParameters.getInstance().getSort_method();
            pool.submit(new AnalysisAction(p)); 
        }
      } catch (Exception ex) {
          ex.printStackTrace();
          System.err.println("Unexpected error in core analysis, terminating execution!");
          System.exit(0);
      }finally{   pool.shutdown();     }
}

public boolean isDone(){
    if(this.started)
        return pool.isTerminated();
    else
        return false;
    }

元素通过位于单独类中的以下代码添加到队列中:


Elements are added to the queue by the following code on located in a separate class:

this.queue.offer(path, offer_wait, TimeUnit.MINUTES);

... offer() 而不是 take() 背后的动机正如乔纳森提到的那样.不可预见的块很烦人,很难弄清楚,因为我的分析需要很长时间.所以我需要相对快速地知道失败是由于坏块,还是只是处理数字......

... motivation behind offer() instead of take() is as Jonathan mentioned. Unforeseen blocks are annoying and hard to figure out as my analysis take a long time as it is. So I need to know relatively quick if the fails due to a bad block, or if it's just crunching numbers...

最后;这是我的测试类中的代码,我在其中检查并发服务"(此处命名为 cs)与要分析的其余对象之间的交互:

and finally; here's the code in my test class where I check the interaction between the "concurrency service" (named cs here) and the rest of the objects to be analyzed:

cs.serve();
synchronized (this) {
    while(!cs.isDone())
    this.wait(5000);
}
ReportGenerator rg = new ReportGenerator();
rg.doReports();

我意识到这是一个很长的问题,但我试图详细和具体.希望它不会太拖累,如果它是......我很抱歉......

I realize that this has been a VERY long question but I tried to be detailed and specific. Hopefully it won't be too much of a drag, and I apologize in case it is...

推荐答案

不要使用阻塞的 take,而是使用更像这样的东西:

Instead of using take, which blocks, use something more like this:

PathwayImpl p = null;
synchronized (bq) {
    try {
        while (bq.isEmpty() && !stopSignal) {
            bq.wait(3000); // Wait up to 3 seconds and check again
        }

        if (!stopSignal) {
            p = bq.poll();
        }
    }
    catch (InterruptedException ie) {
        // Broke us out of waiting, loop around to test the stopSignal again
    }
}

这假设块包含在某种 while (!stopSignal) {...} 中.

This assumes that the block is enclosed in some sort of while (!stopSignal) {...}.

然后,在添加到队列的代码中,这样做:

Then, in the code that adds to the queue, do this:

synchronized (bq) {
    bq.add(item);
    bq.notify();
}

对于InterruptedExceptions,它们有助于通知线程立即测试停止信号,而不是等到下一次超时和测试.我建议再次测试您的停止信号,并可能记录异常.

As for InterruptedExceptions, they are good for signaling the thread to test the stop signal immediately, instead of waiting until the next timeout-and-test. I suggest just testing your stop signal again, and possibly logging the exception.

我在发出恐慌信号而不是正常关机时使用它们,但这种情况很少见.

I use them when signaling a panic, versus a normal shutdown, but it is rare that such a situation is necessary.

这篇关于生产者-消费者场景和“优雅"的正确实现线程池终止的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!

09-22 23:28