使用Spring Data Redis的PubSub消息传递

2023/05/18

1. 概述

在探讨Spring Data Redis系列的第二篇文章中,我们将了解发布/订阅消息队列。

在Redis中,发布者没有被编程为将消息发送给特定的订阅者。相反,发布的消息被描述为通道,而不知道可能存在哪些(如果有)订阅者。

同样,订阅者表示对一个或多个主题感兴趣,并且只接收感兴趣的消息,而不知道有哪些(如果有)发布者。

发布者和订阅者的这种解耦可以允许更大的可扩展性和更动态的网络拓扑。

2. Redis配置

让我们开始添加消息队列所需的配置。

首先,我们将定义一个MessageListenerAdapter bean,其中包含一个名为RedisMessageSubscriber的MessageListener接口的自定义实现。这个bean充当pub-sub消息传递模型中的订阅者:

@Bean
MessageListenerAdapter messageListener() { 
    return new MessageListenerAdapter(new RedisMessageSubscriber());
}

RedisMessageListenerContainer是Spring Data Redis提供的一个类,它为Redis消息监听器提供异步行为。这是在内部调用的,根据Spring Data Redis文档-“处理监听、转换和消息调度的低级细节”。

@Bean
RedisMessageListenerContainer redisContainer() {
    RedisMessageListenerContainer container = new RedisMessageListenerContainer(); 
    container.setConnectionFactory(jedisConnectionFactory()); 
    container.addMessageListener(messageListener(), topic()); 
    return container; 
}

我们还将使用自定义构建的MessagePublisher接口和RedisMessagePublisher实现创建一个bean。这样,我们就可以拥有一个通用的消息发布API,并让Redis实现将redisTemplate和topic作为构造函数参数:

@Bean
MessagePublisher redisPublisher() { 
    return new RedisMessagePublisher(redisTemplate(), topic());
}

最后,我们将设置一个主题,发布者将向其发送消息,而订阅者将接收消息:

@Bean
ChannelTopic topic() {
    return new ChannelTopic("messageQueue");
}

3. 发布消息

3.1 定义MessagePublisher接口

Spring Data Redis不提供用于消息分发的MessagePublisher接口。我们可以定义一个自定义接口,它将在实现中使用redisTemplate:

public interface MessagePublisher {
    void publish(String message);
}

3.2 RedisMessagePublisher实现

我们的下一步是提供MessagePublisher接口的实现,添加消息发布细节并使用redisTemplate中的方法。

该模板包含一组非常丰富的方法,适用于广泛的操作-其中convertAndSend能够通过主题将消息发送到队列:

public class RedisMessagePublisher implements MessagePublisher {

    @Autowired
    private RedisTemplate<String, Object> redisTemplate;
    @Autowired
    private ChannelTopic topic;

    public RedisMessagePublisher() {
    }

    public RedisMessagePublisher(RedisTemplate<String, Object> redisTemplate, ChannelTopic topic) {
        this.redisTemplate = redisTemplate;
        this.topic = topic;
    }

    public void publish(String message) {
        redisTemplate.convertAndSend(topic.getTopic(), message);
    }
}

如你所见,发布者实现很简单。它使用redisTemplate的convertAndSend()方法格式化给定消息并将其发布到配置的主题。

主题实现发布和订阅语义:当消息发布时,它会发送给所有注册监听该主题的订阅者。

4. 订阅消息

RedisMessageSubscriber实现了Spring Data Redis提供的MessageListener接口:

@Service
public class RedisMessageSubscriber implements MessageListener {

    public static List<String> messageList = new ArrayList<String>();

    public void onMessage(Message message, byte[] pattern) {
        messageList.add(message.toString());
        System.out.println("Message received: " + message.toString());
    }
}

请注意,还有一个名为pattern的第二个参数,我们在此示例中没有使用它。Spring Data Redis文档指出此参数表示“与通道匹配的模式(如果指定)”,但它可以为null。

5. 发送和接收消息

现在我们把它们放在一起。让我们创建一条消息,然后使用RedisMessagePublisher发布它:

String message = "Message " + UUID.randomUUID();
redisMessagePublisher.publish(message);

当我们调用publish(message)时,内容被发送到Redis,在那里它被路由到我们的发布者中定义的消息队列主题。然后将其分发给该主题的订阅者。

你可能已经注意到RedisMessageSubscriber是一个监听器,它将自身注册到队列中以检索消息。

消息到达时,触发订阅者定义的onMessage()方法。

在我们的示例中,我们可以通过检查RedisMessageSubscriber中的messageList来验证我们是否收到了已发布的消息:

RedisMessageSubscriber.messageList.get(0).contains(message)

6. 总结

在本文中,我们研究了使用Spring Data Redis的发布/订阅消息队列实现。

与往常一样,本教程的完整源代码可在GitHub上获得。

Show Disqus Comments

Post Directory

扫码关注公众号:Taketoday
发送 290992
即可立即永久解锁本站全部文章