求助 Java 大量任务分布式处理的问题 - V2EX
V2EX = way to explore
V2EX 是一个关于分享和探索的地方
现在注册
已注册用户请  登录
yesterdaysun
V2EX    Java

求助 Java 大量任务分布式处理的问题

  •  1
     
  •   yesterdaysun 2022-04-26 12:30:59 +08:00 5333 次点击
    这是一个创建于 1266 天前的主题,其中的信息可能已经有所发展或是发生改变。

    问题是这样, 现在系统中有大量去和第三方 API 交互的任务, 比如有 1000 个用户, 每个用户又有各自 1 万个小的记录去和第三方 API 慢慢交互, 或者没有那么多记录但是有一个很耗时的同步接口, 可能 10 分钟以上, 其实时间都是消耗在网络 IO 上, 大部分时间在等网络, 之前的方式就是一个线程池, 把所有大小任务塞进去, 但是这个线程池大小很难搞, 多了的, 有时会突然来一堆任务占住 CPU 和数据库, 少了的话, 一大堆任务又阻塞住.

    现在想搞成分布式好几台机器一起跑, 考察了一下方案, 有点迷惑:

    1. 一种是分布式任务队列, 看到一个 Celery 好像是这种, 但是这个 python 的, 我想要 Java 的, 结果没找到
    2. 一种是任务调度框架, quartz, xxljob 这种, 感觉我想要的更靠近这种, 但是又有点迷惑, 比如感觉我这种需求适合"分片广播"这种任务, 比如我把 1000 个用户的任务分片到 3 台机器, 但是然后每台机器上的任务为每个用户再单独为他名下的 1 万条记录自己做线程池请求? 或者我把任务拆到单个小记录的级别, 那岂不是得成千上万的 trigger, 然后任务调度又一般是一个主 job, 然后传参数这种, 那比如我要确保一个时间只有一个用户的任务在跑, 怎么做这个限制, 全要自己在任务中处理吗

    所以, 其实就是我想找一个比较现成的框架, 能处理超长的任务队列, 分布式, 并发的执行, 可以自动削峰填谷, 有一些任务自动处理, 比如重试, 故障转移等等, 又能够有一些保证一致性的机制, 比如按 job+某个参数确保不会重复执行, 还能程序方式发起调度, 而不是在某个管理后台手动编辑

    我想知道这样的东西存在吗, 还是必须自己实现, 求各位大佬赐教

    31 条回复    2022-04-28 15:37:50 +08:00
    aguesuka
        1
    aguesuka  
       2022-04-26 12:33:50 +08:00
    storm
    biubiuF
        2
    biubiuF  
       2022-04-26 12:40:35 +08:00
    你需要 kafka ,把你现在的 jobs 弄成消费者
    RedBeanIce
        3
    RedBeanIce  
       2022-04-26 12:59:38 +08:00
    可能是 xxjob ????我记得有分片处理,,,就是一堆小任务,大家去处理
    bthulu
        4
    bthulu  
       2022-04-26 13:06:09 +08:00
    既然时间都消耗在网络 IO 上, 上 windows 系统, 用 IOCP 去调接口, 单机就能搞定了, 用不着搞这么多骚操作
    jorneyr
        5
    jorneyr  
       2022-04-26 13:09:21 +08:00
    @biubiuF Kafka 的每个 partition 一个消费者组里同时只能有一个消费者进行消费,这种情况我觉得 RabbitMQ 可能更合适,不必明确的限制消费者个数,看情况随时动态增减消费者,每个消息可以使用阻塞的方式执行。
    DengDDDD
        6
    DengDDDD  
       2022-04-26 14:11:28 +08:00
    Akka
    lmshl
        7
    lmshl  
       2022-04-26 14:15:32 +08:00
    改异步纤程,你这才一千万个 IO 小任务,犯不着上分布式。Akka Stream (调度) + Akka HTTP (调 API ) 随便搞一搞单机就完事了
    ming159
        8
    ming159  
       2022-04-26 14:16:34 +08:00
    如你所说:“其实时间都是消耗在网络 IO 上” 线程是不解决 IO 问题的,你需要的是 异步 IO 处理机制。一个线程同时处理多个 IO ,而不是一个线程处理一个 IO 。
    ymmud
        9
    ymmud  
       2022-04-26 14:25:24 +08:00
    akka cluster sharding , 根据需求分片就行了
    lmshl
        10
    lmshl  
       2022-04-26 14:25:58 +08:00
    我写过一个
    所有 fiber 去数据库查任务状态,select * from tasks where state = 'todo',然后执行这一批任务,更新任务状态。
    最后并行 128 同时跑所有 fiber
    zmal
        11
    zmal  
       2022-04-26 14:57:11 +08:00
    需求场景是两个问题:
    1. 是否要把这部分逻辑从主系统解耦出来。
    2. 怎样加快这部分业务的处理速度,减少资源占用,包括但不限于可以任意扩容的分布式、异步 IO 等等。
    如果是我的话,个人对 Flink 比较熟悉,可能会选择解耦后用 Flink 来处理,Flink 解决了分布式、一致性容错等问题。
    akka 解决的是异步 io 并发量问题,楼上 akka 的方案应该也是可行的。看你对哪个工具比较熟悉了。
    git00ll
        12
    git00ll  
       2022-04-26 15:26:13 +08:00
    `但是有一个很耗时的同步接口, 可能 10 分钟以上, 其实时间都是消耗在网络 IO 上, 大部分时间在等网络`

    这句话不明白,啥接口要耗时 10 分钟? 等网络是什么意思。如果接口一次请求响应要 10 分钟,多开点线程如 200-300 个,网络堵塞的时候是不会大量占用 cpu 的。关键如果接口能否承受这么高并发数。
    5boy
        13
    5boy  
       2022-04-26 16:12:24 +08:00
    mark, 有没有不用大数据框架实现的方式?
    litchinn
        14
    litchinn  
       2022-04-26 16:24:53 +08:00
    /t/848357 ,隔壁刚提出的这个动态线程池不知道能不能实现这个需求。另外你说线程池大小不好调,换成分布式多个机器跑,那节点数量不是一样需要调整吗,k8s 弹性伸缩?
    misaka19000
        15
    misaka19000  
       2022-04-26 16:55:55 +08:00
    用协程或者异步 IO
    Saurichthys
        16
    Saurichthys  
       2022-04-26 16:58:49 +08:00
    不要用 xxl-job 的方案,基于数据库,性能不佳,莫名其妙问题很多
    yesterdaysun
        17
    yesterdaysun  
    OP
       2022-04-26 17:56:23 +08:00
    @git00ll 说的不清楚, 其实是一个长流程, 比如请求一个报告, 但是不会立即返回, 需要等第三方处理好, 才能拿到, 中间就每隔 1-2 分钟去轮询一次看看报告有没有好, 通常都要 10 分钟左右, 关键不是每种任务都是这样的, 如果单为它建一个线程池又感觉有点过了, 想搞个通用的解法

    上面的我都研究了一下, 我这个系统比较简单, 本身就是个单体, 并不是分布式的, 这次也只是想要把这个后台任务独立出去搞多机并行, 感觉我这个还不到动用 akka/协程之类的方案的地步, 应该还是简单点, 一个简单的调度系统加动态线程池就足够了, 美团开源的那个动态线程池看上去比较适合, 我先研究一下试试看
    polarbear007
        18
    polarbear007  
       2022-04-26 17:56:55 +08:00
    个人认为使用非阻塞 io 即可
    jekkro
        19
    jekkro  
       2022-04-26 18:17:50 +08:00
    用 redis 实现异步队列即可,一个进程专门负责插入任务到 Redis 队列中,另外几个负责从队列中获取信息并执行,完成后更新数据库里的状态。如果发生 Redis 所在的机器 down 机,则负责插入任务的那个进程重新把没有完成的再插入一遍(不过这个目前为止还没有发生过)。我有类似的业务,已经跑了 12 年了。
    另外因为 Redis 有各种复杂数据结构,可以满足延时队列,优先级队列,自动去重等功能。感觉性能优秀,代码简单。
    jekkro
        20
    jekkro  
       2022-04-26 18:21:16 +08:00
    不能用非阻塞 io 的原因一般是因为那些接口库不是自己实现的,没办法去改造那些接口底层库,虽然 http 的接口自己也可以实现,但是有些场景(比如各种开放平台的接口库)不可能把第三方提供的接口库重新写一边,而仅仅是为了解决阻塞 io 的问题。
    lmshl
        21
    lmshl  
       2022-04-26 18:28:37 +08:00
    @yesterdaysun 以上技术方案中,综合代码量和开发难度来看,从易到难依次应该是
    纤程 >> Akka Stream > nio-pool > xxjob/scheduler > 动态线程池屎上雕花 >> akka cluster sharding >> akka cluster without sharding

    纤程是真的简单,你这需求 20-50 行左右就完事了,不就是个
    flow = post(...) >> (sleep(1.minutes) *> check(xxx)).retryWhile(isCompleted) >> retrieve()
    然后 tasks.foreachPar(<你想开多大并行>)(flow)
    的事
    outoftimeerror
        22
    outoftimeerror  
       2022-04-26 18:33:29 +08:00
    我也写 java ,不过你这个需求让我选型的话,我会用 golang (goroutine+chan)+ redis
    XhstormR02
        23
    XhstormR02  
       2022-04-26 19:24:21 +08:00 via Android
    @lmshl java 的纤程 Quasar ,最近一次更新是 2018 年,都好多年没更新了 https://github.com/puniverse/quasar ,倒不如用 kotlin 的 coroutines
    https://github.com/Kotlin/kotlinx.coroutines/
    dddd1919
        24
    dddd1919  
       2022-04-26 21:27:40 +08:00
    显然是该上 MQ 了,把用户放到队列,由消费端去挨个处理用户任务,如果单个用户跑的话配一个消费任务就够了

    强业务需求建议 RabbitMQ/RocketMQ
    lmshl
        25
    lmshl  
       2022-04-26 21:41:39 +08:00
    @XhstormR02 反正我说的也不是 Java
    其实上面写的是 Scala 伪代码
    档燃,Kotlin 也不错,起码有 suspend/await 可以用,不像 IO Monad 要切换编程思维
    mind3x
        26
    mind3x  
       2022-04-27 02:40:39 +08:00 via Android
    建议看看 Uber 的开源框架 Cadence ,支持 Go 和 Java 。
    https://cadenceworkflow.io

    上手会有一点门槛。
    ymmud
        27
    ymmud  
       2022-04-27 12:02:07 +08:00
    @lmshl 看着有点像 zio
    lmshl
        28
    lmshl  
       2022-04-27 20:37:23 +08:00
    @ymmud 兄弟慧眼
    yhvictor
        29
    yhvictor  
       2022-04-27 23:36:07 +08:00 via iPhone
    协程应该满好弄的。
    nio 有点难写。

    但我觉得吧,楼主这工作分两部分,第一部分是网络 io 等待准备好,第二部分是处理数据。
    第一部分是 io 密集,第二部分是 cpu 密集。
    所以如果拆成两部分,第一步就可以线程开满,直到数据准备好,放入一个 queue 。
    第二步开线程约等于或小于 cpu 核心数,从 queue 中读准备好的数据源并处理。
    齐活。
    byte10
        30
    byte10  
       2022-04-28 09:35:05 +08:00
    @polarbear007 是的,一个 NIO 就解决啦,花里胡哨的,想一些错误的方案。

    首先 IO 密集型,线程开到 1000 个都不是问题,线程在 IO 的时候不占用 cpu 。当然可能同时响应时就会出现 cpu 拉满,所以 cpu 使用率就是锯齿形的,不好分析瓶颈。

    你这个核心问题就是 IO 时间不确定,无法确定最大线程数。你可以看我的那个教程,https://www.bilibili.com/video/BV1FS4y1o7QB , NIO 如何无视 IO 时间,解决线程池大小的问题。你千万不要搞分布式,分布式是单机 cpu 性能出现瓶颈才干的事情,你这个场景一个树莓派 4B 就能完成。一定要切记,这些很基本很基本的问题,不要把事情想复杂了,在这一点犯错的人太多了。

    至于多个异步转同步问题,countdownlatch 和 cyclicbarrier 都能很好解决。

    找到最核心的问题,解决核心问题,加油。
    wolfie
        31
    wolfie  
       2022-04-28 15:37:50 +08:00
    做过类似功能,自己实现了一个,参考了 xxl-job 的表结构。

    # some_table
    - id
    - status ( running 、succeed 、failed )

    # some_table_job (频繁扫描表)
    - some_table_id
    - is_running
    - next_run_time (索引字段)
    - last_run_time (索引字段)(几分钟一扫描,防止异常结束,长时间未完成)
    - version (版本号,乐观锁)

    # some_table_job_log
    - some_table_id
    - some_table_job_id
    - result ( succeed 、failed )

    1. 新增 some_table ,同时新增 some_table_job
    2. 定时任务扫描 some_table_job ,拉取任务数据
    3. 任务执行完
    - 成功:写入 some_table_job_log ,删除 some_table_job ,回写 some_table 状态。
    - 失败:写入 some_table_job_log ,计算 some_table_job 下一次执行时间。
    关于     帮助文档     自助推广系统     博客     API     FAQ     Solana     2577 人在线   最高记录 6679       Select Language
    创意工作者们的社区
    World is powered by solitude
    VERSION: 3.9.8.5 30ms UTC 15:30 PVG 23:30 LAX 08:30 JFK 11:30
    Do have faith in what you're doing.
    ubao snddm index pchome yahoo rakuten mypaper meadowduck bidyahoo youbao zxmzxm asda bnvcg cvbfg dfscv mmhjk xxddc yybgb zznbn ccubao uaitu acv GXCV ET GDG YH FG BCVB FJFH CBRE CBC GDG ET54 WRWR RWER WREW WRWER RWER SDG EW SF DSFSF fbbs ubao fhd dfg ewr dg df ewwr ewwr et ruyut utut dfg fgd gdfgt etg dfgt dfgd ert4 gd fgg wr 235 wer3 we vsdf sdf gdf ert xcv sdf rwer hfd dfg cvb rwf afb dfh jgh bmn lgh rty gfds cxv xcv xcs vdas fdf fgd cv sdf tert sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf shasha9178 shasha9178 shasha9178 shasha9178 shasha9178 liflif2 liflif2 liflif2 liflif2 liflif2 liblib3 liblib3 liblib3 liblib3 liblib3 zhazha444 zhazha444 zhazha444 zhazha444 zhazha444 dende5 dende denden denden2 denden21 fenfen9 fenf619 fen619 fenfe9 fe619 sdf sdf sdf sdf sdf zhazh90 zhazh0 zhaa50 zha90 zh590 zho zhoz zhozh zhozho zhozho2 lislis lls95 lili95 lils5 liss9 sdf0ty987 sdft876 sdft9876 sdf09876 sd0t9876 sdf0ty98 sdf0976 sdf0ty986 sdf0ty96 sdf0t76 sdf0876 df0ty98 sf0t876 sd0ty76 sdy76 sdf76 sdf0t76 sdf0ty9 sdf0ty98 sdf0ty987 sdf0ty98 sdf6676 sdf876 sd876 sd876 sdf6 sdf6 sdf9876 sdf0t sdf06 sdf0ty9776 sdf0ty9776 sdf0ty76 sdf8876 sdf0t sd6 sdf06 s688876 sd688 sdf86