1. 概述
在本教程中,我们将了解如何使用WebSockets从服务器向浏览器发送调度消息。另一种方法是使用服务器发送事件(SSE),但我们不会在本文中介绍它。
Spring提供了多种调度选项。首先,我们将介绍@Scheduled注解。然后,我们将看到一个使用Project Reactor提供的Flux::interval方法的示例。这个库对于Webflux应用程序来说是开箱即用的,并且可以在任何Java项目中用作独立的库。
此外,还存在更高级的机制,例如Quartz调度程序,但我们不会介绍它们。
2. 一个简单的聊天应用程序
在上一篇文章中,我们使用WebSockets构建了一个聊天应用程序。让我们用一个新功能来扩展它:聊天机器人。这些机器人是将预定消息推送到浏览器的服务器端组件。
2.1 Maven依赖
让我们从在Maven中设置必要的依赖关系开始。要构建这个项目,我们的pom.xml应该有:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-core</artifactId>
</dependency>
<dependency>
<groupId>com.github.javafaker</groupId>
<artifactId>javafaker</artifactId>
<version>1.0.2</version>
</dependency>
<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
</dependency>
2.2 JavaFaker依赖
我们将使用JavaFaker库来生成机器人的消息。此库通常用于生成测试数据。在这里,我们将向我们的聊天室添加一位名为“Chuck Norris”的客人。
让我们看看代码:
Faker faker = new Faker();
ChuckNorris chuckNorris = faker.chuckNorris();
String messageFromChuck = chuckNorris.fact();
Faker将为各种数据生成器提供工厂方法。我们将使用ChuckNorris生成器。调用chuckNorris.fact()将显示预定义消息列表中的随机句子。
2.3 数据模型
聊天应用程序使用一个简单的POJO作为消息包装器:
public class OutputMessage {
private String from;
private String text;
private String time;
// standard constructors, getters/setters, equals and hashcode
}
综上所述,以下是我们如何创建聊天消息的示例:
OutputMessage message = new OutputMessage("Chatbot 1", "Hello there!", new SimpleDateFormat("HH:mm").format(new Date())));
2.4 客户端
我们的聊天客户端是一个简单的HTML页面。它使用SockJS客户端和STOMP消息协议。
让我们看看客户端如何订阅主题:
<html>
<head>
<script src="./js/sockjs-0.3.4.js"></script>
<script src="./js/stomp.js"></script>
<script type="text/javascript">
// ...
stompClient = Stomp.over(socket);
stompClient.connect({}, function(frame) {
// ...
stompClient.subscribe('/topic/pushmessages', function(messageOutput) {
showMessageOutput(JSON.parse(messageOutput.body));
});
});
// ...
</script>
</head>
<!-- ... -->
</html>
首先,我们通过SockJS协议创建了一个Stomp客户端。然后,主题订阅充当服务器和连接的客户端之间的通信通道。
在我们的仓库中,此代码位于webapp/bots.html中。我们在http://localhost:8080/bots.html本地运行时访问它。当然,我们需要根据我们部署应用程序的方式来调整主机和端口。
2.5 服务器端
在上一篇文章中,我们已经了解了如何在Spring中配置WebSockets。让我们稍微修改一下配置:
@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {
@Override
public void configureMessageBroker(MessageBrokerRegistry config) {
config.enableSimpleBroker("/topic");
config.setApplicationDestinationPrefixes("/app");
}
@Override
public void registerStompEndpoints(StompEndpointRegistry registry) {
// ...
registry.addEndpoint("/chatwithbots");
registry.addEndpoint("/chatwithbots").withSockJS();
}
}
为了推送我们的消息,我们使用实用程序类SimpMessagingTemplate。默认情况下,它在Spring上下文中作为@Bean提供。当AbstractMessageBrokerConfiguration在类路径中时,我们可以看到它是如何通过自动配置声明的。因此,我们可以将它注入到任何Spring组件中。
之后,我们使用它向主题/topic/pushmessages发布消息。我们假设我们的类将该bean注入到名为simpMessagingTemplate的变量中:
simpMessagingTemplate.convertAndSend("/topic/pushmessages", new OutputMessage("Chuck Norris", faker.chuckNorris().fact(), time));
如前面在我们的客户端示例中所示,客户端订阅该主题以在消息到达时对其进行处理。
3. 定时推送消息
在Spring生态系统中,我们可以选择多种调度方式。如果我们使用Spring MVC,@Scheduled注解因其简单性而成为自然的选择。如果我们使用Spring Webflux,我们也可以使用Project Reactor的Flux::interval方法。我们将看到每个示例。
3.1 配置
我们的聊天机器人将使用JavaFaker的Chuck Norris生成器。我们将把它配置为一个bean,这样我们就可以将它注入到我们需要的地方。
@Configuration
class AppConfig {
@Bean
public ChuckNorris chuckNorris() {
return (new Faker()).chuckNorris();
}
}
3.2 使用@Scheduled
我们的示例机器人是调度方法。当它们运行时,它们使用SimpMessagingTemplate通过WebSocket发送我们的OutputMessage POJO。
顾名思义,@Scheduled注解允许重复执行方法。有了它,我们可以使用简单的基于速率的调度或更复杂的“cron”表达式。
让我们编写我们的第一个聊天机器人:
@Service
public class ScheduledPushMessages {
@Scheduled(fixedRate = 5000)
public void sendMessage(SimpMessagingTemplate simpMessagingTemplate, ChuckNorris chuckNorris) {
String time = new SimpleDateFormat("HH:mm").format(new Date());
simpMessagingTemplate.convertAndSend("/topic/pushmessages", new OutputMessage("Chuck Norris (@Scheduled)", chuckNorris().fact(), time));
}
}
我们用@Scheduled(fixedRate = 5000)标注sendMessage方法,这使得sendMessage每5秒运行一次。然后,我们使用simpMessagingTemplate实例向主题发送OutputMessage。simpMessagingTemplate和chuckNorris实例作为方法参数从Spring上下文注入。
3.3 使用Flux::interval()
如果我们使用WebFlux,我们可以使用Flux::interval运算符。它将发布由所选Duration分隔的无限长元素流。
现在,让我们在前面的示例中使用Flux。目标是每5秒发送一次Chuck Norris的报价。首先,我们需要实现InitializingBean接口以在应用程序启动时订阅Flux:
@Service
public class ReactiveScheduledPushMessages implements InitializingBean {
private SimpMessagingTemplate simpMessagingTemplate;
private ChuckNorris chuckNorris;
@Autowired
public ReactiveScheduledPushMessages(SimpMessagingTemplate simpMessagingTemplate, ChuckNorris chuckNorris) {
this.simpMessagingTemplate = simpMessagingTemplate;
this.chuckNorris = chuckNorris;
}
@Override
public void afterPropertiesSet() throws Exception {
Flux.interval(Duration.ofSeconds(5L))
// discard the incoming Long, replace it by an OutputMessage
.map((n) -> new OutputMessage("Chuck Norris (Flux::interval)",
chuckNorris.fact(),
new SimpleDateFormat("HH:mm").format(new Date())))
.subscribe(message -> simpMessagingTemplate.convertAndSend("/topic/pushmessages", message));
}
}
在这里,我们使用构造函数注入来设置simpMessagingTemplate和chuckNorris实例。这一次,调度逻辑在afterPropertiesSet()中,我们在实现InitializingBean时覆盖了它。该方法将在服务启动后立即运行。
interval运算符每5秒发出一个Long。然后,map运算符丢弃该值并用我们的消息替换它。最后,我们subscribe Flux以触发我们对每条消息的逻辑。
4. 总结
在本教程中,我们已经看到实用程序类SimpMessagingTemplate可以轻松地通过WebSocket推送服务器消息。此外,我们还看到了两种调度一段代码执行的方法。
与往常一样,本教程的完整源代码可在GitHub上获得。