请教, kafka 如何做到一个 topic 分发不同的类型的消息 - V2EX
V2EX = way to explore
V2EX 是一个关于分享和探索的地方
现在注册
已注册用户请  登录
请不要在回答技术问题时复制粘贴 AI 生成的内容
NoKey
V2EX    程序员

请教, kafka 如何做到一个 topic 分发不同的类型的消息

  •  
  •   NoKey 2023-04-25 11:46:42 +08:00 3062 次点击
    这是一个创建于 950 天前的主题,其中的信息可能已经有所发展或是发生改变。
    场景是这样的,上游服务 A ,通过 kafka 发消息个下游服务 B,C,D

    为了后续集成方便,A 使用了一个 Topic

    这个时候,需要 BCD 接收自己的消息

    这种场景下,如何才能控制 BCD 只收到自己的消息,不收别人的消息呢?

    考虑了几种方式:
    1. 通过 key 。这样下游服务只有收到消息之后才知道 key 是啥,不是自己的丢弃,但是这样必须收消息,也就是 B 会收到 C ,D 的消息,感觉不好。
    2. 通过分区。不同下游的消息放到不同的分区,但是这样会造成分区不均衡,部分分区过大。

    请问一下大家有没有更好的办法呢?谢谢
    25 条回复    2023-04-25 16:46:13 +08:00
    antipro
        1
    antipro  
       2023-04-25 11:52:43 +08:00 via Android
    给 B ,C ,D 各建一个 Kafka
    aijam
        2
    aijam  
       2023-04-25 11:57:02 +08:00
    1F +1
    cloudzhou
        3
    cloudzhou  
       2023-04-25 12:09:09 +08:00
    给 B ,C ,D 各建一个 Topic 就可以
    dddd1919
        4
    dddd1919  
       2023-04-25 12:10:29 +08:00
    如果只让 BCD 接收到自己的消息,那就在 push 时分三个 topic ,直接把消息隔离开,缺点就是负载可能不均服务利用率降低
    如果只让 BCD 处理自己要的消息并忽略掉无意义消息,可以在各自 consumer 加 filterStrategy 过滤掉无关消息
    NoKey
        5
    NoKey  
    OP
       2023-04-25 12:18:19 +08:00
    建多个 topic 的麻烦点就是,后续要不断的增加 topic ,有没有办法,一个 topic 就可以解决呢?
    ChaYedan666
        6
    ChaYedan666  
       2023-04-25 12:37:41 +08:00
    @NoKey 不可能吧,不论怎么说,只要是都发一个 topic ,那么 BCD 就得把里面的消息拉过来做过滤,过滤后再消费自己的;或者另一种就是你自己说的第二种,同一个消费者组,监听不同分区,根据 key 发不同的分区,分区不均衡啥的就你得自己控制了
    wuYin
        7
    wuYin  
       2023-04-25 12:59:56 +08:00 via iPhone
    也许可以用 2 个 kafka 集群,A 写集群 1 ,自己写个 connector 做消息解析与分发,写到集群 2 的三个 topic ,再由 B C D 各自消费。
    这种做法引入了新的集群和组件,成本和维护代价更高。可行但不建议
    securityCoding
        8
    securityCoding  
       2023-04-25 13:02:37 +08:00 via Android
    kafka 不好做,换阿里云 rocketmq 加 tag
    kaddusabagei38
        9
    kaddusabagei38  
       2023-04-25 13:39:54 +08:00
    建议换队列
    urnoob
        10
    urnoob  
       2023-04-25 13:47:16 +08:00
    B C D 各自作为一个消费者组。
    waitwait365
        11
    waitwait365  
       2023-04-25 13:51:24 +08:00
    用 rabbitmq
    zgzhang
        12
    zgzhang  
       2023-04-25 14:04:05 +08:00
    kafka stream 做个任务来处理
    Masonnn
        13
    Masonnn  
       2023-04-25 14:12:41 +08:00
    在消息体里定义 business_type: B 、C 、D ,然后引进一个中间层 X ,X 直接消费 A 发送的消息,并根据 business_type 决定调用( HTTP 或 RPC ) B 、C 、D 。(计算机科学中的每个问题都可以用一间接层解决 doge )
    Masonnn
        14
    Masonnn  
       2023-04-25 14:14:48 +08:00
    但是上述方案有个问题:B 、C 、D 直接接受流量的冲击,没有 MQ 来缓冲,服务可能会被打爆
    fkdog
        15
    fkdog  
       2023-04-25 14:20:54 +08:00
    明明有现成的高速公路,多建两个 topic 的事,你非得要自己单独再修一条路。我不知道怎么评价你这个需求。。
    “为了方便”,请问改成 3 个 topic 不方便在哪里?
    awinds
        16
    awinds  
       2023-04-25 14:51:37 +08:00
    除非你真的有需求有另外的 E 同时消费所有数据,不然就多个 topic 吧
    lower
        17
    lower  
       2023-04-25 14:53:23 +08:00
    @WhereverYouGo 感觉问题不大,X 其实已经在 mq 后面了,慢慢一个一个取消息就行
    Super8
        18
    Super8  
       2023-04-25 15:14:49 +08:00
    可以在消息的 key 或者 value 中添加标识,例如在消息的 key 中添加 B 、C 、D 等标识,表示该消息是发给 B 、C 、D 的,然后在消费者端使用带有过滤条件的消费者来消费消息,只消费自己需要的消息。具体可以使用 Kafka 的 Consumer API 提供的 subscribe 方法中的参数来实现,例如使用 subscribe(Collections.singleton(topic), new MyPartitionAssignor()) 方法,其中 MyPartitionAssignor 实现了 PartitionAssignor 接口,可以根据标识来分配分区。另外,也可以使用 Kafka Streams 来实现消息过滤和分发。
    Super8
        19
    Super8  
       2023-04-25 15:15:32 +08:00
    rocketmq 中 tag 最适合这个场景
    zhaoyy0513
        20
    zhaoyy0513  
       2023-04-25 16:12:30 +08:00
    @Super8 我创建的 KafkaConsumer 用到的 api 里面没有这两个参数的方法啊老哥,你说的这个 kafka 是哪个版本的啊
    zhaoyy0513
        21
    zhaoyy0513  
       2023-04-25 16:23:45 +08:00
    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.KafkaStreams;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.StreamsConfig;
    import org.apache.kafka.streams.kstream.KStream;
    import org.apache.kafka.streams.kstream.Produced;

    import java.util.Properties;

    public class KafkaStreamsExample {
    public static void main(String[] args) {
    // 设置 Kafka Streams 属性
    Properties props = new Properties();
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "stream-processing-app");
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

    // 创建 Kafka Streams 实例
    StreamsBuilder builder = new StreamsBuilder();
    KStream<String, String> stream = builder.stream("topic-A");

    // 根据消息的 key 将消息路由到不同的分区中
    stream.selectKey((key, value) -> key)
    .through("topic-A-shuffle")
    .groupByKey()
    .foreach((key, value) -> {
    // 处理消息
    System.out.println("Processed message: " + value);
    });

    // 将处理后的消息发送到下游服务
    stream.mapValues(value -> "processed " + value)
    .to("topic-B", Produced.with(Serdes.String(), Serdes.String()));

    KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), props);
    kafkaStreams.start();
    }
    }

    在上面的代码中,首先使用 selectKey()方法将消息的 key 作为新的 key ,然后使用 through()方法将消息发送到一个新的 Topic 中,这个新的 Topic 会使用 Kafka 默认的分区策略将消息路由到不同的分区中。然后,我们使用 groupByKey()方法将同一个 key 的消息分组,确保每个消费者只消费自己需要的消息。最后,我们使用 foreach()方法处理分组后的消息,并使用 mapValues()方法将处理后的消息发送到下游服务。

    需要注意的是,使用分流操作可能会导致数据倾斜(data skew)问题,因为某些 key 的消息可能比其他 key 的消息更频繁,从而导致某些分区比其他分区拥有更多的消息。为了解决这个问题,可以使用一些分区策略(partitioning strategy),例如随机分配、循环分配、哈希分配等。
    burymme11
        22
    burymme11  
       2023-04-25 16:34:11 +08:00
    可以中间自己加一个路由层。
    新建一个中间层 AA ,来监听 topic ,处理上游服务 A 的消息,在 AA 里面,自己写代码做负载均衡,比如根据消息 ID 取模,给 B ,C ,D 分配好不同的 key ,最后所有消息再往新的 NewTopic 里丢。这样 B ,C ,D 就监听 NewTopic 就行,以后要加薪的下游服务,你只要改动 AA 层分发路由的代码就好。
    Dlin
        23
    Dlin  
       2023-04-25 16:37:01 +08:00
    kafka 的 topic 和 rabbitmq 的 topic 不一样么。
    zhaoyy0513
        24
    zhaoyy0513  
       2023-04-25 16:37:30 +08:00
    要实现上游系统 A 将消息发送到下游系统 B 、C 、D ,并确保每个下游系统只处理自己需要处理的消息,同时还要确保消息只被消费一次,可以采用以下方案:

    使用 Kafka 作为消息中间件,将上游系统 A 发送的消息发布到一个名为"topic-A"的 Kafka 主题中。

    在下游系统 B 、C 、D 中,创建三个不同的消费者组,分别为"group-B"、"group-C"、"group-D",并订阅"topic-A"主题。

    在消费者端,使用 Kafka 中的消息过滤器来过滤掉不需要的消息,只选择要处理的消息。可以使用 Kafka 中的消息键(key)来实现过滤。例如,下游系统 B 只想处理键(key)为"key-B"的消息,可以使用以下代码来实现:

    java
    Copy
    // 创建 Kafka 消费者
    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");
    props.put("group.id", "group-B");
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    KafkaConsumer<String, String> cOnsumer= new KafkaConsumer<>(props);

    // 订阅"topic-A"主题
    consumer.subscribe(Collections.singletonList("topic-A"));

    // 消费消息
    while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records) {
    if (record.key().equals("key-B")) {
    // 处理消息
    }
    }
    consumer.commitSync();
    }
    ```

    为了确保消息只被消费一次,将消费者的 auto.offset.reset 属性设置为"earliest",并启用自动提交偏移量。这将确保消费者在启动时从最早可用的偏移量开始消费,以避免漏掉任何消息,并且将自动提交偏移量以确保每个消息只被消费一次。例如,可以使用以下代码来实现:

    java
    Copy
    // 创建 Kafka 消费者
    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");
    props.put("group.id", "group-B");
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("auto.offset.reset", "earliest");
    props.put("enable.auto.commit", "true");
    KafkaConsumer<String, String> cOnsumer= new KafkaConsumer<>(props);
    ```
    使用上述方案,上游系统 A 可以将消息发送到"topic-A"主题中,下游系统 B 、C 、D 可以使用 Kafka 消费者订阅该主题,并使用消息过滤器来过滤掉不需要的消息,只选择要处理的消息。自动提交偏移量将确保每个消息只被消费一次。





    上面两条回复都是 chatgpt 回复的
    PythonYXY
        25
    PythonYXY  
       2023-04-25 16:46:13 +08:00
    为什么不建多个 topic 呢,如果下游服务不固定可以做成配置式的啊
    关于     帮助文档     自助推广系统     博客     API     FAQ     Solana     1136 人在线   最高记录 6679       Select Language
    创意工作者们的社区
    World is powered by solitude
    VERSION: 3.9.8.5 27ms UTC 17:07 PVG 01:07 LAX 09:07 JFK 12:07
    Do have faith in what you're doing.
    ubao msn snddm index pchome yahoo rakuten mypaper meadowduck bidyahoo youbao zxmzxm asda bnvcg cvbfg dfscv mmhjk xxddc yybgb zznbn ccubao uaitu acv GXCV ET GDG YH FG BCVB FJFH CBRE CBC GDG ET54 WRWR RWER WREW WRWER RWER SDG EW SF DSFSF fbbs ubao fhd dfg ewr dg df ewwr ewwr et ruyut utut dfg fgd gdfgt etg dfgt dfgd ert4 gd fgg wr 235 wer3 we vsdf sdf gdf ert xcv sdf rwer hfd dfg cvb rwf afb dfh jgh bmn lgh rty gfds cxv xcv xcs vdas fdf fgd cv sdf tert sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf shasha9178 shasha9178 shasha9178 shasha9178 shasha9178 liflif2 liflif2 liflif2 liflif2 liflif2 liblib3 liblib3 liblib3 liblib3 liblib3 zhazha444 zhazha444 zhazha444 zhazha444 zhazha444 dende5 dende denden denden2 denden21 fenfen9 fenf619 fen619 fenfe9 fe619 sdf sdf sdf sdf sdf zhazh90 zhazh0 zhaa50 zha90 zh590 zho zhoz zhozh zhozho zhozho2 lislis lls95 lili95 lils5 liss9 sdf0ty987 sdft876 sdft9876 sdf09876 sd0t9876 sdf0ty98 sdf0976 sdf0ty986 sdf0ty96 sdf0t76 sdf0876 df0ty98 sf0t876 sd0ty76 sdy76 sdf76 sdf0t76 sdf0ty9 sdf0ty98 sdf0ty987 sdf0ty98 sdf6676 sdf876 sd876 sd876 sdf6 sdf6 sdf9876 sdf0t sdf06 sdf0ty9776 sdf0ty9776 sdf0ty76 sdf8876 sdf0t sd6 sdf06 s688876 sd688 sdf86