本文介绍了Python队列使用在线程中工作,但(显然)不是在多处理的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

许多关于多处理使用的教程似乎并没有完全解决为什么下面的技术适用于线程而不是多线程处理。



为什么这不适用于多重处理,以及我在尝试做什么的实现?非常感谢!



线程实现(工作正常,对我有意义):

 从线程导入线程
从队列导入队列
从时间导入睡眠

线程函数
def producer_thread(n)
for x in range(10):
thread_q.put(n)

def consumer_thread():
while True:
item = thread_q。 get()
print item

如果__name__ =='__main__':
thread_q = Queue()


p_thread = Thread(target = producer_thread,args =(10,))
c_thread = Thread(target = consumer_thread)
c_thread.daemon = True
p_thread.start c_thread.start()
p_thread.join()
防止c_thread守护进程过早地取消
sleep(.001)

输出:

  10 
10
10
10
10
10
10
10
10
10

多处理实现(似乎与线程相同,但不起作用):

 从多进程导入进程,freeze_support 
从队列导入队列

multiprocessing函数
def producer_process ):
for(10):
process_q.put(n)

def consumer_process():
while True:
item = process_q.get()
print item

if __name__ =='__main__':
freeze_support()
process_q = Queue()
computer explodes
p_process = Process(target = producer_process,args =(10,))
c_process = Process(target = consumer_process)
c_process.daemon = True
p_process.start(); c_process.start()
p_process.join()

输出:

 跟踪(最近一次调用):
< module>中的文件< string>
文件C:\Users\J\Anaconda\lib\multiprocessing\forking.py,第381行,在主
self = load(from_parent)
文件C:\Users\J\Anaconda\lib\pickle.py,行1378,在加载
返回Unpickler(文件).load()
文件C:\ Users \J\Anaconda\lib\pickle.py,行858,在加载
dispatch [key](self)
文件C:\Users\J\Anaconda \lib \pickle.py,第1090行,在load_global
中klass = self.find_class(module,name)
文件C:\Users\\\\\ \\ pickle.py,行1126,在find_class
klass = getattr(mod,name)
AttributeError:'module'对象没有属性'get_successors'
Traceback :
在< module>中的文件< string>,第1行
文件C:\Users\J\Anaconda\lib\multiprocessing\forking.py,第381行,在主
self = load(from_parent)
文件C:\Users\J\Anaconda\lib\pickle.py,行1378,在加载
返回Unpickler(文件).load()
文件C:\ Users \J\Anaconda\lib\pickle.py,行858,在加载
dispatch [key](self)
文件C:\Users\J\Anaconda \lib \pickle.py,第1090行,在load_global
中klass = self.find_class(module,name)
文件C:\Users\\\\\ \\ pickle.py,行1126,在find_class
klass = getattr(mod,name)
AttributeError:'module'对象没有属性'get_successors'
进程进程33:
Traceback(最近调用最后一次):
文件C:\Users\J\Anaconda\lib\multiprocessing\process.py,第258行,在_bootstrap
self .run()
文件C:\Users\J\Anaconda\lib\multiprocessing\process.py,第114行,在运行
self._target(* self。 _args,** self._kwargs)
文件C:\Users\J\Documents\Python Scripts\producer_consumer_test.py,第18行,在消费者
item = q.get ()
NameError:全局名'q'未定义
过程进程32:
回溯(最近一次调用最后):
文件C:\Users \ J \Anaconda\lib\multiprocessing\process.py,行258,在_bootstrap
self.run()
文件C:\Users\\\\\ lib \multiprocessing\process.py,第114行,在运行
self._target(* self._args,** self._kwargs)
文件C:\Users\J\\ \\ documents \Python Scripts \producer_consumer_test.py,第14行,在生产者
中q.put(n)
NameError:全局名称'q'未定义
Process Process-34 :
Traceback(最近最后一次调用):
在_bootstrap $ b中的文件C:\Users\J\Anaconda\lib\multiprocessing\process.py,第258行$ b self.run()
文件C:\Users\J\Anaconda\lib\multiprocessing\process.py,第114行,在运行
self._target * self._args,** self._kwargs)
文件C:\Users\J\Documents\Python Scripts\producer_consumer_test.py,第14行,在生产者
q。 put(n)
NameError:全局名'q'未定义
过程进程-35:
回溯(最近最后调用):
文件C:\Users \J\Anaconda\lib\multiprocessing\process.py,第258行,在_bootstrap
self.run()
文件C:\Users\J\Anaconda \ lib \multiprocessing\process.py,第114行,在运行
self._target(* self._args,** self._kwargs)
文件C:\Users \ J \Documents \Python Scripts \producer_consumer_test.py,第18行,在消费者中
item = q.get()
NameError:全局名称'q'未定义
Traceback (最近最后调用):
在< module>中的文件< string>,第1行。
文件C:\Users\J\Anaconda\lib\multiprocessing\forking.py,第381行,在主
self = load(from_parent)
文件C:\Users\J\Anaconda\lib\pickle.py,行1378,在加载
返回Unpickler(文件).load()
文件C:\ Users \J\Anaconda\lib\pickle.py,行858,在加载
dispatch [key](self)
文件C:\Users\J\Anaconda \lib \pickle.py,第1090行,在load_global
中klass = self.find_class(module,name)
文件C:\Users\\\\\ \\ pickle.py,行1126,在find_class
klass = getattr(mod,name)
AttributeError:'module'对象没有属性'consumer'
Traceback :
在< module>中的文件< string>,第1行
文件C:\Users\J\Anaconda\lib\multiprocessing\forking.py,第381行,在主
self = load(from_parent)
文件C:\Users\J\Anaconda\lib\pickle.py,行1378,在加载
返回Unpickler(文件).load()
文件C:\ Users \J\Anaconda\lib\pickle.py,行858,在加载
dispatch [key](self)
文件C:\Users\J\Anaconda \lib \pickle.py,第1090行,在load_global
中klass = self.find_class(module,name)
文件C:\Users\\\\\ \\ pickle.py,行1126在find_class
klass = getattr(mod,name)
AttributeError:'module'object没有属性'producer'

p>

解决方案

import Queue 适用于多线程应用程序:不适用于多进程应用程序。 p>

来自多处理导入队列适用于多进程应用程序:



根据文档multiprocessing.Queue是一个接近克隆的Queue.Queue



除了multiprocessing.Queue有JoinableQueue它有task_done()和join()方法,以防万一你需要它。



在你的例子中,我不认为你需要JoinableQueue。您尝试过以下操作:

 来自多进程导入(Process,Queue,freeze_support)

def producer q,n):
对于范围(n)中的x:
q.put(x)
q.put(end)

$ b b def consumer(q):
while True:
item = q.get()
if item ==end:
break
打印项目

if __name__ =='__main__':
freeze_support()
q = Queue()
c = Process(target = consumer,args =(q,))
c.start()
p = Process(target = producer,args =(q,10))
p.start()
c.join()

在Linux和Windows上测试。


many of the tutorials on multiprocessing use don't seem to completely address why the technique below works for threading but not multiprocessing.

Why doesn't this work for multiprocessing, and what is the implementation for what I am trying to do? Thank you!

Threading implementation (works fine, makes sense to me):

from threading import Thread
from Queue import Queue
from time import sleep    

"""threading functions"""
def producer_thread(n):
    for x in range(10):
        thread_q.put(n)

def consumer_thread():
    while True:
        item = thread_q.get()
        print item

if __name__ == '__main__':
    thread_q = Queue()

    """works fine"""
    p_thread = Thread(target=producer_thread, args=(10,))
    c_thread = Thread(target=consumer_thread)
    c_thread.daemon=True
    p_thread.start(); c_thread.start()
    p_thread.join()
    """prevents c_thread daemon process from cancelling prematurely"""
    sleep(.001)

Output:

10
10
10
10
10
10
10
10
10
10

Multiprocessing implementation (seems to be identical to threading but doesn't work at all):

from multiprocessing import Process, freeze_support
from Queue import Queue

"""multiprocessing functions"""
def producer_process(n):
    for x in range(10):
        process_q.put(n)

def consumer_process():
    while True:
        item = process_q.get()
        print item
#            
if __name__ == '__main__':
    freeze_support()
    process_q = Queue()        
    """computer explodes"""
    p_process = Process(target=producer_process, args=(10,))
    c_process = Process(target=consumer_process)
    c_process.daemon=True
    p_process.start(); c_process.start()
    p_process.join()

Output:

Traceback (most recent call last):
  File "<string>", line 1, in <module>
  File "C:\Users\J\Anaconda\lib\multiprocessing\forking.py", line 381, in main
    self = load(from_parent)
  File "C:\Users\J\Anaconda\lib\pickle.py", line 1378, in load
    return Unpickler(file).load()
  File "C:\Users\J\Anaconda\lib\pickle.py", line 858, in load
    dispatch[key](self)
  File "C:\Users\J\Anaconda\lib\pickle.py", line 1090, in load_global
    klass = self.find_class(module, name)
  File "C:\Users\J\Anaconda\lib\pickle.py", line 1126, in find_class
    klass = getattr(mod, name)
AttributeError: 'module' object has no attribute 'get_successors'
Traceback (most recent call last):
  File "<string>", line 1, in <module>
  File "C:\Users\J\Anaconda\lib\multiprocessing\forking.py", line 381, in main
    self = load(from_parent)
  File "C:\Users\J\Anaconda\lib\pickle.py", line 1378, in load
    return Unpickler(file).load()
  File "C:\Users\J\Anaconda\lib\pickle.py", line 858, in load
    dispatch[key](self)
  File "C:\Users\J\Anaconda\lib\pickle.py", line 1090, in load_global
    klass = self.find_class(module, name)
  File "C:\Users\J\Anaconda\lib\pickle.py", line 1126, in find_class
    klass = getattr(mod, name)
AttributeError: 'module' object has no attribute 'get_successors'
Process Process-33:
Traceback (most recent call last):
  File "C:\Users\J\Anaconda\lib\multiprocessing\process.py", line 258, in _bootstrap
    self.run()
  File "C:\Users\J\Anaconda\lib\multiprocessing\process.py", line 114, in run
    self._target(*self._args, **self._kwargs)
  File "C:\Users\J\Documents\Python Scripts\producer_consumer_test.py", line 18, in consumer
    item = q.get()
NameError: global name 'q' is not defined
Process Process-32:
Traceback (most recent call last):
  File "C:\Users\J\Anaconda\lib\multiprocessing\process.py", line 258, in _bootstrap
    self.run()
  File "C:\Users\J\Anaconda\lib\multiprocessing\process.py", line 114, in run
    self._target(*self._args, **self._kwargs)
  File "C:\Users\J\Documents\Python Scripts\producer_consumer_test.py", line 14, in producer
    q.put(n)
NameError: global name 'q' is not defined
Process Process-34:
Traceback (most recent call last):
  File "C:\Users\J\Anaconda\lib\multiprocessing\process.py", line 258, in _bootstrap
    self.run()
  File "C:\Users\J\Anaconda\lib\multiprocessing\process.py", line 114, in run
    self._target(*self._args, **self._kwargs)
  File "C:\Users\J\Documents\Python Scripts\producer_consumer_test.py", line 14, in producer
    q.put(n)
NameError: global name 'q' is not defined
Process Process-35:
Traceback (most recent call last):
  File "C:\Users\J\Anaconda\lib\multiprocessing\process.py", line 258, in _bootstrap
    self.run()
  File "C:\Users\J\Anaconda\lib\multiprocessing\process.py", line 114, in run
    self._target(*self._args, **self._kwargs)
  File "C:\Users\J\Documents\Python Scripts\producer_consumer_test.py", line 18, in consumer
    item = q.get()
NameError: global name 'q' is not defined
Traceback (most recent call last):
  File "<string>", line 1, in <module>
  File "C:\Users\J\Anaconda\lib\multiprocessing\forking.py", line 381, in main
    self = load(from_parent)
  File "C:\Users\J\Anaconda\lib\pickle.py", line 1378, in load
    return Unpickler(file).load()
  File "C:\Users\J\Anaconda\lib\pickle.py", line 858, in load
    dispatch[key](self)
  File "C:\Users\J\Anaconda\lib\pickle.py", line 1090, in load_global
    klass = self.find_class(module, name)
  File "C:\Users\J\Anaconda\lib\pickle.py", line 1126, in find_class
    klass = getattr(mod, name)
AttributeError: 'module' object has no attribute 'consumer'
Traceback (most recent call last):
  File "<string>", line 1, in <module>
  File "C:\Users\J\Anaconda\lib\multiprocessing\forking.py", line 381, in main
    self = load(from_parent)
  File "C:\Users\J\Anaconda\lib\pickle.py", line 1378, in load
    return Unpickler(file).load()
  File "C:\Users\J\Anaconda\lib\pickle.py", line 858, in load
    dispatch[key](self)
  File "C:\Users\J\Anaconda\lib\pickle.py", line 1090, in load_global
    klass = self.find_class(module, name)
  File "C:\Users\J\Anaconda\lib\pickle.py", line 1126, in find_class
    klass = getattr(mod, name)
AttributeError: 'module' object has no attribute 'producer'
解决方案

import Queue is for multithreads apps: https://docs.python.org/2/library/queue.html not for multi processes apps.

from multiprocessing import Queue is for multiprocesses apps: https://docs.python.org/2/library/multiprocessing.html#exchanging-objects-between-processes

According to the documentation multiprocessing.Queue "is a near clone of Queue.Queue"

Besides multiprocessing.Queue there is the JoinableQueue that has task_done() and join() methods in case you need it.

In your example I don't think you need JoinableQueue. Did you try this:

from multiprocessing import (Process, Queue, freeze_support)

def producer(q, n):
    for x in range(n):
        q.put(x)
    q.put("end")


def consumer(q):
    while True:
        item = q.get()
        if item == "end":
            break
        print item

if __name__ == '__main__':
    freeze_support()
    q = Queue()
    c = Process(target=consumer, args=(q,))
    c.start()
    p = Process(target=producer, args=(q, 10))
    p.start()
    c.join()

Tested in Linux and Windows.

这篇关于Python队列使用在线程中工作,但(显然)不是在多处理的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!

10-23 02:24