如何访问Flux的第一个元素

2023/05/13

1. 概述

在本教程中,我们将探索使用Spring 5 WebFlux访问Flux的第一个元素的各种方法。

首先,我们将使用API的非阻塞方法,例如next()和take()。之后,我们将看到如何在elementAt()方法的帮助下实现相同的目标,我们需要在其中指定索引。

最后,我们将了解API的阻塞方法,我们将使用blockFirst()来访问flux的第一个元素。

2. 测试设置

对于本文中的代码示例,我们将使用Payment类,该类只有一个字段,即付款金额amount:

public class Payment {
    private final int amount;
    // constructor and getter
}

在测试中,我们将使用名为fluxOfThreePayments的测试工具方法法来构造一个Payment流:

private Flux<Payment> fluxOfThreePayments() {
    return Flux.just(paymentOf100, new Payment(200), new Payment(300));
}

之后,我们将使用Spring Reactor的StepVerifier来测试结果。

3. next()

首先,让我们尝试next()方法。此方法将返回flux的第一个元素,包装到响应式Mono类型中:

@Test
void givenAPaymentFlux_whenUsingNext_thenGetTheFirstPaymentAsMono() {
    Mono<Payment> firstPayment = fluxOfThreePayments().next();

    StepVerifier.create(firstPayment)
        .expectNext(paymentOf100)
        .verifyComplete();
}

另一方面,如果我们在一个空的flux上调用next(),结果将是一个空的Mono。因此,阻塞它将返回null:

@Test
void givenAEmptyFlux_whenUsingNext_thenGetAnEmptyMono() {
    Flux<Payment> emptyFlux = Flux.empty();

    Mono<Payment> firstPayment = emptyFlux.next();

    StepVerifier.create(firstPayment)
        .verifyComplete();
}

4. take()

响应式flux的take()方法等同于Java 8 Streams的limit(),换句话说,我们可以使用take(1)将flux限制为恰好一个元素,然后以阻塞或非阻塞的方式使用它:

@Test
void givenAPaymentFlux_whenUsingTake_thenGetTheFirstPaymentAsFlux() {
    Flux<Payment> firstPaymentFlux = fluxOfThreePayments().take(1);

    StepVerifier.create(firstPaymentFlux)
        .expectNext(paymentOf100)
        .verifyComplete();
}

类似地,对于空(empty)flux,take(1)将返回空flux:

@Test
void givenAEmptyFlux_whenUsingNext_thenGetAnEmptyFlux() {
    Flux<Payment> emptyFlux = Flux.empty();

    Flux<Payment> firstPaymentFlux = emptyFlux.take(1);

    StepVerifier.create(firstPaymentFlux)
        .verifyComplete();
}

5. elementAt()

Flux API还提供了elementAt()方法,我们可以使用elementAt(0)以非阻塞方式获取flux的第一个元素:

@Test
void givenAPaymentFlux_whenUsingElementAt_thenGetTheFirstPaymentAsMono() {
    Mono<Payment> firstPayment = fluxOfThreePayments().elementAt(0);

    StepVerifier.create(firstPayment)
        .expectNext(paymentOf100)
        .verifyComplete();
}

但是,如果作为参数传递的索引大于flux发出的元素数,则会发出错误:

@Test
void givenAEmptyFlux_whenUsingElementAt_thenGetAnEmptyMono() {
    Flux<Payment> emptyFlux = Flux.empty();

    Mono<Payment> firstPayment = emptyFlux.elementAt(0);

    StepVerifier.create(firstPayment)
        .expectError(IndexOutOfBoundsException.class);
}

6. blockFirst()

或者,我们也可以使用blockFirst()。顾名思义,这是一种阻塞方法。因此,如果我们使用blockFirst(),我们将离开响应式世界,并将失去它的所有好处:

@Test
void givenAPaymentFlux_whenUsingBlockFirst_thenGetTheFirstPayment() {
    Payment firstPayment = fluxOfThreePayments().blockFirst();

    assertThat(firstPayment).isEqualTo(paymentOf100);
}

7. toStream()

最后,我们可以将flux转换为Java 8流,然后访问第一个元素:

@Test
void givenAPaymentFlux_whenUsingToStream_thenGetTheFirstPaymentAsOptional() {
    Optional<Payment> firstPayment = fluxOfThreePayments().toStream()
        .findFirst();

    assertThat(firstPayment).contains(paymentOf100);
}

但是,再一次,如果我们这样做,我们将无法继续使用响应式管道

8. 总结

在本文中,我们讨论了Java的响应流API。我们已经看到了访问Flux的第一个元素的各种方法,并且我们了解到如果我们想使用响应式管道,我们应该坚持使用非阻塞解决方案。

与往常一样,本教程的完整源代码可在GitHub上获得。

Show Disqus Comments

Post Directory

扫码关注公众号:Taketoday
发送 290992
即可立即永久解锁本站全部文章