[ Java ]CrudBoy 想请教一个多线程处理的问题 - V2EX
V2EX = way to explore
V2EX 是一个关于分享和探索的地方
现在注册
已注册用户请  登录
rykinia
V2EX    问与答

[ Java ]CrudBoy 想请教一个多线程处理的问题

  •  
  •   rykinia 2019-07-12 10:20:04 +08:00 1518 次点击
    这是一个创建于 2285 天前的主题,其中的信息可能已经有所发展或是发生改变。

    想用多线程把数据库的数据写入 elasticsearch

    如果发生异常,要立即终止整个循环,所以用了 Future

    代码如下

    private Result loadDataFromDbIntoEs(Long maxId) { ExecutorService pool = Executors.newWorkStealingPool(); LinkedTransferQueue<Future<Result>> futureQueue = new LinkedTransferQueue<>(); try { //遍历数据库的表 for (long i = 0; i <= maxId; i += getDbPageSize()) { //创建任务 Callable<Result> task = createTask(i); //任务入队 futureQueue.put(pool.submit(task)); //队列超过一定长度后,先执行掉一部分再继续 if (futureQueue.size() > QUEUE_SIZE * 8) { if ((i % 100000) == 0) { log.debug("{} - Iterating future list, {} of {}", esEntityClassName, i, maxId); } //执行一部分任务 checkFuture(futureQueue); } } pool.shutdown(); //处理队列中剩余的任务 while (!futureQueue.isEmpty()) { checkFuture(futureQueue); } log.info("{} - sync complete", esEntityClassName); } catch (SyncException | InterruptedException | ExecutionException e) { //throw... } return Result.ok(); } /** * 创建任务 */ private Callable<Result> createTask(Long currentId) { return () -> { List<D> dbList = dbRepository.findByIdBetween(currentId, currentId + getDbPageSize() - 1); if (dbList.isEmpty()) { //忽略没有数据的 id 区间 return Result.ok(); } //写入 es return bulkCreate(dbListToEsList(dbList)); }; } /** * 消费任务 */ private void checkFuture(LinkedTransferQueue<Future<Result>> futureQueue) throws ExecutionException, InterruptedException { for (int i = 0; i < QUEUE_SIZE; i++) { Future<Result> future = futureQueue.poll(); if (future != null) { Result result = future.get(); if (!Result.REQUEST_SUCCESS.equals(result.getStatus())) { throw new SyncException(result.getMessage()); } } } } 

    现在的问题是,服务器 16 核,cpu 占用率并不高,大多数时候只有 es 的进程占了 20% 不知道是哪里有问题导致效率太低?

    4 条回复    2019-07-16 14:33:52 +08:00
    gosansam
        1
    gosansam  
       2019-07-12 11:49:43 +08:00
    Result result = future.get();
    这个获取结果是阻塞的
    zhady009
        2
    zhady009  
       2019-07-12 13:30:03 +08:00
    guava 的 ListeningExecutorService submit 返回 ListenableFuture 应该可以解决
    softtwilight
        3
    softtwilight  
       2019-07-12 13:49:47 +08:00
    可以试试用 completableFuture,checkFuture 可以改 join,如果 io 耗时多,Executors.newWorkStealingPool() 的线程可能有点少,completableFuture 也可以自定义线程池
    rykinia
        4
    rykinia  
    OP
       2019-07-16 14:33:52 +08:00
    谢谢各位的解答。
    研究了几天,感觉主要问题还是 es 的阻塞比较久,不过把这里改成了主线程消费 queue 里面的,线程池里异步往 queue 添加,稍微好了点。
    关于     帮助文档     自助推广系统     博客     API     FAQ     Solana     922 人在线   最高记录 6679       Select Language
    创意工作者们的社区
    World is powered by solitude
    VERSION: 3.9.8.5 23ms UTC 21:37 PVG 05:37 LAX 14:37 JFK 17:37
    Do have faith in what you're doing.
    ubao snddm index pchome yahoo rakuten mypaper meadowduck bidyahoo youbao zxmzxm asda bnvcg cvbfg dfscv mmhjk xxddc yybgb zznbn ccubao uaitu acv GXCV ET GDG YH FG BCVB FJFH CBRE CBC GDG ET54 WRWR RWER WREW WRWER RWER SDG EW SF DSFSF fbbs ubao fhd dfg ewr dg df ewwr ewwr et ruyut utut dfg fgd gdfgt etg dfgt dfgd ert4 gd fgg wr 235 wer3 we vsdf sdf gdf ert xcv sdf rwer hfd dfg cvb rwf afb dfh jgh bmn lgh rty gfds cxv xcv xcs vdas fdf fgd cv sdf tert sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf shasha9178 shasha9178 shasha9178 shasha9178 shasha9178 liflif2 liflif2 liflif2 liflif2 liflif2 liblib3 liblib3 liblib3 liblib3 liblib3 zhazha444 zhazha444 zhazha444 zhazha444 zhazha444 dende5 dende denden denden2 denden21 fenfen9 fenf619 fen619 fenfe9 fe619 sdf sdf sdf sdf sdf zhazh90 zhazh0 zhaa50 zha90 zh590 zho zhoz zhozh zhozho zhozho2 lislis lls95 lili95 lils5 liss9 sdf0ty987 sdft876 sdft9876 sdf09876 sd0t9876 sdf0ty98 sdf0976 sdf0ty986 sdf0ty96 sdf0t76 sdf0876 df0ty98 sf0t876 sd0ty76 sdy76 sdf76 sdf0t76 sdf0ty9 sdf0ty98 sdf0ty987 sdf0ty98 sdf6676 sdf876 sd876 sd876 sdf6 sdf6 sdf9876 sdf0t sdf06 sdf0ty9776 sdf0ty9776 sdf0ty76 sdf8876 sdf0t sd6 sdf06 s688876 sd688 sdf86