Netty中的自定义事件处理程序和监听器

2025/03/13

1. 简介

在本教程中,我们将使用Netty创建聊天室应用。在网络编程中,Netty是一个出色的强大框架,可简化异步I/O操作的复杂性。我们将探索如何构建一个基本的聊天服务器,多个客户端可以连接并参与实时对话。

2. 场景

发送到服务器的消息将转发给所有连接的客户端,它还将保存最近发送的几条消息的列表,以便新客户端在连接时可以从当前对话中获取上下文。为此,我们只需要几个事件处理程序来维持通道之间的通信

在Netty中,通信是通过通道进行的,通道抽象了任何协议上的异步I/O操作,这样我们就可以专注于应用程序逻辑而不是网络代码。

3. 创建自定义事件处理程序

对于通道之间的通信,我们将实现SimpleChannelInboundHandler<String>,这是ChannelInboundHandlerAdapter的通用实现。此适配器允许我们专注于仅实现我们关心的事件,在本例中,它是channelRead0(),当从服务器收到消息时调用,我们将使用它来简化我们的用例,因为我们只交换字符串消息

3.1 客户端事件处理器

让我们从客户端消息的处理程序开始,它将把服务器收到的任何内容打印到控制台

public class ClientEventHandler extends SimpleChannelInboundHandler<String> {

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, String msg) {
        System.out.println(msg);
    }
}

稍后,我们将通过直接写入通道来处理消息发送。

3.2 消息对象

在讨论服务器事件之前,让我们编写一个POJO来表示发送到服务器的每条消息,我们将注册发送的日期以及用户名和消息:

public class Message {

    private final Instant time;
    private final String user;
    private final String message;

    public Message(String user, String message) {
        this.time = Instant.now();
        this.user = user;
        this.message = message;
    }

    // standard getters...
}

然后,我们将包含一些工具方法

@Override
public String toString() {
    return time + " - " + user + ": " + message;
}

然后,为了解析客户端收到的消息,我们将使用CSV格式。在创建客户端应用程序时,我们将看到客户端如何以这种格式发送消息:

public static Message parse(String string) {
    String[] arr = string.split(";", 2);
    return new Message(arr[0], arr[1]);
}

将拆分限制为2很重要,因为消息部分可能包含分号。

3.3 服务器事件处理器

在我们的服务器事件处理程序中,我们首先为要覆盖的其他事件创建一个辅助方法。此外,我们需要一个已连接的客户端映射和一个最多可保存MAX_HISTORY个元素的队列

public class ServerEventHandler extends SimpleChannelInboundHandler<String> {

    static final Map<String, Channel> clients = new HashMap<>();
    static final Queue<String> history = new LinkedList<>();
    static final int MAX_HISTORY = 5;

    private void handleBroadcast(Message message, ChannelHandlerContext context) {
        String channelId = context.channel()
            .id()
            .asShortText();
        
        clients.forEach((id, channel) -> {
            if (!id.equals(channelId))
                channel.writeAndFlush(message.toString());
        });

        // history-control code...
    }

    // ...
}

首先,我们获取通道ID作为Map的键。然后,对于广播,对于每个连接的客户端(发送者除外),我们都会转发其消息

需要注意的是writeAndFlush()接收一个Object,而且,由于我们的处理程序只能处理字符串,因此必须调用toString()以便客户端可以正确接收它

最后,我们进行历史控制,每次我们添加新消息时,如果列表超过MAX_HISTORY条,我们将删除最旧的一条:

history.add(message.toString());
if (history.size() > MAX_HISTORY)
    history.poll();

现在,我们可以覆盖channelRead0()并解析从客户端收到的消息:

@Override
public void channelRead0(ChannelHandlerContext context, String msg) {
    handleBroadcast(Message.parse(msg), context);
}

然后,对于每个上线的客户端,我们将其添加到我们的clients列表中,传递旧消息以获取上下文,并发送系统消息宣布新客户端

@Override
public void channelActive(final ChannelHandlerContext context) {
    Channel channel = context.channel();
    clients.put(channel.id().asShortText(), channel);

    history.forEach(channel::writeAndFlush);

    handleBroadcast(new Message("system", "client online"), context);
}

最后,我们重写channelInactive(),当客户端离线时调用。这次,我们只需要从列表中删除客户端并发送系统消息

@Override
public void channelInactive(ChannelHandlerContext context) {
    Channel channel = context.channel();
    clients.remove(channel.id().asShortText());

    handleBroadcast(new Message("system", "client offline"), context);
}

4. 服务器应用程序

我们的处理程序不能独立执行任何操作,因此我们需要一个应用程序来引导和运行它,这是一个通用模板。

4.1 在ChannelPipeline中注册自定义组件

为了准备启动,我们选择一个通道实现并实现一个子处理程序,该子处理程序为该通道的请求提供服务

bootstrap.group(serverGroup, clientGroup)
    .channel(NioServerSocketChannel.class)
    .childHandler(new ChannelInitializer<SocketChannel>() {
        @Override
        public void initChannel(SocketChannel channel) {
            channel.pipeline()
                .addFirst(
                    new StringDecoder(),
                    new ServerEventHandler()
                    new StringEncoder());
        }
    });

在子处理程序中,我们定义了处理管道。由于我们只关心字符串消息,因此我们将使用内置的字符串编码器和解码器,这样就无需自己对交换的字节缓冲区进行编码/解码,从而节省一些时间

最后,由于顺序很重要,我们添加了解码器、ServerEventHandler和编码器,这是因为事件从入站流经管道到出站。

我们将服务器绑定到主机/端口以完成我们的应用程序,它将返回一个ChannelFuture,我们将使用它来等待异步套接字使用sync()关闭:

ChannelFuture future = bootstrap.bind(HOST, PORT).sync();
System.out.println("server started. accepting clients.");
future.channel().closeFuture().sync();

5. 客户端应用程序

最后,我们的客户端应用遵循bootstrapping的通用客户端模板,最重要的是,当调用handler()时,我们将改用ClientEventHandler:

channel.pipeline().addFirst(
    new StringDecoder(), 
    new ClientEventHandler(), 
    new StringEncoder());

5.1 处理消息输入

最后,为了处理用户输入,在连接到服务器后,我们将使用Scanner循环,直到收到用户名,然后直到消息等于“exit”。最重要的是,我们必须使用writeAndFlush()发送消息,我们以Message.parse()所需的格式发送消息:

private static void messageLoop(Scanner scanner, Channel channel) {
    while (user.isEmpty()) {
        System.out.print("your name: ");
        user = scanner.nextLine();
    }

    while (scanner.hasNext()) {
        System.out.print("> ");
        String message = scanner.nextLine();
        if (message.equals("exit"))
            break;

        channel.writeAndFlush(user + ";" + message);
    }
}

6. 创建自定义事件监听器

在Netty中,事件监听器在通道的整个生命周期中处理异步事件方面起着至关重要的作用。事件监听器本质上是一种回调机制,我们可以使用它来对返回ChannelFuture的任何操作的完成做出反应

我们实现ChannelFutureListener接口,以便在完成后自定义行为。ChannelFuture表示异步操作的结果,例如连接尝试或I/O操作。

ChannelFutureListener很有用,因为它定义了CLOSE_ON_FAILURE或FIRE_EXCEPTION_ON_FAILURE等默认实现。但是,由于我们不会使用这些,因此让我们实现一个GenericFutureListener来用于操作确认。

我们将保留一个自定义事件名称作为上下文,并检查我们的Future是否成功完成。否则,我们将在记录之前将状态标记为“FAILED”

public class ChannelInfoListener implements GenericFutureListener<ChannelFuture> {

    private final String event;

    public ChannelInfoListener(String event) {
        this.event = event;
    }

    @Override
    public void operationComplete(ChannelFuture future) throws Exception {
        Channel channel = future.channel();
        String status = "OK";

        if (!future.isSuccess()) {
            status = "FAILED";
            future.cause().printStackTrace();
        }

        System.out.printf("%s - channel#%s %s: %s%n", Instant.now(), channel.id().asShortText(), status, event);
    }
}

6.1 事件收据

让我们回到代码的某些部分来包含监听器,首先,对于客户端,让我们包含“connected to server”确认

future.addListener(new ChannelInfoListener("connected to server"));

然后,让我们在消息循环中包含一个“message sent”确认

ChannelFuture sent = channel.writeAndFlush(user + ";" + message);
sent.addListener(new ChannelInfoListener("message sent"));

这样我们就可以确保在发送消息时仍连接到服务器。最后,对于服务器处理程序,让我们在广播期间发送“message relayed”确认

clients.forEach((id, channel) -> {
    if (!id.equals(channelId)) {
        ChannelFuture relay = channel.writeAndFlush(message.toString());
        relay.addListener(new ChannelInfoListener("message relayed to " + id));
    }
});

7. 实际操作

Netty允许我们使用EmbeddedChannel测试管道,但对于客户端/服务器交互,让我们看看从终端运行时是什么样子,让我们启动服务器(为了便于阅读,省略包名称):

$ mvn exec:java -Dexec.mainClass=ChatServerMain
chat server started. ready to accept clients.

然后,我们启动第一个客户端,输入一个名字,并发送两条消息:

$ mvn exec:java -Dexec.mainClass=ChatClientMain
2024-01-12 3:47:02 - channel#03c40ad4 OK: connected to server
your name: Bob
> Hello
2024-01-12 3:47:02 - channel#03c40ad4 OK: message sent
> Anyone there?!
2024-01-12 3:47:03 - channel#03c40ad4 OK: message sent

当我们连接到第二个客户端时,我们会在输入姓名之前获取消息历史记录

$ mvn exec:java -Dexec.mainClass=ChatClientMain
2024-01-12 3:49:33 - channel#daa64476 OK: connected to server
2024-01-12 3:46:55 - system: client online: 03c40ad4
2024-01-12 3:47:03 - Bob: Hello
2024-01-12 3:48:40 - Bob: Anyone there?!

当然,选择名字并发送消息后:

your name: Alice
> Hi, Bob!
2024-01-12 3:51:05 - channel#daa64476 OK: message sent

第一个客户端将会收到它:

2024-01-12 3:49:33 - system: client online: daa64476
2024-01-12 3:51:05 - Alice: Hi, Bob!

8. 总结

在本文中,我们成功地使用Netty构建了一个功能齐全的聊天服务器,展示了该框架在处理异步通信方面的强大功能和简便性。通过实现事件处理程序,我们成功地在连接的客户端之间传递消息并维护上下文历史记录。

Show Disqus Comments
0 comments
Anonymous
Error: Not Found.
Markdown is supported

Be the first guy leaving a comment!

Post Directory

1. 简介
2. 场景
3. 创建自定义事件处理程序
  - 3.1 客户端事件处理器
  - 3.2 消息对象
  - 3.3 服务器事件处理器
4. 服务器应用程序
  - 4.1 在ChannelPipeline中注册自定义组件
5. 客户端应用程序
  - 5.1 处理消息输入
6. 创建自定义事件监听器
  - 6.1 事件收据
7. 实际操作
8. 总结
扫码关注公众号:Taketoday
发送 290992
即可立即永久解锁本站全部文章