上一篇博客介绍了多线程与多进程的理论部分,这篇博客将参考博客以及各种教程完成Python多进程实现部分。

文章目录

multiprocessing模块

multiprocessing

Process 类

multiprocessing.Process(group=None, target=None, name=None,
args=(), kwargs={}, *, daemon=None)
  • star() 方法启动进程,
  • join() 方法实现进程间的同步,等待所有进程退出
  • close() 用来阻止多余的进程涌入进程池 Pool 造成进程阻塞。
  • target 是函数名字,需要调用的函数
  • args 函数需要的参数,以 tuple的形式传入
import multiprocessing
import os

def run_proc(name):
    print('Child process {0} {1} Running '.format(name, os.getpid()))

if __name__ == '__main__':
    print('Parent process {0} is Running'.format(os.getpid()))
    for i in range(5):
        p = multiprocessing.Process(target=run_proc, args=(str(i),))
        print('process start')
        p.start()
    p.join()
    print('Process close')
Parent process 53067 is Running
process start
Child process 0 55047 Running
process start
process start
Child process 1 55048 Running
process start
Child process 2 55049 Running
process start
Child process 3 55050 Running
Child process 4 55051 Running
Process close

Process类详解

Process 类用来描述一个进程对象。创建子进程的时候,只需要传入一个执行函数和函数的参数即可完成 Process 示例的创建。
Process对象的初始化参数为Process(group=None, target=None, name=None, args=(), kwargs={}),其中group参数必须为None(为了与threading.Thread的兼容),target指向可调用对象(该对象在新的子进程中运行),name是为该子进程命的名字(默认是Proess-1,Process-2, …这样),args是被调用对象的位置参数的元组列表,kwargs是被调用对象的关键字参数。

子进程终结时会通知父进程并清空自己所占据的内存,在内核里留下退出信息(exit code,如果顺利运行,为0;如果有错误或异常状况,为大于零的整数)。父进程得知子进程终结后,需要对子进程使用wait系统调用,wait函数会从内核中取出子进程的退出信息,并清空该信息在内核中占据的空间。
如果父进程早于子进程终结,子进程变成孤儿进程,孤儿进程会被过继给init进程,init进程就成了该子进程的父进程,由init进程负责该子进程终结时调用wait函数。如果父进程不对子进程调用wait函数,子进程成为僵尸进程。僵尸进程积累时,会消耗大量内存空间。

Process类join方法

  • 如果在父进程中不调用p.join方法,则主进程与父进程并行工作:
from multiprocessing import Process
import time

def func():
    print("Child process start, %s" % time.ctime())
    time.sleep(2)
    print("Child process end, %s" % time.ctime())


if __name__ == "__main__":
    print("Parent process start, %s" % time.ctime())
    p = Process(target=func)
    p.start()
    # p.join()
    time.sleep(1)
    print("Parent process end, %s" % time.ctime())

结果

Parent process start, Sun Oct  7 18:41:18 2018
Child process start, Sun Oct  7 18:41:18 2018
Parent process end, Sun Oct  7 18:41:19 2018
Child process end, Sun Oct  7 18:41:20 2018

如果开启了p.join的调用,结果为

Parent process start, Sun Oct  7 18:41:37 2018
Child process start, Sun Oct  7 18:41:37 2018
Child process end, Sun Oct  7 18:41:39 2018
Parent process end, Sun Oct  7 18:41:40 2018

Process类守护进程

  • 另外一种情况是将子进程设置为守护进程,则父进程在退出时不会关注子进程是否结束而直接退出:
from multiprocessing import Process
import time

def func():
    print("Child process start, %s" % time.ctime())
    time.sleep(2)
    print("Child process end, %s" % time.ctime())


if __name__ == "__main__":
    print("Parent process start, %s" % time.ctime())
    p = Process(target=func)
    # 守护进程一定要在start方法调用之前设置
    p.daemon = True
    p.start()
    # p.join()
    time.sleep(1)
    print("Parent process end, %s" % time.ctime())

结果

Parent process start, Sun Oct  7 18:45:58 2018
Child process start, Sun Oct  7 18:45:58 2018
Parent process end, Sun Oct  7 18:45:59 2018
Child process end, Sun Oct  7 18:46:00 2018

如果开启主进程对join方法的调用,主进程还是会等待守护子进程结束

Parent process start, Sun Oct  7 18:46:45 2018
Child process start, Sun Oct  7 18:46:45 2018
Child process end, Sun Oct  7 18:46:47 2018
Parent process end, Sun Oct  7 18:46:48 2018

Pool 类

Pool 可以提供指定数量的进程供用户使用,默认是 CPU 核数。当有新的请求提交到 Pool 的时候,如果池子没有满,会创建一个进程来执行,否则就会让该请求等待

  • Pool 对象调用 join 方法会等待所有的子进程执行完毕
  • 调用 join 方法之前,必须调用 close
  • 调用 close 之后就不能继续添加新的 Process 了

pool.apply_async

apply_async 方法用来同步执行进程,允许多个进程同时进入池子。

import multiprocessing
import os
import time

def run_task(name):
    print('Task {0} pid {1} is running, parent id is {2}'.format(name, os.getpid(), os.getppid()))
    time.sleep(1)
    print('Task {0} end.'.format(name))

if __name__ == '__main__':
    print('current process {0}'.format(os.getpid()))
    p = multiprocessing.Pool(processes=3)
    for i in range(6):
        p.apply_async(run_task, args=(i,))
    print('Waiting for all subprocesses done...')
    p.close()
    p.join()
    print('All processes done!')
current process 921
Waiting for all subprocesses done...
Task 0 pid 922 is running, parent id is 921
Task 1 pid 923 is running, parent id is 921
Task 2 pid 924 is running, parent id is 921
Task 0 end.
Task 3 pid 922 is running, parent id is 921
Task 1 end.
Task 4 pid 923 is running, parent id is 921
Task 2 end.
Task 5 pid 924 is running, parent id is 921
Task 3 end.
Task 4 end.
Task 5 end.
All processes done!

pool.apply

该方法只能允许一个进程进入池子,在一个进程结束之后,另外一个进程才可以进入池子。

import multiprocessing
import os
import time

def run_task(name):
    print('Task {0} pid {1} is running, parent id is {2}'.format(name, os.getpid(), os.getppid()))
    time.sleep(1)
    print('Task {0} end.'.format(name))

if __name__ == '__main__':
    print('current process {0}'.format(os.getpid()))
    p = multiprocessing.Pool(processes=3)
    for i in range(6):
        p.apply(run_task, args=(i,))
    print('Waiting for all subprocesses done...')
    p.close()
    p.join()
    print('All processes done!')
Task 0 pid 928 is running, parent id is 927
Task 0 end.
Task 1 pid 929 is running, parent id is 927
Task 1 end.
Task 2 pid 930 is running, parent id is 927
Task 2 end.
Task 3 pid 928 is running, parent id is 927
Task 3 end.
Task 4 pid 929 is running, parent id is 927
Task 4 end.
Task 5 pid 930 is running, parent id is 927
Task 5 end.
Waiting for all subprocesses done...
All processes done!

Queue 进程间通信

Queue 用来在多个进程间通信(队列,先进先出)。Queue 有两个方法,get 和 put。

put 方法

put 方法用来插入数据到队列中,参数为blockedtimeout

  • blocked = True(默认值),timeout 为正
该方法会阻塞 timeout 指定的时间,直到该队列有剩余空间。
如果超时,抛出 Queue.Full 异常。
  • blocked = False
如果 Queue 已满,立刻抛出 Queue.Full 异常

get 方法

get 方法用来从队列中读取并删除一个元素。参数为blockedtimeout

  • blocked = True(默认值),timeout 为正
等待时间内,没有取到任何元素,会抛出 Queue.Empty 异常。
  • blocked = True
Queue 有一个值可用,立刻返回改值;Queue 没有任何元素
from multiprocessing import Process, Queue
import os, time, random
# 写数据进程执行的代码:
def proc_write(q,urls):
    print('Process(%s) is writing...' % os.getpid())
    for url in urls:
        q.put(url)
        print('Put %s to queue...' % url)
        time.sleep(random.random())
# 读数据进程执行的代码:
def proc_read(q):
    print('Process(%s) is reading...' % os.getpid())
    while True:
        url = q.get(True)
        print('Get %s from queue.' % url)
if __name__=='__main__':
    # 父进程创建Queue,并传给各个子进程:
    q = Queue()
    proc_writer1 = Process(target=proc_write, args=(q,['url_1', 'url_2', 'url_3']))
    proc_writer2 = Process(target=proc_write, args=(q,['url_4','url_5','url_6']))
    proc_reader = Process(target=proc_read, args=(q,))
    # 启动子进程proc_writer,写入:
    proc_writer1.start()
    proc_writer2.start()
    # 启动子进程proc_reader,读取:
    proc_reader.start()
    # 等待proc_writer结束:
    proc_writer1.join()
    proc_writer2.join()
    # proc_reader进程里是死循环,无法等待其结束,只能强行终止:
    proc_reader.terminate()
Process(1083) is writing...
Put url_1 to queue...
Process(1084) is writing...
Put url_4 to queue...
Process(1085) is reading...
Get url_1 from queue.
Get url_4 from queue.
Put url_5 to queue...
Get url_5 from queue.
Put url_2 to queue...
Get url_2 from queue.
Put url_6 to queue...
Get url_6 from queue.
Put url_3 to queue...
Get url_3 from queue.

Pipe 进程间通信

常用来在两个进程间通信,两个进程分别位于管道的两端。
示例一

from multiprocessing import Process, Pipe

def send(pipe):
    pipe.send(['spam'] + [42, 'egg'])   # send 传输一个列表
    pipe.close()

if __name__ == '__main__':
    (con1, con2) = Pipe()                            # 创建两个 Pipe 实例
    sender = Process(target=send, args=(con1, ))     # 函数的参数,args 一定是实例化之后的 Pip 变量,不能直接写 args=(Pip(),)
    sender.start()                                   # Process 类启动进程
    print("con2 got: %s" % con2.recv())              # 管道的另一端 con2 从send收到消息
    con2.close()                                     # 关闭管道

结果

con2 got: ['spam', 42, 'egg']

示例二

from multiprocessing import Process, Pipe

def talk(pipe):
    pipe.send(dict(name='Bob', spam=42))            # 传输一个字典
    reply = pipe.recv()                             # 接收传输的数据
    print('talker got:', reply)

if __name__ == '__main__':
    (parentEnd, childEnd) = Pipe()                  # 创建两个 Pipe() 实例,也可以改成 conf1, conf2
    child = Process(target=talk, args=(childEnd,))  # 创建一个 Process 进程,名称为 child
    child.start()                                   # 启动进程
    print('parent got:', parentEnd.recv())          # parentEnd 是一个 Pip() 管道,可以接收 child Process 进程传输的数据
    parentEnd.send({x * 2 for x in 'spam'})         # parentEnd 是一个 Pip() 管道,可以使用 send 方法来传输数据
    child.join()                                    # 传输的数据被 talk 函数内的 pip 管道接收,并赋值给 reply
    print('parent exit')

结果

parent got: {'name': 'Bob', 'spam': 42}
talker got: {'ss', 'aa', 'pp', 'mm'}
parent exit

共享内存

在进程间共享状态可以使用multiprocessing.Valuemultiprocessing.Array这样特殊的共享内存对象:

from multiprocessing import Process, Value, Array

def func(n, a):
    n.value = 3.1415926
    for i in range(len(a)):
        a[i] = -i

if __name__ == "__main__":
    # 'd'表示浮点型数据,'i'表示整数
    n = Value('d', 0.0)
    a = Array('i', range(10))
    p = Process(target=func, args=(n, a,))
    p.start()
    p.join()

    print(n.value)
    print(a[:])

结果

3.1415926
[0, -1, -2, -3, -4, -5, -6, -7, -8, -9]

参考

博客https://blog.csdn.net/cityzenoldwang/article/details/78584175
博客https://blog.csdn.net/a464057216/article/details/52735584

10-07 19:04