Spring Boot + MQTT: exception when publishing messages concurrently, can anyone help take a look? - V2EX
    129duckflew · 355 days ago · 2042 views
    This topic was created 355 days ago; the information in it may have since evolved or changed.

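    I found an old but possibly related issue on GitHub. It has already been closed, although I had wanted to leave a comment on it: https://github.com/eclipse/paho.mqtt.java/issues/323
    I use the Spring-Integration-mqtt module to send and receive MQTT messages, and the following exception occurs: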

    2024-10-22 10:56:45.161 [][] ERROR o.s.i.handler.LoggingHandler:250 - org.springframework.messaging.MessageHandlingException: Failed to publish to MQTT in the [bean 'mqttOutboundHandler' for component 'mqttOutboundFlow.org.springframework.integration.config.ConsumerEndpointFactoryBean#1'; defined in: 'class path resource [com/rms/config/mqtt/MqttConfig.class]'; from source: 'com.rms.config.mqtt.MqttConfig.mqttOutboundHandler(org.eclipse.paho.mqttv5.client.MqttConnectionOptions)'], failedMessage=GenericMessage [payload={"value":[],"unitId":741,"fieldName":"landingCall-down-front-1-32","isRunningData":false,"isError":false}, headers={replyChannel=nullChannel, errorChannel=, mqtt_qos=0, id=1237ba75-e408-10ff-e322-5f692dd8970e, mqtt_topic=status/741/landingCall-down-front-1-32, timestamp=1729565799271}]
        at org.springframework.integration.mqtt.outbound.Mqttv5PahoMessageHandler.publish(Mqttv5PahoMessageHandler.java:283)
        at org.springframework.integration.mqtt.outbound.Mqttv5PahoMessageHandler.handleMessageInternal(Mqttv5PahoMessageHandler.java:222)
        at org.springframework.integration.handler.AbstractMessageHandler.doHandleMessage(AbstractMessageHandler.java:105)
        at org.springframework.integration.handler.AbstractMessageHandler.handleWithMetrics(AbstractMessageHandler.java:90)
        at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:70)
        at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:132)
        at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:148)
        at org.springframework.integration.dispatcher.UnicastingDispatcher$1.run(UnicastingDispatcher.java:129)
        at org.springframework.integration.util.ErrorHandlingTaskExecutor.lambda$execute$0(ErrorHandlingTaskExecutor.java:56)
        at java.base/java.util.concurrent.ThreadPerTaskExecutor$TaskRunner.run(ThreadPerTaskExecutor.java:314)
        at java.base/java.lang.VirtualThread.run(VirtualThread.java:311)
    Caused by: Internal error, caused by no new message IDs being available (32001)
        at org.eclipse.paho.mqttv5.client.internal.ExceptionHelper.createMqttException(ExceptionHelper.java:32)
        at org.eclipse.paho.mqttv5.client.internal.ClientState.getNextMessageId(ClientState.java:1454)
        at org.eclipse.paho.mqttv5.client.internal.ClientState.send(ClientState.java:511)
        at org.eclipse.paho.mqttv5.client.internal.ClientComms.internalSend(ClientComms.java:155)
        at org.eclipse.paho.mqttv5.client.internal.ClientComms.sendNoWait(ClientComms.java:218)
        at org.eclipse.paho.mqttv5.client.MqttAsyncClient.publish(MqttAsyncClient.java:1530)
        at org.eclipse.paho.mqttv5.client.MqttAsyncClient.publish(MqttAsyncClient.java:1499)
        at org.springframework.integration.mqtt.outbound.Mqttv5PahoMessageHandler.publish(Mqttv5PahoMessageHandler.java:271)
        ... 10 more

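    Environment:

    • SpringBoot 3.2.5
    • Java 21
    • PahoMQTT 1.2.5

    Below is my core logic. I configured two MQTT clients with Spring-Integration flows: one receives messages and the other sends them. The send rate is about 5000 messages per second.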
    mqtt:
      client-id-inbound: rms-inbound
      client-id-outbound: rms-outbound
      url: tcp://127.0.0.1:1883
      username: rms
      password: 123456
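    import java.util.concurrent.Executors;

    import lombok.Data;
    import lombok.extern.slf4j.Slf4j;
    import org.eclipse.paho.mqttv5.client.IMqttAsyncClient;
    import org.eclipse.paho.mqttv5.client.MqttConnectionOptions;
    import org.eclipse.paho.mqttv5.client.persist.MqttDefaultFilePersistence;
    import org.springframework.boot.context.properties.ConfigurationProperties;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.integration.annotation.IntegrationComponentScan;
    import org.springframework.integration.dsl.IntegrationFlow;
    import org.springframework.integration.dsl.MessageChannels;
    import org.springframework.integration.dsl.Transformers;
    import org.springframework.integration.mqtt.core.ClientManager;
    import org.springframework.integration.mqtt.core.Mqttv5ClientManager;
    import org.springframework.integration.mqtt.inbound.Mqttv5PahoMessageDrivenChannelAdapter;
    import org.springframework.integration.mqtt.outbound.Mqttv5PahoMessageHandler;
    import org.springframework.messaging.MessageHandler;
    // The imports for MqttQoS, SimpleMessageConverter and the project's own classes
    // (Prop, LogService, CanProtocolLoader, ...) are not shown in the original post.

    @Configuration
    @IntegrationComponentScan("com.rms.config")
    @Slf4j
    @ConfigurationProperties(prefix = "mqtt")
    @Data
    public class MqttConfig {

        // Bound from the "mqtt" section of the application configuration.
        private String clientIdInbound;
        private String clientIdOutbound;
        private String url;
        private String password;
        private String username;

        // Connection options shared by the inbound and outbound clients.
        @Bean
        public MqttConnectionOptions mqttConnectOptions() {
            MqttConnectionOptions options = new MqttConnectionOptions();
            options.setServerURIs(new String[] { url });
            options.setUserName(username);
            options.setPassword(password.getBytes());
            options.setAutomaticReconnect(true);
            return options;
        }

        // Client manager used by the inbound adapter.
        @Bean
        public ClientManager<IMqttAsyncClient, MqttConnectionOptions> clientManager(MqttConnectionOptions options) {
            Mqttv5ClientManager clientManager = new Mqttv5ClientManager(options, clientIdInbound);
            clientManager.setPersistence(new MqttDefaultFilePersistence());
            return clientManager;
        }

        @Bean
        public SimpleMessageConverter simpleMessageConverter() {
            return new SimpleMessageConverter();
        }

        // Outbound handler: its own client (clientIdOutbound), async, QoS 0 by default.
        @Bean
        public MessageHandler mqttOutboundHandler(MqttConnectionOptions connectionOptions) {
            Mqttv5PahoMessageHandler messageHandler = new Mqttv5PahoMessageHandler(connectionOptions, clientIdOutbound);
            messageHandler.setAsync(true);
            messageHandler.setDefaultTopic("defaultTopic");
            messageHandler.setDefaultQos(MqttQoS.AT_MOST_ONCE.value());
            messageHandler.setConverter(simpleMessageConverter());
            return messageHandler;
        }

        // Every outbound message is handed to a new virtual thread before publishing.
        @Bean
        public IntegrationFlow mqttOutboundFlow(MessageHandler mqttOutboundHandler) {
            return IntegrationFlow.from("mqttOutboundChannel")
                    .channel(MessageChannels.executor(Executors.newVirtualThreadPerTaskExecutor()))
                    .handle(mqttOutboundHandler)
                    .get();
        }

        // Inbound flow: subscribe to status/+/#, parse JSON into Prop, then route by flags.
        @Bean
        public IntegrationFlow statusInboundFlow(ClientManager<IMqttAsyncClient, MqttConnectionOptions> clientManage) {
            Mqttv5PahoMessageDrivenChannelAdapter messageProducer =
                    new Mqttv5PahoMessageDrivenChannelAdapter(clientManage, "status/+/#");
            return IntegrationFlow.from(messageProducer)
                    .channel(MessageChannels.executor(Executors.newVirtualThreadPerTaskExecutor()))
                    .transform(Transformers.objectToString())
                    .transform(Transformers.fromJson(Prop.class))
                    .route(Prop.class, prop -> {
                        if (Boolean.TRUE.equals(prop.getIsError())) {
                            return "errorDataChannel";
                        } else if (Boolean.TRUE.equals(prop.getIsRunningData())) {
                            return "runningDataChannel";
                        } else {
                            return "discardChannel";
                        }
                    })
                    .get();
        }

        @Bean
        public CanProtocolLoader canProtocolLoader() {
            return new CanProtocolLoader();
        }

        @Bean
        public UnitErrorHandler errorLogHandler(LogService logService) {
            return new UnitErrorHandler(logService);
        }

        @Bean
        public UnitRunningDataHandler unitRunningDataHandler(LogService logService) {
            return new UnitRunningDataHandler(logService);
        }

        @Bean
        public IntegrationFlow errorLogChannelFlow(UnitErrorHandler unitErrorHandler) {
            return IntegrationFlow.from("errorDataChannel")
                    .channel(MessageChannels.executor(Executors.newVirtualThreadPerTaskExecutor()))
                    .handle(unitErrorHandler)
                    .get();
        }

        @Bean
        public IntegrationFlow runningDataChannelFlow(UnitRunningDataHandler runningDataHandler) {
            return IntegrationFlow.from("runningDataChannel")
                    .channel(MessageChannels.executor(Executors.newVirtualThreadPerTaskExecutor()))
                    .handle(runningDataHandler)
                    .get();
        }

        // Messages that are neither errors nor running data are dropped.
        @Bean
        public IntegrationFlow discardChannelFlow() {
            return IntegrationFlow.from("discardChannel")
                    .channel(MessageChannels.executor(Executors.newVirtualThreadPerTaskExecutor()))
                    .handle(message -> { })
                    .get();
        }
    }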
    @Slf4j
    @RequiredArgsConstructor
    public class UnitErrorHandler implements MessageHandler {

        private final LogService logService;

        @Override
        public void handleMessage(Message<?> message) throws MessagingException {
            Prop<?> prop = (Prop<?>) message.getPayload();
            logService.saveErrorLog(prop);
        }
    }

    I use the MessagingGateway annotation to route messages to my mqttOutboundChannel, so that I can send MQTT messages through MqttGateway:

    @MessagingGateway(defaultRequestChannel = "mqttOutboundChannel")
    public interface MqttGateway {

        void sendToMqtt(String data);

        void sendToTopic(String payload, @Header(MqttHeaders.TOPIC) String topic);

        void sendToTopic(String payload, @Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) int qos);

        void sendWithResp(String payload, @Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.RESPONSE_TOPIC) String responseTopic, @Header(MqttHeaders.QOS) int qos);
    }

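    When the message rate is low, typically under 200 messages per second, the error I mentioned at the start does not occur. Once I raise the rate to 5000 messages per second, the errors start to appear with the configuration above. In addition, if I remove the inbound MQTT part, that is, the statusInboundFlow integration flow in the code above, sending works again without errors.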

    14 replies    2024-10-25 09:40:35 +08:00
    #1  maokg  354 days ago
    Caused by: Internal error, caused by no new message IDs being available (32001). It looks like an ID problem.
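    #2  129duckflew (OP)  354 days ago
    @maokg Yes. When sending a large number of messages, the available IDs apparently get exhausted for some reason. I looked at the source at org.eclipse.paho.mqttv5.client.internal.ClientState.getNextMessageId(ClientState.java:1454): it searches inUseIdMap three times, and if it cannot find a usable ID it throws this exception. What I don't understand is why the client never releases these IDs.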
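The failure mode described in #2 can be illustrated with a minimal Java sketch. This is not Paho's source: `MessageIdAllocator`, its methods, and the single-pass scan are hypothetical simplifications. The only facts relied on are that MQTT packet identifiers are non-zero 16-bit values (1 to 65535) and that the error in the log is raised when no free one can be found. The exception text merely mirrors the wording in the log above.

```java
import java.util.HashSet;
import java.util.Set;

/** Hypothetical, simplified ID allocator; an illustration, not Paho's implementation. */
final class MessageIdAllocator {
    static final int MIN_ID = 1;
    static final int MAX_ID = 65_535; // MQTT packet identifiers are 16-bit and non-zero

    private final Set<Integer> inUse = new HashSet<>();
    private int lastId = 0;

    /** Returns the next free ID, or throws if every ID is still marked as in use. */
    synchronized int allocate() {
        for (int attempts = 0; attempts < MAX_ID; attempts++) {
            lastId = (lastId >= MAX_ID) ? MIN_ID : lastId + 1;
            if (inUse.add(lastId)) { // add() returns false when the ID is already taken
                return lastId;
            }
        }
        throw new IllegalStateException("no new message IDs being available");
    }

    /** Must be called once a message is finished, otherwise its ID is never reusable. */
    synchronized void release(int id) {
        inUse.remove(id);
    }

    public static void main(String[] args) {
        MessageIdAllocator allocator = new MessageIdAllocator();
        int allocated = 0;
        try {
            while (true) {       // allocate without ever releasing
                allocator.allocate();
                allocated++;
            }
        } catch (IllegalStateException e) {
            System.out.println("exhausted after " + allocated + " IDs: " + e.getMessage());
        }
    }
}
```

If IDs are handed out faster than they are released, the pool of 65535 values runs dry no matter how fast the broker is, which is consistent with the error appearing only at high send rates.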
    #3  ZGame  354 days ago
    @129duckflew My guess is that this is a backpressure problem. I suspect there is something wrong with how your code is written.
    #4  ZGame  354 days ago
    5000 per second...
    #5  maokg  354 days ago
    @129duckflew Bad network? The client did not receive the message and therefore never acknowledged it (QoS 1 or 2)?
    #6  129duckflew (OP)  354 days ago
    @maokg All the messages sent in the code are QoS 0.
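    #7  129duckflew (OP)  354 days ago
    @ZGame The paho client does not seem to provide a backpressure mechanism. I admittedly don't know much about backpressure; I have only heard of it in the context of reactive streams.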
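Backpressure on the sending side does not require a reactive library. One common way to apply it by hand is to cap the number of sends that may be in flight at once. Below is a minimal Java sketch using a `java.util.concurrent.Semaphore`. `InFlightLimiter`, `tryBegin` and `end` are hypothetical names, and where exactly `end()` would be called in a Paho or Spring Integration setup is an assumption left to the reader (for example, whatever completion callback the client exposes).

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

/** Hypothetical helper that caps the number of concurrently outstanding sends. */
final class InFlightLimiter {
    private final Semaphore permits;

    InFlightLimiter(int maxInFlight) {
        this.permits = new Semaphore(maxInFlight);
    }

    /**
     * Waits up to the given timeout for a free slot.
     * Returns false if no slot became free in time or the thread was interrupted.
     */
    boolean tryBegin(long timeout, TimeUnit unit) {
        try {
            return permits.tryAcquire(timeout, unit);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
            return false;
        }
    }

    /** Frees one slot; call exactly once per successful tryBegin(). */
    void end() {
        permits.release();
    }

    int freeSlots() {
        return permits.availablePermits();
    }
}
```

A producer would call `tryBegin(...)` before each publish and `end()` when that publish has completed. When the limit is reached, producers wait (or give up after the timeout) instead of piling more work onto the client.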
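    #8  ZGame  354 days ago
    @129duckflew #7 Use a third-party MQTT broker, either Alibaba Cloud or one you host yourself, and this problem should go away? It looks like data is being produced too fast to be consumed.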
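    #9  129duckflew (OP)  354 days ago
    @ZGame I don't think the problem is that data is not being consumed. As I mentioned in the post, if I comment out the inbound listener and only send without receiving, sending works without errors.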
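    #10  ZGame  354 days ago
    @129duckflew #9 Right, and that is exactly a backpressure problem. If the producer produces too fast for the consumer to keep up, don't you end up with memory overflow (OOM), i.e. a backpressure problem?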
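    #11  ZGame  354 days ago
    @129duckflew #9 So what you should do is, for example, ask GPT whether there is an open-source MQTT broker that can buffer messages, then have your client consume at a fixed rate and either drop the excess or lower the send rate.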
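The "consume at a fixed rate and drop the excess" idea from #11 can also be sketched on the client side, without changing the broker. The Java sketch below uses a bounded `ArrayBlockingQueue`: producers offer items, anything that does not fit is counted and dropped, and a consumer drains a limited batch per tick. `DroppingBuffer` and its method names are hypothetical, and how the drained batch would be handed to an MQTT client is deliberately left out.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical bounded buffer that drops new items once it is full. */
final class DroppingBuffer<T> {
    private final BlockingQueue<T> queue;
    private final AtomicLong dropped = new AtomicLong();

    DroppingBuffer(int capacity) {
        this.queue = new ArrayBlockingQueue<>(capacity);
    }

    /** Non-blocking: returns false (and counts a drop) when the buffer is full. */
    boolean offer(T item) {
        boolean accepted = queue.offer(item);
        if (!accepted) {
            dropped.incrementAndGet();
        }
        return accepted;
    }

    /** Removes up to maxItems in FIFO order; intended to be called once per tick. */
    List<T> drain(int maxItems) {
        List<T> batch = new ArrayList<>(maxItems);
        queue.drainTo(batch, maxItems);
        return batch;
    }

    long droppedCount() {
        return dropped.get();
    }
}
```

Calling `drain(n)` from a fixed-rate scheduler bounds the publish rate to `n` messages per tick, while the queue capacity bounds memory use. Whether dropping status updates is acceptable depends on the application.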
    #12  huifer  352 days ago
    Could you send me a demo? I happen to be writing a book on this topic. If you want to discuss, you can add me: cWZ5ZDA5NQ==
    #13  asp1111  352 days ago
    Why not use netty? At least more people understand it.
    #14  129duckflew (OP)  352 days ago
    @huifer I have written a minimal demo, please take a look: https://github.com/129duckflew/inte-paho-test.git