MQ 消费者阻塞如何处理?(ActiveMQ、RocketMQ) - V2EX
V2EX = way to explore
V2EX 是一个关于分享和探索的地方
现在注册
已注册用户请  登录
请不要在回答技术问题时复制粘贴 AI 生成的内容
dunhanson
V2EX    程序员

MQ 消费者阻塞如何处理?(ActiveMQ、RocketMQ)

  • &nbs;
  •   dunhanson 2019-10-30 16:23:57 +08:00 7413 次点击
    这是一个创建于 2178 天前的主题,其中的信息可能已经有所发展或是发生改变。

    问题大概描述是:

    邮件发送,消费者数量是 5-20,有时候会阻塞(问题还不清楚)导致消费者无法继续处理队列中的消息

    我的处理方式是重启 tomcat,重启果然是万能的,重启后,就继续读取消息了。

    但不可能天天守着看然后重启一下吧

    于是乎,我就搜了相关的 ActiveMQ 的文章 https://blog.csdn.net/ma15732625261/article/details/81267963 里面讲了 SlowConsumerStrategy:慢速消费者策略,但是我配置了,无效果

    <policyEntry queue=">" producerFlowCOntrol="true" memoryLimit="512mb"> <slowConsumerStrategy> <abortSlowConsumerStrategy abortCOnnection="false"/> </slowConsumerStrategy> </policyEntry> 

    我用了下 RocketMQ,也遇到了类似的问题,consumeTimeout 也没效果

    我的理解是:配置了 consumeTimeout,超时之后,就处理下一个消息

    package cn.msb.rocketmq.consumer; import lombok.extern.slf4j.Slf4j; import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; import org.apache.rocketmq.spring.annotation.SelectorType; import org.apache.rocketmq.spring.core.RocketMQListener; import org.springframework.stereotype.Service; @Slf4j @Service @RocketMQMessageListener(topic = "test-topic-1", cOnsumerGroup= "my-consumer_test-topic-1", selectorExpression = "first", selectorType = SelectorType.TAG, cOnsumeThreadMax= 1, cOnsumeTimeout= 1000) public class MyConsumer1 implements RocketMQListener<String> { public void onMessage(String message) { if(message.contains("1")) { try { System.out.println("1 阻塞中。。。"); Thread.sleep(1000*60*60); } catch (InterruptedException e) { e.printStackTrace(); } } log.info("received message: {}", message); } } 

    我想达到的效果是:消费者处理超时后就终止执行,让给下个消息进行处理

    16 条回复    2019-11-01 10:29:59 +08:00
    superrichman
        1
    superrichman  
       2019-10-30 16:36:25 +08:00 via iPhone
    consumeTimeout 的单位是不是分钟
    dunhanson
        2
    dunhanson  
    OP
       2019-10-30 16:39:33 +08:00
    @superrichman 毫秒,默认 30000
    lucifer1108
        3
    lucifer1108  
       2019-10-30 16:42:57 +08:00
    让我想到了一个面试题,怎么限制一个方法的执行时间.
    可以用 callable+executors 实现.
    贴个 demo 代码
    ```java
    Callable<String> call = new Callable<String>() {
    public String call() throws Exception {
    // 开始执行耗时操作
    // Thread.sleep(1000 * 5);
    // return "线程执行完成.";
    // 响应时间较长的方法或接口调用,返回 String 类型
    return getRecCourses(params);
    }
    };
    try {
    ExecutorService exec = Executors.newFixedThreadPool(1);
    Future<String> future = exec.submit(call);
    // csvData 为 call 方法里的返回值,也就是我们方法的返回值
    csvData = future.get(1000 * 1, TimeUnit.MILLISECONDS); // 任务处理超时时间设为 1 秒
    } catch (TimeoutException ex) {
    // 捕获超时异常,超时处理,可以通过 ex 抛出异常,如果不抛出,则控制台不输出异常。
    csvData = null;
    LogUtil.warn(Module.COURSE, getClass(), "getCourseRecFromBI", "请求 Bi 推荐课程数据超时,使用原来推荐系统"ex);
    } catch (Exception e) {
    csvData = null;
    LogUtil.warn(Module.COURSE, getClass(), "getCourseRecFromBI", "请求 Bi 推荐课程数据失败,使用原来推荐系统");
    }
    ```
    lucifer1108
        4
    lucifer1108  
       2019-10-30 16:43:36 +08:00
    @lucifer1108 什么鬼,是我用 md 的姿势不正确么
    softtwilight
        5
    softtwilight  
       2019-10-30 16:44:53 +08:00
    cOnsumeThreadMax= 1, 单线程消费是业务需求吗? 改成多线程不会影响阻塞不会影响别的消费,但是阻塞的问题还是要解决
    dunhanson
        6
    dunhanson  
    OP
       2019-10-30 17:24:00 +08:00
    @lucifer1108 这个有点繁杂
    dunhanson
        7
    dunhanson  
    OP
       2019-10-30 17:24:35 +08:00
    @softtwilight 不是单线程消费,只是用单线程好模拟和控制
    dunhanson
        8
    dunhanson  
    OP
       2019-10-30 17:27:27 +08:00
    @lucifer1108 按道理 MQ 都应该有这个具体的配置的
    x537196
        9
    x537196  
       2019-10-30 17:35:19 +08:00
    为什么阻塞呢?其实我没怎么看懂问题,把消息取下来,放入线程池中执行响应业务不可以吗?
    justfly
        10
    justfly  
       2019-10-30 17:35:48 +08:00
    从根上解决问题,找到阻塞的原因。

    根据我的经验,如果消费者突然拿不到消息,而队列又有消息堆积的话,从客户端和服务端两侧都看下 tcp 连接还在不在。

    在某些低吞吐量的场景,tcp 连接长时间空闲,某些网络中间件会断掉连接而客户端没感知,就会 block 住了,再有大吞吐量后也不会恢复。

    如果连接已经断了,设置 rabbitmq 的心跳,而且心跳时间要比 tcp 自身的 keep alive 间隔短一些,保证连接活跃。
    dunhanson
        11
    dunhanson  
    OP
       2019-10-31 09:11:46 +08:00
    @x537196 还没找
    dunhanson
        12
    dunhanson  
    OP
       2019-10-31 09:12:02 +08:00
    @justfly 问题确实要找的
    Dabaicong
        13
    Dabaicong  
       2019-10-31 09:30:38 +08:00
    #9 楼说的对,拉下消息,放线程池中异步执行,执行成功回调。再加上守护线程,监视任务执行,超时的话,守护线程就干掉 。
    jyounn
        14
    jyounn  
       2019-10-31 13:51:41 +08:00
    ......
    cOnsumeThreadMax= 1, cOnsumeTimeout= 1000)
    ......
    Thread.sleep(1000*60*60);

    消费线程数最大为 1,然后又让消费者线程 sleep 3600 秒?线程 sleep 是不会结束的,这个时候不会创建新的消费线程,导致无法创建新线程消费.消费者消费建议使用线程池,可以复用且好管理.另外你说的阻塞具体是什么现象呢?
    dunhanson
        15
    dunhanson  
    OP
       2019-10-31 16:29:04 +08:00
    @jyounn 我这个是模拟线上环境的阻塞状态
    线上肯定不止一个消费者线程数的
    阻塞情况是指,线上配置 5 个消费者线程池,然后刚好 5 个都在执行的过程中卡住了(问题还不清楚,你就理解为都因为某种原因 sleep 了)
    jyounn
        16
    jyounn  
       2019-11-01 10:29:59 +08:00
    @dunhanson 根据你的描述,看下消费的逻辑中是否有导致无限等待的情况?可以搭一个 RocketMQ 控制台看下生产者消费者的状态,通过 debug 看看.
    关于     帮助文档     自助推广系统     博客     API     FAQ     Solana     3087 人在线   最高记录 6679       Select Language
    创意工作者们的社区
    World is powered by solitude
    VERSION: 3.9.8.5 25ms UTC 12:13 PVG 20:13 LAX 05:13 JFK 08:13
    Do have faith in what you're doing.
    ubao msn snddm index pchome yahoo rakuten mypaper meadowduck bidyahoo youbao zxmzxm asda bnvcg cvbfg dfscv mmhjk xxddc yybgb zznbn ccubao uaitu acv GXCV ET GDG YH FG BCVB FJFH CBRE CBC GDG ET54 WRWR RWER WREW WRWER RWER SDG EW SF DSFSF fbbs ubao fhd dfg ewr dg df ewwr ewwr et ruyut utut dfg fgd gdfgt etg dfgt dfgd ert4 gd fgg wr 235 wer3 we vsdf sdf gdf ert xcv sdf rwer hfd dfg cvb rwf afb dfh jgh bmn lgh rty gfds cxv xcv xcs vdas fdf fgd cv sdf tert sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf shasha9178 shasha9178 shasha9178 shasha9178 shasha9178 liflif2 liflif2 liflif2 liflif2 liflif2 liblib3 liblib3 liblib3 liblib3 liblib3 zhazha444 zhazha444 zhazha444 zhazha444 zhazha444 dende5 dende denden denden2 denden21 fenfen9 fenf619 fen619 fenfe9 fe619 sdf sdf sdf sdf sdf zhazh90 zhazh0 zhaa50 zha90 zh590 zho zhoz zhozh zhozho zhozho2 lislis lls95 lili95 lils5 liss9 sdf0ty987 sdft876 sdft9876 sdf09876 sd0t9876 sdf0ty98 sdf0976 sdf0ty986 sdf0ty96 sdf0t76 sdf0876 df0ty98 sf0t876 sd0ty76 sdy76 sdf76 sdf0t76 sdf0ty9 sdf0ty98 sdf0ty987 sdf0ty98 sdf6676 sdf876 sd876 sd876 sdf6 sdf6 sdf9876 sdf0t sdf06 sdf0ty9776 sdf0ty9776 sdf0ty76 sdf8876 sdf0t sd6 sdf06 s688876 sd688 sdf86