This is the multi-page printable view of this section. Click here to print.

Return to the regular view of this page.

1 - RocketMQ

MQ的一种, 高并发常用

了解: WebSocket

学习过 websocekt 后, 我们知道通过建立长连接可以实现 服务器和浏览器的双向推送 通过浏览器协调, 可以实现端对端的通信 webServer 初始化一个 Session 池作为核心容器, 当 webServer 在分布式中作为服务注入时, 其他服务也要调用消息怎么办?

可以让其他服务作为消费者, 让服务生产消息, 但是webserver 一旦负责消息通讯和消息生产职责就不再单一, 需要解耦

长连接传递的消息是即时的, 一旦连接断开消息就会丢失, 我们需要一个方法持久化消息

长连接会阻塞服务线程, 建立连接后双方不得不占用一个线程持续的监听, 需要把监听的任务解耦出去. 一旦并发消息过多, 服务器可能会崩溃

综上, 我们有了消息队列

    1. 异步通信

场景描述:当系统中的应用程序需要异步通信时,可以使用消息中间件来实现。
示例:在一个电商系统中,订单系统下单后需要给库存系统发送异步消息,以减少系统之间的耦合,避免同步调用导致的性能问题或响应延迟。

优势

  • 减少耦合:系统可以在不等待响应的情况下继续进行其他操作。
  • 提高系统性能:减少了同步阻塞,提高了响应速度。

  1. 应用解耦

场景描述:当应用程序需要进行松耦合的通信时,可以使用消息中间件来实现。
示例:在微服务架构中,不同服务之间可以通过消息中间件进行通信,避免直接调用服务导致的依赖关系过强。

优势

  • 降低耦合度:服务之间通过消息中间件进行通信,不需要直接调用,提高系统的灵活性。
  • 增强可维护性:服务之间的更新和维护不影响其他服务。

  1. 消息排队

场景描述:当系统中需要处理大量的消息时,可以使用消息中间件来实现消息排队,确保消息的顺序和可靠性。
示例:在金融行业中,需要处理大量的交易消息,消息中间件可以实现消息排队,确保每笔交易都得到正确处理。

优势

  • 保证消息顺序:确保按顺序处理每条消息。
  • 消息可靠性:即使出现系统故障,未处理的消息仍然可以从队列中恢复。

  1. 负载均衡

场景描述:当系统需要处理大量的请求时,可以使用消息中间件来实现负载均衡。
示例:在电商系统中,订单系统下单请求可以通过消息中间件发送到多个库存系统中,从而实现负载均衡。

优势

  • 分担负载:通过将消息分发到多个消费者,实现负载均衡,避免单点压力。
  • 提高系统吞吐量:通过多节点并发处理提高系统处理能力。

  1. 系统削峰填谷

场景描述:当系统中出现高峰期时,可以使用消息中间件来平滑处理请求。
示例:在电商系统中,双十一等促销活动可能会导致系统请求量急剧增加,消息中间件可以通过缓存请求,避免系统崩溃。

优势

  • 平滑流量:在高峰期通过消息队列缓存请求,避免系统过载。
  • 提高系统稳定性:避免高并发请求导致的性能瓶颈或服务崩溃。

使用 RocketMQ

并发环境下的消息队列用阿里提供的 RocketMQ 合适

server:  
  port: ${sp.publish}  
spring:  
  application:  
    name: ${sn.publish}  
  cloud:  
    stream:  
      rocketmq:  
        binder:  
          name-server: ${rocket-mq.name-server}  
        bindings:  
          output:  
            producer:  
              group: socketGroup  
              sync: true  
      bindings:  
        output:  
          destination: test-topic  
          content-type: application/json  
#开启MQ的日志  
logging:  
  level:  
    com:  
      alibaba:  
        cloud:  
          stream:  
            binder:  
              rocketmq: DEBUG

项目依赖

<dependency>  
    <groupId>com.alibaba.cloud</groupId>  
    <artifactId>spring-cloud-starter-stream-rocketmq</artifactId>  
</dependency>

Swagger 配置, 不同于 webSocket mq作为异步消息组件可以用C-S模型, 也就是请求-响应模型, 我们从 swagger测试请求响应

/**  
 * <p> * 描述:Swagger配置  
 * </p>  
 * @author hamhuo  
 * @version 1.0.0  
 */@Configuration  
@EnableSwagger2WebMvc  
public class SwaggerConfig {  
    @Bean  
    Docket rpApi() {  
        return SwaggerCore.defaultDocketBuilder("消息推送", "com.zeroone.star.mq.controller", "publish");  
    }  
}

消息发布组件 RmqPublish , 该组件封装了发布方法

/**  
 * <p> * 描述:消息发布组件  
 * </p>  
 * @author hamhuo  
 * @version 1.0.0  
 */@Component  
public class RmqPublish {  
  
    @Resource  
    Source source;  
  
    public void publish(SampleNotifyDTO dto) {  
        source.output().send(MessageBuilder.withPayload(dto).build());  
    }  
}

这里通知DTO封装的是 websocket 的客户端id 和消息

  
/**  
 * <p> * 描述:示例通知数据对象  
 * </p>  
 * @author hamhuo  
 * @version 1.0.0  
 */@Data  
public class SampleNotifyDTO {  
    /**  
     * 客户端编号  
     */  
    private String clientId;  
  
    /**  
     * 通知消息内容  
     */  
    private String message;  
}

继续之前, 我们要解释一下发布组件

  • @Component 是声明式注解, 要求 Spring 作为组件注入
  • 通过 @Resourse 注入了一个依赖, 该注解会查找指定名称的资源/Bean, 如果不指定就通过反射拿指定类型的资源, 这里查找到了一个 Source
  • Source 接口返回一个 MessageChannel 这个可以理解为发送消息到消息队列
  • 构建信息后链式调用 output 返回一个 MC, 调用MC发送信息

Source 信息源接口如下

image.png

其中 @OutPut 注解 要求 Spring 容器将名字为 OUTPUT 值的消息频道与中间件绑定

因此这里我们需要做一些配置

rocketmq:  
  binder:  
    name-server: ${rocket-mq.name-server}  
  bindings:  
    output:  
      producer:  
        group: socketGroup  
        sync: true  
bindings:  
  output:  
    destination: test-topic  
    content-type: application/json

我们先看mq的配置

spring.cloud.rocketmq 下配置名称服务器地址, 这里已经通过 nacos 配置好了

RocketMQNameServer 是 RocketMQ 集群中的一个核心组件,它提供了一个简单的服务注册与发现机制。具体来说,NameServer 主要用于管理 RocketMQ Broker 的信息,客户端通过访问 NameServer 来查询和获取消息队列的路由信息,从而实现消息的发送和接收。

这是为了让 Spring Cloud Stream 知道如何将消息发送到 RocketMQ

之后再配置 bindings.output 下配置生产者

  • producer.group: 配置生产者的 消费组,用于指定该生产者的分组。
  • sync: 如果设置为 true,则表示生产者将同步发送消息。即,发送消息后会等待消息发送确认,确保消息发送成功。

生产者就是在 RocketMQ 中生产信息的角色

image.png

然后是第二个 bindings, 这是 Spring Cloud 的配置

bindings:  
  output:  
    destination: test-topic  
    content-type: application/json

这里的 output 配置与 Source.OUTPUT 直接相关。通过这个配置,Spring Cloud Stream 会将 Source.OUTPUT 这个输出通道的 消息目标(即 destination)设置为 test-topic,并且设置消息的 内容类型application/json。这意味着,消息将通过名为 output 的通道发送到 test-topic 主题,并且消息格式是 JSON。

总结下, 为了保证代码松耦合, Spring 将消息发送和驱动(也就是连接不同品牌的MQ)分离开, 将驱动交由第三方提供. spring只需要调用Source发送消息即可