本文介绍了如何在关闭后重新使用线程池的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我有一个.csv文件包含超过7000万行,其中每行生成 Runnable ,然后由线程池执行。这个Runnable会在Mysql中插入一条记录。



更重要的是,我想记录csv文件中 RandomAccessFile 的位置。位置写入文件。我想在线程池中的所有线程都完成时写入此记录。调用ThreadPoolExecutor.shutdown()。但是当更多的线路来了,我需要一个线程池。



代码如下:

  public static boolean processPage()throws Exception {

long pos = getPosition();
long start = System.currentTimeMillis();
raf.seek(pos);
if(pos == 0)
raf.readLine();
for(int i = 0; i String lineStr = raf.readLine();
if(lineStr == null)
return false;
String [] line = lineStr.split(,);
final ExperienceLogDO log = CsvExperienceLog.generateLog(line);
//System.out.println(\"userId:+ log.getUserId()%512);

pool.execute(new Runnable(){
public void run(){
try {
experienceService.insertExperienceLog(log);
} catch (BaseException e){
e.printStackTrace();
}
}
});

long end = System.currentTimeMillis();
}

BufferedWriter resultWriter = new BufferedWriter(
new OutputStreamWriter(new FileOutputStream(new File(
RESULT_FILENAME),true)));
resultWriter.write(\\\
);
resultWriter.write(String.valueOf(raf.getFilePointer()));
resultWriter.close();
long time = System.currentTimeMillis() - start;
System.out.println(time);
return true;
}

谢谢!

解决方案

如,您不能重用已被关闭的 ExecutorService 。我建议您不要使用任何解决方法,因为(a)它们在所有情况下都无法正常工作;



您必须


  1. 实例化一个新的 ExecutorService ;或


  2. 不会终止 ExecutorService / ol>

    第一个解决方案很容易实现,所以我不详细说明。



    要在所有提交的任务完成后执行操作,您可以查看,并改用它。它包装一个 ExecutorService ,它将执行线程管理,但是运行时会被包装成一些东西,告诉 ExecutorCompletionService 当它们完成后,它可以向你报告:

      ExecutorService executor = ...; 
    ExecutorCompletionService ecs = new ExecutorCompletionService(executor);

    for(int i = 0; i ... ecs.submit(...); ...
    }

    for(int i = 0; i ecs.take();
    }

    方法 take() ExecutorCompletionService 类将阻塞,直到任务完成(正常或突然)。它会返回未来,因此您可以检查结果。



    我希望这可以帮助您,因为我没有完全理解你的问题。


    I have a .csv file containing over 70 million lines of which each line is to generate a Runnable and then executed by threadpool. This Runnable will insert a record into Mysql.

    What's more , I want to record a position of the csv file for the RandomAccessFile to locate. The position is written to a File.I want to write this record when all the threads in threadpool are finished.So ThreadPoolExecutor.shutdown() is invoked. But when more lines come, I need a threadpool again. How can I reuse this current threadpool instead of make a new one.

    The code is as follows:

    public static boolean processPage() throws Exception {
    
        long pos = getPosition();
        long start = System.currentTimeMillis();
        raf.seek(pos);
        if(pos==0)
            raf.readLine();
        for (int i = 0; i < PAGESIZE; i++) {
            String lineStr = raf.readLine();
            if (lineStr == null)
                return false;
            String[] line = lineStr.split(",");
            final ExperienceLogDO log = CsvExperienceLog.generateLog(line);
            //System.out.println("userId: "+log.getUserId()%512);
    
            pool.execute(new Runnable(){
                public void run(){
                    try {
                        experienceService.insertExperienceLog(log);
                    } catch (BaseException e) {
                        e.printStackTrace();
                    }
                }
            });
    
            long end = System.currentTimeMillis();
        }
    
        BufferedWriter resultWriter = new BufferedWriter(
                new OutputStreamWriter(new FileOutputStream(new File(
                        RESULT_FILENAME), true)));
        resultWriter.write("\n");
        resultWriter.write(String.valueOf(raf.getFilePointer()));
        resultWriter.close();
        long time = System.currentTimeMillis()-start;
        System.out.println(time);
        return true;
    }
    

    Thanks !

    解决方案

    As stated in the documentation, you cannot reuse an ExecutorService that has been shut down. I'd recommend against any workarounds, since (a) they may not work as expected in all situations; and (b) you can achieve what you want using standard classes.

    You must either

    1. instantiate a new ExecutorService; or

    2. not terminate the ExecutorService.

    The first solution is easily implemented, so I won't detail it.

    For the second, since you want to execute an action once all the submitted tasks have finished, you might take a look at ExecutorCompletionService and use it instead. It wraps an ExecutorService which will do the thread management, but the runnables will get wrapped into something that will tell the ExecutorCompletionService when they have finished, so it can report back to you:

    ExecutorService executor = ...;
    ExecutorCompletionService ecs = new ExecutorCompletionService(executor);
    
    for (int i = 0; i < totalTasks; i++) {
      ... ecs.submit(...); ...
    }
    
    for (int i = 0; i < totalTasks; i++) {
      ecs.take();
    }
    

    The method take() on the ExecutorCompletionService class will block until a task has finished (either normally or abruptly). It will return a Future, so you can check the results if you wish.

    I hope this can help you, since I didn't completely understand your problem.

    这篇关于如何在关闭后重新使用线程池的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!

10-27 20:47