想用多线程把数据库的数据写入 elasticsearch
如果发生异常,要立即终止整个循环,所以用了 Future
代码如下
private Result loadDataFromDbIntoEs(Long maxId) { ExecutorService pool = Executors.newWorkStealingPool(); LinkedTransferQueue<Future<Result>> futureQueue = new LinkedTransferQueue<>(); try { //遍历数据库的表 for (long i = 0; i <= maxId; i += getDbPageSize()) { //创建任务 Callable<Result> task = createTask(i); //任务入队 futureQueue.put(pool.submit(task)); //队列超过一定长度后,先执行掉一部分再继续 if (futureQueue.size() > QUEUE_SIZE * 8) { if ((i % 100000) == 0) { log.debug("{} - Iterating future list, {} of {}", esEntityClassName, i, maxId); } //执行一部分任务 checkFuture(futureQueue); } } pool.shutdown(); //处理队列中剩余的任务 while (!futureQueue.isEmpty()) { checkFuture(futureQueue); } log.info("{} - sync complete", esEntityClassName); } catch (SyncException | InterruptedException | ExecutionException e) { //throw... } return Result.ok(); } /** * 创建任务 */ private Callable<Result> createTask(Long currentId) { return () -> { List<D> dbList = dbRepository.findByIdBetween(currentId, currentId + getDbPageSize() - 1); if (dbList.isEmpty()) { //忽略没有数据的 id 区间 return Result.ok(); } //写入 es return bulkCreate(dbListToEsList(dbList)); }; } /** * 消费任务 */ private void checkFuture(LinkedTransferQueue<Future<Result>> futureQueue) throws ExecutionException, InterruptedException { for (int i = 0; i < QUEUE_SIZE; i++) { Future<Result> future = futureQueue.poll(); if (future != null) { Result result = future.get(); if (!Result.REQUEST_SUCCESS.equals(result.getStatus())) { throw new SyncException(result.getMessage()); } } } }
现在的问题是,服务器 16 核,cpu 占用率并不高,大多数时候只有 es 的进程占了 20% 不知道是哪里有问题导致效率太低?
![]() | 1 gosansam 2019-07-12 11:49:43 +08:00 Result result = future.get(); 这个获取结果是阻塞的 |
2 zhady009 2019-07-12 13:30:03 +08:00 guava 的 ListeningExecutorService submit 返回 ListenableFuture 应该可以解决 |
![]() | 3 softtwilight 2019-07-12 13:49:47 +08:00 可以试试用 completableFuture,checkFuture 可以改 join,如果 io 耗时多,Executors.newWorkStealingPool() 的线程可能有点少,completableFuture 也可以自定义线程池 |
4 rykinia OP 谢谢各位的解答。 研究了几天,感觉主要问题还是 es 的阻塞比较久,不过把这里改成了主线程消费 queue 里面的,线程池里异步往 queue 添加,稍微好了点。 |