RxJava的RxRelay简介

2023/05/10

1. 简介

RxJava的普及促进创建了多个扩展其功能的第三方库。

其中许多库都是对开发人员在使用RxJava时遇到的典型问题的答案,RxRelay就是这些解决方案之一。

2. 处理Subject

简单地说,Subject充当Observable和Observer之间的桥梁。由于它是一个Observer,它可以订阅一个或多个Observables并从它们接收事件。

此外,鉴于它同时是一个Observable,它可以重新发送事件或向其订阅者发送新事件。有关Subject的更多信息,请参见本文

Subject的问题之一是在它收到onComplete()或onError()之后-它不再能够移动数据。有时这是期望的行为,但有时不是。

在不需要这种行为的情况下,我们应该考虑使用RxRelay。

3. Relay

一个Relay基本上是一个Subject,但是没有调用onComplete()和onError()的能力,因此它能够不断地发射数据

这使我们能够在不同类型的API之间创建桥梁,而不必担心意外触发终止状态。

要使用RxRelay,我们需要将以下依赖项添加到我们的项目中:

<dependency>
    <groupId>com.jakewharton.rxrelay2</groupId>
    <artifactId>rxrelay</artifactId>
    <version>1.2.0</version>
</dependency>

4. Relay的类型

库中提供了三种不同类型的Relay,我们将在这里快速探索所有这三个。

4.1 PublishRelay

一旦观察者订阅了这种类型的Relay,它就会重新发送所有事件。

事件将发送给所有订阅者:

public void whenObserverSubscribedToPublishRelay_itReceivesEmittedEvents() {
    PublishRelay<Integer> publishRelay = PublishRelay.create();
    TestObserver<Integer> firstObserver = TestObserver.create();
    TestObserver<Integer> secondObserver = TestObserver.create();
    
    publishRelay.subscribe(firstObserver);
    firstObserver.assertSubscribed();
    publishRelay.accept(5);
    publishRelay.accept(10);
    publishRelay.subscribe(secondObserver);
    secondObserver.assertSubscribed();
    publishRelay.accept(15);
    firstObserver.assertValues(5, 10, 15);
    
    // second receives only the last event
    secondObserver.assertValue(15);
}

在这种情况下没有事件缓冲,因此这种行为类似于冷Observable。

4.2 BehaviorRelay

一旦观察者订阅,这种类型的Relay将重新发送最近观察到的事件和所有后续事件:

public void whenObserverSubscribedToBehaviorRelay_itReceivesEmittedEvents() {
    BehaviorRelay<Integer> behaviorRelay = BehaviorRelay.create();
    TestObserver<Integer> firstObserver = TestObserver.create();
    TestObserver<Integer> secondObserver = TestObserver.create();
    behaviorRelay.accept(5);     
    behaviorRelay.subscribe(firstObserver);
    behaviorRelay.accept(10);
    behaviorRelay.subscribe(secondObserver);
    behaviorRelay.accept(15);
    firstObserver.assertValues(5, 10, 15);
    secondObserver.assertValues(10, 15);
}

当我们创建BehaviorRelay时,我们可以指定默认值,如果没有其他事件可发出,则默认值将被发出。

要指定默认值,我们可以使用createDefault()方法:

public void whenObserverSubscribedToBehaviorRelay_itReceivesDefaultValue() {
    BehaviorRelay<Integer> behaviorRelay = BehaviorRelay.createDefault(1);
    TestObserver<Integer> firstObserver = new TestObserver<>();
    behaviorRelay.subscribe(firstObserver);
    firstObserver.assertValue(1);
}

如果我们不想指定默认值,我们可以使用create()方法:

public void whenObserverSubscribedToBehaviorRelayWithoutDefaultValue_itIsEmpty() {
    BehaviorRelay<Integer> behaviorRelay = BehaviorRelay.create();
    TestObserver<Integer> firstObserver = new TestObserver<>();
    behaviorRelay.subscribe(firstObserver);
    firstObserver.assertEmpty();
}

4.3 ReplayRelay

这种类型的Relay缓冲它接收到的所有事件,然后将其重新发送给所有订阅它的订阅者:

public void whenObserverSubscribedToReplayRelay_itReceivesEmittedEvents() {
    ReplayRelay<Integer> replayRelay = ReplayRelay.create();
    TestObserver<Integer> firstObserver = TestObserver.create();
    TestObserver<Integer> secondObserver = TestObserver.create();
    replayRelay.subscribe(firstObserver);
    replayRelay.accept(5);
    replayRelay.accept(10);
    replayRelay.accept(15);
    replayRelay.subscribe(secondObserver);
    firstObserver.assertValues(5, 10, 15);
    secondObserver.assertValues(5, 10, 15);
}

所有元素都被缓冲并且所有订阅者都会收到相同的事件,因此这种行为类似于冷Observable

当我们创建ReplayRelay时,我们可以为事件提供最大缓冲区大小和生存时间。

要创建具有有限缓冲区大小的Relay,我们可以使用createWithSize()方法。当要缓冲的事件多于设置的缓冲区大小时,前面的元素将被丢弃:

public void whenObserverSubscribedToReplayRelayWithLimitedSize_itReceivesEmittedEvents() {
    ReplayRelay<Integer> replayRelay = ReplayRelay.createWithSize(2);
    TestObserver<Integer> firstObserver = TestObserver.create();
    replayRelay.accept(5);
    replayRelay.accept(10);
    replayRelay.accept(15);
    replayRelay.accept(20);
    replayRelay.subscribe(firstObserver);
    firstObserver.assertValues(15, 20);
}

我们还可以使用createWithTime()方法创建ReplayRelay,为缓冲事件留出最大时间:

public void whenObserverSubscribedToReplayRelayWithMaxAge_thenItReceivesEmittedEvents() {
    SingleScheduler scheduler = new SingleScheduler();
    ReplayRelay<Integer> replayRelay =
        ReplayRelay.createWithTime(2000, TimeUnit.MILLISECONDS, scheduler);
    long current =  scheduler.now(TimeUnit.MILLISECONDS);
    TestObserver<Integer> firstObserver = TestObserver.create();
    replayRelay.accept(5);
    replayRelay.accept(10);
    replayRelay.accept(15);
    replayRelay.accept(20);
    Thread.sleep(3000);
    replayRelay.subscribe(firstObserver);
    firstObserver.assertEmpty();
}

5. 自定义Relay

上面描述的所有类型都扩展了公共抽象类Relay,这使我们能够编写自己的自定义Relay类。

要创建自定义Relay,我们需要实现三个方法:accept()、hasObservers()和subscribeActual()。

让我们编写一个简单的Relay,它将向随机选择的订阅者之一重新发送事件:

public class RandomRelay extends Relay<Integer> {
    Random random = new Random();

    List<Observer<? super Integer>> observers = new ArrayList<>();

    @Override
    public void accept(Integer integer) {
        int observerIndex = random.nextInt() % observers.size();
        observers.get(observerIndex).onNext(integer);
    }

    @Override
    public boolean hasObservers() {
        return observers.isEmpty();
    }

    @Override
    protected void subscribeActual(Observer<? super Integer> observer) {
        observers.add(observer);
        observer.onSubscribe(Disposables.fromRunnable(
              () -> System.out.println("Disposed")));
    }
}

现在,我们可以测试只有一个订阅者会收到事件:

public void whenTwoObserversSubscribedToRandomRelay_thenOnlyOneReceivesEvent() {
    RandomRelay randomRelay = new RandomRelay();
    TestObserver<Integer> firstObserver = TestObserver.create();
    TestObserver<Integer> secondObserver = TestObserver.create();
    randomRelay.subscribe(firstObserver);
    randomRelay.subscribe(secondObserver);
    randomRelay.accept(5);
    if(firstObserver.values().isEmpty()) {
        secondObserver.assertValue(5);
    } else {
        firstObserver.assertValue(5);
        secondObserver.assertEmpty();
    }
}

6. 总结

在本教程中,我们了解了RxRelay,它是一种类似于Subject但无法触发终端状态的类型。

与往常一样,本教程的完整源代码可在GitHub上获得。

Show Disqus Comments

Post Directory

扫码关注公众号:Taketoday
发送 290992
即可立即永久解锁本站全部文章