RocketMQ原理

2025/8/4 MQ
  • 基本要素
  1. 消息(Message)

    • 消息是 RocketMQ 中最小的数据传输单元。生产者将业务数据的负载和扩展属性封装成消息,并将消息发送到代理服务器。然后,代理服务器根据相关语义将消息投递给消费者。每条消息必须属于一个主题,且只能属于一个主题。

    • 特点:

      • 不可变性:消息是已生成的事件。消息生成后,其内容不会更改。即使消息通过传输通道,其内容也保持不变。消费者获取的消息是只读消息。

      • 持久性:默认情况下,收到的消息存储在 RocketMQ 代理服务器的存储文件中,以确保在系统发生故障时可以跟踪和恢复消息。

    • 类型:

      • 普通消息:具备基础功能的消息。通常应用于微服务解耦、数据集成以及事件驱动等场景。

      • 顺序消息:顺序消息按照消息的发送顺序传递给消费者。通常应用于使用状态同步来保持强一致性的业务场景。

      • 延时消息:允许消费者在消息发送到服务器后,仅在指定时间段或指定时间点后才消费消息。通常应用于分布式定时调度和任务超时处理等场景。

      • 事务消息:事务消息确保消息生产与本地事务之间的最终一致性。通常应用于分布式事务,来解决核心业务和下游业务之间执行结果的一致性。

    • 约束:

      • 消息最大大小:4MB。作业帮线上 RocketMQ 集群默认消息体大小限制为32KB。
  2. 主题(Topic)

    • Topic 是消息传输和存储的顶层容器,用于标识同一类业务逻辑的消息。它是消息分类的核心单元,每个主题包含若干条消息,每条消息只能属于一个主题。

    • 作用:

      • 数据分类隔离:将不同业务类型的消息拆分到不同的 Topic 中,实现存储和订阅的隔离性

      • 身份与权限管理:对特定类别的消息执行身份和权限管理

  3. 生产者(Producer)

    • 负责生产消息,一般由业务系统负责生产消息。一个消息生产者会把业务应用系统里产生的消息发送到Broker服务器。

    • 作用:

      • 消息封装:将业务数据转换格式,包含主题(Topic)、标签(Tag)、消息体(Body)等属性

      • 消息发送:通过网络将消息发送到 RocketMQ 集群中的 Broker

      • 负载均衡:自动选择 Broker 和 队列,实现消息的均匀分布

      • 可靠性保障:支持同步发送、异步发送、事务消息等机制,确保消息不丢失

  4. 消息代理者(Broker)

    • 在 RocketMQ 中,Broker 是核心组件之一,负责消息的存储、转发、高可用性保障以及消息系统的整体协调。它是消息从生产者到消费者的“中间枢纽”,直接决定了系统的吞吐量、可靠性和扩展性。

    • 作用:

      • 消息存储

      • 消息转发与调度

      • 高可用性(HA)机制

      • 消费进度管理

      • 服务注册与发现

  5. 消费者(Consumer)

    • 负责从消息队列中获取并处理消息的组件,是消息驱动架构的核心部分。

    • 类型:

      • 推送消费者:全托管的消费者类型,用户只需注册消息监听器即可

      • 简单消费者:消费者主动拉取消息,但消费状态由服务端托管

      • 拉取消费者:业务方完全自主管理消息拉取和消费逻辑

    • 核心流程:

      • 消息获取

      • 消息处理

      • 消费状态提交

  6. 生产者组(Producer Group)

    • 生产者组是一个逻辑概念,用于将一组生产者实例组织在一起,共同负责发送相同类型的消息。

    • 作用:

      • 事务消息支持:在事务消息场景中,如果某个生产者实例崩溃,组内其他实例可以继续处理未完成的事务

      • 故障转移:当某个生产者实例宕机时,组内其他实例可以接管其任务,确保消息发送的连续性

      • 负载均衡:生产者组内的多个实例可以协作发送消息到不同的 Broker,实现消息发送的负载均衡

  7. 消费者组(Consumer Group)

    • 消费者组是消费者的逻辑分组,用于管理多个消费者实例。属于同一组的消费者可以共同消费同一个主题(Topic)中的消息,且消费行为一致。

    • 作用:

      • 负载均衡:同一组内的消费者实例会协同消费消息,避免消息重复处理

      • 容错:当某个消费者实例故障时,消息会由组内其他实例接管

    • 模式:

      • 集群消费(Clustering):每条消息仅被组内的一个消费者实例消费

      • 广播消费(Broadcasting):每条消息会被组内所有消费者实例消费

  8. 消息队列(Message Queue)

    • 消息队列是消息存储和传输的实际容器,是消息的最小存储和调度单元,所有消息资源以Topic为逻辑分类,但实际操作是面向Queue的。一个Topic中可以包含多个Queue,每个Queue中存放的就是该Topic的消息。一个Topic的Queue也被称为一个Topic中消息的分区(队列)。每个队列通过 QueueId 唯一标识,队列与 Broker 绑定,分布在不同的 Broker 节点上以实现负载均衡。

    • 作用:

      • 有序存储:消息以入队时的相同顺序写入存储。最早的消息在队列的开头,最新的消息在队列的末尾。消费位点用于标记消息在队列中的位置和顺序。

      • 流式操作语义:基于队列的存储允许消费者从一个位点读取一条或多条消息。这有助于实现聚合读取和回溯读取等功能。

      • 负载均衡与横向扩展:生产消息发送到不同队列实现消息的均匀分布,消费者负载均衡消费不同队列,确保消息高并发处理,通过增加队列数量来横向扩展系统的吞吐能力。

  9. 消息标签(Tag)

    • 为消息设置的标识符,用于同一Topic下区分不同类型的消息。来自同一业务单元的消息,可以根据不同业务目的在同一主题下设置不同标签。标签能够有效地保持代码的清晰度和连贯性,并优化RocketMQ提供的查询系统。消费者可以根据Tag实现对不同子topic的不同消费逻辑,实现更好的扩展性。

    • 作用:

      • 消息过滤:消费者可以根据 Tag 订阅特定类型的消息,避免全量消费所有消息

      • 业务逻辑解耦:通过 Tag 将不同业务流程的消息分离,降低系统耦合度

      • 动态路由:生产者可以通过 Tag 将消息路由到不同的消费者组,实现灵活的业务路由

  10. 服务发现(NameServer)

  • NameServer 是一个轻量级的路由注册中心,负责管理 Broker 的注册、Topic路由信息的存储与更新,以及为生产者和消费者提供路由查询服务。它是 RocketMQ 架构中实现高可用性和动态扩展的核心组件之一。

  • 作用:

    • Broker 注册与发现

    • Topic路由信息管理

    • 容错与高可用

  • 核心数据结构:

    • topicQueueTable:存储 Topic 的路由信息

    • brokerAddrTable:存储 Broker 的地址和主从信息

    • clusterAddrTable:存储集群名称与 Broker 名称的映射关系

    • brokerLiveTable:记录 Broker 的存活状态和心跳时间戳

  1. 代理(Proxy)
  • 为客户端(生产者/消费者)提供一个接口代理层,转发请求至 NameServer。
  1. SDK
  • 开源SDK:

    • 自 RocketMQ 项目启动以来,已演进到当前版本 5.x。目前,RocketMQ 主要支持基于底层通信协议差异的两个系列客户端 SDK,即 Remoting 协议和 gRPC 协议。
对比项 Remoting SDK gRPC SDK
多语言支持 Java/Go Java/C/C++/.NET/Go/Rust
特性与接口 生产者 PushConsumer
兼容版本 支持 4.x 和 5.x 服务端 仅支持服务端 5.0 版本
演进方向 Remoting 协议主要用于服务端内部组件的通信演进 gRPC 协议是首选的轻量级多语言客户端,后续推广将逐步完善所有能力
  • golib集成

  • rocketmq-sidecar:以sidecar的形式,基于uds以http协议,为odp 等非goweb标准框架搭建的服务,提供生产、消费RocketMQ消息的能力。

  • 领域模型

图片

  1. 通信模型

    • 同步发送(Synchronous):

      • Producer 等待 Broker 返回确认结果(ACK)。

      • 适用场景:金融交易、支付确认等要求强一致性的场景。

    • 异步发送(Asynchronous):

      • Producer 发送消息后立即返回,通过回调处理发送结果。

      • 适用场景:高并发任务分发(如订单创建)。

    • 单向发送(Oneway):

      • Producer 发送消息后不等待响应,仅确保消息到达网络层。

      • 适用场景:日志收集、监控数据上报(允许少量丢弃)。

  2. 存储模型

    • 存储流程:

      • 消息写入 CommitLog(物理存储):

        • Producer 发送消息到 Broker。

        • Broker 将消息按顺序追加写入 CommitLog 文件。

          • 顺序写入:消息按到达顺序追加写入,避免随机写入的性能损耗。

          • 文件大小:默认每个 CommitLog 文件大小为 1GB(可通过配置调整),文件名以起始偏移量命名(如 00000000000000000000 表示起始偏移量为 0)。

          • 消息格式:每条消息包含固定头(如魔数、长度、CRC 校验、Topic、Tag 等)和消息体,总长度固定(通常为 19 字段)。

        • 写入完成后,根据配置选择 同步刷盘 或 异步刷盘 将数据持久化到磁盘。

      • 构建 ConsumeQueue(逻辑消费队列) 和 IndexFile(消息索引):

        • Broker 后台线程(ReputMessageService)异步解析 CommitLog,生成:

          • ConsumeQueue 条目:为每个 (Topic, MessageQueue) 分组创建逻辑索引,用于加速消息的查找和消费。

            • 每个 ConsumeQueue 文件对应一个 (Topic, MessageQueue) 组合,默认大小为 48MB。

            • 每个条目(20 字节)包含:

              • CommitLog 物理偏移量(8 字节):消息在 CommitLog 中的起始位置。

              • 消息大小(4 字节):消息体长度。

              • Tag 哈希值(8 字节):用于过滤消息。

          • IndexFile 条目:若消息设置了 Key,则生成索引条目。主要用于运维排查和审计。

            • 哈希索引结构:使用哈希槽(Hash Slot)链表解决冲突,每个索引条目(20 字节)包含:

              • 消息 Key 的哈希值。

              • CommitLog 物理偏移量。

              • 时间戳。

              • 链表指针(前一个相同哈希值的索引下标)。

            • 最大索引数:默认最多 2000 万条(可配置)。

            • 文件命名:index_{timestamp}。

    • 刷盘机制:

刷盘类型 原理 优点 缺点
同步刷盘 消息写入内存后立即刷盘,等待磁盘 I/O 完成再返回成功。 数据可靠性高,断电不丢失。 性能较低,吞吐量受限。
异步刷盘 消息写入内存后立即返回成功,由后台线程定期批量刷盘。 性能高,吞吐量大。 存在数据丢失风险(极端情况下)。
  • 高可用与数据复制

    • RocketMQ 通过 主从架构 实现高可用性:

    • 主从复制:

      • Master 节点负责消息的写入和读取,Slave 节点通过同步或异步复制数据。

      • 同步复制:Master 等待 Slave 确认数据写入后才返回成功(强一致性,但性能较低)。

      • 异步复制:Master 立即返回成功,Slave 异步拉取数据(高性能,但可能存在数据延迟)。

    • 故障转移:

      • 当 Master 节点故障时,Slave 可自动接管服务,确保消息不丢失。
  • 消息清理

    • RocketMQ 的消息清理基于 存储时间 而非消费状态:

    • 清理策略:

      • 消息保留时间默认为 48 小时(可通过 fileReservedTime 配置调整)。

      • 超过保留时间的 CommitLog 文件会被删除。

      • ConsumeQueue 和 IndexFile 会同步清理。

  • 系统架构

    • 外部连接

      • 客户端连接:

        • golib集成了rmq-clinet-go:标准的SDK模式,用户只需要提供对应的配置即可连接使用rmq。

        • sidecar:

        • nmq:?

      • 服务接入:

        • proxy:
功能 说明
协议转换 支持多协议接入(gRPC/HTTP/Kafka兼容协议)
负载均衡 将客户端请求分发到不同Broker节点
客户端解耦 避免客户端直连Broker,提升架构灵活性
跨云调度 实现多云环境下的消息路由(需特殊配置)
     * 直连ns:一般是比较简单的环境下使用。

  * 客户端模型:

     * 生产:生产者和 Topic 具有 N 对 N 的关系。一个生产者可以将消息发送到多个 Topic,一个 Topic 可以接收来自多个生产者的消息。这种多对多关系有助于性能扩展和灾难恢复。

图片 * 消费:消费者和 Topic 具有 N 对 N 的关系。一个消费者可以消费多个 Topic的消息,一个 Topic 可以被多个消费者消费。

图片

  • 内部协同

图片 * 协同:

     * NameServer是一个几乎无状态节点,可集群部署,节点之间无任何信息同步。

     * Proxy 提供一个 接口代理层,转发请求至 Broker。便于客户端接入、统一鉴权和监控,同时屏蔽底层 RocketMQ 协议细节。适合异构系统接入 RocketMQ。 

     * Broker部署相对复杂,Broker分为Master与Slave,一个Master可以对应多个Slave,但是一个Slave只能对应一个Master, Master与Slave 的对应关系通过指定相同的BrokerName,不同的Brokerld 来定义,Brokerld为0表示Master,非0表示Slave。Master也可以部署多个。每个Broker与NameServer集群中的所有节点建立长连接,定时注册Topic信息到所有NameServer。注意:当前RocketMQ版本在部署架构上支持一Master多Slave,但只有Brokerld=1的从服务器才会参与消息的读负载。

     * Producer与NameServer集群中的其中一个节点(随机选择)建立长连接,定期从NameServer获取Topic路由信息,并向提供Topic 服务的Master建立长连接,且定时向Master发送心跳。Producer完全无状态,可集群部署。

     * Consumer与NameServer集群中的其中一个节点(随机选择)建立长连接,定期从NameServer获取Topic路由信息,并向提供Topic服务的Master、Slave建立长连接,且定时向Master、Slave发送心跳。Consumer既可以从Master订阅消息,也可以从Slave订阅消息,消费者在向Master拉取消息时,Master服务器会根据拉取消费位点与最大消费位点的距离(判断是否读老消息,产生读|/0),以及从服务器是否可读等因素建议下一次是从Master还是Slave拉取。

     * Consumer通过发送HEART_BEAT请求给Broker,将自己添加到Broker中维护的某个消费者组中。需要注意的是,每个Consumer都会向所有的Broker进行心跳,因此每个Broker都维护了所有消费者的信息。

     * Exporter:用于将 RocketMQ 的监控数据导出为 Prometheus 可采集的格式。 主要用于集群监控,采集如 Topic、Broker、消费者等的运行状态和性能指标。 

     * Console:官方提供的可视化管理控制台,用于管理 Topic、消费者、Broker,查看消息堆积、消费进度、运行状态等信息。适合日常运维使用。

  * 交互
视角 交互方 交互方式 交互内容 交互作用
生产者 NameServer TCP 长连接 定时请求(30s) 拉取 Topic 路由信息
Broker TCP 短连接 同步/异步发送 消息内容+属性(Tag/Key等)
Proxy(5.x) gRPC/HTTP 单次请求 封装后的消息请求
Proxy(自研) gRPC 拉取 Topic 路由信息 获取 Broker 地址和队列分布
消费者 NameServer TCP 长连接 定时请求(30s) 拉取 Topic 路由信息
Broker TCP 长轮询 Push/Pull 模式 拉取消息请求+消费位点提交
Proxy(5.x) gRPC Stream 长连接 消息流+位点提交
Proxy(自研) gRPC 拉取 Topic 路由信息 获取 Broker 地址和队列分布
Broker NameServer TCP 长连接 定时注册(30s) Broker ID/Topic 配置/队列数据
Producer TCP 短连接 写入消息请求 接收并存储消息
Consumer TCP 长连接 拉取请求+位点提交 投递消息和管理消费状态
其他Broker Netty 异步复制 CommitLog 数据块 主从数据同步 (HA)
Console HTTP RESTful 配置查询/消息追踪 提供管理接口
Exporter JMX 采集 运行指标(JVM/IO/队列深度) 暴露监控数据
NameServer Broker TCP 长连接 心跳机制(30s) 路由注册信息
Producer TCP 短连接 路由查询请求 提供 Broker 地址信息
Consumer TCP 短连接 路由查询请求 提供消费队列分布
Console HTTP RESTful 路由数据查询 支持管理操作
Proxy (5.x) 生产者/消费者 gRPC/HTTP/AMQP 协议转换后的消息 统一接入和多协议支持
NameServer TCP 长连接 路由拉取请求 获取最新 Broker 地址
Broker Remoting 协议 原生 RocketMQ 协议包 透传消息到底层 Broker
Console HTTP RESTful 连接数/流量统计 提供运维监控数据
Console NameServer HTTP RESTful 路由数据查询 展示集群拓扑
Broker HTTP RESTful 配置管理/消息查询 运维操作执行
Exporter Prometheus API 监控指标拉取 数据可视化展示
用户 WebSocket/HTTP 控制台操作指令 提供管理界面
Exporter Broker JMX 采集/HTTP API 运行时指标收集 抓取监控数据
Proxy HTTP Metrics Endpoint 连接数/请求延迟 收集代理层指标
Prometheus Pull 模型 格式化指标数据 供监控系统存储
Grafana 数据源查询 SQL 格式指标查询 可视化展示

交互全景图:

图片 * 集群工作流程:

     * 启动NameServer,NameServer起来后监听端口,等待Broker、Producer、Consumer连上来,相当于一个路由控制中心。

     * Broker启动,跟所有的NameServer保持长连接,定时发送心跳包。心跳包中包含当前Broker信息(IP+端口等)以及存储所有Topic信息。注册成功后,NameServer集群中就有Topic跟Broker的映射关系。

     * 收发消息前,先创建Topic,创建Topic时需要指定该Topic要存储在哪些Broker上,也可以在发送消息时自动创建Topic。

     * Producer发送消息,启动时先跟NameServer集群中的其中一台建立长连接,并从NameServer中获取当前发送的Topic存在哪些Broker上,轮询从队列列表中选择一个队列,然后与队列所在的Broker建立长连接从而向Broker发消息。

     * Consumer跟Producer类似,跟其中一台NameServer建立长连接,获取当前订阅Topic存在哪些Broker上,然后直接跟Broker建立连接通道,开始消费消息。
  • 功能特性
  1. 生产消息

    • 功能描述:生产者(Producer)将消息发送到 RocketMQ 的 Broker 集群。消息需要指定 Topic 和 Tag,并携带业务数据(消息体)。

    • 核心流程:

      • 消息构建:设置 Topic、Tag、Key、Body 等属性。

      • 消息发送:支持四种模式:

        • 同步发送(Synchronous):等待 Broker 响应,确保消息发送成功(适用于金融交易等高可靠性场景)。

        • 异步发送(Asynchronous):异步回调处理结果,提高吞吐量(适用于订单创建等场景)。

        • 单向发送(One-way):仅发送消息,不等待响应(适用于日志收集等场景)。

        • 批量发送(Batch):一次发送多条消息,减少网络开销,提升吞吐量。需注意消息总大小限制(默认不超过 4MB)。

      • 消息路由:Broker 根据负载均衡策略(如轮询或哈希)将消息分配到具体的队列(MessageQueue)。

    • 关键参数:

      • Message:消息体,包含业务数据。

      • Topic:消息分类标识,决定消息的存储和消费路径。

      • Tag:消息标签,用于过滤和分类。

      • Key:消息唯一标识,用于后续查询。

    • 具体过程:

      • Producer发消息之前,会先向 NameServer 发出获取消息Topic的路由信息请求,并将信息缓存,定时向NameServer更新

      • NameServer 返回该 Topic 的路由表Broker列表

        • 路由表:是一个map;key是topic名称,value是一个queuedatas实例列表,queuedatas包含brokername的属性信息

        • 图片

        • Broker列表:也是一个map,key是brokername,value是一个brokerdatas,brokerdatas是一套名称是brokername的brokeraddrs list,包含master brokeraddrs,slave Brokeraddrs等信息。其实brokeraddrs也是一个map,key是brokerid,value是ip:port

        • 图片

      • Producer 根据代码中指定的Queue选择策略,从Queue列表里面选出一个

      • Producer 对消息做一些特殊处理,如大于4M的数据进行压缩等

      • Producer 向选择出的 Queue 所在 Borker 发出 rpc 请求,将消息发送到选择出的 Queue

      • Broker收到一条消息后,需要向生产者返回一个 ACK 响应,并将消息存储起来

      • Producer 收到 ACK,一条消息生产流程结束

    • 负载均衡

      • Broker选择:

        • 从 NameServer 获取路由信息:生产者会定期从 NameServer 查询目标 Topic 的路由信息,包括所有可用的 Broker 列表及其对应的 Message Queue 分布。

        • 动态调整 Broker 列表:如果某个 Broker 不可用(如网络故障或宕机),生产者会自动从路由信息中剔除该 Broker,并重新选择其他可用的 Broker。

      • Queue选择:

        • 默认轮询策略(Round-robin):生产者会按顺序依次选择队列。适合普通消息发送。

        • 基于 Sharding Key 的哈希策略:如果消息设置了 Sharding Key(如订单ID、用户ID等),生产者会通过哈希算法(如一致性哈希或简单哈希)计算队列索引。适合顺序消息或需要保证相同业务逻辑的消息被发送到同一队列的场景。

        • 自定义策略(SendSelector):可以通过实现 SendSelector 接口自定义队列选择逻辑。适合特殊业务需求(如地域就近分配、特定队列优先级等)。

      • 动态调整机制

        • 定期更新路由信息:生产者每隔一定时间(默认 30 秒)从 NameServer 获取最新的路由信息,确保队列列表和 Broker 状态的准确性。

        • 故障转移(Failover):如果当前选择的 Broker 不可用,生产者会尝试发送到下一个可用的 Broker 和队列。例如:初始选择 BrokerA.Queue0,若发送失败,则尝试 BrokerB.Queue1。

        • 队列权重分配:在某些版本中,RocketMQ 支持为不同队列设置权重(如 MessageQueue 的 setWeight 方法),生产者会根据权重比例分配消息。

    • 最佳实践:

      • 设置合理的生产者组名:

        • 通过 setProducerGroup("group_name") 明确标识业务逻辑。
      • 合理使用同步/异步发送:

        • 关键业务使用同步发送,非关键业务使用异步或单向发送。
      • 处理发送失败和重试:

        • 配置重试次数(setRetryTimesWhenSendFailed(3))并实现降级策略(如写入本地日志)。
      • 监控和日志:

        • 记录消息发送结果(SendResult.getMsgId())和异常信息,便于排查问题。
      • 避免消息堆积:

        • 监控 Broker 的队列积压情况,及时调整生产速率或增加消费者实例。
  2. 消费消息

    • 功能描述:消费者(Consumer)从 Broker 拉取消息并处理。RocketMQ 提供了灵活且高效的消息消费机制,支持多种消费模式和策略,以满足不同业务场景的需求。

    • 核心流程:

      • 消息获取:

        • PushConsumer:Broker 主动推送消息到消费者。

        • PullConsumer/SimpleConsumer:消费者主动向 Broker 请求消息。

      • 消息处理:

        • 消费者根据业务逻辑处理消息(如解析、存储、转发)。
      • 消费状态提交:

        • PushConsumer:通过返回 CONSUME_SUCCESS 或 RECONSUME_LATER 提交消费状态。

        • SimpleConsumer/PullConsumer:需显式调用 updateConsumeOffset 提交 Offset。

    • 关键参数:

      • 消费位点(Offset):用于记录消费者消费到的消息位置,便于在消费者重启或故障恢复后从正确的位置继续消费。

        • 管理方式:

          • 自动提交:消费者在消息处理完成后自动提交 Offset。

          • 手动提交:开发者显式控制 Offset 提交时机(更可靠)。

        • 存储方式:

          • Broker 存储:默认存储在 Broker 的内存中,重启后需重新拉取。

          • 本地文件存储:通过 offsetStore 配置持久化到本地文件。

    • 支持两种消费模式:

      • Push 模式:Broker 主动推送消息给 Consumer(默认模式)。

      • Pull 模式:Consumer 主动向 Broker 拉取消息(需手动管理消费位点)。

    • 消费类型:

      • 集群消费(竞争消费):同一消费组内多个 Consumer 共享消息,每条消息仅被一个 Consumer 处理(适合高并发场景)。

      • 广播消费:同一消费组内所有 Consumer 都消费每条消息(适合通知类场景)。

    • 最佳实践:

      • 合理选择消费者类型:

        • PushConsumer:适用于大多数业务场景。

        • SimpleConsumer/PullConsumer:适用于需要高度自定义的场景。

      • 避免 ConsumerGroup 混用:

        • 禁止在同一个 ConsumerGroup 中混用 PullConsumer 和其他类型,否则会导致消息消费异常。
      • 监控与告警:

        • 使用 RocketMQ 自带的监控工具(如 RocketMQ Dashboard)或第三方工具(如 Prometheus + Grafana)监控消费进度、积压量等指标。
      • 消费幂等性:

        * 在业务逻辑中实现幂等性(如基于消息 ID 或业务 Key 去重),防止重复消费。
        
  3. 查询消息

    • 功能描述:通过 Topic、Message ID、Message Key 或 Message Tag 查询特定消息,用于排查问题或分析消息轨迹。

    • 查询方式:

      • 按 Topic 查询:查询某时间段内指定 Topic 的所有消息(范围查询)。

      • 按 Message ID 查询:精确查找某条消息(快速定位)。

      • 按 Message Key/Tag 查询:模糊查询符合条件的消息(需提前设置 Key/Tag)。

    • 消息清理机制:

      • 默认保留 48 小时,最长可达 720 小时。

      • Broker 每天凌晨 4 点清理过期的 CommitLog 文件(磁盘使用率超过阈值时也会触发)。

  4. 重平衡

    • 功能描述:重平衡(Reblance),指将一个Topic下的多个Queue在同一个Consumer Group中的多个Consumer间进行重新分配,确保负载均衡的过程。只有在集群消费模式下才会有Reblance概念,广播消费是每个Consumer实例消费全部Queue。注意:重平衡过程完全在消费者客户端进行,由消费者组内的每个消费者实例独立执行。

    • 核心流程:

      • 获取重平衡所需数据

        • 消息队列列表(MQSet):从本地缓存的 topicSubscribeInfoTable 中获取 Topic 对应的所有队列(来源自 NameServer 的路由信息)。

        • 消费者列表(CIDAll):通过 MQClientInstance.findConsumerIdList() 从 Broker 获取当前消费者组的所有活跃消费者 ID。

      • 执行队列分配策略

        • 调用 AllocateMessageQueueStrategy 接口(如默认的 AllocateMessageQueueAveragely),根据当前消费者 ID、队列列表和消费者列表计算分配结果。

        • 分配原则:一个队列仅分配给一个消费者,但一个消费者可分配多个队列。

      • 更新本地处理队列(ProcessQueueTable)

        • 释放队列:若某队列不再分配给当前消费者,将其从 ProcessQueueTable 移除,并提交消费位移。

        • 新增队列:为新分配的队列创建 ProcessQueue 对象,生成 PullRequest 请求并提交给 PullMessageService 拉取消息。

      • 同步状态到 Broker

        • 消费者将新的分配结果和消费位移通过心跳包上报给 Broker,由 Broker 更新消费组元数据。
    • 触发原因:

      • 导致Rebalance产生的原因有两个:消费者所订阅的Topic的Queue数量发生变化、消费者组中的消费者实例数量发生变化,导致二者发生变化的典型场景如下所示:

        • Queue数量发生变化场景:

          • Broker扩缩容

          • Broker升级运维

          • Broker与NameServer之间网络异常

          • Queue扩缩容

        • 消费实例数量发生变化场景:

          • Consumer Group扩容或缩容(Consumer实例增加或者减少)

          • Consumer升级运维

          • Consumer与Broker之间网络异常

    • 触发时机:

      • 消费者启动/停止:新消费者加入或退出消费者组时。

      • 定时触发:默认每 20 秒由 RebalanceService 线程自动执行一次。

      • Broker 通知:当 Broker 检测到消费者组变化(如心跳包中的消费者列表更新)时,向组内所有消费者发送 NOTIFY_CONSUMER_IDS_CHANGED 指令,触发立即重平衡。

      • Topic 路由变更:Topic 的队列数量增减或 Broker 扩容/缩容

    • 触发问题:

      • 消费暂停:在只有一个Consumer时,其负责消费所有队列,在新增一个Consumer后会触发Rebalance的发生。此时原Consumer需要暂停所有队列的消费,等到这些队列分配给新的Consumer后,消费队列才能继续被消费。

      • 消费重复:Consumer在消费新分配给自己的队列时,必须接着之前Consumer提交的消息进度的offset继续消费,默认情况下,offset是异步提交的,这个异步性会导致提交到Broker的offset与Consumer实际消费的消息并不一致,这不一致差值就是会重复消费的消息。

      • 消费突刺:由于Rebalance可能导致重复消费,如果需要重复消费的消息过多,或因为Rebalance暂停时间长从而导致积压了部分消息。那就有可能会导致在Rebalance结束后瞬间需要消费很多消息。

    • 关键过程:

      • 在 RocketMQ 中,重平衡的触发是近乎同时的,但执行过程是分散且异步的,消费者实例之间不会协调执行时序。

        • 触发:近似同时(Broker 广播通知 + 客户端定时任务同步)。

        • 执行:完全分散(各消费者独立拉取状态、计算分配、释放/申请队列)

      • RocketMQ 通过分配策略的确定性 + Broker 仲裁机制 + 客户端状态验证三重保障,确保不会出现多个消费者同时消费同一个消息队列的情况(最终一致性)。

        • 分配策略的确定性(核心基础)

          • 所有消费者使用完全相同的输入参数和完全相同的算法计算分配结果:

            • 消费者列表 CIDAll 按字典序排序(如 [ConsumerA, ConsumerB, ConsumerC])

            • 消息队列 MQSet 按 Broker 名称+队列ID 排序

            • 分配算法无状态且确定:相同输入必然得到相同输出

          • 这意味着所有消费者对“谁该消费哪个队列”的结论完全一致

        • Broker 的仲裁机制(最终防线)

          • 当客户端计算出现偏差时(如网络延迟导致状态不一致),Broker 通过心跳上报进行仲裁:
        • 图片

        • 客户端状态验证(实时拦截)

          • 消费者在拉取消息前进行本地校验队列所有权:
      • 算法

        • 平均分配(AllocateMessageQueueAveragely)

        • 环形平均分配(AllocateMessageQueueAveragelyByCircle)

        • 一致性哈希分配(AllocateMessageQueueConsistentHash)

        • 机房亲和分配(AllocateMachineRoomNearby)

        • 手动配置分配(AllocateMessageQueueByConfig)

        • 按 Broker 机房分配(AllocateMessageQueueByMachineRoom)

    • 最佳实践:

      • 减少重平衡频率

      • 避免敏感时段触发

        • 避免在流量高峰时扩容/缩容消费者。

        • 使用优雅下线:消费者停机前主动通知 Broker,加速组状态更新。

      • 监控重平衡风暴

  5. 消费积压

    • 描述:

      • 消息消费流程中,如果Consumer的消费速度跟不上Producer的发送速度,MQ中未处理的消息会越来越多(进多出少),这部分消息就被称为消息积压,消息出现积压进而会造成消息的消费延迟。以下场景需要重点关注消息堆积和消费延迟问题:

        • 业务系统上下游能力不匹配造成持续堆积,且无法自行恢复

        • 业务系统对消息的消费实时性要求高,即使是短暂的堆积造成的消费延迟也无法接受

    • 产生原因分析

图片 Consumer使用长轮询Pull模式消费消息时,分为以下两个阶段:

  * 拉取消息

     * Consumer通过长轮询Pull模式批量拉取的方式从服务端获取消息,将拉取的消息缓存到本地缓冲队列中,对于拉取模式消费,在内网环境下会有很高的吞吐量,一般这个过程不会成为消息堆积的瓶颈。

     * 一个单线程单队列的低规格主机(Consumer实例4G8C规格),其可达几万TPS(吞吐量),如果是多队列多线程,则可以轻易到达十几万TPS

  * 消费消息

     * Consumer将本地缓冲区的消息提交到消费线程中,使用业务消费逻辑对消息进行处理,处理完毕后获取到一个结果,这是真正的消息消费过程。此时Consumer的消费能力完全依赖于消费耗时和消费并发度。如果由于业务处理逻辑复杂等原因,导致处理单条消息的耗时较长,则整体的消息吞吐量肯定不会高,此时就会导致Consumer本地缓冲队列到达上限,停止从服务端拉取消息。

  * 结论

     * 消息堆积的主要瓶颈在于Consumer实例的消费能力,而Consumer实例的消费能力由消费耗时和消费并发度决定的。注意:消费耗时的优先级高于消费并发度,即:在保证消费耗时合理性的前提下,再考虑消费并发度的问题。
  • 消费耗时

    • 影响消息消费耗时的主要因素是代码逻辑。而代码逻辑中可能会影响处理时长的代码主要有两种类型:Cpu内部计算型代码和外部I/O操作型代码。

    • 通常情况下代码如果没有复杂的递归和循环逻辑,内部计算耗时相对外部I/O操作来说几乎可以忽略。所以外部I/O型代码是影响消息处理时长的主要症结所在。

    • 外部I/O型操作代码举例:

      • 读写外部数据库,如对远程的MySQL访问

      • 读写外部缓存系统,如对远程的Redis访问

      • 下游系统调用,如PRC远程调用

    • 关于下游系统调用逻辑需要进行提前梳理,掌握每个调用操作预期的耗时,这样做能有效判断消费逻辑中操作耗时是否合理。通常消息堆积是由于下游系统出现服务异常或者数据库容量超限。

  • 消费并发度

    • 一般情况下,消费者端的消费并发度由单节点线程数和节点数量共同决定,其值为单节点线程数 * 节点数量。不过,通常需要优先调整单节点的线程数,如果单机硬件资源达到了上限,则需要通过横向扩展来提高并发度。

    • 单节点线程数:单个Consumer实例所包含的线程数量

    • 节点数:即Consumer Group所包含的Consumer数量

    • 对于普通消息、延时消息及事务消息,并发度计算都是单节点线程数 * 节点数量。对于顺序消息,消息并发度等于 Topic的Queue队列数量。

  • 如何避免

    • 为了避免在业务使用时出现非预期的消息堆积和消费延迟问题,需要在前期涉及阶段对整个业务逻辑进行完善的排查和梳理。其中最重要的就是梳理消息的消费耗时和设置消息的消费并发度。

      • 梳理消息的消费耗时

        • 通过压测获取消息的消费耗时,并对耗时高的操作代码逻辑进行分析。梳理消息的消费耗时需要关注以下几点:

        • 消息消费逻辑的计算复杂度是否搞过,代码是否存在无限循环和递归等缺陷

        • 消息消费逻辑中I/O操作是否必须,能否用本地缓存等方案规避

        • 消费逻辑中的复杂耗时的操作时候可以异步化处理,如果可以,是否会有逻辑错乱隐患

      • 设置消费并发度

        • 逐步调大单个Consumer节点的线程数,并观测节点的系统指标,得到单个节点最优的消费线程数和消息吞吐量

        • 根据上下游流量峰值极端出需要设置的节点数:节点数 = 流量峰值/单个节点消息吞吐量

  1. 死信消费

    • 描述:消息经过最大重试次数消费仍失败后,进入 死信队列(DLQ),需人工干预处理。

    • 核心概念:

      • 死信消息(Dead Letter Message)当消息达到最大重试次数后仍消费失败时,RocketMQ 将其标记为死信消息,自动转移到特殊队列

      • 死信队列(Dead Letter Queue)

        • 命名规则:%DLQ% + 消费者组名(如 %DLQ%OrderConsumerGroup)

        • 特性:

          • 独立于原始 Topic 的存储队列

          • 保留原始消息所有属性(包括业务键/Tag/生成时间)

          • 消息永不过期(需手动清理)

    • 死信消息生命周期

图片

  • 死信处理方案
方案 适用场景 实现方式
人工修复 需业务介入的复杂场景 1. 查询死信
自动重路由 临时依赖故障(如DB抖动) 将消息重新发送到原始Topic(需重置retryCount)
降级处理 非核心业务消息 记录错误日志后直接确认
归档分析 需要根因分析的场景 转存到Hive/ES进行分析
  • 本质

    • 自动转移:超阈值失败消息进入隔离区

    • 完整追溯:保留原始消息及所有重试记录

    • 多处理模式:支持人工/自动/归档等处理方式

    • 生态整合:与控制台/监控系统深度集成

  • 最佳实践

    • 死信预防措施

      • 消费幂等性:确保16次重试不会导致数据不一致

      • 超时控制:避免单条消息消费阻塞

      • 死信阈值告警:堆积量 > 1000 或 增长速率 > 50条/分钟时触发

    • 死信队列治理策略

维度 策略
存储周期 保留7天(配置fileReservedTime=168)
自动清理 每天凌晨扫描,对处理成功的死信打标记后删除
容量控制 单死信队列超过10万条时触发告警并自动扩容
敏感信息 在死信消费者中自动脱敏
  1. 延时/定时消息

    • 描述:定时消息和延时消息本质相同,都是服务端根据消息设置的定时时间在某一固定时刻将消息投递给消费者消费。

    • 消息生命周期:

    • 图片

      • 初始化:消息由生产者构建并初始化,准备发送到服务器。

      • 定时中:消息发送到服务端,在指定投递时间之前,消息存储在基于时间的存储系统中。不会立即为消息创建索引。

      • 就绪:在指定时间,消息被写入常规存储引擎,消息对消费者可见,并等待消费者消费。

      • 处理中:消息被消费者获取,并根据消费者的本地业务逻辑进行处理。

        • 在此过程中,Broker 等待消费者完成消费并提交消费结果。如果在一定时间内没有收到消费者的响应,RocketMQ 将重试该消息。
      • 确认:消费者完成消费并将消费结果提交给 Broker。Broker 标记当前消息是否成功消费。

        • 默认情况下,RocketMQ 会保留所有消息。当提交消费结果时,消息数据被逻辑标记为已消费,而不是立即删除。因此,在消息因保留期过期或存储空间不足而被删除之前,消费者可以回溯消息以重新消费。
      • 删除:当消息的保留期过期或存储空间不足时,RocketMQ 会以滚动方式从物理文件中删除最早保存的消息。

    • 实现原理:消息属性与消费逻辑的解耦设计,在 RocketMQ 4.x 中,Topic 仅作为消息的逻辑分类容器,不绑定特定类型。

      • 消息发送:设置 delayTimeLevel 属性

      • Broker 处理流程:

        • 接收消息后,将消息的 Topic 属性临时改为 SCHEDULE_TOPIC_XXXX ;

        • 根据延迟级别存入对应队列(如级别 3 存入队列 2);

        • 定时任务扫描到期后,恢复原始 Topic 并重投递至真实队列。

      • 关键点:

        • 延时行为通过消息属性+Broker 内部 Topic 路由实现,与原始 Topic 无关
  2. 事务消息

    • 描述:事务消息是 RocketMQ 提供的一种高级消息类型,旨在确保消息生产与本地事务之间的最终一致性。

    • 核心原理:两阶段提交

      • 1)发送方向RocketMQ发送“待确认”消息。

      • 2)RocketMQ将收到的“待确认”消息持久化成功后,向发送方回复消息已经发送成功,此时第一阶段消息发送完成。

      • 3)发送方开始执行本地事件逻辑。

      • 4)发送方根据本地事件执行结果向RocketMQ发送二次确认(Commit或是Rollback)消息,RocketMQ收到Commit状态则将第一阶段消息标记为可投递,订阅方将能够收到该消息;收到Rollback状态则删除第一阶段的消息,订阅方接收不到该消息。

      • 5)如果出现异常情况,步骤4)提交的二次确认最终未到达RocketMQ,服务器在经过固定时间段后将对“待确认”消息发起回查请求。

      • 6)发送方收到消息回查请求后(如果发送一阶段消息的Producer不能工作,回查请求将被发送到和Producer在同一个Group里的其他Producer),通过检查对应消息的本地事件执行结果返回Commit或Roolback状态。

      • 7)RocketMQ收到回查请求后,按照步骤4)的逻辑处理。

    • 生命周期:

图片 * 初始化:消息由生产者构建和初始化,并准备好发送到 Broker。

  * 事务挂起:半消息被发送到 Broker。但是,它不会立即写入磁盘进行永久存储。相反,它存储在事务存储系统中。直到系统验证本地事务的第二阶段成功后,消息才会被提交。在此期间,消息对下游消费者不可见。

  * 回滚:在第二阶段,如果事务的执行结果是回滚,Broker 会回滚半消息并终止工作流程。

  * 就绪:消息已发送到 Broker,并对消费者可见且可供消费。

  * 传输中:消息由消费者获取并根据消费者的本地业务逻辑进行处理。

  * 在此过程中,Broker 等待消费者完成消费并提交消费结果。如果在一定时间内未收到消费者的响应,RocketMQ 会重试该消息。

  * 已确认:消费者完成消费并将消费结果提交给 Broker。Broker 标记当前消息是否成功消费。

  * 默认情况下,RocketMQ 保留所有消息。当提交消费结果时,消息数据被逻辑地标记为已消费,而不是立即删除。因此,在消息因保留期过期或存储空间不足而被删除之前,消费者可以回溯消息以进行重新消费。

  * 已删除:当消息的保留期过期或存储空间不足时,RocketMQ 会以滚动方式从物理文件中删除最早保存的消息。
  • 使用限制:

    • 消息类型一致性

      • 事务消息只能用于 MessageTypeTransaction 的 Topic。
    • 以事务为中心的消费

      • RocketMQ 的事务消息功能保证了本地核心事务与下游分支之间可以处理相同的事务。但是,它不保证消息消费结果与上游执行结果之间的一致性。因此,下游业务必须确保消息得到正确处理。我们建议消费者正确进行消费重试,以确保在发生故障时消息得到正确处理。
    • 中间状态可见性

      • RocketMQ 的事务消息功能只保证最终一致性,这意味着在消息投递给消费者之前,上游事务和下游分支之间的状态一致性无法保证。因此,事务消息仅适用于接受异步执行的事务场景。
    • 事务超时机制

      • RocketMQ 为事务消息实现了超时机制。Broker 收到半消息后,如果在一定时间内无法确定是提交还是回滚事务,则 Broker 默认回滚该消息。
  1. 顺序消息

    • 描述:顺序消息按照消息的发送顺序传递给消费者

    • 核心条件:RocketMQ 4.x 的顺序消费实现是一个生产者、Broker、消费者三方协同的机制,通过队列内顺序性 + 消费者单线程处理保障消息顺序。

      • 生产者:同一业务键(如订单ID)的消息必须发送到同一个队列

      • Broker:保证队列内消息存储顺序与生产顺序一致

      • 消费者:单队列单线程消费,且禁止重平衡期间队列切换

    • 实现原理:

      • 生产者:顺序写入同一队列

        • 通过 MessageQueueSelector 将相同业务键的消息路由到固定队列:

        • 关键点:相同 orderId 的消息永远进入同一队列。

      • Broker:队列内严格保序

        • CommitLog 顺序写:所有消息按到达顺序追加到 CommitLog 文件。

        • ConsumeQueue 顺序读:每个队列维护独立的消费位点(offset),消息按写入顺序被消费。

      • 消费者:单队列串行处理,使用 MessageListenerOrderly 并启用队列锁机制。

        • 队列锁(LockQueue)

          • 目的:防止重平衡导致多个消费者同时消费同一队列。

          • 消费者在拉取队列前向 Broker 申请队列锁(通过心跳上报)。

          • Broker 确保一个队列同一时间只被一个消费者持有。

          • 锁有效期:默认 30 秒(通过心跳续期)。

        • 消费位点同步

          • 提交策略:消费成功后立即同步位点到 Broker(非批量)。

          • 异常处理:若消费失败:

            • 返回 ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT。

            • 不提交位点,稍后重试当前队列。

    • 生命周期:

图片 * 已初始化:消息由生产者构建和初始化,并准备发送到 Broker。

  * 就绪:消息已发送到 Broker,对消费者可见并可供消费。

  * 处理中:消息由消费者获取,并根据消费者的本地业务逻辑进行处理。

  * 在此过程中,Broker 等待消费者完成消费并提交消费结果。如果在一定时间内未收到消费者的响应, RocketMQ 将重试消息。

  * 已确认:消费者完成消费并向 Broker 提交消费结果。Broker 标记当前消息是否成功消费。

  * 默认情况下,RocketMQ 保留所有消息。当提交消费结果时,消息数据被逻辑标记为已消费,而不是立即删除。因此,消费者可以在消息因保留期过期或存储空间不足而被删除之前回溯消息进行重新消费。

  * 已删除:当消息的保留期过期或存储空间不足时,RocketMQ 以滚动方式从物理文件中删除最早保存的消息。
  1. 重置消费位点
  • 功能描述:调整 Consumer 的消费起点,解决消息积压或回溯消费问题。

  • 操作方式:

    • API 调用:通过 ResetConsumedOffsets 接口设置消费位点(支持 EndOffset 或 Timestamp)。

    • 控制台操作:在 RocketMQ 控制台选择时间范围或消息 ID 进行重置。

  • 注意事项:

    • 重置位点后,Consumer 会从新位置开始消费,跳过旧消息。

    • 需严格控制权限,避免误操作导致数据丢失。

  1. 跳消息
  • 功能描述:通过重置消费位点,跳过部分消息(如积压消息或错误消息)。

  • 实现方式:

    • 设置 EndOffset:直接跳到最新消息。

    • 设置 Timestamp:从指定时间点开始消费。

  • 风险:

    • 可能导致业务数据不完整,需评估是否允许跳过历史消息。
  1. 重试队列
  • 描述:重试队列是RocketMQ为处理消息消费失败而设计的特殊队列,当消费者处理消息失败时,Broker会自动将消息转移到重试队列,并按预设策略延迟后重新投递。

  • 核心特性:

    • 自动创建:首次消费失败时动态创建(无需手动配置)

    • 命名规则:%RETRY% + 消费者组名(如 %RETRY%OrderConsumerGroup)

    • 延迟投递:渐进式延迟投递,支持18级延迟策略(1s~2h)

    • 最大重试次数:默认16次(可配置)

    • 最终状态转移:超过最大重试次数后转入死信队列

  • 工作原理:

图片

    只爱西经
    林一