我有一个redis列表,其中一个发布者推送一些消息(json序列化)。
另一方面,订阅服务器可以获取每个json blob并执行一些操作。最简单的方法是连续地这样做。但我想让它快一点;我想维护一个工作进程池(多个使用者),每当新消息到达时,检查池中是否有“免费”进程可以开始处理
我正在寻找以下基于池的版本

while not False:
    _, new_user = conn.blpop('queue:users')
    if not new_user:
        continue
    try:
        process_new_user(new_user, conn)
    except Exception as e:
        print e
    else:
        pass

但是我无法将其转换为使用pythons multiprocessing.pool类的代码。文档也没有帮助

最佳答案

from multiprocessing import Pool


pool = Pool()

while 1:
    _, new_user = conn.blpop('queue:users')

    if not new_user:
        continue

    pool.apply_async(process_new_user, args=(new_user, conn))

如果要处理异常,需要收集AsyncResult返回的pool.apply_async对象并检查其状态。
如果可以使用python 3,concurrent.futures池允许在异步回调中处理结果,从而更容易检查作业的退出状态。

关于python - 使用工作池消耗Redis消息,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/33397360/

10-15 07:12