弱鸡求教一个关于 Java 多层 for 循环效率问题 - V2EX
V2EX = way to explore
V2EX 是一个关于分享和探索的地方
现在注册
已注册用户请  登录
请不要在回答技术问题时复制粘贴 AI 生成的内容
Gct012
V2EX    程序员

弱鸡求教一个关于 Java 多层 for 循环效率问题

  •  1
    &nsp;
  •   Gct012 2023-03-10 14:56:34 +08:00 3765 次点击
    这是一个创建于 948 天前的主题,其中的信息可能已经有所发展或是发生改变。

    我是一个菜鸡开发,目前遇到一个需求想请教下各位大佬。假设需要改造以下 For 循环

    for(int i = 0; i < list.size() ; i ++){ List resultList = HttpRequest.post(url).body(list.get(i)); for(int j = 0 ; j < resultList.size() ; j++){ var resultA = functionA(resultList.get(j)); var resultB = functionB(resultA); var resultC = functionC(resultB); } } 

    其中 list 数据来源 API 接口,数据量在 100 到 1000 不等。functionA 、B 、C 都有业务逻辑( Http 请求,数据库查询等,都是需要串行执行的)。目前单线程运行比较慢,想问下有什么比较好的办法可以提高处理效率?

    我打算使用多线程并行处理 list 的数据,但是里面那层 for 循环数据量也比较大(多的可能有 1 万条),里面那层不知道有没有办法也可以加快效率的?或者针对这类场景是否有比较通用的解决办法?

    31 条回复    2023-03-10 22:55:25 +08:00
    idealhs
        1
    idealhs  
       2023-03-10 15:03:21 +08:00
    没写过 java ,个人思路应该把 List resultList = HttpRequest.post(url).body(list.get(i)); 提出来,结果使用 yield return 。给下面的业务逻辑调用。然后你的 HttpRequest 应该可以换成 Async 的方法,在业务逻辑里面使用到 resultList[i]的时候, 去 await
    sbex
        2
    sbex  
       2023-03-10 15:04:49 +08:00
    这个主要还是得分析具体性能瓶颈在哪里,单从代码看目前只能想到线程池。
    RyanLeeCUP
        3
    RyanLeeCUP  
       2023-03-10 15:07:59 +08:00
    你可以先合并一下不可并行的操作,比如 List 可以并行处理的,如果量大就拆分成若干个批次去并行
    function A B C 不能并行,所以看成一个行为,对 resultList 也可以并行化
    把这个当成管道 pipe(listAction).pipe(resultAction).execute()
    大概这样
    LeegoYih
        4
    LeegoYih  
       2023-03-10 15:09:46 +08:00
    如果只需要保证 functionABC 的调用顺序可以用 Fork/Join
    resultList.parallelStream()
    .map(o -> functionA(o))
    .map(o -> functionB(o))
    .map(o -> functionC(o))
    .collect(Collectors.toList());
    Ericality
        5
    Ericality  
       2023-03-10 15:11:16 +08:00
    function ABC 是否可以直接用多线程来执行? 如果可以的话就可以节约很多资源
    或者直接用 parallelStream 来代替 for 循环处理 也可以直接并行处理
    但是具体要看对数据的操作方式吧 如果没有时序要求可以考虑
    还有我也是个菜鸡 如果说的不对望后面大佬指出
    awalkingman
        6
    strong>awalkingman  
       2023-03-10 15:11:41 +08:00
    内层也开线程,CountDownLatch 等待归集结果。
    jiajianjava
        7
    jiajianjava  
       2023-03-10 15:14:18 +08:00
    这个应该是生产消费模式, 多线程生产者请求数据,把数据提交个阻塞队列, 多线程消费者从队列获取数据 处理 ABC 任务
    Gct012
        8
    Gct012  
    OP
       2023-03-10 15:21:11 +08:00
    @Ericality 里面的 Function ABC 目前看下来只能串行操作。外层的 For 循环我打算用 parallelStream 来遍历,但是不太确定里面的那层循环是否还能开并行流...
    justNoBody
        9
    justNoBody  
       2023-03-10 15:23:36 +08:00
    把 4 楼和 7 楼的答案结合一下就好了,然后记得如果使用 parallelStream 务必要自定义线程池,使用默认的线程池会导致其他任务阻塞
    Gct012
        10
    Gct012  
    OP
       2023-03-10 15:26:04 +08:00
    @RyanLeeCUP 大佬你说的这个 pipe 是标准还是第三方库的额,我貌似没搜到类似的写法...
    awalkingman
        11
    awalkingman  
       2023-03-10 15:27:00 +08:00   1
    for(int i = 0; i < list.size() ; i ++){
    List resultList = HttpRequest.post(url).body(list.get(i));
    CountDownLatch latch = new CountDownLatch(resultList.size());
    for(int j = 0 ; j < resultList.size() ; j++){
    // 这里继续开线程
    {
    var resultA = functionA(resultList.get(j));
    var resultB = functionB(resultA);
    var resultC = functionC(resultB);
    latch.countDown();
    }
    latch.await();
    }
    }
    Gct012
        12
    Gct012  
    OP
       2023-03-10 15:28:22 +08:00
    @justNoBody 那这样的话是不是外层和里层的循环得拆成两个队列?
    dqzcwxb
        13
    dqzcwxb  
       2023-03-10 15:28:45 +08:00
    不要用 parallelStream 去做 io 操作,parallelStream 只推荐在 cpu 密集型任务时使用
    你这个用 completablefuture 是很合适的
    Gct012
        14
    Gct012  
    OP
       2023-03-10 15:29:32 +08:00
    @newskillsget 感谢大佬!我试试
    DreamStar
        15
    DreamStar  
       2023-03-10 15:31:04 +08:00
    先从业务上调整, 能整合的整合, 能合并的合并.
    其次同步转异步, 事件驱动用消息队列+本地事件表,根据具体的消费能力调整并发即可.
    你这个量用单进程多线程做稳定性太差,吞吐量太低,没啥可观测性.
    Martin9
        16
    Martin9  
       2023-03-10 15:31:24 +08:00
    以下回答来自 chatgpt, 供你参考。

    针对这个场景,使用多线程并行处理 list 数据可以提高处理效率。你可以使用 Java 的线程池来实现多线程处理。Java 提供的线程池可以在多个线程之间共享一组线程,可以重复利用线程,减少线程创建和销毁的开销,从而提高效率。

    在处理大量数据时,可以考虑使用分治思想,将数据分成若干份,分别交给不同的线程去处理,处理完成后再将结果合并。这样可以充分利用多核 CPU 的性能,提高并行处理的效率。

    对于里面那层 for 循环的处理,你可以使用并行流来提高处理效率。Java 8 引入了 Stream API ,可以方便地进行并行处理。你可以使用 stream() 方法将结果列表转换成流,然后使用 parallel() 方法将流转换为并行流,最后使用 forEach() 方法对流进行处理。

    List<List> resultLists = new ArrayList<>();
    IntStream.range(0, list.size())
    .parallel()
    .forEach(i -> {
    List resultList = HttpRequest.post(url).body(list.get(i));
    resultLists.add(resultList);
    });

    List results = resultLists.stream()
    .flatMap(List::stream)
    .parallel()
    .map(result -> {
    var resultA = functionA(result);
    var resultB = functionB(resultA);
    var resultC = functionC(resultB);
    return resultC;
    })
    .collect(Collectors.toList());
    这里使用了 Java 8 的 Stream API ,通过并行处理来提高处理效率。第一个 forEach() 方法将结果列表转换成流,并行地处理列表中的每个元素,将结果添加到结果列表中。第二个流中的 flatMap() 方法将多个结果列表合并成一个流,然后并行地对每个结果进行处理,最后将结果收集到一个列表中。
    Ericality
        17
    Ericality  
       2023-03-10 15:35:46 +08:00
    @Gct012 可以 但是好像你想要 ABC 顺序执行 那为什么不直接在外面开多线程呢
    即 ThreadUtils.excutor.excute(list ->
    functionA(list.get(i));
    functionB(resultA);
    functionC(resultC);
    )
    同时我注意到 resultList 是一个 http 请求 那是不是这个 list 请求一遍就可以了?
    List resultList = HttpRequest.post(url).body(list.get(i));
    //下面是原来的 for 循环的代替
    resultList.stream.paralla.map(currentList ->{
    ThreadUtiles.excutor.excute
    .....
    }

    )
    以上都是伪代码哈 只是提供一个思路
    awalkingman
        18
    awalkingman  
       2023-03-10 15:38:03 +08:00
    @jiajianjava 优雅一点做法确实如此。多线程开发调试比较麻烦的,可以把外层结果都提取到队列里,然后再开多个消费者消费,这样观测性高(看生产者有没有生产,看队列有没有消费和消费后的结果是否符合预期)和调试难度也比较低。
    leonshaw
        19
    leonshaw  
       2023-03-10 15:48:46 +08:00
    内层 10000 个开线程太多了,池满了一样阻塞。要改成异步或者试试虚线程。
    luckyrayyy
        20
    luckyrayyy  
       2023-03-10 15:51:52 +08:00
    list.parallelStrem()
    .flatMap(item -> {return HttpRequest.post(url).body(item).stream();})
    .parallel()
    .forEach(item -> {
    funcC(funcB(funcA(item)));
    });

    这样?
    Aluneth
        21
    Aluneth  
       2023-03-10 16:18:09 +08:00
    能不能 f 将 uncABC 视为一个函数,去掉外部循环,将需要跟中间件交流的地方都改成批量执行。最终还是要看这个循环耗时的点具体在什么地方。
    RyanLeeCUP
        22
    RyanLeeCUP  
       2023-03-10 16:28:59 +08:00
    @Gct012 不是库 类似伪代码,担心并发量就分批,然后把串行操作抽象成一个节点,节点任务并行化,最后会成管道式的流程,最后管道本身也可以被并行化 按这个思路去设计
    ldyisbest
        23
    ldyisbest  
       2023-03-10 16:30:31 +08:00
    List resultList = HttpRequest.post(url).body(list.get(i)); 提到最外层, 用批量了接口,思想是减少 http 请求次数,functionA,B,C 里面也这样做
    levintrueno
        24
    levintrueno  
       2023-03-10 17:51:44 +08:00
    public class Code {

    // 执行外层任务的线程池
    static ExecutorService outerExecutor = Executors.newFixedThreadPool(8);

    // 执行内层任务的线程池
    static ExecutorService innerExecutor = Executors.newFixedThreadPool(16);

    // 任务总数
    static AtomicInteger taskCount = new AtomicInteger();

    static String url = "url";

    static Random random = ThreadLocalRandom.current();

    public static void optimization() {

    StopWatch stopWatch = new StopWatch();
    stopWatch.start();

    // 模拟任务
    final int maxTask = random.nextInt(1000);

    System.out.println("外层总任务数:" + maxTask);

    List<String> list = IntStream.rangeClosed(1, maxTask).mapToObj(String::valueOf).collect(Collectors.toList());

    // 50 个任务一组
    final List<List<String>> partition = Lists.partition(list, 50);

    System.out.println("拆分任务数量:" + partition.size());

    partition.parallelStream()
    .map(task -> CompletableFuture.runAsync(new OuterTask(task), outerExecutor))
    .forEach(CompletableFuture::join);

    System.out.println("taskCount = " + taskCount);
    stopWatch.stop();
    System.out.println("耗时:" + stopWatch.getTotalTimeSeconds());

    innerExecutor.shutdown();
    outerExecutor.shutdown();

    }

    private static class OuterTask implements Runnable {

    private final List<String> tasks;

    public OuterTask(List<String> tasks) {
    this.tasks = tasks;
    }

    @Override
    public void run() {
    tasks.parallelStream()
    .map(task -> CompletableFuture.runAsync(new InnerTask(task), innerExecutor))
    .forEach(CompletableFuture::join);
    }
    }

    private static class InnerTask implements Runnable {

    private final String body;

    public InnerTask(String body) {
    this.body = body;
    }

    @Override
    public void run() {
    final List<String> respOnseResult= HttpRequest.post(url).body(body);

    for (String aParam : responseResult) {
    final String bParam = functionA(aParam);
    final String cParam = functionB(bParam);
    final String result = functionC(cParam);

    // handle result...

    taskCount.incrementAndGet();
    }
    }
    }
    }

    考虑不周,仅作参考。。。
    Dahunvwu
        25
    Dahunvwu  
       2023-03-10 17:58:03 +08:00
    disruptor 框架了解一下,或者使用 1.8 的 CompletionService
    disruptor.handleEventsWithWorkerPool(poolA)
    .thenHandleEventsWithWorkerPool(poolB)
    .thenHandleEventsWithWorkerPool(poolC)
    .thenHandleEventsWithWorkerPool(poolD)
    ymz
        26
    ymz  
       2023-03-10 18:04:42 +08:00
    线程池,以及 resultList 需要和数据库打交道的一起提出来,切分后一条 sql 处理若干个,CPU 计算很快的,主要还是 IO
    ration
        27
    ration  
       2023-03-10 18:30:02 +08:00 via Android
    看看瓶颈在哪里,http 请求还是数据库,添加日志看看时间多少。接着有些能合并请求的合并,多线程作为最后的手段。曾经试过下载文件的,实际上太多线程没用,网络限制在那里,提高带宽就好了。
    rb6221
        28
    rb6221  
       2023-03-10 18:37:44 +08:00
    你拿到 A 只是为了传入 B ?拿到 B 只是为了传入 C ?那可以把 funcABC 压缩合并简化一下
    night98
        29
    night98  
       2023-03-10 22:28:57 +08:00
    第二行可以看看接口提供方能否提供批量接口,这样速度会快一点,然后下面的 abc 方法只能通过增加线程数的方式提升效率,或者上异步。
    L0L
        30
    L0L  
       2023-03-10 22:47:02 +08:00 via Android
    @luckyrayyy 20 楼流操作大师,并发,还不会冲突。
    ychost
        31
    ychost  
       2023-03-10 22:55:25 +08:00
    Java19 的 Loom 开一开试一试,不行的话实时 reactor
    关于     帮助文档     自助推广系统     博客     API     FAQ     Solana     2707 人在线   最高记录 6679       Select Language
    创意工作者们的社区
    World is powered by solitude
    VERSION: 3.9.8.5 27ms UTC 15:15 PVG 23:15 LAX 08:15 JFK 11:15
    Do have faith in what you're doing.
    ubao snddm index pchome yahoo rakuten mypaper meadowduck bidyahoo youbao zxmzxm asda bnvcg cvbfg dfscv mmhjk xxddc yybgb zznbn ccubao uaitu acv GXCV ET GDG YH FG BCVB FJFH CBRE CBC GDG ET54 WRWR RWER WREW WRWER RWER SDG EW SF DSFSF fbbs ubao fhd dfg ewr dg df ewwr ewwr et ruyut utut dfg fgd gdfgt etg dfgt dfgd ert4 gd fgg wr 235 wer3 we vsdf sdf gdf ert xcv sdf rwer hfd dfg cvb rwf afb dfh jgh bmn lgh rty gfds cxv xcv xcs vdas fdf fgd cv sdf tert sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf shasha9178 shasha9178 shasha9178 shasha9178 shasha9178 liflif2 liflif2 liflif2 liflif2 liflif2 liblib3 liblib3 liblib3 liblib3 liblib3 zhazha444 zhazha444 zhazha444 zhazha444 zhazha444 dende5 dende denden denden2 denden21 fenfen9 fenf619 fen619 fenfe9 fe619 sdf sdf sdf sdf sdf zhazh90 zhazh0 zhaa50 zha90 zh590 zho zhoz zhozh zhozho zhozho2 lislis lls95 lili95 lils5 liss9 sdf0ty987 sdft876 sdft9876 sdf09876 sd0t9876 sdf0ty98 sdf0976 sdf0ty986 sdf0ty96 sdf0t76 sdf0876 df0ty98 sf0t876 sd0ty76 sdy76 sdf76 sdf0t76 sdf0ty9 sdf0ty98 sdf0ty987 sdf0ty98 sdf6676 sdf876 sd876 sd876 sdf6 sdf6 sdf9876 sdf0t sdf06 sdf0ty9776 sdf0ty9776 sdf0ty76 sdf8876 sdf0t sd6 sdf06 s688876 sd688 sdf86