我想设置一些接受输入并对其进行处理的流程,并且此结果的结果是我要处理的另一项任务。本质上,每个任务都会导致零个或多个(相同类型的)新任务,最终所有任务都不会产生新任务。

我认为一个队列对此很有好处,所以我有一个输入队列和一个结果队列来添加没有新结果的任务。在任何时候,队列可能都是空的,但是如果另一个进程正在执行任务,则可以添加更多队列。

因此,我只希望它在所有进程同时尝试从输入队列中获取时结束。

我对python多处理和一般的多处理都是全新的。

编辑以添加我的意思的基本概述:

class Consumer(Process):
    def __init__(self, name):
        super().__init__(name=name)

    def run():
        # This is where I would have the task try to get a new task off of the
        # queue and then calculate the results and put them into the queue
        # After which it would then try to get a new task and repeat

        # If this an all other processes are trying to get and the queue is
        # empty That is the only time I know that everything is complete and can
        # continue
        pass

def start_processing():
    in_queue = Queue()
    results_queue = Queue()
    consumers = [Consumer(str(i)) for i in range(cpu_count())]

    for i in consumers:
        i.start()

    # Wait for the above mentioned conditions to be true before continuing

最佳答案

JoinableQueue被设计为适合此目的。加入JoinableQueue将阻止,直到有正在进行的任务为止。

您可以按以下方式使用它:主进程将产生一定数量的工作进程,为它们分配JoinableQueue。工作进程将使用队列来产生和使用新任务。主进程将通过加入队列来等待,直到没有更多的任务正在进行。之后,它将终止工作进程并退出。

一个非常简化的示例(伪代码):

def consumer(queue):
    for task in queue.get():
        results = process_task(task)

        if 'more_tasks' in results:
            for new_task in results['more_tasks']:
                queue.put(new_task)

        # signal the queue that a task has been completed
        queue.task_done()

def main():
    queue = JoinableQueue()

    processes = start_processes(consumer, queue)

    for task in initial_tasks:
        queue.put(task)

    queue.join()  # block until all work is done

    terminate_processes(processes)

关于python - 当所有进程都试图从队列中获取并且队列为空时,结束处理?,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/46743939/

10-12 06:06