我想设置一些接受输入并对其进行处理的流程,并且此结果的结果是我要处理的另一项任务。本质上,每个任务都会导致零个或多个(相同类型的)新任务,最终所有任务都不会产生新任务。
我认为一个队列对此很有好处,所以我有一个输入队列和一个结果队列来添加没有新结果的任务。在任何时候,队列可能都是空的,但是如果另一个进程正在执行任务,则可以添加更多队列。
因此,我只希望它在所有进程同时尝试从输入队列中获取时结束。
我对python多处理和一般的多处理都是全新的。
编辑以添加我的意思的基本概述:
class Consumer(Process):
def __init__(self, name):
super().__init__(name=name)
def run():
# This is where I would have the task try to get a new task off of the
# queue and then calculate the results and put them into the queue
# After which it would then try to get a new task and repeat
# If this an all other processes are trying to get and the queue is
# empty That is the only time I know that everything is complete and can
# continue
pass
def start_processing():
in_queue = Queue()
results_queue = Queue()
consumers = [Consumer(str(i)) for i in range(cpu_count())]
for i in consumers:
i.start()
# Wait for the above mentioned conditions to be true before continuing
最佳答案
JoinableQueue
被设计为适合此目的。加入JoinableQueue
将阻止,直到有正在进行的任务为止。
您可以按以下方式使用它:主进程将产生一定数量的工作进程,为它们分配JoinableQueue
。工作进程将使用队列来产生和使用新任务。主进程将通过加入队列来等待,直到没有更多的任务正在进行。之后,它将终止工作进程并退出。
一个非常简化的示例(伪代码):
def consumer(queue):
for task in queue.get():
results = process_task(task)
if 'more_tasks' in results:
for new_task in results['more_tasks']:
queue.put(new_task)
# signal the queue that a task has been completed
queue.task_done()
def main():
queue = JoinableQueue()
processes = start_processes(consumer, queue)
for task in initial_tasks:
queue.put(task)
queue.join() # block until all work is done
terminate_processes(processes)
关于python - 当所有进程都试图从队列中获取并且队列为空时,结束处理?,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/46743939/