Problem description
I have a huge CSV file with over 700K lines. I have to parse the lines of that CSV file and perform operations on them, and I thought of doing it with threading. My first attempt is simple: each thread should process unique lines of the CSV file. I have limited the number of lines to read to 3000 only. I create three threads, and each thread should read lines of the CSV file. Following is the code:
import java.io.*;

class CSVOps implements Runnable
{
    static int lineCount = 1;
    static int limit = 3000;
    BufferedReader CSVBufferedReader;

    public CSVOps(){} // Default constructor

    public CSVOps(BufferedReader br){
        this.CSVBufferedReader = br;
    }

    private synchronized void readCSV(){
        System.out.println("Current thread "+Thread.currentThread().getName());
        String line;
        try {
            while((line = CSVBufferedReader.readLine()) != null){
                System.out.println(line);
                lineCount++;
                if(lineCount >= limit){
                    break;
                }
            }
        }
        catch (IOException e) {
            e.printStackTrace();
        }
    }

    public void run() {
        readCSV();
    }
}

class CSVResourceHandler
{
    String CSVPath;

    public CSVResourceHandler(){ } // default constructor

    public CSVResourceHandler(String path){
        File f = new File(path);
        if(f.exists()){
            CSVPath = path;
        }
        else{
            System.out.println("Wrong file path! You gave: "+path);
        }
    }

    public BufferedReader getCSVFileHandler(){
        BufferedReader br = null;
        try{
            FileReader is = new FileReader(CSVPath);
            br = new BufferedReader(is);
        }
        catch(Exception e){
        }
        return br;
    }
}

public class invalidRefererCheck
{
    public static void main(String [] args) throws InterruptedException
    {
        String pathToCSV = "/home/shantanu/DEV_DOCS/Contextual_Work/invalid_domain_kw_site_wise_click_rev2.csv";
        CSVResourceHandler csvResHandler = new CSVResourceHandler(pathToCSV);
        CSVOps ops = new CSVOps(csvResHandler.getCSVFileHandler());

        Thread t1 = new Thread(ops);
        t1.setName("T1");
        Thread t2 = new Thread(ops);
        t1.setName("T2");
        Thread t3 = new Thread(ops);
        t1.setName("T3");

        t1.start();
        t2.start();
        t3.start();
    }
}
The CSVResourceHandler class simply checks whether the passed file exists and, if it does, creates a BufferedReader and returns it. This reader is passed to the CSVOps class, which has a readCSV method that reads lines of the CSV file and prints them. The limit is set to 3000.
Now, so that the threads do not mess up the count, I declared both the limit and the count variable as static. When I run this program I get weird output: I only get about 1000 records, and sometimes 1500, and they come out in random order. At the end of the output I get two lines of the CSV file, and the current thread name comes out to be main!!
I am very much a novice with threads. I want reading this CSV file to become faster. How can that be done?
Recommended answer
I suggest reading the file in big chunks. Allocate a big buffer object, read a chunk, parse backwards from the end to find the last EOL character, copy the last bit of the buffer into a temp string, shove a null into the buffer at EOL+1, queue off the buffer reference, immediately create a new buffer, copy the temp string in first, then fill up the rest of the buffer, and repeat until EOF. Use a pool of threads to parse/process the buffers. A sketch of this scheme is given below.
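Here is a minimal sketch of that scheme in Java, assuming a hypothetical process(String) method for the per-line work; the C-style null terminator is replaced by carrying the partial trailing line over into the next chunk, and the buffer and pool sizes are illustrative rather than tuned:

import java.io.FileReader;
import java.io.IOException;
import java.io.Reader;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class ChunkedCsvReader
{
    static final int CHUNK_SIZE = 1 << 20; // 1 MiB of characters per chunk (illustrative)

    public static void main(String[] args) throws IOException, InterruptedException
    {
        ExecutorService pool = Executors.newFixedThreadPool(
                Runtime.getRuntime().availableProcessors());
        try (Reader reader = new FileReader(args[0])) {
            char[] buf = new char[CHUNK_SIZE];
            String carry = ""; // partial line left over from the previous chunk
            int read;
            while ((read = reader.read(buf)) != -1) {
                String block = carry + new String(buf, 0, read);
                int lastEol = block.lastIndexOf('\n');
                if (lastEol < 0) { // no line break in this chunk: keep accumulating
                    carry = block;
                    continue;
                }
                carry = block.substring(lastEol + 1); // incomplete trailing line
                final String chunk = block.substring(0, lastEol);
                pool.submit(() -> processChunk(chunk)); // queue off whole chunks, not single lines
            }
            if (!carry.isEmpty()) {
                final String last = carry;
                pool.submit(() -> processChunk(last)); // final line with no trailing EOL
            }
        }
        pool.shutdown();
        pool.awaitTermination(1, TimeUnit.HOURS);
    }

    static void processChunk(String chunk) {
        for (String line : chunk.split("\n")) {
            process(line); // hypothetical per-line operation
        }
    }

    static void process(String line) {
        // placeholder for the actual CSV work on one line
    }
}

Note that the reading itself stays single-threaded; only the parsing/processing is farmed out to the pool, which is usually where the time goes.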
You have to queue up whole chunks of valid lines: queueing off single lines will result in the inter-thread communication taking longer than the parsing.
Note that this, and similar schemes, will probably result in the chunks being processed 'out of order' by the threads in the pool. If order must be preserved (for example, the input file is sorted and the output goes into another file which must remain sorted), you can have the chunk-assembler thread insert a sequence number into each chunk object. The pool threads can then pass processed buffers to yet another thread (or task) that keeps a list of out-of-order chunks until all the previous chunks have come in, as sketched after this paragraph.
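A minimal sketch of that resequencing idea, assuming each chunk arrives with the sequence number the chunk-assembler stamped on it; the writer buffers out-of-order results in a TreeMap until the next expected number shows up:

import java.util.Map;
import java.util.TreeMap;

class Resequencer
{
    private final Map<Long, String> pending = new TreeMap<>();
    private long nextSeq = 0;

    // Called by the pool threads with a processed chunk; emits strictly in order.
    public synchronized void accept(long seq, String processedChunk) {
        pending.put(seq, processedChunk);
        String next;
        while ((next = pending.remove(nextSeq)) != null) {
            emit(next);
            nextSeq++;
        }
    }

    private void emit(String chunk) {
        System.out.print(chunk); // placeholder for the real sorted-output sink
    }
}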
Multithreading does not have to be difficult, dangerous, or ineffective. If you use queues, pools, and tasks, avoid synchronize/join, don't continually create/terminate/destroy threads, and only queue around large buffer objects that only one thread ever works on at a time, you should see a good speedup with next to no possibility of deadlocks, false sharing, etc.
The next step in such a speedup would be to pre-allocate a pool queue of buffers, eliminating the continual creation/deletion of buffers and the associated GC, with an (L1-cache-sized) 'dead zone' at the start of every buffer to eliminate cache sharing completely. That would go plenty quick on a multicore box (especially one with an SSD!); a sketch of such a pool follows.
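A minimal sketch of such a pre-allocated pool, modelling the dead zone as unused leading capacity in each buffer; the 64-character pad and the pool size are assumptions, not measured values:

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

class BufferPool
{
    static final int PAD = 64; // assumed cache-line-sized dead zone at the start of each buffer
    private final BlockingQueue<char[]> free;

    BufferPool(int count, int bufferSize) {
        free = new ArrayBlockingQueue<>(count);
        for (int i = 0; i < count; i++) {
            free.add(new char[PAD + bufferSize]); // allocate everything up front, once
        }
    }

    char[] take() throws InterruptedException {
        return free.take(); // blocks if every buffer is currently in use
    }

    void recycle(char[] buf) {
        free.offer(buf); // return the buffer instead of letting it become garbage
    }
}

The reader takes a buffer from the pool, fills it, and queues it to the workers; the workers recycle it when done, so no buffer is ever allocated or collected in the steady state.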
Oh, Java, right. I apologise for the 'CplusPlus-iness' of my answer with the null terminator. The rest of the points are OK, though; this should be a language-agnostic answer :)