2024年8月13日

RocketMQ 批处理模型演进之路
RocketMQ 的目标,是致力于打造一个消息、事件、流一体的超融合处理平台。这意味着它需要满足各个场景下各式各样的要求,而批量处理则是流计算领域对于极致吞吐量要求的经典解法,这当然也意味着 RocketMQ 也有一套属于自己风格的批处理模型。 至于什么样的批量模型才叫“属于自己风格”呢,且听我娓娓道来。 什么是批处理 首先,既然谈 RocketMQ 的批处理模型,那就得聊聊什么是“批处理”,以及为什么批处理是极致吞吐量要求下的经典解法。在我看来,批处理是一种泛化的方法论,它处在各个系统的方方面面,无论是传统工业还是互联网,甚至在日常生活中,都能看到它的身影。 批处理的核心思想是将多个任务或数据集合在一起,进行统一处理。这种方法的优势在于可以充分利用系统资源,减少任务切换带来的开销,从而提高整体效率。比如在工业制造中,工厂通常会将相同类型的零部件批量生产,以降低生产成本和提高生产速度。在互联网领域,批处理则表现为批量数据的存储、传输和处理,以优化性能和提升系统吞吐量。 批处理在极致吞吐量需求下的应用,更加显著。例如,在大数据分析中,海量的数据需要集中处理才能得出有意义的结果。如果逐条处理数据,不仅效率低下,还可能造成系统瓶颈。通过批处理,可以将数据划分为若干批次,在预定的时间窗口内统一处理,从而提高系统的并行处理能力,提升整体吞吐量。 此外,批处理其实并不意味着牺牲延时,就比如在 CPU Cache 中,对单个字节的操作无论如何时间上都是会优于多个字节,但是这样的比较并没有意义,因为延时的感知并不是无穷小的,用户常常并不关心 CPU 执行一条指令需要花多长时间,而是执行完单个“任务/作业”需要多久,在宏观的概念上,反而批处理具有更低的延时。 RocketMQ 批处理模型演进 接下来我们看看,RocketMQ 与批处理的“如胶似漆、形影相随”吧,其实在 RocketMQ 的诞生之初,就已经埋下了批处理的种子,这颗种子,我们暂且叫它——早期的批处理模型。 早期批处理模型 下图,是作为用户视角上感知比较强的老三样,分别是 Producer、Consumer、Broker: 而早期批处理模型,实际上只和 Producer、Broker 有关,在这条链路上会有批量消息的概念,当消息到达 Broker 后这个概念就会消失。基于这点我们来看具体是怎么回事。首先批量消息的源头实际上就是 Producer 端的 Send 接口,在大部分场景下,我们发送一条消息都会使用以下的形式去操作: ```java SendResult send(Message msg); ``` 非常地简明扼要,将一条消息发送到 Broker,如果我们要使用上早期的批处理模型,也只需要稍作修改: ```java SendResult send(Collection msgs) ``` 可以看到,将多条消息串成一个集合,然后依旧是调用 send 接口,就可以完成早期批处理模型的使用了(从用户侧视角看就已经 ok 了),就像下图一样,两军交战,谁火力更猛高下立判~ 那么真就到此为止了吗?当然不是,首先这里的集合是有讲究的,并不是随意将多条消息放在一起,就可以 send 出去的,它需要满足一些约束条件: 相同 Topic。 不能是 RetryTopic。 不能是定时消息。 相同 isWaitStoreMsgOK 标记。 这些约束条件暂时先不展开,因为就如同它字面意思一样浅显易懂,但是这也意味着它的使用并不是随心所欲的,有一定的学习成本,也有一定的开发要求,使用前需要根据这些约束条件自行分类,然后再装进“大炮”中点火发射。这里可能有人会问,这不是为难我胖虎吗?为什么要加这么多约束?是不是故意的?实际上并非如此,我们可以想象一下,假如我们是商家: 客户 A 买了两件物品,在发货阶段我们很自然的就可以将其打包在一起(将多个 Message 串成一个 ArrayList),然后一次性交给快递小哥给它 Send 出去,甚至还能省一笔邮费呢~ 客户 B 和客户 C 各买了一件物品,此时我效仿之前的行为打包到一起,然后告诉快递小哥这里面一个发到黑龙江,一个发到海南,然后掏出一笔邮费,然后。。。就没有然后了。 很显然,第二个场景很可能会收到快递小哥一个大大的白眼,这种事情理所应当的做不了,这也是为什么属于同一个 Collection 的消息必须要满足各种各样的约束条件了,在 Broker 实际收到一个“批量消息”时,会做以下处理: 首先它会根据这一批消息的某些属性,挑选出对应的队列,也就是上图中最底下的「p1、p2......」,在选定好队列之后,就可以进行后续的写入等操作了,这也是为什么必须要求相同 Topic,因为不同的 Topic 是没法选定同一个队列的。 接下来就到了上图所示流程,可以看到这里分别来了三个消息,分别是 《四条消息》《一条消息》《三条消息》,接下来他们会依次进入 unPack 流程,这个流程有点像序列化过程,因为从客户端发送上来的消息都是内存结构的,距离实际存储在文件系统中的结构还有一些不同。在 unPack 过程中,会分别解包成:四条消息、一条消息、三条消息;此时和连续 Send 八条消息是没有任何区别的,也就是在这一刻,批量消息的生命周期就走到了尽头,此刻往后,“众生平等、不分你我”。 也正是这个机制,Consumer 其实并不知道 Producer 发送的时候“到底是发射弓箭,还是点燃大炮”。这么做有个非常好的优点,那就是有着最高的兼容性,一切的一切好像和单条消息 Send 的经典用法没有任何区别,在这种情况下,每条消息都有最高的自由度,例如各自独立的 tag、独立的 keys、唯一的 msgId 等等,而基于这些所衍生出来的生态(例如消息轨迹)都是无缝衔接的。也就是说:只需要更换发送者使用的 Send 接口,就可以获得极大的发送性能提升,而消费者端无需任何改动。 索引构建流水线改造 我一向用词都非常的严谨,可以看到上一段的结尾:“获得极大的发送性能提升”,至于为什么这么讲,是因为距离整体系统的提升还有一些距离,也就是这一段的标题“索引构建流水线改造”。 首先我们要有一个共识,那就是对于消息队列这种系统,整体性能上限比值“消费/生产”应该要满足至少大于等于一,因为大部分情况下,我们的生产出来的消息至少应该被消费一次(否则直接都不用 Send 了岂不美哉)。 其实在以往,发送性能没有被拔高之前,它就是整个生产到消费链路上的短板,也就是说消费速率可以轻松超过生产速率,整个过程也就非常协调。but!在使用早期批处理模型后,生产速率的大幅度提升就暴露了另外一个问题,也就是会出现消费速率跟不上生产的情况,这种情况下,去谈整个系统的性能都是“无稽之谈”。 而出现消费速率短板的原因,还要从索引构建讲起。由于消费是要找到具体的消息位置,那就必须依赖于索引,也就是说,一条消息的索引构建完成之前,是无法被消费到的。 下图就是索引构建流程的简易图: 这是整个直接决定消费速率上限的流程。通过一个叫 ReputMessageService 的线程,顺序扫描 CommitLog 文件,将其分割为一条一条的消息,再对这些消息进行校验等行为,将其转换成一条条的索引信息,并写入对应分区的 ConsumeQueue 文件。 整个过程是完全串行的,从分割消息,到转换索引,到写入文件,每一条消息都要经过这么一次流转。因为一开始是串行实现,所以改造起来也非常的自然,那就是通过流水线改造,提高它的并发度,这里面有几个需要解决的问题: CommitLog 的扫描过程并行难度高,因为每条消息的长度是不一致的,无法简单地分割出消息边界来分配任务。 单条消息的索引构建任务并不重,因此不能简单忽略掉任务流转过程中的开销(队列入队出队)。 写入 ConsumeQueue 文件的时候要求写入时机队列维度有序,否则会带来额外的检查开销等。 针对这几个难点,在设计中也引入了“批量处理”的思路,其实大到架构设计、小到实现细节,处处都体现了这一理念,下图就是改造后的流程: 由于 CommitLog 扫描过程很难并行化处理,那就干脆不做并行化改造了,就使用单线程去顺序扫描,但是扫描的时候会进行一个简单的批处理,扫描出来的消息并不是单条的,而是尽可能凑齐一个较大的 buffer 块,默认是 4MB,这个由多条消息构成的 buffer 块我们不妨将其称为一个 batch msg。 然后就是对这些 batch msg 进行并行解析,将 batch msg 以单条消息的粒度扫描出来,并构建对应的 DispatchRequest 结构,最终依次落盘到 ConsumeQueue 文件中。其中的关键点在于 batch msg 的顺序如何保证,以及 DispatchRequest 在流转时怎么保证顺序和效率。为此我专门实现了一个轻量级的队列 DispatchRequestOrderlyQueue,这个 Queue 采用环状结构,可以随着顺序标号不断递进,并且能做到 “无序入队,有序出队”,详细设计和实现均在开源 RocketMQ 仓库中,这里就不多赘述。 在经过改造后,索引构建流程不再成为扯后腿的一员,从原本眼中钉的角色美美隐身了~ BatchCQ 模型 经过上述索引构建流水线改造后,整个系统也就实现了最基本的批处理模型,可以在最小修改、最高兼容性的情况下让性能获得质的飞跃。 但是这并不够!因为早期的模型出于兼容性等考虑,所以依旧束手束脚的,于是 BatchCQ 模型诞生了,主要原因分为两个维度: 性能上: 早期模型中,Broker 端在准备写入阶段需要进行解包,会有一定的额外开销。 CommitLog 文件中不具备批量信息,索引需要分多次构建。 能力上: 无法实现端到端的批量行为,如加密、压缩。 那 BatchCQ 又是如何改进上述的问题的呢?其实也非常地直观,那就是“见字如面”,将 ConsumeQueue 也批量化。这个模型去掉 Broker 端写入前的解包行为,索引也只进行一次构建: 就像上图所示,如果把索引比做信封,原先每个信封只能包含一份索引信息,在批量化后则可以塞下任意数量的索引信息,具体的存储结构也发生了较大变化: 比如说如果来了两批消息,分别是(3+2)条,在普通的 CQ 模型里会分别插入 5 个 slot,分别索引到 5 条消息。但是在 BatchCQ 模型中,(3+2)条消息会只插入 2 个 slot,分别索引到 3 条以及 2 条。 也是因为这个特点,所以 CQ 原有的格式也发生了变化,为了记录更多信息不得不加入 Base Offset、Batch Num 等元素,而这些更改也让原来定位索引位置的逻辑发生了变化。 普通 CQ:每个 Slot 定长,【Slot 长度 QueueOffset】位点可以直接找到索引,复杂度 O(1)。 BatchCQ:通过二分法查找,复杂度 O(log n)。 虽然这部分只涉及到了 ConsumeQueue 的修改,但是它作为核心链路的一环,影响是很大的,首先一批的消息会被当作同一条消息来处理,不需要重新 unPack ,而且这些消息都会具有相同的 TAG、Keys 甚至 MessageId,想唯一区分同一批的消息,只能根据它们的 QueueOffset 了,这一点会让消息轨迹等依靠 MessageId 的能力无法直接兼容使用,但是消息的处理粒度依然可以保持不变(依赖的是 QueueOffset)。 AutoBatch 模型 通过 BatchCQ 改造之后,我们其实已经获得极致的吞吐量了。那个 AutoBatch 又是个啥呢? 这里又要从头说起,在早期批处理模型的总结里,提到了一个比较大的缺陷,那就是“使用起来不够顺手”,用户是需要关心各种约束条件的,就像前面提到的 Topic、消息类型、特殊 Flag 等,在 BatchCQ 里面其实是新增了 Keys、Tag 等维度的限制,错误使用会出现一些非预期的情况。 不难看出,无论是早期批处理模型、还是 BatchCQ 模型,使用起来都有一定的学习成本,除了需要关注各种使用方式外,想要用好,还有一些隐藏在暗处的问题需要主动去解决: 无论是早期的批处理模型,还是 batchCQ 模型,都需要发送端自行将消息分类打包。 消息分类和打包成本高,分类需要关心分类依据,打包需要关心触发时机。 分类依据复杂,早期批处理模型需要关注多个属性,batchCQ 在这基础上新增了多个限制。 打包时机不易掌握,使用不当容易出现性能下降、时延不稳定、分区不均衡等问题。 为了解决以上问题,AutoBatch 应运而生,它就是一台能自动分拣的无情打包机器,全天候运转,精密又高效,将以往需要用户关注的细节统统屏蔽,它具有以下几个优点: AutoBatch 托管分类和打包能力,只需要简单配置即可使用。 用户侧不感知托管的过程,使用原有发送接口即可享受批处理带来的性能提升,同时兼容同步发送和异步发送。 AutoBatch 同时兼容早期的批处理模型和 batchCQ 模型。 实现轻量,性能优秀,设计上优化延时抖动、小分区等问题。 首先到底有多简单呢?让我们来看一下: ```java // 发送端开启 AutoBatch 能力 rmqProducer.setAutoBatch(true); ``` 也就是说,只需要加入这么一行,就可以开启 RocketMQ 的性能模式,获得早期的批处理模型或者 BatchCQ 模型带来的极致吞吐量提升。在开启 AutoBatch 的开关后,用户所有已有的行为都不需要作出改变,使用原来经典的 Send(Message msg)即可;当然也可以进行更精细的内存控制和延时控制: ```java // 设置单个 MessageBatch 大小(kb) rmqProducer.batchMaxBytes(32 1024); // 设置最大聚合等待时间(ms) rmqProducer.batchMaxDelayMs(10); // 设置所有聚合器最大内存使用(kb) rmqProducer.totalBatchMaxBytes(32 1024 1024); ``` 那么它具体轻量在哪?又高效在哪?下面这个简易的流程图应该能给大家一个答案: 首先它只引入了一个单线程的背景线程——background thread,这个背景线程以 1/2 的 maxDelayMs 周期运行,将扫描到超过等待时机缓冲区的消息提交到异步发送的线程池中,此时就完成了时间维度的聚合。空间维度的聚合则是由发送线程在传递时进行检查,如果满足 maxBytes,则原地发送。 整个设计非常地精简,只额外引入了一个周期运行的线程,这样做可以避免因为 AutoBatch 模型本身出现性能短板,而且 batchMessage 的序列化过程也做了精简,去掉了发送时候所有的检测(在聚合过程中已提前分类)。 才艺展示 上面分享了 RocketMQ 在批处理模型上的演进,那么它们具体效果也就必须拉出来给大家做一个才艺展示了,以下所有的压测结果均来自于 OpenmessagingBenchmark 框架,压测中使用的各项配置如下所示: | | 压测机器 | x86芯片机器 | | | | | | 规格 | 32核(vCPU)64 GiB20 Mbpsecs.c7.8xlarge | 8核(vCPU)64 GiB20 Mbpsecs.r7.2xlarge | | 云盘 | 无 | ESSD云盘 PL1 965GiB (50000 IOPS) | | 操作系统 | Alibaba Cloud Linux 3.2104 LTS 64位 | Alibaba Cloud Linux 3.2104 LTS 64位 | | JDK版本 | openjdk version "11.0.19" 20230418 LTSOpenJDK Runtime Environment (Red_Hat11.0.19.0.71.0.1.al8) (build 11.0.19+7LTS) | openjdk version "11.0.19" 20230418 LTSOpenJDK Runtime Environment (Red_Hat11.0.19.0.71.0.1.al8) (build 11.0.19+7LTS) | 准备工作 为 OpenmessagingBenchmark 进行压测环境,首先部署一套开源社区上最新的 RocketMQ,然后配置好 Namesrv 接入点等信息,然后打开 RocketMQ 的性能模式——AutoBatch,将 autoBatch 字段设置为 true: 早期批处理模型 ```java bin/benchmark drivers driverrocketmq/rocketmq.yaml workloads/1topic100partitions1kb4p4c1000k.yaml ``` 开启 autobatch 能力后,就会使用早期批处理模型进行性能提升,可以看到提升幅度非常大,由原来的 8w 提升至 27w 附近,为原来的 300%。 索引构建流水线优化 流水线优化是需要在服务端开启的,下面是一个简单的配置例子: ```java // 开启索引构建流水线优化 enableBuildConsumeQueueConcurrently=true // 调整内存中消息最大消费阈值 maxTransferBytesOnMessageInMemory=256M maxTransferCountOnMessageInMemory=32K // 调整磁盘中消息最大消费阈值 maxTransferBytesOnMessageInDisk=64M maxTransferCountOnMessageInDisk=32K ``` 可以看到,只有开启索引构建优化,才能做到 稳稳地达到 27w 的吞吐,在没有开启的时候,消费速率不足会触发冷读直至影响到整个系统的稳定性,同时也不具备生产意义,所以在使用批量模型的时候也务必需要开启索引构建优化。 BatchCQ模型 BatchCQ 模型的使用与前面提到的两者不同,它不是通过开关开启的,BatchCQ 其实是一种 Topic 类型,当创建 topic 的时候指定其为 BatchCQ 类型,既可拥有最极致的吞吐量优势。 ```java // Topic 的各种属性在 TopicAttributes 中设置 public static final EnumAttribute QUEUE_TYPE_ATTRIBUTE = new EnumAttribute("queue.type", false, newHashSet("BatchCQ", "SimpleCQ"), "SimpleCQ"); topicConfig.getAttributes().put("+" + TopicAttributes.QUEUE_TYPE_ATTRIBUTE.getName(), "BatchCQ"); ``` 当使用 BatchCQ 模型的时候,与早期批处理模型已经有了天壤之别,因此我们寻求了和开源 Kafka 的对比,部署架构如下: RocketMQ 3 主 3 备架构,使用轻量级 Container 部署。 节点 1: MasterA,SlaveC 节点 2: MasterC,SlaveB 节点 3: MasterB,SlaveA Kafka 3 个节点,设置分区副本数为 2。 压测结果 | | MQ | Kafka | | | | | | 16partions | TPS: 251439.34P99: 264.0 | TPS: 267296.34P99: 1384.01 | | 10000partiotions | TPS: 249981.94P99: 1341.01 | 报错无数据 | 可以看到,在使用 BatchCQ 类型的 Topic 时,RocketMQ 与 Kafka 的性能基本持平: 16partitions,二者吞吐量相差 5% 以内,且 RocketMQ 则具有明显更低的延时表现。 10000partitions,得益于 RocketMQ 的存储结构更为集中,在大量分区场景下吞吐量几乎保持不变。而Kafka在默认配置的情况下出现报错无法使用。 因此在极致吞吐量的需求下,BatchCQ 模型能够很好地承接极致需求的流量,而且如果更换性能更好的本地磁盘,同样的机器配置能达到更高的上限。
作者:谷乂
#技术探索 #功能特性

2024年8月9日

深度剖析 RocketMQ 5.0 之消息进阶:如何支撑复杂业务消息场景?
简介: 本文主要学习 RocketMQ 的一致性特性,一致性对于交易、金融都是刚需。从大规模复杂业务出发,学习 RocketMQ 的 SQL 订阅、定时消息等特性。再从高可用的角度来看,这里更多的是大型公司对于高阶可用性的要求,如同城容灾、异地多活等。 1. 前言 从初代开源消息队列崛起,到 PC 互联网、移动互联网爆发式发展,再到如今 IoT、云计算、云原生引领了新的技术趋势,消息中间件的发展已经走过了 30 多个年头。 目前,消息中间件在国内许多行业的关键应用中扮演着至关重要的角色。随着数字化转型的深入,客户在使用消息技术的过程中往往同时涉及交叉场景,比如同时进行物联网消息、微服务消息的处理,同时进行应用集成、数据集成、实时分析等,企业需要为此维护多套消息系统,付出更多的资源成本和学习成本。 在这样的背景下,2022 年,RocketMQ 5.0 正式发布,相对于 RocketMQ 4.0,架构走向云原生化,并且覆盖了更多的业务场景。想要掌握最新版本 RocketMQ 的应用,就需要进行更加体系化的深入了解。 2. 背景 今天的课程是 RocketMQ 5.0 消息进阶。这节课依然聚焦在业务消息场景,我们在 RocketMQ 5.0 概述里面就提到 RocketMQ 可以应对复杂的业务消息场景。这节课我们就从功能特性的角度出发,来看 RocketMQ 是如何去解决复杂业务场景的。 第一部分会先学习 RocketMQ 的一致性特性,一致性对于交易、金融都是刚需。第二部分,我们从大规模复杂业务出发,学习 RocketMQ 的 SQL 订阅、定时消息等特性。第三部分,我们再从高可用的角度来看,这里更多的是大型公司对于高阶可用性的要求,如同城容灾、异地多活等。 3. 一致性 3.1. 事务消息 3.1.1. 场景 我们先来看 RocketMQ 的第一个特性——事务消息,这是和一致性相关的特性,这也是 RocketMQ 有别于其他消息队列的一个最具区分度的特性。我们还是继续沿用大规模电商系统的案例,如图,我们仔细梳理一下流程,付款成功会在交易系统中订单数据库将订单状态更新为已付款,然后交易系统再发一条消息给 RocketMQ,RocketMQ 把订单已付款的事件通知给所有下游的应用,保障后续的履约环节。 但是这个流程有个问题,就是交易系统写数据库和发消息是分开的,它不是一个事务。会出现多种异常情况,比如数据库写成功了,但消息发失败了,这个订单的状态下游应用接收不到,对于电商业务可能就造成大量用户付款了,但是卖家不发货。如果先发消息成功,再写数据库失败,会造成下游应用认为订单已付款,推进卖家发货,但是实际用户未付款成功。这些异常都会对电商业务造成大量脏数据,产生灾难性业务后果。 这就需要使用 RocketMQ 高阶特性——事务消息。事务消息的能力是要保障生产者的本地事务(如写数据库)、发消息事务的一致性,最后通过 Broker at least once 的消费语义,保证消费者的本地事务也能执行成功。最终实现生产者、消费者对同一业务的事务状态达到最终一致。 3.1.2. 原理 如下图,事务消息的实现是两个阶段:提交+事务补偿机制结合实现的。 首先,生产者会发送 half 消息,也就是 prepare 消息,broker 会把 half 队列中。接下来生产者执行本地事务,一般是写数据库,本地事务完成后,会往 RocketMQ 发送 commit 操作,RocketMQ 会把 commit 操作写入 OP 队列,并进行 compact,把已提交的消息写到 ConsumeQueue 对消费者可见。反过来如果是 rollback 操作,则会跳过对应的 half 消息。面对异常的情况,比如生产者在发送 commit 或者 rollback 之前宕机了,RocketMQ broker 还会有补偿检查机制,定期回查 Producer 的事务状态,继续推进事务。 无论是 Prepare 消息、还是 Commit/Rollback 消息、或者是 compact 环节,在存储层面都是遵守 RocketMQ 以顺序读写为主的设计理念,达到最优吞吐量。 3.1.3. demo 接下来,我们来看一个事务消息的简单示例。 使用事务消息需要实现一个事务状态的查询器,这也是和普通消息一个最大的区别。如果我们是一个交易系统,这个事务回查器的实现可能就是根据订单 ID 去查询数据库来确定这个订单的状态到底是否是提交,比如说创建成功、已付款、已退款之类的。主体的消息生产流程也有很多不同,需要开启分布式事务,进行两阶段提交,先发一个 prepare 的消息,然后再去执行本地事务。这里的本地事务一般就是执行数据库操作。然后如果本地事务执行成功的话,就整体 commit,把之前的 prepare 的消息提交掉。这样一来,消费者就可以消费这条消息。如果本地事务出现异常的话,那么就把整个事务 rollback 掉,之前的那条 prepare 的消息也会被取消掉,整个过程就回滚了。事务消息的用法变化主要体现在生产者代码,消费者使用方式和普通消息一致,demo 里面就不展示了。 3.2. 顺序消息 3.2.1. 场景 + 原理 第二个高级特性是顺序消息,这个也是 RocketMQ 的特色能力之一。它解决的是顺序一致性的问题,要保障同一个业务的消息,生产和消费的顺序保持一致。在阿里曾有个场景是买卖家数据库复制,由于阿里订单数据库采用分库分表技术,面向买卖家不同的业务场景,分别按照买家主键和卖家主键拆分成买卖家数据库。两个数据库的同步就是采用了 Binlog 顺序分发的机制,通过使用顺序消息,把买家库的 Binlog 变更按照严格顺序在卖家库回放,以此达到订单数据库的一致性。如果没有顺序保障,那么就可能出现数据库级别的脏数据,将会带来严重的业务错误。 顺序消息的实现原理如下图,充分利用 Log 天然顺序读写的特点高效实现。在 Broker 存储模型中,每个 Topic 都会有固定的 ConsumeQueue,可以理解为 Topic 的分区生产者为发送消息加上业务 Key,在这个 case 里面可以用订单 ID,同一订单 ID 的消息会顺序发送到同一个 Topic 分区,每个分区在某个时刻只会被一个消费者锁定,消费者顺序读取同一个分区的消息串行消费,以此来达到顺序一致性。 3.2.2. demo 接下来,我们来看顺序消息的一个简单demo。 对于顺序消息来说,生产者跟消费者都有需要注意的地方。 在生产阶段,首先要定义一个消息的 group。每条消息都可以选择一个业务 ID 作为消息 Group,这个业务 ID 尽量离散、随机。因为同一个业务 ID 会分配到同一个数据存储分片,生产和消费都在这个数据分片上串行,如果业务 ID 有热点,会造成严重的数据倾斜和局部消息堆积。比如说在电商交易的场景,一般会选择订单 ID 进行业务消息分组,因为订单 ID 会比较离散。但如果我们选择的是卖家 ID,就有可能会出现热点,热点卖家的流量会远大于普通卖家。 在消费阶段的话,消费阶段有跟常规的消息收发一样有两种模式,一种是全托管的 push consumer 模式,一种是半托管的simple consumer 的模式,RocketMQ SDK 会保障同一个分组的消息串行进入业务消费逻辑。需要注意的点是,我们自身的业务消费代码也要串行进行,然后同步返回消费成功确认。不要把同分组的消息又放到另外的线程池并发消费,这样会破坏顺序语义。 4. 大规模业务 4.1. SQL 过滤 4.1.1. 场景 第三个高级特性是 SQL 消费模式,这个也是复杂业务场景的刚需。我们回到阿里的电商场景,阿里的整个电商业务都是围绕着交易展开,有数百个不同的业务在订阅交易的消息。这些业务基本上都面向某个细分领域,都只需要交易 Topic 下的部分消息。按照传统的模式,一般就是全量订阅交易 Topic,在消费者本地过滤即可,但是这样会消耗大量的计算、网络资源,特别是在双十一的峰值,这个方案的成本是无法接受的。 4.1.2. 原理 为了解决这个问题,RocketMQ 提供了 SQL 消费模式。在交易场景下,每笔订单消息都会带有不同维度的业务属性,包括卖家 ID、买家 ID、类目、省市、价格、订单状态等属性,而 SQL 过滤就是能让消费者通过 SQL 语句过滤消费目标消息。如下图,某个消费者只想关注某个价格区间内的订单创建消息,于是创建这个订阅关系 【Topic=Trade ,SQL: status= order create and (Price between 50 and 100)】,Broker 会在服务端运行 SQL 计算,只返回有效数据给消费者。为了提高性能,Broker 还引入了布隆过滤器模块,在消息写入分发时刻,提前计算结果,写入位图过滤器,减少无效 IO。总的来说就是把过滤链路不断前置,从消费端本地过滤,到服务端写时过滤,达到最优性能。 4.1.3. demo 接下来,我们再来看一个 SQL 订阅的示例。目前 RocketMQ SQL 过滤支持如下的语法,包括属性非空判断、属性大小比较、属性区间过滤、集合判断和逻辑计算,能满足绝大部分的过滤需求。 在消息生产阶段,我们除了设置 Topic、Tag 之外,还能添加多个自定义属性。比如在这个案例里面,我们设置了一个 region 的属性,表示这条消息是从杭州 region 发出来的。在消费的时候,我们就可以对根据自定义属性来进行 SQL 过滤订阅了。第一个 case 是我们用了一个 filter expression,判断 region 这个字段不为空且等于杭州才消费。第二个 case 添加更多的条件,如果这是一笔订单消息,我们还可以同时判断 region 条件和价格区间来决定是否消费。第三个 case 是全接收模式,表达式直接为 True,这个订阅方式会接收某一个主题下面的全量消息,不进行任何过滤。 4.2. 定时消息 4.2.1. 场景+原理 第四个高级特性是定时消息,生产者可以指定某条消息在发送后经过一定时间后才对消费者可见。有不少业务场景需要大规模的定时事件触发,比如典型的电商场景,基本上都有订单创建30分钟未付款就自动关闭订单的逻辑,定时消息能为这个场景带来极大的便利性。 RocketMQ 的定时消息是基于时间轮来实现的。TimerWheel,相信大家并不陌生,模拟表盘转动,来达到对时间进行排序的目的。TimerWheel 中的每一格代表着最小的时间刻度,称之为Tick,RocketMQ 里面是每一个 Tick 为一秒,同一个时刻的消息会写到同一个格子里。由于每个时刻可能会同时触发多条消息,并且每条消息的写入时刻都不一样,所以 RocketMQ 也同时引入了 Timerlog 的数据结构,Timerlog 按照顺序 append 方式写入数据,每个元素都包含消息的物理索引、以及指向同一个时刻的前一条消息,组成一个逻辑链表。TimeWheel 的每个格子都维护这个时刻的消息链表的头尾指针。类似表盘,TimerWheel 会有一个指针,代表当前时刻,绕着 TimerWheel 循环转动,指针所指之处,代表这一个 Tick 到期,所有内容一起弹出,会写到 ConsumeQueue,对消费者可见。 目前 RocketMQ 的定时消息性能已经远超 RabbitMQ 和 ActiveMQ。 5. 全局高可用 接下来,我们再讲一下 RocketMQ 的全局高可用技术解决方案。 在消息基础原理的文章里,我们提到 RocketMQ 的高可用架构,主要是指 RocketMQ 集群内的数据多副本和服务高可用。今天我们这里讲的高可用是全局的,就是业界经常说的同城容灾、两地三中心、异地多活等架构。现在蚂蚁支付和阿里交易采用的是异地多活的架构。异地多活相对于冷备、同城容灾、两地三中心模式具备更多的优点,可以应对城市级别的灾难,如地震、断电等事件。除此之外,一些因为人为的操作,比如说某个基础系统变更,引入新的 bug,导致的整个机房级别的不可用,异地多活的架构可以直接把流量切到可用机房,优先保障业务连续性,再去定位具体的问题。另一方面,异地多活还能实现机房级别的扩容,单一机房的计算存储资源是有限的,异地多活架构可以把业务流量按照比例分散在全国各地机房。同时多活架构实现了所有机房都在提供业务服务,而不是冷备状态,资源利用率大幅度提升。由于是多活状态,面对极端场景的切流,可用性更有保障,信心更足。 在异地多活的架构中,RocketMQ 承担的是基础架构的多活能力。多活的架构分为几个模块: 首先是接入层,通过统一接入层按照业务 ID 把用户请求分散到多个机房,业务ID一般可采用用户ID。 其次是应用层,应用层一般无状态,当请求进入某个机房后,需要尽量保障该请求的整个链路都在单元内封闭,包括 RPC、数据库访问、消息读写。这样才可以降低访问延迟,保障系统性能不会因为多活架构衰退。 再往下是数据层,包括数据库,消息队列等有状态系统。这里侧重讲解 RocketMQ 的异地多活,RocketMQ 通过 connector 组件,实现按 topic 粒度实时同步消息的数据;按照 Consumer 和 Topic 的组合粒度实时同步消费状态。 最后还需要全局的管控层。管控层要维护全局的单元化规则,哪些流量走到哪些机房;管理多活元数据配置,哪些应用需要多活、哪些 Topic 需要多活;另外在切流时刻,要协调所有系统的切流过程,控制好切流顺序。
作者:隆基
#技术探索 #功能特性

2024年7月24日

Apache RocketMQ ACL 2.0 全新升级
引言 RocketMQ 作为一款流行的分布式消息中间件,被广泛应用于各种大型分布式系统和微服务中,承担着异步通信、系统解耦、削峰填谷和消息通知等重要的角色。随着技术的演进和业务规模的扩大,安全相关的挑战日益突出,消息系统的访问控制也变得尤为重要。然而,RocketMQ 现有的 ACL 1.0 版本已经无法满足未来的发展。因此,我们推出了 RocketMQ ACL 2.0 升级版,进一步提升 RocketMQ 数据的安全性。本文将介绍 RocketMQ ACL 2.0 的新特性、工作原理,以及相关的配置和实践。 升级的背景 ACL 1.0 痛点问题 RocketMQ ACL 1.0 的认证和授权流程如上图所示,在使用过程中,存在着以下痛点问题: 绕过访问控制的 IP 白名单:在标准安全实践中,IP 白名单通常用于限制客户端只能从特定 IP 或 IP 段访问资源。然而,ACL 1.0 中,IP 白名单被异常用于绕过鉴权验证的手段, 偏离了标准实践中的安全意图。这种设计上的偏差可能造成潜在的安全隐患,特别是在公网场景中,多个客户端共享同一个 IP 的情况下,会导致未授权的 IP 地址绕过正常的访问控制检查对集群中的数据进行访问。 缺乏管控 API 精细化控制:RocketMQ 提供了 130 多个管控 API,支持了集群配置,Topic、Group 的元数据管理,以及消息查询、位点重置等操作。这些操作涉及到敏感数据的处理,以及影响系统的稳定性。因此,根据用户不同角色或职责,精确定义可访问的 API 和数据范围变得至关重要。然而,ACL 1.0 仅对其中 9 个 API 进行了支持,包括 Topic、Group 元数据,以及Broker配置,剩下的 API 有可能被攻击者利用,对系统进行攻击,窃取敏感的数据。此外,要实施对这么多的管控 API 进行访问控制,现有的设计会导致大量的编码工作,并且在新增 API 时也增加了遗漏的风险。 缺少集群组件间访问控制:在 RocketMQ 架构中,涵盖了 NameServer、Broker 主从节点、Proxy 等多个关键组件。目前,这些组件之间的互相访问缺失了关键的的权限验证机制。因此,一但旦在集群外自行搭建 Broker 从节点或 Proxy 组件,便可以绕过现有的安全机制,访问并获取集群内的敏感数据,这无疑给系统的数据安全和集群的稳定性造成巨大的威胁。 特性与原理 ACL 2.0 新特性 RocketMQ ACL 2.0 针对 ACL 1.0 中的问题进行了解决,同时还带来了六个主要的新特性,具体如下: 精细的API资源权限定义:ACL 2.0 对 RocketMQ 系统中所有的资源都进行了定义,包括集群、命名空间、主题、消费者组,以实现对所有类型的资源进行独立的访问控制。此外,它将所有的 API 都纳入权限控制范畴,覆盖了包括消息收发、集群管理、元数据等各项操作,确保所有资源的任何操作都施加了严格的权限控制。 授权资源的多种匹配模式:在资源众多的集群环境中,为每个资源进行逐一授权会带来繁复的配置过程和管理负担。因此,ACL 2.0 引入了三种灵活的匹配模式:完全匹配、前缀匹配,以及通配符匹配。这些模式可以让用户根据资源的命名规范和结构特点,快速地进行统一的设定,简化权限的管理操作,提升配置的效率。 支持集群组件间访问控制:由于将所有资源类型和API操作都纳入了访问控制体系,集群内部组件之间的连接和访问也受到了权限控制,包括 Broker 主从之间的 Leader 选举、数据复制的过程,以及 Proxy 到 Broker 的数据访问等环节,这可以有效地避免潜在的数据泄露问题和对系统稳定性的风险,加强了整个集群的安全性和可靠性。 用户认证和权限校验分离:通过对认证和授权这两个关键模块进行解耦,系统可以提供类似“只认证不鉴权”等方式的灵活选择,以适应各种不同场景的需求。此外,两个组件可以分别演进、独立发展,从而诞生出多样的认证方式和先进的鉴权方法。 安全性和性能之间的平衡:当启用 ACL 后,客户端的每次请求都必须会经过完整的认证和授权流程。这确保了系统的安全性,但同时也引入了性能上的开销。在 ACL 2.0 中,提供了无状态认证授权策略和有状态认证授权策略,来分别满足对安全有极致要求,以及安全可控但性能优先这两种不同的安全和性能需求。 灵活可扩展的插件化机制:当前市场上,认证方式存在多种实现,授权方式也有不同场景的定制需求。因此,ACL 2.0 设计了一套插件化的框架,在不同层面上进行接口的定义和抽象,以支持未来对认证和授权进行扩展,满足用户根据自身业务需求定制和实现相应的解决方案。 访问控制模型 基于角色的访问控制(RBAC)和基于属性的访问控制(ABAC)是访问控制体系中两种主要的方法。RocketMQ ACL 2.0 将这两种方法进行了融合,打造出了一套更加灵活和强大的访问控制系统。RBAC 是基于角色的访问控制模型,通过角色进行权限的分配。RocketMQ ACL 2.0 将用户角色划分为超级用户(Super)和普通用户(Normal),超级用户具有最高级别的权限,能够无需授权即可访问资源,这简化了集群初始化及日常运维过程中的权限依赖问题。而普通用户在访问资源之前,需要被赋予相应的权限,适用于业务场景中,对资源进行按需访问。ABAC 是基于属性的访问控制模型,通过用户、资源、环境、操作等多维属性来表达访问控制策略。RocketMQ ACL 2.0 为普通用户提供了这种灵活的访问控制机制。帮助管理员根据业务需求、用户职责等因素,对资源进行更加精细的访问控制。在安全体系中,认证和授权分别扮演着不同的角色,RocetMQ ACL 2.0 将认证和授权进行了模块分离。这可以确保两个组件各司其职,降低系统的复杂度。认证服务致力于验证用户身份的合法性,而授权服务则专注于管理用户权限和访问控制。这样的划分不仅可以让代码更易于管理、维护和扩展,也为用户提供了使用上的灵活性。根据需求,用户可以选择单独启用认证或授权服务,也可以选择同时启用两者。这使得 RocketMQ ACL 既可以满足简单场景的快速部署,也能够适应复杂环境下对安全性的严格要求。 认证(Authentication) 认证作为一种安全机制,旨在验证发起访问请求者的身份真实性。它用于确保只有那些经过身份验证的合法用户或实体才能访问受保护的资源或执行特定的操作。简而言之,认证就是在资源或服务被访问之前回答“你是谁?”这个问题。RocketMQ ACL 2.0 版本维持了与 ACL 1.0 相同的认证机制,即基于 AK/SK 的认证方式。这种方式主要通过对称加密技术来核验客户端的身份,保证敏感的认证信息(如密码)不会在网络上明文传输,从而提升了整体的认证安全性。 主体模型 为了提升 RocketMQ 系统的访问控制和权限管理,ACL 2.0 针对主体模型做了以下改进和扩展: 1. 统一主体模型的抽象:为了实现不同实体的访问控制和权限管理,设计了统一的主体接口,允许系统中多个实例作为资源访问的主体。用户作为访问资源的主体之一,按照该模型实现了主体的接口。这为未来新实体类型的权限适配提供了扩展能力。 2. 角色分级与权限赋予: 超级用户:为了简化管理流程,超级用户被自动授予了全部权限,无需单独配置,从而简化了系统的初始化和日常的运维管理工作。 普通用户:普通用户的权限则需要明确授权。ACL 2.0 提供了相关的权限管理工具,可以根据组织的政策和安全需求,为普通用户赋予合适的权限。 3. 支持用户状态管理:为了应对可能出现的安全风险,比如用户密码泄露,ACL 2.0 提供了用户的启用与禁用功能。当发生安全事件,可以通过禁用用户状态,快速进行止血,从而达到阻止非法访问的目的。 认证流程 客户端流程: 1. 客户端在构建 RPC 请求时,检查是否设置了用户名和密码,若未配置,则直接发送请求; 2. 若已配置,则使用预设的加密算法对请求参数进行加密处理,并生成对应的数字签名(Signature)。 3. 在请求中附加用户名和 Signature,并将其发送至服务端以进行身份验证。 服务端流程: 1. 服务端接收到请求后,首先检查是否开启认证,若未开启,则不校验直接通过;若已开启了,则进入下一步。 2. 服务端对请求进行认证相关的参数进行解析和组装,获取包括用户名和 Signature 等信息。 3. 通过用户名在本地库中查询用户相关信息,用户不存在,则返回处理无;用户存在,则进入下一步。 4. 获取用户密码,采用相同的加密算法对请求进行加密生成 Signature,并和客户端传递的 Signature 进行比对,若两者一致,则认证成功,不一致,则认证失败。 授权(Authorization) 核心概念 授权作为一种安全机制,旨在确定访问请求者是否拥有对特定资源进行操作的权限。简而言之,授权就是在资源被访问之前回答“谁在何种环境下对哪些资源执行何种操作”这个问题。基于“属性的访问控制(ABAC)”模型,RocketMQ ACL 2.0 涵盖了以下一系列的核心概念。在系统实现中,都会以以下概念作为指导,完成整个权限管理和授权机制的设计和实现。 权限模型 基于属性的访问控制(ABAC)模型的核心概念,ACL 2.0 对权限模型做了精心的设计,要点如下: 向后兼容的权限策略:默认情况下,ACL 2.0 只匹配和检验用户自定义的权限,若未找到匹配项,则视为无权限访问资源。但考虑到 ACL 1.0 中,存在默认权限的设置,允许对未匹配资源进行“无权限访问”和“有权限访问”的默认判定。因此,我们针对默认权限策略进行了兼容,确保 ACL 1.0 到 ACL 2.0 的无缝迁移。 灵活的资源匹配模式:在资源类型方面,ACL 2.0 支持了集群(Cluster)、命名空间(Namespace)、主题(Topic)、消费者组(Group)等类型,用于对不同类型的资源进行访问控制。在资源名称方面,引入了完全匹配(LITERAL)、前缀匹配(PREFIXED),以及通配符匹配(ANY)三种模式,方便用户根据资源的命名规范和结构,快速设定统一的访问规则,简化权限的管理。 精细的资源操作类型:在消息的发送和消费的接口方面,分别定义为 PUB 和 SUB 这两种操作。在集群和资源的管理的接口方面,分别定义为 CREATE、UPDATE、DELETE、LIST、GET 五种操作。通过这种操作类型的细化,可以帮助用户在资源的操作层面,无需关心具体的接口定义,简化对操作的理解和配置。 坚实的访问环境校验:在请求访问的环境方面,ACL 2.0 加入了客户端请求 IP 来源的校验,这个校验控制在每个资源的级别,可以精确到对每个资源进行控制。IP 来源可以是特定的 IP 地址或者是一个 IP 段,来满足不同粒度的 IP 访问控制,为系统的安全性增添一道坚实的防线。 授权流程 客户端流程: 1. 客户端在构建 RPC 请求时,构建本次调用的接口入参,接口对应权限背后的操作定义。 2. 客户端在接口入参中设置本次访问的资源信息,然后将用户和资源等参数传递到服务端。 服务端流程: 1. 服务端在收到请求后,首先检查是否开启授权,若未开启,则不校验直接通过;若已开启了,则进入下一步。 2. 服务端对请求中和授权相关的参数进行解析和组装,这些数据包括用户信息、访问的资源、执行的操作,以及请求的环境等。 3. 通过用户名在本地数据存储中查询用户相关信息,若用户不存在,则返回错误;若用户存在,则进入下一步。 4. 判断当前用户是否是超级用户,若超级用户,则直接通过请求,无需做授权检查,若普通用户,则进入下一步进行详细的授权检查。 5. 根据用户名获取相关的授权策略列表,并对本次请求的资源、操作,以及环境进行匹配,同时按照优先级进行排序。 6. 根据优先级最高的授权策略做出决策,若授权策略允许该操作,则返回授权成功,若拒绝该操作,则返回无权限错误。 授权参数的解析 在 ACL 2.0 中,更具操作类型和请求频率,对授权相关参数(包括资源、操作等)的解析进行了优化。1. 硬编码方式解析对于消息发送和消费这类接口,参数相对较为复杂,且请求频次也相对较高。考虑到解析的便捷性和性能上的要求,采用硬编码的方式进行解析。2. 注解方式解析对于大量的管控接口,采用硬编码的方式工作量巨大,且这些接口调用频次较低,对性能要求不高,所以采用注解的方式进行解析,提高编码效率。 权限策略优先级 在权限策略匹配方面,由于支持了模糊的资源匹配模式,可能出现同一个资源对应多个权限策略。因此,需要一套优先级的机制来确定最终使用哪一套权限策略。假设配置了以下授权策略,按照以上优先级资源的匹配情况如下: 认证授权策略 出于安全和性能的权衡和考虑,RocketMQ ACL 2.0 为认证和授权提供了两种策略:无状态认证授权策略(Stateless)和有状态认证授权策略(Stateful)。 无状态认证授权策略(Stateless): 在这种策略下,每个请求都会经过独立的认证和授权过程,不依赖于任何先前的会话和状态信息。这种严格的策略可以保证更高级别的安全保证。对权限进行变更,可以更加实时的反应在随后的请求中,无需任何等待。然而,这种策略在高吞吐的场景中可能会导致显著的性能负担,如增加系统 CPU 的使用率以及请求的耗时。 有状态认证授权策略(Stateful): 在这种策略下,同一个客户端连接,相同资源以及相同的操作下,第一次请求会经过完整的认证和授权,后续请求则不再进行重复认证和授权。这种方法可以有效地降低性能小号,减少请求的耗时,特别适合吞吐量较高的场景。但是,这种策略可能引入了安全上的妥协,对权限的变更也无法做到实时的生效。 在这两者策略的选择上,需要权衡系统的安全性要求和性能需求。如果系统对安全性的要求很高,并且可以容忍一定的性能损耗,那么无状态认证授权策略可能是更好的选择。相反,如果系统需要处理大量的并发请求,且可以在一定程度上放宽安全要求,那么有状态认证授权策略可能更合适。在实际部署时,还应该结合具体的业务场景和安全要求来做出决策。 插件化机制 为了适应未来持续发展的认证鉴权方式,以及满足用户针对特定场景的定制需求,RocketMQ ACL 2.0 在多个环节上提供了灵活性和可扩展性。 认证和授权策略的扩展:默认情况下,RocketMQ ACL 2.0 提供了无状态认证授权策略(Stateless)和有状态认证授权策略(Stateful),以满足绝大多数用户对安全和性能的要求。但是,后续仍然可以探索出更优的策略,来兼顾安全和性能之间的平衡。 认证和授权方式的扩展:当前,在认证方面,市场上已经沉淀了多种成熟的实现,RocketMQ 目前只实现了其中一种,通过插件化的能力进行预留,未来可以轻松的引入更多的认证机制。在授权方面,RocketMQ 基于 ABAC 模型实现了一套主流的授权方式,以适应广泛的用户需求。但也提供了插件化的能力,方便未来能适配出更多贴合未来发展的解决方案。 认证和授权流程的编排:基于责任链设计模式,RocketMQ ACL 2.0 对其默认的认证和授权流程进行了灵活的编排。用户可以扩展或重写这些责任链节点,从而能够定制针对其具体业务场景的认证和授权逻辑。 用户和权限存储的扩展:RocketMQ 默认采用 RocksDB 在 Broker 节点上本地存储用户和权限数据。然而,通过实现预定义的接口,用户可以轻松地将这些数据迁移至任何第三方服务或存储系统中,从而优化其架构设计和操作效率。 审计日志 审计日志,用于记录和监控所有关于认证和授权的访问控制操作。通过升级日志,我们可以追踪到每一个访问的请求,确保系统的可靠性和安全性,同时,它也有助于问题的排查,进行安全的升级和满足合规的要求。RocketMQ ACL 2.0 对认证和授权相关的审计日志都进行了支持,格式如下: 认证日志 ``` 认证成功日志 [AUTHENTICATION] User:rocketmq is authenticated success with Signature = eMX/+tH/7Bc0TObtDYMcK9Ls+gg=. 认证失败日志 [AUTHENTICATION] User:rocketmq is authenticated failed with Signature = eMX/+tH/7Bc0TObtDYMcK9Ls+xx=. ``` 授权日志 ``` 授权成功日志 [AUTHORIZATION] Subject = User:rocketmq is Allow Action = Pub from sourceIp = 192.168.0.2 on resource = Topic:TPTEST for request = 10. 授权失败日志 [AUTHORIZATION] Subject = User:rocketmq is Deny Action = Sub from sourceIp = 192.168.0.2 on resource = Topic:GIDTEST for request = 10. ``` 配置与使用 部署架构 在部署架构方面,RocketMQ 提供了两种部署形态,分别是存算一体架构和存算分离架构。 存算一体架构 在 RocketMQ 存算一体架构中,Broker 组件同时承担了计算和存储的职责,并对外提供服务,接收所有客户端的访问请求。因此,由 Broker 组件承担认证和授权的重要角色。此外,Broker 组件还负责认证和授权相关的元数据的维护和存储。 存算分离架构 在 RocketMQ 存算分离架构中,存储由 Broker 组件负责,计算由 Proxy 组件负责,所有的对外请求都是由 Proxy 对外进行服务。因此,请求的认证和授权都由 Proxy 组件承担。Broker 承担元数据存储,为 Proxy 组件提供所需的认证和授权元数据的查询和管理服务。 集群配置 认证配置 参数列表 想要在服务端开启认证功能,相关的参数和使用案例主要包含如下: Broker 配置 ``` authenticationEnabled = true authenticationProvider = org.apache.rocketmq.auth.authentication.provider.DefaultAuthenticationProvider initAuthenticationUser = {"username":"rocketmq","password":"12345678"} innerClientAuthenticationCredentials = {"accessKey":"rocketmq","secretKey":"12345678"} authenticationMetadataProvider = org.apache.rocketmq.auth.authentication.provider.LocalAuthenticationMetadataProvider ``` Proxy 配置 ``` { "authenticationEnabled": true, "authenticationProvider": "org.apache.rocketmq.auth.authentication.provider.DefaultAuthenticationProvider", "authenticationMetadataProvider": "org.apache.rocketmq.proxy.auth.ProxyAuthenticationMetadataProvider", "innerClientAuthenticationCredentials": "{\"accessKey\":\"rocketmq\", \"secretKey\":\"12345678\"}" } ``` 授权配置 参数列表 想要在服务端开启授权功能,相关的参数和使用案例主要包含如下: Broker 配置 ``` authorizationEnabled = true authorizationProvider = org.apache.rocketmq.auth.authorization.provider.DefaultAuthorizationProvider authorizationMetadataProvider = org.apache.rocketmq.auth.authorization.provider.LocalAuthorizationMetadataProvider ``` Proxy 配置 ``` { "authorizationEnabled": true, "authorizationProvider": "org.apache.rocketmq.auth.authorization.provider.DefaultAuthorizationProvider", "authorizationMetadataProvider": "org.apache.rocketmq.proxy.auth.ProxyAuthorizationMetadataProvider" } ``` 如何使用 命令行使用 用户管理关于 ACL 用户的管理,相关的接口定义和使用案例如下。 接口定义 使用案例 ``` 创建用户 sh mqadmin createUser n 127.0.0.1:9876 c DefaultCluster u rocketmq p rocketmq 创建用户,指定用户类型 sh mqadmin createUser n 127.0.0.1:9876 c DefaultCluster u rocketmq p rocketmq t Super 更新用户 sh mqadmin updateUser n 127.0.0.1:9876 c DefaultCluster u rocketmq p 12345678 删除用户 sh mqadmin deleteUser n 127.0.0.1:9876 c DefaultCluster u rocketmq 查询用户详情 sh mqadmin getUser n 127.0.0.1:9876 c DefaultCluster u rocketmq 查询用户列表 sh mqadmin listUser n 127.0.0.1:9876 c DefaultCluster 查询用户列表,带过滤条件 sh mqadmin listUser n 127.0.0.1:9876 c DefaultCluster f mq ``` ACL 管理关于 ACL 授权的管理,相关的接口定义和使用案例如下。 接口定义 使用案例 ``` 创建授权 sh mqadmin createAcl n 127.0.0.1:9876 c DefaultCluster s User:rocketmq r Topic:,Group: a Pub,Sub i 192.168.1.0/24 d Allow 更新授权 sh mqadmin updateAcl n 127.0.0.1:9876 c DefaultCluster s User:rocketmq r Topic:,Group: a Pub,Sub i 192.168.1.0/24 d Deny 删除授权 sh mqadmin deleteAcl n 127.0.0.1:9876 c DefaultCluster s User:rocketmq 删除授权,指定资源 sh mqadmin deleteAcl n 127.0.0.1:9876 c DefaultCluster s User:rocketmq r Topic: 查询授权列表 sh mqadmin listAcl n 127.0.0.1:9876 c DefaultCluster 查询授权列表,带过滤条件 sh mqadmin listAcl n 127.0.0.1:9876 c DefaultCluster s User:rocketmq r Topic: 查询授权详情 sh mqadmin getAcl n 127.0.0.1:9876 c DefaultCluster s User:rocketmq ``` 客户端使用 关于 ACL 的使用,ACL 2.0 和 ACL 1.0 的使用方式一样,没有任何区别,具体参考官方案例。 消息发送 ``` ClientServiceProvider provider = ClientServiceProvider.loadService(); StaticSessionCredentialsProvider sessionCredentialsProvider = new StaticSessionCredentialsProvider(ACCESS_KEY, SECRET_KEY); ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder() .setEndpoints(ENDPOINTS) .setCredentialProvider(sessionCredentialsProvider) .build(); Producer producer = provider.newProducerBuilder() .setClientConfiguration(clientConfiguration) .setTopics(TOPICS) .build(); ``` 消息消费 ``` ClientServiceProvider provider = ClientServiceProvider.loadService(); ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder() .setEndpoints(ENDPOINTS) .setCredentialProvider(sessionCredentialsProvider) .build(); FilterExpression filterExpression = new FilterExpression(TAG, FilterExpressionType.TAG); PushConsumer pushConsumer = provider.newPushConsumerBuilder() .setClientConfiguration(clientConfiguration) .setConsumerGroup(CONSUMER_GROUP) .setSubscriptionExpressions(Collections.singletonMap(TOPIC, filterExpression)) .setMessageListener(messageView { return ConsumeResult.SUCCESS; }) .build(); ``` 扩容与迁移 扩容 如果想要在运行过程中的集群扩容一台 Broker,就需要将所有的元数据都同步到这台新的 Broker 上,ACL 2.0 提供了相应的拷贝用户和拷贝授权的接口来支持这项操作。 接口定义 使用案例 ``` 拷贝用户 sh mqadmin copyUser n 127.0.0.1:9876 f 192.168.0.1:10911 t 192.168.0.2:10911 拷贝授权 sh mqadmin copyAcl n 127.0.0.1:9876 f 192.168.0.1:10911 t 192.168.0.2:10911 ``` 迁移 如果已经使用上了 ACL 1.0,想要无缝地迁移至 ACL 2.0,也提供了相应的解决方案,只需要做以下配置即可。 配置定义 在 Broker 的配置文件中开启以下配置: ``` migrateAuthFromV1Enabled = true ``` 特别说明 启用以上配置后,将在 Broker 启动过程中自动触发执行。该迁移功能会把 ACL 1.0 中的用户权限信息写入 ACL 2.0 的相应存储结构中。对于在 ACL 2.0 中尚未存在的用户和权限,系统将自动添加。对于已存在的用户和权限,迁移功能不会进行覆盖,以避免重写 ACL 2.0 中已经进行的任何修改。ACL 1.0 中关于 IP 白名单,由于是用于绕过访问控制的检查,和 ACL 2.0 的行为不匹配,所以不会迁移到 ACL 2.0 中。如果已经使用相关的能力,请完成改造后再做迁移。 规划与总结 规划 关于 RocketMQ ACL 的未来规划,可能会体现在以下两个方面: 丰富的认证和授权扩展:市场上存在丰富的认证和授权解决方案,其他的存储或计算产品也都采用了各种各样的实现方式。为了紧跟行业的发展趋势,RocketMQ ACL 未来也将努力创新,以满足更为广泛和多变的客户需求。同时,也将持续深化研究和发展更加出色的认证和授权策略,以达到安全性和性能之间的理想平衡。 可视化的用户权限操作:当前,在 ACL 中进行用户和权限的配置仅能通过命令行工具,不够友好。未来我们希望能在 RocketMQ Dashboard 上提供一个清晰、易用的可视化管理界面,从而简化配置流程并降低管理的技术门槛。另一方面,现有的 Dashboard 尚未集成 ACL 访问控制体系,后续也要将它纳入进来,以实现用户在 Dashboard 上对各项资源进行操作的访问权限。 总结 RocketMQ ACL 2.0 不管是在模型设计、可扩展性方面,还是安全性和性能方面都进行了全新的升级。旨在能够为用户提供精细化的访问控制,同时,简化权限的配置流程。欢迎大家尝试体验新版本,并应用在生产环境中。非常期待大家的在社区中反馈、讨论,和参与贡献,共同推进 RocketMQ 社区的成长和技术进步。
作者:徒钟
#行业实践 #最佳实践 #功能特性

2023年7月13日

RocketMQ 5.0 无状态实时性消费详解
背景 RocketMQ 5.0版本引入了Proxy模块、无状态pop消费机制和gRPC协议等创新功能,同时还推出了一种全新的客户端类型:SimpleConsumer。SimpleConsumer客户端采用了无状态的pop机制,彻底解决了在客户端发布消息、上下线时可能出现的负载均衡问题。然而,这种新机制也带来了一个新的挑战:当客户端数量较少且消息数量较少时,可能会出现消息消费延时的情况。。 在当前的消息产品中,消费普通使用了长轮询机制,即客户端向服务端发送一个超时时间相对较长的请求,该请求会一直挂起,除非队列中存在消息或该请求到达设定的长轮询时间。 然而,在引入Proxy之后,目前的长轮询机制出现了一个问题。客户端层面的长轮询和Proxy与Broker内部的长轮询之间互相耦合,也就是说,一次客户端对Proxy的长轮询只对应一次Proxy对Broker的长轮询。因此,在以下情况下会出现问题:当客户端数量较少且后端存在多个可用的Broker时,如果请求到达了没有消息的Broker,就会触发长轮询挂起逻辑。此时,即使另一台Broker存在消息,由于请求挂在了另一个Broker上,也无法拉取到消息。这导致客户端无法实时接收到消息,即false empty response。 这种情况可能导致以下现象:用户发送一条消息后,再次发起消费请求,但该请求却无法实时拉取到消息。这种情况对于消息传递的实时性和可靠性产生了不利影响。 AWS的文档里也有描述此等现象,他们的解决方案是通过查询是所有的后端服务,减少false empty response。 其他产品 在设计方案时,首先是需要目前存在的消息商业化产品是如何处理该问题的。 MNS采取了以下策略,主要是将长轮询时间切割为多个短轮询时间片,以尽可能覆盖所有的Broker。 首先,在长轮询时间内,会对后端的Broker进行多次请求。其次,当未超过短轮询配额时,优先使用短轮询消费请求来与Broker进行通信,否则将使用长轮询,其时间等于客户端的长轮询时间。此外,考虑到过多的短轮询可能会导致CPU和网络资源消耗过多的问题,因此在短轮询超过一定数量且剩余时间充足时,最后一次请求将转为长轮询。 然而,上述策略虽以尽可能轮询完所有的Broker为目标,但并不能解决所有问题。当轮询时间较短或Broker数量较多时,无法轮询完所有的Broker。即使时间足够充足的情况下,也有可能出现时间错位的情况,即在短轮询请求结束后,才有消息在该Broker上就绪,导致无法及时取回该消息。 解法 技术方案 首先,需要明确该问题的范围和条件。该问题只会在客户端数量较少且请求较少的情况下出现。当客户端数量较多且具备充足的请求能力时,该问题不会出现。因此,理想情况是设计一个自适应的方案,能够在客户端数量较多时不引入额外成本来解决该问题。 为了解决该问题,关键在于将前端的客户端长轮询和后端的Broker长轮询解耦,并赋予Proxy感知后端消息个数的能力,使其能够优先选择有消息的Broker,避免false empty response。 考虑到Pop消费本身的无状态属性,期望设计方案的逻辑与Pop一致,而不在代理中引入额外的状态来处理该问题。 另外,简洁性是非常重要的,因此期望该方案能够保持简单可靠,不引入过多的复杂性。 1. 为了解决该问题,本质上是要将前端的客户端长轮询和后端的Broker长轮询解耦开来,并赋予Proxy感知后端消息个数的能力,能够优先选择有消息的Broker,避免false empty response。 2. 由于Pop消费本身的无状态属性,因此期望该方案的设计逻辑和Pop一致,而不在Proxy引入额外的状态来处理这个事情。 3. Simplicity is ALL,因此期望这个方案简单可靠。 我们使用了NOTIFICATION,可以获取到后端是否有尚未消费的消息。拥有了上述后端消息情况的信息,就能够更加智能地指导Proxy侧的消息拉取。 通过重构NOTIFICATION,我们对其进行了一些改进,以更好地适应这个方案的要求。 pop with notify 一个客户端的请求可以被抽象为一个长轮询任务,该轮询任务由通知任务和请求任务组成。 通知任务的目的是获取Broker是否存在可消费的消息,对应的是Notification请求;而请求任务的目的是消费Broker上的消息,对应的是Pop请求。 首先,长轮询任务会执行一次Pop请求,以确保在消息积压的情况下能够高效处理。如果成功获取到消息,则会正常返回结果并结束任务。如果没有获取到消息,并且还有剩余的轮询时间,则会向每个Broker提交一个异步通知任务。 在任务通知返回时,如果不存在任何消息,长轮询任务将被标记为已完成状态。然而,如果相关的Broker存在消息,该结果将被添加到队列中,并且消费任务将被启动。该队列的目的在于缓存多个返回结果,以备将来的重试之需。对于单机代理而言,只要存在一个通知结果返回消息,Proxy即可进行消息拉取操作。然而,在实际的分布式环境中,可能会存在多个代理,因此即使通知结果返回消息存在,也不能保证客户端能够成功拉取消息。因此,该队列的设计旨在避免发生这种情况。 消费任务会从上述队列中获取结果,若无结果,则直接返回。这是因为只有在通知任务返回该Broker存在消息时,消费任务才会被触发。因此,若消费任务无法获取结果,可推断其他并发的消费任务已经处理了该消息。 消费任务从队列获取到结果后,会进行加锁,以确保一个长轮询任务只有一个正在进行的消费任务,以避免额外的未被处理的消息。 如果获取到消息或长轮询时间结束,该任务会被标记完成并返回结果。但如果没有获取到消息(可能是其他客户端的并发操作),则会继续发起该路由所对应的异步通知任务,并尝试进行消费。 自适应切换 考虑到当请求较多时,无需采用pop with notify机制,可使用原先的pop长轮询broker方案,但是需要考虑的是,如何在两者之间进行自适应切换。目前是基于当前Proxy统计的pop请求数做判断,当请求数少于某一值时,则认为当前请求较少,使用pop with notify;反之则使用pop长轮询。 由于上述方案基于的均为单机视角,因此当消费请求在proxy侧不均衡时,可能会导致判断条件结果有所偏差。 Metric 为了之后进一步调优长轮询和观察长轮询的效果,我们设计了一组metric指标,来记录并观测实时长轮询的表现和损耗。 1. 客户端发起的长轮询次数 (is_long_polling) 2. pop with notify次数 (通过现有rpc metric统计) 3. 首次pop请求命中消息次数 (未触发notify) (is_short_polling_hit) 总结 通过如上方案,我们成功设计了一套基于无状态消费方式的实时消费方案,在做到客户端无状态消费的同时,还能够避免false empty response,保证消费的实时性,同时,相较于原先PushConsumer的长轮询方案,能够大量减少用户侧无效请求数量,降低网络开销, 产品侧 需明确长轮询和短轮询的区分,可以参考AWS的定义,当轮询时间大于0时,长轮询生效。 且需明确一个长轮询最小时间,因为长轮询时间过小时无意义,AWS的最小值采取了1s,我们是否需要follow,还是采取一个更大的值。 活动推荐 阿里云基于 Apache RocketMQ 构建的企业级产品消息队列RocketMQ 5.0版现开启活动: 1、新用户首次购买包年包月,即可享受全系列 85折优惠! 了解活动详情:
#技术探索 #功能特性 #云原生

2022年11月25日

RocketMQ 的消费者类型详解与最佳实践
在 RocketMQ 5.0 中,更加强调了客户端类型的概念,尤其是消费者类型。为了满足多样的 RocketMQ 中一共有三种不同的消费者类型,分别是 PushConsumer、SimpleConsumer 和 PullConsumer。不同的消费者类型对应着不同的业务场景。 消费者类型概览 本篇文章也会根据不同的消费者类型来进行讲述。在介绍不同的消息类型之前,先明确一下不同 RocketMQ 消费者中的一个通用工作流程:在消费者中,到达客户端的消息都是由客户端主动向服务端请求并挂起长轮询获得的。为了保证消息到达的及时性,客户端需要不断地向服务端发起请求(请求是否需要由客户端主动发起则与具体的客户端类型有关),而新的符合条件的消息一旦到达服务端,就会客户端请求走。最终根据客户端处理的结果不同,服务端对消息的处理结果进行记录。 另外 PushConsumer 和 SimpleConsumer 中还会有一个 ConsumerGroup 的概念,ConsumerGroup 相当于是一组相同订阅关系的消费者的共同身份标识。而服务端也会根据 ConsumerGroup 来记录对应的消费进度。同一个 ConsumerGroup 下的消息消费者将共同消费符合当前订阅组要求的所有消息,而不是独立进行消费。相比较于 PullConsumer,PushConsumer 和 SimpleConsumer 更加适用于业务集成的场景,由服务端来托管消费状态和进度,相对来说更加的轻量与简单。 简单来说: PushConsumer :全托管的消费者类型,用户只需要注册消息监听器即可,符合对应订阅关系的消息就会调用对应的消费方法,是与业务集成最为普遍的消费者类型。 SimpleConsumer:解耦消息消费与进度同步的消费者类型,用户自主接受来自服务端的消息,并对单条消息进行消息确认。和 PushConsumer 一样也由服务端托管消费进度,适用于用户需要自主控制消费速率的业务场景。 PullConsumer:使用流处理框架进行管理的消费者类型,用户按照队列(Topic 的最小逻辑组成单位)来进行消息的接收并可以选择自动或者手动提交消费位点。 PushConsumer PushConsumer 是 RocketMQ 目前使用最为广泛的消费者。用户只需要确认好订阅关系之后,注册相对应的 Listener 即可。符合对应订阅关系的消息在由 Producer 发出后,消费者的 Listener 接口也会被即时调用,那么此时用户需要在 Listener 中去实现对应的业务逻辑。 使用简介 以下是 Push 消费者的使用示例: PushConsumer pushConsumer = provider.newPushConsumerBuilder() .setClientConfiguration(clientConfiguration) // set the consumer group name. .setConsumerGroup(consumerGroup) // set the subscription for the consumer. .setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression)) .setMessageListener(messageView { // handle the received message and return consume result. LOGGER.info("consume message={}", messageView); return ConsumeResult.SUCCESS; }) .build(); // block the main thread, no need for production environment. Thread.sleep(Long.MAX_VALUE); // close the push consumer when you don't need it anymore. pushConsumer.close(); 用户需要根据自己业务处理结果的不同来返回 ConsumeResult.SUCCESS或者 ConsumeResult.FAILURE。当用户返回 ConsumeResult.SUCCESS时,消息则被视为消费成功;当用户返回 ConsumeResult.FAILURE时,则服务端视为消费失败,会进行该条消息的退避重试,消息的退避重试是指,在消息被消费成功之前,当前消息会被多次投递到用户注册的 MessageListener 中直到消费成功,而两次消费之间的时间间隔则是符合退避规律的。 特别的,每个 ConsumerGroup 都会有一个最大消费次数的设置,如果当前消息的消费次数超过了这个设置,则消息不会再被投递,转而被投递进入死信队列。这个消费次数在消息每次被投递到 MessageListener 时都会进行自增。譬如:如果消息的最大消费次数为 1,那么无论对于这条消息,当前是被返回消费成功还是消费失败,都只会被消费这一次。 应用场景与最佳实践 PushConsumer 是一种近乎全托管的消费者,这里的托管的含义在于用户本身并不需要关心消息的接收,而只需要关注消息的消费过程,除此之外的所有逻辑都在 Push 消费者的实现中封装掉了,用户只需要根据每条收到的消息返回不同的消费结果即可,因此也是最为普适的消费者类型。 MessageListener 是针对单条消息设计的监听器接口: / MessageListener is used only for the push consumer to process message consumption synchronously. Refer to {@link PushConsumer}, push consumer will get message from server and dispatch the message to the backend thread pool to consumer message concurrently. / public interface MessageListener { / The callback interface to consume the message. You should process the {@link MessageView} and return the corresponding {@link ConsumeResult}. The consumption is successful only when {@link ConsumeResultSUCCESS } is returned, null pointer is returned or exception is thrown would cause message consumption failure too. / ConsumeResult consume(MessageView messageView); } 绝大多数场景下,使用方应该快速处理消费逻辑并返回消费成功,不宜长时间阻塞消费逻辑。对于消费逻辑比较重的情形,建议可以先行提交消费状态,然后对消息进行异步处理。 实际在 Push 消费者的实现中,为了保证消息消费的及时性,消息是会被预先拉取客户端再进行后续的消费的,因此在客户端中存在对已拉取消息大小的缓存。为了防止缓存的消息过多导致客户端内存泄漏,也提前预留了客户端参数供使用者自行进行设置。 // 设置本地最大缓存消息数目为 16 条 pushConsumer.setMaxCachedMessageCount(16); // 设置本地最大缓存消息占用内存大小为 128 MB pushConsumer.setMaxCachedMessageSizeInBytes(128 1024 1024); SimpleConsumer 相比较 PushConsumer,SimpleConsumer 则暴露了更多的细节给使用者。在 SimpleConsumer 中,用户将自行控制消息的接收与处理。 使用简介 以下是 SimpleConsumer 的使用示例: SimpleConsumer consumer = provider.newSimpleConsumerBuilder() .setClientConfiguration(clientConfiguration) // Set the consumer group name. .setConsumerGroup(consumerGroup) // set await duration for longpolling. .setAwaitDuration(awaitDuration) // Set the subscription for the consumer. .setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression)) .build(); // Max message num for each long polling. int maxMessageNum = 16; // Set message invisible duration after it is received. Duration invisibleDuration = Duration.ofSeconds(15); final List messages = consumer.receive(maxMessageNum, invisibleDuration); LOGGER.info("Received {} message(s)", messages.size()); for (MessageView message : messages) { final MessageId messageId = message.getMessageId(); try { consumer.ack(message); LOGGER.info("Message is acknowledged successfully, messageId={}", messageId); } catch (Throwable t) { LOGGER.error("Message is failed to be acknowledged, messageId={}", messageId, t); } } // Close the simple consumer when you don't need it anymore. consumer.close(); 在 SimpleConsumer 中用户需要自行进行消息的拉取,这一动作通过 SimpleConsumerreceive 这个接口进行,然后再根据自己业务逻辑处理结果的不同再对拉取到的消息进行不同的处理。SimpleConsumerreceive 也是通过长轮询来接受来自服务端的消息,具体的长轮询时间可以使用 SimpleConsumerBuildersetAwaitDuration 来进行设置。 在 SimpleConsumer 中,用户需要通过 SimpleConsumerreceive 设置一个消息不重复的时间窗口(或者说关于通过这个接口收到的消息的一个不可见时间窗口),这个时间窗口从用户接受到这条消息开始计时,在这段时间之内消息是不会重复投递到消费者的,而超出这个时间窗口之后,则会对这条消息进行再一次的投递。在这个过程中,消息的消费次数也会进行递增。与 PushConsumer 类似的是,一旦消费次数超出 ConsumerGroup 的最大次数,也就不会进行重投了。 相比较于 PushConsumer 而言,SimpleConsumer 用户可以自主控制接受消息的节奏。SimpleConsumerreceive 会针对于当前的订阅关系去服务端拉取符合条件的消息。SimpleConsumer 实际上的每次消息接收请求是按照具体 Topic 的分区来 one by one 发起请求的,实际的 Topic 分区可能会比较多,因此为了保证消息接收的及时性,建议综合自己的业务处理能力一定程度上提高 SimpleConsumerreceive 的并发度。 用户在接受到消息之后,可以选择对消息使用 ack 或者 changeInvisibleDuration,前者即对服务端表示对这条消息的确认,与 PushConsumer 中的消费成功类似,而 changeInvisibleDuration 则表示延迟当前消息的可见时间,即需要服务端在当前一段时间之后再向客户端进行投递。值得注意的是,这里消息的再次投递也是需要遵循 ConsumerGroup 的最大消费次数的限制,即一旦消息的最大消费次数超出了最大消费次数(每次消息到达可见时间都会进行消费次数的自增),则不再进行投递,转而进入死信队列。举例来说: 进行 ack,即表示消息消费成功被确认,消费进度被服务端同步。 进行 changeInvisibleDuration, 1)如果消息已经超过当前 ConsumerGroup 的最大消费次数,那么消息后续会被投递进入死信队列 2)如果消息未超过当前 ConsumerGroup 的最大消费次数,若请求在上一次消息可见时间到来之前发起,则修改成功,否则则修改失败。 应用场景与最佳实践 在 PushConsumer 中,消息是单条地被投递进入 MessageListener来处理的,而在 SimpleConsumer 中用户可以同时拿到一批消息,每批消息的最大条数也由 SimpleConsumerreceive 来决定。在一些 IO 密集型的应用中,会是一个更加方便的选择。此时用户可以每次拿到一批消息并集中进行处理从而提高消费速度。 PullConsumer PullConsumer 也是 RocketMQ 一直以来都支持的消费者类型,RocketMQ 5.0 中全新的 PullConsumer API 还在演进中,敬请期待。下文中的 PullConsumer 会使用 4.0 中现存的 LitePullConsumer 进行论述,也是当前推荐的方式。 使用简介 现存的 LitePullConsumer 中的主要接口 // PullConsumer 中的主要接口 public interface LitePullConsumer { // 注册路由变化监听器 void registerTopicMessageQueueChangeListener(String topic, TopicMessageQueueChangeListener topicMessageQueueChangeListener) throws MQClientException; // 将队列 assign 给当前消费者 void assign(Collection messageQueues); // 针对当前 assigned 的队列获取消息 List poll(long timeout); // 查找当前队列在服务端提交的位点 Long committed(MessageQueue messageQueue) throws MQClientException; // 设置是否自动提交队列位点 void setAutoCommit(boolean autoCommit); // 同步提交队列位点 void commitSync(); } 在 RocketMQ 中,无论是消息的发送还是接收,都是通过队列来进行的,一个 Topic 由若干个队列组成,消息本身也是按照队列的形式来一个个进行存储的,同一个队列中的消息拥有不同的位点,且位点的大小是随随消息达到服务端的时间逐次递增的,本质上不同 ConsumerGroup 在服务端的消费进度就是一个个队列中的位点信息,客户端将自己的消费进度同步给服务端本质上其实就是在同步一个个消息的位点。 在 PullConsumer 中将队列这个概念完整地暴露给了用户。用户可以针对自己关心的 topic 设置路由监听器从而感知队列的变化,并将队列 assign 给当前消费者,当用户使用 LitePullConsumerpoll 时会尝试获取已经 assign 好了的队列中的消息。如果设置了 LitePullConsumersetAutoCommit 的话,一旦消息达到了客户端就会自动进行位点的提交,否则则需要使用 LitePullConsumercommitSync 接口来进行手动提交。 应用场景与最佳实践 PullConsumer 中用户拥有对消息位点管理的绝对自主权,可以自行管理消费进度,这是与 PushConsumer 和 SimpleConsumer 最为本质的不同,这也使得 PullConsumer 在流计算这种需要同时自主控制消费速率和消费进度的场景能得到非常广泛的应用。更多情况下,PullConsumer 是与具体的流计算框架进行集成的。 活动推荐 阿里云基于 Apache RocketMQ 构建的企业级产品消息队列RocketMQ 5.0版现开启活动: 1、新用户首次购买包年包月,即可享受全系列 85折优惠! 了解活动详情:
作者:凌楚
#行业实践 #功能特性

2022年10月31日

RocketMQ 重试机制详解及最佳实践
引言 本文主要介绍在使用 RocketMQ 时为什么需要重试与兜底机制,生产者与消费者触发重试的条件和具体行为,如何在 RocketMQ 中合理使用重试机制,帮助构建弹性,高可用系统的最佳实践。 RocketMQ 的重试机制包括三部分,分别是生产者重试,服务端内部数据复制遇到非预期问题时重试,消费者消费重试。本文中仅讨论生产者重试和消费者消费重试两种面向用户侧的实现。 生产者发送重试 RocketMQ 的生产者在发送消息到服务端时,可能会因为网络问题,服务异常等原因导致调用失败,这时候应该怎么办?如何尽可能的保证消息不丢失呢? 1. 生产者重试次数 RocketMQ 在客户端中内置了请求重试逻辑,支持在初始化时配置消息发送最大重试次数(默认为 2 次),失败时会按照设置的重试次数重新发送。直到消息发送成功,或者达到最大重试次数时结束,并在最后一次失败后返回调用错误的响应。对于同步发送和异步发送,均支持消息发送重试。 同步发送:调用线程会一直阻塞,直到某次重试成功或最终重试失败(返回错误码或抛出异常)。 异步发送:调用线程不会阻塞,但调用结果会通过回调的形式,以异常事件或者成功事件返回。  2. 生产者重试间隔 在介绍生产者重试前,我们先来了解下流控的概念,流控一般是指服务端压力过大,容量不足时服务端会限制客户端收发消息的行为,是服务端自我保护的一种设计。RocketMQ 会根据当前是否触发了流控而采用不同的重试策略: 非流控错误场景:其他触发条件触发重试后,均会立即进行重试,无等待间隔。 流控错误场景:系统会按照预设的指数退避策略进行延迟重试。 为什么要引入退避和随机抖动?  如果故障是由过载流控引起的,重试会增加服务端负载,导致情况进一步恶化,因此客户端在遇到流控时会在两次尝试之间等待一段时间。每次尝试后的等待时间都呈指数级延长。指数回退可能导致很长的回退时间,因为指数函数增长很快。指数退避算法通过以下参数控制重试行为,更多信息,请参见 connectionbackoff.md。 INITIAL_BACKOFF:第一次失败重试前后需等待多久,默认值:1 秒; MULTIPLIER :指数退避因子,即退避倍率,默认值:1.6; JITTER :随机抖动因子,默认值:0.2; MAX_BACKOFF :等待间隔时间上限,默认值:120 秒; MIN_CONNECT_TIMEOUT :最短重试间隔,默认值:20 秒。 ConnectWithBackoff() current_backoff = INITIAL_BACKOFF current_deadline = now() + INITIAL_BACKOFF while (TryConnect(Max(current_deadline, now() + MIN_CONNECT_TIMEOUT))!= SUCCESS) SleepUntil(current_deadline) current_backoff = Min(current_backoff MULTIPLIER, MAX_BACKOFF) current_deadline = now() + current_backoff + UniformRandom(JITTER current_backoff, JITTER current_backoff) 特别说明:对于事务消息,只会进行透明重试(transparent retries),网络超时或异常等场景不会进行重试。 3. 重试带来的副作用 不停的重试看起来很美好,但也是有副作用的,主要包括两方面:消息重复,服务端压力增大 远程调用的不确定性,因请求超时触发消息发送重试流程,此时客户端无法感知服务端的处理结果;客户端进行的消息发送重试可能会导致消费方重复消费,应该按照用户ID、业务主键等信息幂等处理消息。  较多的重试次数也会增大服务端的处理压力。  4. 用户的最佳实践是什么 1)合理设置发送超时时间,发送的最大次数 发送的最大次数在初始化客户端时配置在 ClientConfiguration;对于某些实时调用类场景,可能会导致消息发送请求链路被阻塞导致业务请求整体耗时高或耗时;需要合理评估每次调用请求的超时时间以及最大重试次数,避免影响全链路的耗时。 2)如何保证发送消息不丢失 由于分布式环境的复杂性,例如网络不可达时 RocketMQ 客户端发送请求重试机制并不能保证消息发送一定成功。业务方需要捕获异常,并做好冗余保护处理,常见的解决方案有两种: 1. 向调用方返回业务处理失败; 2. 尝试将失败的消息存储到数据库,然后由后台线程定时重试,保证业务逻辑的最终一致性。  3)关注流控异常导致无法重试 触发流控的根本原因是系统容量不足,如果因为突发原因触发消息流控,且客户端内置的重试流程执行失败,则建议执行服务端扩容,将请求调用临时替换到其他系统进行应急处理。 4)早期版本客户端如何使用故障延迟机制进行发送重试? 对于 RocketMQ 4.x 和 3.x 以下客户端开启故障延迟机制可以用: producer.setSendLatencyFaultEnable(true) 配置重试次数使用: producer.setRetryTimesWhenSendFailed() producer.setRetryTimesWhenSendAsyncFailed() 消费者消费重试 消息中间件做异步解耦时的一个典型问题是如果下游服务处理消息事件失败,那应该怎么做呢? RocketMQ 的消息确认机制以及消费重试策略可以帮助分析如下问题: 如何保证业务完整处理消息? 消费重试策略可以在设计实现消费者逻辑时保证每条消息处理的完整性,避免部分消息消费异常导致业务状态不一致。 业务应用异常时处理中的消息状态如何恢复? 当系统出现异常(宕机故障)等场景时,处理中的消息状态如何恢复,消费重试具体行为是什么。 1. 什么是消费重试? 什么时候认为消费失败? 消费者在接收到消息后将调用用户的消费函数执行业务逻辑。如果客户端返回消费失败 ReconsumeLater,抛出非预期异常,或消息处理超时(包括在 PushConsumer 中排队超时),只要服务端服务端一定时间内没收到响应,将认为消费失败。  消费重试是什么? 消费者在消费某条消息失败后,服务端会根据重试策略重新向客户端投递该消息。超过一次定数后若还未消费成功,则该消息将不再继续重试,直接被发送到死信队列中;  重试过程状态机:消息在重试流程中的状态和变化逻辑;  重试间隔:上一次消费失败或超时后,下次重新尝试消费的间隔时间;  最大重试次数:消息可被重试消费的最大次数。   2. 消息重试的场景 需要注意重试是应对异常情况,给予程序再次消费失败消息的机会,不应该被用作常态化的链路。 推荐使用场景: 业务处理失败,失败原因跟当前的消息内容相关,预期一段时间后可执行成功; 是一个小概率事件,对于大批的消息只有很少量的失败,后面的消息大概率会消费成功,是非常态化的。   正例:消费逻辑是扣减库存,极少量商品因为乐观锁版本冲突导致扣减失败,重试一般立刻成功。 错误使用场景: 消费处理逻辑中使用消费失败来做条件判断的结果分流,是不合理的。  反例:订单在数据库中状态已经是已取消,此时如果收到发货的消息,处理时不应返回消费失败,而应该返回成功并标记不用发货。 消费处理中使用消费失败来做处理速率限流,是不合理的。 限流的目的是将超出流量的消息暂时堆积在队列中达到削峰的作用,而不是让消息进入重试链路。 这种做法会让消息反复在服务端和客户端之间传递,增大了系统的开销,主要包括以下方面: RocketMQ 内部重试涉及写放大,每一次重试将生成新的重试消息,大量重试将带来严重的 IO 压力; 重试有复杂的退避逻辑,内部实现为梯度定时器,该定时器本身不具备高吞吐的特性,大量重试将导致重试消息无法及时出队。重试的间隔将不稳定,将导致大量重试消息延后消费,即削峰的周期被大幅度延长。  3. 不要以重试替代限流 上述误用的场景实际上是组合了限流和重试能力来进行削峰,RocketMQ 推荐的削峰最佳手段为组合限流和堆积,业务以保护自身为前提,需要对消费流量进行限流,并利用 RocketMQ 提供的堆积能力将超出业务当前处理的消息滞后消费,以达到削峰的目的。下图中超过处理能力的消息都应该被堆积在服务端,而不是通过消费失败进行重试。 如果不想依赖额外的产品/组件来完成该功能,也可以利用一些本地工具类,比如 Guava 的 RateLimiter 来完成单机限流。如下所示,声明一个 50 QPS 的 RateLimiter,在消费前以阻塞的方式 acquire 一个令牌,获取到即处理消息,未获取到阻塞。 RateLimiter rateLimiter = RateLimiter.create(50); PushConsumer pushConsumer = provider.newPushConsumerBuilder() .setClientConfiguration(clientConfiguration) // 设置订阅组名称 .setConsumerGroup(consumerGroup) // 设置订阅的过滤器 .setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression)) .setMessageListener(messageView { // 阻塞直到获得一个令牌,也可以配置一个超时时间 rateLimiter.acquire(); LOGGER.info("Consume message={}", messageView); return ConsumeResult.SUCCESS; }) .build(); 4. PushConsumer 消费重试策略 PushConsumer 消费消息时,消息的几个主要状态如下: Ready:已就绪状态。消息在消息队列RocketMQ版服务端已就绪,可以被消费者消费; Inflight:处理中状态。消息被消费者客户端获取,处于消费中还未返回消费结果的状态; Commit:提交状态。消费成功的状态,消费者返回成功响应即可结束消息的状态机; DLQ:死信状态 消费逻辑的最终兜底机制,若消息一直处理失败并不断进行重试,直到超过最大重试次数还未成功,此时消息不会再重试。 该消息会被投递至死信队列。您可以通过消费死信队列的消息进行业务恢复。 最大重试次数   PushConsumer 的最大重试次数由创建时决定。 例如,最大重试次数为 3 次,则该消息最多可被投递 4 次,1 次为原始消息,3 次为重试投递次数。 重试间隔时间 无序消息(非顺序消息):重试间隔为阶梯时间,具体时间如下: 说明:若重试次数超过 16 次,后面每次重试间隔都为 2 小时。 顺序消息:重试间隔为固定时间,默认为 3 秒。  5. SimpleConsumer 消费重试策略 和 PushConsumer 消费重试策略不同,SimpleConsumer 消费者的重试间隔是预分配的,每次获取消息消费者会在调用 API 时设置一个不可见时间参数 InvisibleDuration,即消息的最大处理时长。若消息消费失败触发重试,不需要设置下一次重试的时间间隔,直接复用不可见时间参数的取值。 由于不可见时间为预分配的,可能和实际业务中的消息处理时间差别较大,可以通过 API 接口修改不可见时间。 例如,预设消息处理耗时最多 20 ms,但实际业务中 20 ms内消息处理不完,可以修改消息不可见时间,延长消息处理时间,避免消息触发重试机制。 修改消息不可见时间需要满足以下条件: 消息处理未超时 消息处理未提交消费状态  如下图所示,消息不可见时间修改后立即生效,即从调用 API 时刻开始,重新计算消息不可见时间。 最大重试次数 与 PushConsumer 相同。 消息重试间隔   消息重试间隔 = 不可见时间 - 消息实际处理时长 例如:消息不可见时间为 30 ms,实际消息处理用了 10 ms 就返回失败响应,则距下次消息重试还需要 20 ms,此时的消息重试间隔即为 20 ms;若直到 30 ms 消息还未处理完成且未返回结果,则消息超时,立即重试,此时重试间隔即为 0 ms。 SimpleConsumer 的消费重试间隔通过消息的不可见时间控制。 //消费示例:使用SimpleConsumer消费普通消息,主动获取消息处理并提交。 ClientServiceProvider provider1 = ClientServiceProvider.loadService(); String topic1 = "Your Topic"; FilterExpression filterExpression1 = new FilterExpression("Your Filter Tag", FilterExpressionType.TAG); SimpleConsumer simpleConsumer = provider1.newSimpleConsumerBuilder() //设置消费者分组。 .setConsumerGroup("Your ConsumerGroup") //设置接入点。 .setClientConfiguration(ClientConfiguration.newBuilder().setEndpoints("Your Endpoint").build()) //设置预绑定的订阅关系。 .setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression)) .build(); List messageViewList = null; try { //SimpleConsumer需要主动获取消息,并处理。 messageViewList = simpleConsumer.receive(10, Duration.ofSeconds(30)); messageViewList.forEach(messageView { System.out.println(messageView); //消费处理完成后,需要主动调用ACK提交消费结果。 //没有ack会被认为消费失败 try { simpleConsumer.ack(messageView); } catch (ClientException e) { e.printStackTrace(); } }); } catch (ClientException e) { //如果遇到系统流控等原因造成拉取失败,需要重新发起获取消息请求。 e.printStackTrace(); } 修改消息的不可见时间   案例:某产品使用消息队列来发送解耦“视频渲染”的业务逻辑,发送方发送任务编号,消费方收到编号后处理任务。由于消费方的业务逻辑耗时较长,消费者重新消费到同一个任务时,该任务未完成,只能返回消费失败。在这种全新的 API 下,用户可以调用可以通过修改不可见时间给消息续期,实现对单条消息状态的精确控制。 simpleConsumer.changeInvisibleDuration(); simpleConsumer.changeInvisibleDurationAsync(); 6. 功能约束与最佳实践 设置消费的最大超时时间和次数   尽快明确的向服务端返回成功或失败,不要以超时(有时是异常抛出)代替消费失败。 不要用重试机制来进行业务限流  错误示例:如果当前消费速度过高触发限流,则返回消费失败,等待下次重新消费。 正确示例:如果当前消费速度过高触发限流,则延迟获取消息,稍后再消费。 发送重试和消费重试会导致相同的消息重复消费,消费方应该有一个良好的幂等设计  正确示例:某系统中消费的逻辑是为某个用户发送短信,该短信已经发送成功了,当消费者应用重复收到该消息,此时应该返回消费成功。 总结 本文主要介绍重试的基本概念,生产者消费者收发消息时触发重试的条件和具体行为,以及 RocketMQ 收发容错的最佳实践。 重试策略帮助我们从随机的、短暂的瞬态故障中恢复,是在容忍错误时,提高可用性的一种强大机制。但请谨记 “重试是对于分布式系统来说自私的”,因为客户端认为其请求很重要,并要求服务端花费更多资源来处理,盲目的重试设计不可取,合理的使用重试可以帮助我们构建更加弹性且可靠的系统。 活动推荐 阿里云基于 Apache RocketMQ 构建的企业级产品消息队列RocketMQ 5.0版现开启活动: 1、新用户首次购买包年包月,即可享受全系列 85折优惠! 了解活动详情:
作者: 斜阳
#行业实践 #功能特性

2022年10月20日

解析 RocketMQ 多样消费功能-消息过滤
什么是消息过滤 在消息中间件的使用过程中,一个主题对应的消费者想要通过规则只消费这个主题下具备某些特征的消息,过滤掉自己不关心的消息,这个功能就叫消息过滤。 就如上图所描述的,生产者会向主题中写入形形色色的消息,有橙色的、黄色的、还有灰色的,而这个主题有两个消费者,第一个消费者只想要消费橙色的消息,第二个消费者只想要消费黄色的和灰色的消息,那么这个效果就需要通过消息过滤来实现。 消息过滤的应用场景 我们以常见的电商场景为例,来看看消息过滤在实际应用过程中起到的作用。 电商平台在设计时,往往存在系统拆分细、功能模块多、调用链路长、系统依赖复杂等特点,消息中间件在其中就起到了异步解耦、异步通信的作用,特别是在双十一这样的流量高峰期,消息中间件还起到了削峰填谷的作用。 而在消息中间件使用方面,电商平台因为覆盖的领域众多会产生很多的消息主题,消息收发量也随着交易量和订阅系统的增加而增大。随着业务系统的水平拆解和垂直增加,相关的消息呈现出高订阅比和低投递比的状态,比如一个主题订阅比是 300:1,即 1 个主题的订阅者有 300 个,但是投递比却只有 15:300,即一条消息只有 15 个订阅者需要投递,其他 285 个订阅者全部过滤了这条消息。那解决这些场景,就需要使用到消息过滤。 举例来说,在交易链路中,一个订单的处理流程分为下单、扣减库存、支付等流程,这个流程会涉及订单操作和状态机的变化。下游的系统,如积分、物流、通知、实时计算等,他们会通过消息中间件监听订单的变更消息。但是它们对订单不同操作和状态的消息有着不同的需求,如积分系统只关心下单消息,只要下单就扣减积分。物流系统只关系支付和收货消息,支付就发起物流订单,收货就完成物流订单。实时计算系统会统计订单不同状态的数据,所有消息都要接收。 试想一下如果没有消息过滤这个功能,我们会怎么支持以上消息过滤的功能呢?能想到的一般有以下两个方案: 1. 通过将主题进行拆分,将不同的消息发送到不同主题上。 对于生产者来说,这意味着消费者有多少消费场景,就需要新建多少个 Topic,这无疑会给生产者带来巨大的维护成本。对消费者来说,消费者有可能需要同时订阅多个 Topic,这同样带来了很大的维护成本。另外,消息被主题拆分后,他们之间的消费顺序就无法保证了,比如对于一个订单,它的下单、支付等操作显然是要被顺序处理的。 2. 消费者收到消息后,根据消息体对消息按照规则硬编码自行过滤。 这意味着所有的消息都会推送到消费者端进行计算,这无疑增加了网络带宽,也增加了消费者在内存和 CPU 上的消耗。 有了消息过滤这个功能,生产者只需向一个主题进行投递消息,服务端根据订阅规则进行计算,并按需投递给每个消费者。这样对生产者和消费者的代码维护就非常友好,同时也能很大程度上降低网络带宽,同时减少消费者的内存占用和 CPU 的消耗。 RocketMQ 消息过滤的模式 RocketMQ 是众多消息中间件中为数不多支持消息过滤的系统。这也是其作为业务集成消息首选方案的重要基础之一。 在功能层面,RocketMQ 支持两种过滤方式,Tag 标签过滤和 SQL 属性过滤,下面我来这两个过滤方式使用方式和技术原理进行介绍 Tag 标签过滤 功能介绍 Tag 标签过滤方式是 RocketMQ 提供的基础消息过滤能力,基于生产者为消息设置的 Tag 标签进行匹配。生产者在发送消息时,设置消息的 Tag 标签,消费者按需指定已有的 Tag 标签来进行匹配订阅。 过滤语法 1. 单 Tag 匹配:过滤表达式为目标 Tag,表示只有消息标签为指定目标 Tag 的消息符合匹配条件,会被发送给消费者; 2. 多 Tag 匹配:多个 Tag 之间为或的关系,不同 Tag 间使用两个竖线(||)隔开。例如,Tag1||Tag2||Tag3,表示标签为 Tag1 或 Tag2 或 Tag3 的消息都满足匹配条件,都会被发送给消费者进行消费; 3. 全 Tag 匹配:使用星号()作为全匹配表达式。表示主题下的所有消息都将被发送给消费者进行消费。 使用方式 1. 发送消息,设置 Tag 标签 Message message = provider.newMessageBuilder() .setTopic("TopicA") .setKeys("messageKey") //设置消息Tag,用于消费端根据指定Tag过滤消息 .setTag("TagA") .setBody("messageBody".getBytes()) .build(); 2. 订阅消息,匹配单个 Tag 标签 //只订阅消息标签为“TagA”的消息 FilterExpression filterExpression = new FilterExpression("TagA", FilterExpressionType.TAG); pushConsumer.subscribe("TopicA", filterExpression); 3. 订阅消息,匹配多个 Tag 标签 //只订阅消息标签为“TagA”、“TagB”或“TagC”的消息 FilterExpression filterExpression = new FilterExpression("TagA||TagB||TagC", FilterExpressionType.TAG); pushConsumer.subscribe("TopicA", filterExpression); 4. 订阅消息,匹配所有 Tag 标签,即不过滤 //使用Tag标签过滤消息,订阅所有消息 FilterExpression filterExpression = new FilterExpression("", FilterExpressionType.TAG); pushConsumer.subscribe("TopicA", filterExpression); 技术原理 RocketMQ 在存储消息的时候,是通过 AppendOnly 的方式将所有主题的消息都写在同一个 CommitLog 文件中,这可以有效的提升了消息的写入速率。为了消费时能够快速检索消息,它会在后台启动异步方式将消息所在位点、消息的大小,以及消息的标签哈希值存储到 ConsumeQueue 索引文件中。将标签存储到这个索引文件中,就是为了在通过标签进行消息过滤的时候,可以在索引层面就可以获取到消息的标签,不需要从 CommitLog 文件中读取,这样就减少消息读取产生的系统 IO 和内存开销。标签存储哈希值,主要是为了保证 ConsumeQueue 索引文件能够定长处理,这样可以有效较少存储空间,提升这个索引文件的读取效率。 整个 Tag 标签过滤的流程如下: 1. 生产者对消息打上自己的业务标签,发送给我们的服务端 Broker; 2. Broker 将消息写入 CommitLog 中,然后通过异步线程将消息分发到 ConsumeQueue 索引文件中; 3. 消费者启动后,定时向 Broker 发送心跳请求,将订阅关系上传到 Broker 端,Broker 将订阅关系及标签的哈希值保存在内存中; 4. 消费者向 Broker 拉取消息,Broker 会通过订阅关系和队列去 ConsumeQueue 中检索消息,将订阅关系中的标签哈希值和消息中的标签哈希值做比较,如果匹配就返回给消费者; 5. 消费者收到消息后,会将消息中的标签值和本地订阅关系中标签值做精确匹配,匹配成功才会交给消费线程进行消费。 SQL 属性过滤 功能介绍 SQL 属性过滤是 RocketMQ 提供的高级消息过滤方式,通过生产者为消息设置的属性(Key)及属性值(Value)进行匹配。生产者在发送消息时可设置多个属性,消费者订阅时可设置S QL 语法的过滤表达式过滤多个属性。 过滤语法 1. 数值比较:, =, , IN 3. 判空运算:IS NULL or IS NOT NULL 4. 逻辑运算:AND, OR, NOT 使用方式 1. 发送消息,设置属性 Message message = provider.newMessageBuilder() .setTopic("TopicA") .setKeys("messageKey") //设置消息属性,用于消费端根据指定属性过滤消息。 .addProperty("Channel", "TaoBao") .addProperty("Price", "5999") .setBody("messageBody".getBytes()) .build(); 2. 订阅消息,匹配单个属性 FilterExpression filterExpression = new FilterExpression("Channel='TaoBao'", FilterExpressionType.SQL92); pushConsumer.subscribe("TopicA", filterExpression); 3. 订阅消息,匹配多个属性 FilterExpression filterExpression = new FilterExpression("Channel='TaoBao' AND Price5000", FilterExpressionType.SQL92); pushConsumer.subscribe("TopicA", filterExpression); 4. 订阅消息,匹配所有属性 FilterExpression filterExpression = new FilterExpression("True", FilterExpressionType.SQL92); pushConsumer.subscribe("TopicA", filterExpression); 技术原理   由于 SQL 过滤需要将消息的属性和 SQL 表达式进行匹配,这会对服务端的内存和 CPU 增加很大的开销。为了降低这个开销,RocketMQ 采用了布隆过滤器进行优化。当 Broker 在收到消息后,会预先对所有的订阅者进行 SQL 匹配,并将匹配结果生成布隆过滤器的位图存储在 ConsumeQueueExt 索引扩展文件中。在消费时,Broker 就会使用使用这个过滤位图,通过布隆过滤器对消费者的 SQL 进行过滤,这可以避免消息在一定不匹配的时候,不需要去 CommitLog 中将消息的属性拉取到内存进行计算,可以有效地降低属性和 SQL 进行匹配的消息量,减少服务端的内存和 CPU 开销。 整个 SQL 过滤的处理流程如下: 1. 消费者通过心跳上传订阅关系,Broker 判断如果是 SQL 过滤,就会通过布隆过滤器的算法,生成这个 SQL 对应的布隆过滤匹配参数;  2. 生产者对消息设置上自己的业务属性,发送给我们的服务端 Broker;  3. Broker 收到后将消息写入 CommitLog 中,然后通过异步线程将消息分发到 ConsumeQueue 索引文件中。在写入之前,会将这条消息的属性和当前所有订阅关系中 SQL 进行匹配,如果通过,则将 SQL 对应的布隆过滤匹配参数合并成一个完整的布隆过滤位图;  4. 消费者消费消息的时候,Broker 会先获取预先生成的布隆过滤匹配参数,然后通过布隆过滤器对 ConsumeQueueExt 的布隆过滤位图和消费者的布隆过滤匹配参数进行匹配;  5. 布隆过滤器返回匹配成功只能说明消息属性和 SQL 可能匹配,Broker 还需要从 CommitLog 中将消息属性取出来,再做一次和 SQL 的精确匹配,这个时候匹配成功才会将消息投递给消费者  差异及对比 最佳实践 主题划分及消息定义 主题和消息背后的本质其实就是业务实体的属性、行为或状态发生了变化。只有发生了变化,生产者才会往主题里面发送消息,消费者才需要监听这些的消息,去完成自身的业务逻辑。 那么如何做好主题划分和消息定义呢,我们以订单实体为例,来看看主题划分和消息定义的原则。 主题划分的原则   1. 业务领域是否一致 不同的业务领域背后有不同的业务实体,其属性、行为及状态的定义天差地别。比如商品和订单,他们属于两个完全独立且不同的领域,就不能定义成同一个主题。 2. 业务场景是否一致 同一个业务领域不同的业务场景或者技术场景,不能定义一个主题。如订单流程和订单缓存刷新都和订单有关系,但是订单缓存刷新可能需要被不同的流程触发,放在一起就会导致部分场景订单缓存不刷新的情况。 3. 消息类型是否一致 同一个业务领域和业务场景,对消息类型有不同需求,比如订单处理过程中,我们需要发送一个事务消息,同时也需要发送一个定时消息,那么这两个消息就不能共用一个主题。 消息定义的原则   1. 无标签无属性 对于业务实体极其简单的消息,是可以不需要定义标签和属性,比如 MySQLBinlog 的同步。所有的消费者都没有消息过滤需求的,也无需定义标签和属性。 2. 如何定义标签 标签过滤是 RocketMQ 中使用最简单,且过滤性能最好的一种过滤方式。为了发挥其巨大的优势,可以考虑优先使用。在使用时,我们需要确认这个字段在业务实体和业务流程中是否是唯一定义的,并且它是被绝大多数消费者作为过滤条件的,那么可以将它作为标签来定义。比如订单中有下单渠道和订单操作这两个字段,并且在单次消息发送过程中都是唯一定义,但是订单操作被绝大多数消费者应用为过滤条件,那么它最合适作为标签。 3. 如何定义属性 属性过滤的开销相对比较大,所以只有在标签过滤无法满足时,才推荐使用。比如标签已经被其他字段占用,或者过滤条件不可枚举,需要支持多属性复杂逻辑的过滤,就只能使用属性过滤了。 保持订阅关系一致 订阅关系一致是指同一个消费者组下面的所有的消费者所订阅的 Topic 和过滤表达式都必须完全一致。 正如上图所示,一个消费者组包含两个消费者,他们同时订阅了 TopicA 这个主题,但是消费者一订阅的是 TagA 这个标签的消息,消费者二订阅的是 TagB 这个标签的消息,那么他们两者的订阅关系就存在不一致。 导致的问题: 那么订阅关系不一致会导致什么问题呢? 1. 频繁复杂均衡 在 RocketMQ 实现中,消费者客户端默认每 30 秒向 Broker 发送一次心跳,这个过程会上传订阅关系,Broker 发现变化了就进行订阅关系覆盖,同时会触发客户端进行负载均衡。那么订阅关系不一致的两个客户端会交叉上传自己的订阅关系,从而导致客户端频繁进行负载均衡。 2. 消费速率下降 客户端触发了负载均衡,会导致消费者所持有的消费队列发生变化,出现间断性暂停消息拉取,导致整体消费速率下降,甚至出现消息积压。 3. 消息重复消费 客户端触发了负载均衡,会导致已经消费成功的消息因为消费队列发生变化而放弃向 Broker 提交消费位点。Broker 会认为这条消息没有消费成功而重新向消费者发起投递,从而导致消息重复消费。 4. 消息未消费 订阅关系的不一致,会有两种场景会导致消息未消费。第一种是消费者的订阅关系和 Broker 当前订阅关系不一致,导致消息在 Broker 服务端就被过滤了。第二种是消费者的订阅关系和 Broker 当前的虽然一致,但是 Broker 投递给了其他的消费者,被其他消费者本地过滤了。 使用的建议 在消息过滤使用中,有以下建议: 1. 不要共用消费者组 不同业务系统千万不要使用同一个消费者组订阅同一个主题的消息。一般不同业务系统由不同团队维护,很容易发生一个团队修改了订阅关系而没有通知到其他团队,从而导致订阅关系不一致的情况。 2. 不频繁变更订阅关系 频繁变更订阅关系这种情况比较少,但也存在部分用户实现在线规则或者动态参数来设置订阅关系。这有可能导致订阅关系发生变化,触发客户端负载均衡的情况。 3. 变更做好风险评估 由于业务的发展,需求的变更,订阅关系不可能一直不变,但是变更订阅关系过程中,需要考虑整体发布完成需要的总体时间,以及发布过程中订阅关系不一致而对业务可能带来的风险。 4. 消费做好幂等处理 不管是订阅关系不一致,还是客户端上下线,都会导致消息的重复投递,所以消息幂等处理永远是消息消费的黄金法则。在业务逻辑中,消费者需要保证对已经处理过的消息直接返回成功,避免二次消费对业务造成的损害,如果返回失败就会导致消息一直重复投递直到进死信。 到此,本文关于消息过滤的分享就到此结束了,非常感谢大家能够花费宝贵的时间阅读,有不对的地方麻烦指正,感谢大家对 RocketMQ 的关注。 活动推荐 阿里云基于 Apache RocketMQ 构建的企业级产品消息队列RocketMQ 5.0版现开启活动: 1、新用户首次购买包年包月,即可享受全系列 85折优惠! 了解活动详情:
作者:徒钟
#技术探索 #功能特性

2022年8月18日

解析 RocketMQ 业务消息--顺序消息
引言 Apache RocketMQ 诞生至今,历经十余年大规模业务稳定性打磨,服务了阿里集团内部业务以及阿里云数以万计的企业客户。作为金融级可靠的业务消息方案,RocketMQ 从创建之初就一直专注于业务集成领域的异步通信能力构建。本篇将继续业务消息集成的场景,从功能原理、应用案例、最佳实践以及实战等角度介绍 RocketMQ 的顺序消息功能。 简介 顺序消息是消息队列 RocketMQ 版提供的一种对消息发送和消费顺序有严格要求的消息。对于一个指定的 Topic,同一 MessageGroup 的消息按照严格的先进先出(FIFO)原则进行发布和消费,即先发布的消息先消费,后发布的消息后消费,服务端严格按照发送顺序进行存储、消费。同一 MessageGroup 的消息保证顺序,不同 MessageGroup 之间的消息顺序不做要求,因此需做到两点,发送的顺序性和消费的顺序性。 功能原理 在这里首先抛出一个问题,在日常的接触中,许多 RocketMQ 使用者会认为,既然顺序消息能在普通消息的基础上实现顺序,看起来就是普通消息的加强版,那么为什么不全部都使用顺序消息呢?接下来就会围绕这个问题,对比普通消息和顺序消息进行阐述。 顺序发送 在分布式环境下,保证消息的全局顺序性是十分困难的,例如两个 RocketMQ Producer A 与 Producer B,它们在没有沟通的情况下各自向 RocketMQ 服务端发送消息 a 和消息 b,由于分布式系统的限制,我们无法保证 a 和 b 的顺序。因此业界消息系统通常保证的是分区的顺序性,即保证带有同一属性的消息的顺序,我们将该属性称之为 MessageGroup。如图所示,ProducerA 发送了 MessageGroup 属性为 A 的两条消息 A1,A2 和 MessageGroup 属性为 B 的 B1,B2,而 ProducerB 发送了 MessageGroup 属性为 C 的两条属性 C1,C2。 同时,对于同一 MessageGroup,为了保证其发送顺序的先后性,比较简单的做法是构造一个单线程的场景,即不同的 MessageGroup 由不同的 Producer 负责,并且对于每一个 Producer 而言,顺序消息是同步发送的。同步发送的好处是显而易见的,在客户端得到上一条消息的发送结果后再发送下一条,即能准确保证发送顺序,若使用异步发送或多线程则很难保证这一点。  因此可以看到,虽然在底层原理上,顺序消息发送和普通消息发送并无二异,但是为了保证顺序消息的发送顺序性,同步发送的方式相比较普通消息,实际上降低了消息的最大吞吐。 顺序消费 与顺序消息不同的是,普通消息的消费实际上没有任何限制,消费者拉取的消息是被异步、并发消费的,而顺序消息,需要保证对于同一个 MessageGroup,同一时刻只有一个客户端在消费消息,并且在该条消息被确认消费完成之前(或者进入死信队列),消费者无法消费同一 MessageGroup 的下一条消息,否则消费的顺序性将得不到保证。因此这里存在着一个消费瓶颈,该瓶颈取决于用户自身的业务处理逻辑。极端情况下当某一 MessageGroup 的消息过多时,就可能导致消费堆积。当然也需要明确的是,这里的语境都指的是同一 MessageGroup,不同 MessageGroup 的消息之间并不存在顺序性的关联,是可以进行并发消费的。因此全文中提到的顺序实际上是一种偏序。 小结 无论对于发送还是消费,我们通过 MessageGroup 的方式将消息分组,即并发的基本单元是 MessageGroup,不同的 MessageGroup 可以并发的发送和消费,从而一定程度具备了可拓展性,支持多队列存储、水平拆分、并发消费,且不受影响。回顾普通消息,站在顺序消息的视角,可以认为普通消息的并发基本单元是单条消息,即每条消息均拥有不同的 MessageGroup。 我们回到开头那个问题: 既然顺序消息能在普通消息的基础上实现顺序,看起来就是普通消息的加强版,那么为什么不全部都使用顺序消息呢? 现在大家对于这个问题可能有一个基本的印象了,消息的顺序性当然很好,但是为了实现顺序性也是有代价的。 下述是一个表格,简要对比了顺序消息和普通消息。 最佳实践 合理设置 MessageGroup MessageGroup 会有很多错误的选择,以某电商平台为例,某电商平台将商家 ID 作为 MessageGroup,因为部分规模较大的商家会产出较多订单,由于下游消费能力的限制,因此这部分商家所对应的订单就发生了严重的堆积。正确的做法应当是将订单号作为 MessageGroup,而且站在背后的业务逻辑上来说,同一订单才有顺序性的要求。即选择 MessageGroup 的最佳实践是:MessageGroup 生命周期最好较为短暂,且不同 MessageGroup 的数量应当尽量相同且均匀。 同步发送和发送重试 如之前章节所述,需使用同步发送和发送重试来保证发送的顺序性。 消费幂等 消息传输链路在异常场景下会有少量重复,业务消费是需要做消费幂等,避免重复处理带来的风险。 应用案例 用户注册需要发送验证码,以用户 ID 作为 MessageGroup,那么同一个用户发送的消息都会按照发布的先后顺序来消费。   电商的订单创建,以订单 ID 作为 MessageGroup,那么同一个订单相关的创建订单消息、订单支付消息、订单退款消息、订单物流消息都会按照发布的先后顺序来消费。   实战 发送 可以看到,该发送案例设置了 MessageGroup 并且使用了同步发送,发送的代码如下: public class ProducerFifoMessageExample { private static final Logger LOGGER = LoggerFactory.getLogger(ProducerFifoMessageExample.class); private ProducerFifoMessageExample() { } public static void main(String[] args) throws ClientException, IOException { final ClientServiceProvider provider = ClientServiceProvider.loadService(); // Credential provider is optional for client configuration. String accessKey = "yourAccessKey"; String secretKey = "yourSecretKey"; SessionCredentialsProvider sessionCredentialsProvider = new StaticSessionCredentialsProvider(accessKey, secretKey); String endpoints = "foobar.com:8080"; ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder() .setEndpoints(endpoints) .setCredentialProvider(sessionCredentialsProvider) .build(); String topic = "yourFifoTopic"; final Producer producer = provider.newProducerBuilder() .setClientConfiguration(clientConfiguration) // Set the topic name(s), which is optional. It makes producer could prefetch the topic route before // message publishing. .setTopics(topic) // May throw {@link ClientException} if the producer is not initialized. .build(); // Define your message body. byte[] body = "This is a FIFO message for Apache RocketMQ".getBytes(StandardCharsets.UTF_8); String tag = "yourMessageTagA"; final Message message = provider.newMessageBuilder() // Set topic for the current message. .setTopic(topic) // Message secondary classifier of message besides topic. .setTag(tag) // Key(s) of the message, another way to mark message besides message id. .setKeys("yourMessageKey1ff69ada8e0e") // Message group decides the message delivery order. .setMessageGroup("youMessageGroup0") .setBody(body) .build(); try { final SendReceipt sendReceipt = producer.send(message); LOGGER.info("Send message successfully, messageId={}", sendReceipt.getMessageId()); } catch (Throwable t) { LOGGER.error("Failed to send message", t); } // Close the producer when you don't need it anymore. producer.close(); } } 消费 消费的代码如下: public class SimpleConsumerExample { private static final Logger LOGGER = LoggerFactory.getLogger(SimpleConsumerExample.class); private SimpleConsumerExample() { } public static void main(String[] args) throws ClientException, IOException { final ClientServiceProvider provider = ClientServiceProvider.loadService(); // Credential provider is optional for client configuration. String accessKey = "yourAccessKey"; String secretKey = "yourSecretKey"; SessionCredentialsProvider sessionCredentialsProvider = new StaticSessionCredentialsProvider(accessKey, secretKey); String endpoints = "foobar.com:8080"; ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder() .setEndpoints(endpoints) .setCredentialProvider(sessionCredentialsProvider) .build(); String consumerGroup = "yourConsumerGroup"; Duration awaitDuration = Duration.ofSeconds(30); String tag = "yourMessageTagA"; String topic = "yourTopic"; FilterExpression filterExpression = new FilterExpression(tag, FilterExpressionType.TAG); SimpleConsumer consumer = provider.newSimpleConsumerBuilder() .setClientConfiguration(clientConfiguration) // Set the consumer group name. .setConsumerGroup(consumerGroup) // set await duration for longpolling. .setAwaitDuration(awaitDuration) // Set the subscription for the consumer. .setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression)) .build(); // Max message num for each long polling. int maxMessageNum = 16; // Set message invisible duration after it is received. Duration invisibleDuration = Duration.ofSeconds(5); final List messages = consumer.receive(maxMessageNum, invisibleDuration); for (MessageView message : messages) { try { consumer.ack(message); } catch (Throwable t) { LOGGER.error("Failed to acknowledge message, messageId={}", message.getMessageId(), t); } } // Close the simple consumer when you don't need it anymore. consumer.close(); } } 活动推荐 阿里云基于 Apache RocketMQ 构建的企业级产品消息队列RocketMQ 5.0版现开启活动: 1、新用户首次购买包年包月,即可享受全系列 85折优惠! 了解活动详情:
作者:绍舒
#技术探索 #功能特性

2022年8月3日

解析 RocketMQ 业务消息——事务消息
引言:在分布式系统调用场景中存在这样一个通用问题,即在执行一个核心业务逻辑的同时,还需要调用多个下游做业务处理,而且要求多个下游业务和当前核心业务必须同时成功或者同时失败,进而避免部分成功和失败的不一致情况出现。简单来说,消息队列中的“事务”,主要解决的是消息生产者和消费者的数据一致性问题。本篇文章通过拆解 RocketMQ 事务消息的使用场景、基本原理、实现细节和实战使用,帮助大家更好的理解和使用 RocketMQ 的事务消息。 点击下方链接,查看视频讲解: 场景:为什么需要事务消息 以电商交易场景为例,用户支付订单这一核心操作的同时会涉及到下游物流发货、积分变更、购物车状态清空等多个子系统的变更。当前业务的处理分支包括: 主分支订单系统状态更新:由未支付变更为支付成功; 物流系统状态新增:新增待发货物流记录,创建订单物流记录; 积分系统状态变更:变更用户积分,更新用户积分表; 购物车系统状态变更:清空购物车,更新用户购物车记录。  分布式系统调用的特点是:一个核心业务逻辑的执行,同时需要调用多个下游业务进行处理。因此,如何保证核心业务和多个下游业务的执行结果完全一致,是分布式事务需要解决的主要问题。 传统 XA 事务方案:性能不足 为了保证上述四个分支的执行结果一致性,典型方案是基于XA协议的分布式事务系统来实现。将四个调用分支封装成包含四个独立事务分支的大事务,基于XA分布式事务的方案可以满足业务处理结果的正确性,但最大的缺点是多分支环境下资源锁定范围大,并发度低,随着下游分支的增加,系统性能会越来越差。 基于普通消息方案:一致性保障困难 将上述基于 XA 事务的方案进行简化,将订单系统变更作为本地事务,剩下的系统变更作为普通消息的下游来执行,事务分支简化成普通消息+订单表事务,充分利用消息异步化的能力缩短链路,提高并发度。 该方案中消息下游分支和订单系统变更的主分支很容易出现不一致的现象,例如: 消息发送成功,订单没有执行成功,需要回滚整个事务; 订单执行成功,消息没有发送成功,需要额外补偿才能发现不一致; 消息发送超时未知,此时无法判断需要回滚订单还是提交订单变更。 基于RocketMQ分布式事务消息:支持最终一致性 上述普通消息方案中,普通消息和订单事务无法保证一致的本质原因是普通消息无法像单机数据库事务一样,具备提交、回滚和统一协调的能力。 而基于消息队列 RocketMQ 版实现的分布式事务消息功能,在普通消息基础上,支持二阶段的提交能力。将二阶段提交和本地事务绑定,实现全局提交结果的一致性。 消息队列 RocketMQ 版事务消息的方案,具备高性能、可扩展、业务开发简单的优势。 基本原理 概念介绍 事务消息:RocketMQ 提供类似 XA 或 Open XA 的分布式事务功能,通过 RocketMQ 事务消息能达到分布式事务的最终一致;  半事务消息:暂不能投递的消息,生产者已经成功地将消息发送到了 RocketMQ 服务端,但是 RocketMQ 服务端未收到生产者对该消息的二次确认,此时该消息被标记成“暂不能投递”状态,处于该种状态下的消息即半事务消息;  消息回查:由于网络闪断、生产者应用重启等原因,导致某条事务消息的二次确认丢失,RocketMQ 服务端通过扫描发现某条消息长期处于“半事务消息”时,需要主动向消息生产者询问该消息的最终状态(Commit 或是 Rollback),该询问过程即消息回查。 事务消息生命周期 初始化:半事务消息被生产者构建并完成初始化,待发送到服务端的状态;  事务待提交:半事务消息被发送到服务端,和普通消息不同,并不会直接被服务端持久化,而是会被单独存储到事务存储系统中,等待第二阶段本地事务返回执行结果后再提交。此时消息对下游消费者不可见;  消息回滚:第二阶段如果事务执行结果明确为回滚,服务端会将半事务消息回滚,该事务消息流程终止;  提交待消费:第二阶段如果事务执行结果明确为提交,服务端会将半事务消息重新存储到普通存储系统中,此时消息对下游消费者可见,等待被消费者获取并消费;  消费中:消息被消费者获取,并按照消费者本地的业务逻辑进行处理的过程。此时服务端会等待消费者完成消费并提交消费结果,如果一定时间后没有收到消费者的响应,RocketMQ 会对消息进行重试处理。具体信息,请参见消息重试;  消费提交:消费者完成消费处理,并向服务端提交消费结果,服务端标记当前消息已经被处理(包括消费成功和失败);RocketMQ 默认支持保留所有消息,此时消息数据并不会立即被删除,只是逻辑标记已消费。消息在保存时间到期或存储空间不足被删除前,消费者仍然可以回溯消息重新消费。  消息删除:当消息存储时长到期或存储空间不足时,RocketMQ 会按照滚动机制清理最早保存的消息数据,将消息从物理文件中删除。 事务消息基本流程 事务消息交互流程如下图所示: 1. 生产者将消息发送至 RocketMQ 服务端;  2. RocketMQ 服务端将消息持久化成功之后,向生产者返回 Ack 确认消息已经发送成功,此时消息被标记为“暂不能投递”,这种状态下的消息即为半事务消息;  3. 生产者开始执行本地事务逻辑;  4. 生产者根据本地事务执行结果向服务端提交二次确认结果(Commit 或是 Rollback),服务端收到确认结果后处理逻辑如下: 二次确认结果为 Commit:服务端将半事务消息标记为可投递,并投递给消费者; 二次确认结果为 Rollback:服务端将回滚事务,不会将半事务消息投递给消费者。  5. 在断网或者是生产者应用重启的特殊情况下,若服务端未收到发送者提交的二次确认结果,或服务端收到的二次确认结果为Unknown未知状态,经过固定时间后,服务端将对消息生产者即生产者集群中任一生产者实例发起消息回查;   6. 生产者收到消息回查后,需要检查对应消息的本地事务执行的最终结果;  7. 生产者根据检查到的本地事务的最终状态再次提交二次确认,服务端仍按照步骤 4 对半事务消息进行处理。 实现细节:RocketMQ 事务消息如何实现 根据发送事务消息的基本流程的需要,实现分为三个主要流程:接收处理 Half 消息、Commit 或 Rollback 命令处理、事务消息 check。 处理 Half 消息 发送方第一阶段发送 Half 消息到 Broker 后,Broker 处理 Half 消息。Broker 流程参考下图: 具体流程是首先把消息转换 Topic 为 RMQ_SYS_TRANS_HALF_TOPIC,其余消息内容不变,写入 Half 队列。具体实现参考 SendMessageProcessor 的逻辑处理。 Commit 或 Rollback 命令处理 发送方完成本地事务后,继续发送 Commit 或 Rollback 到 Broker。由于当前事务已经完结,Broker 需要删除原有的 Half 消息,由于 RocketMQ 的 appendOnly 特性,Broker通过 OP 消息实现标记删除。Broker 流程参考下图: Commit。Broker 写入 OP 消息,OP 消息的 body 指定 Commit 消息的 queueOffset,标记之前 Half 消息已被删除;同时,Broker 读取原 Half 消息,把 Topic 还原,重新写入 CommitLog,消费者则可以拉取消费;  Rollback。Broker 同样写入 OP 消息,流程和 Commit 一样。但后续不会读取和还原 Half 消息。这样消费者就不会消费到该消息。  具体实现在 EndTransactionProcessor 中。 事务消息 check 如果发送端事务时间执行过程,发送 UNKNOWN 命令,或者 Broker/发送端重启发布等原因,流程 2 的标记删除的 OP 消息可能会缺失,因此增加了事务消息 check 流程,该流程是在异步线程定期执行(transactionCheckInterval 默认 30s 间隔),针对这些缺失 OP 消息的 Half 消息进行 check 状态。具体参考下图: 事务消息 check 流程扫描当前的 OP 消息队列,读取已经被标记删除的 Half 消息的 queueOffset。如果发现某个 Half 消息没有 OP 消息对应标记,并且已经超时(transactionTimeOut 默认 6 秒),则读取该 Half 消息重新写入 half 队列,并且发送 check 命令到原发送方检查事务状态;如果没有超时,则会等待后读取 OP 消息队列,获取新的 OP 消息。 另外,为了避免发送方的异常导致长期无法确定事务状态,如果某个 Half 消息的 bornTime 超过最大保留时间(transactionCheckMaxTimeInMs 默认 12 小时),则会自动跳过此消息,不再 check。 具体实现参考: TransactionalMessageServiceImplcheck 方法。 实战:使用事务消息 了解了 RocketMQ 事务消息的原理后,我们看下如何使用事务。首先,我们需要创建一个 “事务消息” 类型的 Topic,可以使用控制台或者 CLi 命令创建。 事务消息相比普通消息发送时需要修改以下几点: 发送事务消息前,需要开启事务并关联本地的事务执行。 为保证事务一致性,在构建生产者时,必须设置事务检查器和预绑定事务消息发送的主题列表,客户端内置的事务检查器会对绑定的事务主题做异常状态恢复。  当事务消息 commit 之后,这条消息其实就是一条投递到用户 Topic 的普通消息而已。所以对于消费者来说,和普通消息的消费没有区别。 注意: 1. 避免大量未决事务导致超时:在事务提交阶段异常的情况下发起事务回查,保证事务一致性;但生产者应该尽量避免本地事务返回未知结果;大量的事务检查会导致系统性能受损,容易导致事务处理延迟; 2. 事务消息的 Group ID 不能与其他类型消息的 Group ID 共用:与其他类型的消息不同,事务消息有回查机制,回查时服务端会根据 Group ID 去查询生产者客户端; 3. 事务超时机制:半事务消息被生产者发送服务端后,如果在指定时间内服务端无法确认提交或者回滚状态,则消息默认会被回滚。  活动推荐 阿里云基于 Apache RocketMQ 构建的企业级产品消息队列RocketMQ 5.0版现开启活动: 1、新用户首次购买包年包月,即可享受全系列 85折优惠! 了解活动详情:
作者:合伯
#技术探索 #功能特性

2022年7月27日

RocketMQ 消息集成:多类型业务消息——定时消息
引言 Apache RocketMQ 诞生至今,历经十余年大规模业务稳定性打磨,服务了 100% 阿里集团内部业务以及阿里云数以万计的企业客户。作为金融级可靠的业务消息方案,RocketMQ 从创建之初就一直专注于业务集成领域的异步通信能力构建。 本篇将继续业务消息集成的场景,从使用场景、应用案例、功能原理以及最佳实践等角度介绍 RocketMQ 的定时消息功能。 点击下方链接,查看直播讲解: 概念:什么是定时消息 在业务消息集成场景中,定时消息是,生产者将一条消息发送到消息队列后并不期望这条消息马上会被消费者消费到,而是期望到了指定的时间,消费者才可以消费到。 相似地,延迟消息其实是对于定时消息的另外一种解释,指的是生产者期望消息延迟一定时间,消费者才可以消费到。可以理解为定时到当前时间加上一定的延迟时间。 对比一下定时消息和普通消息的流程。普通消息,可以粗略的分为消息发送,消息存储和消息消费三个过程。当一条消息发送到 Topic 之后,那么这条消息就可以马上处于等待消费者消费的状态了。 而对于定时/延时消息来说,其可以理解为在普通消息的基础上叠加了定时投递到消费者的特性。生产者发送了一条定时消息之后,消息并不会马上进入用户真正的Topic里面,而是会被 RocketMQ 暂存到一个系统 Topic 里面,当到了设定的时间之后,RocketMQ 才会将这条消息投递到真正的 Topic 里面,让消费者可以消费到。 场景:为什么需要使用定时消息 在分布式定时调度触发、任务超时处理等场景,需要实现精准、可靠的定时事件触发。往往这类定时事件触发都会存在以下诉求: 高性能吞吐:需要大量事件触发,不能有性能瓶颈。 高可靠可重试:不能丢失事件触发。 分布式可扩展:定时调度不能是单机系统,需要能够均衡的调度到多个服务负载。  传统的定时调度方案,往往基于数据库的任务表扫描机制来实现。大概的思路就是将需要定时触发的任务放到数据库,然后微服务应用定时触发扫描数据库的操作,实现任务捞取处理。 这类方案虽然可以实现定时调度,但往往存在很多不足之处: 重复扫描:在分布式微服务架构下,每个微服务节点都需要去扫描数据库,带来大量冗余的任务处理,需要做去重处理。 定时间隔不准确:基于定时扫描的机制无法实现任意时间精度的延时调度。 横向扩展性差:为规避重复扫描的问题,数据库扫表的方案里往往会按照服务节点拆分表,但每个数据表只能被单节点处理,这样会产生性能瓶颈。 在这类定时调度类场景中,使用 RocketMQ 的定时消息可以简化定时调度任务的开发逻辑,实现高性能、可扩展、高可靠的定时触发能力。 精度高、开发门槛低:基于消息通知方式不存在定时阶梯间隔。可以轻松实现任意精度事件触发,无需业务去重。 高性能可扩展:传统的数据库扫描方式较为复杂,需要频繁调用接口扫描,容易产生性能瓶颈。消息队列 RocketMQ 版的定时消息具有高并发和水平扩展的能力。 案例:使用定时消息实现金融支付超时需求 利用定时消息可以实现在一定的时间之后才进行某些操作而业务系统不用管理定时的状态。下面介绍一个典型的案例场景:金融支付超时。现在有一个订单系统,希望在用户下单 30 分钟后检查用户的订单状态,如果用户还没有支付,那么就自动取消这笔订单。 基于 RocketMQ 定时消息,我们可以在用户下单之后发送一条定时到 30 分钟之后的定时消息。同时,我们可以使用将订单 ID 设置为 MessageKey。当 30 分钟之后,订单系统收到消息之后,就可以通过订单 ID 检查订单的状态。如果用户超时未支付,那么就自动的将这笔订单关闭。 原理:RocketMQ 定时消息如何实现 固定间隔定时消息 如前文介绍,定时消息的核心是如何在特定的时间把处于系统定时 Topic 里面的消息转移到用户的 Topic 里面去。 Apache RocketMQ 4.x 的版本的定时消息是先将定时消息放到按照 DelayLevel 放到 SCHEDULE_TOPIC_XXXX 这个系统的不同 Queue 里面,然后为每一个 Queue 启动一个定时任务,定时的拉取消息并将到了时间的消息转投到用户的 Topic 里面去。这样虽然实现简单,但也导致只能支持特定 DelayLevel 的定时消息。 当下,支持定时到任意秒级时间的定时消息的实现的 pr 提出到了社区,下面简单的介绍一下其基本的实现原理。 时间轮算法 在介绍具体的实现原理之前,先介绍一下经典的时间轮算法,这是定时消息实现的核心算法。 如上所示,这是一个一圈定时为 7 秒的时间轮,定时的最小精度的为秒。同时,时间轮上面会有一个指向当前时间的指针,其会定时的移向下一个刻度。 现在我们想定时到 1 秒以后,那么就将数据放到 “1” 这个刻度里面,同时如果有多个数据需要定时到同一个时间, 那么会以链表的方式添加到后面。当时间轮转到 “1” 这个刻度之后,就会将其读取并从链表出队。那如果想定到超过时间轮一圈的时间怎么处理呢?例如我们想定时到 14 秒,由于一圈的时间是 7 秒,那么我们将其放在“6”这个刻度里面。当第一次时间轮转到“6” 时,发现当前时间小于期望的时间,那么忽略这条数据。当第二次时间轮转到“6”时,这个时候就会发现已经到了我们期望的 14 秒了。 任意秒级定时消息 在 RocketMQ 中,使用 TimerWheel 对于时间轮进行描述和存储,同时使用一个 AppendOnly 的 TimerLog 记录时间轮上面每一个刻度所对应的所有的消息。 TimerLog 记录了一条定时消息的一些重要的元数据,用于后面定时的时间到了之后,将消息转移到用户的 Topic 里面去。其中几个重要的属性如下: 对于 TimerWheel 来说,可以抽象的认为是一个定长的数组,数组中的每一格代表时间轮上面的一个“刻度”。TimerWheel 的一个“刻度”拥有以下属性。 TimerWheel 和 TimerLog 直接的关系如下图所示: TimerWheel 中的每一格代表着一个时间刻度,同时会有一个 firstPos 指向这个刻度下所有定时消息的首条 TimerLog 记录的地址,一个 lastPos 指向这个刻度下所有定时消息最后一条 TimerLog 的记录的地址。并且,对于所处于同一个刻度的的消息,其 TimerLog 会通过 prevPos 串联成一个链表。 当需要新增一条记录的时候,例如现在我们要新增一个 “14”。那么就将新记录的 prevPos 指向当前的 lastPos,即 “13”,然后修改 lastPos 指向 “14”。这样就将同一个刻度上面的 TimerLog 记录全都串起来了。 有了 TimerWheel 和 TimerLog 之后,我们再来看一下一条定时消息从发送到 RocketMQ 之后是怎么最终投递给用户的。 首先,当发现用户发送的是一个定时消息过后,RocketMQ 实际上会将这条消息发送到一个专门用于处理定时消息的系统 Topic 里面去 然后在 TimerMessageStore 中会有五个 Service 进行分工合作,但整体可以分为两个阶段:入时间轮和出时间轮 对于入时间轮: TimerEnqueueGetService 负责从系统定时 Topic 里面拉取消息放入 enqueuePutQueue 等待 TimerEnqueuePutService 的处理 TimerEnqueuePutService 负责构建 TimerLog 记录,并将其放入时间轮的对应的刻度中  对于出时间轮: TimerDequeueGetService 负责转动时间轮,并取出当前时间刻度的所有 TimerLog 记录放入 dequeueGetQueue TimerDequeueGetMessageService 负责根据 TimerLog 记录,从 CommitLog 中读取消息 TimerDequeuePutMessageService 负责判断队列中的消息是否已经到期,如果已经到期了,那么将其投入用户的 Topic 中,等待消费消费;如果还没有到期,那么重新投入系统定时 Topic,等待重新进入时间轮。  实战:使用定时消息 了解了 RocketMQ 秒级定时消息的原理后,我们看下如何使用定时消息。首先,我们需要创建一个 “定时/延时消息” 类型的 Topic,可以使用控制台或者 CLi 命令创建。 从前面可以看出,对于定时消息来说,是在发送消息的时候 “做文章”。所以,对于生产者,相对于发送普通消息,我们可以在发送的时候设置期望的投递时间。 当定时的时间到了之后,这条消息其实就是一条投递到用户 Topic 的普通消息而已。所以对于消费者来说,和普通消息的消费没有区别。 注意:定时消息的实现逻辑需要先经过定时存储等待触发,定时时间到达后才会被投递给消费者。因此,如果将大量定时消息的定时时间设置为同一时刻,则到达该时刻后会有大量消息同时需要被处理,会造成系统压力过大。所以一般建议尽量不要设置大量相同触发时刻的消息。 活动推荐 阿里云基于 Apache RocketMQ 构建的企业级产品消息队列RocketMQ 5.0版现开启活动: 1、新用户首次购买包年包月,即可享受全系列 85折优惠! 了解活动详情:
作者:凯易、明锻
#技术探索 #功能特性