深度剖析 RocketMQ 5.0 之消息进阶:如何支撑复杂业务消息场景?

作者|隆基
2024年8月9日

简介: 本文主要学习 RocketMQ 的一致性特性,一致性对于交易、金融都是刚需。从大规模复杂业务出发,学习 RocketMQ 的 SQL 订阅、定时消息等特性。再从高可用的角度来看,这里更多的是大型公司对于高阶可用性的要求,如同城容灾、异地多活等。

1. 前言

从初代开源消息队列崛起,到 PC 互联网、移动互联网爆发式发展,再到如今 IoT、云计算、云原生引领了新的技术趋势,消息中间件的发展已经走过了 30 多个年头。

目前,消息中间件在国内许多行业的关键应用中扮演着至关重要的角色。随着数字化转型的深入,客户在使用消息技术的过程中往往同时涉及交叉场景,比如同时进行物联网消息、微服务消息的处理,同时进行应用集成、数据集成、实时分析等,企业需要为此维护多套消息系统,付出更多的资源成本和学习成本。

在这样的背景下,2022 年,RocketMQ 5.0 正式发布,相对于 RocketMQ 4.0,架构走向云原生化,并且覆盖了更多的业务场景。想要掌握最新版本 RocketMQ 的应用,就需要进行更加体系化的深入了解。

2. 背景

今天的课程是 RocketMQ 5.0 消息进阶。这节课依然聚焦在业务消息场景,我们在 RocketMQ 5.0 概述里面就提到 RocketMQ 可以应对复杂的业务消息场景。这节课我们就从功能特性的角度出发,来看 RocketMQ 是如何去解决复杂业务场景的。

第一部分会先学习 RocketMQ 的一致性特性,一致性对于交易、金融都是刚需。第二部分,我们从大规模复杂业务出发,学习 RocketMQ 的 SQL 订阅、定时消息等特性。第三部分,我们再从高可用的角度来看,这里更多的是大型公司对于高阶可用性的要求,如同城容灾、异地多活等。

3. 一致性

3.1. 事务消息

3.1.1. 场景

我们先来看 RocketMQ 的第一个特性——事务消息,这是和一致性相关的特性,这也是 RocketMQ 有别于其他消息队列的一个最具区分度的特性。我们还是继续沿用大规模电商系统的案例,如图,我们仔细梳理一下流程,付款成功会在交易系统中订单数据库将订单状态更新为已付款,然后交易系统再发一条消息给 RocketMQ,RocketMQ 把订单已付款的事件通知给所有下游的应用,保障后续的履约环节。

但是这个流程有个问题,就是交易系统写数据库和发消息是分开的,它不是一个事务。会出现多种异常情况,比如数据库写成功了,但消息发失败了,这个订单的状态下游应用接收不到,对于电商业务可能就造成大量用户付款了,但是卖家不发货。如果先发消息成功,再写数据库失败,会造成下游应用认为订单已付款,推进卖家发货,但是实际用户未付款成功。这些异常都会对电商业务造成大量脏数据,产生灾难性业务后果。

这就需要使用 RocketMQ 高阶特性——事务消息。事务消息的能力是要保障生产者的本地事务(如写数据库)、发消息事务的一致性,最后通过 Broker at least once 的消费语义,保证消费者的本地事务也能执行成功。最终实现生产者、消费者对同一业务的事务状态达到最终一致。

3.1.2. 原理

如下图,事务消息的实现是两个阶段:提交+事务补偿机制结合实现的。

首先,生产者会发送 half 消息,也就是 prepare 消息,broker 会把 half 队列中。接下来生产者执行本地事务,一般是写数据库,本地事务完成后,会往 RocketMQ 发送 commit 操作,RocketMQ 会把 commit 操作写入 OP 队列,并进行 compact,把已提交的消息写到 ConsumeQueue 对消费者可见。反过来如果是 rollback 操作,则会跳过对应的 half 消息。面对异常的情况,比如生产者在发送 commit 或者 rollback 之前宕机了,RocketMQ broker 还会有补偿检查机制,定期回查 Producer 的事务状态,继续推进事务。

无论是 Prepare 消息、还是 Commit/Rollback 消息、或者是 compact 环节,在存储层面都是遵守 RocketMQ 以顺序读写为主的设计理念,达到最优吞吐量。

3.1.3. demo

接下来,我们来看一个事务消息的简单示例。

使用事务消息需要实现一个事务状态的查询器,这也是和普通消息一个最大的区别。如果我们是一个交易系统,这个事务回查器的实现可能就是根据订单 ID 去查询数据库来确定这个订单的状态到底是否是提交,比如说创建成功、已付款、已退款之类的。主体的消息生产流程也有很多不同,需要开启分布式事务,进行两阶段提交,先发一个 prepare 的消息,然后再去执行本地事务。这里的本地事务一般就是执行数据库操作。然后如果本地事务执行成功的话,就整体 commit,把之前的 prepare 的消息提交掉。这样一来,消费者就可以消费这条消息。如果本地事务出现异常的话,那么就把整个事务 rollback 掉,之前的那条 prepare 的消息也会被取消掉,整个过程就回滚了。事务消息的用法变化主要体现在生产者代码,消费者使用方式和普通消息一致,demo 里面就不展示了。

3.2. 顺序消息

3.2.1. 场景 + 原理

第二个高级特性是顺序消息,这个也是 RocketMQ 的特色能力之一。它解决的是顺序一致性的问题,要保障同一个业务的消息,生产和消费的顺序保持一致。在阿里曾有个场景是买卖家数据库复制,由于阿里订单数据库采用分库分表技术,面向买卖家不同的业务场景,分别按照买家主键和卖家主键拆分成买卖家数据库。两个数据库的同步就是采用了 Binlog 顺序分发的机制,通过使用顺序消息,把买家库的 Binlog 变更按照严格顺序在卖家库回放,以此达到订单数据库的一致性。如果没有顺序保障,那么就可能出现数据库级别的脏数据,将会带来严重的业务错误。

顺序消息的实现原理如下图,充分利用 Log 天然顺序读写的特点高效实现。
在 Broker 存储模型中,每个 Topic 都会有固定的 ConsumeQueue,可以理解为 Topic 的分区
生产者为发送消息加上业务 Key,在这个 case 里面可以用订单 ID,同一订单 ID 的消息会顺序发送到同一个 Topic 分区,每个分区在某个时刻只会被一个消费者锁定,消费者顺序读取同一个分区的消息串行消费,以此来达到顺序一致性。

3.2.2. demo

接下来,我们来看顺序消息的一个简单demo。

对于顺序消息来说,生产者跟消费者都有需要注意的地方。

在生产阶段,首先要定义一个消息的 group。每条消息都可以选择一个业务 ID 作为消息 Group,这个业务 ID 尽量离散、随机。因为同一个业务 ID 会分配到同一个数据存储分片,生产和消费都在这个数据分片上串行,如果业务 ID 有热点,会造成严重的数据倾斜和局部消息堆积。比如说在电商交易的场景,一般会选择订单 ID 进行业务消息分组,因为订单 ID 会比较离散。但如果我们选择的是卖家 ID,就有可能会出现热点,热点卖家的流量会远大于普通卖家。

在消费阶段的话,消费阶段有跟常规的消息收发一样有两种模式,一种是全托管的 push consumer 模式,一种是半托管的simple consumer 的模式,RocketMQ SDK 会保障同一个分组的消息串行进入业务消费逻辑。需要注意的点是,我们自身的业务消费代码也要串行进行,然后同步返回消费成功确认。不要把同分组的消息又放到另外的线程池并发消费,这样会破坏顺序语义。

4. 大规模业务

4.1. SQL 过滤

4.1.1. 场景

第三个高级特性是 SQL 消费模式,这个也是复杂业务场景的刚需。我们回到阿里的电商场景,阿里的整个电商业务都是围绕着交易展开,有数百个不同的业务在订阅交易的消息。这些业务基本上都面向某个细分领域,都只需要交易 Topic 下的部分消息。按照传统的模式,一般就是全量订阅交易 Topic,在消费者本地过滤即可,但是这样会消耗大量的计算、网络资源,特别是在双十一的峰值,这个方案的成本是无法接受的。

4.1.2. 原理

为了解决这个问题,RocketMQ 提供了 SQL 消费模式。在交易场景下,每笔订单消息都会带有不同维度的业务属性,包括卖家 ID、买家 ID、类目、省市、价格、订单状态等属性,而 SQL 过滤就是能让消费者通过 SQL 语句过滤消费目标消息。如下图,某个消费者只想关注某个价格区间内的订单创建消息,于是创建这个订阅关系 【Topic=Trade ,SQL: status= order create and (Price between 50 and 100)】,Broker 会在服务端运行 SQL 计算,只返回有效数据给消费者。为了提高性能,Broker 还引入了布隆过滤器模块,在消息写入分发时刻,提前计算结果,写入位图过滤器,减少无效 IO。总的来说就是把过滤链路不断前置,从消费端本地过滤,到服务端写时过滤,达到最优性能。

4.1.3. demo

接下来,我们再来看一个 SQL 订阅的示例。目前 RocketMQ SQL 过滤支持如下的语法,包括属性非空判断、属性大小比较、属性区间过滤、集合判断和逻辑计算,能满足绝大部分的过滤需求。

在消息生产阶段,我们除了设置 Topic、Tag 之外,还能添加多个自定义属性。比如在这个案例里面,我们设置了一个 region 的属性,表示这条消息是从杭州 region 发出来的。在消费的时候,我们就可以对根据自定义属性来进行 SQL 过滤订阅了。第一个 case 是我们用了一个 filter expression,判断 region 这个字段不为空且等于杭州才消费。第二个 case 添加更多的条件,如果这是一笔订单消息,我们还可以同时判断 region 条件和价格区间来决定是否消费。第三个 case 是全接收模式,表达式直接为 True,这个订阅方式会接收某一个主题下面的全量消息,不进行任何过滤。

4.2. 定时消息

4.2.1. 场景+原理

第四个高级特性是定时消息,生产者可以指定某条消息在发送后经过一定时间后才对消费者可见。有不少业务场景需要大规模的定时事件触发,比如典型的电商场景,基本上都有订单创建30分钟未付款就自动关闭订单的逻辑,定时消息能为这个场景带来极大的便利性。

RocketMQ 的定时消息是基于时间轮来实现的。TimerWheel,相信大家并不陌生,模拟表盘转动,来达到对时间进行排序的目的。TimerWheel 中的每一格代表着最小的时间刻度,称之为Tick,RocketMQ 里面是每一个 Tick 为一秒,同一个时刻的消息会写到同一个格子里。由于每个时刻可能会同时触发多条消息,并且每条消息的写入时刻都不一样,所以 RocketMQ 也同时引入了 Timerlog 的数据结构,Timerlog 按照顺序 append 方式写入数据,每个元素都包含消息的物理索引、以及指向同一个时刻的前一条消息,组成一个逻辑链表。TimeWheel 的每个格子都维护这个时刻的消息链表的头尾指针。类似表盘,TimerWheel 会有一个指针,代表当前时刻,绕着 TimerWheel 循环转动,指针所指之处,代表这一个 Tick 到期,所有内容一起弹出,会写到 ConsumeQueue,对消费者可见。

目前 RocketMQ 的定时消息性能已经远超 RabbitMQ 和 ActiveMQ。

5. 全局高可用

接下来,我们再讲一下 RocketMQ 的全局高可用技术解决方案。

在消息基础原理的文章里,我们提到 RocketMQ 的高可用架构,主要是指 RocketMQ 集群内的数据多副本和服务高可用。今天我们这里讲的高可用是全局的,就是业界经常说的同城容灾、两地三中心、异地多活等架构。现在蚂蚁支付和阿里交易采用的是异地多活的架构。异地多活相对于冷备、同城容灾、两地三中心模式具备更多的优点,可以应对城市级别的灾难,如地震、断电等事件。除此之外,一些因为人为的操作,比如说某个基础系统变更,引入新的 bug,导致的整个机房级别的不可用,异地多活的架构可以直接把流量切到可用机房,优先保障业务连续性,再去定位具体的问题。另一方面,异地多活还能实现机房级别的扩容,单一机房的计算存储资源是有限的,异地多活架构可以把业务流量按照比例分散在全国各地机房。同时多活架构实现了所有机房都在提供业务服务,而不是冷备状态,资源利用率大幅度提升。由于是多活状态,面对极端场景的切流,可用性更有保障,信心更足。

在异地多活的架构中,RocketMQ 承担的是基础架构的多活能力。多活的架构分为几个模块:

  • 首先是接入层,通过统一接入层按照业务 ID 把用户请求分散到多个机房,业务ID一般可采用用户ID。
  • 其次是应用层,应用层一般无状态,当请求进入某个机房后,需要尽量保障该请求的整个链路都在单元内封闭,包括 RPC、数据库访问、消息读写。这样才可以降低访问延迟,保障系统性能不会因为多活架构衰退。
  • 再往下是数据层,包括数据库,消息队列等有状态系统。这里侧重讲解 RocketMQ 的异地多活,RocketMQ 通过 connector 组件,实现按 topic 粒度实时同步消息的数据;按照 Consumer 和 Topic 的组合粒度实时同步消费状态。
  • 最后还需要全局的管控层。管控层要维护全局的单元化规则,哪些流量走到哪些机房;管理多活元数据配置,哪些应用需要多活、哪些 Topic 需要多活;另外在切流时刻,要协调所有系统的切流过程,控制好切流顺序。