将Spring与AWS Kinesis集成

2023/05/13

1. 简介

Kinesis是一种用于实时收集、处理和分析数据流的工具,由亚马逊开发。它的主要优点之一是它有助于开发事件驱动的应用程序。

在本教程中,我们将探索一些库,使我们的Spring应用程序能够从Kinesis Stream生成和消费记录。代码示例将显示基本功能,但不代表生产就绪代码。

2. 先决条件

在我们进一步讨论之前,我们需要做两件事。

首先是创建一个Spring项目,因为这里的目标是从Spring项目与Kinesis交互。

第二个是创建Kinesis Data Stream。我们可以通过AWS账户中的Web浏览器执行此操作。对于我们当中的AWS CLI粉丝来说,一种替代方法是使用命令行。由于我们将从代码中与之交互,因此我们还必须手头有AWS IAM凭证、访问密钥和私有密钥以及区域。

我们所有的生产者都将创建虚拟IP地址记录,而消费者将读取这些值并将它们列在应用程序控制台中。

3. 适用于Java的AWS开发工具包

我们将使用的第一个库是适用于Java的AWS开发工具包。它的优势在于它允许我们管理使用Kinesis Data Streams的许多部分。我们可以读取数据、生产数据、创建数据流和重新分片数据流。缺点是,为了拥有生产就绪的代码,我们必须对重新分片、错误处理或守护进程等方面进行编码,以保持消费者的活力。

3.1 Maven依赖

amazon-kinesis-client Maven依赖项将带来我们拥有工作示例所需的一切。我们现在将它添加到我们的pom.xml文件中:

<dependency>
    <groupId>com.amazonaws</groupId>
    <artifactId>aws-java-sdk-kinesis</artifactId>
    <version>1.11.632</version>
</dependency>

3.2 Spring设置

让我们重用与Kinesis Stream交互所需的AmazonKinesis对象。我们将在我们的@SpringBootApplication类中将其创建为@Bean:

@Bean
public AmazonKinesis buildAmazonKinesis() {
    BasicAWSCredentials awsCredentials = new BasicAWSCredentials(accessKey, secretKey);
    return AmazonKinesisClientBuilder.standard()
      	.withCredentials(new AWSStaticCredentialsProvider(awsCredentials))
      	.withRegion(Regions.EU_CENTRAL_1)
      	.build();
}

接下来,让我们在application.properties中定义本地机器所需的aws.access.key和aws.secret.key:

aws.access.key=my-aws-access-key-goes-here
aws.secret.key=my-aws-secret-key-goes-here

我们将使用@Value注解读取它们:

@Value("${aws.access.key}")
private String accessKey;

@Value("${aws.secret.key}")
private String secretKey;

为了简单起见,我们将依赖@Scheduled方法来创建和消费记录。

3.3 消费者

AWS SDK Kinesis消费者使用拉取模型,这意味着我们的代码将从Kinesis数据流的碎片中提取记录:

GetRecordsRequest recordsRequest = new GetRecordsRequest();
recordsRequest.setShardIterator(shardIterator.getShardIterator());
recordsRequest.setLimit(25);

GetRecordsResult recordsResult = kinesis.getRecords(recordsRequest);
while (!recordsResult.getRecords().isEmpty()) {
    recordsResult.getRecords().stream()
      	.map(record -> new String(record.getData().array()))
      	.forEach(System.out::println);

    recordsRequest.setShardIterator(recordsResult.getNextShardIterator());
    recordsResult = kinesis.getRecords(recordsRequest);
}

GetRecordsRequest对象构建对流数据的请求。在我们的示例中,我们定义了每个请求25条记录的限制,并且我们会继续读取,直到没有更多内容可读取为止。

我们还可以注意到,对于我们的迭代,我们使用了GetShardIteratorResult对象。我们在@PostConstruct方法中创建了这个对象,这样我们就可以立即开始跟踪记录:

private GetShardIteratorResult shardIterator;

@PostConstruct
private void buildShardIterator() {
    GetShardIteratorRequest readShardsRequest = new GetShardIteratorRequest();
    readShardsRequest.setStreamName(IPS_STREAM);
    readShardsRequest.setShardIteratorType(ShardIteratorType.LATEST);
    readShardsRequest.setShardId(IPS_SHARD_ID);

    this.shardIterator = kinesis.getShardIterator(readShardsRequest);
}

3.4 生产者

现在让我们看看如何为我们的Kinesis数据流处理记录的创建

我们使用PutRecordsRequest对象插入数据。对于这个新对象,我们添加了一个包含多个PutRecordsRequestEntry对象的列表:

List<PutRecordsRequestEntry> entries = IntStream.range(1, 200).mapToObj(ipSuffix -> {
    PutRecordsRequestEntry entry = new PutRecordsRequestEntry();
    entry.setData(ByteBuffer.wrap(("192.168.0." + ipSuffix).getBytes()));
    entry.setPartitionKey(IPS_PARTITION_KEY);
    return entry;
}).collect(Collectors.toList());

PutRecordsRequest createRecordsRequest = new PutRecordsRequest();
createRecordsRequest.setStreamName(IPS_STREAM);
createRecordsRequest.setRecords(entries);

kinesis.putRecords(createRecordsRequest);

我们已经创建了一个基本的消费者和模拟IP记录的生产者。现在剩下要做的就是运行我们的Spring项目并查看应用程序控制台中列出的IP。

4. KCL和KPL

Kinesis Client Library(KCL)是一个简化记录消费的库。它也是针对Kinesis Data Streams的AWS开发工具包Java API的抽象层。在幕后,该库处理多个实例之间的负载平衡、响应实例故障、检查处理的记录以及对重新分片做出反应。

Kinesis Producer Library(KPL)是一个可用于写入Kinesis数据流的库。它还提供了一个位于Kinesis Data Streams的AWS开发工具包Java API之上的抽象层。为了获得更好的性能,该库会自动处理批处理、多线程和重试逻辑。

KCL和KPL的主要优点都是易于使用,因此我们可以专注于生产和消费记录。

4.1 Maven依赖项

如果需要的话,这两个库可以在我们的项目中单独引入。要在我们的Maven项目中包含KPLKCL,我们需要更新我们的pom.xml文件:

<dependency>
    <groupId>com.amazonaws</groupId>
    <artifactId>amazon-kinesis-producer</artifactId>
    <version>0.13.1</version>
</dependency>
<dependency>
    <groupId>com.amazonaws</groupId>
    <artifactId>amazon-kinesis-client</artifactId>
    <version>1.11.2</version>
</dependency>

4.2 Spring设置

我们唯一需要的Spring准备工作是确保我们手头有IAM凭证。aws.access.key和aws.secret.key的值在我们的application.properties文件中定义,因此我们可以在需要时使用@Value读取它们。

4.3 消费者

首先,我们将创建一个实现IRecordProcessor接口的类,并定义我们如何处理Kinesis数据流记录的逻辑,即在控制台中打印它们:

public class IpProcessor implements IRecordProcessor {
	@Override
	public void initialize(InitializationInput initializationInput) { }

	@Override
	public void processRecords(ProcessRecordsInput processRecordsInput) {
		processRecordsInput.getRecords()
			.forEach(record -> System.out.println(new String(record.getData().array())));
	}

	@Override
	public void shutdown(ShutdownInput shutdownInput) { }
}

下一步是定义一个实现IRecordProcessorFactory接口并返回先前创建的IpProcessor对象的工厂类

public class IpProcessorFactory implements IRecordProcessorFactory {
	@Override
	public IRecordProcessor createProcessor() {
		return new IpProcessor();
	}
}

现在,对于最后一步,我们将使用Worker对象来定义我们的消费者管道。我们需要一个KinesisClientLibConfiguration对象,该对象将在需要时定义IAM凭证和AWS区域。

我们会将KinesisClientLibConfiguration和我们的IpProcessorFactory对象传递给我们的Worker,然后在单独的线程中启动它。我们通过使用Worker类来保持这种消费记录的逻辑始终存在,所以我们现在不断地读取新记录:

BasicAWSCredentials awsCredentials = new BasicAWSCredentials(accessKey, secretKey);
KinesisClientLibConfiguration consumerConfig = new KinesisClientLibConfiguration(
  	"KinesisKCLConsumer",
  	IPS_STREAM,
  	"",
  	"",
  	DEFAULT_INITIAL_POSITION_IN_STREAM,
  	new AWSStaticCredentialsProvider(awsCredentials),
  	new AWSStaticCredentialsProvider(awsCredentials),
  	new AWSStaticCredentialsProvider(awsCredentials),
  	DEFAULT_FAILOVER_TIME_MILLIS,
  	"KinesisKCLConsumer",
  	DEFAULT_MAX_RECORDS,
  	DEFAULT_IDLETIME_BETWEEN_READS_MILLIS,
  	DEFAULT_DONT_CALL_PROCESS_RECORDS_FOR_EMPTY_RECORD_LIST,
  	DEFAULT_PARENT_SHARD_POLL_INTERVAL_MILLIS,
  	DEFAULT_SHARD_SYNC_INTERVAL_MILLIS,
  	DEFAULT_CLEANUP_LEASES_UPON_SHARDS_COMPLETION,
  	new ClientConfiguration(),
  	new ClientConfiguration(),
  	new ClientConfiguration(),
  	DEFAULT_TASK_BACKOFF_TIME_MILLIS,
  	DEFAULT_METRICS_BUFFER_TIME_MILLIS,
  	DEFAULT_METRICS_MAX_QUEUE_SIZE,
  	DEFAULT_VALIDATE_SEQUENCE_NUMBER_BEFORE_CHECKPOINTING,
  	Regions.EU_CENTRAL_1.getName(),
  	DEFAULT_SHUTDOWN_GRACE_MILLIS,
  	DEFAULT_DDB_BILLING_MODE,
  	null,
  	0,
  	0, 
  	0
);
final Worker worker = new Worker.Builder()
 	.recordProcessorFactory(new IpProcessorFactory())
 	.config(consumerConfig)
 	.build();
CompletableFuture.runAsync(worker.run());

4.4 生产者

现在让我们定义KinesisProducerConfiguration对象,添加IAM凭证和AWS区域:

BasicAWSCredentials awsCredentials = new BasicAWSCredentials(accessKey, secretKey);
KinesisProducerConfiguration producerConfig = new KinesisProducerConfiguration()
  	.setCredentialsProvider(new AWSStaticCredentialsProvider(awsCredentials))
  	.setVerifyCertificate(false)
  	.setRegion(Regions.EU_CENTRAL_1.getName());

this.kinesisProducer = new KinesisProducer(producerConfig);

我们将包含先前在@Scheduled作业中创建的kinesisProducer对象,并持续为我们的Kinesis数据流生成记录:

IntStream.range(1, 200).mapToObj(ipSuffix -> ByteBuffer.wrap(("192.168.0." + ipSuffix).getBytes()))
  	.forEach(entry -> kinesisProducer.addUserRecord(IPS_STREAM, IPS_PARTITION_KEY, entry));

5. Spring Cloud Stream Binder Kinesis

我们已经看到了两个库,它们都是在Spring生态系统之外创建的。现在,我们将看到Spring Cloud Stream Binder Kinesis如何在构建于Spring Cloud Stream之上的同时进一步简化我们的编码

5.1 Maven依赖

我们需要在Spring Cloud Stream Binder Kinesis的应用程序中定义的Maven依赖项是:

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream-binder-kinesis</artifactId>
    <version>1.2.1.RELEASE</version>
</dependency>

5.2 Spring设置

在EC2上运行时,会自动发现所需的AWS属性,因此无需定义它们。由于我们在本地机器上运行我们的示例,因此需要为我们的AWS账户定义我们的IAM访问密钥、私有密钥和区域。我们还为应用程序禁用了自动CloudFormation堆栈名称检测:

cloud.aws.credentials.access-key=my-aws-access-key
cloud.aws.credentials.secret-key=my-aws-secret-key
cloud.aws.region.static=eu-central-1
cloud.aws.stack.auto=false

Spring Cloud Stream捆绑了我们可以在流绑定中使用的三个接口

  • Sink用于数据引入
  • Source用于发布记录
  • Processor是两者的结合

如果需要,我们也可以定义自己的接口。

5.3 消费者

定义消费者是一项分为两部分的工作。首先,我们将在application.properties中定义我们将从中消费的数据流:

spring.cloud.stream.bindings.input-in-0.destination=live-ips
spring.cloud.stream.bindings.input-in-0.group=live-ips-group
spring.cloud.stream.bindings.input-in-0.content-type=text/plain
spring.cloud.stream.function.definition = input

接下来,让我们使用允许我们从Kinesis流中读取的Supplier来定义一个带有@Bean的Spring @Configuration类:

@Configuration
public class ConsumerBinder {
	@Bean
	Consumer<String> input() {
		return str -> {
			System.out.println(str);
		};
	}
}

5.4 生产者

生产者也可以一分为二。首先,我们必须在application.properties中定义我们的流属性:

spring.cloud.stream.bindings.output-out-0.destination=myStream
spring.cloud.stream.bindings.output-out-0.content-type=text/plain
spring.cloud.stream.poller.fixed-delay = 3000

然后我们在Spring @Configuration上添加带有Supplier的@Bean以每隔几秒创建新的测试消息

@Configuration 
class ProducerBinder {
	@Bean 
	public Supplier output() {
		return () -> IntStream.range(1, 200)
			.mapToObj(ipSuffix ->"192.168.0." + ipSuffix)
			.map(entry ->MessageBuilder.withPayload(entry)
				.build());
	}
}

这就是Spring Cloud Stream Binder Kinesis工作所需的全部内容。我们现在可以简单地启动应用程序。

6. 总结

在本文中,我们了解了如何将Spring项目与两个AWS库集成以与Kinesis Data Stream进行交互。我们还了解了如何使用Spring Cloud Stream Binder Kinesis库来简化实现。

与往常一样,本教程的完整源代码可在GitHub上获得。

Show Disqus Comments

Post Directory

扫码关注公众号:Taketoday
发送 290992
即可立即永久解锁本站全部文章