本文介绍了主线程完成后,Reactor Schedulers 继续运行很长时间?如何处理?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我有一个关于如何在使用 Reactor 3 时清理调度程序工作线程的问题

Flux.range(1, 10000).publishOn(Schedulers.newElastic("Y")).doOnComplete(() -> {//应该怎么做才能确保清理工作线程logger.info("关闭所有调度器工作线程");}).subscribe(x -> logger.debug(x+ "**"));

我在执行上述代码时看到的是,一旦主线程完成运行,工作线程仍处于 WAITING 状态一段时间.

sun.misc.Unsafe.park(原生方法)java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:1081)java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:809)java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1067)java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1127)java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)java.lang.Thread.run(Thread.java:748)

有没有办法控制这个?即它们可以被处理 onComplete() 吗?我试过 Schedulers.shutdownNow() 但它没有帮助.>

另一方面,当我这样做时,我能够控制调度程序的处理.哪种方式是首选/提倡的方式?

reactor.core.scheduler.Scheduler s = Schedulers.newElastic("X");Flux.range(1, 10000).concatWith(Flux.empty()).publishOn(s).doOnComplete(() -> {s.dispose();logger.info("关闭所有调度器工作线程");}).subscribe(x -> logger.debug(x+ "**"));
解决方案

如果您使用 Schedulers.new[Elastic|...],那么您有责任跟踪结果 Scheduler 如果你想关闭它.Schedulers.shutdownNow() 只会在您不明确时关闭库使用的默认调度程序,例如 Schedulers.elastic()(注意没有 new 前缀).

在所有操作运行后进行清理的最佳方法是使用 doFinally.这将异步执行清理回调after onError|onComplete|cancel 事件.最好确保它是链中的最后一个操作符,尽管它试图在所有情况下真正最后执行.

唯一需要注意的是,它与之前的操作符在同一个线程中运行,换句话说,您试图关闭的线程...a s.dispose()<doFinally 回调中的/code> 会在执行器的任务队列处理完毕后关闭,因此在这种情况下,线程消失之前会有一点延迟.

这是一个转储线程信息、切换到自定义弹性线程并在 doFinally 中关闭它的示例(添加过滤器和物化以提供更短的日志,更好地了解事件的播放方式出):

@Testpublic void schedulerFinallyShutdown() 抛出 InterruptedException {ThreadMXBean threadMxBean = ManagementFactory.getThreadMXBean();Logger logger = Loggers.getLogger("foo");CountDownLatch 锁存器 = 新 CountDownLatch(1);reactor.core.scheduler.Scheduler s = Schedulers.newElastic("X");Flux.range(1, 10000).publishOn(s).concatWith(Flux.empty().doOnComplete(() -> {for (ThreadInfo ti : threadMxBean.dumpAllThreads(true, true)) {System.out.println("最后一个元素\t" + ti.getThreadName() + " " + ti.getThreadState());}})).doFinally(sig -> {s.dispose();logger.info("关闭所有调度器工作线程");闩锁.countDown();}).filter(x -> x % 1000 == 0).物化().subscribe(x -> logger.info(x+ "**"));闩锁.等待();for (ThreadInfo ti : threadMxBean.dumpAllThreads(true, true)) {System.out.println("清理后\t" + ti.getThreadName() + " " + ti.getThreadState());}}

打印出来:

11:24:36.608 [X-2] INFO foo - onNext(1000)**11:24:36.611 [X-2] 信息 foo - onNext(2000)**11:24:36.611 [X-2] 信息 foo - onNext(3000)**11:24:36.612 [X-2] 信息 foo - onNext(4000)**11:24:36.612 [X-2] 信息 foo - onNext(5000)**11:24:36.612 [X-2] 信息 foo - onNext(6000)**11:24:36.612 [X-2] 信息 foo - onNext(7000)**11:24:36.613 [X-2] 信息 foo - onNext(8000)**11:24:36.613 [X-2] 信息 foo - onNext(9000)**11:24:36.613 [X-2] 信息 foo - onNext(10000)**最后一个元素 X-2 RUNNABLE最后一个元素 elastic-evictor-1 TIMED_WAITING最后一个元素 Monitor Ctrl-Break RUNNABLE最后一个元素 Signal Dispatcher RUNNABLE最后一个元素终结器等待最后一个元素引用处理程序 WAITING最后一个元素主要等待11:24:36.626 [X-2] 信息 foo - onComplete()**11:24:36.627 [X-2] INFO foo - 关闭所有调度程序工作线程清理后 Monitor Ctrl-Break RUNNABLE清理后信号调度器RUNNABLE清理结束后等待清理后参考处理程序 WAITING清理后主要 RUNNABLE

I have a question on how to clean up the Scheduler worker threads while using Reactor 3

Flux.range(1, 10000)
.publishOn(Schedulers.newElastic("Y"))
.doOnComplete(() -> { 
    // WHAT should one do to ensure the worker threads are cleaned up
    logger.info("Shut down all Scheduler worker threads");
})
.subscribe(x -> logger.debug(x+ "**"));

What I see when I execute the above code is that once the main thread has finished running the worker thread(s) is/are still in WAITING state for some time.

sun.misc.Unsafe.park(Native Method)
java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:1081)
java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:809)
java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1067)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1127)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
java.lang.Thread.run(Thread.java:748)

Is there a way to control this ?i.e can they be disposed onComplete() ?I tried Schedulers.shutdownNow() and it doesn't help.

On the other hand when I do this I'm able to control the Scheduler disposal.Which is the preferred/advocated way ?

reactor.core.scheduler.Scheduler s = Schedulers.newElastic("X");
        Flux.range(1, 10000)
        .concatWith(Flux.empty())
        .publishOn(s)
        .doOnComplete(() -> {           
            s.dispose();
            logger.info("Shut down all Scheduler worker threads");
        })
        .subscribe(x -> logger.debug(x+ "**"));
解决方案

If you use Schedulers.new[Elastic|...], then it is your responsibility to keep track of the resulting Scheduler if you want to close it. Schedulers.shutdownNow() will only close the default schedulers used by the library when you're not explicit, like Schedulers.elastic() (notice no new prefix).

The best way to clean-up after all the operations have run is to use doFinally. This will asynchronously perform a cleanup callback after onError|onComplete|cancel events. Better ensure it is the last operator in the chain, although it tries to really execute last in all cases.

The only caveat is that it runs in the same thread as the previous operators, in other words the very thread you are trying to shutdown... a s.dispose() in the doFinally callback would shutdown the executor after its queue of tasks has been processed, so in this case there would be a slight delay before the thread disappears.

Here is a sample that dumps thread info, switches to the custom elastic thread and shuts it down in a doFinally (added filter and materialize to give a shorter log with a better view of how events play out):

@Test
public void schedulerFinallyShutdown() throws InterruptedException {
    ThreadMXBean threadMxBean = ManagementFactory.getThreadMXBean();
    Logger logger = Loggers.getLogger("foo");
    CountDownLatch latch = new CountDownLatch(1);
    reactor.core.scheduler.Scheduler s = Schedulers.newElastic("X");
    Flux.range(1, 10000)
        .publishOn(s)
        .concatWith(Flux.<Integer>empty().doOnComplete(() -> {
            for (ThreadInfo ti : threadMxBean.dumpAllThreads(true, true)) {
                System.out.println("last element\t" + ti.getThreadName() + " " + ti.getThreadState());
            }
        }))
        .doFinally(sig -> {
            s.dispose();
            logger.info("Shut down all Scheduler worker threads");
            latch.countDown();
        })
        .filter(x -> x % 1000 == 0)
        .materialize()
        .subscribe(x -> logger.info(x+ "**"));

    latch.await();
    for (ThreadInfo ti : threadMxBean.dumpAllThreads(true, true)) {
        System.out.println("after cleanup\t" + ti.getThreadName() + " " + ti.getThreadState());
    }
} 

This prints out:

11:24:36.608 [X-2] INFO  foo - onNext(1000)**
11:24:36.611 [X-2] INFO  foo - onNext(2000)**
11:24:36.611 [X-2] INFO  foo - onNext(3000)**
11:24:36.612 [X-2] INFO  foo - onNext(4000)**
11:24:36.612 [X-2] INFO  foo - onNext(5000)**
11:24:36.612 [X-2] INFO  foo - onNext(6000)**
11:24:36.612 [X-2] INFO  foo - onNext(7000)**
11:24:36.613 [X-2] INFO  foo - onNext(8000)**
11:24:36.613 [X-2] INFO  foo - onNext(9000)**
11:24:36.613 [X-2] INFO  foo - onNext(10000)**
last element    X-2 RUNNABLE
last element    elastic-evictor-1 TIMED_WAITING
last element    Monitor Ctrl-Break RUNNABLE
last element    Signal Dispatcher RUNNABLE
last element    Finalizer WAITING
last element    Reference Handler WAITING
last element    main WAITING
11:24:36.626 [X-2] INFO  foo - onComplete()**
11:24:36.627 [X-2] INFO  foo - Shut down all Scheduler worker threads
after cleanup   Monitor Ctrl-Break RUNNABLE
after cleanup   Signal Dispatcher RUNNABLE
after cleanup   Finalizer WAITING
after cleanup   Reference Handler WAITING
after cleanup   main RUNNABLE

这篇关于主线程完成后,Reactor Schedulers 继续运行很长时间?如何处理?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!

10-12 18:03