分享一个 Java 开发的并发编程工具包 - V2EX
V2EX = way to explore
V2EX 是一个关于分享和探索的地方
现在注册
已注册用户请  登录
Joker123456789
V2EX    Java

分享一个 Java 开发的并发编程工具包

  •  
  •   Joker123456789 2024-10-02 15:31:14 +08:00 2341 次点击
    这是一个创建于 449 天前的主题,其中的信息可能已经有所发展或是发生改变。

    Magician-Concurrent

    Magician-Concurrent 是一个并发编程工具包,当你需要并发执行某些代码的时候,不需要自己创建和管理线程,除此之外里面还提供了生产者与消费者模型

    初始化配置

    导入依赖

    <dependency> <goupId>com.github.yuyenews</groupId> <artifactId>Magician-Concurrent</artifactId> <version>1.0.0</version> </dependency> 

    并发处理任务

    MagicianConcurrent.getConcurrentTaskSync() .setTimeout(1000) // 超时时间 .setTimeUnit(TimeUnit.MILLISECONDS) // 超时时间的单位 .add(() -> { // 添加一个任务 // 在这里可以写上任务的业务逻辑 }, (result, e) -> { // 此任务处理后的回调 if(result.equals(ConcurrentTaskResultEnum.FAIL)){ // 任务失败,此时 e 里面有详细的异常信息 } else if(result.equals(ConcurrentTaskResultEnum.SUCCESS)) { // 任务成功,此时 e 是空的 } }) .add(() -> { // 添加一个任务 // 在这里可以写上任务的业务逻辑 }, (result, e) -> { // 此任务处理后的回调 if(result.equals(ConcurrentTaskResultEnum.FAIL)){ // 任务失败,此时 e 里面有详细的异常信息 } else if(result.equals(ConcurrentTaskResultEnum.SUCCESS)) { // 任务成功,此时 e 是空的 } }) .start(); 

    添加进去的任务会并发执行,但是在它们执行完之前,这整个代码块会同步等待在这,一直等到所有任务执行完或者超时才会继续往下走。

    这里面的超时时间就是用来设置同步等待多久的。

    • 如果设置为 0 表示一直等到所有任务完成为止
    • 设置为大于 0 的时候,表示只等待这么久

    并发处理 List ,Set 等所有 Collection 类的集合里的元素

    同步执行

    // 假如有一个 List 需要并发处理里面的元素 List<String> dataList = new ArrayList<>(); 

    每个元素并发执行

    // 只需要将他传入 syncRunner 方法即可 MagicianConcurrent.getConcurrentCollectionSync() .syncRunner(dataList, data -> { // 这里可以拿到 List 里的元素,进行处理 // List 里的元素是什么类型,这个 data 就是什么类型 System.out.println(data); }, 10, // 每组多少条元素 1, // 每组之间同步等待多久 TimeUnit.MINUTES // 等待的时间单位 ); 

    这个方法会将传进去的集合分成若干组,每组的大小由参数指定。

    这些组会排队执行,但是每一组在执行的时候都是并发的,里面的每一个元素都会由单独的线程去处理。

    需要等一组处理完了,才会处理下一组,但是有时候我们不想这么死板的等待,所以可以设置一个超时时间,超过了这个期限就不等了,直接进行下一组,所以这里的最后两个参数就是用来设置这个期限的。

    每一组并发执行

    // 也可以用 syncGroupRunner 方法 MagicianConcurrent.getConcurrentCollectionSync() .syncGroupRunner(dataList, data -> { // 这里可以拿到每一组的元素,进行处理 // 这个 data 就是每一组 List ,可以自己迭代处理 System.out.println(data); }, 10, // 每组多少条元素 1, // 每组之间同步等待多久 TimeUnit.MINUTES // 等待的时间单位 ); 

    这个方法会将传进去的集合分成若干组,每组的大小由参数指定。

    每一组由单独的线程处理。

    会一直同步等待在这里,直到所有组都处理完了才会进行下一步,但是有时候我们不想这么死板的等待,所以可以设置一个超时时间,超过了这个期限就不等了,直接执行下一步。所以这里的最后两个参数就是用来设置这个期限的。

    异步执行

    其实就是将上面 [同步处理] 的代码放到了一个线程里,内部处理依然是上面 [同步处理] 的逻辑,但是这整个代码块将会异步执行,不需要等在这。所以个别相同的参数就不再重复解释了。

    // 假如有一个 List 需要并发处理里面的元素 List<String> dataList = new ArrayList<>(); 

    每个元素并发执行

    // 只需要将他传入 asyncRunner 方法即可 MagicianConcurrent.ConcurrentCollectionAsync( 1, // 核心线程数 1, // 最大线程数 1, // 线程空闲时间 TimeUnit.MINUTES // 空闲时间单位 .asyncRunner(dataList, data -> { // 这里可以拿到 List 里的元素,进行处理 System.out.println(data); }, 10, // 每组多少条元素 1, // 每组之间同步等待多久 TimeUnit.MINUTES // 等待的时间单位 ); 

    ConcurrentCollectionAsync 里的参数其实就是线程池的参数,除了上面这种写法,还可以这样写。

    每调用一次 asyncRunner 都会占用一个线程,而这些线程都是由一个线程池在管理。

    ConcurrentCollectionAsync cOncurrentCollectionAsync= MagicianConcurrent.ConcurrentCollectionAsync( 1, // 核心线程数 1, // 最大线程数 1, // 线程空闲时间 TimeUnit.MINUTES // 空闲时间单位 ); concurrentCollectionAsync.asyncRunner(dataList, data -> { // 这里可以拿到 List 里的元素,进行处理 System.out.println(data); }, 10, // 每组多少条元素 1, // 每组之间同步等待多久 TimeUnit.MINUTES // 等待的时间单位 ); concurrentCollectionAsync.asyncRunner(dataList2, data -> { // 这里可以拿到 List 里的元素,进行处理 System.out.println(data); }, 10, // 每组多少条元素 1, // 每组之间同步等待多久 TimeUnit.MINUTES // 等待的时间单位 ); concurrentCollectionAsync.asyncRunner(dataList3, data -> { // 这里可以拿到 List 里的元素,进行处理 System.out.println(data); }, 10, // 每组多少条元素 1, // 每组之间同步等待多久 TimeUnit.MINUTES // 等待的时间单位 ); 

    用这个方法可以管理线程池

    // 关闭线程池 concurrentCollectionAsync.shutdown(); // 立刻关闭线程池 concurrentCollectionAsync.shutdownNow(); // 获取线程池 ThreadPoolExecutor threadPoolExecutor = concurrentCollectionAsync.getPoolExecutor(); 

    每一组并发执行

    // 也可以用 asyncGroupRunner 方法,每个参数的具体含义可以参考文档 MagicianConcurrent.ConcurrentCollectionAsync( 1, // 核心线程数 1, // 最大线程数 1, // 线程空闲时间 TimeUnit.MINUTES // 空闲时间单位 .asyncGroupRunner(dataList, data -> { // 这里可以拿到 List 里的元素,进行处理 System.out.println(data); }, 10, // 每组多少条元素 1, // 每组之间同步等待多久 TimeUnit.MINUTES // 等待的时间单位 

    同上

    并发处理所有 Map 类的集合里的元素

    Map 的逻辑跟 Collection 一模一样,只不过是传入的集合变成了 Map ,就不再累述了,感谢理解。

    同步执行

    每个元素并发执行

    // 假如有一个 Map 需要并发处理里面的元素 Map<String, Object> dataMap = new HashMap<>(); // 只需要将他传入 syncRunner 方法即可 MagicianConcurrent.getConcurrentMapSync() .syncRunner(dataMap, (key, value) -> { // 这里可以拿到 Map 里的元素,进行处理 System.out.println(key); System.out.println(value); }, 10, 1, TimeUnit.MINUTES); 

    每一组并发执行

    // 也可以用 syncGroupRunner 方法 MagicianConcurrent.getConcurrentMapSync() .syncGroupRunner(dataMap, data -> { // 这里可以拿到每一组 Map 进行处理 System.out.println(data); }, 10, 1, TimeUnit.MINUTES); 

    异步执行

    每个元素并发执行

    // 假如有一个 Map 需要并发处理里面的元素 Map<String, Object> dataMap = new HashMap<>(); // 只需要将他传入 asyncRunner 方法即可 MagicianConcurrent.getConcurrentMapAsync( 1, 1, 1, TimeUnit.MINUTES ).asyncRunner(dataMap, (key, value) -> { // 这里可以拿到 Map 里的元素,进行处理 System.out.println(key); System.out.println(value); }, 10, 1, TimeUnit.MINUTES); 

    每一组并发执行

    // 也可以用 asyncGroupRunner 方法 MagicianConcurrent.getConcurrentMapAsync( 1, 1, 1, TimeUnit.MINUTES ).asyncGroupRunner(dataMap, data -> { // 这里可以拿到每一组 Map 进行处理 System.out.println(data); }, 10, 1, TimeUnit.MINUTES); 

    生产者与消费者

    这是一个多对多的模型,多个生产者可以给多个消费者推送不同类型的数据,

    // 创建一组生产者与消费者,而这样组可以创建无限个 // 每一组的生产者都只会把数据推送给同一组的消费者 MagicianConcurrent.getProducerAndConsumerManager() .addProducer(new MagicianProducer() { // 添加一个生产者(可以添加多个) /** * 设置 ID ,必须全局唯一,默认是当前类的全名 * 如果采用默认值,可以不重写这个方法 * @return */ @Override public String getId() { return super.getId(); } /** * 设置 producer 方法是否重复执行,默认重复 * 如果采用默认值,可以不重写这个方法 * @return */ @Override public boolean getLoop() { return super.getLoop(); } /** * 设置 是否等消费者全部空闲了才继续生产下一轮数据,默认 false * 如果采用默认值,可以不重写这个方法 * @return */ @Override public boolean getAllFree() { return super.getAllFree(); } /** * 当生产者启动后,会自动执行这个方法,我们可以在这个方法里生产数据,并通过 publish 方法发布给消费者 * * 这边举一个例子 * 假如我们需要不断地扫描某张表,根据里面的数据状态去执行一些业务逻辑 * 那么我们可以在这个方法里写一个查询的逻辑,然后将查询到数据发送给消费者 */ @Override public void producer() { // 根据上面的例子,我们可以查询这张表里符合条件的数据 List<Object> dataList = selectList(); // 然后将他推送给消费者 // 可以推送任意类型的数据 this.publish(dataList); /* * 如果你只需要执行一次,那么到此就结束了,这个生产者也可以被回收掉了 * * 但是如果你需要不断地执行上述操作,来维护这张表里的数据,这个时候你有两种做法 * 第一种:加一个 while 循环 * 但是这种方式有个问题,如果消费者的消费速度跟不上,那么就很容易造成消费者队列积压,出现内存问题。 * 而数据积压太久又会影响时效性,可能你推送给消费者的时候,这条数据需要处理,但是等到被消费的时候又不需要处理了,这样容易出现数据错乱的问题。 * * 第二种:等消费者把你推给他的数据消费完了,再推送下一轮,而我们就是采用的这种 * 如果你想用这种方式,那么你不需要再写其他的任何逻辑,只需要将上面提到的 getLoop 方法重写一下,并返回 true 即可 * 当你设置为 true 以后,生产者在推送完一轮后会不断地监视消费者,当发现了空闲的消费者才会推送下一轮数据,并且数据只会推送给这个空闲的消费者 * * 如果你想等所有消费者都空闲了以后再推送下一轮,而不是发现一个空闲的就推送一轮 * 那么你可以重写上面提到的 getAllFree 方法,返回 true 即可 */ } }) .addConsumer(new MagicianConsumer() { // 添加一个消费者,可以添加多个 /** * 设置 ID ,必须全局唯一,默认是当前类的全名 * 如果采用默认值,可以不重写这个方法 * @return */ @Override public String getId() { return super.getId(); } /** * 心跳通知,消费者每消费一个任务,都会触发一下这个方法 * 我们可以根据他触发的频率来判断这个消费者的活跃度 * * 注意!!! * 这个方法里不可以有耗时的操作,不然会将消费者阻塞的 * 如果一定要加耗时的操作,那么务必在新线程里搞 * @param id */ @Override public void pulse(String id) { new Thread(()->{ // 如果你需要在这个方法里搞一些耗时的操作,那么务必要像这样开启一个新线程 // 不然消费者会被阻塞的 }).start(); } /** * 消费频率限制,默认 10 毫秒,取值范围:0 - long 的最大值,单位:毫秒 * * 如果任务执行的耗时小于 execFrequencyLimit ,则等待 execFrequencyLimit 毫秒后再消费下一个任务 * * 首先这是一个生产者和消费者多对多的模型结构,我们以一个生产者对多个消费者来举例 * 生产者生产的数据只有一份,但是他会推送给多个消费者 * 而我们之所以要配置多个消费者,是因为需要他们执行不同的业务逻辑 * 多个消费者执行的业务逻辑不同,也就意味着他们需要的数据大概率会不同 * * 比如消费者 A 需要处理男性的数据,消费者 B 需要处理女性的数据 * 如果生产者刚好连续推送了几批男性的数据,那么这会导致消费者 B 筛选不到女性数据,那么他就不会处理业务逻辑了 * 这么一来,消费者 B 就会无限接近空转,而空转会引起 CPU 占用率过大,所以必须加以限制 * * 千万不要小看这个问题,本人曾经在实战中亲测过,做不做这个限制,CPU 的占有率会达到 10 倍的差距 * 当然了,这跟消费者的业务逻辑还是有一定关系的,具体情况具体看待 * 如果你的消费者几乎不会出现空转,那么这里可以设置为 0 * */ @Override public long getExecFrequencyLimit() { return super.getExecFrequencyLimit(); } /** * 这个方法会接收到生产者推送过来的数据 * 在里面执行相应的业务逻辑即可 * @param data */ @Override public void doRunner(Object data) { // data 可以是任何类型 // 因为能给他推送数据的消费者是固定的,所以 data 有可能收到的类型也是固定的 // 所以我们可以在这里自己判断,然后转化即可 // 为什么不用泛型?这是为了兼容多个生产者,因为他们推送的数据类型可能会不同 } }) .start(); } 

    项目官网:https://magician-io.com

    1 条回复    2024-10-03 10:40:49 +08:00
    xmtpw
        1
    xmtpw  
       2024-10-03 10:40:49 +08:00 via iPhone
    收藏备用,谢谢分享
    关于     帮助文档     自助推广系统     博客     API     FAQ     Solana     3225 人在线   最高记录 6679       Select Language
    创意工作者们的社区
    World is powered by solitude
    VERSION: 3.9.8.5 27ms UTC 11:11 PVG 19:11 LAX 03:11 JFK 06:11
    Do have faith in what you're doing.
    ubao msn snddm index pchome yahoo rakuten mypaper meadowduck bidyahoo youbao zxmzxm asda bnvcg cvbfg dfscv mmhjk xxddc yybgb zznbn ccubao uaitu acv GXCV ET GDG YH FG BCVB FJFH CBRE CBC GDG ET54 WRWR RWER WREW WRWER RWER SDG EW SF DSFSF fbbs ubao fhd dfg ewr dg df ewwr ewwr et ruyut utut dfg fgd gdfgt etg dfgt dfgd ert4 gd fgg wr 235 wer3 we vsdf sdf gdf ert xcv sdf rwer hfd dfg cvb rwf afb dfh jgh bmn lgh rty gfds cxv xcv xcs vdas fdf fgd cv sdf tert sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf shasha9178 shasha9178 shasha9178 shasha9178 shasha9178 liflif2 liflif2 liflif2 liflif2 liflif2 liblib3 liblib3 liblib3 liblib3 liblib3 zhazha444 zhazha444 zhazha444 zhazha444 zhazha444 dende5 dende denden denden2 denden21 fenfen9 fenf619 fen619 fenfe9 fe619 sdf sdf sdf sdf sdf zhazh90 zhazh0 zhaa50 zha90 zh590 zho zhoz zhozh zhozho zhozho2 lislis lls95 lili95 lils5 liss9 sdf0ty987 sdft876 sdft9876 sdf09876 sd0t9876 sdf0ty98 sdf0976 sdf0ty986 sdf0ty96 sdf0t76 sdf0876 df0ty98 sf0t876 sd0ty76 sdy76 sdf76 sdf0t76 sdf0ty9 sdf0ty98 sdf0ty987 sdf0ty98 sdf6676 sdf876 sd876 sd876 sdf6 sdf6 sdf9876 sdf0t sdf06 sdf0ty9776 sdf0ty9776 sdf0ty76 sdf8876 sdf0t sd6 sdf06 s688876 sd688 sdf86