
Node 版本 10.16.0 ZeroMQ 版本 5.1.0 使用 ZeroMQ 的代码如下:
服务端 /zmq-rep.js
"use strict"; const fs = require("fs"); const zmq = require("zeromq"); const moment = require("moment"); const respOnder= zmq.socket("rep"); responder.on("message", data => { let reqData = JSON.parse(data); let filename = reqData.path; console.log(`Request received:${filename}`); fs.readFile(filename, (err, data) => { if (err) { throw Error(err); } console.log(`Sending response for ${filename} at ${moment().format()}`); responder.send( JSON.stringify({ content: data.toString(), timestamp: moment().format("x"), pid: process.pid }) ); }); }); responder.bind("tcp://127.0.0.1:50221", err => { if (err) { throw Error(err); } console.log("Now listening for requests..."); }); process.on("SIGINT", () => { console.log("Now closing server..."); responder.close(); }); 客户端 /zmq-req.js
"use strict"; const zmq = require("zeromq"); const moment = require("moment"); const filename = process.argv[2]; const request = zmq.socket("req"); request.on("message", data => { let respOnse= JSON.parse(data); console.log( `Fetch file content:${response.content} at ${ response.timestamp } processed by ${response.pid}` ); }); request.connect("tcp://127.0.0.1:50221"); for (let i = 1; i <= 5; i++) { console.log( `Sending request to get ${filename} for ${i} time(s) at ${moment().format()}` ); request.send( JSON.stringify({ path: filename }) ); } 使用如下方式调用:
# Terminal 1 $ node zmq-rep.js # Terminal 2 $ node zmq-req.js target.txt 服务端输出如下:
Now listening for requests... Request received:target.txt Sending response for target.txt at 2019-07-27T07:36:47+08:00 Request received:target.txt Sending response for target.txt at 2019-07-27T07:36:47+08:00 Request received:target.txt Sending response for target.txt at 2019-07-27T07:36:47+08:00 Request received:target.txt Sending response for target.txt at 2019-07-27T07:36:47+08:00 Request received:target.txt Sending response for target.txt at 2019-07-27T07:36:47+08:00 客户端输出如下
Sending request to get target.txt for 1 time(s) at 2019-07-27T07:36:47+08:00 Sending request to get target.txt for 2 time(s) at 2019-07-27T07:36:47+08:00 Sending request to get target.txt for 3 time(s) at 2019-07-27T07:36:47+08:00 Sending request to get target.txt for 4 time(s) at 2019-07-27T07:36:47+08:00 Sending request to get target.txt for 5 time(s) at 2019-07-27T07:36:47+08:00 Fetch file content:50221 at 1564184207708 processed by 12984 Fetch file content:50221 at 1564184207734 processed by 12984 Fetch file content:50221 at 1564184207747 processed by 12984 Fetch file content:50221 at 1564184207766 processed by 12984 Fetch file content:50221 at 1564184207794 processed by 12984 可以看到,客户端代码发送请求是非阻塞的,但服务端代码是阻塞的。《 Node.js 8 the right way 》对这个现象的解释是:"Node.js event loop was left spinning while the fs.readFile for each request was being processed."但查阅文档,fs.readFile 本身是非阻塞的,如果这一解释成立的话,fs.readFile 不就是阻塞函数了吗?
我另外写了一份只使用 net 模块进行通讯的代码,现象与使用 ZeroMQ 进行通信不同,在这种情况下,服务端的回调并没有阻塞。
服务端 /tcp-server.js
"use strict"; const fs = require("fs"); const net = require("net"); const moment = require("moment"); const server = net .createServer(cOnnection=> { console.log("Request received"); connection.on("data", data => { let reqData = JSON.parse(data); console.log(`Reading ${reqData.path} at ${moment().format()}`); fs.readFile(reqData.path, (err, data) => { connection.write( JSON.stringify({ content: data.toString(), timestamp: moment().format() }) ); }); }); }) .listen({ port: 50221, hostname: "127.0.0.1" }) .on("listening", () => { console.log("Now listening..."); }); 客户端 /tcp-client.js
"use strict"; const net = require("net"); const filename = process.argv[2]; const cOnnectionPool= []; for (let i = 1; i <= 5; i++) { connectionPool.push( net .createConnection(50221, "127.0.0.1", () => { console.log( `Connetcion ${i} established and start fetching file ${filename} on server...` ); }) .on("data", data => { console.log(`Receive response:${data}`); }) ); } connectionPool.forEach(cOnnection=> { connection.write( JSON.stringify({ path: filename }) ); }); process.on("SIGINT", () => { connectionPool.forEach(cOnnection=> { connection.end(); }); }); 调用如下:
# Terminal 1 $ node tcp-server.js # Terminal 2 $ node tcp-client.js target.txt 客户端输出如下:
Connetcion 1 established and start fetching file target.txt on server... Connetcion 2 established and start fetching file target.txt on server... Connetcion 3 established and start fetching file target.txt on server... Connetcion 4 established and start fetching file target.txt on server... Connetcion 5 established and start fetching file target.txt on server... Receive response:{"content":"50221","timestamp":"2019-07-27T07:44:53+08:00"} Receive response:{"content":"50221",timestamp":"2019-07-27T07:44:53+08:00"} Receive response:{"content":"50221","timestamp":"2019-07-27T07:44:53+08:00"} Receive response:{"content":"50221","timestamp":"2019-07-27T07:44:53+08:00"} Receive response:{"content":"50221","timestamp":"2019-07-27T07:44:53+08:00"} 服务端输出如下:
Now listening... Request received Request received Request received Request received Request received Reading target.txt at 2019-07-27T07:44:53+08:00 Reading target.txt at 2019-07-27T07:44:53+08:00 Reading target.txt at 2019-07-27T07:44:53+08:00 Reading target.txt at 2019-07-27T07:44:53+08:00 Reading target.txt at 2019-07-27T07:44:53+08:00 于是可以看到,不论在服务端还是客户端的 io 都是非阻塞的。
这种不同是否与 ZeroMQ 的实现有关呢?感谢大佬们的回复!
1 v2byy 2019-07-27 08:14:43 +08:00 via iPhone zeromq 的 req/rep 模式,server 只能收到 req,然后 rep,再收 req,再 rep。如果要异步,建议用 dealer/router 模式 |
2 fourstring OP @v2byy #1 非常感谢,这和 ZeroMQ response 模式服务端回调的执行方式有关吗?按我的理解在 node 中执行回调不应该都是异步非阻塞的吗?或者说 ZeroMQ 在一个 request 的回调完成之前不会读下一个 request ? |
3 v2byy 2019-07-27 08:59:31 +08:00 @fourstring zeromq 有好几种 message model,你说的 request/reply 模式是同步的,就是 server 必须进行一次 reply 之后才能再次接收 client 请求。看下 zeromq 的文档吧。 http://zguide.zeromq.org/page:all#The-Request-Reply-Mechanisms |
4 Alexhohom 2019-07-27 09:03:53 +08:00 最近也在看 zmq,req-rep 会阻塞。想不阻塞可以考虑 Push-Pull 模式吧,可能要开两组端口。 |
5 rawidn 2019-07-27 09:07:10 +08:00 via Android 可以考虑使用 grpc 之类的呢 |