在Spring Integration中使用子流

2023/05/13

1. 概述

Spring Integration使得使用一些企业集成模式变得容易。其中一种方式是通过其DSL

在本教程中,我们将了解DSL对子流的支持,以简化我们的一些配置。

2. 我们的任务

假设我们有一个整数序列,我们想将其分成三个不同的存储桶。

如果我们想使用Spring Integration来做到这一点,我们可以从创建三个输出通道开始:

  • 像0、3、6和9这样的数字将转到multipleOfThreeChannel
  • 像1、4、7和10这样的数字将转到remainderIsOneChannel
  • 像2、5、8和11这样的数字会转到remainderIsTwoChannel

要了解子流有多大用处,让我们从不使用子流的情况开始。

然后,我们将使用子流来简化我们的配置:

  • publishSubscribeChannel
  • routeToRecipients
  • Filters,配置我们的if-then逻辑
  • Routers,配置我们的switch逻辑

3. 先决条件

在配置我们的子流之前,让我们创建这些输出通道。

我们将创建这些QueueChannels,因为它更容易演示:

@EnableIntegration
@IntegrationComponentScan
public class SubflowsConfiguration {

    @Bean
    QueueChannel multipleOfThreeChannel() {
        return new QueueChannel();
    }

    @Bean
    QueueChannel remainderIsOneChannel() {
        return new QueueChannel();
    }

    @Bean
    QueueChannel remainderIsTwoChannel() {
        return new QueueChannel();
    }

    boolean isMultipleOfThree(Integer number) {
        return number % 3 == 0;
    }

    boolean isRemainderIOne(Integer number) {
        return number % 3 == 1;
    }

    boolean isRemainderTwo(Integer number) {
        return number % 3 == 2;
    }
}

最终,这些是我们分组的数字将结束的地方。

另请注意,Spring Integration很容易开始看起来很复杂,因此为了可读性,我们将添加一些辅助方法。

4. 无子流求解

现在我们需要定义我们的流程。

如果没有子流,简单的想法是定义三个独立的集成流,每种类型的数字一个。

我们将向每个IntegrationFlow组件发送相同的消息序列,但每个组件的输出消息将不同

4.1 定义IntegrationFlow组件

首先,让我们在SubflowConfiguration类中定义每个IntegrationFlow bean:

@Bean
public IntegrationFlow multipleOfThreeFlow() {
    return flow -> flow.split()
        .<Integer> filter(this::isMultipleOfThree)
        .channel("multipleOfThreeChannel");
}

我们的流程包含两个端点-一个Splitter后跟一个Filter。

过滤器就像它听起来的那样。但是为什么我们还需要分离器呢?我们马上就会看到这一点,但基本上,它将输入Collection拆分为单独的消息。

而且,我们当然可以用相同的方式再定义两个IntegrationFlow bean。

4.2 消息网关

对于每个流,我们还需要一个消息网关。

简而言之,这些从调用者那里抽象出Spring Integration Messages API,类似于REST服务如何抽象出HTTP:

@MessagingGateway
public interface NumbersClassifier {

    @Gateway(requestChannel = "multipleOfThreeFlow.input")
    void multipleOfThree(Collection<Integer> numbers);

    @Gateway(requestChannel = "remainderIsOneFlow.input")
    void remainderIsOne(Collection<Integer> numbers);

    @Gateway(requestChannel = "remainderIsTwoFlow.input")
    void remainderIsTwo(Collection<Integer> numbers);
}

对于每个,我们需要使用@Gateway注解并指定输入通道的隐式名称,这只是bean的名称后跟“.input”。请注意,我们可以使用此约定,因为我们使用的是基于lambda的流程

这些方法是我们流程的入口点

4.3 发送消息和检查输出

现在,让我们测试一下:

@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(classes = { SeparateFlowsConfiguration.class })
public class SeparateFlowsUnitTest {

    @Autowired
    private QueueChannel multipleOfThreeChannel;

    @Autowired
    private NumbersClassifier numbersClassifier;

    @Test
    public void whenSendMessagesToMultipleOf3Flow_thenOutputMultiplesOf3() {
        numbersClassifier.multipleOfThree(Arrays.asList(1, 2, 3, 4, 5, 6));
        Message<?> outMessage = multipleOfThreeChannel.receive(0);
        assertEquals(outMessage.getPayload(), 3);
        outMessage = multipleOfThreeChannel.receive(0);
        assertEquals(outMessage.getPayload(), 6);
        outMessage = multipleOfThreeChannel.receive(0);
        assertNull(outMessage);
    }
}

请注意,我们已将消息作为List发送,这就是我们需要拆分器的原因,以获取单个“列表消息”并将其转换为多个“数字消息”。

我们使用o调用receive以获取下一条可用消息而无需等待。由于我们的列表中有两个三的倍数,我们希望能够调用它两次。第三次调用receive返回null。

当然,receive返回一个Message,所以我们调用getPayload来提取数字。

同样,我们可以对其他两个做同样的事情。

所以,这就是没有子流程的解决方案。我们要维护三个独立的流程和三个独立的网关方法。

我们现在要做的是用一个bean替换三个IntegrationFlow bean,用一个bean替换三个网关方法。

5. 使用publishSubscribeChannel

publishSubscribeChannel()方法将消息广播到所有订阅子流。这样,我们可以创建一个流,而不是三个。

@Bean
public IntegrationFlow classify() {
    return flow -> flow.split()
        .publishSubscribeChannel(subscription -> 
            subscription
                .subscribe(subflow -> subflow
                	.<Integer> filter(this::isMultipleOfThree)
                    .channel("multipleOfThreeChannel"))
                .subscribe(subflow -> subflow
                    .<Integer> filter(this::isRemainderOne)
                    .channel("remainderIsOneChannel"))
                .subscribe(subflow -> subflow
                    .<Integer> filter(this::isRemainderTwo)
                    .channel("remainderIsTwoChannel")));
}

通过这种方式,子流是匿名的,这意味着它们不能被独立寻址

现在,我们只有一个流程,所以让我们也编辑我们的NumbersClassifier:

@Gateway(requestChannel = "classify.input")
void classify(Collection<Integer> numbers);

现在,由于我们只有一个IntegrationFlow bean和一个网关方法,所以我们只需要发送一次我们的列表

@Test
public void whenSendMessagesToFlow_thenNumbersAreClassified() {
    numbersClassifier.classify(Arrays.asList(1, 2, 3, 4, 5, 6));

    // same assertions as before
}

请注意,从现在开始,只有集成流定义会发生变化,因此我们不会再次显示测试。

6. 使用routeToRecipients

实现相同目的的另一种方法是routeToRecipients,这很好,因为它内置了过滤功能

使用这种方法,我们可以指定广播的通道和子流

6.1 recipient

在下面的代码中,我们将根据条件指定multipleof3Channel、remainderIs1Channel和remainderIsTwoChannel作为接收者:

@Bean
public IntegrationFlow classify() {
    return flow -> flow.split()
        .routeToRecipients(route -> route
            .<Integer> recipient("multipleOfThreeChannel", this::isMultipleOfThree)       
            .<Integer> recipient("remainderIsOneChannel", this::isRemainderOne)
            .<Integer> recipient("remainderIsTwoChannel", this::isRemainderTwo));
}

我们也可以无条件调用recipient,routeToRecipients将无条件地发布到该目的地。

6.2 recipientFlow

请注意routeToRecipients允许我们定义一个完整的流程,就像publishSubscribeChannel一样。

让我们修改上面的代码并指定一个匿名子流作为第一个接收者

.routeToRecipients(route -> route
    .recipientFlow(subflow -> subflow
        .<Integer> filter(this::isMultipleOfThree)
        .channel("mutipleOfThreeChannel"))
    ...);

此子流将接收整个消息序列,因此我们需要像以前一样进行过滤以获得相同的行为。

同样,一个IntegrationFlow bean对我们来说就足够了

现在让我们继续讨论if-else组件。其中之一是Filter。

7. 使用if-then流

我们已经在前面的所有示例中使用了Filter。好消息是我们不仅可以指定进一步处理的条件,还可以指定丢弃消息的通道或流

我们可以将丢弃流和通道视为else块

@Bean
public IntegrationFlow classify() {
    return flow -> flow.split()
        .<Integer> filter(this::isMultipleOfThree, 
            notMultiple -> notMultiple
                .discardFlow(oneflow -> oneflow
                    .<Integer> filter(this::isRemainderOne,
                        twoflow -> twoflow
                    .discardChannel("remainderIsTwoChannel"))
                .channel("remainderIsOneChannel"))
            .channel("multipleofThreeChannel");
}

在这种情况下,我们已经实现了if-else路由逻辑:

  • 如果数字不是3的倍数,则将这些消息丢弃到丢弃流;我们在这里使用流,因为需要更多的逻辑来知道它的目标通道
  • 在丢弃流程中,如果数字不是余数1,则将这些消息丢弃到丢弃通道。

8. switch-ing计算值

最后,让我们试试route方法,它为我们提供了比routeToRecipients更多的控制。这很好,因为Router可以将流拆分成任意数量的部分,而Filter只能执行两部分。

8.1 channelMapping

让我们定义我们的IntegrationFlow bean:

@Bean
public IntegrationFlow classify() {
    return classify -> classify.split()
        .<Integer, Integer> route(number -> number % 3, mapping -> mapping
            .channelMapping(0, "multipleOfThreeChannel")
            .channelMapping(1, "remainderIsOneChannel")
            .channelMapping(2, "remainderIsTwoChannel"));
}

在上面的代码中,我们通过执行除法来计算路由键:

route(p -> p % 3, ...

基于此键,我们路由消息:

channelMapping(0, "multipleof3Channel")

8.2 subFlowMapping

我们可以通过指定子流来进行更多控制,将channelMapping替换为subFlowMapping:

.subFlowMapping(1, subflow -> subflow.channel("remainderIsOneChannel"))

或者通过调用handle方法而不是channel方法来进行更多控制:

.subFlowMapping(2, subflow -> subflow
    .<Integer> handle((payload, headers) -> {
        // do extra work on the payload
        return payload;
    }))).channel("remainderIsTwoChannel");

在这种情况下,子流将在route()方法之后返回到主流,因此我们需要指定通道remainderIsTwoChannel。

9. 总结

在本教程中,我们探讨了如何使用子流以某些方式过滤和路由消息。

与往常一样,本教程的完整源代码可在GitHub上获得。

Show Disqus Comments

Post Directory

扫码关注公众号:Taketoday
发送 290992
即可立即永久解锁本站全部文章