RxJava Observables中的concat()与merge()运算符

2025/04/05

1. 概述

concat()和merge()是两个强大的运算符,用于在RxJava中组合多个Observable实例。

concat()按顺序从每个Observable发出元素,等待每个元素完成后再移动到下一个,而merge()在生成所有Observable实例时同时发出它们。

在本教程中,我们将探讨concat()和merge()显示相似和不同行为的场景

2. 同步源

当两个源都是同步的时,concat()和merge()运算符的行为完全相同,让我们模拟这种情况以更好地理解它。

2.1 场景设置

让我们首先使用Observable.just()工厂方法创建三个同步源

Observable<Integer> observable1 = Observable.just(1, 2, 3);
Observable<Integer> observable2 = Observable.just(4, 5, 6);
Observable<Integer> observable3 = Observable.just(7, 8, 9);

此外,让我们创建两个订阅者,即testSubscriberForConcat和testSubscriberForMerge:

TestSubscriber<Integer> testSubscriberForConcat = new TestSubscriber<>();
TestSubscriber<Integer> testSubscriberForMerge = new TestSubscriber<>();

2.2 concat()和merge()

首先,让我们应用concat()运算符并使用testSubscriberForConcat订阅生成的Observable:

Observable.concat(observable1, observable2, observable3)
    .subscribe(testSubscriberForConcat);

进一步,让我们验证一下发射是否按顺序进行,其中observable1中的元素出现在observable2之前,而observable2出现在observable3之前

testSubscriberForConcat.assertValues(1, 2, 3, 4, 5, 6, 7, 8, 9);

类似地,我们可以应用merge()运算符并使用testSubscriberForMerge订阅结果:

Observable.merge(observable1, observable2, observable3).subscribe(testSubscriberForMerge);

接下来,让我们验证通过merge发出的元素是否遵循与concat相同的顺序:

testSubscriberForMerge.assertValues(1, 2, 3, 4, 5, 6, 7, 8, 9);

最后,我们必须注意,同步Observable实例会立即发出所有元素,然后发出完成信号。此外,每个Observable都会在下一个Observable开始之前完成其发射。因此,两个运算符都会按顺序处理每个Observable,从而产生相同的输出。

因此,无论源是同步还是异步的,一般规则是,如果我们需要保持源的发射顺序,则应使用concat()。另一方面,如果我们想要合并从多个源发射的元素,则应使用merge()。

3. 可预测的异步源

在本节中,让我们模拟一个具有异步源的场景,其中发射顺序是可预测的。

3.1 场景设置

让我们创建两个异步源,即observable1和observable2:

Observable<Integer> observable1 = Observable.interval(100, TimeUnit.MILLISECONDS)
    .map(i -> i.intValue() + 1)
    .take(3);

Observable<Integer> observable2 = Observable.interval(30, TimeUnit.MILLISECONDS)
    .map(i -> i.intValue() + 4)
    .take(7);

我们必须注意到observable1的发射分别在100ms、200ms和300ms后到达。另一方面,observable2的发射以30ms的间隔到达。

现在,让我们创建testSubscriberForConcat和testSubscriberForMerge:

TestSubscriber<Integer> testSubscriberForConcat = new TestSubscriber<>();
TestSubscriber<Integer> testSubscriberForMerge = new TestSubscriber<>();

3.2 concat()与merge()

首先,让我们应用concat()运算符并使用testSubscribeForConcat调用subscribe():

Observable.concat(observable1, observable2)
    .subscribe(testSubscriberForConcat);

接下来,我们必须调用awaitTerminalEvent()方法来确保收到所有发射

testSubscriberForConcat.awaitTerminalEvent();

现在,我们可以验证结果包含来自observable1的所有元素以及来自observable2的所有元素:

testSubscriber.assertValues(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);

此外,让我们应用merge()运算符并使用testSubscriberForMerge调用subscribe():

Observable.merge(observable1, observable2)
  .subscribe(testSubscriberForMerge);

最后,让我们等待发射并检查发射的值:

testSubscriberForMerge.awaitTerminalEvent();
testSubscriberForMerge.assertValues(4, 5, 6, 1, 7, 8, 9, 2, 10, 3);

结果包含按照observer1和observer2的实际发射顺序交错在一起的所有元素

4. 具有竞争条件的异步源

在本节中,我们将模拟具有两个异步源的场景,其中组合发射的顺序相当难以预测。

4.1 场景设置

首先,让我们创建两个具有完全相同延迟的异步源

Observable<Integer> observable1 = Observable.interval(100, TimeUnit.MILLISECONDS)
    .map(i -> i.intValue() + 1)
    .take(3);

Observable<Integer> observable2 = Observable.interval(100, TimeUnit.MILLISECONDS)
    .map(i -> i.intValue() + 4)
    .take(3);

我们知道每个源的一次发射分别在100ms、200ms和300ms后到达。但是,由于竞争条件,我们无法预测确切的顺序

接下来,让我们创建两个测试订阅者:

TestSubscriber<Integer> testSubscriberForConcat = new TestSubscriber<>();
TestSubscriber<Integer> testSubscriberForMerge = new TestSubscriber<>();

4.2 concat()与merge()

首先,让我们应用concat()运算符,然后订阅testSubscribeForConcat:

Observable.concat(observable1, observable2)
    .subscribe(testSubscriberForConcat);
testSubscriberForConcat.awaitTerminalEvent();

现在,让我们验证concat()运算符的结果是否保持不变

testSubscriberForConcat.assertValues(1, 2, 3, 4, 5, 6);

此外,让我们使用testSubscriberForMerge进行merge()和订阅:

Observable.merge(observable1, observable2)
    .subscribe(testSubscriberForMerge);
testSubscriberForMerge.awaitTerminalEvent();

接下来,让我们将所有发射累积到一个列表中,并验证它是否包含所有值

List<Integer> actual = testSubscriberForMerge.getOnNextEvents();
List<Integer> expected = Arrays.asList(1, 2, 3, 4, 5, 6);
assertTrue(actual.containsAll(expected) && expected.containsAll(actual));

最后,让我们记录发射以观察其实际作用:

21:05:43.252 [main] INFO actual emissions: [4, 1, 2, 5, 3, 6]

我们可能针对不同的运行得到不同的顺序。

5. 总结

在本文中,我们了解了RxJava中的concat()和merge()运算符如何处理同步和异步数据源。此外,我们比较了涉及可预测和不可预测发射模式的场景,强调了这两个运算符之间的差异。

Show Disqus Comments

Post Directory

扫码关注公众号:Taketoday
发送 290992
即可立即永久解锁本站全部文章