使用Nats Java客户端发布和接收消息

2025/03/25

1. 概述

在本教程中,我们将使用NATS的Java客户端连接到NATS服务器,以便我们可以发布和接收消息。

NATS提供三种主要的消息交换模式:

  • 发布/订阅语义将消息传递给某个主题的所有订阅者。
  • 请求/回复消息传递将请求发送给主题的所有订阅者,并将响应路由回请求者。返回给请求者的第一个响应将被处理,我们可以轻松构建一个类似请求/回复的系统,其中所有响应都将被处理,此功能可能会在未来版本中添加。
  • 订阅者在订阅主题时也可以加入消息队列组,发送到相关主题的消息只会传递给队列组中的一个订阅者,此语义可用于发布/订阅或请求/回复。

2. 设置

本节讨论运行示例所需的基本设置。

2.1 Maven依赖

首先,我们需要将NATS库添加到我们的pom.xml文件中:

<dependency>
    <groupId>io.nats</groupId>
    <artifactId>jnats</artifactId>
    <version>2.17.6</version>
</dependency>

Java客户端REAMDE文件的Maven部分提供了更详细的说明,Gradle用户可以在README文件的Gradle部分找到说明

2.2 Nats服务器

其次,我们需要一个Nats服务器来交换消息,NATS文档网站上有针对所有主要平台的说明。

我们假设在localhost:4222上有一个服务器正在运行。

3. 连接到Nats服务器

要交换消息,首先需要连接到服务器。客户端连接后,可以通过发布和订阅主题来交换消息。

3.1 使用或不使用自定义Options进行连接

静态Nats类中的Nats.connect()方法创建一个Connection。

要创建连接,我们可以提供服务器的主机和端口等信息。还有其他可配置的连接选项,例如用户名和密码、如何处理安全连接以及影响连接行为方式的许多其他选项。如果我们不向Nats.connect()方法提供Options对象,它将使用所有默认值并尝试连接到localhost上的服务器,端口为4222:

Connection natsConnection = Nats.connect();

为了演示如何使用自定义选项创建连接,让我们编写一个示例,使用以下内容构建自定义Options对象:

  • 接收连接事件并打印出来的自定义ConnectionListener
  • 接收错误事件并打印出来的自定义ErrorListener

此函数为特定URI创建一个Options对象并将其传递给Nats.connect,如果URI为null或为空,则Options将采用默认服务器位置:

Options options = new Options.Builder()
    .server(uri)
    .connectionListener((connection, event) -> log.info("Connection Event: " + event))
    .errorListener(new CustomErrorListener())
    .build();

接下来要做的事情是使用该Options实例进行连接:

Connection natsConnection = Nats.connect(options);

NATS连接持久,API将自动尝试重新连接丢失的连接。

作为连接选项的一部分,我们安装了一个ConnectionListener实现Lambda,用于在发生连接事件(如断开连接或重新连接)时通知我们。我们还安装了一个ErrorListener。ErrorListener接口包含多个方法,因此无法使用Lambda创建。示例代码有一个名为CustomErrorListener的实现。

让我们进行一个快速测试。首先,我们创建一个新的NatsClient,它将初始化一个连接。然后,我们添加60秒的睡眠以保持进程运行:

Connection conn = createConnection("nats://localhost:4222");
Thread.sleep(60000);

当我们运行这个程序时,连接监听器将报告已打开的连接:

Connection Event: nats: connection opened

然后,让我们停止并启动我们的Nats服务器:

Exception Occurred: java.io.IOException: Read channel closed.
Connection Event: nats: connection disconnected
Exception Occurred: java.net.ConnectException: Connection refused: no further information
Connection Event: nats: connection disconnected
Exception Occurred: java.net.ConnectException: Connection refused: no further information
Connection Event: nats: connection disconnected
Connection Event: nats: connection reconnected
Connection Event: nats: subscriptions re-established

我们可以看到ConnectionListener和CustomErrorListener接收并处理异常,断开连接,重新连接等事件。

4. 交换信息

现在我们有了Connection,我们可以进行消息处理了。交换消息需要在主题上发布和订阅。

当我们向某个主题发布消息时,订阅该主题的程序将收到该消息的副本,订阅者必须在消息发布前做好准备。

NATS消息是字节数组的容器,除了预期的setData(byte[])和byte[] getData()方法外,还有用于设置和获取消息目标主题和回复主题的方法。

4.1 订阅消息

我们订阅的主题是String。

NATS支持同步和异步订阅

4.2 异步订阅

异步订阅需要Dispatcher,Dispatcher包含将传入消息发送给异步处理程序的线程。

由于一个Connection可以有多个订阅,因此我们可以决定是否为每个订阅设置一个Dispatcher,或者在多个订阅之间共享一个Dispatcher。每个Dispatcher都有一个在其自己的线程上执行的运行循环。运行循环调用消息处理程序的onMessage(Message)方法并等待处理程序返回。

如果我们有多个负载较高的订阅,或者我们的消息处理程序需要很长时间才能处理每条消息,那么最好为每个订阅设置一个Dispatcher。如果偶尔有消息或处理时间很短,那么让它们共享一个Dispatcher也是可以的,这通常根据用例针对每个应用程序进行调整。

在此示例中,让我们创建一个Dispatcher并将其用于多个订阅。无论是否用于多个订阅,创建Dispatcher的过程都是相同的:

Dispatcher dispatcher = natsConnection.createDispatcher();
Subscription subscription = dispatcher.subscribe(subject, msg -> log.info("Subscription received message " + msg));
Subscription qSubscription = dispatcher.subscribe(subject, queueGroup, msg -> log.info("Queue subscription received message " + msg));

订阅完成后,最好取消订阅。如果我们不取消订阅,服务器将继续向主题发送消息,直到连接关闭。要取消使用Dispatcher进行的订阅,我们调用unsubscribe(…):

dispatcher.unsubscribe(subscription);

4.3 同步订阅

同步订阅要求每个订阅者在其自己的线程中使用nextMessage方法轮询消息。在这里,我们创建一个同步订阅。请注意,不需要调度程序:

Subscription subscription = natsConnection.subscribe("mySubject");
Message message = subscription.nextMessage(1000);

nextMessage(…)方法调用将等待指定的毫秒数或直到收到消息。如果在超时时间内没有可用的消息,则nextMessage返回null。

我们将使用同步订阅进行测试,以使测试用例保持简单。完成同步订阅后,我们需要直接从订阅中调用取消订阅:

subscription.unsubscribe();

4.4 发布消息

发布消息是一种“发射后不管”的操作-我们不知道是否有人订阅了该消息。我们可以使用几种不同的API来发布消息。最简单的方法只需要一个主题字符串和消息字节:

natsConnection.publish("mySubject", "Hi there!".getBytes());

对于其他一些发布组合,也有过载,比如传递消息而不是字节。

4.5 消息响应

有两种方法可以获取已发布消息的响应。我们可以使用reply-to进行常规发布,也可以使用内置的请求功能。如果我们想收到对任何给定主题的多个响应,则需要使用发布模式。如果我们只需要对任何给定主题的第一个响应,则可以使用请求模式。

4.6 发布以获得大量回应

消息中的reply-to字段是为发布者提供的,用于提供响应者将订阅的主题。我们可以在发布消息 时自己提供reply-to主题。然后,作为发布者的我们可以订阅该reply-to主题并等待我们想要的任意数量的响应:

Subscription replySideSubscription1 = client.subscribeSync("publishSubject");
Subscription replySideSubscription2 = client.subscribeSync("publishSubject");
Subscription publishSideSubscription = client.subscribeSync("replyToSubject");
natsConnection.publish("publishSubject", "replyToSubject", "Please respond!".getBytes());

回复端订阅者将订阅发布主题。请注意,我们不必对回复主题进行硬编码,因为它随消息一起提供:

Message m1 = replySideSubscription1.nextMessage(1000);
natsConnection.publish(m1.getReplyTo(), "Message Received By Subscription 1".getBytes());
Message m2 = replySideSubscription2.nextMessage(1000);
natsConnection.publish(m2.getReplyTo(), "Message Received By Subscription 2".getBytes());

发布端订阅可以得到所有回复,回复的顺序与服务器接收的顺序一致:

Message m1 = publishSideSubscription.nextMessage(1000);
Message m2 = publishSideSubscription.nextMessage(1000);

4.7 使用请求(/回复)仅获取一个响应

当我们使用请求方法时,我们无法提供回复主题-客户端会为我们完成这项工作。它会等待第一个响应,这意味着即使多个订阅者回复,也只会返回第一个收到的响应。所有其他响应都会被忽略。

Message response = natsConnection.request("requestSubject", "Please respond!".getBytes(), Duration.ofMillis(1000));

回复端订阅是相同的,但请求者(发布者)不需要设置订阅,只需等待响应即可。请求方法也有重载,使用CompletableFuture<Message>代替内置超时。

4.8 通配符订阅

NATS服务器支持主题通配符。

通配符作用于由“.”字符分隔的主题标记。星号“*”匹配单个标记。大于号“>”是通配符,用于匹配主题的其余部分,这些部分可能不止一个标记。

例如:

  • foo.*匹配foo.bar,但不匹配foo.bar.requests
  • foo.>匹配foo.bar、foo.bar.requests、foo.bar.tuyucheng等等

让我们尝试几个测试:

Subscription starSubscription = natsConnection.subscribe("segment.*");
natsConnection.publish("segment.another", convertStringToBytes("hello segment star"));

Message message = segmentStarSubscription.nextMessage(200);
assertNotNull("No message!", message);
assertEquals("hello segment star sub", convertMessageDataBytesToString(message));

natsConnection.publish("segment.one.two", "hello there");
message = segmentStarSubscription.nextMessage(200);
assertNull("Got message!", message);

Subscription segmentGreaterSubscription = natsConnection.subscribe("segment.>");

client.publishMessage("segment.one.two", "hello segment greater sub");

message = greaterSubscription.nextMessage(200);
assertNotNull("No message!", message);
assertEquals("hello segment greater sub", new String(message.getData()));

5. 消息队列

订阅者可以在订阅时指定队列组。当消息发布到该组时,NATS将把它投递给一个且只有一个订阅者。

队列组不持久化消息。如果没有可用的监听器,则消息将被丢弃。

5.1 发布到队列

将消息发布到队列组与正常发布没有什么不同-它只需要发布到相关主题:

natsConnection.publish("mySubject", convertStringToBytes("data"));

5.2 订阅队列

订阅者以字符串形式指定队列组名称:

Subscription subscription = natsConnection.subscribe("mySubject", "myQueue");

当然还有一个异步版本:

List<Message> messages = new ArrayList<>();
Dispatcher dispatcher = connection.createDispatcher();
Subscription subscription = dispatcher.subscribe("mySubject", "myQueue", messages::add);

订阅在Nats服务器上创建队列,NATS服务器将消息路由到队列并选择消息接收者:

Subscription queue1 = natsConnection.subscribe("mySubject", "myQueue");
Subscription queue2 = natsConnection.subscribe("mySubject", "myQueue");

只有一个订阅(queue1或queue2)会收到任何特定消息。如果我们订阅同一主题但不订阅队列,如下所示:

Subscription subscription1 = natsConnection.subscribe("mySubject");
Subscription subscription2 = natsConnection.subscribe("mySubject");

那么subscription1和subscription2都会收到所有消息。如果同时订阅了四个订阅:queue1、queue2、subscription1和subscription2,那么queue1和queue2仍然会作为一个队列工作,并且subscription1和subscription2仍然会收到所有消息。

6. 总结

在这个简短的介绍中,我们连接到Nats服务器并发送发布/订阅消息和负载平衡队列消息。我们研究了Nats对通配符订阅的支持,我们还使用了请求/回复消息传递。

Show Disqus Comments

Post Directory

扫码关注公众号:Taketoday
发送 290992
即可立即永久解锁本站全部文章