[Kafka] 求助, 同一个服务如何组播消费 kafka 某个 topic 的消息呢? - V2EX
V2EX = way to explore
V2EX 是一个关于分享和探索的地方
现在注册
已注册用户请  登录
BBCCBB
V2EX    Kafka

[Kafka] 求助, 同一个服务如何组播消费 kafka 某个 topic 的消息呢?

  •  
  •   BBCCBB 2020-08-21 09:20:30 +08:00 3675 次点击
    这是一个创建于 1927 天前的主题,其中的信息可能已经有所发展或是发生改变。

    我现在用的是启动的时候动态生成 groupId, 比如 name + uuid 的方式

    但是这样重启后就会导致原来的 consumerGroup 对应的实例都被销毁了.但 kfka 里依然存在原来的 consumerGroup, 监控上看已经被销毁的 consumerGroup 也会发现堆积越来越严重, 有谁知道正确的使用姿势吗??

    不胜感激

    31 条回复    2020-08-21 20:48:13 +08:00
    useben
        1
    useben  
       2020-08-21 09:43:14 +08:00
    生成唯一 groupId, 存到文件, 启动时读文件, 有就用原来的, 没有再生成写到文件...
    BBCCBB
        2
    BBCCBB  
    OP
       2020-08-21 09:53:11 +08:00
    @useben 这个在固定机器上是可以的, 但我们这里 docker 镜像每次都不知道到哪个机器上了.
    SingeeKing
        3
    SingeeKing  
    PRO
       2020-08-21 09:54:10 +08:00 via iPhone
    环境变量?
    hustmisa
        4
    hustmisa  
       2020-08-21 09:55:02 +08:00
    首先 consumer 的使用业务需求是什么,新启动的 groupId 对启动期间的数据是否可以丢弃?
    如果可以丢弃 kafka 配置 retention 很短就可以了,这样不会堆积;如果不能丢弃,配置 retention 长一些这样重启换 groupId 也能续接数据(大概是你现在的方式),但是就不要频繁换 groupId 了啊。我是这么理解的不知道对不对
    BBCCBB
        5
    BBCCBB  
    OP
       2020-08-21 10:08:29 +08:00
    @SingeeKing 都是同一个服务, 不同实例,这个不方便给每个实例加环境变量, 哈哈

    @hustmisa 可以丢弃, 因为应用上有 ack 超时重试机制, 要命的是重启后老的 groupId 不会自动心跳超时消失, 会在监控上看到消息不断堆积. 其实我想实现的就是 rocketmq 的广播的功能.. 但我们使用 kafka.
    mosesyou
        6
    mosesyou  
       2020-08-21 10:11:18 +08:00
    为什么用唯一或者固定的 groupId 不行
    BBCCBB
        7
    BBCCBB  
    OP
       2020-08-21 10:19:32 +08:00
    @mosesyou 这个业务场景需要同一个服务里的不同实例全都消费到每个 MQ
    zardly666
        8
    zardly666  
       2020-08-21 10:28:17 +08:00
    用 redis 做一个类似选 ID 的东西,服务启动份数等于 ID 份数。

    for (int i = 0; i < 启动份数; i++) {
    if (redisUtil.setnx( + i, “lockthing”,time )) {
    bucketConfig.setConsumeZsetBucketNum(i);
    log.info("此实例的消费者为" + i);
    break;
    }
    }

    服务启动的时候,第一个服务拿到 consumerId+1 ;
    第二个服务拿到 consumerId+2 ;
    这样,就复用几个了吧。
    zardly666
        9
    zardly666  
       2020-08-21 10:30:00 +08:00
    代码没删干净,大概意思就是服务启动动态去拿自己所属的 consumer
    wisej
        10
    wisej  
       2020-08-21 10:30:58 +08:00 via Android
    另一种思路,服务同一个 groupid,分发由服务自己来做(拿到服务其它实例的 ip )

    另外旧 cg 堆积会有什么负面影响么?除了消息会冗余地保存,直到 retention 设置的时间被清除
    sonice
        11
    sonice  
       2020-08-21 10:40:54 +08:00
    想多了,consumerGroup 堆积能有多少,起停一次多一个,也不会有很多啊。这也不会导致 zk 性能降低啊
    amwyyyy
        12
    amwyyyy  
       2020-08-21 11:04:23 +08:00
    原来 consumerGroup 的堆积只是个数字,消息数据只有一份,不管你有几个 consumerGroup 。过期的 consumerGroup 会被清理掉。
    kifile
        13
    kifile  
       2020-08-21 11:05:34 +08:00
    我的理解,题主的意思是因为 ConsumerGroup 的 GroupId 每次重启会重新生成一个新的,导致监控面板上出现了废弃的 groupId 的 Lag 不断增大的现象。

    如果重启时 Consumer 的 offset 没有什么意义,那就在重启新应用前,删除老的 ConsumerGroup,做一个这种策略不就好了?
    BBCCBB
        14
    BBCCBB  
    OP
       2020-08-21 11:13:58 +08:00
    @zardly666 这个倒是可以做, 类似 snowflake 算法 workid 的生成. 但相对较麻烦, 老哥但还有没有简单点的解决办法啊
    @wisej consumergroup 堆积就是监控上看着有点慌. 磁盘会占用.
    @sonice 额, 我们多个实例发布的时候重启, 那就会一次性有多个 old consumerGroup, 监控上看着蛇皮的很, 比较难搞.
    @kifile 是, 这是一种方案, 但开发没有 KafkaAdmin 的权限, 所以代码里删除不掉, 只能手动了.... 哈哈


    谢谢各位, 期待更好的方案
    mosesyou
        15
    mosesyou  
       2020-08-21 11:23:21 +08:00
    纯 docker 么,如果是 k8s 的话,用 statefulset,可以实现每个实例有固定递增编码 0,1,2....
    BBCCBB
        16
    BBCCBB  
    OP
       2020-08-21 11:32:54 +08:00
    @mosesyou 我们将 docker 镜像上传到云上, 然后后续的流程我得研究一下, 问一下我们负责这一块的同事, 如果可行的话这得确是一个好办法. 多谢.
    yangbonis
        17
    yangbonis  
       2020-08-21 11:35:21 +08:00 via iPhone
    mq 不是本来就组播工作的?所有订阅都会收到。
    BBCCBB
        18
    BBCCBB  
    OP
       2020-08-21 11:37:57 +08:00
    @yangbonis 是需要同一个服务不同实例都收到, 你说的这个大概是不同服务.
    j2gg0s
        19
    j2gg0s  
       2020-08-21 12:39:55 +08:00
    @BBCCBB 瞎逼设计,每个实例根据消息在自己的内存里面做些什么工作吗?不能搞个 redis 或者 db ?

    然后,kafka 的监控看到堆积是没有什么大影响的,因为消息只存一份。
    如果你觉得不爽,可以在实例 shutdown 的时候了,把 consumergroup 注销掉?
    j2gg0s
        20
    j2gg0s  
       2020-08-21 12:40:34 +08:00
    @j2gg0s 每次重启,重头还是重新开始消费呢?
    sampeng
        21
    sampeng  
       2020-08-21 12:42:26 +08:00 via iPhone
    @j2gg0s 嗯。然后磁盘就爆了…
    BBCCBB
        22
    BBCCBB  
    OP
       2020-08-21 12:42:59 +08:00
    @j2gg0s 业务场景你都不清楚你瞎 bb 个毛.
    BBCCBB
        23
    BBCCBB  
    OP
       2020-08-21 12:43:55 +08:00
    im 推消息, 量小, 所以还不想做路由中心. 所以采用广播.
    j2gg0s
        24
    j2gg0s  
       2020-08-21 12:44:09 +08:00
    @sampeng kafka 的磁盘被爆炸了?
    rockyou12
        25
    rockyou12  
       2020-08-21 13:24:00 +08:00
    我觉得最好用其它 mq,kafka 本来就不适合这种场景,你这业务看起来也不需要持久化,redis 的 sub/pub 可能都够了
    lwldcr
        26
    lwldcr  
       2020-08-21 14:38:37 +08:00
    你这个问题 加一个预处理步骤就可以了吧。

    比如你一组应用有 10 个实例,那你提前分配好 groupId 名字,如 cg_1, cg_2,..., 然后存到一个地方:DB 、Redis 等

    然后每个应用实例启动时 去存储的地方请求分配一个 groupId,用这个 groupId 启动 kafka 消费服务不就完事了
    JKeita
        27
    JKeita  
       2020-08-21 15:07:03 +08:00
    固定 group id 每次启动清除 offset 怎样?
    yty2012g
        28
    yty2012g  
       2020-08-21 16:56:06 +08:00
    固定 group id,每次设置 offset 到最新应该就可以满足
    IamNotShady
        29
    IamNotShady  
       2020-08-21 19:45:51 +08:00 via iPhone
    redis 的 pub/sub 不香吗?
    timonwong
        30
    timonwong  
       2020-08-21 20:39:39 +08:00
    不要用 High Level Consumer API 就完了,之前用 go 写了一个,也用到了线上一年,不过不保证无 bug

    https://github.com/imperfectgo/kafkasub
    timonwong
        31
    timonwong  
       2020-08-21 20:48:13 +08:00
    原理是手动维护 offset,如果程序不死 retry 的时候保持 offset,程序死了从最新的来,可以按照自己的需求来调整。

    不过有一点要注意的真的是 IM 的话,因为 kafka 的 partition reblance IO 相当大,可能造成非常大的 E2E 的 latency,这点要注意(虽然可以通过配置限制 IO 来绕过)。 总的来说,其实不适合 IM 这个场景
    关于     帮助文档     自助推广系统     博客     API     FAQ     Solana     892 人在线   最高记录 6679       Select Language
    创意工作者们的社区
    World is powered by solitude
    VERSION: 3.9.8.5 25ms UTC 23:04 PVG 07:04 LAX 15:04 JFK 18:04
    Do have faith in what you're doing.
    ubao msn snddm index pchome yahoo rakuten mypaper meadowduck bidyahoo youbao zxmzxm asda bnvcg cvbfg dfscv mmhjk xxddc yybgb zznbn ccubao uaitu acv GXCV ET GDG YH FG BCVB FJFH CBRE CBC GDG ET54 WRWR RWER WREW WRWER RWER SDG EW SF DSFSF fbbs ubao fhd dfg ewr dg df ewwr ewwr et ruyut utut dfg fgd gdfgt etg dfgt dfgd ert4 gd fgg wr 235 wer3 we vsdf sdf gdf ert xcv sdf rwer hfd dfg cvb rwf afb dfh jgh bmn lgh rty gfds cxv xcv xcs vdas fdf fgd cv sdf tert sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf shasha9178 shasha9178 shasha9178 shasha9178 shasha9178 liflif2 liflif2 liflif2 liflif2 liflif2 liblib3 liblib3 liblib3 liblib3 liblib3 zhazha444 zhazha444 zhazha444 zhazha444 zhazha444 dende5 dende denden denden2 denden21 fenfen9 fenf619 fen619 fenfe9 fe619 sdf sdf sdf sdf sdf zhazh90 zhazh0 zhaa50 zha90 zh590 zho zhoz zhozh zhozho zhozho2 lislis lls95 lili95 lils5 liss9 sdf0ty987 sdft876 sdft9876 sdf09876 sd0t9876 sdf0ty98 sdf0976 sdf0ty986 sdf0ty96 sdf0t76 sdf0876 df0ty98 sf0t876 sd0ty76 sdy76 sdf76 sdf0t76 sdf0ty9 sdf0ty98 sdf0ty987 sdf0ty98 sdf6676 sdf876 sd876 sd876 sdf6 sdf6 sdf9876 sdf0t sdf06 sdf0ty9776 sdf0ty9776 sdf0ty76 sdf8876 sdf0t sd6 sdf06 s688876 sd688 sdf86