请教一个使用 HAProxy 代理访问 kafka 集群的问题 - V2EX
V2EX = way to explore
V2EX 是一个关于分享和探索的地方
现在注册
已注册用户请  登录
967182
V2EX    Kafka

请教一个使用 HAProxy 代理访问 kafka 集群的问题

  •  
  •   967182 2023-04-21 19:52:25 +08:00 2747 次点击
    这是一个创建于 904 天前的主题,其中的信息可能已经有所发展或是发生改变。

    kafka 集群有三台机器 kafka01:9092 kafka03:9092 kafka03:9092

    因为受网络原因约束不能直接访问集群,只能在服务器 192.168.30.160 上搭代理 试了两种配置方式

     方式一: listen kafka bind *:992 mode tcp balance roundrobin no option clitcpka timeout check 5s server kafka01 192.168.41.168:9092 check inter 5000 rise 2 fall 3 server kafka02 192.168.41.169:9092 check inter 5000 rise 2 fall 3 server kafka03 192.168.41.170:9092 check inter 5000 rise 2 fall 3 
     方式二: listen kafka bind *:9092 mode tcp balance roundrobin server kafka1 127.0.0.1:8881 check server kafka2 127.0.0.1:8883 check server kafka3 127.0.0.1:8885 check listen kafka01 bind *:8881 mode tcp server kafka1 192.168.41.168:9092 check listen kafka02 bind *:8883 mode tcp server kafka1 192.168.41.169:9092 check listen kafka03 bind *:8885 mode tcp server kafka1 192.168.41.170:9092 check 
     kafka client 试过这两个配置,kafka01,kafka02,kafka03 都指向了 192.168.30.160 public final static String bootstrapServers = "kafka01:9092,kafka02:9092,kafka03:9092"; public final static String bootstrapServers = "kafka01:9092"; 网络测试 telnet 通, 代理和 host ping 也都通。 

    发送消息时报错 :

     19:37:39.347 [main] INFO org.apache.kafka.clients.producer.ProducerConfig - ProducerConfig values: acks = -1 batch.size = 16384 bootstrap.servers = [kafka01:9092] buffer.memory = 33554432 client.dns.lookup = default client.id = compression.type = gzip connections.max.idle.ms = 3000 delivery.timeout.ms = 120000 enable.idempotence = false interceptor.classes = [] key.serializer = class org.apache.kafka.common.serialization.StringSerializer linger.ms = 500 max.block.ms = 60000 max.in.flight.requests.per.cOnnection= 5 max.request.size = 1048576 metadata.max.age.ms = 300000 metric.reporters = [] metrics.num.samples = 2 metrics.recording.level = INFO metrics.sample.window.ms = 30000 partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner receive.buffer.bytes = 32768 reconnect.backoff.max.ms = 1000 reconnect.backoff.ms = 50 request.timeout.ms = 10000 retries = 0 retry.backoff.ms = 100 sasl.client.callback.handler.class = null sasl.jaas.cOnfig= null sasl.kerberos.kinit.cmd = /usr/bin/kinit sasl.kerberos.min.time.before.relogin = 60000 sasl.kerberos.service.name = null sasl.kerberos.ticket.renew.jitter = 0.05 sasl.kerberos.ticket.renew.window.factor = 0.8 sasl.login.callback.handler.class = null sasl.login.class = null sasl.login.refresh.buffer.secOnds= 300 sasl.login.refresh.min.period.secOnds= 60 sasl.login.refresh.window.factor = 0.8 sasl.login.refresh.window.jitter = 0.05 sasl.mechanism = GSSAPI security.protocol = PLAINTEXT security.providers = null send.buffer.bytes = 131072 ssl.cipher.suites = null ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1] ssl.endpoint.identification.algorithm = https ssl.key.password = null ssl.keymanager.algorithm = SunX509 ssl.keystore.location = null ssl.keystore.password = null ssl.keystore.type = JKS ssl.protocol = TLS ssl.provider = null ssl.secure.random.implementation = null ssl.trustmanager.algorithm = PKIX ssl.truststore.location = null ssl.truststore.password = null ssl.truststore.type = JKS transaction.timeout.ms = 60000 transactional.id = null value.serializer = class org.apache.kafka.common.serialization.StringSerializer 19:37:42.274 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.producer.internals.Sender - [Producer clientId=producer-1] Starting Kafka producer I/O thread. 19:37:42.280 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Initialize connection to node kafka01:9092 (id: -1 rack: null) for sending metadata request 19:37:42.283 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Initiating connection to node kafka01:9092 (id: -1 rack: null) using address kafka01/192.168.30.160 19:37:42.287 [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version: 2.4.1 19:37:42.287 [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId: c57222ae8cd7866b 19:37:42.288 [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka startTimeMs: 1682077062274 19:37:42.296 [main] DEBUG org.apache.kafka.clients.producer.KafkaProducer - [Producer clientId=producer-1] Kafka producer started 19:37:45.442 [main] INFO com.example.demokafka.test.TestProduct - 准备发送 19:38:00.121 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.network.Selector - [Producer clientId=producer-1] Created socket with SO_RCVBUF = 32768, SO_SNDBUF = 131072, SO_TIMEOUT = 0 to node -1 19:38:00.432 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Completed connection to node -1. Fetching API versions. 19:38:00.432 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Initiating API versions fetch from node -1. 19:38:00.577 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.network.Selector - [Producer clientId=producer-1] Connection with kafka01/192.168.30.160 disconnected java.io.EOFException: null at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:96) at org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:424) at org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:385) at org.apache.kafka.common.network.Selector.attemptRead(Selector.java:651) at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:572) at org.apache.kafka.common.network.Selector.poll(Selector.java:483) at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:547) at org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:335) at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:244) at java.base/java.lang.Thread.run(Thread.java:834) 19:38:00.579 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Node -1 disconnected. 19:38:00.583 [kafka-producer-network-thread | producer-1] WARN org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Bootstrap broker kafka01:9092 (id: -1 rack: null) disconnected 

    实在想不通是啥问题了,望大佬能赐教。

    8 条回复    2023-04-23 16:39:40 +08:00
    lsk569937453
        1
    lsk569937453  
       2023-04-21 21:50:57 +08:00
    换个能代理 kafka 的 proxy,只要支持 tcp 就行了,比如 nginx 。
    然后看是否正常,如果都失败,就排除是 proxy 的问题了。
    然后这条链路上的其他问题,dns 啥的。
    0m9ionbP8wuvs8S3
        2
    0m9ionbP8wuvs8S3  
       2023-04-21 22:22:36 +08:00
    你是想通过 192.168.30.160 : 9092 一个端口来访问 3 个集群节点?这应该不行的,因为消息是可能会被发送到到不同的分区,所以生产者应该会尝试连接集群里面其他的节点,ha 没做 kafka 协议解析应该没法识别要连到哪个节点,如果是单个节点应该可以。
    967182
        3
    967182  
    OP
       2023-04-23 08:58:44 +08:00
    @vgbw 我试过用 9092 9093 9094 三个端口访问集群貌似也不行
    967182
        4
    967182  
    OP
       2023-04-23 09:00:31 +08:00
    @lsk569937453 nginx 也试过不好用,在局域网也搭了一套这样的环境基本可以排除是网络问题的。
    lsk569937453
        5
    lsk569937453  
       2023-04-23 09:32:45 +08:00
    https://github.com/grepplabs/kafka-proxy
    这个用过吗?我看也是 proxy 上得开多端口啊。
    0m9ionbP8wuvs8S3
        6
    0m9ionbP8wuvs8S3  
       2023-04-23 11:57:54 +08:00
    @967182 如果是 ha 开 3 个端口, 修改每个 kafka 节点配置文件 config/server.properties 的 advertised.listeners=PLAINTEXT://{haproxy_ip}:{ha 对应该节点的端口}
    967182
        7
    967182  
    OP
       2023-04-23 16:37:30 +08:00
    @vgbw 不行,情况比较复杂好多项目在用, 必须保证是 9092 不能影响别人
    967182
        8
    967182  
    OP
       2023-04-23 16:39:40 +08:00
    @lsk569937453 嗯嗯,在看这个东西,目前也有问题没跑通。
    关于     帮助文档     自助推广系统     博客     API     FAQ     Solana     838 人在线   最高记录 6679       Select Language
    创意工作者们的社区
    World is powered by solitude
    VERSION: 3.9.8.5 25ms UTC 20:46 PVG 04:46 LAX 13:46 JFK 16:46
    Do have faith in what you're doing.
    ubao snddm index pchome yahoo rakuten mypaper meadowduck bidyahoo youbao zxmzxm asda bnvcg cvbfg dfscv mmhjk xxddc yybgb zznbn ccubao uaitu acv GXCV ET GDG YH FG BCVB FJFH CBRE CBC GDG ET54 WRWR RWER WREW WRWER RWER SDG EW SF DSFSF fbbs ubao fhd dfg ewr dg df ewwr ewwr et ruyut utut dfg fgd gdfgt etg dfgt dfgd ert4 gd fgg wr 235 wer3 we vsdf sdf gdf ert xcv sdf rwer hfd dfg cvb rwf afb dfh jgh bmn lgh rty gfds cxv xcv xcs vdas fdf fgd cv sdf tert sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf shasha9178 shasha9178 shasha9178 shasha9178 shasha9178 liflif2 liflif2 liflif2 liflif2 liflif2 liblib3 liblib3 liblib3 liblib3 liblib3 zhazha444 zhazha444 zhazha444 zhazha444 zhazha444 dende5 dende denden denden2 denden21 fenfen9 fenf619 fen619 fenfe9 fe619 sdf sdf sdf sdf sdf zhazh90 zhazh0 zhaa50 zha90 zh590 zho zhoz zhozh zhozho zhozho2 lislis lls95 lili95 lils5 liss9 sdf0ty987 sdft876 sdft9876 sdf09876 sd0t9876 sdf0ty98 sdf0976 sdf0ty986 sdf0ty96 sdf0t76 sdf0876 df0ty98 sf0t876 sd0ty76 sdy76 sdf76 sdf0t76 sdf0ty9 sdf0ty98 sdf0ty987 sdf0ty98 sdf6676 sdf876 sd876 sd876 sdf6 sdf6 sdf9876 sdf0t sdf06 sdf0ty9776 sdf0ty9776 sdf0ty76 sdf8876 sdf0t sd6 sdf06 s688876 sd688 sdf86