我对使用python进行队列处理感到好奇。让我先描述一下:


主线程创建一个负责收集和处理消息的全局对象(该对象内部有一个队列)
该全局对象具有在线程中启动的方法。方法有权访问队列(属于该对象)并从中读取消息
主线程启动了一系列进程(带有multiprocessing.Process),这些进程将消息发布到全局对象。


问题是:对于队列处理器,队列始终为空。让我用一些代码说明问题:

import threading as t
import time
import sys
from multiprocessing import Process
if sys.version_info.major is 2:
    import Queue as queue
else:
    import queue

class ExampleRecorder:
    queue = queue.Queue()

    def run(self):
        self.thread = t.Thread(target=self.start_processor)
        self.thread.daemon = True
        self.thread.start()

    def start_processor(self):
        while 1:
            print("PROCESSOR! QUEUE ID: {}. QUEUE SIZE: {}. IS EMPTY: {}".format(id(self.queue), self.queue.qsize(), self.queue.empty()))
            time.sleep(1)

    def push_message(self, span):
        self.queue.put(span)
        print("RECORDER! QUEUE ID: {}. QUEUE SIZE: {}".format(id(self.queue), self.queue.qsize()))

er = ExampleRecorder()
er.run()

def producer():
    while 1:
        print("Adding an item")
        er.push_message("foo")
        time.sleep(1)


proc = Process(target=producer)
proc.start()


该脚本的示例输出为:

$ python3 model.py
PROCESSOR! QUEUE ID: <queue.Queue object at 0x1095f9b70>. QUEUE SIZE: 0. IS EMPTY: True
Adding an item
RECORDER! QUEUE ID: <queue.Queue object at 0x1095f9b70>. QUEUE SIZE: 1
PROCESSOR! QUEUE ID: <queue.Queue object at 0x1095f9b70>. QUEUE SIZE: 0. IS EMPTY: True
Adding an item
RECORDER! QUEUE ID: <queue.Queue object at 0x1095f9b70>. QUEUE SIZE: 2
PROCESSOR! QUEUE ID: <queue.Queue object at 0x1095f9b70>. QUEUE SIZE: 0. IS EMPTY: True


如您所见,队列接收对象并增长,但是对于处理器而言,它始终为空。那里可能出什么问题了?在进程中的线程中处理共享队列是否有些棘手?

附言经过Python 2.7.12和3.5.2检查

最佳答案

Queue.Queue仅在同一进程中工作,它与线程一起使用,而不与单独的进程一起使用。

您需要一个multiprocessing.Queue实例进行进程间通信,但是您还必须重组代码以将队列的实例显式传递给producer。现在,在新过程中评估ExampleRecorder定义时,每个生产者将创建一个不同的实例。

注意:使用您编写的代码,所有ExampleRecorder实例(在同一进程中)共享同一队列!您确定这是您想要的吗?

通过在queue块内定义class ...queue是类的属性,而不是其实例。这与在queue中定义__init__()非常不同。一个简单的例子:

from queue import Queue

class SampleClass:
  queue = Queue()

class AnotherSample:
  def __init__(self):
    self.queue = Queue()

inst1 = SampleClass()
inst2 = SampleClass()

inst3 = AnotherSample()
inst4 = AnotherSample()


如果我们测试queue属性,我们会看到区别:(is运算符测试两个变量是否是同一对象的别名)

inst1.queue is inst2.queue
Out[8]: True

inst3.queue is inst4.queue
Out[9]: False

关于python - Python:进程中线程中的队列处理很奇怪,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/46281521/

10-16 10:11