团队最近有一个 Node.js 全新的模块需要开发,涉及多进程的管理和通讯,简化模型可以理解为需要频繁从 master 进程调用 worker 进程的某些方法,简单设计实现了一个 event-invoke 的库,可以简单优雅进行调用。
Node.js 提供了 child_process 模块,在 master 进程通过 fork / spawn 等方法调用可以创建 worker 进程并获取其对象(简称 cp )。父子进程会建立 IPC 通道,在 master 进程中可以使用 cp.send() 给 worker 进程发送 IPC 消息,而在 worker 进程中也可以通过 process.send() 给父进程发送 IPC 消息,达到双工通信的目的。(进程管理涉及更复杂的工作,本文暂不涉及)
基于以上前提,借助 IPC 通道和进程对象,我们可以通过事件驱动的方式实现进程间的通信,只需要简单的几行代码,就能实现基本调用逻辑,例如:
// master.js const child_process = require('child_process'); const cp = child_process.fork('./worker.js'); function invoke() { cp.send({ name: 'methodA', args: [] }); cp.on('message', (packet) => { console.log('result: %j', packet.payload); }); } invoke(); // worker.js const methodMap = { methodA() {} } cp.on('message', async (packet) => { const { name, args } = packet; const result = await methodMap[name)(...args); process.send({ name, payload: result }); });
仔细分析上述代码实现,直观感受 invoke 调用并不优雅,并且当调用量较大时,会创建很多的 message 监听器,并且要保证请求和响应是一一对应,需要做很多额外的设计。希望设计一个简单理想的方式,只需提供 invoke 方法,传入方法名和参数,返回一个 Promise ,像调用本地方法那样进行 IPC 调用,而不用考虑消息通信的细节。
// 假想中的 IPC 调用 const res1 = await invoker.invoke('sleep', 1000); console.log('sleep 1000ms:', res1); const res2 = await invoker.invoke('max', [1, 2, 3]); // 3 console.log('max(1, 2, 3):', res2);
从调用的模型看,可以将角色抽象为 Invoker 和 Callee ,分别对应服务调用方和提供方,将消息通讯的细节可以封装在内部。parent_process 和 child_process 的通信桥梁是操作系统提供的 IPC 通道,单纯从 API 的视角看,可以简化为两个 Event 对象(主进程为 cp ,子进程为 process )。Event 对象作为中间的双工通道两端,暂且命名为 InvokerChannel 和 CalleeChannel 。
关键实体和流程如下:
实际上,这个设计不仅适用 IPC 调用,在浏览器的场景下也能直接得到很好的应用,比如说跨 iframe 的调用可以包装 window.postMessage(),跨标签页调用可以使用 storage 事件,以及 Web worker 中可借助 worker.postMessage() 作为通信的桥梁。
基于以上设计,实现编码必然不在话下,趁着非工作时间迅速完成开发和文档的工作,源代码:https://github.com/x-cold/event-invoke
npm i -S event-invoke
示例代码:Example code
// parent.js const cp = require('child_process'); const { Invoker } = require('event-invoke'); const invokerChannel = cp.fork('./child.js'); const invoker = new Invoker(invokerChannel); async function main() { const res1 = await invoker.invoke('sleep', 1000); console.log('sleep 1000ms:', res1); const res2 = await invoker.invoke('max', [1, 2, 3]); // 3 console.log('max(1, 2, 3):', res2); invoker.destroy(); } main();
// child.js const { Callee } = require('event-invoke'); const calleeChannel = process; const callee = new Callee(calleeChannel); // async method callee.register(async function sleep(ms) { return new Promise((resolve) => { setTimeout(resolve, ms); }); }); // sync method callee.register(function max(...args) { return Math.max(...args); }); callee.listen();
示例代码:Example code
// pm2.config.cjs module.exports = { apps: [ { script: 'invoker.js', name: 'invoker', exec_mode: 'fork', }, { script: 'callee.js', name: 'callee', exec_mode: 'fork', } ], };
// callee.js import net from 'net'; import pm2 from 'pm2'; import { Callee, BaseCalleeChannel } from 'event-invoke'; const messageType = 'event-invoke'; const messageTopic = 'some topic'; class CalleeChannel extends BaseCalleeChannel { constructor() { super(); this._OnProcessMessage= this.onProcessMessage.bind(this); process.on('message', this._onProcessMessage); } onProcessMessage(packet) { if (packet.type !== messageType) { return; } this.emit('message', packet.data); } send(data) { pm2.list((err, processes) => { if (err) { throw err; } const list = processes.filter(p => p.name === 'invoker'); const pmId = list[0].pm2_env.pm_id; pm2.sendDataToProcessId({ id: pmId, type: messageType, topic: messageTopic, data, }, function (err, res) { if (err) { throw err; } }); }); } destory() { process.off('message', this._onProcessMessage); } } const channel = new CalleeChannel(); const callee = new Callee(channel); // async method callee.register(async function sleep(ms) { return new Promie((resolve) => { setTimeout(resolve, ms); }); }); // sync method callee.register(function max(...args) { return Math.max(...args); }); callee.listen(); // keep your process alive net.createServer().listen();
// invoker.js import pm2 from 'pm2'; import { Invoker, BaseInvokerChannel } from 'event-invoke'; const messageType = 'event-invoke'; const messageTopic = 'some topic'; class InvokerChannel extends BaseInvokerChannel { constructor() { super(); this._OnProcessMessage= this.onProcessMessage.bind(this); process.on('message', this._onProcessMessage); } onProcessMessage(packet) { if (packet.type !== messageType) { return; } this.emit('message', packet.data); } send(data) { pm2.list((err, processes) => { if (err) { throw err; } const list = processes.filter(p => p.name === 'callee'); const pmId = list[0].pm2_env.pm_id; pm2.sendDataToProcessId({ id: pmId, type: messageType, topic: messageTopic, data, }, function (err, res) { if (err) { throw err; } }); }); } connect() { this.cOnnected= true; } disconnect() { this.cOnnected= false; } destory() { process.off('message', this._onProcessMessage); } } const channel = new InvokerChannel(); channel.connect(); const invoker = new Invoker(channel); setInterval(async () => { const res1 = await invoker.invoke('sleep', 1000); console.log('sleep 1000ms:', res1); const res2 = await invoker.invoke('max', [1, 2, 3]); // 3 console.log('max(1, 2, 3):', res2); }, 5 * 1000);
目前 event-invoke 具备了优雅调用“IPC”调用的基本能力,代码覆盖率 100%,同时提供了相对完善的类型描述。感兴趣的同学可以直接使用,有任何问题可以直接提 Issue。
另外一些后续仍要持续完善的部分:
![]() | 1 codehz 2022-02-28 12:21:57 +08:00 via Android ![]() 我觉得这个接口设计不够友好,js 完全可以用 Proxy 的方法让远程调用表现的好像本地调用一样,还可以支持迭代器和异步迭代器 指 https://github.com/Jack-Works/async-call-rpc |
![]() | 2 kinglisky 2022-02-28 12:56:19 +08:00 |
![]() | 3 EPr2hh6LADQWqRVH 2022-02-28 13:23:37 +08:00 之前我也给 Electron 写了一个,现在的话意义不大了,MessageChannel 的形式标准化程度更高抽象得也更好,浏览器也能用,完爆自己造轮子 |
![]() | 4 zhuangzhuang1988 2022-02-28 13:32:03 +08:00 |
7 watcher 2022-02-28 15:25:35 +08:00 更优雅轻量地用 Nodejs 进行... |
![]() | 8 dany813 2022-02-28 15:27:21 +08:00 学习了 |
![]() | 10 yazoox 2022-03-01 13:57:55 +08:00 不错,学习一下。 楼主多写一些在不同场景下的应用示例。 |
![]() | 11 himself65 2022-03-06 23:48:15 +08:00 via iPhone 支持 1L ,我们项目也在用那个库 |