1. 概述
在本文中,我们将使用新的Spring 5 WebSockets API以及Spring WebFlux提供的响应式功能创建一个快速示例。
WebSocket是一种众所周知的协议,可以实现客户端和服务器之间的全双工通信,通常用于客户端和服务器需要以高频率和低延迟交换事件的Web应用程序。
Spring 5在框架中对WebSockets支持进行了现代化改造,为该通信通道添加了响应式功能。
我们可以在此处找到有关Spring WebFlux的更多信息。
2. Maven依赖
我们将为spring-boot-integration和spring-boot-starter-webflux使用spring-boot-starters依赖项,目前可在Spring Milestone Repository获得。
在此示例中,我们使用最新的可用版本2.0.0.M7,但应该始终获取Maven仓库中可用的最新版本:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-integration</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
3. Spring中的WebSocket配置
我们的配置非常简单:我们将注入WebSocketHandler来处理Spring WebSocket应用程序中的套接字会话。
@Configuration
public class ReactiveWebSocketConfiguration {
@Autowired
private WebSocketHandler webSocketHandler;
}
此外,让我们创建一个HandlerMapping bean注解方法,该方法负责请求和处理程序对象之间的映射:
@Bean
public HandlerMapping webSocketHandlerMapping() {
Map<String, WebSocketHandler> map = new HashMap<>();
map.put("/event-emitter", webSocketHandler);
SimpleUrlHandlerMapping handlerMapping = new SimpleUrlHandlerMapping();
handlerMapping.setOrder(1);
handlerMapping.setUrlMap(map);
return handlerMapping;
}
我们可以连接到的URL是:ws://localhost:
4. Spring中的WebSocket消息处理
我们的ReactiveWebSocketHandler类将负责管理服务器端的WebSocket会话。
它实现了WebSocketHandler接口,因此我们可以重写handle方法,该方法将用于将消息发送到WebSocket客户端:
@Component("ReactiveWebSocketHandler")
public class ReactiveWebSocketHandler implements WebSocketHandler {
private static final ObjectMapper json = new ObjectMapper();
private final Flux<String> eventFlux = Flux.generate(sink -> {
Event event = new Event(randomUUID().toString(), now().toString());
try {
sink.next(json.writeValueAsString(event));
} catch (JsonProcessingException e) {
sink.error(e);
}
});
private final Flux<String> intervalFlux = Flux.interval(Duration.ofMillis(1000L))
.zipWith(eventFlux, (time, event) -> event);
@Override
public Mono<Void> handle(WebSocketSession session) {
return session.send(intervalFlux.map(session::textMessage))
.and(session.receive().map(WebSocketMessage::getPayloadAsText).log());
}
}
5. 创建一个简单的响应式WebSocket客户端
现在让我们创建一个Spring Reactive WebSocket客户端,它将能够与我们的WebSocket服务器连接并交换信息。
5.1 Maven依赖
首先,Maven依赖项。
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
在这里,我们使用与之前相同的spring-boot-starter-webflux来设置我们的响应式WebSocket服务器应用程序。
5.2 WebSocket客户端
现在,让我们创建ReactiveClientWebSocket类,负责启动与服务器的通信:
public class ReactiveJavaClientWebSocket {
public static void main(String[] args) throws InterruptedException {
WebSocketClient client = new ReactorNettyWebSocketClient();
client.execute(URI.create("ws://localhost:8080/event-emitter"),
session -> session.send(
Mono.just(session.textMessage("event-spring-reactive-client-websocket")))
.thenMany(session.receive()
.map(WebSocketMessage::getPayloadAsText)
.log())
.then())
.block(Duration.ofSeconds(10L));
}
}
在上面的代码中,我们可以看到我们使用的是ReactorNettyWebSocketClient,这是与Reactor Netty一起使用的WebSocketClient实现。
此外,客户端通过URL ws://localhost:8080/event-emitter连接到WebSocket服务器,在连接到服务器后立即建立会话。
我们还可以看到,我们正在向服务器发送一条消息(“event-spring-reactive-client-websocket”)以及连接请求。
此外,方法send被调用,期望将Publisher<T>类型的变量作为参数,在我们的例子中,Publisher<T>是Mono<T>,T是一个简单的字符串“event-me-from-reactive-java-client-websocket“。
然后,调用了期望类型为String的Flux的thenMany(…)方法。receive()方法获取传入消息的Flux,然后将其转换为字符串。
最后,block()方法强制客户端在给定时间(在我们的示例中为10秒)后与服务器断开连接。
5.3 启动客户端
要运行它,请确保响应式WebSocket服务器已启动并正在运行。然后,启动ReactiveJavaClientWebSocket类,我们可以在日志中看到正在发出的事件:
[reactor-http-nio-4] INFO reactor.Flux.Map.1 -
onNext({"eventId":"6042b94f-fd02-47a1-911d-dacf97f12ba6",
"eventDt":"2022-09-16T21:29:26.900"})
我们还可以在响应式WebSocket服务器的日志中看到客户端在连接尝试期间发送的消息:
[reactor-http-nio-2] reactor.Flux.Map.1:
onNext(event-me-from-reactive-java-client)
此外,我们可以在客户端完成其请求后看到终止连接的消息(在我们的例子中,为10秒之后):
[reactor-http-nio-2] reactor.Flux.Map.1: onComplete()
6. 浏览器WebSocket客户端
让我们创建一个简单的HTML/Javascript客户端WebSocket来使用我们的响应式WebSocket服务器应用程序。
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title>tuyucheng: Spring 5 Reactive Client WebSocket (Browser)</title>
</head>
<body>
<div class="events"></div>
<script>
var clientWebSocket = new WebSocket("ws://localhost:8080/event-emitter");
clientWebSocket.onopen = function () {
console.log("clientWebSocket.onopen", clientWebSocket);
console.log("clientWebSocket.readyState", "websocketstatus");
clientWebSocket.send("event-me-from-browser");
}
clientWebSocket.onclose = function (error) {
console.log("clientWebSocket.onclose", clientWebSocket, error);
events("Closing connection");
}
clientWebSocket.onerror = function (error) {
console.log("clientWebSocket.onerror", clientWebSocket, error);
events("An error occured");
}
clientWebSocket.onmessage = function (error) {
console.log("clientWebSocket.onmessage", clientWebSocket, error);
events(error.data);
}
function events(responseEvent) {
document.querySelector(".events").innerHTML += responseEvent + "<br>";
}
</script>
</body>
</html>
在WebSocket服务器运行的情况下,在浏览器(例如:Chrome、Internet Explorer、Mozilla Firefox等)中打开这个HTML文件,我们应该看到屏幕上打印的事件,每个事件延迟1秒,如WebSocket服务器所定义。
{"eventId":"c25975de-6775-4b0b-b974-b396847878e6","eventDt":"2022-09-16T21:56:09.780"}
{"eventId":"ac74170b-1f71-49d3-8737-b3f9a8a352f9","eventDt":"2022-09-16T21:56:09.781"}
{"eventId":"40d8f305-f252-4c14-86d7-ed134d3e10c6","eventDt":"2022-09-16T21:56:09.782"}
7. 总结
在这里,我们展示了一个示例,说明如何使用Spring 5框架在服务器和客户端之间创建WebSocket通信,实现Spring Webflux提供的新响应式功能。
与往常一样,本教程的完整源代码可在GitHub上获得。