type
status
date
slug
summary
tags
category
icon
password
😀
这里写文章的前言: RocketMQ 基础知识

📝 整理流程

notion image
 

📝 模型概念

notion image

📔主题(Topic)

主题是 Apache RocketMQ 中消息传输和存储的顶层容器,用于标识同一类业务逻辑的消息
 
  • 定义数据的分类隔离;在Apache RocketMQ的方案设计中,建议将不同业务的数据拆分到不同的主题中管理,通过主题实现存储的隔离性和订阅隔离性
  • 主题下存在 MessageQueue : 队列,才是topic实际的操作单元,一个topic可能有多个队列,方便扩容
 
在 Apache RocketMQ 架构中,主题属于顶层资源和容器,拥有独立的权限管理、可观测性指标采集和监控等能力,创建和管理主题会占用一定的系统资源。因此,生产环境需要严格管理主题资源,请勿随意进行增、删、改、查操作
 

📔队列(MessageQueue)

队列是RocketMQ实际存储和传输的实际容器,也是RocketMQ消息的最小存储单元。Apache的一个主题由多个队列组成,以实现队列数量的水平拆分和队列内部的流式存储,在向内发送消息,但实际消息发送到该主题下的某个队列里
 
存储顺序性
队列天然具备顺序性,即消息按照进入队列的顺序写入存储,同一队列的消息天然存在顺序关系,队列头部为最早写入的消息,队列尾部为最新写入的消息;消息在队列中的位置和消息之间的顺序通过位点(Offset)进行标记管理(顺序消费可以通过定义hashkey指定某个队列,消费者再顺序设置成顺序消费)。
 
 
流式操作语义
RocketMQ基于队列的存储模型可确保消息从任意位点读取任意数量的消息,以实现类似聚合读取,回溯读取等特性;这些特性是RabbitMQ,ActiveMQ等不具备的
 
队列数量的设置应遵循少用够用原则,避免随意添加队列数量,造成下列问题
  • 集群元数据膨胀:RocketMQ会以队列的粒度采集指标和监控数据,队列过多容易造成管控元数据膨胀
  • 客户端压力过大:RocketMQ的消息读写都是针对队列进行操作,队列过多容易产生空轮询请求,增加系统负荷
 

📔消息(Message)

  • 消息由生产者初始化并发送到RocketMQ服务端
  • 消息按照达到RocketMQ服务端的顺序存储到队列中
  • 消费者按照订阅的关系从RocketMQ服务端中获取消息并消费
 
四种类型:
  • Normal: 普通消息,消息本身无特殊语义,消息之间也没有任何关联
  • FIFO:顺序消息,RocketMQ通过消息分组 MessageGroup 标记一组特定消息的先后顺序,可以保证消息的投递顺序严格按照消息发送时的顺序
  • Delay:定时/延时消息,通过指定延时时间控制消息生产后不要立即投递,而是在延时间隔后才对消费者可见
  • Transaction:RocketMq支持事务消息,支持应用数据库更新和消息的事务一致性保障
 

📔生产者(Producer)

生产者是RocketMQ系统中用来构建并传输消息服务端的运行实体
生产者通常被集成在业务系统中,将业务消息按照要求封装成RocketMQ的消息(Message)并发送到服务端
  • 发送方式:生产者可通过API接口设置消息发送的方式。Apache RocketMQ 支持同步传输和异步传输。
  • 批量发送:生产者可通过API接口设置消息批量传输的方式。例如,批量发送的消息条数或消息大小。
  • 事务行为:Apache RocketMQ 支持事务消息,对于事务消息需要生产者配合进行事务检查等行为保障事务的最终一致性
 

📔消费者分组(ConsumerGroup)

消费者分组是 Apache RocketMQ 系统中承载多个消费行为一致的消费者的负载均衡分组
 
和消费者不同,消费者分组并不是运行实体,而是一个逻辑资源。在 Apache RocketMQ 中,通过消费者分组内初始化多个消费者实现消费性能的水平扩展以及高可用容灾
 
在消费者分组中,统一定义以下消费行为,同一分组下的多个消费者将按照分组内统一的消费行为和负载均衡策略消费消息
 
  • 订阅关系:Apache RocketMQ 以消费者分组的粒度管理订阅关系,实现订阅关系的管理和追溯
  • 投递顺序性:Apache RocketMQ 的服务端将消息投递给消费者消费时,支持顺序投递和并发投递,投递方式在消费者分组中统一配置
  • 消费重试策略: 消费者消费消息失败时的重试策略,包括重试次数、死信队列设置等
 
整体流程:
  1. 消息有生产者初始化并发送到RocketMQ服务端
  1. 消息按照到达RocketMQ服务端顺序存储到主题指定队列中
  1. 消费者按照指定的订阅关系从RocketMQ服务端中获取消息并消费
 

📔消费者(Consumer)

消费者是 Apache RocketMQ 中用来接收并处理消息的运行实体。 消费者通常被集成在业务系统中,从 Apache RocketMQ 服务端获取消息,并将消息转化成业务可理解的信息,供业务逻辑处理
 

📔订阅关系(Subscription)

定义:消息过滤规则的类型。订阅关系中设置消息过滤规则后,系统将按照过滤规则匹配主题中的消息,只将符合条件的消息投递给消费者消费,实现消息的再次分类
  • TAG过滤:按照tag字符串进行全文过滤匹配
  • SQL92过滤:按照SQL语法对消息属性进行过滤匹配
 

消息持久化

notion image
 
  • Commitlog: 存储消息的元数据,所有的消息都会按照顺序存入到commitLog文件当中;CommitLog是由多个文件组成,每个文件固定大小1G;以第一条偏移量为名
  • ConsumerQueue:存储消息在CommitLog的索引;一个MessageQueue一个文件,记录当前MessageQueue被那些消费者组消费到了那条数据
  • IndexFile: 为了消息查询提供了一种通过key或者时间区间来查询消息的方法,这种通过IndexFile来查找消息的方法不影响发送和消费消息的主流程
  • checkpoint:数据存盘检查;主要记录commitLog文件,consumerQueue文件以及IndexFile文件最后一次刷盘的时间戳
  • config/*.json: 这些文件是将RocketMQ的一些关键配置信息进行存盘保存。例如Topic配置,消费者组配置,消费者组消息偏移量offset等等一些信息
  • abort: 该文件用来RocketMQ判断程序是否正常关闭的一个标识文件。正常情况下,会再启动创建,而关闭服务时删除。但是如果遇到一些宕机,或者kill -9这些非正常的关闭服务,这个abort文件就不会被删除;因此RocketMQ就可以判断上一次服务是非正常关闭,后续就会做一些数据恢复的操作
 

commitLog

CommitLog存储了所有的消息实体;所有生产者发过来的消息,都会无差别的依次的存储到CommitLog文件当中;
好处: 减少查找目标文件的时间,让消息以最快的速度落盘
对比Kafka存文件,需要寻找消息所属的Partition文件,在完成写入,当Topic较多的时候,这样的Partition寻址就会比较浪费时间,所以Kafka不适合多Topic的场景
RocketMQ的这种快速落盘的方式在多Topic的场景下,优势会比较明显
 
commitLog的文件大小是固定的,但是其存储的每个消息单元长度是不固定的
正因为消息的记录大小不固定,所以RocketMQ在每次存CommitLog文件时,都会去检查当前CommitLog文件空间是否足够,如果不够的话,会重新创建一个CommitLog文件
 

ConsumeQueue

consumeQueue文件主要是加速消费者的消费索引;它的每个文件夹对应RocketMQ中的一个MessageQueue,文件夹下的文件记录了每个MessageQueue中的消息在CommitLog文件当中的偏移量;这样,消费者通过ConsumeQueue文件,就可以快速找到CommitLog文件中感兴趣的消息记录;而消费者在ConsumeQueue文件当中的消费进度,会保存在config/consumerOffset.json文件中
 
每个ConsumeQueue文件由固定30万个固定大小20byte的数组块组成,数据块的内容包括:msgPhyOffset(8 byte,消息在文件中的起始位置) + msgSize(4 byte,消息在文件中占用的长度) + msgTagCode(8 byte,消息的tag的hash值)
 

IndexFile

辅助消息的检索;消费者进行消息消费时,通过ConsumeQueue文件就足够完成消息检索了,但是如果按照MessageId或者messageKey来检索文件,就不够了(比如RocketMQ的管理台);IndexFile文件就是用来辅助这类消息检索的
他的文件名比较特殊,不是以消息偏移量命名,而是用的时间命名。但是其实,他也是一个固定大小的文件
 
文件由 indexHeader(固定40byte) + slot(固定500w个,每个固定20byte) + index(最多500w*4个,每个固定20byte)三部分组成
 

过期文件删除

消息既然要持久化,就必然有对应的删除机制
RocketMQ内置了一套过期文件的删除机制
 

如何判断过期文件

RocketMQ中,CommitLog文件和ConsumeQueue文件是以偏移量命名的,对于非当前写的文件,如果超过了一定的保留时间,那么这些文件都会被认为是过期文件,随时可以删除。
保留时间就是broker.conf中配置的fileReservedTime属性
 
注意,RocketMQ判断文件是否过期的唯一标准就是非当前写文件的保留时间,而并不关心文件当中的消息是否被消费过。所以,RocketMQ的消息堆积也是有时间限度的
 

如何删除过期文件

RocketMQ内部有一个定时任务,对文件进行扫描,并且触发文件删除的操作。用户可以指定文件删除操作的执行时间。在broker.conf中deleteWhen属性指定。默认是凌晨四点
 
RocketMQ还会检查服务器的磁盘空间是否足够,如果磁盘空间的使用率达到一定阈值,也会触发过期文件删除;所以RocketMQ官方就特别建议,borker的磁盘空间不要少于4G
 

高效写文件

零拷贝加速文件读写

零拷贝是操作系统层面提供的一种加速文件读写的操作机制,对应多的开源项目都在大量使用零拷贝,来提升IO操作的性能
对于java层面,对应着mmap和sendFile两种方式

理解CPU拷贝和DMA拷贝

操作系统对于内存空间,是分为用户态和内核态的。用户态的应用程序无法直接操作硬件,需要通过内核空间进行操作转换,才能真正操作硬件。这其实是为了保护操作系统的安全。正因为如此,应用程序需要与网卡、磁盘等硬件进行数据交互时,就需要在用户态和内核态之间来回的复制数据。而这些操作,原本都是需要由CPU来进行任务的分配、调度等管理步骤的,早先这些IO接口都是由CPU独立负责,所以当发生大规模的数据读写操作时,CPU的占用率会非常高
之后,操作系统为了避免CPU完成被各种IO调用给占用,引入了DMA(直接存储器).有DMA来负责这些频繁的IO操作
DMA是一套独立的指令集,不会直接占用CPU的计算资源;这样CPU就不需要参与具体的数据复制的工作,只需要管理DMA权限即可
 
DMA拷贝极大的释放了CPU的性能,因此他的拷贝速度会比CPU拷贝要快很多;但是DMA拷贝本身,也在不断优化
notion image
 
 
引入DMA拷贝之后,在读写请求的过程中,CPU不需要参与具体的工作,DMA可以独立完成数据在系统内部的复制。但是数据的复制,依然需要借助数据总线进行;但系统内的IO操作过多时,还是会占用过多的数据总线,造成总线冲突,最终还是会影响数据读写的性能
 
为了避免DMA总线冲突对性能的影响,后来又引入了Channel通道的方式。Channel,是一个完全独立的处理器,专门负责IO操作。既然是处理器,Channel就有自己的IO指令,与CPU无关,他也更适合大型的IO操作,性能更高
notion image
 
这也是java层面的零拷贝相关的操作都是通过Channel的子类实现的
零拷贝技术,其实并不是不拷贝,而是经历减少了CPU拷贝
 

mmap文件映射机制

主要是通过java.nio.channels.FileChannel的map方法完成映射
 
以一次文件的读写操作为例子,应用程序对磁盘文件的读与写,都需要经过内核态与用户态之间的的状态切换,每次状态切换的过程中,都需要大量的数据复制
在这个过程中,总共需要进行四次数据拷贝;而磁盘与内核态之间的数据拷贝,在操作系统层面已经由CPU拷贝优化为了DMA拷贝;而内核态与用户态之间拷贝依然是CPU拷贝;在这个场景下,零拷贝技术优化的重点,就是内核态与用户态之间的两次拷贝
 
而mmap文件映射的方式,就是在用户态不在保存文件的内容,而只保存文件的映射,包括文件的内存起始地址,文件大小等
真实的数据,也不需要在用户态留存,可以直接通过操作映射,在内核态完成数据复制
notion image
 
 
notion image
 
 
这种mmap的映射机制由于还是需要用户态保存文件的映射信息,数据复制的过程也需要用户态的参与,这其中的变数是非常多的。
所以mmap机制非常适合操作小文件,如果文件太大,映射信息也会过大,容易造成问多问题
通过mmap机制建议的映射文件不要超过2g,而RocketMQ做大的commitLog文件保持在1g固定大小,也是为了方便文件映射
 

sendFile机制运行

主要是通过java.nio.channels.FileChannel的transferTo方法完成
 
 
早期的sendFile机制其实还是依靠CPU进行页缓存与socket缓存区之间的数据拷贝;但是在后期不断改进过程中,sendfile优化了实现机制
在拷贝过程中,并不直接拷贝文件的内容,而是只拷贝一个带有文件位置和长度等信息的文件描述符FD,这样就大大减少了需要传递的数据;而真实的数据内容,会交由DMA控制器,从缓存页中打包异步发送到socket中
notion image
 
 
notion image
 

顺序写加速文件写入磁盘

通常应用程序往磁盘写文件时,由于磁盘空间不是连续的,会有很多碎片。所以我们去写一个文件时,也就无法把一个文件写在一块连续的磁盘空间中,而需要在磁盘多个扇区之间进行大量的随机写。这个过程中有大量的寻址操作,会严重影响写数据的性能。而顺序写机制是在磁盘中提前申请一块连续的磁盘空间,每次写数据时,就可以避免这些寻址操作,直接在之前写入的地址后面接着写就行
 
Kafka官方详细分析过顺序写的性能提升问题。Kafka官方曾说明,顺序写的性能基本能够达到内存级别。而如果配备固态硬盘,顺序写的性能甚至有可能超过写内存。而RocketMQ很大程度上借鉴了Kafka的这种思想
 
org.apache.rocketmq.store.CommitLog#DefaultAppendMessageCallback中的doAppend方法。在这个方法中,会以追加的方式将消息先写入到一个堆外内存byteBuffer中,然后再通过fileChannel写入到磁盘
 

刷盘机制保证消息不丢失

 
在操作系统层面,当应用程序写入一个文件时,文件内容并不会直接写入到硬件当中,而是会先写入到操作系统中的一个缓存PageCache中。PageCache缓存以4K大小为单位,缓存文件的具体内容
 
这些写入PageCache中的文件,在应用程序看来,是已经完全落盘保存好了的,可以修改,复制等等
但是,本质上,PageCache依然是内存状态,所以一断电就会丢失。因此,需要将内存状态的数据写入到磁盘当中,这样数据才能真正完成持久化,断电也不会丢失。这个过程就称为刷盘
 
PageCache是源源不断产生的,而Linux操作系统显然不可能时时刻刻往硬盘写文件。所以,操作系统只会在某些特定的时刻将PageCache写入到磁盘。例如当我们正常关机时,就会完成PageCache刷盘。另外,在Linux中,对于有数据修改的PageCache,会标记为Dirty(脏页)状态。当Dirty Page的比例达到一定的阈值时,就会触发一次刷盘操作。例如在Linux操作系统中,可以通过/proc/meminfo文件查看到Page Cache的状态
 
但是操作系统的刷盘不是时时刻刻执行的,那么对于用户态的应用程序来说,那就避免不了非正常宕机时的数据丢失问题。因此,操作系统也提供了一个系统调用,应用程序可以自行调用这个系统调用,完成page cache的强制刷盘
 
RocketMQ对何时刷盘也提供了两种刷盘机制,同步刷盘和异步刷盘
  • 同步刷盘: 在返回成功状态前,消息被写入磁盘;具体流程是,消息写入内存的PAGECACHE后,立刻通知刷盘线程刷盘, 然后等待刷盘完成,刷盘线程执行完成后唤醒等待的线程,返回消息写 成功的状态
  • 异步刷盘: 在返回写成功状态时,消息可能只是被写入了内存的PAGECACHE,写操作的返回快,吞吐量大;当内存里的消息量积累到一定程度时,统一触发写磁盘动作,快速写入
  • 配置方式: borker配置文件里的flushDiskType参数设置的,这个参数被配置成 SYNC_FLUSH , ASYNC_FLUSH 中的一个
同步刷盘机制会更频繁的调用fsync,所以吞吐量相比异步刷盘会降低,但是数据的安全性会得到提高
notion image
 
 
 

🤗 总结归纳

📎 参考文章

 
💡
有关文章的问题,欢迎您在底部评论区留言,一起交流~