开始使用Spring Cloud Data Flow进行流处理

2023/05/13

1. 简介

Spring Cloud Data Flow是一种用于可组合数据微服务的云原生编程和操作模型。

借助Spring Cloud Data Flow,开发人员可以为常见用例(例如数据摄取、实时分析和数据导入/导出)创建和编排数据管道。

这种数据管道有两种形式,流式处理管道和批处理数据管道。

在第一种情况下,无限量的数据通过消息传递中间件被消费或产生。而在第二种情况下,短期任务处理一组有限的数据,然后终止。

本文将重点介绍流式处理。

2. 架构概述

这些类型的架构的关键组件是应用程序、数据流服务器和目标运行时。

此外,除了这些关键组件之外,我们通常在体系结构中还有一个Data Flow Shell和一个消息代理。

让我们更详细地了解所有这些组件。

2.1 应用程序

通常,流数据管道包括消费来自外部系统的事件、数据处理和多语言持久性。这些阶段在Spring Cloud术语中通常称为Source、Processor和Sink:

  • Source:是消费事件的应用程序
  • Processor:消费来自Source的数据,对其进行一些处理,并将处理后的数据发送到管道中的下一个应用程序
  • Sink:从Source或Processor消费并将数据写入所需的持久层

这些应用程序可以通过两种方式打包:

  • 托管在Maven仓库、文件、http或任何其他Spring资源实现中的Spring Boot uber-jar(本文将使用此方法)
  • Docker

Spring Cloud Data Flow团队已经提供了许多用于常见用例(例如jdbc、hdfs、http、router)的源、处理器和接收器应用程序,并可供使用。

2.2 运行时

此外,这些应用程序的执行需要运行时。支持的运行时包括:

  • Cloud Foundry
  • Apache YARN
  • Kubernetes
  • Apache Mesos
  • 用于开发Local Server(将在本文中使用)

2.3 Data Flow Server

负责将应用程序部署到运行时的组件是数据流服务器。为每个目标运行时提供了一个数据流服务器可执行jar。

数据流服务器负责解释:

  • 描述通过多个应用程序的数据逻辑流的流DSL
  • 描述应用程序到运行时的映射的部署清单

2.4 Data Flow Shell

Data Flow Shell是Data Flow Server的客户端。shell允许我们执行与服务器交互所需的DSL命令。

例如,描述从http源到jdbc接收器的数据流的DSL将写为“http jdbc”。DSL中的这些名称在数据流服务器中注册,并映射到可以托管在Maven或Docker仓库中的应用程序工件。

Spring还提供了一个名为Flo的图形界面,用于创建和监控流数据管道。但是,它的使用不在本文的讨论范围之内。

2.5 消息代理

正如我们在上一节的示例中看到的,我们在数据流的定义中使用了管道符号。管道符号表示两个应用程序之间通过消息传递中间件进行的通信。

这意味着我们需要一个在目标环境中启动并运行的消息代理。

支持的两个消息传递中间件代理是:

  • Apache Kafka
  • RabbitMQ

因此,现在我们已经对架构组件有了一个概述-是时候构建我们的第一个流处理管道了

3. 安装消息代理

正如我们所看到的,管道中的应用程序需要一个消息传递中间件来进行通信。出于本文的目的,我们将使用RabbitMQ。

有关安装的完整详细信息,你可以按照官方网站上的说明进行操作。

4. 本地数据流服务器

为了加快生成应用程序的过程,我们将使用Spring Initializr;在它的帮助下,我们可以在几分钟内获得我们的Spring Boot应用程序。

到达网站后,只需选择一个Group和一个Artifact名称。

完成此操作后,单击“Generate Project”按钮开始下载Maven工件。

下载完成后,解压项目并将其作为Maven项目导入到你选择的IDE中。

让我们向项目添加一个Maven依赖项。由于我们需要数据流本地服务器库,因此让我们添加spring-cloud-starter-dataflow-server-local依赖项:

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-dataflow-server-local</artifactId>
</dependency>

现在我们需要用@EnableDataFlowServer注解来标注Spring Boot主类:

@EnableDataFlowServer
@SpringBootApplication
public class SpringDataFlowServerApplication {

    public static void main(String[] args) {
        SpringApplication.run(SpringDataFlowServerApplication.class, args);
    }
}

就这样。我们的本地数据流服务器已准备好执行:

mvn spring-boot:run

该应用程序将在端口9393上启动。

5. Data Flow Shell

再次,转到Spring Initializr并选择一个Group和Artifact名称。

下载并导入项目后,让我们添加一个spring-cloud-dataflow-shell依赖项:

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-dataflow-shell</artifactId>
</dependency>

现在我们需要在Spring Boot主类中添加@EnableDataFlowShell注解:

@EnableDataFlowShell
@SpringBootApplication
public class SpringDataFlowShellApplication {

    public static void main(String[] args) {
        SpringApplication.run(SpringDataFlowShellApplication.class, args);
    }
}

我们现在可以运行shell:

mvn spring-boot:run

shell运行后,我们可以在提示符中键入help命令以查看我们可以执行的命令的完整列表。

6. 源应用程序

同样,在Spring Initializr上,我们现在将创建一个简单的应用程序并添加一个名为spring-cloud-starter-stream-rabbit的Stream Rabbit依赖项:

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>

然后我们将@EnableBinding(Source.class)注解添加到Spring Boot主类:

@EnableBinding(Source.class)
@SpringBootApplication
public class SpringDataFlowTimeSourceApplication {

    public static void main(String[] args) {
        SpringApplication.run(SpringDataFlowTimeSourceApplication.class, args);
    }
}

现在我们需要定义必须处理的数据源。此源可能是任何潜在的无穷无尽的工作负载(物联网传感器数据、24/7事件处理、在线交易数据摄取)。

在我们的示例应用程序中,我们使用Poller每10秒生成一个事件(为简单起见,使用new的时间戳)

@InboundChannelAdapter注解将消息发送到源的输出通道,使用返回值作为消息的有效负载:

@Bean
@InboundChannelAdapter(
    value = Source.OUTPUT, 
    poller = @Poller(fixedDelay = "10000", maxMessagesPerPoll = "1")
)
public MessageSource<Long> timeMessageSource() {
    return () -> MessageBuilder.withPayload(new Date().getTime()).build();
}

我们的数据源已准备就绪。

7. 处理器应用程序

接下来,我们将创建一个应用程序并添加Stream Rabbit依赖项。

然后我们将@EnableBinding(Processor.class)注解添加到Spring Boot主类:

@EnableBinding(Processor.class)
@SpringBootApplication
public class SpringDataFlowTimeProcessorApplication {

    public static void main(String[] args) {
        SpringApplication.run(SpringDataFlowTimeProcessorApplication.class, args);
    }
}

接下来,我们需要定义一个方法来处理来自源应用程序的数据。

要定义一个转换器,我们需要用@Transformer注解来标注这个方法:

@Transformer(inputChannel = Processor.INPUT, 
    outputChannel = Processor.OUTPUT)
public Object transform(Long timestamp) {

    DateFormat dateFormat = new SimpleDateFormat("yyyy/MM/dd hh:mm:yy");
    String date = dateFormat.format(timestamp);
    return date;
}

它将时间戳从“输入”通道转换为格式化日期,该日期将发送到“输出”通道。

8. 接收器应用程序

最后一个要创建的应用程序是接收器应用程序

再次,转到Spring Initializr并选择一个Group,一个Artifact名称。下载项目后,让我们添加Stream Rabbit依赖项。

然后在Spring Boot主类中添加@EnableBinding(Sink.class)注解:

@EnableBinding(Sink.class)
@SpringBootApplication
public class SpringDataFlowLoggingSinkApplication {

    public static void main(String[] args) {
        SpringApplication.run(SpringDataFlowLoggingSinkApplication.class, args);
    }
}

现在我们需要一种方法来拦截来自处理器应用程序的消息。

为此,我们需要将@StreamListener(Sink.INPUT)注解添加到我们的方法中:

@StreamListener(Sink.INPUT)
public void loggerSink(String date) {
    logger.info("Received: " + date);
}

该方法只是将以格式化日期形式转换的时间戳打印到日志文件中。

9. 注册一个Stream App

Spring Cloud Data Flow Shell允许我们使用app register命令向App Registry注册一个Stream App。

我们必须提供一个唯一的名称、应用程序类型和一个可以解析为应用工件的URI。对于类型,指定“source”、“processor ”或“sink”。

使用Maven方案提供URI时,格式应符合以下内容:

maven://<groupId>:<artifactId>[:<extension>[:<classifier>]]:<version>

要注册之前创建的Source、Processor和Sink应用程序,请转到Spring Cloud Data Flow Shell并从提示符执行以下命令:

app register --name time-source --type source 
  --uri maven://com.baeldung.spring.cloud:spring-data-flow-time-source:jar:1.0.0

app register --name time-processor --type processor 
  --uri maven://com.baeldung.spring.cloud:spring-data-flow-time-processor:jar:1.0.0

app register --name logging-sink --type sink 
  --uri maven://com.baeldung.spring.cloud:spring-data-flow-logging-sink:jar:1.0.0

10. 创建和部署流

要创建新的流定义,请转到Spring Cloud Data Flow Shell并执行以下shell命令:

stream create --name time-to-log 
  --definition 'time-source | time-processor | logging-sink'
这根据DSL表达式’time-source time-processor logging-sink’定义了一个名为time-to-log的流。

然后,要部署流,请执行以下shell命令:

stream deploy --name time-to-log

数据流服务器将time-source、time-processor和logging-sink解析为Maven坐标,并使用这些坐标来启动流的time-source、time-processor和logging-sink应用程序。

如果流已正确部署,你将在数据流服务器日志中看到模块已启动并绑定在一起:

2016-08-24 12:29:10.516  INFO 8096 --- [io-9393-exec-10] o.s.c.d.spi.local.LocalAppDeployer: deploying app time-to-log.logging-sink instance 0
   Logs will be in PATH_TO_LOG/spring-cloud-dataflow-1276836171391672089/time-to-log-1472034549734/time-to-log.logging-sink
2016-08-24 12:29:17.600  INFO 8096 --- [io-9393-exec-10] o.s.c.d.spi.local.LocalAppDeployer       : deploying app time-to-log.time-processor instance 0
   Logs will be in PATH_TO_LOG/spring-cloud-dataflow-1276836171391672089/time-to-log-1472034556862/time-to-log.time-processor
2016-08-24 12:29:23.280  INFO 8096 --- [io-9393-exec-10] o.s.c.d.spi.local.LocalAppDeployer       : deploying app time-to-log.time-source instance 0
   Logs will be in PATH_TO_LOG/spring-cloud-dataflow-1276836171391672089/time-to-log-1472034562861/time-to-log.time-source

11. 查看结果

在此示例中,源只是每秒将当前时间戳作为消息发送,处理器对其进行格式化,日志接收器使用日志记录框架输出格式化的时间戳。

日志文件位于数据流服务器日志输出中显示的目录中,如上所示。要查看结果,我们可以跟踪日志:

tail -f PATH_TO_LOG/spring-cloud-dataflow-1276836171391672089/time-to-log-1472034549734/time-to-log.logging-sink/stdout_0.log
2016-08-24 12:40:42.029  INFO 9488 --- [r.time-to-log-1] s.c.SpringDataFlowLoggingSinkApplication : Received: 2016/08/24 11:40:01
2016-08-24 12:40:52.035  INFO 9488 --- [r.time-to-log-1] s.c.SpringDataFlowLoggingSinkApplication : Received: 2016/08/24 11:40:11
2016-08-24 12:41:02.030  INFO 9488 --- [r.time-to-log-1] s.c.SpringDataFlowLoggingSinkApplication : Received: 2016/08/24 11:40:21

12. 总结

在本文中,我们了解了如何通过使用Spring Cloud Data Flow构建用于流处理的数据管道。

此外,我们还看到了源、处理器和接收器应用程序在流中的作用,以及如何通过使用Data Flow Shell将此模块插入和绑定到数据流服务器中。

与往常一样,本教程的完整源代码可在GitHub上获得。

Show Disqus Comments

Post Directory

扫码关注公众号:Taketoday
发送 290992
即可立即永久解锁本站全部文章