更优雅轻量地用 JS 进行 “IPC” 调用,我写了 event-invoke 库 - V2EX
V2EX = way to explore
V2EX 是一个关于分享和探索的地方
现在注册
已注册用户请  登录
xcold
V2EX    Node.js

更优雅轻量地用 JS 进行 “IPC” 调用,我写了 event-invoke 库

  •  
  •   xcold 2022-02-28 12:07:29 +08:00 9680 次点击
    这是一个创建于 1322 天前的主题,其中的信息可能已经有所发展或是发生改变。

    背景

    团队最近有一个 Node.js 全新的模块需要开发,涉及多进程的管理和通讯,简化模型可以理解为需要频繁从 master 进程调用 worker 进程的某些方法,简单设计实现了一个 event-invoke 的库,可以简单优雅进行调用。

    Node.js 提供了 child_process 模块,在 master 进程通过 fork / spawn 等方法调用可以创建 worker 进程并获取其对象(简称 cp )。父子进程会建立 IPC 通道,在 master 进程中可以使用 cp.send() 给 worker 进程发送 IPC 消息,而在 worker 进程中也可以通过 process.send() 给父进程发送 IPC 消息,达到双工通信的目的。(进程管理涉及更复杂的工作,本文暂不涉及)

    最小实现

    基于以上前提,借助 IPC 通道和进程对象,我们可以通过事件驱动的方式实现进程间的通信,只需要简单的几行代码,就能实现基本调用逻辑,例如:

    // master.js const child_process = require('child_process'); const cp = child_process.fork('./worker.js'); function invoke() { cp.send({ name: 'methodA', args: [] }); cp.on('message', (packet) => { console.log('result: %j', packet.payload); }); } invoke(); // worker.js const methodMap = { methodA() {} } cp.on('message', async (packet) => { const { name, args } = packet; const result = await methodMap[name)(...args); process.send({ name, payload: result }); }); 

    仔细分析上述代码实现,直观感受 invoke 调用并不优雅,并且当调用量较大时,会创建很多的 message 监听器,并且要保证请求和响应是一一对应,需要做很多额外的设计。希望设计一个简单理想的方式,只需提供 invoke 方法,传入方法名和参数,返回一个 Promise ,像调用本地方法那样进行 IPC 调用,而不用考虑消息通信的细节。

    // 假想中的 IPC 调用 const res1 = await invoker.invoke('sleep', 1000); console.log('sleep 1000ms:', res1); const res2 = await invoker.invoke('max', [1, 2, 3]); // 3 console.log('max(1, 2, 3):', res2); 

    流程设计

    从调用的模型看,可以将角色抽象为 Invoker 和 Callee ,分别对应服务调用方和提供方,将消息通讯的细节可以封装在内部。parent_process 和 child_process 的通信桥梁是操作系统提供的 IPC 通道,单纯从 API 的视角看,可以简化为两个 Event 对象(主进程为 cp ,子进程为 process )。Event 对象作为中间的双工通道两端,暂且命名为 InvokerChannel 和 CalleeChannel 。

    关键实体和流程如下: 825cb24fe4bb2cc7f9713606d8594a77.png

    • Callee 中注册可被调用的所有方法,并保存在 functionMap
    • 用户调用 Invoker.invoke() 时:
      • 创建一个 promise 对象,返回给用户,同时将其保存在 promiseMap 中
      • 每次调用生成一个 id ,保证调用和执行结果是一一对应的
      • 进行超时控制,超时的任务直接执行 reject 该 promise
    • Invoker 通过 Channel 把调用方法消息发送给 Callee
    • Callee 解析收到的消息,通过 name 执行对应方法,并将结果和完成状态(成功 or 异常)通过 Channel 发送消息给 Invoker
    • Invoker 解析消息,通过 id+name 找到对应的 promise 对象,成功则 resolve ,失败则 reject

    实际上,这个设计不仅适用 IPC 调用,在浏览器的场景下也能直接得到很好的应用,比如说跨 iframe 的调用可以包装 window.postMessage(),跨标签页调用可以使用 storage 事件,以及 Web worker 中可借助 worker.postMessage() 作为通信的桥梁。

    快速开始

    基于以上设计,实现编码必然不在话下,趁着非工作时间迅速完成开发和文档的工作,源代码:https://github.com/x-cold/event-invoke

    安装依赖

    npm i -S event-invoke 

    父子进程通信实例

    示例代码:Example code

    // parent.js const cp = require('child_process'); const { Invoker } = require('event-invoke'); const invokerChannel = cp.fork('./child.js'); const invoker = new Invoker(invokerChannel); async function main() { const res1 = await invoker.invoke('sleep', 1000); console.log('sleep 1000ms:', res1); const res2 = await invoker.invoke('max', [1, 2, 3]); // 3 console.log('max(1, 2, 3):', res2); invoker.destroy(); } main(); 
    // child.js const { Callee } = require('event-invoke'); const calleeChannel = process; const callee = new Callee(calleeChannel); // async method callee.register(async function sleep(ms) { return new Promise((resolve) => { setTimeout(resolve, ms); }); }); // sync method callee.register(function max(...args) { return Math.max(...args); }); callee.listen(); 

    自定义 Channel 实现 PM2 进程间调用

    示例代码:Example code

    // pm2.config.cjs module.exports = { apps: [ { script: 'invoker.js', name: 'invoker', exec_mode: 'fork', }, { script: 'callee.js', name: 'callee', exec_mode: 'fork', } ], }; 
    // callee.js import net from 'net'; import pm2 from 'pm2'; import { Callee, BaseCalleeChannel } from 'event-invoke'; const messageType = 'event-invoke'; const messageTopic = 'some topic'; class CalleeChannel extends BaseCalleeChannel { constructor() { super(); this._OnProcessMessage= this.onProcessMessage.bind(this); process.on('message', this._onProcessMessage); } onProcessMessage(packet) { if (packet.type !== messageType) { return; } this.emit('message', packet.data); } send(data) { pm2.list((err, processes) => { if (err) { throw err; } const list = processes.filter(p => p.name === 'invoker'); const pmId = list[0].pm2_env.pm_id; pm2.sendDataToProcessId({ id: pmId, type: messageType, topic: messageTopic, data, }, function (err, res) { if (err) { throw err; } }); }); } destory() { process.off('message', this._onProcessMessage); } } const channel = new CalleeChannel(); const callee = new Callee(channel); // async method callee.register(async function sleep(ms) { return new Promie((resolve) => { setTimeout(resolve, ms); }); }); // sync method callee.register(function max(...args) { return Math.max(...args); }); callee.listen(); // keep your process alive net.createServer().listen(); 
    // invoker.js import pm2 from 'pm2'; import { Invoker, BaseInvokerChannel } from 'event-invoke'; const messageType = 'event-invoke'; const messageTopic = 'some topic'; class InvokerChannel extends BaseInvokerChannel { constructor() { super(); this._OnProcessMessage= this.onProcessMessage.bind(this); process.on('message', this._onProcessMessage); } onProcessMessage(packet) { if (packet.type !== messageType) { return; } this.emit('message', packet.data); } send(data) { pm2.list((err, processes) => { if (err) { throw err; } const list = processes.filter(p => p.name === 'callee'); const pmId = list[0].pm2_env.pm_id; pm2.sendDataToProcessId({ id: pmId, type: messageType, topic: messageTopic, data, }, function (err, res) { if (err) { throw err; } }); }); } connect() { this.cOnnected= true; } disconnect() { this.cOnnected= false; } destory() { process.off('message', this._onProcessMessage); } } const channel = new InvokerChannel(); channel.connect(); const invoker = new Invoker(channel); setInterval(async () => { const res1 = await invoker.invoke('sleep', 1000); console.log('sleep 1000ms:', res1); const res2 = await invoker.invoke('max', [1, 2, 3]); // 3 console.log('max(1, 2, 3):', res2); }, 5 * 1000); 

    下一步

    目前 event-invoke 具备了优雅调用“IPC”调用的基本能力,代码覆盖率 100%,同时提供了相对完善的类型描述。感兴趣的同学可以直接使用,有任何问题可以直接提 Issue

    另外一些后续仍要持续完善的部分:

    • 更丰富的示例,覆盖跨 Iframe ,跨标签页,Web worker 等使用场景
    • 提供开箱即用通用 Channel
    • 更友好的异常处理
    11 条回复    2022-03-06 23:48:15 +08:00
    codehz
        1
    codehz  
       2022-02-28 12:21:57 +08:00 via Android   2
    我觉得这个接口设计不够友好,js 完全可以用 Proxy 的方法让远程调用表现的好像本地调用一样,还可以支持迭代器和异步迭代器


    https://github.com/Jack-Works/async-call-rpc
    kinglisky
        2
    kinglisky  
       2022-02-28 12:56:19 +08:00
    好巧呀,这两天我也在搞 PRC 工具库,楼上刚发帖,楼下就看到楼主发的帖子,哈哈哈卷起来~

    留个脚印,一起学习 https://github.com/kinglisky/rpc-shooter
    EPr2hh6LADQWqRVH
        3
    EPr2hh6LADQWqRVH  
       2022-02-28 13:23:37 +08:00
    之前我也给 Electron 写了一个,现在的话意义不大了,MessageChannel 的形式标准化程度更高抽象得也更好,浏览器也能用,完爆自己造轮子
    zhuangzhuang1988
        4
    zhuangzhuang1988  
       2022-02-28 13:32:03 +08:00
    ruoxie
        5
    ruoxie  
       2022-02-28 13:43:41 +08:00
    websocket ,postMessage ,vscode 插件消息通讯我都是这么封装了,从 socket.io 那学来的
    xcold
        6
    xcold  
    OP
       2022-02-28 14:47:06 +08:00
    @codehz 很好的设计思路。
    watcher
        7
    watcher  
       2022-02-28 15:25:35 +08:00
    更优雅轻量地用 Nodejs 进行...
    dany813
        8
    dany813  
       2022-02-28 15:27:21 +08:00
    学习了
    xcold
        9
    xcold  
    OP
       2022-03-01 10:14:55 +08:00
    @watcher 浏览器也是可以支持的,只是写代码比较仓促,还没加上例子
    yazoox
        10
    yazoox  
       2022-03-01 13:57:55 +08:00
    不错,学习一下。
    楼主多写一些在不同场景下的应用示例。
    himself65
        11
    himself65  
       2022-03-06 23:48:15 +08:00 via iPhone
    支持 1L ,我们项目也在用那个库
    关于     帮助文档     自助推广系统     a href="https://blog.v2ex.com/" class="dark" target="_blank">博客     API     FAQ     Solana     1365 人在线   最高记录 6679       Select Language
    创意工作者们的社区
    World is powered by solitude
    VERSION: 3.9.8.5 25ms UTC 16:43 PVG 00:43 LAX 09:43 JFK 12:43
    Do have faith in what you're doing.
    ubao snddm index pchome yahoo rakuten mypaper meadowduck bidyahoo youbao zxmzxm asda bnvcg cvbfg dfscv mmhjk xxddc yybgb zznbn ccubao uaitu acv GXCV ET GDG YH FG BCVB FJFH CBRE CBC GDG ET54 WRWR RWER WREW WRWER RWER SDG EW SF DSFSF fbbs ubao fhd dfg ewr dg df ewwr ewwr et ruyut utut dfg fgd gdfgt etg dfgt dfgd ert4 gd fgg wr 235 wer3 we vsdf sdf gdf ert xcv sdf rwer hfd dfg cvb rwf afb dfh jgh bmn lgh rty gfds cxv xcv xcs vdas fdf fgd cv sdf tert sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf shasha9178 shasha9178 shasha9178 shasha9178 shasha9178 liflif2 liflif2 liflif2 liflif2 liflif2 liblib3 liblib3 liblib3 liblib3 liblib3 zhazha444 zhazha444 zhazha444 zhazha444 zhazha444 dende5 dende denden denden2 denden21 fenfen9 fenf619 fen619 fenfe9 fe619 sdf sdf sdf sdf sdf zhazh90 zhazh0 zhaa50 zha90 zh590 zho zhoz zhozh zhozho zhozho2 lislis lls95 lili95 lils5 liss9 sdf0ty987 sdft876 sdft9876 sdf09876 sd0t9876 sdf0ty98 sdf0976 sdf0ty986 sdf0ty96 sdf0t76 sdf0876 df0ty98 sf0t876 sd0ty76 sdy76 sdf76 sdf0t76 sdf0ty9 sdf0ty98 sdf0ty987 sdf0ty98 sdf6676 sdf876 sd876 sd876 sdf6 sdf6 sdf9876 sdf0t sdf06 sdf0ty9776 sdf0ty9776 sdf0ty76 sdf8876 sdf0t sd6 sdf06 s688876 sd688 sdf86