本文介绍了Python 2.7.6中具有多处理功能的奇怪Queue.PriorityQueue行为的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

正如您从标题中所知道的,我正在尝试将PriorityQueue与多处理一起使用.更准确地说,我想创建共享的PriorityQueue,编写了一些代码,但它没有按预期运行.

As you know from the title, I'm trying to use PriorityQueue with multiprocessing. More precisely, I wanted to make shared PriorityQueue, wrote some code and it doesn't run as I expected.

查看代码:

import time
from multiprocessing import Process, Lock
from Queue import PriorityQueue


def worker(queue):
    lock = Lock()
    with lock:
        for i in range(100):
            queue.put(i)

    print "worker", queue.qsize()


pr_queue = PriorityQueue()
worker_process = Process(target = worker, args = (pr_queue,))
worker_process.start()

time.sleep(5)    # nope, race condition, you shall not pass (probably)
print "main", pr_queue.qsize()

获得以下输出:

worker 100
main 0

正在发生的事情以及如何正确地做我想做的事情?谢谢.

What's happening and how to do what I want the right way?Thank you.

推荐答案

问题不是在这种情况下不能腌制-如果您使用的是类似Unix的平台,则可以将队列传递给子对象而无需腌制(在Windows上,我认为您会在这里得到一个腌制错误).根本问题是您没有使用进程安全队列.只能在进程之间使用的队列是 Queue对象放在multiprocessing模块中.不幸的是,没有可用的PriorityQueue实现.但是,您可以通过向 multiprocessing.Manager 类注册PriorityQueue来轻松创建一个,像这样:

The problem isn't that it's not picklable in this case - if you're using a Unix-like platform, the queue can be passed to the child without pickling. (On Windows, I think you would get a pickling error here, though). The root problem is that you're not using a process-safe queue. The only queues that can be used between processes are the Queue objects that live inside the multiprocessing module. Unfortunately, there is no PriorityQueue implementation available. However, you can easily create one by registering a PriorityQueue with a multiprocessing.Manager class, like this:

import time
from multiprocessing import Process
from multiprocessing.managers import SyncManager
from Queue import PriorityQueue


class MyManager(SyncManager):
    pass
MyManager.register("PriorityQueue", PriorityQueue)  # Register a shared PriorityQueue

def Manager():
    m = MyManager()
    m.start()
    return m

def worker(queue):
    print(queue)
    for i in range(100):
        queue.put(i)
    print "worker", queue.qsize()


m = Manager()
pr_queue = m.PriorityQueue()  # This is process-safe
worker_process = Process(target = worker, args = (pr_queue,))
worker_process.start()

time.sleep(5)    # nope, race condition, you shall not pass (probably)
print "main", pr_queue.qsize()

输出:

worker 100
main 100

请注意,这可能不如标准multiprocessing.Queue子类那样好.基于ManagerPriorityQueue是通过创建实际上包含常规PriorityQueueManager服务器进程,然后为您的主进程和工作进程提供对象,这些对象使用IPC在服务器进程中读取/写入队列.常规multiprocessing.Queue只是向Pipe写入数据或从中读取数据.如果这是一个问题,您可以尝试通过对multiprocessing.Queue进行子类化或委派来实现自己的multiprocessing.PriorityQueue.不过,这样做可能不值得.

Note that this probably won't perform quite as well as it would if it was standard multiprocessing.Queue subclass. The Manager-based PriorityQueue is implemented by creating a Manager server process which actually contains a regular PriorityQueue, and then providing your main and worker processes with Proxy objects that use IPC to read/write to the queue in the server process. Regular multiprocessing.Queues just write/read data to/from a Pipe. If that's a concern, you could try implementing your own multiprocessing.PriorityQueue by subclassing or delegating from multiprocessing.Queue. It may not be worth the effort, though.

这篇关于Python 2.7.6中具有多处理功能的奇怪Queue.PriorityQueue行为的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!

10-10 22:25