RocketMQ
Categories:
了解: WebSocket
学习过 websocekt
后, 我们知道通过建立长连接可以实现 服务器和浏览器的双向推送
通过浏览器协调, 可以实现端对端的通信
webServer
初始化一个 Session 池作为核心容器, 当 webServer
在分布式中作为服务注入时, 其他服务也要调用消息怎么办?
可以让其他服务作为消费者, 让服务生产消息, 但是webserver
一旦负责消息通讯和消息生产职责就不再单一, 需要解耦
长连接传递的消息是即时的, 一旦连接断开消息就会丢失, 我们需要一个方法持久化消息
长连接会阻塞服务线程, 建立连接后双方不得不占用一个线程持续的监听, 需要把监听的任务解耦出去. 一旦并发消息过多, 服务器可能会崩溃
综上, 我们有了消息队列
-
- 异步通信
场景描述:当系统中的应用程序需要异步通信时,可以使用消息中间件来实现。
示例:在一个电商系统中,订单系统下单后需要给库存系统发送异步消息,以减少系统之间的耦合,避免同步调用导致的性能问题或响应延迟。
优势:
- 减少耦合:系统可以在不等待响应的情况下继续进行其他操作。
- 提高系统性能:减少了同步阻塞,提高了响应速度。
- 应用解耦
场景描述:当应用程序需要进行松耦合的通信时,可以使用消息中间件来实现。
示例:在微服务架构中,不同服务之间可以通过消息中间件进行通信,避免直接调用服务导致的依赖关系过强。
优势:
- 降低耦合度:服务之间通过消息中间件进行通信,不需要直接调用,提高系统的灵活性。
- 增强可维护性:服务之间的更新和维护不影响其他服务。
- 消息排队
场景描述:当系统中需要处理大量的消息时,可以使用消息中间件来实现消息排队,确保消息的顺序和可靠性。
示例:在金融行业中,需要处理大量的交易消息,消息中间件可以实现消息排队,确保每笔交易都得到正确处理。
优势:
- 保证消息顺序:确保按顺序处理每条消息。
- 消息可靠性:即使出现系统故障,未处理的消息仍然可以从队列中恢复。
- 负载均衡
场景描述:当系统需要处理大量的请求时,可以使用消息中间件来实现负载均衡。
示例:在电商系统中,订单系统下单请求可以通过消息中间件发送到多个库存系统中,从而实现负载均衡。
优势:
- 分担负载:通过将消息分发到多个消费者,实现负载均衡,避免单点压力。
- 提高系统吞吐量:通过多节点并发处理提高系统处理能力。
- 系统削峰填谷
场景描述:当系统中出现高峰期时,可以使用消息中间件来平滑处理请求。
示例:在电商系统中,双十一等促销活动可能会导致系统请求量急剧增加,消息中间件可以通过缓存请求,避免系统崩溃。
优势:
- 平滑流量:在高峰期通过消息队列缓存请求,避免系统过载。
- 提高系统稳定性:避免高并发请求导致的性能瓶颈或服务崩溃。
使用 RocketMQ
并发环境下的消息队列用阿里提供的 RocketMQ
合适
server:
port: ${sp.publish}
spring:
application:
name: ${sn.publish}
cloud:
stream:
rocketmq:
binder:
name-server: ${rocket-mq.name-server}
bindings:
output:
producer:
group: socketGroup
sync: true
bindings:
output:
destination: test-topic
content-type: application/json
#开启MQ的日志
logging:
level:
com:
alibaba:
cloud:
stream:
binder:
rocketmq: DEBUG
项目依赖
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rocketmq</artifactId>
</dependency>
写 Swagger
配置, 不同于 webSocket
mq作为异步消息组件可以用C-S模型, 也就是请求-响应模型, 我们从 swagger测试请求响应
/**
* <p> * 描述:Swagger配置
* </p>
* @author hamhuo
* @version 1.0.0
*/@Configuration
@EnableSwagger2WebMvc
public class SwaggerConfig {
@Bean
Docket rpApi() {
return SwaggerCore.defaultDocketBuilder("消息推送", "com.zeroone.star.mq.controller", "publish");
}
}
消息发布组件 RmqPublish
, 该组件封装了发布方法
/**
* <p> * 描述:消息发布组件
* </p>
* @author hamhuo
* @version 1.0.0
*/@Component
public class RmqPublish {
@Resource
Source source;
public void publish(SampleNotifyDTO dto) {
source.output().send(MessageBuilder.withPayload(dto).build());
}
}
这里通知DTO封装的是 websocket
的客户端id 和消息
/**
* <p> * 描述:示例通知数据对象
* </p>
* @author hamhuo
* @version 1.0.0
*/@Data
public class SampleNotifyDTO {
/**
* 客户端编号
*/
private String clientId;
/**
* 通知消息内容
*/
private String message;
}
继续之前, 我们要解释一下发布组件
@Component
是声明式注解, 要求Spring
作为组件注入- 通过
@Resourse
注入了一个依赖, 该注解会查找指定名称的资源/Bean, 如果不指定就通过反射拿指定类型的资源, 这里查找到了一个Source
Source
接口返回一个MessageChannel
这个可以理解为发送消息到消息队列- 构建信息后链式调用 output 返回一个 MC, 调用MC发送信息
Source 信息源接口如下
其中 @OutPut
注解 要求 Spring
容器将名字为 OUTPUT
值的消息频道与中间件绑定
因此这里我们需要做一些配置
rocketmq:
binder:
name-server: ${rocket-mq.name-server}
bindings:
output:
producer:
group: socketGroup
sync: true
bindings:
output:
destination: test-topic
content-type: application/json
我们先看mq
的配置
spring.cloud.rocketmq
下配置名称服务器地址, 这里已经通过 nacos
配置好了
RocketMQ 的 NameServer 是 RocketMQ 集群中的一个核心组件,它提供了一个简单的服务注册与发现机制。具体来说,NameServer 主要用于管理 RocketMQ Broker 的信息,客户端通过访问 NameServer 来查询和获取消息队列的路由信息,从而实现消息的发送和接收。
这是为了让 Spring Cloud Stream 知道如何将消息发送到 RocketMQ
之后再配置 bindings.output
下配置生产者
producer.group
: 配置生产者的 消费组,用于指定该生产者的分组。sync
: 如果设置为true
,则表示生产者将同步发送消息。即,发送消息后会等待消息发送确认,确保消息发送成功。
生产者就是在 RocketMQ 中生产信息的角色
然后是第二个 bindings
, 这是 Spring Cloud 的配置
bindings:
output:
destination: test-topic
content-type: application/json
这里的 output
配置与 Source.OUTPUT
直接相关。通过这个配置,Spring Cloud Stream 会将 Source.OUTPUT
这个输出通道的 消息目标(即 destination
)设置为 test-topic
,并且设置消息的 内容类型 为 application/json
。这意味着,消息将通过名为 output
的通道发送到 test-topic
主题,并且消息格式是 JSON。
总结下, 为了保证代码松耦合, Spring 将消息发送和驱动(也就是连接不同品牌的MQ)分离开, 将驱动交由第三方提供. spring只需要调用Source发送消息即可