How does a thread pool work, and how can it be implemented in an async/await environment like NodeJS?

Problem description

I need to run a function int f(int i) for 10_000 different arguments, and each call takes around 1 second to execute because of I/O time.
In a language like Python, I can use threads (or async/await, I know, but I'll come back to that later) to parallelize this task.
If I always want 10 threads running, with the work split between them, I can use ThreadPool:

from multiprocessing.pool import ThreadPool

def f(p):
    x = [...]
    return x

p = ThreadPool(10)  # 10 worker threads
xs = p.map(f, range(10_000))

But how does it work? If I wanted to implement something similar in, let's say, NodeJS with f = http("www.google.com", callback), where should I begin? What is the algorithm for this kind of problem?
Again, I'd like to have 10 requests in flight at the same time, and when one finishes, the next one should start.

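const http = require("http");

// URLs still waiting to be requested (http.get needs the scheme)
const queue = ["http://www.google.com", "http://www.facebook.com"];

const f = function (url) {
  if (url === undefined) return; // queue is empty: nothing left to start
  http.get(url, (res) => {
    res.resume(); // drain the body so the "end" event fires
    res.on("end", () => f(queue.pop())); // request finished: start the next one
  }).on("error", () => f(queue.pop())); // keep going if a request fails
};

// start up to 10 requests; each one launches the next when it finishes
for (let i = 0; i < 10; i++) {
  f(queue.pop());
}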
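
The "N in flight, start the next when one finishes" pattern described above can also be written with async/await. Below is a minimal sketch in Python's asyncio, not a definitive implementation: `fetch`, `worker` and `run_limited` are hypothetical names, and `fetch` only simulates I/O with a short sleep. The same structure (a shared queue drained by a fixed number of workers) should carry over to Promise-based code in NodeJS.

```python
import asyncio


async def fetch(i: int) -> int:
    """Hypothetical stand-in for an I/O-bound call such as an HTTP request."""
    await asyncio.sleep(0.01)  # simulated I/O wait
    return i * 2


async def worker(queue: asyncio.Queue, results: list) -> None:
    """Take one (index, argument) pair at a time until the queue is empty."""
    while True:
        try:
            index, arg = queue.get_nowait()
        except asyncio.QueueEmpty:
            return  # nothing left to start: this worker stops
        results[index] = await fetch(arg)


async def run_limited(args, limit: int = 10) -> list:
    """Run fetch over args with at most `limit` calls in flight."""
    args = list(args)
    queue: asyncio.Queue = asyncio.Queue()
    for item in enumerate(args):
        queue.put_nowait(item)
    results: list = [None] * len(args)
    # `limit` workers share one queue, so a new call starts as soon as
    # any worker finishes its current one.
    await asyncio.gather(*(worker(queue, results) for _ in range(limit)))
    return results


if __name__ == "__main__":
    print(asyncio.run(run_limited(range(20), limit=5)))
```

Because everything runs on one event loop thread, the workers need no lock around the queue or the results list. Results come back in the order of the input arguments, regardless of which call finishes first.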

Recommended answer

I'm not sure this is how ThreadPool and other libraries are implemented, but here is a hint: use queues to keep track of how many tasks/threads are running.
I haven't tried this code, but it should give you an idea: we create a thread that checks every 0.2 seconds whether another thread should be started.
However, this implies a lot of context switching and might not be efficient.

from threading import Thread
from time import sleep
from typing import Callable


class Pool:
    def __init__(self, func: Callable, params: list, thread_max=10):
        self.func = func
        self.params = list(params)  # arguments that have not been launched yet
        self.running = 0
        self.finished = []
        self.thread_max = thread_max
        self.threads = []  # threads that are currently running

    def start(self):
        # run the scheduling loop in its own thread, checking every 0.2 second
        Thread(target=self.check, args=(0.2,)).start()

    def check(self, t_sleep=0.5):
        done = False
        while not done:
            sleep(t_sleep)
            # first check for finished threads
            # (iterate over a copy because items are removed from self.threads)
            for t in list(self.threads):
                if not t.is_alive():
                    # do something with return value
                    # ...
                    self.threads.remove(t)

            if not self.params:  # means there is no more task left to LAUNCH
                done = not self.threads  # True once every task is COMPLETE
                continue  # avoid the next part (launching threads)

            # now start some threads if needed
            while self.params and len(self.threads) < self.thread_max:
                arg = self.params.pop()
                thread = Thread(target=self.func, args=(arg,))
                self.threads.append(thread)
                thread.start()
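
One way to avoid the periodic polling is to start a fixed number of worker threads once and let each of them pull its next task from a shared `queue.Queue`. The sketch below illustrates that idea under stated assumptions; it is not necessarily how `ThreadPool` is implemented, and `pool_map` is a hypothetical name chosen for this example.

```python
import queue
import threading
from typing import Any, Callable, Iterable, List


def pool_map(func: Callable[[Any], Any], params: Iterable[Any],
             thread_max: int = 10) -> List[Any]:
    """Apply func to every item of params using thread_max worker threads."""
    params = list(params)
    tasks: queue.Queue = queue.Queue()  # thread-safe queue of (index, arg)
    for item in enumerate(params):
        tasks.put(item)
    results: List[Any] = [None] * len(params)

    def worker() -> None:
        while True:
            try:
                index, arg = tasks.get_nowait()
            except queue.Empty:
                return  # no task left: this worker exits
            # each index is written by exactly one thread
            results[index] = func(arg)

    threads = [threading.Thread(target=worker) for _ in range(thread_max)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()  # blocks, without polling, until the worker is done
    return results


if __name__ == "__main__":
    print(pool_map(lambda x: x * x, range(10), thread_max=3))
```

Each worker starts its next task as soon as it finishes the previous one, so there is no sleep interval to tune. The sketch does no error handling: if `func` raises, that worker stops and the corresponding result stays `None`.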
