消息队列 Kafka Nodejs 的使用 - V2EX
V2EX = way to explore
V2EX 是一个关于分享和探索的地方
现在注册
已注册用户请  登录
kaolalicai
V2EX    Node.js

消息队列 Kafka Nodejs 的使用

  •  
  •   kaolalicai 2019-08-01 16:18:45 +08:00 7521 次点击
    这是一个创建于 2337 天前的主题,其中的信息可能已经有所发展或是发生改变。

    一. 消息队列

    (一) 使用场景:

    这边就先不介绍消息队列的优劣,主要列了一下它的三种核心的场景。

    1 . 解耦

    解耦.jpg

    2 . 异步

    异步.jpg

    3 . 削峰

    削峰.jpg

    (二) 消费方式:

    1 . 点对点:Work Queue

    点对点 1.jpg

    点对点 2.jpg

    2 . 发布-订阅:Publish/Subscribe

    发布订阅.jpg

    目前我们项目应用到的场景:

    目前我们使用 RabbitMq,主要使用点对点的消费模式。

    削峰,异步:

    削峰异步.jpg

    我们这些场景如果用 Kafka 该如何实现?

    二. Kafka

    (一) 简介

    官网的描述是这几句:

    Apache Kafka is a distributed streaming platform**. What exactly does that mean?**
    A streaming platform has three key capabilities:

    • Publish and subscribe to streams of records, similar to a message queue or enterprise messaging system.
    • Store streams of records in a fault-tolerant durable way.
    • Process streams of records as they occur.

    Kafka 是一个流处理平台

    一个流处理平台有三个关键的特点:

    1. 发布&订阅流式数据,类似于消息队列或企业消息传递系统;
    2. 在高容错方式下保存流式数据;
    3. 当数据流产生时实时进行处理。

    Kafka is generally used for two broad classes of applications:

    • Building real-time streaming data pipelines that reliably get data between systems or applications
    • Building real-time streaming applications that transform or react to the streams of data

    Kafka 主要应用在两个类应用中:

    1. 构建可在系统或应用程序之前构建可靠获取数据的实时数据流管道;
    2. 构建一个转换或响应数据流的实时数据流应用程序。

    (二) kafka 架构图

    架构图.jpg

    (三)名词

    Producer: 生产者,发送信息的服务端
    Consumer:消费者,订阅消息的客户端
    Broker:消息中间件处理节点,一个 Kafka 节点就是一个 broker,一个或者多个 Broker 可以组成一个 Kafka 集群
    Topic: 主题,可以理解成队列
    ConsumerGroup:消费者组,一个 ConsumerGoup 里面包括多个 Consumer,每个 ConsumerGoup 里面只有一个 Consumer 可以消费一个 Topic。基于这个特性,每个 ConsumerGoup 里面只存一个 Consumer 可以实现广播;所有 Consumer 都存在于同一个 ConsumerGoup 内则可以实现单播。
    Partition:基于 Kafka 的拓展性,有可能一个很大的 Topic 会存在于不同的 Broker 里面。这时一个 Topic 里面就会存在多个 Partition,Partition 是一个有序的队列,Partition 上每个消息会有一个顺序的 id Offset。但是,值得注意的是,Kafka 会保证 Partition 的顺序性,而没有保证 Topic 的顺序性。
    Offset:Kafka 的存储文件都是 offset 顺序存储的,以 offset.kafka 来命名。例如第一个就是 0000.kafka, 第 n 个文件就是 n-1.kafka。
    Zookeerper:管理多个 Kafka 节点,具有管理集群配置的功能。

    三. Kafka Nodejs 实现

    (一)消费方式:点对点

    1.单个消费者的实现,应用场景是只有一个消费者节点 需要消费该消息。
    图例: Producer:

    // Producer.ts import * as kafka from 'kafka-node' const client = new kafka.KafkaClient({kafkaHost: 'localhost:9092'}) const producer = new kafka.HighLevelProducer(client) producer.on('ready', function () { console.log('Kafka Producer is connected and ready.') }) // For this demo we just log producer errors to the console. producer.on('error', function (error) { console.error(error) }) const sendRecord = (objData, cb) => { const buffer = Buffer.from(JSON.stringify(objData)) // Create a new payload const record = [ { topic: 'webevents.dev', messages: buffer, attributes: 1 /* Use GZip compression for the payload */ } ] // Send record to Kafka and log result/error producer.send(record, cb) } let times = 0 setInterval(() => { sendRecord({ msg: `this is message ${++times}!` }, (err, data) => { if (err) { console.log(`err: ${err}`) } console.log(`data: ${JSON.stringify(data)}`) }) }, 1000) 

    Consumer 代码:

    // Consumer.ts import * as kafka from 'kafka-node' const client = new kafka.KafkaClient({kafkaHost: 'localhost:9092'}) const topics = [ { topic: 'webevents.dev' } ] const optiOns= { autoCommit: true, fetchMaxWaitMs: 1000, fetchMaxBytes: 1024 * 1024 // encoding: 'buffer' } // { autoCommit: false, fetchMaxWaitMs: 1000, fetchMaxBytes: 1024 * 1024 }; const cOnsumer= new kafka.Consumer(client, topics, options) consumer.on('message', function (message) { // Read string into a buffer. console.info(`[message]:==:>${JSON.stringify(message)}`) const buf = new Buffer(String(message.value), 'binary') const decodedMessage = JSON.parse(buf.toString()) console.log('decodedMessage: ', decodedMessage) }) consumer.on('error', function (err) { console.log('error', err) }) process.on('SIGINT', function () { consumer.close(true, function () { process.exit() }) }) 

    2.当我的服务是多节点,如何保证同一个消息只被其中一个节点消费呢。 这个时候就需要把每个节点当做同一个 ConsumerGroup 里的不同 Consumer。 图例: Producer 同上 Consumer:

    // Consumer1 import * as kafka from 'kafka-node' const client = new kafka.KafkaClient({kafkaHost: 'localhost:9092'}) const offset = new kafka.Offset(client) import * as bluebird from 'bluebird' const cOnsumerGoupOptions= { kafkaHost: 'localhost:9092', groupId: 'ExampleTestGroup', sessionTimeout: 15000, protocol: ['roundrobin'], fromOffset: 'earliest' // equivalent of auto.offset.reset valid values are 'none', 'latest', 'earliest' } as any const cOnsumer= new kafka.ConsumerGroup(Object.assign({id: 'onsumer1'}, consumerGoupOptions), ['test']) export default consumer // 处理消息 consumer.on('message', async function (message) { console.info('i am consumer1!') // Read string into a buffer. console.info(`[message]:==:>${JSON.stringify(message)}`) // const buf = new Buffer(String(message.value), 'binary') const decodedMessage = message // JSON.parse(buf.toString()) await bluebird.delay(1000) console.log('decodedMessage: ', decodedMessage) }) // 消息处理错误 consumer.on('error', function (err) { console.log('error', err) }) consumer.on('offsetOutOfRange', function (topic) { console.info(`[offsetOutOfRange]:==:>${topic}`) topic.maxNum = 2 offset.fetch([topic], function (err, offsets) { if (err) { return console.error(err) } let min = Math.min.apply(null, offsets[topic.topic][topic.partition]) consumer.setOffset(topic.topic, topic.partition, min) }) }) process.on('SIGINT', function () { consumer.close(true, function () { console.log('consumer colse!') process.exit() }) }) 
    // Consumer2 import * as kafka from 'kafka-node' const client = new kafka.KafkaClient({kafkaHost: 'localhost:9092'}) const offset = new kafka.Offset(client) import * as bluebird from 'bluebird' const cOnsumerGoupOptions= { kafkaHost: 'localhost:9092', groupId: 'ExampleTestGroup', sessionTimeout: 15000, protocol: ['roundrobin'], fromOffset: 'earliest' // equivalent of auto.offset.reset valid values are 'none', 'latest', 'earliest' } as any const cOnsumer= new kafka.ConsumerGroup(Object.assign({id: 'consumer2'}, consumerGoupOptions), ['test']) export default consumer // 处理消息 consumer.on('message', async function (message) { console.info('i am consumer2!') // Read string into a buffer. console.info(`[message]:==:>${JSON.stringify(message)}`) // const buf = new Buffer(String(message.value), 'binary') const decodedMessage = message // JSON.parse(buf.toString()) await bluebird.delay(1000) console.log('decodedMessage: ', decodedMessage) }) // 消息处理错误 consumer.on('error', function (err) { console.log('error', err) }) consumer.on('offsetOutOfRange', function (topic) { console.info(`[offsetOutOfRange]:==:>${topic}`) topic.maxNum = 2 offset.fetch([topic], function (err, offsets) { if (err) { return console.error(err) } let min = Math.min.apply(null, offsets[topic.topic][topic.partition]) consumer.setOffset(topic.topic, topic.partition, min) }) }) process.on('SIGINT', function () { consumer.close(true, function () { console.log('consumer colse!') process.exit() }) }) 

    执行之后,发现了一个问题:同一个 ConsumerGroup 的不同 Consumer 没有均匀消费数据, 会出现一段时间,只有一个 Consumer 消费, 而另一个 Conumser 不消费的情况。

    为什么呢?
    这里就需要知道消费端的均衡算法
    [Kafka 消费端均衡算法]: https://blog.csdn.net/wobuaizhi/article/details/80950387

    算法如下:
    1.A=(partition 数量 /同分组消费者总个数) 2.M=对上面所得到的 A 值小数点第一位向上取整 3.计算出该消费者拉取数据的 patition 合集:Ci = [P(M*i ),P((i + 1) * M -1)] Partition 数量为 1, 因为只有一个 broker

    同分组消费者总个数:2

    A = 1 / 2

    M = roundUp (A) = 1

    C0 = [P(0), P ( 0]`

    C1 = [P(1), P(1)]

    所以,如果不是 C0 消费者不可用,C1 一直都不会去消费 Partition0 里面的消息
    结论是,如果非多 Kafka 节点的话, 单纯增加同一消费组里的消费者, 并不能做到均衡消费数据的情况。
    有其他方法可以实现吗?
    有的, 我们可以从 Producer 里面入手,分发消息时固定 Topic 对应 固定的消费者节点。
    Producer:

    // Producer // ... const sendRecord = (objData, cb) => { const partition = Date.now() % 2 === 0 ? 0 : 1 const buffer = Buffer.from(JSON.stringify(objData) + '_' + partition) // Create a new payload const record = [ { topic: `test${partition}`, // 这里用了随机方法分配 topic messages: buffer, attributes: 1, /* Use GZip compression for the payload */ key: `key_${partition}` } ] // Send record to Kafka and log result/error console.info(`[record]:==:>${JSON.stringify(record)}`) producer.send(record, cb) } // ... 

    Consumer:

    // Consumer1 // ... const cOnsumer= new kafka.ConsumerGroup(Object.assign({id: 'consumer1'}, consumerGoupOptions), ['test0', 'test1']) // 这里需要优先输入 需要消费的 topic, 次要消费的 topic 也要写上,以防另一节点重启时, 消息没及时消费 // ... 
    // Consumer2 // ... const cOnsumer= new kafka.ConsumerGroup(Object.assign({id: 'consumer2'}, consumerGoupOptions), ['test1', 'test2']) // 这里需要优先输入 需要消费的 topic, 次要消费的 topic 也要写上,以防另一节点重启时, 消息没及时消费 // ... 

    四. 总结:

    Kafka

    设计上:队列消息不删除,不同 ConsumerGroup 都可以 publish-subscribe,同一 ConsumerGroup 里面只有一个 Consumer 能消费同一个 Topic
    延迟消费:不支持:Consumer 开启后, 会自动获取 Producer 生产对应 Topic 的消息, 若想 Consumer 暂时不消费消息, 需要中断 Consumer 的服务
    负载均衡:从集群上看, 即使其中一个 Broker 挂了,其他 Broker 上的 partition 都会存在副本集,kafka 仍然可以正常运行。从 ConsumerGroup 上看,即使其中的 Consumer 挂了, 同一 ConsumerGroup 的其他 Consumer 仍然可以消费其 Topic 的消息,而不需要担心服务中断。
    实际上:Kafka 做点对点队列,有点浪费。只用一个 ConsumerGroup,并没有发挥 Kafka 的优势。但是 Kafka 这种很方便就能拓展成发布-订阅模式,消费端建立另外一个 ConsumerGroup,就可以为另一个服务启用。

    End

    参考资料

    代码:
    https://github.com/yuchenzhen/node-kafka-demo

    https://blog.csdn.net/tototuzuoquan/article/details/73441373

    https://zhuanlan.zhihu.com/p/58836260

    https://juejin.im/post/5b59c6055188257bcc16738c

    https://lotabout.me/2018/kafka-introduction/

    2 条回复    2019-08-01 18:58:57 +08:00
    julyclyde
        1
    julyclyde  
       2019-08-01 18:27:08 +08:00
    so 这么长到底写了什么?
    好像没有什么信息量啊
    PDX
        2
    PDX  
       2019-08-01 18:58:57 +08:00
    闹眼睛
    关于     帮助文档     自助推广系统     博客     API     FAQ     Solana     2237 人在线   最高记录 6679       Select Language
    创意工作者们的社区
    World is powered by solitude
    VERSION: 3.9.8.5 30ms UTC 16:10 PVG 00:10 LAX 08:10 JFK 11:10
    Do have faith in what you're doing.
    ubao msn snddm index pchome yahoo rakuten mypaper meadowduck bidyahoo youbao zxmzxm asda bnvcg cvbfg dfscv mmhjk xxddc yybgb zznbn ccubao uaitu acv GXCV ET GDG YH FG BCVB FJFH CBRE CBC GDG ET54 WRWR RWER WREW WRWER RWER SDG EW SF DSFSF fbbs ubao fhd dfg ewr dg df ewwr ewwr et ruyut utut dfg fgd gdfgt etg dfgt dfgd ert4 gd fgg wr 235 wer3 we vsdf sdf gdf ert xcv sdf rwer hfd dfg cvb rwf afb dfh jgh bmn lgh rty gfds cxv xcv xcs vdas fdf fgd cv sdf tert sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf shasha9178 shasha9178 shasha9178 shasha9178 shasha9178 liflif2 liflif2 liflif2 liflif2 liflif2 liblib3 liblib3 liblib3 liblib3 liblib3 zhazha444 zhazha444 zhazha444 zhazha444 zhazha444 dende5 dende denden denden2 denden21 fenfen9 fenf619 fen619 fenfe9 fe619 sdf sdf sdf sdf sdf zhazh90 zhazh0 zhaa50 zha90 zh590 zho zhoz zhozh zhozho zhozho2 lislis lls95 lili95 lils5 liss9 sdf0ty987 sdft876 sdft9876 sdf09876 sd0t9876 sdf0ty98 sdf0976 sdf0ty986 sdf0ty96 sdf0t76 sdf0876 df0ty98 sf0t876 sd0ty76 sdy76 sdf76 sdf0t76 sdf0ty9 sdf0ty98 sdf0ty987 sdf0ty98 sdf6676 sdf876 sd876 sd876 sdf6 sdf6 sdf9876 sdf0t sdf06 sdf0ty9776 sdf0ty9776 sdf0ty76 sdf8876 sdf0t sd6 sdf06 s688876 sd688 sdf86