1. 简介
在本文中,我们将演示如何使用Spring Cloud App Starters-它提供了引导式和现成的应用程序,可以作为未来开发的起点。
简而言之,Task App Starters专用于数据库迁移和分布式测试等用例,而Stream App Starters提供与外部系统的集成。
总体而言,有超过55个Starters;在此处和此处查看官方文档以获取有关这两者的更多信息。
接下来,我们将构建一个小型分布式Twitter应用程序,它将Twitter帖子流式传输到Hadoop分布式文件系统中。
2. 开始设置
我们将使用consumer-key和access-token来创建一个简单的Twitter应用程序。
然后,我们设置Hadoop,以便为未来的大数据目的保留我们的Twitter流。
最后,我们可以选择使用提供的Spring GitHub仓库使用Maven编译和组装sources–processors-sinks架构模式的独立组件,或者通过它们的Spring Stream绑定接口组合sources、processors和sinks。
我们将介绍执行此操作的两种方法。
值得注意的是,以前,所有Stream App Starters都被整理到一个大型仓库中,位于github.com/spring-cloud/spring-cloud-stream-app-starters。每个Starter都经过简化和隔离。
3. 推特凭证
首先,让我们设置我们的Twitter开发人员凭据。要获取Twitter开发人员凭据,请按照官方Twitter开发人员文档中的步骤设置应用程序并创建访问令牌。
具体来说,我们需要:
- Consumer Key
- Consumer Key Secret
- Access Token Secret
- Access Token
确保保持该窗口打开或记下它们,因为我们将在下面使用它们!
4. 安装Hadoop
接下来,让我们安装Hadoop!我们可以按照官方文档或简单地利用Docker:
$ sudo docker run -p 50070:50070 sequenceiq/hadoop-docker:2.4.1
5. 编译我们的App Starters
要使用独立的、完全独立的组件,我们可以从他们的GitHub仓库中单独下载和编译所需的Spring Cloud Stream App Starters。
5.1 Twitter Spring Cloud Stream App Starter
让我们将Twitter Spring Cloud Stream App Starter(org.springframework.cloud.stream.app.twitterstream.source)添加到我们的项目中:
git clone https://github.com/spring-cloud-stream-app-starters/twitter.git
然后,我们运行Maven:
./mvnw clean install -PgenerateApps
生成的已编译Starter App将在本地项目根目录的“/target”中可用。
然后我们可以运行编译后的.jar并传入相关的应用程序属性,如下所示:
java -jar twitter_stream_source.jar --consumerKey=<CONSUMER_KEY> --consumerSecret=<CONSUMER_SECRET> \
--accessToken=<ACCESS_TOKEN> --accessTokenSecret=<ACCESS_TOKEN_SECRET>
我们还可以使用熟悉的Spring application.properties传递我们的凭据:
twitter.credentials.access-token=...
twitter.credentials.access-token-secret=...
twitter.credentials.consumer-key=...
twitter.credentials.consumer-secret=...
5.2 HDFS Spring Cloud Stream App Starter
现在(已经设置好Hadoop),让我们将HDFS Spring Cloud Stream App Starter(org.springframework.cloud.stream.app.hdfs.sink)依赖项添加到我们的项目中。
首先,克隆相关的仓库:
git clone https://github.com/spring-cloud-stream-app-starters/hdfs.git
然后,运行Maven作业:
./mvnw clean install -PgenerateApps
生成的已编译Starter App将在本地项目根目录的“/target”中可用。然后我们可以运行编译后的.jar并传入相关的应用程序属性:
java -jar hdfs-sink.jar --fsUri=hdfs://127.0.0.1:50010/
‘hdfs://127.0.0.1:50010/’是Hadoop的默认端口,但你的默认HDFS端口可能会有所不同,具体取决于你配置实例的方式。
根据我们之前传入的配置,我们可以在“http://0.0.0.0:50070”看到数据节点列表(及其当前端口) 。
我们还可以在编译之前使用熟悉的Spring application.properties传递我们的凭据-因此我们不必总是通过CLI传递这些凭据。
让我们将application.properties配置为使用默认的Hadoop端口:
hdfs.fs-uri=hdfs://127.0.0.1:50010/
6. 使用AggregateApplicationBuilder
或者,我们可以通过org.springframework.cloud.stream.aggregate.AggregateApplicationBuilder将我们的Spring Stream Source和Sink组合成一个简单的Spring Boot应用程序!
首先,我们将两个Stream App Starter添加到我们的pom.xml中:
<dependencies>
<dependency>
<groupId>org.springframework.cloud.stream.app</groupId>
<artifactId>spring-cloud-starter-stream-source-twitterstream</artifactId>
<version>2.1.2.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework.cloud.stream.app</groupId>
<artifactId>spring-cloud-starter-stream-sink-hdfs</artifactId>
<version>2.1.2.RELEASE</version>
</dependency>
</dependencies>
然后,我们将开始组合我们的两个Stream App Starter依赖项,方法是将它们包装到各自的子应用程序中。
6.1 构建我们的应用程序组件
我们的SourceApp指定要转换或消费的Source:
@SpringBootApplication
@EnableBinding(Source.class)
@Import(TwitterstreamSourceConfiguration.class)
public class SourceApp {
@InboundChannelAdapter(Source.OUTPUT)
public String timerMessageSource() {
return new SimpleDateFormat().format(new Date());
}
}
请注意,我们将SourceApp绑定到org.springframework.cloud.stream.messaging.Source并注入适当的配置类以从我们的环境属性中获取所需的设置。
接下来,我们设置一个简单的org.springframework.cloud.stream.messaging.Processor绑定:
@SpringBootApplication
@EnableBinding(Processor.class)
public class ProcessorApp {
@Transformer(inputChannel = Processor.INPUT, outputChannel = Processor.OUTPUT)
public String processMessage(String payload) {
log.info("Payload received!");
return payload;
}
}
然后,我们创建我们的消费者(Sink):
@SpringBootApplication
@EnableBinding(Sink.class)
@Import(HdfsSinkConfiguration.class)
public class SinkApp {
@ServiceActivator(inputChannel= Sink.INPUT)
public void loggerSink(Object payload) {
log.info("Received: " + payload);
}
}
在这里,我们将SinkApp绑定到org.springframework.cloud.stream.messaging.Sink并再次注入正确的配置类以使用我们指定的Hadoop设置。
最后,我们在AggregateApp main方法中使用AggregateApplicationBuilder组合我们的SourceApp、ProcessorApp和SinkApp:
@SpringBootApplication
public class AggregateApp {
public static void main(String[] args) {
new AggregateApplicationBuilder()
.from(SourceApp.class).args("--fixedDelay=5000")
.via(ProcessorApp.class)
.to(SinkApp.class).args("--debug=true")
.run(args);
}
}
与任何Spring Boot应用程序一样,我们可以通过application.properties或以编程方式将指定设置作为环境属性注入。
由于我们使用的是Spring Stream框架,因此我们还可以将参数传递给AggregateApplicationBuilder构造函数。
6.2 运行完成的应用程序
然后我们可以使用以下命令行指令编译并运行我们的应用程序:
$ mvn install
$ java -jar twitterhdfs.jar
请记住将每个@SpringBootApplication类保存在一个单独的包中(否则,将抛出几个不同的绑定异常)!有关如何使用AggregateApplicationBuilder的更多信息,请查看官方文档。
在编译并运行我们的应用程序之后,我们应该在控制台中看到类似以下的内容(当然,内容会因推文而异):
2018-01-15 04:38:32.255 INFO 28778 --- [itterSource-1-1]
c.t.t.twitterhdfs.processor.ProcessorApp : Payload received!
2018-01-15 04:38:32.255 INFO 28778 --- [itterSource-1-1]
c.t.t.twitterhdfs.sink.SinkApp : Received: {"created_at":
"Mon Jan 15 04:38:32 +0000 2018","id":952761898239385601,"id_str":
"952761898239385601","text":"RT @mighty_jimin: 180114 ...
这些演示了我们的Processor和Sink在从Source接收数据时的正确操作!在这个例子中,我们没有配置我们的HDFS Sink来做很多事情-它只会打印消息“Payload received!”
7. 总结
在本教程中,我们学习了如何将两个很棒的Spring Stream App Starters组合成一个漂亮的Spring Boot示例!
这里有一些关于Spring Boot Starters以及如何创建自定义Starter的其他很棒的官方文章!
与往常一样,本教程的完整源代码可在GitHub上获得。