本文介绍了适当实施生产者-消费者方案和“优雅"的生产过程.终止线程池的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在做我的第一个多线程项目,因此有些不确定的事情.我的设置的详细信息位于上一个问题,简而言之:我有一个由Executors.newFixedThreadPool(N)实现的线程池.给一个线程一个动作,该动作对本地和远程资源进行一系列查询,并迭代地填充ArrayBlockingQueue,其余线程则在队列上调用take()方法并处理队列中的对象.

I am working on my first multi-threaded project and thus have a couple of things that I am unsure of. Details on my setup was on a previous question, in short: I have a thread pool implemented by Executors.newFixedThreadPool(N). One thread is given an action which does a series of queries to local and remote resources and iteratively populates an ArrayBlockingQueue, while the rest of the threads invoke take() method on the queue and process the objects in the queue.

尽管在监督下进行的小型测试似乎运行正常,但我不确定如何处理特殊情况,例如开始(队列中还没有任何物品),结束(队列被清空)以及任何最终的.我在这里做了关于SO的一些阅读,然后通过 Goetz Kabutz .共识似乎是不应忽视这些例外.但是,我不确定所提供的示例与我的情况有何关系,我还没有在代码中的任何地方调用thread.interrupt() ...说到这一点,我不确定是否应该这样做...

Even though small and supervised tests seem to run OK, I am unsure about how I handle special scenarios such as the beginning (the queue has no items yet), the end (the queue is emptied), and any eventual InterruptedExceptions. I have done some reading here on SO, which then led me to two really nice articles by Goetz and Kabutz. The consensus seems to be that one should not ignore these exceptions. However I am unsure how the examples supplied relates to my situation, I have not invoked thread.interrupt() anywhere in my code... Speaking of which, I'm getting unsure if I should have done so...

总而言之,给出下面的代码,我如何最好地处理特殊情况,例如终止条件和InterrruptedExceptions?希望这些问题有意义,否则我会尽力进一步描述它.

To sum it up, given the code below, how do I best handle the special cases, such as termination criteria and the InterrruptedExceptions? Hope the questions make sense, otherwise I'll do my best to describe it further.

预先感谢

编辑:我已经进行了一段时间的实施,并且遇到了新的麻烦,所以我想我会更新情况.我遇到了遇到ConcurrentModificationException的不幸,这很可能是由于线程池的不完全关闭/终止而造成的.我一想到可以使用isTerminated()就可以尝试,然后由于wait()不同步而得到了IllegalMonitorStateException.代码的当前状态如下:

edit: I have been working on the implementation for a while now, and I have come across a new hiccup so I figured I'd update the situation. I have had the misfortune of coming across ConcurrentModificationException which was most likely due to incomplete shutdown/termination of the thread pool. As soon as I figured out I could use isTerminated() I tried that, then I got a IllegalMonitorStateException due to an unsynchronized wait(). The current state of the code is below:

我听了@乔纳森(Jonathan)的回答,但我认为他的建议并不像我需要/想要的那样有效.背景故事与我上面提到的相同,相关的代码如下:

I have followed some of the advices from @Jonathan's answer, however I don't think his proposal works quite like what I need/want. The background story is the same as I have mentioned above, and relevant bits of code are as follows:

类持有/管理池以及可运行对象的提交:

Class holding/managing the pool, and submission of runnables:

public void serve() {
    try {
        this.started = true;
        pool.execute(new QueryingAction(pcqs));
        for(;;){
            PathwayImpl p = bq.take();

            if (p.getId().equals("0")){
                System.out.println("--DEBUG: Termination criteria found, shutdown initiated..");
                pool.shutdown();
                            // give 3 minutes per item in queue to finish up
                pool.awaitTermination(3 * bq.size(), TimeUnit.MINUTES);
                break;
            }
            int sortMethod = AnalysisParameters.getInstance().getSort_method();
            pool.submit(new AnalysisAction(p)); 
        }
      } catch (Exception ex) {
          ex.printStackTrace();
          System.err.println("Unexpected error in core analysis, terminating execution!");
          System.exit(0);
      }finally{   pool.shutdown();     }
}

public boolean isDone(){
    if(this.started)
        return pool.isTerminated();
    else
        return false;
    }


通过位于单独类中的以下代码将元素添加到队列中:


Elements are added to the queue by the following code on located in a separate class:

this.queue.offer(path, offer_wait, TimeUnit.MINUTES);

正如乔纳森(Jonathan)所述,

... offer()而不是take()背后的动机.由于我的分析花费了很长时间,所以无法预见的障碍很烦人并且很难弄清楚.因此,我需要比较迅速地知道是由于坏块导致失败还是仅仅是处理数字...

... motivation behind offer() instead of take() is as Jonathan mentioned. Unforeseen blocks are annoying and hard to figure out as my analysis take a long time as it is. So I need to know relatively quick if the fails due to a bad block, or if it's just crunching numbers...

最后;这是我的测试类中的代码,我在其中检查并发服务"(在此处称为cs)与其余要分析的对象之间的交互:

and finally; here's the code in my test class where I check the interaction between the "concurrency service" (named cs here) and the rest of the objects to be analyzed:

cs.serve();
synchronized (this) {
    while(!cs.isDone())
    this.wait(5000);
}
ReportGenerator rg = new ReportGenerator();
rg.doReports();

我意识到这是一个很长的问题,但是我试图做到详尽而具体.希望它不会对您造成太大的拖累,如果发生...,我深表歉意.

I realize that this has been a VERY long question but I tried to be detailed and specific. Hopefully it won't be too much of a drag, and I apologize in case it is...

推荐答案

不是使用take来阻止,而是使用类似以下的内容:

Instead of using take, which blocks, use something more like this:

PathwayImpl p = null;
synchronized (bq) {
    try {
        while (bq.isEmpty() && !stopSignal) {
            bq.wait(3000); // Wait up to 3 seconds and check again
        }

        if (!stopSignal) {
            p = bq.poll();
        }
    }
    catch (InterruptedException ie) {
        // Broke us out of waiting, loop around to test the stopSignal again
    }
}

这假定该块包含在某种while (!stopSignal) {...}中.

This assumes that the block is enclosed in some sort of while (!stopSignal) {...}.

然后,在添加到队列的代码中,执行以下操作:

Then, in the code that adds to the queue, do this:

synchronized (bq) {
    bq.add(item);
    bq.notify();
}

对于InterruptedException来说,它们非常适合发信号通知线程立即测试停止信号,而不必等到下一次超时测试.我建议再次测试停止信号,并可能记录异常.

As for InterruptedExceptions, they are good for signaling the thread to test the stop signal immediately, instead of waiting until the next timeout-and-test. I suggest just testing your stop signal again, and possibly logging the exception.

我在发出紧急信号时会使用它们,而不是正常关机,但是很少有这种情况是必需的.

I use them when signaling a panic, versus a normal shutdown, but it is rare that such a situation is necessary.

这篇关于适当实施生产者-消费者方案和“优雅"的生产过程.终止线程池的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!

09-22 23:28