我们有一个很大的文本文件,其中每一行都需要密集的process
。设计将具有一个class
,该文件读取文件并通过thread
将每一行的处理委托(delegate)给thread pool
。一旦池中没有空闲线程可以进行处理,则应阻止文件读取器类读取下一行。所以我需要一个blocking thread pool
在当前的实现中,ThreadPoolExecutor.submit()
和ThreadPoolExecutor.execute()
方法在配置的线程数变得繁忙之后抛出RejectedExecutionException
异常,如我在下面的代码片段中所示。
public class BlockingTp {
public static void main(String[] args) {
ArrayBlockingQueue arrayBlockingQueue = new ArrayBlockingQueue(3);
ThreadPoolExecutor executorService= new ThreadPoolExecutor(1, 3, 30, TimeUnit.SECONDS, arrayBlockingQueue);
int Jobs = 10;
System.out.println("Starting application with " + Jobs + " jobs");
for (int i = 1; i <= Jobs; i++)
try {
executorService.submit(new WorkerThread(i));
System.out.println("job added " + (i));
} catch (RejectedExecutionException e) {
System.err.println("RejectedExecutionException");
}
}
}
class WorkerThread implements Runnable {
int job;
public WorkerThread(int job) {
this.job = job;
}
public void run() {
try {
Thread.sleep(1000);
} catch (Exception excep) {
}
}
}
上面程序的输出是
Starting application to add 10 jobs
Added job #1
Added job #2
Added job #3
Added job #4
Added job #5
Added job #6
RejectedExecutionException
RejectedExecutionException
RejectedExecutionException
RejectedExecutionException
有人可以给我一些启示吗,即我如何实现阻塞线程池。
最佳答案
您需要在执行程序服务上设置拒绝执行处理程序。
BlockingQueue arrayBlockingQueue = new ArrayBlockingQueue(3);
ThreadPoolExecutor executorService =
new ThreadPoolExecutor(1, 3, 30, TimeUnit.SECONDS, arrayBlockingQueue);
// when the blocking queue is full, this tries to put into the queue which blocks
executorService.setRejectedExecutionHandler(new RejectedExecutionHandler() {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
try {
// block until there's room
executor.getQueue().put(r);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RejectedExecutionException("Producer thread interrupted", e);
}
}
});
因此,它不会调用TRE而不是
RejectedExecutionException
,而是将调用拒绝处理程序,该处理程序将依次尝试将作业放回到队列中。这阻止了调用者。