最近刚好有朋友在问Node.js多线程的问题,我总结了一下,可以考虑使用源码包里面的worker_threads或者第三方的模块来实现。
首先明确一下多线程在Node.js中的概念,然后在聊聊worker_threads的用法。天生异步,真心强大。

  1. Node.js多线程概述
    有人可能会说,Node.js虽然是单线程的,但是可以利用循环事件(Event Loop)l来实现并发执行任务。追究其本质,NodeJs实际上使用了两种不同的线程,一个是用于处理循环事件的主线程一个是工作池(Worker pool)里面的一些辅助线程。关于这两种线程主要功能和关系如图1所示。
    锋利的NodeJS之NodeJS多线程-LMLPHP

图1 Node.js线程图
所以从本质上来讲,NodeJs并不是真正的原生多线程,而是利用循环事件来实现高效并发执行任务。要做到真正的多线程,需要依赖其他模块或者第三方库。
2. Worker_threads是Node.js官方推荐的实现真正多线程的模块,有官方技术团队进行长期维护。Worker_threads不需要单独安装,它位于Node.js源码中,具体位置是lib/worker_threads.js。worker_threads模块允许使用并行执行JavaScript的线程,使用也非常方便,只需要引入该模块即可,代码如下。
const worker = require('worker_threads');
与child_process或cluster不同,worker_threads可以共享内存。它们通过传输ArrayBuffer实例或共享SharedArrayBuffer实例来实现。
官网上给了一个完整的例子,如下所示。

const {
    Worker, isMainThread, parentPort, workerData
} = require('worker_threads');

if (isMainThread) {
    module.exports = function parseJSAsync(script) {
        return new Promise((resolve, reject) => {
            const worker = new Worker(__filename, {
                workerData: script
            });
            worker.on('message', message => console.log(message));
            worker.on('error', reject);
            worker.on('exit', (code) => {
                if (code !== 0)
                    reject(new Error(`Worker stopped with exit code ${code}`));
            });
        });
    };

} else {
    const { parse } = require('som-parse-libary');
    const script = workerData;
    parentPort.postMessage(parse(script));
}

笔者对以上代码开始解析,重点概念如下所示:
Worker该类代表一个独立的js执行线程。
isMainThead一个布尔值,当前代码是否运行在Worker线程中。
parentPortMessagePort对象,如果当前线程是个生成的Worker线程,则允许和父线程通信。
workerData一个可以传递给线程构造函数的任何js数据的的复制数据。
Worker_theads还提供了很多实用的API,整理如下所示。
1.worker.getEnvironmentData(key)
可以获取环境变量,先使用setEnvironmentData来设置环境变量,然后再使用g
etEnvironmentData来获取。
举一个简单的例子,代码如下所示。

const {
  Worker,
  isMainThread,
  setEnvironmentData,
  getEnvironmentData,
} = require('worker_threads');

if (isMainThread) {
  setEnvironmentData('Hi', 'Node.js!');
  const worker = new Worker(__filename);
} else {
  console.log(getEnvironmentData('Hi'));.
}
执行这段代码,可以在控制台打印出“Node.js”字符串。
  1. isMainThread
    isMainThread可以用来判断该进程是不是主线程,如果是主线程,则返回true,否则返回false。下面编写一个嵌套worker的代码,用于展示。
const { Worker, isMainThread } = require('worker_threads');

if (isMainThread) {
    console.log("This is  a main thread\r\n");
    // This re-loads the current file inside a Worker instance.
    new Worker(__filename);
} else {
    console.log('Inside Worker!');
    console.log(isMainThread);  // Prints 'false'.
}
  1. MessageChannel和相关用法
    MessageChannel是worker_threads提供的一个双向异步的消息通信信道。下面这段代码就展示了两个MessagePort对象互相传递消息的过程,我们如果想主动结束某个Channel,那么可以使用close事件来完成。
const {MessageChannel}  = require('worker_threads');

const {port1, port2} = new MessageChannel();

// port1给port2发送信息
port1.postMessage({carName: 'BYD'});

port2.on('message', (message) => {
    console.log("I receive message is ", message);
})

// port2给port1发送信息
port2.postMessage({personality: "Brave"});
port1.on('message', (message) => {
    console.log("I receive message is ", message);
});

运行上面的代码,可以在控制台看到如下输出:

I receive message is  { personality: 'Brave' }
I receive message is  { carName: 'BYD' }

port.on(‘message’)方法是利用被动等待的方式接收事件,如果想手动接收信息可以使用receiveMessageOnPort方法,指定从某个port接收消息,如下所示。

const { MessageChannel, receiveMessageOnPort } = require('worker_threads');
const {port1, port2} = new MessageChannel();
port1.postMessage({Name: "freePHP"});

let result = receiveMessageOnPort(port2);
console.log(result);
let result2 = receiveMessageOnPort(port2);
console.log(result2);

运行上面的代码,可以得到如下输出。

{ message: { Name: 'freePHP' } }
undefined

从结果可以看出,receiveMessageOnPort可以指定从另一个MessagePort对象获取消息,是一次消耗消息。
实际工作中,我们不可能只使用单个线程来完成任务,所以需要创建线程池来维护和管理worker thread对象。为了简化线程池的实现,假设只会传递一个woker脚本作为参数,具体实现如下所示。需要单独安装async_hooks模块,它用于异步加载资源。

const { AsyncResource } = require('async_hooks'); // 用于异步加载资源
const { EventEmitter } = require('events');
const path = require('path');
const { Worker } = require('worker_threads');

const kTaskInfo = Symbol('kTaskInfo');
const kWorkerFreedEvent = Symbol('kWorkerFreedEvent');

class WorkerPoolTaskInfo extends AsyncResource {
    constructor(callback) {
        super('WorkerPoolTaskInfo');
        this.callback = callback;
    }

    done(err, result) {
        this.runInAsyncScope(this.callback, null, err, result);
        this.emitDestroy();  // 只会被执行一次
    }
}

class WorkerPool extends EventEmitter {
    constructor(numThreads) {
        super();
        this.numThreads = numThreads;
        this.workers = [];
        this.freeWorkers = [];

        for (let i = 0; i < numThreads; i++)
            this.addNewWorker();
    }

    /**
     * 添加新的线程
     */
    addNewWorker() {
        const worker = new Worker(path.resolve(__dirname, 'task2.js'));
        worker.on('message', (result) => {
            // 如果成功状态,则将回调传给runTask方法,然后worker移除TaskInfo标记。
            worker[kTaskInfo].done(null, result);
            worker[kTaskInfo] = null;
            //
            this.freeWorkers.push(worker);
            this.emit(kWorkerFreedEvent);
        });
        worker.on('error', (err) => {
            // 报错后调用回调
            if (worker[kTaskInfo])
                worker[kTaskInfo].done(err, null);
            else
                this.emit('error', err);
            // 移除一个worker,然后启动一个新的worker来代替当前的worker
            this.workers.splice(this.workers.indexOf(worker), 1);
            this.addNewWorker();
        });
        this.workers.push(worker);
        this.freeWorkers.push(worker);
        this.emit(kWorkerFreedEvent);
    }

    /**
     * 执行任务
     * @param task
     * @param callback
     */
    runTask(task, callback) {
        if (this.freeWorkers.length === 0) {
            this.once(kWorkerFreedEvent, () => this.runTask(task, callback));
            return;
        }

        const worker = this.freeWorkers.pop();
        worker[kTaskInfo] = new WorkerPoolTaskInfo(callback);
        worker.postMessage(task);
    }

    /**
     * 关闭线程
     */
    close() {
        for (const worker of this.workers) {
            worker.terminate();
        }
    }
}

module.exports = WorkerPool;

其中task2.js是定义好的一个计算两个数字相加的脚本,内容如下。

const { parentPort } = require('worker_threads');
parentPort.on('message', (task) => {
  parentPort.postMessage(task.a + task.b);
});

调用这个线程池非常简单,用例如下所示。

const WorkerPool = require('./worker_pool.js');
const os = require('os');

const pool = new WorkerPool(os.cpus().length);

let finished = 0;
for (let i = 0; i < 10; i++) {
  pool.runTask({ a: 42, b: 100 }, (err, result) => {
    console.log(i, err, result);
    if (++finished === 10)
      pool.close();
  });
}
04-10 16:21