我正在编写一个小脚本,用于处理大量数据。就像这样:
outproc = None
for input in input_files:
p = Popen('process_input "%s" | more_input_processing' %(input, ),
shell=True, stdout=PIPE)
for line in p.stdout.xreadlines():
if linecount % 1000000 == 0:
outfile = "output%03d" %(linecount // 1000000, )
if outproc:
outproc.stdin.close()
result = outproc.wait() # <-- deadlock here
assert result == 0, "outproc exited with %s" %(result, )
outproc = Popen('handle_output "%s"' %(outfile, ),
shell=True, stdin=PIPE)
linecount += 1
outproc.stdin.write(line)
p.stdout.close()
result = p.wait()
assert result == 0, "p exited with %s" %(result, )
但是,如文档警告所示,当我尝试等待
outproc
时遇到了僵局(请参阅注释)。文档提出的“解决方案”是使用
.communicate()
…,但是这样做将涉及在刷新之前将所有输入读入内存,这是不希望的。那么,如何在没有死锁的情况下在子流程之间传输数据?
最佳答案
您没有在子进程实际读取的管道上使用close
,因此它不会收到SIGPIPE或导致它退出的任何内容。当您有足够的数据时,只需终止该过程即可。或者,通过管道输入和输出,并使用select知道何时应该读取或写入。