统一消息发送
[一]背景
各种组件和模块可以便捷的集成是 Spring Boot 框架的一大亮点和特色。微服务系统由于其架构的复杂性以及不同场景的需求,往往会集成多种不同的“消息系统”,例如:消息队列、WebSocket、Email 等,以满足服务与服务间、系统与用户间的消息交互需求。
不同的消息系统,使用的组件、实现方式以及调用方式均不相同,这无形之中就增加了代码的复杂度。在开发过程中,如果任由开发人员在服务中随意添加相关代码,无序的使用和依赖,不仅会让代码以及服务越来越臃肿,而且会让代码逻辑越来越混乱,越来越难以维护。
为了解决这个问题,Dante Cloud 实现了一种统一的消息发送机制。所有的“消息系统”的配置和支持均由 Message
服务负责,提供统一的消息发送接口通过传递不同的参数,就可以在任意的服务中通过代码的形式发送任意类型的消息。
提示
这里所说的统一消息发送,指的是在后端服务代码层面,使用统一的方法在任意的微服务中发送消息。不包含前端与后端的调用和交互,对于前端来说,有提供有专门的 Rest API 供使用,而且并不是所有的消息系统都需要与前端存在调用关系。
[二]使用
消息统一发送,是指在 Message
服务以外的服务中,以代码的形式,通过统一的接口将不同的类型消息统一发送至 Message
服务中,再由 Message
服务将不同类型的消息转发至不同的消息系统中。
所以,在使用时要区分 Message
服务以及非 Message
服务。
注意
Gateway
是特殊的服务,不建议在 Gateway
服务中直接发送消息。这样做不仅让 Gateway
的职责混乱,还会降低系统整体的安全性。
[1]支持内容
目前,以下“消息系统”支持使用统一发送接口进行消息发送:
Mail
:发送 Email 通知Mail Notification
:也是发送 Email 通知,主要用于系统内置的通知消息,包含:登录地址异常通知,Email 验证码和电子邮件确认(不支持 HTML 格式邮件)Mqtt
:向 Mqtt 中某个 Topic 发送消息。主要用于物联网应用支持RSocket 广播消息
:发送基于 RSocket 的 WebSocket 广播通知(仅使用于Message
服务为 Reactive 模式)RSocket 用户消息
:向指定用户发送基于 RSocket 的 WebSocket 用户消息,支持跨示例发送(仅使用于Message
服务为 Reactive 模式)WebSocket 广播消息
:发送基于 Stomp 的 WebSocket 广播通知(仅使用于Message
服务为 Servlet 模式)WebSocket 用户消息
:向指定用户发送基于 Stomp 的 WebSocket 用户消息,支持跨示例发送(仅使用于Message
服务为 Servlet 模式)Stream 消息
:即消息队列消息(本系统主要使用 Spring Cloud Stream 和 Spring Cloud Bus 实现消息队列需求)
[2]非Message服务使用
想要在非Message
服务的代码中使用统一发送消息,仅需要在你的代码中,注入 MessageSendingEventManager
Bean,然后通过调用其中相应的方法就可以发送消息。
例如,新建一个 MyMessageService
,在这个 Service 代码中注入MessageSendingEventManager
Bean,如下例所示:
@Service
public class MyMessageService {
private final MessageSendingEventManager messageSendingEventManager;
public MyMessageService(MessageSendingEventManager messageSendingEventManager) {
this.messageSendingEventManager = messageSendingEventManager;
}
}
通过调用 MessageSendingEventManager
不同的方法,就可以实现不同类型消息系统的消息发送。具体的接口定义如下:
public interface MessageSendingEventManager extends ApplicationStrategyEventManager<Message<?>> {
/**
* 发送电子邮件消息
* <p>
* 注意:这个方法目前不支持 HTML 模版解析
*
* @param mailMessage {@link MailMessage}
*/
default void mail(MailMessage mailMessage) {
postProcess(new Message<>(MessageCategory.MAIL, mailMessage));
}
/**
* 发送内置电子邮件通知。
* <p>
* 注意:这个方法仅支持系统内置 HTML 模版电子邮件。具体解析方式参见:<code>message-mail-spring-boot-starter</code>
*
* @param mailNotification {@link MailNotification}
*/
default void mailNotification(MailNotification mailNotification) {
postProcess(new Message<>(MessageCategory.MAIL_NOTIFICATION, mailNotification));
}
/**
* 向 Mqtt Broker 发送消息
*
* @param mqttMessage {@link MqttMessage}
*/
default void mqtt(MqttMessage mqttMessage) {
postProcess(new Message<>(MessageCategory.MQTT, mqttMessage));
}
/**
* 发送 RSocket 广播消息
* <p>
* 当 Message 服务为 Reactive 模式时,可使用该方法发送基于 RSocket 的广播消息。
* <p>
* 注意:因为 RSocket 的特性使然,只有当有客户端已经连接了 RSocket Server(例如:使用前端登录某个或者某些用户,RSocket 服务内存中已经有了用户信息)才能接收到消息。
*
* @param broadcastMessage {@link BroadcastMessage}
*/
default void rsocketBroadcast(BroadcastMessage broadcastMessage) {
postProcess(new Message<>(MessageCategory.RSOCKET_BROADCAST, broadcastMessage));
}
/**
* 向某个在线用户发送基于 RSocket 的消息
* <p>
* 当 Message 服务为 Reactive 模式时,可使用该方法发送基于 RSocket 的广播消息。
* <p>
* 注意:因为 RSocket 的特性使然,只有当有客户端已经连接了 RSocket Server(例如:使用前端登录某个或者某些用户,RSocket 服务内存中已经有了用户信息)才能接收到消息。
*
* @param userMessage {@link UserMessage}
*/
default void rsocketToUser(UserMessage userMessage) {
postProcess(new Message<>(MessageCategory.RSOCKET_USER, userMessage));
}
/**
* 发送 Websocket 广播消息
* <p>
* 当 Message 服务为 Servlet 模式时,可使用该方法发送基于 Websocket 的广播消息。
* <p>
* 注意:因为 Websocket 的特性使然,只有当有客户端已经连接了 Websocket Server(例如:使用前端登录某个或者某些用户,Websocket 服务内存中已经有了用户信息)才能接收到消息。
*
* @param broadcastMessage {@link BroadcastMessage}
*/
default void websocketBroadcast(BroadcastMessage broadcastMessage) {
postProcess(new Message<>(MessageCategory.WEBSOCKET_BROADCAST, broadcastMessage));
}
/**
* 向某个在线用户发送基于 Websocket 的消息
* <p>
* 当 Message 服务为 Servlet 模式时,可使用该方法发送基于 Websocket 的广播消息。
* <p>
* 注意:因为 Websocket 的特性使然,只有当有客户端已经连接了 Websocket Server(例如:使用前端登录某个或者某些用户,Websocket 服务内存中已经有了用户信息)才能接收到消息。
*
* @param userMessage {@link UserMessage}
*/
default void websocketToUser(UserMessage userMessage) {
postProcess(new Message<>(MessageCategory.WEBSOCKET_USER, userMessage));
}
}
[3]Message服务使用
前端想要集成消息支持,直接调用 Message
服务的 Rest API 即可;后端服务想要发送消息,使用统一的发送接口即可。
如果想要在 Message
服务中发送消息,本质上就是在扩展 Message
服务的代码,所以直接修改和使用现有代码即可。当然,本项目中也考虑到了使用的便捷性,如果你的业务需求并不复杂,仅是需要在 Message
服务中发送消息,使用以下事件就可以发送消息。
具体的事件如下:
MailMessageSendingEvent
:发送 Email 消息,不支持 HTML 模版邮件MailNotificationSendingEvent
:发送内置的邮件通知,包含登录地址异常通知,Email 验证码和电子邮件确认,支持 HTML 模版邮件MqttMessageSendingEvent
:向指定的 Topic 发送 Mqtt 消息RSocketBroadcastMessageSendingEvent
:发送基于 RSocket 的 WebSocket 广播消息RSocketFireAndForgetMessageReceivingEvent
:向指定用户发送基于 RSocket 的 WebSocket 用户消息WebSocketBroadcastMessageSendingEvent
:发送基于 Stomp 的 WebSocket 广播消息WebSocketUserMessageSendingEvent
:向指定用户发送基于 Stomp 的 WebSocket 用户消息StreamMessageSendingEvent
:发送 MQ 消息
[三]注意事项
- 系统代码已经在所有的非
Message
服务中,配置了MessageSendingEventManager
Bean,所以在自己的代码中直接注入使用即可。 - 虽然支持各种消息的统一发送,但是相关的消息系统还需要在
Message
服务中进行配置。如果没有配置相关的组件,消息发送不会出错,但是不会有具体效果。 - 因 RSocket 和 WebSocket 自身特性所限,不管是发送广播还是用户消息,前提是在
Message
服务中已经有了相关用户信息,即有用户通过前端登录了系统,否则看不到具体消息效果。