求助: Java 多线程如何终止其它线程 - V2EX
V2EX = way to explore
V2EX 是一个关于分享和探索的地方
现在注册
已注册用户请  登录
Alextrasza
V2EX    Java

求助: Java 多线程如何终止其它线程

  •  
  •   Alextrasza 2024-09-25 10:29:10 +08:00 3902 次点击
    这是一个创建于 382 天前的主题,其中的信息可能已经有所发展或是发生改变。

    主线程去调用 10 个子线程查询任务(返回 true/false), 在回调中统计结果,
    我想在得到两个 true 时主线程立即返回 true, 不管其他的子线程时已提交到线程池还是未提交到线程池, 都不需要了

    我尝试了CompletableFuture#cancel, 和Executor#shutDownNow方法, 都没达到效果

    ThreadPoolExecutor executor = newFixedThreadFool(10); List<CompletableFuture<Boolean> list = ... //10 个任务, 都提交到 executor AtomicInteger i = 0; list.forEach(cf- cf.thenAccept(b-> if(b && i.incrementAndGet >= 2 ){ //让主线程停止阻塞立即返回, 尝试下面两个方案都不行 executor.shutDownNow(); list.forEach(f->f.cancel(true)); } )) // 希望通过某种方式让此处停止阻塞 CompletableFuture.allOf(list.toArray).join() return i.get()>=2; 

    目前的方案是把后续流程放在了子线程回调中, 加锁处理, 主线程不要返回值了, 应该是可行的,
    但是还是想请问大家, 原来的"主线程提前返回"的想法能否实现?

    20 条回复    2024-09-26 11:11:05 +08:00
    thetbw
        1
    thetbw  
       2024-09-25 10:46:40 +08:00
    在任何耗时的方法前,循环中间都判断一下 Thread.currentThread().isInterrupted(),如果为 true ,就 return ,或者抛出中断异常。就可以达到线程终止的目的了。
    如果要终止其他线程,就调用目标线程的 Thread.interrupt() 方法,线程池在退出时会自动给你调用这个方法,你只需要做好第一步判断中断就行了。想下 Thread.sleep()
    Hito
        2
    Hito  
       2024-09-25 10:48:14 +08:00
    CountDownLatch
    latch 设置 2 ,子线程中得到 true 时 latch.countDown()。主线程中 latch.await() 。
    git00ll
        3
    git00ll  
       2024-09-25 10:58:16 +08:00
    向子线程发送中断,一般 IO 这种的都会抛中断异常。
    如果没有堵塞代码你还需要增加 Thread.currentThread().isInterrupted()来判断当前中断标志位
    GloryJie
        4
    GloryJie  
       2024-09-25 11:00:40 +08:00
    做过类似的,是多个任务有一个满足就提前返回,可以参考下

    public static <T> T supplyBatchPredicateTask(List<Callable<T>> taskList, Predicate<T> predicate, long timeout, Executor executor) {
    CountDownLatch predicateCountDown = new CountDownLatch(1);
    AtomicReference<T> successReference = new AtomicReference<>();
    AtomicReference<T> failReference = new AtomicReference<>();

    List<CompletableFuture<T>> futureList = new ArrayList<>(taskList.size());

    for (Callable<T> task : taskList) {
    CompletableFuture<T> future = CompletableFuture.supplyAsync(() -> {
    try {
    T result = task.call();
    if (predicate.test(result)) {
    successReference.set(result);
    predicateCountDown.countDown();
    return result;
    }
    failReference.set(result);
    } catch (Exception exception) {
    log.warn("act=supplyBatchPredicateTask 任务执行失败", exception);
    }
    return null;
    }, executor);
    futureList.add(future);
    }

    //全失败(不仅仅是任务失败,还有是断言失败)的情下进行唤醒
    CompletableFuture.allOf(futureList.toArray(new CompletableFuture[] {})).whenComplete((result, throwable) -> {
    predicateCountDown.countDown();
    });

    try {
    predicateCountDown.await(timeout, TimeUnit.MILLISECONDS);
    } catch (Exception e) {
    // ignore
    log.warn("act=supplyBatchPredicateTask 等待任务执行结果中断", e);
    }
    return successReference.get() != null ? successReference.get() : failReference.get();
    }
    awalkingman
        5
    awalkingman  
      &nbp;2024-09-25 14:06:36 +08:00
    @Hito 这个看起来可以。正常都是 latch 个数等于线程数,子线程都结束了主线程继续往下走。如果 latch 少于开启的子线数,主线程应该可以提前往下走了。剩下的子线程如果能正常结束就正常结束,不能结束就当泄露了。
    Rickkkkkkk
        6
    Rickkkkkkk  
       2024-09-25 14:12:04 +08:00
    做不到的,你起一个线程,然后里面写个死循环,除了重启没有任何办法可以终止这个死循环。

    除非主动代码里去检查某个标记位
    MoYi123
        7
    MoYi123  
       2024-09-25 14:32:34 +08:00
    用 tgkill 给线程发信号, 当然这不是一个好的做法.
    Cruzz
        8
    Cruzz  
       2024-09-25 14:48:17 +08:00
    这不就是线程间通信么,首先得通信,代码里检查,保证线程安全,正确通信不就能做到了么
    HaibaraDP
        9
    HaibaraDP  
       2024-09-25 14:58:58 +08:00
    CountDownLatch(2),主线程和子线程的超时时间别忘了。不想阻塞主线程就把回调卸载子线程的查询结果里,AtomicInteger 累加返回 2 就执行
    diagnostics
        10
    diagnostics  
       2024-09-25 15:00:58 +08:00
    如果你是要实现 Hedged requests 的能力,那可以看一下 scala 的 firstcompletedof 。

    https://stackoverflow.com/questions/36420697/about-future-firstcompletedof-and-garbage-collect-mechanism

    我自己在 java 里写了一版实现,不过不是针对线程,而是针对 future:

    https://gist.github.com/Roiocam/fb4ae743ecaafb7385f75dbaac3030f8
    jeesk
        11
    jeesk  
       2024-09-25 15:29:17 +08:00
    参考 kotlin 协程的玩法, 如何取消协程. 现成的作业不抄 ?
    lbaci0529
        12
    lbaci0529  
       2024-09-25 15:59:34 +08:00
    这评论区妙呀
    HaibaraDP
        13
    HaibaraDP  
       2024-09-25 16:45:31 +08:00
    看错了,把方法返回值也做成 future
    Alextrasza
        14
    Alextrasza  
    OP
       2024-09-25 17:30:24 +08:00
    @Hito #2 这样的话, 如果 10 个子线程中不足两个 true, 即使 10 个都执行完了, 主线程也会一直阻塞下去吧
    siweipancc
        15
    siweipancc  
       2024-09-25 19:50:37 +08:00 via iPhone
    countdown 原子计数 标记位
    总共三个要素
    gaifanking
        16
    gaifanking  
       2024-09-25 22:07:15 +08:00
    封装好的 CountDownLatch 比较简单,或者用标准的等待通知模型:
    volatile int target;
    消费者:
    synchronized (lock){
    while(target<2){
    lock.wait();
    }
    }
    生产者:
    synchronized (lock){
    target++;
    lock.notifyAll();
    }
    shawnsh
        17
    shawnsh  
       2024-09-25 22:51:20 +08:00 via Android
    能具体说说你的需求吗
    sagaxu
        18
    sagaxu  
       2024-09-26 08:25:58 +08:00   1
    https://docs.oracle.com/en/java/javase/21/docs/api/java.base/java/util/concurrent/ExecutorService.html#shutdownNow()

    There are no guarantees beyond best-effort attempts to stop processing actively executing tasks. For example, typical implementations will cancel via Thread.interrupt(), so any task that fails to respond to interrupts may never terminate.

    Java 不能强制终止线程,杀死一个线程需要那个线程配合,检测 interrupt 标志,处理 InterruptedException 异常。


    //让主线程停止阻塞立即返回, 尝试下面两个方案都不行
    主线程中创建一个 CompletableFuture done ,此处完成它

    CompletableFuture.anyOf(done, CompletableFuture.allOf(list.toArray)).join()

    等待的条件,从 任务全部完成 变成 (任务全部完成 or i >= 2)
    lolico
        19
    lolico  
       2024-09-26 10:57:16 +08:00
    public static void main(String[] args) throws InterruptedException {
    List<Thread> threads = new ArrayList<>();
    final int untilSuccessCount = 2;
    final int threadCount = 10;
    AtomicInteger counter = new AtomicInteger(0);
    CountDownLatch latch = new CountDownLatch(1);
    for (int i = 0; i < threadCount; i++) {
    int finalI = i;
    Thread thread = new Thread(() -> {
    // 模拟耗时任务
    Random random = new Random();
    // 直接使用线程的 interrupt 中断标记
    while (!Thread.interrupted()) {
    try {
    Thread.sleep((long) finalI * 1000);
    } catch (InterruptedException e) {
    Thread.currentThread().interrupt();
    return;
    }
    if (random.nextBoolean()) {
    if (counter.incrementAndGet() == untilSuccessCount) {
    latch.countDown();
    }
    return;
    }
    }
    });
    thread.start();
    threads.add(thread);
    }
    // 可以启动一个线程,等待所有 thread 完成后 latch.countDown ,防止一直等待。
    // 或者加一个 allDoneLatch 也可以实现
    latch.await();
    threads.forEach(Thread::interrupt); // 中断其他线程
    System.out.println(counter.get());
    }
    lolico
        20
    lolico  
       2024-09-26 11:11:05 +08:00
    @lolico 这里用线程池也是可以的,主线程 latch.await()后调线程池 shutdownNow 也可以中断线程池内所有线程
    关于     帮助文档     自助推广系统     博客     API     FAQ     Solana     2687 人在线   最高记录 6679       Select Language
    创意工作者们的社区
    World is powered by solitude
    VERSION: 3.9.8.5 24ms UTC 07:23 PVG 15:23 LAX 00:23 JFK 03:23
    Do have faith in what you're doing.
    ubao snddm index pchome yahoo rakuten mypaper meadowduck bidyahoo youbao zxmzxm asda bnvcg cvbfg dfscv mmhjk xxddc yybgb zznbn ccubao uaitu acv GXCV ET GDG YH FG BCVB FJFH CBRE CBC GDG ET54 WRWR RWER WREW WRWER RWER SDG EW SF DSFSF fbbs ubao fhd dfg ewr dg df ewwr ewwr et ruyut utut dfg fgd gdfgt etg dfgt dfgd ert4 gd fgg wr 235 wer3 we vsdf sdf gdf ert xcv sdf rwer hfd dfg cvb rwf afb dfh jgh bmn lgh rty gfds cxv xcv xcs vdas fdf fgd cv sdf tert sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf shasha9178 shasha9178 shasha9178 shasha9178 shasha9178 liflif2 liflif2 liflif2 liflif2 liflif2 liblib3 liblib3 liblib3 liblib3 liblib3 zhazha444 zhazha444 zhazha444 zhazha444 zhazha444 dende5 dende denden denden2 denden21 fenfen9 fenf619 fen619 fenfe9 fe619 sdf sdf sdf sdf sdf zhazh90 zhazh0 zhaa50 zha90 zh590 zho zhoz zhozh zhozho zhozho2 lislis lls95 lili95 lils5 liss9 sdf0ty987 sdft876 sdft9876 sdf09876 sd0t9876 sdf0ty98 sdf0976 sdf0ty986 sdf0ty96 sdf0t76 sdf0876 df0ty98 sf0t876 sd0ty76 sdy76 sdf76 sdf0t76 sdf0ty9 sdf0ty98 sdf0ty987 sdf0ty98 sdf6676 sdf876 sd876 sd876 sdf6 sdf6 sdf9876 sdf0t sdf06 sdf0ty9776 sdf0ty9776 sdf0ty76 sdf8876 sdf0t sd6 sdf06 s688876 sd688 sdf86