RxJava中的调度器

2023/05/10

1. 概述

在本文中,我们将重点关注不同类型的调度器,我们将在编写基于RxJava Observable的subscribeOn和observeOn方法的多线程程序时使用这些调度器。

调度器提供了指定在何处以及何时执行与Observable链操作相关的任务的机会。

我们可以从类Schedulers中描述的工厂方法中获得一个Scheduler。

2. 默认线程行为

默认情况下,Rx是单线程的,这意味着Observable和我们可以应用于它的运算符链将在调用其subscribe()方法的同一线程上通知其观察者。

observeOn和subscribeOn方法将Scheduler作为参数,顾名思义,它是我们可以用来调度单个操作的工具。

我们将使用createWorker方法创建我们的Scheduler实现,该方法返回一个Scheduler.Worker。worker接收操作并在单个线程上按顺序执行它们。

在某种程度上,worker本身就是一个调度器,但为了避免混淆,我们不会将其称为调度器。

2.1 安排操作

我们可以通过创建一个新的worker并安排一些操作来在任何Scheduler上安排作业:

Scheduler scheduler = Schedulers.immediate();
Scheduler.Worker worker = scheduler.createWorker();
worker.schedule(() -> result += "action");
 
Assert.assertTrue(result.equals("action"));

然后,该操作在分配给worker的线程上排队。

2.2 取消操作

Scheduler.Worker扩展了Subscription,在worker上调用unsubscribe方法将导致队列被清空并取消所有挂起的任务。我们可以通过示例看到:

Scheduler scheduler = Schedulers.newThread();
Scheduler.Worker worker = scheduler.createWorker();
worker.schedule(() -> {
    result += "First_Action";
    worker.unsubscribe();
});
worker.schedule(() -> result += "Second_Action");
 
Assert.assertTrue(result.equals("First_Action"));

第二个任务永远不会执行,因为它之前的任务取消了整个操作。正在执行的动作将被中断。

3. Schedulers.newThread

每次通过subscribeOn()或observeOn()请求时,此调度器都会简单地启动一个新线程。

这几乎不是一个好的选择,不仅因为启动线程时涉及延迟,还因为该线程未被重用:

Observable.just("Hello")
    .observeOn(Schedulers.newThread())
    .doOnNext(s ->
        result2 += Thread.currentThread().getName()
    )
    .observeOn(Schedulers.newThread())
    .subscribe(s ->
        result1 += Thread.currentThread().getName()
    );
Thread.sleep(500);
Assert.assertTrue(result1.equals("RxNewThreadScheduler-1"));
Assert.assertTrue(result2.equals("RxNewThreadScheduler-2"));

当worker完成后,线程将简单地终止。只有当任务是粗粒度的时候才能使用这个调度器:它需要很多时间才能完成,但是任务数量很少,以至于线程根本不可能被重用。

Scheduler scheduler = Schedulers.newThread();
Scheduler.Worker worker = scheduler.createWorker();
worker.schedule(() -> {
    result += Thread.currentThread().getName() + "_Start";
    worker.schedule(() -> result += "_worker_");
    result += "_End";
});
Thread.sleep(3000);
Assert.assertTrue(result.equals("RxNewThreadScheduler-1_Start_End_worker_"));

当我们在NewThreadScheduler上调度worker时,我们看到worker被绑定到一个特定的线程。

4. Schedulers.immediate

Schedulers.immediate是一个特殊的调度器,它以阻塞方式而不是异步方式在客户端线程中调用任务,并在操作完成时返回:

Scheduler scheduler = Schedulers.immediate();
Scheduler.Worker worker = scheduler.createWorker();
worker.schedule(() -> {
    result += Thread.currentThread().getName() + "_Start";
    worker.schedule(() -> result += "_worker_");
    result += "_End";
});
Thread.sleep(500);
Assert.assertTrue(result.equals("main_Start_worker__End"));

事实上,通过immediate调度器订阅Observable通常与根本不订阅任何特定调度器具有相同的效果:

Observable.just("Hello")
    .subscribeOn(Schedulers.immediate())
    .subscribe(s ->
        result += Thread.currentThread().getName()
    );
Thread.sleep(500);
Assert.assertTrue(result.equals("main"));

5. Schedulers.trampoline

trampoline调度器与immediate非常相似,因为它也在同一个线程中调度任务,有效地阻塞。

但是,即将执行的任务会在所有先前计划的任务完成时执行:

Observable.just(2, 4, 6, 8)
    .subscribeOn(Schedulers.trampoline())
    .subscribe(i -> result += "" + i);
Observable.just(1, 3, 5, 7, 9)
    .subscribeOn(Schedulers.trampoline())
    .subscribe(i -> result += "" + i);
Thread.sleep(500);
Assert.assertTrue(result.equals("246813579"));

immediate立即调用给定任务,而trampoline等待当前任务完成。

trampoline的worker在调度第一个任务的线程上执行每个任务。对schedule的第一次调用是阻塞的,直到队列被清空:

Scheduler scheduler = Schedulers.trampoline();
Scheduler.Worker worker = scheduler.createWorker();
worker.schedule(() -> {
    result += Thread.currentThread().getName() + "Start";
    worker.schedule(() -> {
        result += "_middleStart";
        worker.schedule(() ->
            result += "_worker_"
        );
        result += "_middleEnd";
    });
    result += "_mainEnd";
});
Thread.sleep(500);
Assert.assertTrue(result.equals("mainStart_mainEnd_middleStart_middleEnd_worker_"));

6. Schedulers.from

调度器在内部比java.util.concurrent中的执行器(Executor)更复杂-因此需要一个单独的抽象。

但是因为它们在概念上非常相似,所以毫不奇怪有一个包装器可以使用from工厂方法将Executor转换为Scheduler:

private ThreadFactory threadFactory(String pattern) {
    return new ThreadFactoryBuilder()
        .setNameFormat(pattern)
        .build();
}

@Test
public void givenExecutors_whenSchedulerFrom_thenReturnElements() throws InterruptedException {
    ExecutorService poolA = newFixedThreadPool(10, threadFactory("Sched-A-%d"));
    Scheduler schedulerA = Schedulers.from(poolA);
    ExecutorService poolB = newFixedThreadPool(10, threadFactory("Sched-B-%d"));
    Scheduler schedulerB = Schedulers.from(poolB);

    Observable<String> observable = Observable.create(subscriber -> {
        subscriber.onNext("Alfa");
        subscriber.onNext("Beta");
        subscriber.onCompleted();
    });;

    observable
        .subscribeOn(schedulerA)
        .subscribeOn(schedulerB)
        .subscribe(
            x -> result += Thread.currentThread().getName() + x + "_",
            Throwable::printStackTrace,
            () -> result += "_Completed"
        );
    Thread.sleep(2000);
    Assert.assertTrue(result.equals("Sched-A-0Alfa_Sched-A-0Beta__Completed"));
}

schedulerB的使用时间很短,但它几乎不会在schedulerA上安排一个新的操作,它会完成所有工作。因此,多个subscribeOn方法不仅会被忽略,而且还会引入少量开销。

7. Schedulers.io

这个调度器类似于newThread,除了已经启动的线程被回收并且可以处理未来的请求。

此实现的工作方式类似于java.util.concurrent中的ThreadPoolExecutor,具有无限线程池。每次请求一个新的worker时,要么启动一个新线程(然后保持空闲一段时间),要么重用空闲线程:

Observable.just("io")
    .subscribeOn(Schedulers.io())
    .subscribe(i -> result += Thread.currentThread().getName());
 
Assert.assertTrue(result.equals("RxIoScheduler-2"));

我们需要小心处理任何类型的无限资源-在Web服务等缓慢或无响应的外部依赖项的情况下,io调度器可能会启动大量线程,导致我们自己的应用程序变得无响应。

在实践中,遵循Schedulers.io几乎总是更好的选择。

8. Schedulers.computation

默认情况下,Computation调度器将并行运行的线程数限制为availableProcessors()的值,如Runtime.getRuntime()实用程序类中所示。

因此,当任务完全受CPU限制时,我们应该使用computation调度器;也就是说,它们需要计算能力并且没有阻塞代码。

它在每个线程前面使用了一个无界队列,因此如果任务被调度,但是所有的核心都被占用,它就会被排队。但是,每个线程之前的队列将继续增长:

Observable.just("computation")
    .subscribeOn(Schedulers.computation())
    .subscribe(i -> result += Thread.currentThread().getName());
Assert.assertTrue(result.equals("RxComputationScheduler-1"));

如果出于某种原因,我们需要与默认值不同的线程数,我们始终可以使用rx.scheduler.max-computation-threads系统属性。

通过使用更少的线程,我们可以确保始终有一个或多个CPU核心空闲,即使在重负载下,computation线程池也不会使服务器饱和。根本不可能拥有比核心更多的计算线程。

9. Schedulers.test

此调度器仅用于测试目的,我们永远不会在生产代码中看到它。它的主要优点是能够提前时钟,模拟任意流逝的时间:

List<String> letters = Arrays.asList("A", "B", "C");
TestScheduler scheduler = Schedulers.test();
TestSubscriber<String> subscriber = new TestSubscriber<>();

Observable<Long> tick = Observable
    .interval(1, TimeUnit.SECONDS, scheduler);

Observable.from(letters)
    .zipWith(tick, (string, index) -> index + "-" + string)
    .subscribeOn(scheduler)
    .subscribe(subscriber);

subscriber.assertNoValues();
subscriber.assertNotCompleted();

scheduler.advanceTimeBy(1, TimeUnit.SECONDS);
subscriber.assertNoErrors();
subscriber.assertValueCount(1);
subscriber.assertValues("0-A");

scheduler.advanceTimeTo(3, TimeUnit.SECONDS);
subscriber.assertCompleted();
subscriber.assertNoErrors();
subscriber.assertValueCount(3);
assertThat(
    subscriber.getOnNextEvents(), 
    hasItems("0-A", "1-B", "2-C"));

10. 默认调度器

RxJava中的一些Observable运算符具有替代形式,允许我们设置运算符将用于其操作的调度器。其他人不在任何特定的Scheduler上运行或在特定的默认Scheduler上运行。

例如,delay运算符获取上游事件并在给定时间后将它们推送到下游。显然,它不能在那段时间保持原来的线程,所以它必须使用不同的Scheduler:

ExecutorService poolA = newFixedThreadPool(10, threadFactory("Sched1-"));
Scheduler schedulerA = Schedulers.from(poolA);
Observable.just('A', 'B')
    .delay(1, TimeUnit.SECONDS, schedulerA)
    .subscribe(i -> result+= Thread.currentThread().getName() + i + " ");

Thread.sleep(2000);
Assert.assertTrue(result.equals("Sched1-A Sched1-B "));

在不提供自定义schedulerA的情况下,delay以下的所有运算符都将使用computation调度器。

其他支持自定义调度器的重要运算符有buffer、interval、range、timer、skip、take、timeout等。如果我们不为此类运算符提供调度器,则会使用computation调度器,这在大多数情况下是安全的默认设置。

11. 总结

在真正的响应式应用程序中,所有长时间运行的操作都是异步的,因此需要很少的线程和调度器。

掌握调度器对于使用RxJava编写可扩展且安全的代码至关重要。subscribeOn和observeOn之间的区别在高负载下尤其重要,因为每个任务都必须在我们期望的时候精确执行。

最后但同样重要的是,我们必须确保下游使用的调度器能够跟上上游调度器产生的负载。有关更多信息,请参阅这篇关于背压的文章。

与往常一样,本教程的完整源代码可在GitHub上获得。

Show Disqus Comments

Post Directory

扫码关注公众号:Taketoday
发送 290992
即可立即永久解锁本站全部文章