Netty中的HTTP/2

2025/03/13

1. 概述

Netty是一个基于NIO的客户端-服务器框架,它使Java开发人员能够操作网络层。使用此框架,开发人员可以构建任何已知协议的实现,甚至是自定义协议。

为了对该框架有基本的了解,Netty介绍是一个很好的开始。

在本教程中,我们将了解如何在Netty中实现HTTP/2服务器和客户端

2. 什么是HTTP/2?

顾名思义,HTTP/2是超文本传输协议的较新版本。

大约在1989年互联网诞生的时候,HTTP/1.0诞生了。1997年,它升级为1.1版本。然而,直到2015年,它才迎来了一次重大升级,即2版本。

在撰写本文时,HTTP/3也已可用,尽管并非所有浏览器都默认支持它。

HTTP/2仍然是被广泛接受和实现的最新版本的协议,它与之前的版本有显著的不同,其中包括多路复用和服务器推送功能等。

HTTP/2中的通信通过一组称为帧的字节进行,多个帧形成一个流。

在我们的代码示例中,我们将看到Netty如何处理HEADERSDATASETTINGS帧的交换

3. 服务器

现在让我们看看如何在Netty中创建HTTP/2服务器。

3.1 SSL上下文

Netty支持通过TLS进行HTTP/2的APN协商,因此,我们需要创建服务器的第一件事是SslContext

SelfSignedCertificate ssc = new SelfSignedCertificate();
SslContext sslCtx = SslContextBuilder.forServer(ssc.certificate(), ssc.privateKey())
    .sslProvider(SslProvider.JDK)
    .ciphers(Http2SecurityUtil.CIPHERS, SupportedCipherSuiteFilter.INSTANCE)
    .applicationProtocolConfig(
        new ApplicationProtocolConfig(Protocol.ALPN, SelectorFailureBehavior.NO_ADVERTISE,
            SelectedListenerFailureBehavior.ACCEPT, ApplicationProtocolNames.HTTP_2))
    .build();

在这里,我们使用JDK SSL提供程序为服务器创建了一个上下文,添加了几个密码,并为HTTP/2配置了应用层协议协商。

这意味着我们的服务器将仅支持HTTP/2及其底层协议标识符h2

3.2 使用ChannelInitializer引导服务器

接下来,我们需要一个ChannelInitializer来为我们的复用子通道设置一个Netty管道。

我们将使用此通道中先前的sslContext来启动管道,然后引导服务器:

public final class Http2Server {

    static final int PORT = 8443;

    public static void main(String[] args) throws Exception {
        SslContext sslCtx = // create sslContext as described above
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.option(ChannelOption.SO_BACKLOG, 1024);
            b.group(group)
                .channel(NioServerSocketChannel.class)
                .handler(new LoggingHandler(LogLevel.INFO))
                .childHandler(new ChannelInitializer() {
                    @Override
                    protected void initChannel(SocketChannel ch) throws Exception {
                        if (sslCtx != null) {
                            ch.pipeline()
                                .addLast(sslCtx.newHandler(ch.alloc()), Http2Util.getServerAPNHandler());
                        }
                    }
            });
            Channel ch = b.bind(PORT).sync().channel();

            logger.info("HTTP/2 Server is listening on https://127.0.0.1:" + PORT + '/');

            ch.closeFuture().sync();
        } finally {
            group.shutdownGracefully();
        }
    }
}

作为此通道初始化的一部分,我们在实用程序方法getServerAPNHandler()中向管道添加了一个APN处理程序,该方法是在我们自己的实用程序类Http2Util中定义的:

public static ApplicationProtocolNegotiationHandler getServerAPNHandler() {
    ApplicationProtocolNegotiationHandler serverAPNHandler = new ApplicationProtocolNegotiationHandler(ApplicationProtocolNames.HTTP_2) {
        @Override
        protected void configurePipeline(ChannelHandlerContext ctx, String protocol) throws Exception {
            if (ApplicationProtocolNames.HTTP_2.equals(protocol)) {
                ctx.pipeline().addLast(Http2FrameCodecBuilder.forServer().build(), new Http2ServerResponseHandler());
                return;
            }
            throw new IllegalStateException("Protocol: " + protocol + " not supported");
        }
    };
    return serverAPNHandler;
}

反过来,该处理程序使用其构建器和名为Http2ServerResponseHandler的自定义处理程序添加了Netty提供的Http2FrameCodec。

我们的自定义处理程序扩展了Netty的ChannelDuplexHandler,并充当服务器的入站和出站处理程序,它主要准备要发送到客户端的响应。

为了本教程的目的,我们将在io.netty.buffer.ByteBuf中定义一个静态Hello World响应-这是在Netty中读取和写入字节的首选对象:

static final ByteBuf RESPONSE_BYTES = Unpooled.unreleasableBuffer(
    Unpooled.copiedBuffer("Hello World", CharsetUtil.UTF_8));

这个缓冲区将在处理程序的channelRead方法中设置为DATA帧,并写入ChannelHandlerContext:

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    if (msg instanceof Http2HeadersFrame msgHeader) {
        if (msgHeader.isEndStream()) {
            ByteBuf content = ctx.alloc().buffer();
            content.writeBytes(RESPONSE_BYTES.duplicate());

            Http2Headers headers = new DefaultHttp2Headers().status(HttpResponseStatus.OK.codeAsText());
            ctx.write(new DefaultHttp2HeadersFrame(headers).stream(msgHeader.stream()));
            ctx.write(new DefaultHttp2DataFrame(content, true).stream(msgHeader.stream()));
        }
    } else {
        super.channelRead(ctx, msg);
    }
}

就这样,我们的服务器已经准备好输出“Hello World”了。

为了进行快速测试,启动服务器并使用–http2选项发送curl命令:

curl -k -v --http2 https://127.0.0.1:8443

这将给出类似如下的响应:

> GET / HTTP/2
> Host: 127.0.0.1:8443
> User-Agent: curl/7.64.1
> Accept: */*
> 
* Connection state changed (MAX_CONCURRENT_STREAMS == 4294967295)!
< HTTP/2 200 
< 
* Connection #0 to host 127.0.0.1 left intact
Hello World* Closing connection 0

4. 客户端

接下来我们看一下客户端,它的作用是发送请求,然后处理从服务器得到的响应。

我们的客户端代码将包含几个处理程序、一个用于在管道中设置它们的初始化类,以及最后一个用于引导客户端并将所有内容整合在一起的JUnit测试

4.1 SSL上下文

首先让我们看看如何设置客户端的SslContext,我们将在设置客户端JUnit时编写此代码:

@Before
public void setup() throws Exception {
    SslContext sslCtx = SslContextBuilder.forClient()
        .sslProvider(SslProvider.JDK)
        .ciphers(Http2SecurityUtil.CIPHERS, SupportedCipherSuiteFilter.INSTANCE)
        .trustManager(InsecureTrustManagerFactory.INSTANCE)
        .applicationProtocolConfig(
            new ApplicationProtocolConfig(Protocol.ALPN, SelectorFailureBehavior.NO_ADVERTISE,
                SelectedListenerFailureBehavior.ACCEPT, ApplicationProtocolNames.HTTP_2))
        .build();
}

我们可以看到,它与服务器的SslContext非常相似,只是我们在这里不提供任何SelfSignedCertificate。另一个区别是我们添加了一个InsecureTrustManagerFactory来信任任何证书而无需任何验证。

重要的是,此信任管理器仅用于演示目的,不应在生产中使用。要使用受信任的证书,Netty的SslContextBuilder提供了许多替代方案。

最后我们将回到这个JUnit来引导客户端。

4.2 处理程序

现在,让我们看一下处理程序。

首先,我们需要一个名为Http2SettingsHandler的处理程序来处理HTTP/2的SETTINGS帧,它扩展了Netty的SimpleChannelInboundHandler:

public class Http2SettingsHandler extends SimpleChannelInboundHandler<Http2Settings> {
    private final ChannelPromise promise;

    // constructor

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, Http2Settings msg) throws Exception {
        promise.setSuccess();
        ctx.pipeline().remove(this);
    }
}

该类只是初始化一个ChannelPromise并将其标记为成功。

它还有一个实用方法awaitSettings,我们的客户端将使用它来等待初始握手完成:

public void awaitSettings(long timeout, TimeUnit unit) throws Exception {
    if (!promise.awaitUninterruptibly(timeout, unit)) {
        throw new IllegalStateException("Timed out waiting for settings");
    }
}

如果在规定的超时期限内没有发生通道读取,则会抛出IllegalStateException。

其次,我们需要一个处理程序来处理从服务器获得的响应,我们将其命名为Http2ClientResponseHandler:

public class Http2ClientResponseHandler extends SimpleChannelInboundHandler {

    private final Map<Integer, MapValues> streamidMap;

    // constructor
}

此类还扩展了SimpleChannelInboundHandler并声明了一个MapValues的streamidMap,这是我们的Http2ClientResponseHandler的内部类:

public static class MapValues {
    ChannelFuture writeFuture;
    ChannelPromise promise;

    // constructor and getters
}

我们添加了这个类,以便能够为给定的整数键存储两个值。

该处理程序还有一个实用方法put,用于将值放入streamidMap中:

public MapValues put(int streamId, ChannelFuture writeFuture, ChannelPromise promise) {
    return streamidMap.put(streamId, new MapValues(writeFuture, promise));
}

接下来我们看看当管道中读取通道时,这个处理程序会做什么。

基本上,这是我们从服务器获取DATA帧或ByteBuf内容作为FullHttpResponse的地方,并且可以按照我们想要的方式对其进行操作。

在此示例中,我们仅记录它:

@Override
protected void channelRead0(ChannelHandlerContext ctx, FullHttpResponse msg) throws Exception {
    Integer streamId = msg.headers().getInt(HttpConversionUtil.ExtensionHeaderNames.STREAM_ID.text());
    if (streamId == null) {
        logger.error("HttpResponseHandler unexpected message received: " + msg);
        return;
    }

    MapValues value = streamidMap.get(streamId);

    if (value == null) {
        logger.error("Message received for unknown stream id " + streamId);
    } else {
        ByteBuf content = msg.content();
        if (content.isReadable()) {
            int contentLength = content.readableBytes();
            byte[] arr = new byte[contentLength];
            content.readBytes(arr);
            logger.info(new String(arr, 0, contentLength, CharsetUtil.UTF_8));
        }

        value.getPromise().setSuccess();
    }
}

在方法的最后,我们将ChannelPromise标记为成功以指示正确完成。

与我们描述的第一个处理程序一样,此类还包含一个供客户端使用的实用方法,该方法使我们的事件循环等待,直到ChannelPromise成功;或者换句话说,它等到响应处理完成:

public String awaitResponses(long timeout, TimeUnit unit) {
    Iterator<Entry<Integer, MapValues>> itr = streamidMap.entrySet().iterator();        
    String response = null;

    while (itr.hasNext()) {
        Entry<Integer, MapValues> entry = itr.next();
        ChannelFuture writeFuture = entry.getValue().getWriteFuture();

        if (!writeFuture.awaitUninterruptibly(timeout, unit)) {
            throw new IllegalStateException("Timed out waiting to write for stream id " + entry.getKey());
        }
        if (!writeFuture.isSuccess()) {
            throw new RuntimeException(writeFuture.cause());
        }
        ChannelPromise promise = entry.getValue().getPromise();

        if (!promise.awaitUninterruptibly(timeout, unit)) {
            throw new IllegalStateException("Timed out waiting for response on stream id " + entry.getKey());
        }
        if (!promise.isSuccess()) {
            throw new RuntimeException(promise.cause());
        }
        logger.info("---Stream id: " + entry.getKey() + " received---");
        response = entry.getValue().getResponse();
            
        itr.remove();
    }        
    return response;
}

4.3 Http2ClientInitializer

正如我们在服务器案例中看到的,ChannelInitializer的目的是设置管道:

public class Http2ClientInitializer extends ChannelInitializer {

    private final SslContext sslCtx;
    private final int maxContentLength;
    private Http2SettingsHandler settingsHandler;
    private Http2ClientResponseHandler responseHandler;
    private String host;
    private int port;

    // constructor

    @Override
    public void initChannel(SocketChannel ch) throws Exception {
        settingsHandler = new Http2SettingsHandler(ch.newPromise());
        responseHandler = new Http2ClientResponseHandler();
        
        if (sslCtx != null) {
            ChannelPipeline pipeline = ch.pipeline();
            pipeline.addLast(sslCtx.newHandler(ch.alloc(), host, port));
            pipeline.addLast(Http2Util.getClientAPNHandler(maxContentLength, settingsHandler, responseHandler));
        }
    }
    // getters
}

在这种情况下,我们使用新的SslHandler启动管道,以在握手过程开始时添加TLS SNI扩展

然后, ApplicationProtocolNegotiationHandler负责在管道中排列连接处理程序和我们的自定义处理程序:

public static ApplicationProtocolNegotiationHandler getClientAPNHandler(int maxContentLength, Http2SettingsHandler settingsHandler, Http2ClientResponseHandler responseHandler) {
    final Http2FrameLogger logger = new Http2FrameLogger(INFO, Http2ClientInitializer.class);
    final Http2Connection connection = new DefaultHttp2Connection(false);

    HttpToHttp2ConnectionHandler connectionHandler = new HttpToHttp2ConnectionHandlerBuilder().frameListener(
        new DelegatingDecompressorFrameListener(connection, 
            new InboundHttp2ToHttpAdapterBuilder(connection)
                .maxContentLength(maxContentLength)
                .propagateSettings(true)
                .build()))
            .frameLogger(logger)
            .connection(connection)
            .build();

    ApplicationProtocolNegotiationHandler clientAPNHandler = new ApplicationProtocolNegotiationHandler(ApplicationProtocolNames.HTTP_2) {
        @Override
        protected void configurePipeline(ChannelHandlerContext ctx, String protocol) {
            if (ApplicationProtocolNames.HTTP_2.equals(protocol)) {
                ChannelPipeline p = ctx.pipeline();
                p.addLast(connectionHandler);
                p.addLast(settingsHandler, responseHandler);
                return;
            }
            ctx.close();
            throw new IllegalStateException("Protocol: " + protocol + " not supported");
        }
    };
    return clientAPNHandler;
}

现在剩下要做的就是启动客户端并发送请求。

4.4 启动客户端

客户端的启动在某种程度上与服务器类似,之后,我们需要添加一些功能来处理发送请求和接收响应。

如前所述,我们将其写为JUnit测试:

@Test
public void whenRequestSent_thenHelloWorldReceived() throws Exception {
    EventLoopGroup workerGroup = new NioEventLoopGroup();
    Http2ClientInitializer initializer = new Http2ClientInitializer(sslCtx, Integer.MAX_VALUE, HOST, PORT);

    try {
        Bootstrap b = new Bootstrap();
        b.group(workerGroup);
        b.channel(NioSocketChannel.class);
        b.option(ChannelOption.SO_KEEPALIVE, true);
        b.remoteAddress(HOST, PORT);
        b.handler(initializer);

        channel = b.connect().syncUninterruptibly().channel();

        logger.info("Connected to [" + HOST + ':' + PORT + ']');

        Http2SettingsHandler http2SettingsHandler = initializer.getSettingsHandler();
        http2SettingsHandler.awaitSettings(60, TimeUnit.SECONDS);
  
        logger.info("Sending request(s)...");

        FullHttpRequest request = Http2Util.createGetRequest(HOST, PORT);

        Http2ClientResponseHandler responseHandler = initializer.getResponseHandler();
        int streamId = 3;

        responseHandler.put(streamId, channel.write(request), channel.newPromise());
        channel.flush();
 
        String response = responseHandler.awaitResponses(60, TimeUnit.SECONDS);

        assertEquals("Hello World", response);

        logger.info("Finished HTTP/2 request(s)");
    } finally {
        workerGroup.shutdownGracefully();
    }
}

值得注意的是,这些是我们在服务器启动方面采取的额外步骤:

  • 首先,我们利用Http2SettingsHandler的awaitSettings方法等待初次握手
  • 其次,我们将请求创建为FullHttpRequest
  • 第三,我们将streamId放入Http2ClientResponseHandler的streamIdMap中,并调用其awaitResponses方法
  • 最后我们验证了响应中确实收到了Hello World

简而言之,发生了以下事情-客户端发送了一个HEADERS帧,进行了初始SSL握手,然后服务器在HEADERS和DATA帧中发送了响应。

5. 总结

在本教程中,我们了解了如何使用代码示例在Netty中实现HTTP/2服务器和客户端,以使用HTTP/2帧获取Hello World响应。

Show Disqus Comments

Post Directory

扫码关注公众号:Taketoday
发送 290992
即可立即永久解锁本站全部文章