Efficient multiprocessing of massive, brute force maximization in Python 3

Problem description

This is an extension of my recent question Avoiding race conditions in Python 3's multiprocessing Queues. Hopefully this version of the question is more specific.

TL;DR: In a multiprocessing model where worker processes are fed from a queue using multiprocessing.Queue, why are my worker processes so idle? Each process has its own input queue so they're not fighting each other for a shared queue's lock, but the queues spend a lot of time actually just empty. The main process is running an I/O-bound thread -- is that slowing the CPU-bound filling of the input queues?

I'm trying to find the maximal element of the Cartesian product of N sets each with M_i elements (for 0 <= i < N) under a certain constraint. Recall that the elements of the Cartesian product are length-N tuples whose elements are elements of the N sets. I'll call these tuples 'combinations' to emphasize the fact that I'm looping over every combination of the original sets. A combination meets the constraint when my function is_feasible returns True. In my problem, I'm trying to find the combination whose elements have the greatest weight: sum(element.weight for element in combination).

My problem size is large, but so is my company's server. I'm trying to rewrite the following serial algorithm as a parallel algorithm.

from operator import itemgetter
from itertools import product # Cartesian product function from the std lib
def optimize(sets):
    """Return the largest (total-weight, combination) tuple from all
    possible combinations of the elements in the several sets, subject
    to the constraint that is_feasible(combo) returns True."""
    return max(
                map(
                    lambda combination: (
                        sum(element.weight for element in combination),
                        combination
                    ),
                    filter(
                        is_feasible, # Returns True if combo meets constraint
                        product(*sets)
                    )
                ),
                key=itemgetter(0) # Only maximize based on sum of weight
            )

My current multiprocessing approach is to create worker processes and feed them combinations with an input queue. When the workers receive a poison pill they place the best combination they've seen on an output queue and exit. I fill the input queue from the main thread of the main process. One advantage of this technique is that I can spawn a secondary thread from the main process to run a monitoring tool (just a REPL I can use to see how many combinations have been processed so far and how full the queues are).

                    +-----------+
            in_q0   |   worker0 |----\
            /-------+-----------+     \
+-----------+   in_q1   +-----------+  \ out_q  +-----------+
|   main    |-----------|   worker1 |-----------|   main    |
+-----------+           +-----------+  /        +-----------+
            \-------+-----------+     /
            in_q2   |   worker2 |----/
                    +-----------+
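
A minimal sketch of what each worker might look like under this scheme (illustrative only: the queue arguments, the None poison pill, and the reuse of is_feasible from the serial version are my assumptions, not necessarily the poster's actual code):

def worker(in_q, out_q):
    """Drain combinations from in_q until a poison pill (None) arrives,
    then put the best feasible combination seen on out_q and exit."""
    best = (float('-inf'), None)              # (total weight, combination)
    while True:
        combo = in_q.get()                    # blocks while the queue is empty
        if combo is None:                     # poison pill
            out_q.put(best)
            return
        if is_feasible(combo):                # the question's constraint check
            weight = sum(element.weight for element in combo)
            if weight > best[0]:
                best = (weight, combo)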

I originally had all the workers reading from one input queue but found that none of them were hitting the CPU. Figuring that they were spending all their time waiting for queue.get() to unblock, I gave them their own queues. That increased pressure on the CPU, so I figured the workers were active more often. However, the queues spend most of their time empty! (I know this from the monitoring REPL I mentioned). This suggests to me that the main loop filling up the queues is slow. Here is that loop:

from itertools import cycle, product

def main():
    # (Create workers, each with its own input queue)
    # Cycle through each worker's queue and add a combination to that queue
    for combo, worker in zip(product(*sets), cycle(workers)):
        worker.in_q.put(combo)
    # (Collect results and return)

I'm guessing the bottleneck is worker.in_q.put(). How do I make that faster? My first instinct was to make the workers slower, but that just doesn't make sense... Is the problem that the monitor thread is stopping the loop too often? How would I be able to tell?
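
One rough way to tell (a hypothetical diagnostic, not part of the original code) is to time a sample of the fill loop and see how much of it is spent inside put() versus everything else:

import time
from itertools import cycle, islice, product

def profile_fill(sets, workers, sample=100000):
    """Time a sample of the fill loop and report how much of it is put()."""
    put_time = 0.0
    start = time.perf_counter()
    for combo, worker in islice(zip(product(*sets), cycle(workers)), sample):
        t0 = time.perf_counter()
        worker.in_q.put(combo)                # the suspected bottleneck
        put_time += time.perf_counter() - t0
    total = time.perf_counter() - start
    print("put(): {:.1f}s of {:.1f}s total".format(put_time, total))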

Alternatively, is there another way to implement this that doesn't involve so much waiting on locks?

Recommended answer

What do your elements look like? It could be that pickling them to put them in the queue is slow, which would obviously be a bottleneck. Note that each element is being independently pickled over and over and over again.
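
A quick way to check whether pickling is the culprit (an illustrative snippet, assuming the question's sets is in scope) is to measure the size and cost of pickling a single combination:

import pickle
import timeit
from itertools import product

combo = next(iter(product(*sets)))            # grab a single combination
payload = pickle.dumps(combo)
print("pickled size: {} bytes".format(len(payload)))
per_call = timeit.timeit(lambda: pickle.dumps(combo), number=10000) / 10000
print("pickle.dumps: {:.2e} s per combination".format(per_call))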

If this is the case, this approach might help:

  • Choose a set with cardinality >= your number of workers. Ideally, it'd be much more than the number of workers. Call this set A, and assign approximately equal subsets of A to each worker. Transmit that subset to each worker.
  • Distribute the full contents of all sets other than A to each of the workers (probably through pickle.dumps once and then transmitting the same string to each worker, or possibly through shared memory or whatever else).
  • Then each worker has the full information it needs to do its subset. It can start on its merry way over product(my_A_subset, *other_sets) (possibly ordered differently), polling for some kind of stop signal between each job (or every three jobs or whatever). This doesn't need to be through a queue; a one-bit shared-memory value works fine. (See the sketch after this list.)
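
A rough sketch of that layout, reusing is_feasible and the element weights from the question. The helper names, the chunking scheme, and the use of multiprocessing.Value as the stop signal are illustrative assumptions, not a definitive implementation:

import multiprocessing as mp
from itertools import product

def subset_worker(a_subset, other_sets, stop_flag, out_q):
    """Search product(a_subset, *other_sets) locally; the sets are sent to
    the worker once at start-up, so there is no per-combination IPC."""
    best = (float('-inf'), None)
    for i, combo in enumerate(product(a_subset, *other_sets)):
        if i % 1000 == 0 and stop_flag.value: # poll the stop signal occasionally
            break
        if is_feasible(combo):                # the question's constraint check
            weight = sum(element.weight for element in combo)
            if weight > best[0]:
                best = (weight, combo)
    out_q.put(best)

def optimize_parallel(sets, n_workers):
    # Split the largest set (A) across the workers. Note the combination
    # tuples come out with A's element first ("possibly ordered differently").
    sets = list(sets)
    a_index = max(range(len(sets)), key=lambda i: len(sets[i]))
    A = list(sets[a_index])
    other_sets = [s for i, s in enumerate(sets) if i != a_index]
    chunks = [A[i::n_workers] for i in range(n_workers)]
    stop_flag = mp.Value('b', 0)              # main sets .value = 1 to stop early
    out_q = mp.Queue()
    procs = [mp.Process(target=subset_worker,
                        args=(chunk, other_sets, stop_flag, out_q))
             for chunk in chunks if chunk]
    for p in procs:
        p.start()
    results = [out_q.get() for _ in procs]    # collect before join()
    for p in procs:
        p.join()
    return max(results, key=lambda r: r[0])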
