由于某种原因,以下代码似乎无法在返回并完成所有结果后终止/加入进程Pool.可能与超时发生时终止工作进程有关,但是池被杀死时会创建新的工作进程,并且结果符合预期.from multiprocessing import Poolimport timeimport numpy as npfrom threading import Timerimport thread, time, sysdef f(x): time.sleep(x) return xif __name__ == '__main__': pool = Pool(processes=4, maxtasksperchild=4) results = [(x, pool.apply_async(f, (x,))) for x in np.random.randint(10, size=10).tolist()] while results: try: x, result = results.pop(0) start = time.time() print result.get(timeout=5), '%d done in %f Seconds!' % (x, time.time()-start) except Exception as e: print str(e) print '%d Timeout Exception! in %f' % (x, time.time()-start) for p in pool._pool: if p.exitcode is None: p.terminate() pool.terminate() pool.join()解决方案我没有完全理解您的问题.您说要停止一个特定的进程,但是在异常处理阶段,您要对所有作业调用终止.不知道为什么要这么做.另外,我很确定使用multiprocessing.Pool中的内部变量不是很安全.说完所有这些,我想您的问题是,为什么在发生超时时该程序无法完成.如果这是问题所在,则可以采用以下方法:from multiprocessing import Poolimport timeimport numpy as npfrom threading import Timerimport thread, time, sysdef f(x): time.sleep(x) return xif __name__ == '__main__': pool = Pool(processes=4, maxtasksperchild=4) results = [(x, pool.apply_async(f, (x,))) for x in np.random.randint(10, size=10).tolist()] result = None start = time.time() while results: try: x, result = results.pop(0) print result.get(timeout=5), '%d done in %f Seconds!' % (x, time.time()-start) except Exception as e: print str(e) print '%d Timeout Exception! in %f' % (x, time.time()-start) for i in reversed(range(len(pool._pool))): p = pool._pool[i] if p.exitcode is None: p.terminate() del pool._pool[i] pool.terminate() pool.join()重点是您需要从池中删除项目;仅对它们调用终止是不够的. I need to execute a pool of many parallel database connections and queries. I would like to use a multiprocessing.Pool or concurrent.futures ProcessPoolExecutor. Python 2.7.5In some cases, query requests take too long or will never finish (hung/zombie process). I would like to kill the specific process from the multiprocessing.Pool or concurrent.futures ProcessPoolExecutor that has timed out.Here is an example of how to kill/re-spawn the entire process pool, but ideally I would minimize that CPU thrashing since I only want to kill a specific long running process that has not returned data after timeout seconds.For some reason the code below does not seem to be able to terminate/join the process Pool after all results are returned and completed. It may have to do with killing worker processes when a timeout occurs, however the Pool creates new workers when they are killed and results are as expected.from multiprocessing import Poolimport timeimport numpy as npfrom threading import Timerimport thread, time, sysdef f(x): time.sleep(x) return xif __name__ == '__main__': pool = Pool(processes=4, maxtasksperchild=4) results = [(x, pool.apply_async(f, (x,))) for x in np.random.randint(10, size=10).tolist()] while results: try: x, result = results.pop(0) start = time.time() print result.get(timeout=5), '%d done in %f Seconds!' % (x, time.time()-start) except Exception as e: print str(e) print '%d Timeout Exception! in %f' % (x, time.time()-start) for p in pool._pool: if p.exitcode is None: p.terminate() pool.terminate() pool.join() 解决方案 I am not fully understanding your question. You say you want to stop one specific process, but then, in your exception handling phase, you are calling terminate on all jobs. Not sure why you are doing that. Also, I am pretty sure using internal variables from multiprocessing.Pool is not quite safe. Having said all of that, I think your question is why this program does not finish when a time out happens. If that is the problem, then the following does the trick: from multiprocessing import Poolimport timeimport numpy as npfrom threading import Timerimport thread, time, sysdef f(x): time.sleep(x) return xif __name__ == '__main__': pool = Pool(processes=4, maxtasksperchild=4) results = [(x, pool.apply_async(f, (x,))) for x in np.random.randint(10, size=10).tolist()] result = None start = time.time() while results: try: x, result = results.pop(0) print result.get(timeout=5), '%d done in %f Seconds!' % (x, time.time()-start) except Exception as e: print str(e) print '%d Timeout Exception! in %f' % (x, time.time()-start) for i in reversed(range(len(pool._pool))): p = pool._pool[i] if p.exitcode is None: p.terminate() del pool._pool[i] pool.terminate() pool.join()The point is you need to remove items from the pool; just calling terminate on them is not enough. 这篇关于python multiprocessing.Pool杀死*特定*长时间运行或挂起的进程的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持! 上岸,阿里云!
08-04 18:26