本文介绍了利用"fork"进行多处理.使用Python 3.6.1的Linux/Intel Xeon上的上下文块?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

问题描述
我调整了此答案的代码(见下文) .但是,在Linux上运行此脚本时(因此,命令行:python script_name.py),它将为所有作业打印jobs running: x,但之后似乎停滞了.但是,当我使用spawn方法(mp.set_start_method('spawn'))时,效果很好,并立即开始打印counter变量的值(请参见listener方法).

Problem description
I adjusted the code from this answer a little bit (see below). However when running this script on Linux (so command line: python script_name.py) it will print jobs running: x for all the jobs but then just seems to stuck after that. However when I use the spawn method (mp.set_start_method('spawn')) it works out fine and immediately starts printing the value of the counter variable (see listener method).

问题

  • 为什么仅在生成过程中起作用?
  • 如何调整代码使其与fork一起使用? (因为它可能更快)
  • Why does it work only when spawning processes?
  • How can I adjust the code so it works with fork? (because it is probably faster)

代码

import io
import csv
import multiprocessing as mp

NEWLINE = '\n'

def file_searcher(file_path):
    parsed_file = csv.DictReader(io.open(file_path, 'r', encoding='utf-8'), delimiter='\t')

    manager = mp.Manager()
    q = manager.Queue()
    pool = mp.Pool(mp.cpu_count())

    # put listener to work first
    watcher = pool.apply_async(listener, (q,))

    jobs = []
    for row in parsed_file:
        print('jobs running: ' + str(len(jobs) + 1))
        job = pool.apply_async(worker, (row, q))
        jobs.append(job)

  # collect results from the workers through the pool result queue
    for job in jobs:
        job.get()

    #now we are done, kill the listener
    q.put('kill')
    pool.close()
    pool.join()

def worker(genome_row, q):
    complete_data = []
    #data processing
    #ftp connection to retrieve data
    #etc.
    q.put(complete_data)
    return complete_data

def listener(q):
    '''listens for messages on the q, writes to file. '''
    f = io.open('output.txt', 'w', encoding='utf-8')
    counter = 0
    while 1:
        m = q.get()
        counter +=1
        print(counter)
        if m == 'kill':
            break
        for x in m:
            f.write(x + NEWLINE)
        f.flush()
    f.close()

if __name__ == "__main__":
   file_searcher('path_to_some_tab_del_file.txt')

处理器信息

Architecture:          x86_64
CPU op-mode(s):        32-bit, 64-bit
Byte Order:            Little Endian
CPU(s):                20
On-line CPU(s) list:   0-19
Thread(s) per core:    1
Core(s) per socket:    1
Socket(s):             20
NUMA node(s):          2
Vendor ID:             GenuineIntel
CPU family:            6
Model:                 45
Model name:            Intel(R) Xeon(R) CPU E5-2660 v3 @ 2.60GHz
Stepping:              2
CPU MHz:               2596.501
BogoMIPS:              5193.98
Hypervisor vendor:     VMware
Virtualization type:   full
L1d cache:             32K
L1i cache:             32K
L2 cache:              256K
L3 cache:              25600K
NUMA node0 CPU(s):     0-19

Linux内核版本

3.10.0-514.26.2.el7.x86_64

Python版本

Python 3.6.1 :: Continuum Analytics, Inc.


日志
我按照@yacc的建议添加了代码,这将提供以下日志:


LOG
I added the code as suggested by @yacc, this will give the following log:

[server scripts]$ python main_v3.py
[INFO/SyncManager-1] child process calling self.run()
[INFO/SyncManager-1] created temp directory /tmp/pymp-2a9stjh6
[INFO/SyncManager-1] manager serving at '/tmp/pymp-2a9stjh6/listener-jxwseclw'
[DEBUG/MainProcess] requesting creation of a shared 'Queue' object
[DEBUG/SyncManager-1] 'Queue' callable returned object with id '7f0842da56a0'
[DEBUG/MainProcess] INCREF '7f0842da56a0'
[DEBUG/MainProcess] created semlock with handle 139673691570176
[DEBUG/MainProcess] created semlock with handle 139673691566080
[DEBUG/MainProcess] created semlock with handle 139673691561984
[DEBUG/MainProcess] created semlock with handle 139673691557888
[DEBUG/MainProcess] added worker
[DEBUG/MainProcess] added worker
[DEBUG/ForkPoolWorker-2] INCREF '7f0842da56a0'
[DEBUG/MainProcess] added worker
[INFO/ForkPoolWorker-2] child process calling self.run()
[DEBUG/MainProcess] added worker
[DEBUG/ForkPoolWorker-4] INCREF '7f0842da56a0'
[INFO/ForkPoolWorker-4] child process calling self.run()
[DEBUG/MainProcess] added worker
[DEBUG/ForkPoolWorker-3] INCREF '7f0842da56a0'
[INFO/ForkPoolWorker-3] child process calling self.run()
[DEBUG/MainProcess] added worker
[DEBUG/MainProcess] added worker
[DEBUG/ForkPoolWorker-6] INCREF '7f0842da56a0'
[DEBUG/ForkPoolWorker-5] INCREF '7f0842da56a0'
[INFO/ForkPoolWorker-6] child process calling self.run()
[INFO/ForkPoolWorker-5] child process calling self.run()
[DEBUG/MainProcess] added worker
[DEBUG/ForkPoolWorker-7] INCREF '7f0842da56a0'
[DEBUG/ForkPoolWorker-8] INCREF '7f0842da56a0'
[INFO/ForkPoolWorker-7] child process calling self.run()
[INFO/ForkPoolWorker-8] child process calling self.run()
[DEBUG/MainProcess] added worker
[DEBUG/ForkPoolWorker-9] INCREF '7f0842da56a0'
[INFO/ForkPoolWorker-9] child process calling self.run()
[DEBUG/MainProcess] added worker
[DEBUG/ForkPoolWorker-10] INCREF '7f0842da56a0'
[INFO/ForkPoolWorker-10] child process calling self.run()
[DEBUG/MainProcess] added worker
[DEBUG/ForkPoolWorker-11] INCREF '7f0842da56a0'
[INFO/ForkPoolWorker-11] child process calling self.run()
[DEBUG/MainProcess] added worker
[DEBUG/ForkPoolWorker-12] INCREF '7f0842da56a0'
[INFO/ForkPoolWorker-12] child process calling self.run()
[DEBUG/MainProcess] added worker
[DEBUG/ForkPoolWorker-13] INCREF '7f0842da56a0'
[INFO/ForkPoolWorker-13] child process calling self.run()
[DEBUG/MainProcess] added worker
[DEBUG/ForkPoolWorker-14] INCREF '7f0842da56a0'
[INFO/ForkPoolWorker-14] child process calling self.run()
[DEBUG/MainProcess] added worker
[DEBUG/ForkPoolWorker-15] INCREF '7f0842da56a0'
[INFO/ForkPoolWorker-15] child process calling self.run()
[DEBUG/MainProcess] added worker
[DEBUG/ForkPoolWorker-16] INCREF '7f0842da56a0'
[INFO/ForkPoolWorker-16] child process calling self.run()
[DEBUG/MainProcess] added worker
[DEBUG/ForkPoolWorker-17] INCREF '7f0842da56a0'
[INFO/ForkPoolWorker-17] child process calling self.run()
[DEBUG/MainProcess] added worker
[DEBUG/ForkPoolWorker-18] INCREF '7f0842da56a0'
[INFO/ForkPoolWorker-18] child process calling self.run()
[DEBUG/MainProcess] added worker
[DEBUG/ForkPoolWorker-19] INCREF '7f0842da56a0'
[INFO/ForkPoolWorker-19] child process calling self.run()
[DEBUG/MainProcess] added worker
[DEBUG/ForkPoolWorker-20] INCREF '7f0842da56a0'
[INFO/ForkPoolWorker-20] child process calling self.run()
jobs running: 1
jobs running: 2
jobs running: 3
jobs running: 4
[DEBUG/ForkPoolWorker-21] INCREF '7f0842da56a0'
[INFO/ForkPoolWorker-21] child process calling self.run()
jobs running: 5
jobs running: 6
jobs running: 7
[DEBUG/ForkPoolWorker-2] INCREF '7f0842da56a0'
jobs running: 8
written to file
jobs running: 9
jobs running: 10
[DEBUG/ForkPoolWorker-2] thread 'MainThread' does not own a connection
[DEBUG/ForkPoolWorker-2] making connection to manager
jobs running: 11
jobs running: 12
jobs running: 13
jobs running: 14
jobs running: 15
[DEBUG/SyncManager-1] starting server thread to service 'ForkPoolWorker-2'
jobs running: 16
jobs running: 17
jobs running: 18
jobs running: 19
[DEBUG/ForkPoolWorker-4] INCREF '7f0842da56a0'
[DEBUG/ForkPoolWorker-3] INCREF '7f0842da56a0'
[DEBUG/ForkPoolWorker-5] INCREF '7f0842da56a0'
[DEBUG/ForkPoolWorker-6] INCREF '7f0842da56a0'
[DEBUG/ForkPoolWorker-7] INCREF '7f0842da56a0'
[DEBUG/ForkPoolWorker-8] INCREF '7f0842da56a0'
[DEBUG/ForkPoolWorker-10] INCREF '7f0842da56a0'
[DEBUG/ForkPoolWorker-9] INCREF '7f0842da56a0'
[DEBUG/ForkPoolWorker-11] INCREF '7f0842da56a0'
[DEBUG/ForkPoolWorker-13] INCREF '7f0842da56a0'
[DEBUG/ForkPoolWorker-14] INCREF '7f0842da56a0'
[DEBUG/ForkPoolWorker-12] INCREF '7f0842da56a0'
[DEBUG/ForkPoolWorker-15] INCREF '7f0842da56a0'
[DEBUG/ForkPoolWorker-16] INCREF '7f0842da56a0'
[DEBUG/ForkPoolWorker-18] INCREF '7f0842da56a0'
[DEBUG/ForkPoolWorker-17] INCREF '7f0842da56a0'
[DEBUG/ForkPoolWorker-20] INCREF '7f0842da56a0'
[DEBUG/ForkPoolWorker-19] INCREF '7f0842da56a0'
[DEBUG/ForkPoolWorker-21] INCREF '7f0842da56a0'

推荐答案

正如@jxh所暗示的,fork和spawn之间的区别很重要. 关于多处理的文档在第17.2.1.2节中指出,不同之处在于:分叉保留环境和诸如stdin/out之类的东西,而spawn只是一个全新的过程.我认为您的环境中可能有某些东西会导致工作器功能出现问题,可能是在您对其他处理的注释后面的代码中.产卵为您提供了一个干净的状态,在这些条件下一切运行良好.

As @jxh hinted, the differences between fork and spawn are important. The documentation on multiprocessing indicates in section 17.2.1.2 that the difference is: forking preserves the environment and things like stdin/out, whereas spawn just makes a fresh new process. I think you maybe have something in your environment which causes issues for the worker function, likely in the code behind your comments about other processing. Spawning gives you a clean slate and things are running fine under those conditions.

要确定发生了什么,我将让每个工作人员打印诊断消息,可能将其写到每个工作人员唯一的文件中.每次您想写一条消息时都要打开/关闭该文件,以便更新/刷新内容.

To determine what is going on, I would have each worker print diagnostic messages, probably written to a file unique to each worker. open/close that file each time you want to write a message, so the contents gets updated/flushed.

Fork不应比spawn快,因为fork需要将环境信息复制到新进程中.在任何情况下,我都认为这只是启动成本的最小值,因为工作人员需要执行一些您想并行化的计算或I/O约束工作.

Fork should not be faster than spawn, because fork needs to copy the environment information to the new process. In any case, this is only the startup cost which is minimal, I assume, because the worker needs to do some computationally or I/O bound work that you want to parallelize.

这篇关于利用"fork"进行多处理.使用Python 3.6.1的Linux/Intel Xeon上的上下文块?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!

10-30 04:04