我们总是避免不了这一种需求 - V2EX
V2EX = way to explore
V2EX 是一个关于分享和探索的地方
现在注册
已注册用户请  登录
Joker123456789
V2EX    Java

我们总是避免不了这一种需求

  •  
  •   Joker123456789 322 天前 2439 次点击
    这是一个创建于 322 天前的主题,其中的信息可能已经有所发展或是发生改变。

    非常不好意思,连续发两篇文章肯定是打扰到大家了,但我是有苦衷的[大哭][大哭],之前梯子出了点问题,所以一直访问不上,今天刚刚弄好,所以就把之前积累的两篇文章都发出来了。

    在此我真诚的给大家道个歉,实在是不好意思。


    我们总是避免不了这样一种需求,写个定时任务去查数据库或者去别的地方捞数据,然后对查出来的数据进行各种业务逻辑的处理。

    这其实就是就是一种生产者与消费者模式,查询数据库的过程可以称为“生产”,对数据行处理的过程可以称为“消费”,只不过大部分情况下我们都懒得去把他们分开,而是查出来就直接处理了。但是这种做法有个很明显的缺陷,就是生产和消费这两个步骤有时会互相阻塞。

    也就是说当查询数据的步骤出现阻塞的时候就没有数据来处理了。

    反过来也一样,当处理数据的步骤阻塞了,会导致查询数据的步骤停下来。

    所以为了让这两种能力可以各自发挥,我们需要把他们分开,各干各的,而这种分开的设计方式就叫生产者与消费者模型。

    只要能查到数据就尽管查,就算没人处理也不会影响我继续查。

    只要有数据我就处理,我才不管你此刻是不是阻塞了,你之前查出来的数据,我可能都还没处理完。

    所以我写了一个小工具,可以很好的解决这个问题

    他是一个多对多的模型,可以把多个生产者和多个消费者放到一个组合里,这个组合里所有的生产者都可以发布数据,而发布出去的数据会被所有消费者接收到。

    每个生产者都会单独占用一个线程,每个消费者也一样。

    比如像这样子我们就可以创建一个组合

    // 创建一组生产者与消费者,而这样组可以创建无限个 // 每一组的生产者都只会把数据推送给同一组的消费者 MagicDataProcessing.getProducerAndConsumerManager() .addProducer(new DemoProducer()) // 添加一个生产者(可以添加多个) .addConsumer(new DemoConsumer()) // 添加一个消费者(可以添加多个) .start(); 

    生产者长这样

    这里省略了很多其他配置,感兴趣的可以去官网查看

    public class DemoProducer extends MagicProducer { /** * 当生产者启动后,会自动执行这个方法,我们可以在这个方法里生产数据,并通过 publish 方法发布给消费者 * * 这边举一个例子 * 假如我们需要不断地扫描某张表,根据里面的数据状态去执行一些业务逻辑 * 那么我们可以在这个方法里写一个查询的逻辑,然后将查询到数据发送给消费者 */ @Override public void producer() { // 根据上面的例子,我们可以查询这张表里符合条件的数据 List<Object> dataList = selectList(); // 然后将他推送给消费者 // 可以推送任意类型的数据 this.publish(dataList); // 也可以分页查询,一页一页的推,至于怎么分页那就是业务层的事情了 } } 

    消费者长这样

    同样省略了很多其他配置,感兴趣的可以去官网查看

    public class DemoConsumer extends MagicConsumer { /** * 心跳通知,消费者每消费一个任务,都会触发一下这个方法 * 我们可以根据他触发的频率来判断这个消费者的活跃度 * * 注意!!! * 这个方法里不可以有耗时的操作,不然会将消费者阻塞的 * 如果一定要加耗时的操作,那么务必在新线程里搞 * @param id */ @Override public void pulse(String id) { new Thread(()->{ // 如果你需要在这个方法里搞一些耗时的操作,那么务必要像这样开启一个新线程 // 不然消费者会被阻塞的 }).start(); } /** * 这个方法会接收到生产者推送过来的数据 * 在里面执行相应的业务逻辑即可 * @param data */ @Override public void doRunner(Object data) { // data 可以是任何类型 // 因为能给他推送数据的生产者是固定的,所以 data 有可能收到的类型也是固定的 // 所以我们可以在这里自己判断,然后转化即可 // 为什么不用泛型?这是为了兼容多个生产者,因为他们推送的数据类型可能会不同 } } 

    这个模型里做了哪些事情?

    • 多线程处理,也就是前面提到的,每个生产者,每个消费者都是由单独的线程去执行的
    • 会自动开始下一轮生产,当生产者里的 producer 方法执行结束就意味着一轮生产已经结束了,根据业务需求如果需要不断的生产一轮又一轮,那么 producer 方法会自动不断地去执行。
    • 限制生产者,当生产者投放完一轮数据以后,会去监视消费者是否把这批数据消费完了,如果没消费完就会进入等待,这样可以在一定程度上避免数据积压。

    其他扩展功能

    项目里还提供了其他的工具类,可以并发处理 List ,Set ,Map 等集合,如果在生产者或者消费者里用上这些工具,可以进一步提高效率。

    官网地址:https://magician-io.com

    1 条回复    2024-11-25 11:19:45 +08:00
    edwardzcn98
        1
    edwardzcn98  
       322 天前
    这是教学向的帖子吗?我看思路就和 toy message queue 一样。
    关于     帮助文档     自助推广系统     博客     API     FAQ     Solana     5431 人在线   最高记录 6679       Select Language
    创意工作者们的社区
    World is powered by solitude
    VERSION: 3.9.8.5 31ms UTC 07:44 PVG 15:44 LAX 00:44 JFK 03:44
    Do have faith in what you're doing.
    ubao snddm index pchome yahoo rakuten mypaper meadowduck bidyahoo youbao zxmzxm asda bnvcg cvbfg dfscv mmhjk xxddc yybgb zznbn ccubao uaitu acv GXCV ET GDG YH FG BCVB FJFH CBRE CBC GDG ET54 WRWR RWER WREW WRWER RWER SDG EW SF DSFSF fbbs ubao fhd dfg ewr dg df ewwr ewwr et ruyut utut dfg fgd gdfgt etg dfgt dfgd ert4 gd fgg wr 235 wer3 we vsdf sdf gdf ert xcv sdf rwer hfd dfg cvb rwf afb dfh jgh bmn lgh rty gfds cxv xcv xcs vdas fdf fgd cv sdf tert sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf shasha9178 shasha9178 shasha9178 shasha9178 shasha9178 liflif2 liflif2 liflif2 liflif2 liflif2 liblib3 liblib3 liblib3 liblib3 liblib3 zhazha444 zhazha444 zhazha444 zhazha444 zhazha444 dende5 dende denden denden2 denden21 fenfen9 fenf619 fen619 fenfe9 fe619 sdf sdf sdf sdf sdf zhazh90 zhazh0 zhaa50 zha90 zh590 zho zhoz zhozh zhozho zhozho2 lislis lls95 lili95 lils5 liss9 sdf0ty987 sdft876 sdft9876 sdf09876 sd0t9876 sdf0ty98 sdf0976 sdf0ty986 sdf0ty96 sdf0t76 sdf0876 df0ty98 sf0t876 sd0ty76 sdy76 sdf76 sdf0t76 sdf0ty9 sdf0ty98 sdf0ty987 sdf0ty98 sdf6676 sdf876 sd876 sd876 sdf6 sdf6 sdf9876 sdf0t sdf06 sdf0ty9776 sdf0ty9776 sdf0ty76 sdf8876 sdf0t sd6 sdf06 s688876 sd688 sdf86