使用JGroups实现可靠的消息传递

2025/04/21

1. 概述

JGroups是一个用于可靠消息交换的Java API,它具有一个简单的接口,可提供:

  • 灵活的协议栈,包括TCP和UDP
  • 大型消息的分片和重组
  • 可靠的单播和多播
  • 故障检测
  • 流量控制

以及许多其他功能。

在本教程中,我们将创建一个简单的应用程序,用于在应用程序之间交换String消息并在新应用程序加入网络时为其提供共享状态。

2. 设置

2.1 Maven依赖

我们需要向我们的pom.xml添加一个依赖项:

<dependency>
    <groupId>org.jgroups</groupId>
    <artifactId>jgroups</artifactId>
    <version>4.0.10.Final</version>
</dependency>

2.2 网络

默认情况下,JGroups将尝试使用IPV6。根据我们的系统配置,这可能会导致应用程序无法通信。

为了避免这种情况,我们在此处运行应用程序时将java.net.preferIPv4Stack属性设置为true:

java -Djava.net.preferIPv4Stack=true cn.tuyucheng.taketoday.jgroups.JGroupsMessenger

3. JChannels

我们与JGroups网络的连接是JChannel。该通道加入集群并发送和接收消息,以及有关网络状态的信息。

3.1 创建Channel

我们创建一个带有配置文件路径的JChannel。如果我们省略文件名,它将在当前工作目录中查找udp.xml。

我们将使用明确命名的配置文件创建一个通道:

JChannel channel = new JChannel("src/main/resources/udp.xml");

JGroups配置可能非常复杂,但默认的UDP和TCP配置对于大多数应用程序来说已经足够了。我们已将UDP文件包含在我们的代码中,并将在本教程中使用它。

有关配置传输的更多信息,请参阅此处的JGroups手册。

3.2 连接Channel

创建通道后,我们需要加入集群,集群是一组交换消息的节点

加入集群需要集群名称:

channel.connect("Tuyucheng");

如果集群不存在,第一个尝试加入集群的节点将创建它,我们会在下面看到这个过程的实际效果。

3.3 命名Channel

节点由名称标识,以便对等方可以发送定向消息并接收有关谁进入和离开集群的通知。JGroups将自动分配一个名称,或者我们可以设置自己的名称:

channel.name("user1");

我们将在下面使用这些名称来跟踪节点何时进入和离开集群。

3.4 关闭Channel

如果我们希望对等方及时收到我们已退出的通知,通道清理是必不可少的

我们用它的close方法关闭一个JChannel:

channel.close()

4. 集群视图变化

创建JChannel后,我们现在可以查看集群中对等点的状态并与它们交换消息。

JGroups在View类中维护集群状态。每个通道都有一个网络视图。当视图改变时,它通过viewAccepted()回调传递。

对于本教程,我们将扩展ReceiverAdaptor API类,该类实现应用程序所需的所有接口方法。这是实现回调的推荐方式

下面我们将viewAccepted添加到我们的应用程序中:

public void viewAccepted(View newView) {
    private View lastView;

    if (lastView == null) {
        System.out.println("Received initial view:");
        newView.forEach(System.out::println);
    } else {
        System.out.println("Received new view.");

        List<Address> newMembers = View.newMembers(lastView, newView);
        System.out.println("New members: ");
        newMembers.forEach(System.out::println);

        List<Address> exMembers = View.leftMembers(lastView, newView);
        System.out.println("Exited members:");
        exMembers.forEach(System.out::println);
    }
    lastView = newView;
}

每个视图都包含一个Address对象集合,代表集群的每个成员。JGroups提供了将一个视图与另一个视图进行比较的便捷方法,我们用它来检测集群的新成员或退出成员。

5. 发送消息

JGroups中的消息处理非常简单,消息包含一个字节数组和对应于发送方和接收方的Address对象。

对于本教程,我们使用从命令行读取的字符串,但很容易看出应用程序如何交换其他数据类型。

5.1 广播消息

Message是用一个目标和一个字节数组创建的;JChannel为我们设置了发送者。如果目标为null,则整个集群都会收到消息

我们将从命令行接收文本并将其发送到集群:

System.out.print("Enter a message: ");
String line = in.readLine().toLowerCase();
Message message = new Message(null, line.getBytes());
channel.send(message);

如果我们运行程序的多个实例并发送此消息(在我们实现下面的receive()方法之后),则所有实例都会收到它,包括发送者。

5.2 阻塞消息

如果我们不想看到我们的消息,我们可以为此设置一个属性:

channel.setDiscardOwnMessages(true);

当我们运行前面的测试时,消息发送者不会收到它的广播消息。

5.3 直接消息

发送直接消息需要有效的地址,如果我们按名称引用节点,我们需要一种方法来查找Address。幸运的是,我们有视图。

当前视图始终可以从JChannel获得:

private Optional<address> getAddress(String name) { 
    View view = channel.view(); 
    return view.getMembers().stream()
        .filter(address -> name.equals(address.toString()))
        .findAny(); 
}

地址名称可通过类的toString()方法获得,因此我们只需在集群成员集合中搜索我们想要的名称。

因此,我们可以从控制台接收一个名字,找到相关的目的地,并发送一条直接消息:

Address destination = null;
System.out.print("Enter a destination: ");
String destinationName = in.readLine().toLowerCase();
destination = getAddress(destinationName).orElseThrow(() -> new Exception("Destination not found"); 
Message message = new Message(destination, "Hi there!"); 
channel.send(message);

6. 接收消息

我们可以发送消息,现在让我们尝试接收消息。

我们重写ReceiverAdaptor的空receive方法:

public void receive(Message message) {
    String line = Message received from: " 
        + message.getSrc() 
        + " to: " + message.getDest() 
        + " -> " + message.getObject();
    System.out.println(line);
}

因为我们知道消息包含一个字符串,所以我们可以安全地将getObject()传递给System.out。

7. 状态交换

当节点进入网络时,它可能需要检索有关集群的状态信息。JGroups为此提供了一种状态转移机制。

当一个节点加入集群时,它只需调用getState()。集群通常从组中最早的成员(协调者)那里检索状态。

让我们向我们的应用程序添加一个广播消息计数器,我们将添加一个新的成员变量并在receive()中递增它:

private Integer messageCount = 0;

public void receive(Message message) {
    String line = "Message received from: " 
      + message.getSrc() 
      + " to: " + message.getDest() 
      + " -> " + message.getObject();
    System.out.println(line);

    if (message.getDest() == null) {
        messageCount++;
        System.out.println("Message count: " + messageCount);
    }
}

我们检查一个空目的地,因为如果我们计算直接消息,每个节点将有一个不同的数字。

接下来,我们重写ReceiverAdaptor中的另外两个方法:

public void setState(InputStream input) {
    try {
        messageCount = Util.objectFromStream(new DataInputStream(input));
    } catch (Exception e) {
        System.out.println("Error deserialing state!");
    }
    System.out.println(messageCount + " is the current messagecount.");
}

public void getState(OutputStream output) throws Exception {
    Util.objectToStream(messageCount, new DataOutputStream(output));
}

与消息类似,JGroups以字节数组的形式传输状态。

JGroups向协调器提供一个InputStream来写入状态,并为新节点提供一个OutputStream来读取。API提供了用于序列化和反序列化数据的便利类。

请注意,在生产代码中,对状态信息的访问必须是线程安全的。

最后,在我们连接到集群后,我们将对getState()的调用添加到我们的启动中:

channel.connect(clusterName);
channel.getState(null, 0);

getState()接收请求状态的目的地和以毫秒为单位的超时。空目标表示协调器,0表示不超时。

当我们使用一对节点运行这个应用程序并交换广播消息时,我们看到messageCount增加。

然后,如果我们添加第三个客户端或停止并启动其中一个,我们将看到新连接的节点打印正确的messageCount。

8. 总结

在本教程中,我们使用JGroups创建了一个用于交换消息的应用程序。我们使用API来监控哪些节点连接到集群和离开集群,并在新节点加入时将集群状态传输到新节点。

Show Disqus Comments

Post Directory

扫码关注公众号:Taketoday
发送 290992
即可立即永久解锁本站全部文章