2024年8月9日

深度剖析 RocketMQ 5.0 之 IoT 消息:物联网需要什么样的消息技术?
简介: 本文来学习一个典型的物联网技术架构,以及在这个技术架构里面,消息队列所发挥的作用。在物联网的场景里面,对消息技术的要求和面向服务端应用的消息技术有什么区别?学习 RocketMQ 5.0 的子产品 MQTT,是如何解决这些物联网技术难题的。 1.前言 从初代开源消息队列崛起,到 PC 互联网、移动互联网爆发式发展,再到如今 IoT、云计算、云原生引领了新的技术趋势,消息中间件的发展已经走过了 30 多个年头。 目前,消息中间件在国内许多行业的关键应用中扮演着至关重要的角色。随着数字化转型的深入,客户在使用消息技术的过程中往往同时涉及交叉场景,比如同时进行物联网消息、微服务消息的处理,同时进行应用集成、数据集成、实时分析等,企业需要为此维护多套消息系统,付出更多的资源成本和学习成本。 在这样的背景下,2022 年,RocketMQ 5.0 正式发布,相对于 RocketMQ 4.0,架构走向云原生化,并且覆盖了更多的业务场景。想要掌握最新版本 RocketMQ 的应用,就需要进行更加体系化的深入了解。 2.背景 本节课分为三个部分,第一部分,我们来学习一个典型的物联网技术架构,以及在这个技术架构里面,消息队列所发挥的作用。第二部分我们会讲在物联网的场景里面,对消息技术的要求和面向服务端应用的消息技术有什么区别?第三部分,我们会学习 RocketMQ 5.0 的子产品 MQTT,是如何解决这些物联网技术难题的。 3. 物联网消息场景 我们先来了解一下物联网的场景是什么,以及消息在物联网里面有什么作用。 物联网肯定是最近几年最火的技术趋势之一,有大量的研究机构、行业报告都提出了物联网快速发展的态势。首先是物联网设备规模爆发式增长,预测会在 2025 年达到 200 多亿台。 其次是物联网的数据规模,来自物联网的数据增速接近 28%,并且未来有 90% 以上的实时数据来自物联网场景。这也就意味着未来的实时流数据处理数据类型会有大量物联网数据。 最后一个重要的趋势是边缘计算,未来会有 75% 的数据在传统数据中心或者云环境之外来处理,这里的边缘指的是商店、工厂、火车等等这些离数据源更近的地方。由于物联网产生的数据规模很大,如果全部数据传输到云端处理,会面临难以承受的成本,应该充分利用边缘的资源直接计算,再把高价值的计算结果传输云端;另一方面,在离用户近的地方计算直接响应,可以降低延迟,提升用户体验。 物联网的发展速度这么快,数据规模那么大,跟消息有什么关系呢?我们通过这个图来看一下消息在物联网场景发挥的作用:第一个作用是连接,承担通信的职责,支持设备和设备的通信,设备和云端应用的通信,比如传感器数据上报、云端指令下发等等这些功能,支撑 IoT 的应用架构,连接云边端。第二个作用是数据处理,物联网设备源源不断的产生数据流,有大量需要实时流处理的场景,比如设备维护,高温预警等等。基于 MQ 的事件流存储和流计算能力,可以构建物联网场景的数据架构。 4. 物联网消息技术 下面我们来看看在物联网场景里,对消息技术有什么诉求?我们先从这个表格来分析物联网消息技术跟之前我们讲的经典消息技术有什么区别? 经典的消息主要是为服务端系统提供发布订阅的能力,而物联网的消息技术是为物联网设备之间、设备和服务端之间提供发布订阅的能力。我们来分别看一下各自场景的特点。 在经典消息场景里,消息 broker、消息客户端都是服务端系统,这些系统都是部署在 IDC 或者公共云环境。无论是消息客户端、消息服务端,都会部署在配置比较不错的服务器机型,有容器、虚拟机、物理机等等这些形式。同时,客户端和消息服务端一般都是部署在同一个机房,属于内网环境,网络带宽特别高,而且网络质量稳定。客户端的数量一般对应到应用服务器的数量,规模较小,一般都是数百、数千台服务器,只有超大规模的互联网公司才能达到百万级。从生产消费的角度来看,每个客户端的消息生产发送量一般对应到其业务的 TPS,能达数百数千的 TPS。在消息消费方面,一般是采用集群消费,一个应用集群共享一个消费者 ID,共同分担该消费组的消息。每条消息的订阅比一般也不高,正常情况下不会超过 10 个。 而在 IoT 消息场景,很多条件都不一样,甚至是相反的。IoT的消息客户端是微型设备,计算存储资源都很有限,消息服务端可能要部署在边缘环境,使用的服务器配置也会比较差。另一方面物联网设备,一般是通过公网的环境来连接的,它的环境特别复杂,而且经常会不断移动,有些时候会断网或处于弱网环境,网络质量差。物联网场景中,消息的客户端实例数对应到物联网设备数,可以到亿级别,比大型互联网公司的服务器数量要大很多。每个设备的消息 tps 不高,但是一条消息有可能同时被百万级的设备接受,订阅比特别高。 5. RocketMQ MQTT 从这里可以看出,物联网需要的消息技术和经典的消息设计很不一样。接下来我们再来看,为了应对物联网的消息场景,RocketMQ 5.0 做了哪些事情?RocketMQ 5.0 里面,我们发布了一个子产品,叫做 RocketMQ MQTT。它有三个技术特点: 首先,它采用的标准的物联网协议 MQTT,该协议面向物联网弱网环境、低算力的特点设计,协议十分精简。同时有很丰富的特性,支持多种订阅模式,多种消息的 QoS,比如有最多一次,最少一次,当且仅当一次。它的领域模型设计也是 消息、 主题、发布订阅等等这些概念,和 RocketMQ 特别匹配,这为打造一个云端一体的 RocketMQ 产品形态奠定了基础。 第二,它采用的是纯算分离的架构。RocketMQ Broker 作为存储层,MQTT 相关的领域逻辑都在 MQTT Proxy 层实现,并面向海量的连接、订阅关系、实时推送深度优化,Proxy 层可以根据物联网业务的负载独立弹性,如连接数增加,只需要新增 proxy 节点。 第三,它采用的是端云一体化的架构,因为领域模型接近、并且以 RocketMQ 作为存储层,每条消息只存一份,这份消息既能被物联网设备消费,也能被云端应用消费。另外 RocketMQ 本身是天然的流存储,流计算引擎可以无缝对 IoT 数据进行实时分析。 5.1. IoT 消息存储模型 接下来我们再从几个关键的技术点,来深入了解 RocketMQ 的物联网技术实现。 5.1.1. 读放大为主,写放大为辅 首先要解决的问题是物联网消息的存储模型,在发布订阅的业务模型里,一般会采用两种存储模型,一种是读放大,每条消息只写到一个公共队列,所有的消费者读取这个共享队列,维护自己的消费位点。另外一种模型是写放大模型,每个消费者有自己的队列,每条消息都要分发到目标消费者的队列中,消费者只读自己的队列。 因为在物联网场景里,一条消息可能会有百万级的设备消费,所以,很显然,选择读放大的模型能显著降低存储成本、提高性能。 但是,只选择读放大的模式没法完全满足要求,MQTT 协议有其特殊性,它的 Topic 是多级 Topic,而且订阅方式既有精准订阅,也有通配符匹配订阅。比如在家居场景,我们定义一个多级主题,比如家/浴室/温度,有直接订阅完整多级主题的 家/浴室/温度,也有采用通配符订阅只关注温度的,还有只关注一级主题为 家的所有消息。 对于直接订阅完整的多级主题消费者可以采用读放大的方式直接读取对应多级主题的公共队列;而采用通配符订阅的消费者无法反推消息的 Topic,所以需要在消息存储时根据通配符的订阅关系多写一个通配符队列,这样消费者就可以根据其订阅的通配符队列读取消息。 这就是 RocketMQ 采用的读放大为主,写放大为辅的存储模型。 5.1.2. 端云一体化存储 基于上节课的分析,我们设计了 RocketMQ 端云一体化的存储模型,看下这张图。 消息可以来自各个接入场景(如服务端的 RMQ/AMQP,设备端的 MQTT),但只会写一份存到 commitlog 里面,然后分发出多个需求场景的队列索引,比如服务端场景(MQ/AMQP)可以按照一级 Topic 队列进行传统的服务端消费,设备端场景可以按照 MQTT 多级 Topic 以及通配符订阅进行消费消息。 这样我们就可以基于同一套存储引擎,同时支持服务端应用集成和 IoT 场景的消息收发,达到端云一体化。 5.2. 队列规模问题 我们都知道像 Kafka 这样的消息队列每个 Topic 是独立文件,但是随着 Topic 增多消息文件数量也增多,顺序写就退化成了随机写,性能明显下降。RocketMQ 在 Kafka 的基础上进行了改进,使用了一个 Commitlog 文件来保存所有的消息内容,再使用 CQ 索引文件来表示每个 Topic 里面的消息队列,因为 CQ 索引数据比较小,文件增多对 IO 影响要小很多,所以在队列数量上可以达到十万级。但是这个终端设备队列的场景下,十万级的队列数量还是太小了,我们希望进一步提升一个数量级,达到百万级队列数量,所以,我们引入了 Rocksdb 引擎来进行 CQ 索引分发。 面向 IoT 的百万级队列设计 Rocksdb 是一个广泛使用的单机 KV 存储引擎,有高性能的顺序写能力。因为我们有了 commitlog 已具备了消息顺序流存储,所以可以去掉 Rocksdb 引擎里面的 WAL,基于 Rocksdb 来保存 CQ 索引。在分发的时候我们使用了 Rocksdb 的 WriteBatch 原子特性,分发的时候把当前的 MaxPhyOffset 注入进去,因为 Rocksdb 能够保证原子存储,后续可以根据这个 MaxPhyOffset 来做 Recover 的 checkpoint。最后,我们也提供了一个 Compaction 的自定义实现,来进行 PhyOffset 的确认,以清理已删除的脏数据。 5.3. IoT 消息推送模型 介绍了底层的队列存储模型后,我们再详细描述一下匹配查找和可靠触达是怎么做的。在 RocketMQ 的经典消费模式里,消费者是直接采用长轮询的方式,从客户端直接发起请求,精确读取对应的 topic 队列。而在 MQTT 场景里,因为客户端数量、订阅关系数量规模巨大,无法采用原来的长轮询模式,消费链路的实现更加复杂。这里使用的是推拉结合的模型。 这里展示的是一个推拉模型,终端设备通过 MQTT 协议连到 Proxy 节点。消息可以来自多种场景(MQ/AMQP/MQTT)发送过来,存到 Topic 队列后会有一个 notify 逻辑模块来实时感知这个新消息到达,然后会生成消息事件(就是消息的 Topic 名称),把这个事件推送至网关节点,网关节点根据它连上的终端设备订阅情况进行内部匹配,找到哪些终端设备能匹配上,然后会触发 pull 请求去存储层读取消息,再推送终端设备。 一个重要问题,就是 notify 模块怎么知道一条消息在哪些网关节点上面的终端设备感兴趣,这个其实就是关键的匹配查找问题。一般有两种方式:第一种,简单的广播事件;第二种,集中存储在线订阅关系(比如图里的 lookup 模块),然后进行匹配查找,再精准推送。事件广播机制看起来有扩展性问题,但是其实性能并不差,因为我们推送的数据很小,就是 Topic 名称,而且相同 Topic 的消息事件可以合并成一个事件,我们线上就是默认采用的这个方式。集中存储在线订阅关系,这个也是常见的一种做法,如保存到 RDS、Redis 等等,但要保证数据的实时一致性也是有难度的,而且要进行匹配查找对整个消息的实时链路RT开销也会有一定的影响。这幅图里还有一个 Cache 模块,用来做消息队列 cache,避免在大广播比场景下每个终端设备都向存储层发起读数据情况。
作者:隆基
#技术探索 #物联网

2024年8月9日

深度剖析 RocketMQ 5.0 之事件驱动:云时代的事件驱动有啥不同?
简介: 本文技术理念的层面了解一下事件驱动的概念。RocketMQ 5.0 在面向云时代的事件驱动架构新推出的子产品 EventBridge,最后再结合几个具体的案例帮助大家了解云时代的事件驱动方案。 1.前言 从初代开源消息队列崛起,到 PC 互联网、移动互联网爆发式发展,再到如今 IoT、云计算、云原生引领了新的技术趋势,消息中间件的发展已经走过了 30 多个年头。 目前,消息中间件在国内许多行业的关键应用中扮演着至关重要的角色。随着数字化转型的深入,客户在使用消息技术的过程中往往同时涉及交叉场景,比如同时进行物联网消息、微服务消息的处理,同时进行应用集成、数据集成、实时分析等,企业需要为此维护多套消息系统,付出更多的资源成本和学习成本。 在这样的背景下,2022 年,RocketMQ 5.0 正式发布,相对于 RocketMQ 4.0,架构走向云原生化,并且覆盖了更多的业务场景。想要掌握最新版本 RocketMQ 的应用,就需要进行更加体系化的深入了解。 2.背景 今天我们要学习的课程是 RocketMQ 5.0 的事件驱动。事件驱动是一个经典的概念,通过今天这节课,我们会掌握云时代的事件驱动和之前有哪些不同 这是今天我们要学习的内容,第一部分先从技术理念的层面了解一下事件驱动的概念。第二部分会讲,RocketMQ 5.0 在面向云时代的事件驱动架构新推出的子产品 EventBridge,最后再结合几个具体的案例帮助大家了解云时代的事件驱动方案。 3. 事件驱动架构 3.1. 事件驱动架构定义 首先我们来学习一下什么是事件驱动。先从事件驱动的定义来看,事件驱动本质上是一种软件设计模式。它能够最大化降低不同模块以及不同系统之间的耦合度。 这里有一个典型的事件驱动架构图,首先是事件生产者发送事件到 EventBroker,然后 EventBroker 会把事件路由到对应的消费者进行事件处理。事件处理能够灵活扩展,随时增减事件消费者,事件生产者对此透明。 为什么说事件驱动是个很经典的设计模式呢,因为早在几十年前,就出现过多种事件驱动的技术,比如桌面客户端编程框架,点击按钮就可以触发 onclick 事件,开发者编写业务逻辑响应事件。在编程语言上,也经常会采用事件驱动的代码模式,比如 callback、handler 这类的函数。进入分布式系统的时代,系统之间的通信协同也会采用事件驱动的方式。 你有没有发现,这里的图和之前 RocketMQ 的消息应用解耦图很像。没错,无论是消息的发布订阅,还是事件的生产消费都是为了进行代码解耦、系统解耦。消息队列更偏技术实现,大部分的 EventBroker 都是基于消息队列实现的,而事件驱动更偏向于架构理念。 3.2. 事件的特征 从技术角度来看,消息队列是和 RPC 对应的,一个是同步通信,一个是异步通信。消息队列并不会规定消息的内容,只负责传输二进制内容。如果从技术实现来看,的确,EDA 需要的核心技术就是消息队列的技术。事件驱动跟消息驱动最大的区别就是,事件是一种特殊的消息,只有消息满足了某些特征,才能把它叫做事件。 我打个比方,来看左边这个图。消息就像是一个抽象类,有多种子类,最主要的就是 Command 和 Event 两种。以信号灯为例,向信号灯发送打开的消息,这就是一种 Command,信号灯接受这个 Command 并开灯。开灯后,信号灯对外发出信号灯变成绿色的消息,这个就是一种 Event。 对于 Event 来说,有四个主要的特征: 第一,它是一个不可变的,事件就是表示已经发生了的事情,已经成为事实。 第二,事件有时间概念,并且对同一个实体来说事件的发送是有序的。如信号灯按顺序发送了绿、黄、红等事件。 第三,事件是无预期的,这个就是EDA架构之所以能够实现最大化解耦的特点,事件的产生者对于谁是事件消费者,怎么消费这个事件是不关心的。 第四,由于事件驱动是彻底解耦的,并且对于下游怎么去消费事件没有预期,所以事件是具象化的,应该包括尽可能详尽的信息,让下游消费者各取所需。比如像交通交通信号灯事件,包含多个字段,包括它的来源是谁、它的类型是什么?它的主题是什么?是具体的哪一个信号灯,另外它会包含唯一的ID,便于跟踪?它会有事件发生时间,事件的内容。 3.3. 云时代的事件驱动 在全行业数字化转型的时代,事件驱动架构应用范围扩大,成为 Gartner 年度十大技术趋势。在新型的数字化商业解决方案里,会有 60% 采纳 EDA 架构。 事件驱动作为一个经典的架构模式,为什么会在云时代再度成为焦点呢?主要有两个原因: 首先是云原生技术带来的,其中之一是微服务。微服务是云原生应用架构的核心,引入微服务架构,数字化企业能够按照小型化的业务单元和团队划分,以“高内聚、低耦合”的方式高效协作。但是微服务架构也会带来新的问题,比如大量同步微服务会面临延迟增大、可用性降低等风险,采用事件驱动的微服务体系,可提高微服务的韧性,降低延迟,实现更彻底的解耦。 另外一个云原生代表技术 Serverless 架构范式本身也是事件驱动的。现在主要的 Serverless 产品形态,无论是阿里云的函数计算、还是 AWS 的 Lambda,它们的主要触发源都是各种形态的事件,比如云产品事件,OSS 文件上传,触发用户基于函数进行文件加工处理计算;用户业务事件,EventBroker 触发函数运行消费逻辑;云产品运维事件,用户通过响应事件,在云平台的基础上扩展自己的自动化运维体系。事件驱动架构的大规模使用,能够帮助数字化企业释放云计算 Serverless 的技术红利。 IoT 也是事件驱动架构的重要推动力,有大量的 IoT 应用构建都是基于事件驱动的,比如传感器上报设备事件,温度变化事件、地址位置变化事件等等,云端应用订阅这些事件触发对应的业务流程。 在全行业大规模数字化转型后,跨业务、跨组织的业务合作会从线下搬到线上,在数字经济时代,数字化商业生态规模会持续扩大,跨组织业务协同更需要彻底解耦。而 EDA 天然具备的异步、解耦的特性就可以解决这一系列的问题。比如阿里聚石塔业务就是事件驱动的模式,聚石塔实时发布交易事件,合作伙伴包括ISV、软件服务商、品牌商家订阅消费交易事件,建设个性化的 CRM、商家运营、后台管理系统等等,形成一个庞大的电子商务数字化生态。 4. EventBridge 4.1. 云时代的事件驱动能力抽象 接下来进入第二个部分的内容,一起学习一下 RocketMQ 5.0 的 EventBridge。在了解这个系统的技术实现之前,我们先来了解一下 EventBridge 对事件驱动的通用能力抽象,从这里也可以了解到 EventBridge 的领域模型。 我们从左往右看这张图。最左边是事件源,因为这个事件是希望被跨平台消费的,所以我们希望采用业界标准来作为事件的格式。同时,事件是有可能被跨组织消费的,所以我们需要一个统一的事件中心,让这些不同的事件源都注册到这个事件中心。对消费者来说就好比是一个事件商店,能够选择自己感兴趣的事件订阅。在事件消费者开始编写消费逻辑的时候,他还需要对这个事件的格式有更清楚的了解,需要知道这个事件有哪些内容,有哪些字段,分别是什么含义,才能编写正确的消费业务逻辑。所以,EventBridge 还提供了 schema 中心,有这个 schema 中心后,消费者对于事件的格式也就一目了然,不用跟事件源的发起者进行沟通了,整个效率也得到了大幅度的提升。再往后面看,就到了事件消费的环节,因为事件的消费者种类很多,不同消费者关注不同的事件类型,EventBridge 需要提供丰富的过滤规则。即便多个消费者对同一个事件感兴趣,但是可能只需要事件的部分内容,EventBridge 还提供了事件转换的能力。这就是 RocketMQ 5.0 对事件驱动的能力抽象。 4.2. 统一事件标准 在云计算的时代、大规模数字化转型时代,我们强调事件驱动架构往往跨越了不同的组织,不同的平台。所以事件驱动架构需要一个统一的事件标准。在 EventBridge 这个产品里,我们采纳了 CNCF 基金会中的 CloudEvents 标准,这个是业界事件的事实标准,这个标准就是为了简化事件声明,提升事件在跨服务、跨平台的互操作性。 CloudEvents 带来了很多价值: 第一,它提供了一种规范,使得跨组织、跨平台的事件集成,有了共同语言,加速更多的事件集成。然后也因为有的规范,所以它可以加速跨服务,跨平台的事件的集成。 第二,随着 Serverless 的普及,各大云厂商都提供函数计算的服务,有了 CloudEvents 规范,用户在函数计算的使用上就可以实现无厂商绑定。 第三,webhook 是一种通用的集成模式,有了 CloudEvents 规范作为统一格式,不同系统的 webhook 能实现更好的互操作性。 最后,基于这样统一的规范,其实是更有利于沉淀事件驱动的基础软件设施的,比如跨服务的事件 Tracing 链路追踪。 4.3. RocketMQ EventBridge 如下图是 RocketMQ 面向 EDA 场景全新推出的产品形态 EventBridge。 它的核心技术都是基于 RocketMQ,但是在产品界面上面向事件驱动的业务进行一层抽象,核心领域对象从消息变成 CloudEvents。基于统一事件标准来构建事件驱动的数字生态。它的事件源也很多样,可以是云产品事件,可以是 SaaS 平台事件,应用自定义事件、通用的 WebHook。当然,它的事件目标更是多样化的,通过事件规则引擎把事件路由到不同的消费者,典型的消费者,比如函数计算,也可以是存储系统,消息通知如钉钉短信,还有通用的的 webhook。通过事件驱动这种彻底解耦的架构,更适合建设混合云、多云的数字化系统。 为了提升事件驱动的研发效率,EventBridge 也支持 Schema 的特性,支持事件信息的解释、预览,甚至还可以自动化的生成代码,让开发者以低代码、0 代码的方式完成事件集成。 EventBridge 的另一个比较重要的特性是事件规则引擎。因为不同的事件消费者,他们对于事件的兴趣是不一样的。所以我们提供了七种事件过滤模式,包括前缀匹配、后缀匹配、除外匹配、数值匹配等等,可以进行各种复杂的组合逻辑过滤,只推送消费者感兴趣的事件。 当然,就算都关心同一个事件,不同消费者对事件内部的信息关注点也会有所不同。为了提升事件消费效率,我们也提供了四种事件转化器,可以只推送给消费者它关心的事件字段。还可以对事件进行自定义的模板转化,满足更灵活的业务诉求。 作为 RocketMQ 的子项目,在 EventBridge 里也同样提供了完整的可观测的能力。能够根据事件的时间、类型查询事件列表。每个事件都会生成唯一 ID。用户可以根据唯一 ID 去精确的定位事件的内容、发生时间、对应的事件规则,下游的消费状况,精准排查问题。 5. 典型案例 接下来结合几个典型案例来看 EventBridge 的使用场景。 第一个案例适用于使用大量云产品的公司。C 客户是一家以智能消费终端为核心的科技公司,希望收集账号里全部的云上事件,方便后续做分析或故障处理。公共云的 EventBridge 汇聚了所有的云产品事件,通过 EventBridge,客户能收集全量的事件对齐进行自定义的业务处理。还能够配置事件规则,过滤异常事件推送给监控系统或者钉钉,及时关注处理。 第二个案例是 SaaS 事件的集成。现在随着整个云计算生态的繁荣,有不少企业不仅使用了公共云的 IaaS、PaaS 产品,也会同时使用三方的 SaaS 产品,比如各种 ERP、CRM 等系统。基于 EventBridge 标准的 HTTP、webhook 的集成能力,能够无缝连接三方 SaaS 系统作为事件源,企业能够收集到他所关心的所有 SaaS 事件,方便后续管理,比如申请单,入职单,报销单,订单等等这些场景。 第三个案例是 SaaS 平台集成,以钉钉为例,钉钉是典型的 SaaS 平台,他有繁荣的生态,拥有 4000+ 家的生态伙伴,包括 ISV 生态伙伴、硬件生态伙伴、服务商、咨询生态和交付生态伙伴等等。通过 EventBridge 把公共云的 Paas 层生态和钉钉的 SaaS 层生态连接起来,而且依赖 EventBridge 完成整体事件生命周期的管理,以 WebHook 的形式推送给下游 ISV 接收端。比如钉钉的官方事件源包括视频会议、日程、通讯录、审批流、钉盘、宜搭等,企业和 SaaS 厂商可以充分利用这些官方应用的事件构建企业级的应用系统,也可以把钉钉的官方数据流和其他系统做深度集成。
作者:隆基
#技术探索 #事件驱动架构

2023年7月27日

RocketMQ 在业务消息场景的优势详解
一、消息场景 RocketMQ5.0是消息事件流一体的实时数据处理平台,是业务消息领域的事实标准,很多互联网公司在业务消息场景会使用RocketMQ。 我们反复提到的“消息、业务消息”,指的是分布式应用解耦,是RocketMQ的业务基本盘。通过本文,我们将深入了解RocketMQ5.0在业务消息场景的优势能力,了解为什么RocketMQ能够成为业务消息领域的事实标准。 RocketMQ在业务消息领域的经典场景是应用解耦,这也是RocketMQ诞生初期解决阿里电商分布式互联网架构的核心场景,主要承担分布式应用(微服务)的异步集成,达到应用解耦的效果。解耦是所有的软件架构最重要的追求。 分布式应用(微服务)采用同步RPC与异步消息的对比。比如在业务系统中,有三个上游应用与4个下游应用,采用同步RPC的方式,会有34的依赖复杂度;而采用异步消息的方式则可以化繁为简,简化为3+4的依赖复杂度,从乘法简化为加法。 通过引入消息队列实现应用的异步集成可以获得四大解耦优势。 代码解耦:极大提升业务敏捷度。如果用同步调用的方式,每次扩展业务逻辑都需要上游应用显式调用下游应用接口,代码直接耦合,上游应用要做变更发布,业务迭代互相掣肘。而通过使用消息队列扩展新的业务逻辑,只需要增加下游应用订阅某个Topic,上下游应用互相透明,业务可以保持灵活独立快速迭代。 延迟解耦:如果使用同步调用的方式,随着业务逻辑的增加,用户操作的远程调用次数会越来越多,业务响应越来越慢,性能衰减,业务发展不可持续。而使用消息队列,无论增加多少业务,上游应用只需调用一次消息队列的发送接口即可响应线上用户,延迟为常量,基本在5ms以内。 可用性解耦:如果使用同步调用的方式,任何下游业务不可用都会导致整个链路失败。该种结构下类似于串联电路,甚至在部分调用失败的情况下,还会出现状态不一致。而采用RocketMQ进行异步集成,只要RocketMQ服务可用,用户的业务操作便可用。RocketMQ服务通过多对主备组成的broker集群提供,只要有一对主备可用,则整体服务可用,作为基础软件,可用性远大于普通的业务应用,下游应用的业务推进都可以通过MQ的可靠消息投递来达成。 流量解耦:即削峰填谷。如果采用同步调用的方式,上下游的容量必须对齐,否则会出现级联不可用。容量完全对齐需要投入大量精力进行全链路压测与更多机器成本。而通过引入RocketMQ,基于RocketMQ亿级消息的堆积能力,对于实时性要求不高的下游业务,可以尽最大努力消费,既保证了系统稳定性,又降低了机器成本与研发运维成本。 二、基础特性 阿里的交易应用流程为:用户在淘宝上下单时会调用交易应用创建订单,交易应用将订单落到数据库,然后生产一条订单创建的消息到RocketMQ,返回给终端用户订单创建成功的接口。完成的交易流程推进则是依赖RocketMQ将订单创建消息投递给下游应用,会员应用收到订单消息,需要给买家赠送积分、淘金币,触发用户激励相关的业务。购物车应用则是负责删除在购物车里面的商品,避免用户重复购买。同时,支付系统与物流系统也都会基于订单状态的变更,推进支付环节与履约环节。 过去十年多年,阿里电商业务持续蓬勃发展,交易的下游应用已达数百个,并且还在不断增加。基于RocketMQ的电商架构极大提高了阿里电商业务的敏捷度,上游核心的交易系统完全无需关心哪些应用在订阅交易消息,交易应用的延迟与可用性也一直保持在很高水准,只依赖少量的核心系统与RocketMQ,不会受数百个下游应用的影响。 交易的下游业务类型不一,有大量的业务场景不需要实时消费交易数据,比如物流场景能容忍一定的延迟。通过RocketMQ的亿级堆积能力,极大降低了机器成本。RocketMQ的sharednothing架构具备无限横向扩展的能力,已经连续10年支撑了高速增长的双十一消息峰值,在几年前达到亿级TPS。 三、增强能力 经典场景下,RocketMQ相对于其他消息队列,拥有诸多差异化优势与增强。 首先,稳定性方面,稳定性交易是金融场景最重要的需求。RocketMQ的稳定性不仅限于高可用架构,而是通过全方位的产品能力来构建稳定性竞争力。比如重试队列,当下游消费者因为业务数据不ready或其他原因导致某条消息消费失败,RocketMQ不会因此阻塞消费,而是能将此消息加入到重试队列,然后按时间衰减重试。如果某条消息因为某些因素经过十几次重试始终无法消费成功,则RocketMQ会将它转到死信队列,用户可以通过其他手段来处理失败的消息,是金融行业的刚需。 同时,消费成功后如果因为代码bug导致业务不符合预期,应用可以对业务bug进行修复并重新发布,然后应用消息回溯的功能将消息拉回到之前的时间点,让业务按照正确逻辑重新处理。 RocketMQ的消费实现机制采用自适应拉模式的消费,在极端的场景下能够避免消费者被大流量打垮。同时,在消费者的SDK里,做了缓存本地的消息数量与消息内存占用的阈值保护,防止消费应用的内存风险。 其次,RocketMQ还具备优秀的可观测能力,是稳定性的重要辅助手段。RocketMQ是业界第一个提供消息消息级别可观测能力的消息队列,每条消息都可以带上业务主键,比如在交易场景,用户可以将订单ID作为消息的业务主键。当某个订单的业务需要排查,用户可以基于订单ID查询该条消息的生成时间以及消息内容。消息的可观测数据还能继续下钻,通过消息轨迹查看消息由哪台生产者机器发送、由哪些消费者机器在什么时间消费、消费状态是成功或失败等。 除此之外,它支持了几十种核心的度量数据,包括集群生产者流量分布、慢消费者排行、消费的平均延迟、消费堆积数量、消费成功率等。基于丰富的指标,用户可以搭建更加完善的监控报警体系来进一步加固稳定性。 为了支撑更灵活的应用架构,RocketMQ在生产与消费等关键接口提供了多种模式。 生产者接口:RocketMQ同时提供了同步发送接口与异步发送接口。同步发送是最常用的模式,业务流程的编排是串行的,在应用发完消息、Broker完成存储后返回成功后,应用再执行下一步逻辑。然而在某些场景下,完成业务涉及多个远程调用,应用为了进一步降低延迟、提高性能,会采用全异步化的方式,并发发出远程调用(可以是多次发消息或RPC的组合),异步收集结果推,进业务逻辑。 在消费者的接口方面也提供了两种方式: 监听器模式被动消费:这是目前使用最广泛的方式,用户无需关心客户端何时去Broker拉取消息,何时向Broker发出消费成功的确认,也无需维护消费线程池、本地消息缓存等细节。只需要写一段消息监听器的业务逻辑,根据业务执行结果返回Success或Failure。它属于全托管的模式,用户可以专注于业务逻辑的编写,而将实现细节完全委托给RocketMQ客户端。 主动消费模式:将更多的自主权交给用户,也称为Simple Consumer。在该种模式下,用户可以自己决定何时去Broker读取消息、何时发起消费确认消息。对业务逻辑的执行线程也有自主可控性,读取完消息后,可以将消费逻辑放在自定义的线程池执行。在某些场景下,不同消息的处理时长与优先级会有所不同,采用Simple Consumer的模式,用户可根据消息的属性、大小做二次分发,隔离到不同的业务线程池执行处理。该模式还提供了消息粒度消费超时时间的设定能力,针对某些消费耗时长的消息,用户能够调用change Invisible Duration接口,延长消费时间,避免超时重试。 四、总结 消息经典场景:应用解耦; RocketMQ基础特性:发布订阅、可靠消息、亿级堆积、无限扩展; 业务消息场景的增强能力:稳定性、可观测、多样化接口。 【活动】一键体验 RocketMQ 六大生产环境 免费试用+30秒一键体验,低门槛、快速、高效、易操作,带你了解“历经万亿级数据洪峰考验”的云消息队列RocketMQ! 点击阅读原文,立即参与活动! 活动推荐 阿里云基于 Apache RocketMQ 构建的企业级产品消息队列RocketMQ 5.0版现开启活动: 1、新用户首次购买包年包月,即可享受全系列 85折优惠! 了解活动详情:
作者:隆基
#技术探索

2023年7月20日

从互联网到云时代,Apache RocketMQ 是如何演进的?
2022年,RocketMQ5.0的正式版发布。相对于4.0版本而言,架构走向云原生化,并且覆盖了更多业务场景。 一、消息队列演进史 操作系统、数据库、中间件是基础软件的三驾马车,而消息队列属于最经典的中间件之一,已经有30多年的历史。消息队列的发展主要经历了以下几个阶段: 第一阶段(19802000年):80年代诞生了第一款消息队列The Information Bus,第一次提出发布订阅模式来解决软件之间的通信问题;90年代是国际商业软件巨头的时代,IBM、Oracle、Microsoft纷纷推出自己的MQ,其中最具代表性的为IBM MQ,价格昂贵,面向高端企业,主要是大型金融、电信等企业。该类商业MQ一般采用高端硬件,软硬件一体机交付,MQ本身的软件架构为单机架构。 第二阶段(2000~2007年):进入00年代后,初代开源消息队列崛起,诞生了JMS、AMQP两大标准,与之对应的两个实现分别为ActiveMQ、RabbitMQ,他们引领了初期的开源消息队列技术。开源极大促进了消息队列的流行,降低了使用门槛,技术普惠化,逐渐成为企业级架构的标配。相比于今天而言,这类MQ主要面向传统企业级应用和小流量场景,横向扩展能力较弱。 第三阶段(2007~2017年):PC互联网、移动互联网爆发式发展。由于传统的消息队列无法承受亿级用户的访问流量与海量数据传输,诞生了互联网消息中间件,核心能力是全面采用分布式架构,具备很强的横向扩展能力,开源典型代表有Kafka、RocketMQ,闭源的有淘宝Notify。Kafka的诞生还将消息中间件从消息领域延伸到了流领域,从分布式应用的异步解耦场景延伸到大数据领域的流存储与流计算场景。 第四阶段(2014~至今):云计算、IoT、大数据引领了新的浪潮。 二、互联网时代的RocketMQ 阿里的电商系统最初是个庞大的单体巨石应用,在研发效率、稳定性方面都无法满足淘宝和天猫飞速的发展。为了解决问题,2008年,淘宝与天猫发起了一次最大规模的架构升级,启动了“五彩石”项目,将单体应用拆分为分布式应用,同时抽象淘宝、天猫的共同底座——业务中台,包括交易中心、商品中心、买家中心等。在业务中台之下,同时诞生了阿里中间件(初期三大件包括消息、RPC、分布式数据层),RocketMQ是其中之一。 虽然在当时业界已经存在不少商业或开源的消息队列,比如IBMMQ、ActiveMQ、RabbitMQ,但无一例外,它们都诞生于传统企业级应用的场景,无法承受互联网对于高并发、无限扩展的苛刻要求。以RabbitMQ为例,RabbitMQ的队列流量与存储负载都为单机,无法满足业务横向扩展的需求。当时另一款具备无限横向扩展能力的消息队列是Kafka,但其主要用于日志类场景,未经过大规模核心业务稳定性验证,而且偏向于简单的log型消息队列,无法满足电商对于复杂消息功能特性的诉求,比如消息过滤、延迟消息等。 另一方面,传统的消息队列无法解决电商业务对于分布式一致性的要求。通过消息队列实现应用异步解耦后,电商业务还需要保障不同上下游应用对于订单状态要达成最终一致,否则会产生大量脏数据,造成业务错误。 大规模的电商系统,既要高性能又要一致性,传统的分布式事务技术束手无策。比如IBM MQ虽然可以使用XA事务来满足分布式一致性的功能诉求,但是XA带来的延迟与成本,对于海量的互联网流量难以承受。 为了解决电商业务对于消息队列的高性能、一致性、无限扩展等需求,自研消息队列成为了当时阿里唯一的出路,最终互联网消息队列RocketMQ应运而生。 为了支持超大规模的复杂电商业务,RocketMQ面向四个方面进行了重点建设,形成了四大优势能力。 ① 支撑超大规模复杂业务的能力,具备丰富的消息特性。 每一个大型互联网公司都会有主营业务(比如阿里是交易、蚂蚁是支付、饿了么是外卖),以主营业务为中心扩展业务能力,阿里电商是围绕交易事件建设的电商操作系统,每笔交易事件都会触发不同的业务,不同细分业务会关注不同类型的交易事件,比如垂直市场只关注某个类目的交易事件、天猫超市只关注某个卖家的交易事件、购物车只关注下单成功的交易事件等。 RocketMQ的SQL订阅提供灵活的消息过滤能力,能够满足下游消费者按照不同的业务维度进行消息过滤的诉求。 在大型互联网业务中,还会有各种定时事件触发场景,最典型的是交易超时关闭机制,阿里交易或者12306订票都有类似的机制。RocketMQ的定时消息能够很方便的满足这类诉求。 ② 一致性。 无论是阿里交易还是蚂蚁支付,都天然对数据一致性有着极高要求,RocketMQ在一致性方面也打造了多个关键特性。最具代表性的是分布式事务消息,RocketMQ是第一个实现该种特性的消息队列,能够保障交易的上下游对于订单状态达到最终一致。该方案也成为异步消息一致性方案的事实标准,被多个互联网公司所采纳,甚至也有公司将移植到定制版的Kafka种。除了分布式一致性之外,RocketMQ还提供了顺序消息的特性,满足顺序一致性的需求。 ③ 稳定性。 稳定性是交易与金融场景的基石特性,也是RocketMQ的根本。RocketMQ除了具备核心服务的HA之外,还具备了全局高可用能力,在阿里内部支持同城多活、异地多活、中心容灾等高阶HA能力。同时,稳定性也不局限于数据与服务的高可用,RocketMQ从产品层面对稳定性进行了全方位的建设,如消息轨迹、消息回溯、消息死信机制。 ④ 高性能。 在双十一的极限流量下,RocketMQ写消息延迟4个9在1ms内,100%在100ms内。RocketMQ采用sharednothing分布式架构,在吞吐量方面也具备无限扩展的能力,已经连续10年支持了双十一万亿级消息洪峰,为百万级的应用实例提供低延迟消息服务。互联网的故事还在进行,云计算规模化落地的时代悄然而来。 三、云计算时代的RocketMQ5.0 2015年,RocketMQ的首个云消息服务在阿里云上线,开启了大规模的云计算实践的序幕。同时RocketMQ也是业界第一个提供公有云服务的开源消息队列。 在大规模的云计算业务场景下,RocketMQ面临着全新的挑战与机遇。 多样性:它不再仅服务于某一家公司的内部业务,不再局限于互联网或金融企业,需要实现全行业、全场景的覆盖。 标准化:对于服务企业内部的自研消息队列而言,无需考虑协议或API的标准化。但是对于云消息服务而言,因为服务对象是外部企业客户,据信通院统计,80%以上的企业客户已经采纳开源技术和标准技术。因此,作为一款云消息服务,需要提供对业界的事实标准协议、接口、SDK的兼容,才能保证客户平滑上云,同时打消客户技术绑定的担忧。 云原生:云原生理念深入人心,消息队列要更好地帮助客户实现云原生应用架构,为业务降本提效。 新趋势:各种新技术的兴起,包括IoT、5G、边缘计算、事件驱动,还有事件流技术。面向技术的新趋势与多样化的业务需求,RocketMQ进行了自我进化,演进到5.0版本。 为了充分释放云的技术红利,RocketMQ5.0在技术架构上进行了云原生的演进。从客户端到服务端都进行了全方位的改造,更高弹性、可用性、更低成本。 客户端采用轻量SDK设计理念,将原来富客户端的逻辑下沉到Broker,满足现代化应用轻量化、Serverless的趋势。 Broker彻底进行弹性架构改造,分离RocketMQ Proxy与Store层,其中Proxy是完全无状态的计算节点,专注多协议、多领域场景覆盖,可以面向不同工作负载独立弹性,如物联网、微服务、大数据不同场景有不同的资源诉求。Store层则专注消息的高可用存储,包括副本复制、主备切换与云存储集成。同时对RocketMQ的Topic资源进行三层解耦,面向消息的Topic、面向流的Topic逻辑分片、面向底层存储的Topic物理分片,每一层都可以独立弹性。 在存储层引入了Leaderless的高可用架构,Store节点身份对等,Leaderless化,0外部依赖。多副本策略可定制,可用性+可靠性+成本灵活组合,面向多可用区、多region组建Geo高可用能力。 为了满足云时代多样化的用户需求,RocketMQ5.0从原来的互联网业务消息中间件扩展到"消息、事件、流"超融合处理平台,解锁更全面的能力。 在消息领域,全面拥抱云原生技术,更好的弹性架构与高可用能力。 在事件领域,支持CloudEvent规范,以事件为中心的产品新界面,助力客户建设跨业务、跨组织的数字化商业生态。 在流领域,流存储增强批量特性,大幅度提高数据吞吐量;新增逻辑队列能力,解耦逻辑资源与物理资源,在流场景也具备无缝伸缩能力;新增流数据库RSQLDB,提供实时事件流处理、流分析能力。 RocketMQ基于端云一体化架构实现了完整的物联网消息队列的能力,从原来的连接应用扩展到连接物联网设备。同时RocketMQ5.0也继续保持极简架构的原则,能够以最低的资源消耗、运维成本搭建服务,适合边缘计算。 除了的产品核心能力之外,RocketMQ5.0积极建设开源生态。 一方面是应用架构生态的建设,既有经典的开源项目、规范的集成,比如JMS、AMQP等,也有云原生技术生态的集成,比如CloudEvents、Dapr、Envoy。同时RocketMQ也会进一步发力数据架构生态,全链路集成大数据的摄入、数据存储、数据处理、数据分析组件,从离线大数据到实时大数据。 【活动】一键体验 RocketMQ 六大生产环境 免费试用+30秒一键体验,低门槛、快速、高效、易操作,带你了解“历经万亿级数据洪峰考验”的云消息队列RocketMQ! 点击阅读原文,立即参与活动! 活动推荐 阿里云基于 Apache RocketMQ 构建的企业级产品消息队列RocketMQ 5.0版现开启活动: 1、新用户首次购买包年包月,即可享受全系列 85折优惠! 了解活动详情:
作者:隆基
#技术探索 #云原生

2023年7月13日

RocketMQ 5.0 无状态实时性消费详解
背景 RocketMQ 5.0版本引入了Proxy模块、无状态pop消费机制和gRPC协议等创新功能,同时还推出了一种全新的客户端类型:SimpleConsumer。SimpleConsumer客户端采用了无状态的pop机制,彻底解决了在客户端发布消息、上下线时可能出现的负载均衡问题。然而,这种新机制也带来了一个新的挑战:当客户端数量较少且消息数量较少时,可能会出现消息消费延时的情况。。 在当前的消息产品中,消费普通使用了长轮询机制,即客户端向服务端发送一个超时时间相对较长的请求,该请求会一直挂起,除非队列中存在消息或该请求到达设定的长轮询时间。 然而,在引入Proxy之后,目前的长轮询机制出现了一个问题。客户端层面的长轮询和Proxy与Broker内部的长轮询之间互相耦合,也就是说,一次客户端对Proxy的长轮询只对应一次Proxy对Broker的长轮询。因此,在以下情况下会出现问题:当客户端数量较少且后端存在多个可用的Broker时,如果请求到达了没有消息的Broker,就会触发长轮询挂起逻辑。此时,即使另一台Broker存在消息,由于请求挂在了另一个Broker上,也无法拉取到消息。这导致客户端无法实时接收到消息,即false empty response。 这种情况可能导致以下现象:用户发送一条消息后,再次发起消费请求,但该请求却无法实时拉取到消息。这种情况对于消息传递的实时性和可靠性产生了不利影响。 AWS的文档里也有描述此等现象,他们的解决方案是通过查询是所有的后端服务,减少false empty response。 其他产品 在设计方案时,首先是需要目前存在的消息商业化产品是如何处理该问题的。 MNS采取了以下策略,主要是将长轮询时间切割为多个短轮询时间片,以尽可能覆盖所有的Broker。 首先,在长轮询时间内,会对后端的Broker进行多次请求。其次,当未超过短轮询配额时,优先使用短轮询消费请求来与Broker进行通信,否则将使用长轮询,其时间等于客户端的长轮询时间。此外,考虑到过多的短轮询可能会导致CPU和网络资源消耗过多的问题,因此在短轮询超过一定数量且剩余时间充足时,最后一次请求将转为长轮询。 然而,上述策略虽以尽可能轮询完所有的Broker为目标,但并不能解决所有问题。当轮询时间较短或Broker数量较多时,无法轮询完所有的Broker。即使时间足够充足的情况下,也有可能出现时间错位的情况,即在短轮询请求结束后,才有消息在该Broker上就绪,导致无法及时取回该消息。 解法 技术方案 首先,需要明确该问题的范围和条件。该问题只会在客户端数量较少且请求较少的情况下出现。当客户端数量较多且具备充足的请求能力时,该问题不会出现。因此,理想情况是设计一个自适应的方案,能够在客户端数量较多时不引入额外成本来解决该问题。 为了解决该问题,关键在于将前端的客户端长轮询和后端的Broker长轮询解耦,并赋予Proxy感知后端消息个数的能力,使其能够优先选择有消息的Broker,避免false empty response。 考虑到Pop消费本身的无状态属性,期望设计方案的逻辑与Pop一致,而不在代理中引入额外的状态来处理该问题。 另外,简洁性是非常重要的,因此期望该方案能够保持简单可靠,不引入过多的复杂性。 1. 为了解决该问题,本质上是要将前端的客户端长轮询和后端的Broker长轮询解耦开来,并赋予Proxy感知后端消息个数的能力,能够优先选择有消息的Broker,避免false empty response。 2. 由于Pop消费本身的无状态属性,因此期望该方案的设计逻辑和Pop一致,而不在Proxy引入额外的状态来处理这个事情。 3. Simplicity is ALL,因此期望这个方案简单可靠。 我们使用了NOTIFICATION,可以获取到后端是否有尚未消费的消息。拥有了上述后端消息情况的信息,就能够更加智能地指导Proxy侧的消息拉取。 通过重构NOTIFICATION,我们对其进行了一些改进,以更好地适应这个方案的要求。 pop with notify 一个客户端的请求可以被抽象为一个长轮询任务,该轮询任务由通知任务和请求任务组成。 通知任务的目的是获取Broker是否存在可消费的消息,对应的是Notification请求;而请求任务的目的是消费Broker上的消息,对应的是Pop请求。 首先,长轮询任务会执行一次Pop请求,以确保在消息积压的情况下能够高效处理。如果成功获取到消息,则会正常返回结果并结束任务。如果没有获取到消息,并且还有剩余的轮询时间,则会向每个Broker提交一个异步通知任务。 在任务通知返回时,如果不存在任何消息,长轮询任务将被标记为已完成状态。然而,如果相关的Broker存在消息,该结果将被添加到队列中,并且消费任务将被启动。该队列的目的在于缓存多个返回结果,以备将来的重试之需。对于单机代理而言,只要存在一个通知结果返回消息,Proxy即可进行消息拉取操作。然而,在实际的分布式环境中,可能会存在多个代理,因此即使通知结果返回消息存在,也不能保证客户端能够成功拉取消息。因此,该队列的设计旨在避免发生这种情况。 消费任务会从上述队列中获取结果,若无结果,则直接返回。这是因为只有在通知任务返回该Broker存在消息时,消费任务才会被触发。因此,若消费任务无法获取结果,可推断其他并发的消费任务已经处理了该消息。 消费任务从队列获取到结果后,会进行加锁,以确保一个长轮询任务只有一个正在进行的消费任务,以避免额外的未被处理的消息。 如果获取到消息或长轮询时间结束,该任务会被标记完成并返回结果。但如果没有获取到消息(可能是其他客户端的并发操作),则会继续发起该路由所对应的异步通知任务,并尝试进行消费。 自适应切换 考虑到当请求较多时,无需采用pop with notify机制,可使用原先的pop长轮询broker方案,但是需要考虑的是,如何在两者之间进行自适应切换。目前是基于当前Proxy统计的pop请求数做判断,当请求数少于某一值时,则认为当前请求较少,使用pop with notify;反之则使用pop长轮询。 由于上述方案基于的均为单机视角,因此当消费请求在proxy侧不均衡时,可能会导致判断条件结果有所偏差。 Metric 为了之后进一步调优长轮询和观察长轮询的效果,我们设计了一组metric指标,来记录并观测实时长轮询的表现和损耗。 1. 客户端发起的长轮询次数 (is_long_polling) 2. pop with notify次数 (通过现有rpc metric统计) 3. 首次pop请求命中消息次数 (未触发notify) (is_short_polling_hit) 总结 通过如上方案,我们成功设计了一套基于无状态消费方式的实时消费方案,在做到客户端无状态消费的同时,还能够避免false empty response,保证消费的实时性,同时,相较于原先PushConsumer的长轮询方案,能够大量减少用户侧无效请求数量,降低网络开销, 产品侧 需明确长轮询和短轮询的区分,可以参考AWS的定义,当轮询时间大于0时,长轮询生效。 且需明确一个长轮询最小时间,因为长轮询时间过小时无意义,AWS的最小值采取了1s,我们是否需要follow,还是采取一个更大的值。 活动推荐 阿里云基于 Apache RocketMQ 构建的企业级产品消息队列RocketMQ 5.0版现开启活动: 1、新用户首次购买包年包月,即可享受全系列 85折优惠! 了解活动详情:
#技术探索 #功能特性 #云原生

2023年4月11日

RocketMQ 多级存储设计与实现
设计总览 RocketMQ 多级存储旨在不影响热数据读写的前提下将数据卸载到其他存储介质中,适用于两种场景: 1. 冷热数据分离:RocketMQ 新近产生的消息会缓存在 page cache 中,我们称之为热数据;当缓存超过了内存的容量就会有热数据被换出成为冷数据。如果有少许消费者尝试消费冷数据就会从硬盘中重新加载冷数据到 page cache,这会导致读写 IO 竞争并挤压 page cache 的空间。而将冷数据的读取链路切换为多级存储就可以避免这个问题; 2. 延长消息保留时间:将消息卸载到更大更便宜的存储介质中,可以用较低的成本实现更长的消息保存时间。同时多级存储支持为 topic 指定不同的消息保留时间,可以根据业务需要灵活配置消息 TTL。 RocketMQ 多级存储对比 Kafka 和 Pulsar 的实现最大的不同是我们使用准实时的方式上传消息,而不是等一个 CommitLog 写满后再上传,主要基于以下几点考虑: 1. 均摊成本:RocketMQ 多级存储需要将全局 CommitLog 转换为 topic 维度并重新构建消息索引,一次性处理整个 CommitLog 文件会带来性能毛刺; 2. 对小规格实例更友好:小规格实例往往配置较小的内存,这意味着热数据会更快换出成为冷数据,等待 CommitLog 写满再上传本身就有冷读风险。采取准实时上传的方式既能规避消息上传时的冷读风险,又能尽快使得冷数据可以从多级存储读取。 Quick Start 多级存储在设计上希望降低用户心智负担:用户无需变更客户端就能实现无感切换冷热数据读写链路,通过简单的修改服务端配置即可具备多级存储的能力,只需以下两步: 1. 修改 Broker 配置,指定使用 org.apache.rocketmq.tieredstore.TieredMessageStore 作为 messageStorePlugIn 2. 配置你想使用的储存介质,以卸载消息到其他硬盘为例:配置 tieredBackendServiceProvider 为 org.apache.rocketmq.tieredstore.provider.posix.PosixFileSegment,同时指定新储存的文件路径:tieredStoreFilepath 可选项:支持修改 tieredMetadataServiceProvider 切换元数据存储的实现,默认是基于 json 的文件存储 更多使用说明和配置项可以在 GitHub 上查看多级存储的 技术架构 architecture 接入层:TieredMessageStore/TieredDispatcher/TieredMessageFetcher 接入层实现 MessageStore 中的部分读写接口,并为他们增加了异步语意。TieredDispatcher 和 TieredMessageFetcher 分别实现了多级存储的上传/下载逻辑,相比于底层接口这里做了较多的性能优化:包括使用独立的线程池,避免慢 IO 阻塞访问热数据;使用预读缓存优化性能等。 容器层:TieredCommitLog/TieredConsumeQueue/TieredIndexFile/TieredFileQueue 容器层实现了和 DefaultMessageStore 类似的逻辑文件抽象,同样将文件划分为 CommitLog、ConsumeQueue、IndexFile,并且每种逻辑文件类型都通过 FileQueue 持有底层物理文件的引用。有所不同的是多级存储的 CommitLog 改为 queue 维度。 驱动层:TieredFileSegment 驱动层负责维护逻辑文件到物理文件的映射,通过实现 TieredStoreProvider 对接底层文件系统读写接口(Posix、S3、OSS、MinIO 等)。目前提供了 PosixFileSegment 的实现,可以将数据转移到其他硬盘或通过 fuse 挂载的对象存储上。 消息上传 RocketMQ 多级存储的消息上传是由 dispatch 机制触发的:初始化多级存储时会将 TieredDispatcher 注册为 CommitLog 的 dispacher。这样每当有消息发送到 Broker 会调用 TieredDispatcher 进行消息分发,TieredDispatcher 将该消息写入到 upload buffer 后立即返回成功。整个 dispatch 流程中不会有任何阻塞逻辑,确保不会影响本地 ConsumeQueue 的构建。 TieredDispatcher TieredDispatcher 写入 upload buffer 的内容仅为消息的引用,不会将消息的 body 读入内存。因为多级储存以 queue 维度构建 CommitLog,此时需要重新生成 commitLog offset 字段 upload buffer 触发 upload buffer 上传时读取到每条消息的 commitLog offset 字段时采用拼接的方式将新的 offset 嵌入到原消息中 上传进度控制 每个队列都会有两个关键位点控制上传进度: 1. dispatch offset:已经写入缓存但是未上传的消息位点 2. commit offset:已上传的消息位点 upload progress 类比消费者,dispatch offset 相当于拉取消息的位点,commit offset 相当于确认消费的位点。commit offset 到 dispatch offset 之间的部分相当于已拉取未消费的消息 消息读取 TieredMessageStore 实现了 MessageStore 中的消息读取相关接口,通过请求中的逻辑位点(queue offset)判断是否从多级存储中读取消息,根据配置(tieredStorageLevel)有四种策略: DISABLE:禁止从多级存储中读取消息; NOT_IN_DISK:不在 DefaultMessageStore 中的消息从多级存储中读取; NOT_IN_MEM:不在 page cache 中的消息即冷数据从多级存储读取; FORCE:强制所有消息从多级存储中读取,目前仅供测试使用。 ${a} 需要从多级存储中读取的消息会交由 TieredMessageFetcher 处理:首先校验参数是否合法,然后按照逻辑位点(queue offset)发起拉取请求。TieredConsumeQueue/TieredCommitLog 将逻辑位点换算为对应文件的物理位点从 TieredFileSegment 读取消息。 ${b} TieredFileSegment 维护每个储存在文件系统中的物理文件位点,并通过为不同存储介质实现的接口从中读取所需的数据。 ${c} 预读缓存 TieredMessageFetcher 读取消息时会预读一部分消息供下次使用,这些消息暂存在预读缓存中 ${d} 预读缓存的设计参考了 TCP Tahoe 拥塞控制算法,每次预读的消息量类似拥塞窗口采用加法增、乘法减的机制控制: 加法增:从最小窗口开始,每次增加等同于客户端 batchSize 的消息量。 乘法减:当缓存的消息超过了缓存过期时间仍未被全部拉取,在清理缓存的同时会将下次预读消息量减半。 预读缓存支持在读取消息量较大时分片并发请求,以取得更大带宽和更小的延迟。 某个 topic 消息的预读缓存由消费这个 topic 的所有 group 共享,缓存失效策略为: 1. 所有订阅这个 topic 的 group 都访问了缓存 2. 到达缓存过期时间 故障恢复 上文中我们介绍上传进度由 commit offset 和 dispatch offset 控制。多级存储会为每个 topic、queue、fileSegment 创建元数据并持久化这两种位点。当 Broker 重启后会从元数据中恢复,继续从 commit offset 开始上传消息,之前缓存的消息会重新上传并不会丢失。 开发计划 面向云原生的存储系统要最大化利用云上存储的价值,而对象存储正是云计算红利的体现。 RocketMQ 多级存储希望一方面利用对象存储低成本的优势延长消息存储时间、拓展数据的价值;另一方面利用其共享存储的特性在多副本架构中兼得成本和数据可靠性,以及未来向 Serverless 架构演进。 tag 过滤 多级存储拉取消息时没有计算消息的 tag 是否匹配,tag 过滤交给客户端处理。这样会带来额外的网络开销,计划后续在服务端增加 tag 过滤能力。 广播消费以及多个消费进度不同的消费者 预读缓存失效需要所有订阅这个 topic 的 group 都访问了缓存,这在多个 group 消费进度不一致的情况下很难触发,导致无用的消息在缓存中堆积。 需要计算出每个 group 的消费 qps 来估算某个 group 能否在缓存失效前用上缓存的消息。如果缓存的消息预期在失效前都不会被再次访问,那么它应该被立即过期。相应的对于广播消费,消息的过期策略应被优化为所有 Client 都读取这条消息后才失效。 和高可用架构的融合 目前主要面临以下三个问题: 1. 元数据同步:如何可靠的在多个节点间同步元数据,slave 晋升时如何校准和补全缺失的元数据; 2. 禁止上传超过 confirm offset 的消息:为了避免消息回退,上传的最大 offset 不能超过 confirm offset; 3. slave 晋升时快速启动多级存储:只有 master 节点具有写权限,在 slave 节点晋升后需要快速拉起多级存储断点续传。 活动推荐 阿里云基于 Apache RocketMQ 构建的企业级产品消息队列RocketMQ 5.0版现开启活动: 1、新用户首次购买包年包月,即可享受全系列 85折优惠! 了解活动详情:
作者:张森泽
#技术探索 #云原生

2023年1月13日

RocketMQ 集成生态再升级:轻松构建云上数据管道
阿里云消息队列 RocketMQ 版是阿里云基于 Apache RocketMQ 构建的低延迟、高并发、高可用、高可靠的分布式“消息、事件、流”统一处理平台,面向互联网分布式应用场景提供微服务异步解耦、流式数据处理、事件驱动处理等核心能力。其自诞生以来一直为阿里集团提供稳定可靠的消息服务,历经多年双十一万亿级流量洪峰的验证。 随着业务需求场景日渐丰富,在多年经验积累后,阿里云 RocketMQ 也迎来了革命性的更新,正式发布了阿里云消息队列 RocketMQ 版 5.0,在架构、网络、负载均衡、存储等诸多方面进行了显著优化。其定位不再局限于消息解耦场景,将全新布局事件驱动和消息流式处理场景。 阿里云 EventBridge 作为云上事件枢纽一直以来都保持着对云上事件、数据的友好生态支持。随着 RocketMQ 5.0版本的用户日渐增多,EventBridge 在近期对 RocketMQ Connector 进行了全面升级。升级之后的 RocketMQ Connector 不仅可以支持RocketMQ 5.0 版本,同时也能支持云上自建 RocketMQ 实例。除此之外,基于成熟的事件流能力,用户使用 EventBridge 也能轻松构建消息路由能力,实现对灾备、数据同步的需求。 本文将从业务架构和 API 使用等方面讲解如何使用 EventBridge 创建阿里云 RocketMQ 4.0、5.0 版本,开源自建版本以及消息路由的相关任务。 EventBridgeRocketMQ 4.0 业务架构 RocketMQ 4.0 版本使用较为经典的 clientnameserverbroker 架构,整个应用主要由生产者、消费者、NameServer 和 Broker 组成。 Name Server:是一个几乎无状态节点,可集群部署,在消息队列 RocketMQ 版中提供命名服务,更新和发现 Broker 服务。 Broker:消息中转角色,负责存储消息,转发消息。分为 Master Broker 和 Slave Broker,一个 Master Broker 可以对应多个 Slave Broker,但是一个 Slave Broker 只能对应一个 Master Broker。Broker 启动后需要完成一次将自己注册至 Name Server 的操作;随后每隔 30s 定期向 Name Server 上报 Topic 路由信息。 生产者:与 Name Server 集群中的其中一个节点(随机)建立长连接(Keepalive),定期从 Name Server 读取 Topic 路由信息,并向提供 Topic 服务的 Master Broker 建立长连接,且定时向 Master Broker 发送心跳。 消费者:与 Name Server 集群中的其中一个节点(随机)建立长连接,定期从  Name Server 拉取 Topic 路由信息,并向提供 Topic 服务的 Master Broker、Slave Broker 建立长连接,且定时向 Master Broker、Slave Broker 发送心跳。Consumer 既可以从 Master Broker 订阅消息,也可以从 Slave Broker 订阅消息,订阅规则由 Broker 配置决定。 EventBridge在获取用户授权之后,利用生成的 sts 临时授权对客户的  RocketMQ 实例进行消息读取或写入。 API 使用 在 API 介绍方面,我们以创建「自定义总线自定义事件源」为例,事件目标以及事件流中的API基本一致。 基于 EventBridge 创建 RocketMQ 4.0 任务的 API 和之前基本保持了一致。具体参数如下 版本:代表阿里云消息队列 RocketMQ 版本,可选择 4.x 或 5.x; RocketMQ 实例:RocketMQ 对应的实例 ID。用户在阿里云 RocketMQ控制台每创建一个实例都会有一个对应的实例 ID,如MQ_INST_123456789_BX6zY7ah; Topic:RocketMQ Topic。选择此 topic 作为事件源的读取对象或者事件目标的写入对象; Tag:RocketMQ 消费 Tag,用于消费者过滤消息使用; Group ID:RocketMQ 消费组,标识一组特定的消费者,仅事件源有此参数; 消费位点:初始消费位点。可选择最新位点、最早位点、或者指定时间戳。 EventBridgeRocketMQ 5.0 业务架构 RocketMQ 5.0 版将通用的存储逻辑下沉,集中解决消息存储的多副本、低延迟、海量队列分区等技术问题,将上层的消息处理剥离出完全的无状态计算层,主要完成协议适配、权限管理、消费状态、可观测运维体系支持,Broker 则继续专注于存储能力的持续优化。存算分离的架构设计,使得从 SDK 接入到线上运维全链路带来全面提升: 1. 轻量版 SDK 的开放和全链路可观测系统的提升:同时支持 4.x 通信协议和全新的 gRPC 通信协议,并内置 OpenTelemetry 埋点支持,新版本 SDK 新增了 10 余个指标埋点。 2. 消息级负载均衡:新版本 SDK 不再参与实际存储队列的负载均衡,消息负载均衡将更加轻量,以单条消息为调度最小单元。 3. 多网络访问支持:新版本支持单一实例同时暴露公网、内网等访问形式,方便客户多网络接入访问。 4. 海量分级存储:新版本开放分级存储历史消息保存能力,消息低成本无大小限制,最长保存 30 天。冷热数据进行分离设计,极大降低消费历史消息对实例的性能影响。 RocketMQ 5.0 版本 可以支持 VPC 内部安全识别,用户上云无需修改代码。在用户授予 EventBridge 网络和 RocketMQ 相关权限之后,用户在 EventBridge 创建 MQ 5.0 Source&Sink 任务的时,EventBridge 会根据 RocketMQ 5.0 实例的 VPC 信息,调用网络组件获取相应代理信息。MQ sdk 侧通过配置代理实现消息的收发。 API 使用 相比于 4.0 实例,5.0 实例多了 VPC、交换机和安全组 3 个参数。 5.0 实例新增了 VPC 属性,用户需要在对应 vpc 内去访问 MQ 5.0 实例。EventBridge 在获得用户授权之后,也是经由 5.0 实例对应的 VPC 内进行消息的收发。创建任务时前端会自动填充好实例的 vpc 和交换机信息。 安全组参数限制了 EventBridge 在 vpc 内的访问策略,用户可以选择使用已有安全组也可以选择快速创建,让 EventBridge 快速创建一个安全组供任务使用。安全组策略推荐使用默认的安全组策略。使用上推荐第一次在此vpc内创建任务时,使用 EventBridge 自动创建一个安全组,后续在此 VPC 内再创建其他任务时,在使用已有中选择 EventBridge 创建的安全组。 EventBridge自建 Apache RocketMQ 针对用户在阿里云自建 Apache RocketMQ 集群的场景,EventBridge 也支持了消息导出能力。用户通过配置接入点、topic、groupID、VPC 等信息,即可将自建集群中的消息导入 EventBridge,进而对接 EventBridge 目前支持的大量下游生态。 业务架构 抽象来看,EventBridge 访问自建 MQ 实例的链路和阿里云 5.0 版本基本一致,都是从用户 vpc 发起对 MQ 实例的访问。区别在于接入点的不同,前者是用户自建 MQ 集群的nameserver,而后者为阿里云 RocketMQ 提供的接入点,不需要感知真实的 MQ 集群是部署在用户 vpc 还是阿里云 RocketMQ 自身的生产环境。 API 使用 在 API 使用方面,自建集群的大部分参数需要用户手动填入。 接入点:nameserver 地址。后续会支持 proxy 地址; Topic:RocketMQ Topic。选择此 topic 作为事件源的读取对象或者事件目标的写入对象; Tag:RocketMQ 消费 Tag,用于消费者过滤消息使用; Group ID:RocketMQ 消费组,标识一组特定的消费者,仅事件源有此参数; FilterType:过滤模式,目前支持 Tag 过滤; 认证模式:如果开启 ACL 鉴权,可在此配置鉴权信息; 消费位点:初始消费位点; VPC:自建 MQ 集群对应的 VPC 参数信息; 交换机:自建 MQ 集群对应的交换机信息; 安全组:EventBridge使用此安全组访问用户自建 MQ 集群,安全组规定了 EventBridge 在此 vpc 内的访问策略。 RocketMQ 消息路由 当用户有灾备或者消息同步的需求时,可能就会需要消息路由能力,即将 A region 下某实例 topic 的消息同步到 B region 的某 topic 中。 对于 EventBridge 而言,消息路由并非单独的一个产品能力,用户通过使用事件流即可实现消息路由。 针对非跨境场景的消息路由,如从北京同步消息到上海,跨 region 网络打通能力由 EventBridge 来实现,用户无需关注过多实现细节。 针对跨境场景,如北京同步消息到新加坡,EventBridge 使用的是公网链路完成对目标实例的写入,使用的是目标 MQ 实例的公网接入点。消息出公网的能力需要用户提供,即需要用户提供 VPC、交换机和安全组配置,此VPC须带有NAT等访问公网能力, EventBridge 使用此 VPC 实现写入目标端公网接入点。 在 API 使用方面,创建消息路由任务本质上是创建事件流,API 参数和上面各类型 RocketMQ 实例任务一致,这里以创建一个青岛到呼和浩特的 RocketMQ 消息路由为例。 1.进入 EventBridge 控制台,regionBar 选择到呼和浩特,点击左侧“事件流”,然后选择“创建事件流”。 2.在事件源页面,事件提供方选择“消息队列 RocketMQ 版”,地域选择青岛,剩余 RocketMQ 相关参数按需求选择。 3.规则页面按需填写,这里选择默认内容。 4.在“目标”页面,服务类型选择“消息队列 RocketMQ 版”,剩余参数按需填写。 5.点击“创建”,等待事件流任务启动即可。 总结 本文介绍了 EventBridge 对接各类型 RocketMQ 实例的基本原理与对应的 API 使用说明,便于已经使用了 RocketMQ 5.0 版本和自建 MQ 实例的用户可以借助 EventBridge 的能力实现事件驱动业务架构的搭建。同时针对灾备和业务消息同步的场景,本文也基于事件流讲解了如何基于 EventBridge 创建 RocketMQ 消息路由任务。 活动推荐 阿里云基于 Apache RocketMQ 构建的企业级产品消息队列RocketMQ 5.0版现开启活动: 1、新用户首次购买包年包月,即可享受全系列 85折优惠! 了解活动详情:
作者:昶风
#技术探索 #生态集成

2023年1月6日

RocketMQ 监控告警:生产环境如何快速通过监控预警发现堆积、收发失败等问题?
本文主要向大家介绍如何利用 RocketMQ 可观测体系中的指标监控,对生产环境中典型场景:消息堆积、消息收发失败等场景配置合理的监控预警,快速发现问题,定位问题。 RocketMQ 可观测体系 作为一款典型的分布式中间件产品,RocketMQ 被广泛应用于业务核心链路中,每条消息都关联着核心业务数据的变化。业务链路有其明显的复杂性: 生产者、消费者多对多:业务调用链路网状结构,上下游梳理困难 上下游解耦、异步链路:异步化调用,信息收集不完整 消息是状态数据:未消费成功、定时中等状态增加排查的复杂度 消息链路耦合复杂的业务处理逻辑:无法快速定位问题边界 鉴于消息链路耦合业务系统,复杂带状态,RocketMQ 通过强大的可观测系统和经验支撑,及时发现问题、定位问题、解决问题有助于提升运维效率,对于业务运行是一项重要的保障能力。 RocketMQ 的可观测体系主要由指标(Metrics)、轨迹(Tracing)和日志(Logging)组成。 指标 RocketMQ中定义了详细的Metrics指标,这些指标覆盖生产者、消费者、服务端及消息收发关键接口和流程的统计数据,并支持从实例、Topic和Group等多个维度进行聚合展示,帮助您实时监控消息业务或RocketMQ服务的运行状态。和4.x版本相比,RocketMQ服务端5.x版本增加了消息堆积场景相关指标、关键接口的耗时指标、错误分布指标、存储读写流量等指标,帮助您更好地监控异常场景。 消息轨迹 在分布式应用中,RocketMQ作为全链路中异步解耦的关键服务,提供的Tracing数据可有效将业务上下游信息串联起来,帮助您更好地排查异常,定位问题。和4.x版本相比,RocketMQ服务端5.x版本支持OpenTelemetry开源标准,提供更加丰富的轨迹指标,针对消费场景、高级消息类型场景等细化轨迹内容,为问题定位提供更多关键信息。 日志 RocketMQ为不同的异常情况定义唯一的错误码及错误信息,并划分不同的错误级别,您可以根据客户端返回的错误码信息快速获取异常原因。和4.x版本相比,RocketMQ服务端5.x版本统一了ErrorCode和ErrorMessage,异常日志中增加了RequestID、资源信息,细化了错误信息,保证日志内容明确靠。 RocketMQ 监控告警介绍 RocketMQ 联合阿里云云监控提供了开箱即用且免费的监控报警服务,可帮助您解决如下问题: 实例规格水位监控预警 若您实际使用的指标值超过实例的规格限制,RocketMQ会进行强制限流。提前配置实例规格水位告警可以提前发现规格超限风险并及时升配,避免因限流导致的业务故障。 业务逻辑错误监控预警 您在消息收发时可能会收到异常报错,配置调用错误告警可以提前在业务反馈前发现异常,帮助您提前判断异常来源并及时修复。 业务性能指标监控预警 如果您的消息链路有相关性能指标要求,例如RT耗时、消息延迟等,提前配置业务指标告警可以帮助您提前治理业务风险。 RocketMQ 版提供了丰富的 Metric 指标和告警监控项。各监控项可分为运行水位、收发性能、异常错误事件三类告警。根据大量生产环境实践经验,建议您根据以下原则配置如下告警 接下来重点通过消息堆积和消息收发失败这两个典型场景来阐述基于可观测体系中的指标(Metrics),RocketMQ 如何通过云监控创建监控规则,将关键的 Metrics 指标作为告警项,帮助您自动监控服务的运行状态,并自动发送报警通知, 便于您及时预警服务的异常信息,提高运维效率。 应用场景1:消息堆积问题 消息堆积指标及监控配置 业界通用指标:使用消息堆积量(ready + inflight)来度量消费健康度,表示未处理完成的消息量;部分产品额外增加已就绪消息量来度量消息拉取的及时性;使用上述 2 个指标直接来配置报警有以下缺点: 有误报或无法触发报警的问题 及时性的间接指标,不直观 RocketMQ 指标:额外支持延时时间来度量消费健康度,涵盖了所有业务场景,根据业务容忍延迟度直接配置时间告警阈值。 消息处理延迟时间:表示业务处理完成及时度 已就绪消息排队时间:表示拉取消息及时度 建议对消息堆积敏感的用户,都在 RocketMQ 实例页的监控报警,添加如下报警指标,并设置符合业务需求的阈值。 如何定位和处理堆积问题 假如收到堆积报警,确认消息出现堆积情况,可参考以下措施进行定位和处理。 1. 判断消息堆积在 RocketMQ 服务端还是客户端 查看客户端本地日志文件 ons.log,搜索是否出现如下信息:the cached message count exceeds the threshold 出现相关日志信息,说明客户端本地缓冲队列已满,消息堆积在客户端,请执行步骤2。 若未出现相关日志,说明消息堆积不在客户端,若出现这种特殊情况,请直接提交工单联系阿里云技术支持。 2. 确认消息的消费耗时是否合理 若查看到消费耗时较长,则需要查看客户端堆栈信息排查具体业务逻辑,请执行步骤3。 若查看到消费耗时正常,则有可能是因为消费并发度不够导致消息堆积,需要逐步调大消费线程或扩容节点来解决。 消息的消费耗时可以通过以下方式查看: 查看消费者状态,在客户端连接信息中查看业务处理时间,获取消费耗时的平均值。 3. 查看客户端堆栈信息。只需要关注线程名为 ConsumeMessageThread 的线程,这些都是业务消费消息的逻辑。 客户端堆栈信息可以通过以下方式获取:查看消费者状态,在客户端连接信息中查看 Java 客户端堆栈信息 使用 Jstack 工具打印堆栈信息。 常见的异常堆栈信息如下: 消费逻辑有抢锁休眠等待等情况。消费线程阻塞在内部的一个睡眠等待上,导致消费缓慢。 示例一: 消费逻辑操作数据库等外部存储卡住。消费线程阻塞在外部的 HTTP 调用上,导致消费缓慢。 示例二: 4. 针对某些特殊业务场景,如果消息堆积已经影响到业务运行,且堆积的消息本身可以跳过不消费,您可以通过重置消费位点跳过这些堆积的消息从最新位点开始消费,快速恢复业务。 如何避免消息堆积 为了避免在业务使用时出现非预期的消息堆积和延迟问题,需要在前期设计阶段对整个业务逻辑进行完善的排查和梳理。整理出正常业务运行场景下的性能基线,才能在故障场景下迅速定位到阻塞点。其中最重要的就是梳理消息的消费耗时和消息消费的并发度。 梳理消息的消费耗时通过压测获取消息的消费耗时,并对耗时较高的操作的代码逻辑进行分析。梳理消息的消费耗时需要关注以下信息: 消息消费逻辑的计算复杂度是否过高,代码是否存在无限循环和递归等缺陷。 消息消费逻辑中的 I/O 操作(如:外部调用、读写存储等)是否是必须的,能否用本地缓存等方案规避。外部 I/O 操作通常包括如下业务逻辑: 读写外部数据库,例如 MySQL 数据库读写。 读写外部缓存等系统,例如 Redis 读写。 下游系统调用,例如 Dubbo 调用或者下游 HTTP 接口调用。 消费逻辑中的复杂耗时的操作是否可以做异步化处理,如果可以是否会造成逻辑错乱(消费完成但异步操作未完成)。 设置消息的消费并发度 逐步调大线程的单个节点的线程数,并观测节点的系统指标,得到单个节点最优的消费线程数和消息吞吐量。 得到单个节点的最优线程数和消息吞吐量后,根据上下游链路的流量峰值计算出需要设置的节点数,节点数=流量峰值/单线程消息吞吐量。 应用场景2:消息收发失败问题 消息收发的核心流程 从上图中可以看出消息收发都要先从 NameServer 返回路由,再通过 broker 的鉴权以及实例规格是否超限的判断,才能进行正常收发消息。根据经验检消息收发失败的原因有如下情况: API 请求频率是否超过实例规格限制 查网络是否正常 服务端是否是有重启造成的短期收发失败 操作资源是否有权限 常见的消息收发失败异常 在无论开发阶段还是生产运行阶段,遇到收发失败问题,我们都可以从客户端日志出发进行排查。以下列举下常见的消息收发失败异常场景: 1. 在客户端日志中出现ClusterName consumer groupId consumer topic messages flow control, flow limit threshold is , remainMs 异常信息 原因:RocketMQ 每个实例都明确了消息收发 API 调用 TPS,例如,标准版实例支持每秒 5000 次 API 调用,若实例消息收发 API 调用频率超过规格限制,会导致实例被限流。实例被限流后,导致部分消息收发请求失败。 建议措施: 1. 配置实例 API 调用频率监控告警 建议设置为规格上限的 70%。例如,您购买的实例消息收发 TPS 上限为 10000,则告警阈值建议设置为 7000。 1. 配置限流次数告警 RocketMQ 支持将指定实例触发限流的事件作为监控项,通过对限流次数的监控,可以帮助您了解当前业务的受损情况。 2. 在客户端日志中出现RemotingConnectException: connect to failed 或者 RemotingTimeoutException 等异常信息。 可能有如下原因: MQ 服务升级过程中 , 会出现短暂的网络闪断,查看官网公告看是否在服务升级窗口 检查应用服务器到broker的网络是否通畅,是否有网络延迟 检查应用的网络带宽情况,是否被打满 确认下应用是否出现 FGC 现象,FGC 会造成一定的网络延迟 3. 在客户端日志当中出现 system busy, start flow control for a while 或者 broker busy, start flow control for a while等异常信息。 可能原因:共享集群 broker(出现网络,磁盘,IO 等抖动)压力大,造成消息收发出现排队现象;若是偶尔短暂抖动,此类错误 SDK 会自动重试,但建议在自己的业务代码做好异常处理,当自动重试次数超限仍失败情况下,业务根据需要做好容灾。若长时间持续出现,可以提工单让技术人员跟进排查。 活动推荐 阿里云基于 Apache RocketMQ 构建的企业级产品消息队列RocketMQ 5.0版现开启活动: 1、新用户首次购买包年包月,即可享受全系列 85折优惠! 了解活动详情:
作者:合伯
#技术探索 #可观测

2022年12月21日

消息收发弹性——生产集群如何解决大促场景消息收发的弹性&降本诉求
产品介绍—什么是消息收发弹性 大家好,我是来自阿里云云原生消息团队的赖福智,花名宸罡,今天来给大家分享下阿里云 RocketMQ5.0 实例的消息弹性收发功能,并且通过该功能生产集群是如果解决大促场景消息收发的弹性以及降本诉求的。 阿里云弹性策略 本次将会从产品介绍,产品使用及限制,使用方式及演示三个方面来介绍。在介绍 Rocketmq5.0 实例的消息首发弹性之前,先从整体上看下阿里云的弹性策略。我们通常认为业务方往往存在预期外的突发业务热点和毛刺流量,常规扩容无法及时应对,这样一来服务会有不确定性的风险。因此为了应对突发流量,我们设计了一套处理机制,最基本的是要满足规格内的预期流量,然后是应对弹性区间内的突发流量可以随时开启的弹性能力,最后是要有对完全超过弹性上限流量的限流限流能力。针对弹性区间的突发流量,传统自建集群通过常规扩容方式应对,需要分钟级的处理时间,在这段时间内业务会受损,并且为了这部分偶尔的突发流量扩容到一个较大的规格并不划算。云上5.0实例的消息收发弹性能力对弹性区间内的突发流量可以做到秒级响应,针对大促这种预期内的短期突发流量可以按量收费更加实惠,仅当用户真正用到这部分弹性能力才收费。 消息收发弹性简介 接下来我们就看具体看下 5.0 实例的消息收发弹性,消息收发弹性最直观的感受就是在 5.0 实例的详情页面的自适应弹性 TPS 这部分,可以看到在正常消息收发 TPS 的旁边额外有一个自适应弹性 TPS。通过这部分弹性 TPS 的设置,用户可以快速、低成本的应对大促这种短时间突发流量的场景。 这时可能有小伙伴会问为什么我不直接升级规格提高标准收发 TPS,而是使用弹性 TPS 呢?让我们假设一个典型的大促场景,比如在今晚 0 点有大促活动,使用消息弹性功能的用户完全可以提前几天就把弹性功能打开,大促结束等流量恢复后再把弹性功能关闭,实际上不关闭也不会有什么问题,不使用则不收费。 如果通过升级规格来提升标准 TPS 应对大促流量,用户同样是提前几天就把规格升高了,那么在大促前这几天按照高规格收费但实际又跑不到高规格的 TPS,实际上花了更多的钱但是确造成了资源的浪费。如果用户为了避免资源浪费在大促当天 0 点前升级规格,一个是需要用户付出额外的精力来关注 RocketMQ 按时升配,再就是实例的升配是一个重资源操作,扩容耗时长,无法做到即开即用秒级生效,很有可能已经到 0 点了但是升配还没有完成。 使用消息弹性功能的话可以做到秒级生效开箱即用,并且如果没有使用到这部分额外的弹性 TPS 是不会收费的。但是弹性 TPS 也不是个解决问题的万能银弹,也是有上限的,基本上可以在规格标准 TPS 的基础上额外有一半的弹性 TPS,如果标准 TPS+ 弹性 TPS 仍然无法满足用户业务需求,此时意味着仅扩容弹性节点已经无法满足需求,同时需要扩容存储节点,所以需要升配规格,这部分的原理后面会详细解释。 也有用户会问,如果我的日常 TPS 在 2500 左右,可不可以购买一个 2000 标准 TPS 的实例并且一直开着 1000 的弹性 TPS 满足需求呢?这种情况我们建议直接使用标准 TPS 大于 2500 的实例,因为弹性 TPS 这部分的使用会额外计费,如果一天 24 小时都在大量使用弹性 TPS,从计费上来说直接使用更高规格的实例更实惠。 5.0 实例消息收发弹性的实现方式,和传统自建方式的对比 接下来我们看下阿里云 RocketMQ5.0 实例是怎么实现消息收发弹性的,并且在扩容场景和自建 RocketMQ 有什么优势。传统自建 RocketMQ 集群如左图显示,是一个存储计算不分离的架构,这种架构下 Broker 是一个很重的组件,因为它同时需要处理客户端的请求,也要负责数据的读取写入,Broker 同时负责计算和存储。作为一个有状态的节点,Broker 扩容是一个很重的操作,时间会很慢,而且在很多时候我们并不需要扩容存储能力,仅仅需要应对高 TPS 请求的计算能力,此时随着 Broker 扩容的存储扩容实际上被浪费了。 再来看下 RocketMQ5.0 实例消息收发弹性是怎么做的,首先 5.0 实例的架构是存储计算分离的模式,用户的客户端仅会请求计算层的计算节点,计算节点操作存储节点读写消息,客户端并不会直接访问存储节点。开启消息收发弹性功能意味着开启了计算层的弹性能力。得益于这种存储计算分离的架构,可以让我们快速低成本的扩容计算层节点,计算层节点作为无状态应用可以做到秒级扩容,十分便捷。而且在云厂商拥有大量资源池的前提下可以做到资源的弹性扩容。可以说 RocketMQ5.0 实例的消息收发弹性能力依赖于阿里云作为云厂商的弹性能力和存算分离的技术方案得以实现。 在大促这种短时间大流量的场景下,大部分都是不需要扩容存储节点的,此时就可以通过开通消息收发弹性的能力满足需求。 产品使用及限制:消息收发弹性的使用及限制 支持版本 消息收发弹性的功能仅在专业版和铂金版支持,标准版实例不支持,并且专业版的单机规格作为给用户使用的测试版本也不支持。 弹性上限 不同规格实例的弹性 TPS 上限不同,基本上在标准 TPS 的基础上额外有一半的弹性 TPS,下图所示为专业版的弹性 TPS 上限。受篇幅所限,其他规格的弹性上限可以参考官方文档,就不再列出了。 计费方式 弹性 TPS 是额外计费的,计费周期按小时计费,不足 1 小时,按 1 小时计算。计费方式为超过限制的 TPS× 使用时长(小时)× 弹性规格单价(元/TPS/小时)弹性规格单价如下图,不同地域的单价会有略微差异。 SLA 可能有小伙伴会担心使用到这部分额外的弹性 TPS 会不会有问题,毕竟这部分是在标准 TPS 之上额外的能力,有一种自己实例超负荷运转的感觉。这个是完全不用担心的,不同规格的弹性上限已经经过压测验证,和规格标准 TPS 享受一样的稳定性 SLA 保证。 使用方式及演示:结合业务场景的最佳使用方式 开启方式、生效时间、收发比例 最后我们来实际操作下开启弹性收发能力并且验证该功能。RocketMQ5.0实例依然支持使用 RocketMQ4.0 实例的 1.x 客户端访问,所以这里分别提供了 1.x 客户端和 5.x 客户端的测试代码实例。 该程序开启了 200 个线程的线程池通过 ratelimiter 根据输入参数设置每秒最大的发送消息条数,打印失败的原因,并且每秒统计成功发送的消息量 在这里我已经提前购买好了一个专业版的实例,默认是不会开启消息收发弹性能力的。我们可以点击这里的开启弹性按钮进入实例修改页面开启弹性功能。这里要注意的是开启之后的弹性 TPS 依然受实例整体的消息收发占比设置,用户可以根据自己的消息收发场景设置该比例。 再开启之前我们来尝试下每秒发送 2300 个消息会怎么样,可以看到已经被限流了,并且每秒成功发送的量要比 2000 多一些。接着我们将弹性开启,并且将默认的收发比 1:1 改为 4:5,这是修改后的实例状态,现在让我们继续每秒发送 2300 个消息来验证下,可以看到已经都成功发送了。 活动推荐 阿里云基于 Apache RocketMQ 构建的企业级产品消息队列RocketMQ 5.0版现开启活动: 1、新用户首次购买包年包月,即可享受全系列 85折优惠! 了解活动详情:
作者:宸罡
#技术探索

2022年11月30日

RocketMQ 5.0 可观测能力升级:Metrics 指标分析
从消息的生命周期看可观测能力 在进入主题之前先来看一下 RocketMQ 生产者、消费者和服务端交互的流程: message produce and consume process RocketMQ 的消息是按照队列的方式分区有序储存的,这种队列模型使得生产者、消费者和读写队列都是多对多的映射关系,彼此之间可以无限水平扩展。对比传统的消息队列如 RabbitMQ 是很大的优势,尤其是在流式处理场景下能够保证同一队列的消息被相同的消费者处理,对于批量处理、聚合处理更友好。 接下来我们来看一下消息的整个生命周期中需要关注的重要节点: message life cycle 首先是消息发送:发送耗时是指一条消息从生产者开始发送到服务端接收到并储存在硬盘上的时间。如果是定时消息,需要到达指定的定时时间才能被消费者可见。 服务端收到消息后需要根据消息类型进行处理,对于定时/事务消息只有到了定时时间/事务提交才对消费者可见。RocketMQ 提供了消息堆积的特性,即消息发送到服务端后并不一定立即被拉取,可以按照客户端的消费能力进行投递。 从消费者的角度上看,有三个需要关注的阶段: 拉取消息:消息从开始拉取到抵达客户端的网络和服务端处理耗时; 消息排队:等待处理资源,即从消息抵达客户端到开始处理消息; 消息消费:从开始处理消息到最后提交位点/返回 ACK。 消息在生命周期的任何一个阶段,都可以清晰地被定义并且被观测到,这就是 RocketMQ 可观测的核心理念。而本文要介绍的 Metrics 就践行了这种理念,提供覆盖消息生命周期各个阶段的监控埋点。借助 Metrics 提供的原子能力我们可以搭建适合业务需要的监控系统: 日常巡检与监控预警; 宏观趋势/集群容量分析; 故障问题诊断。 RocketMQ 4.x Metrics 实现 – Exporter RocketMQ 团队贡献的 RocketMQ exporter 已被 Prometheus 官方的开源 Exporter 生态所收录,提供了 Broker、Producer、Consumer 各个阶段丰富的监控指标。 exporter metrics spec Exporter 原理解析 RocketMQ expoter 获取监控指标的流程如下图所示,Expoter 通过 MQAdminExt 向 RocketMQ 集群请求数据。获取的数据转换成 Prometheus 需要的格式,然后通过 /metics 接口暴露出来。 rocketmq exporter 随着 RocketMQ 的演进,exporter 模式逐渐暴露出一些缺陷: 无法支持 RocketMQ 5.x 中新加入的 Proxy 等模块的可观测需求; 指标定义不符合开源规范,难以和其他开源可观测组件搭配使用; 大量 RPC 调用给 Broker 带来额外的压力; 拓展性差,增加/修改指标需要先修改 Broker 的 admin 接口。 为解决以上问题,RocketMQ 社区决定拥抱社区标准,在 RocketMQ 5.x 中推出了基于 OpenTelemtry 的 Metrics 方案。 RocketMQ 5.x 原生 Metrics 实现 基于 OpenTelemtry 的 Metrics OpenTelemetry 是 CNCF 的一个可观测性项目,旨在提供可观测性领域的标准化方案,解决观测数据的数据模型、采集、处理、导出等的标准化问题,提供与三方 vendor 无关的服务。 在讨论新的 Metrics 方案时 RocketMQ 社区决定遵守 OpenTelemetry 规范,完全重新设计新 metrics 的指标定义:数据类型选用兼容 Promethues 的 Counter、Guage、Histogram,并且遵循 Promethues 推荐的指标命名规范,不兼容旧有的 rocketmqexporter 指标。新指标覆盖 broker、proxy、producer、consumer 等各个 module,对消息生命周期的全阶段提供监控能力。 指标上报方式 我们提供了三种指标上报的方式: Pull 模式:适合自运维 K8s 和 Promethues 集群的用户; Push 模式:适合希望对 metrics 数据做后处理或接入云厂商的可观测服务的用户; Exporter 兼容模式:适合已经在使用 Exporter 和有跨数据中心(或其他网络隔离环境)传输 metrics 数据需求的用户。 Pull Pull 模式旨在与 Prometheus 兼容。在 K8s 部署环境中无需部署额外的组件,prometheus 可以通过社区提供的 K8s 服务发现机制(创建 PodMonitor、ServiceMonitor CDR)自动获取要拉取的 broker/proxy 列表,并从他们提供的 endpoint 中拉取 metrics 数据。 pull mode Push OpenTelemetry 推荐使用 Push 模式,这意味着它需要部署一个 collector 来传输指标数据。 push mode OpenTelemetry 官方提供了 collector 的实现,支持对指标做自定义操作如过滤、富化,可以利用社区提供的插件实现自己的 collector。并且云厂商提供的可观测服务(如 AWS CloudWatch、阿里云 SLS)大多已经拥抱了 OpenTelemetry 社区,可以直接将数据推送到它们提供的 collector 中,无需额外的组件进行桥接。 OpenTelemetry collector 兼容 RocketMQ Exporter 新的 Metrics 也提供对 RocketMQ Exporter 的兼容,现在使用 exporter 的用户无需变更部署架构即可接入新 Metrics。而且控制面应用(如 Promethues)和数据面应用(如 RocketMQ)有可能隔离部署。因此借助 Exporter 作为代理来获取新的 Metrics 数据也不失为一种好的选择。 RocketMQ 社区在 Exporter 中嵌入了一个 OpenTelemetry collector 实现,Broker 将 Metrics 数据导出到 Exporter,Exporter 提供了一个新的 endpoint(下图中的 metricsv2)供 Prometheus 拉取。 exporter mode 构建监控体系最佳实践 丰富的指标覆盖与对社区标准的遵循使得可以轻而易举的借助 RocketMQ 的 Metrics 能力构建出适合业务需求的监控体系,这个章节主要以一个典型的流程介绍构建监控体系的最佳实践: 集群监控/巡检 触发告警 排查分析。 集群状态监控与巡检 我们将指标采集到 Promethues 后就可以基于这些指标配置监控,这里给出一些示例: 接口监控: 监控接口调用情况,可以据此快速抓出异常的请求对症下药 下图给出一些相关示例:所有 RPC 的耗时(avg、pt90、pt99 等)、成功率、失败原因、接口调用与返回值分布情况等。 rpc metrics 客户端监控: 监控客户端的使用情况,发现非预期的客户端使用如超大消息发送、客户端上下线、客户端版本治理等。 下图给出一些相关示例:客户端连接数、客户端语言/版本分布、发送的消息大小/类型分布。 client metrics Broker 监控: 监控 Broker 的水位和服务质量,及时发现集群容量瓶颈。 下图给出一些相关示例:Dispatch 延迟、消息保留时间、线程池排队、消息堆积情况。 broker metrics 以上的示例只是 Metrics 的冰山一角,需要根据业务需要灵活组合不同的指标配置监控与巡检。 告警配置 有了完善的监控就可以对需要关注的指标配置告警,比如可以配置 Broker 监控中 Dispatch 延迟这个指标的告警: broker alert 收到告警后可以联动监控查看具体原因,关联发送接口的失败率可以发现有 1.7% 的消费发送失败,对应的报错是没有创建订阅组: promblem analysis 问题排查分析 最后以消息堆积这个场景为例来看一下如何基于 Metrics 分析线上问题。 从消息生命周期看堆积问题 正如本文开篇所述,排查 RocketMQ 的问题需要结合消息的生命周期综合分析,如果片面的认定是服务端/客户端的故障未免会误入歧途。 对于堆积问题,我们主要关注消息生命周期中的两个阶段: 就绪消息:就绪消息是可供消费但还未被拉取的消息,即在服务端堆积的消息; 处理中消息:处理中的消息是被客户端拉取但是还未被消费的消息。 consume lag 多维度指标分析堆积问题 对于堆积问题,RocketMQ 提供了消费延迟相关指标 rocketmq_consumer_lag_latency 可以基于这个指标配置告警。告警的阈值需要根据当前业务对消费延迟的容忍程度灵活指定。 触发告警后就需要对消息堆积在还是就绪消息和处理中消息进行分析,RocketMQ 提供了 rocketmq_consumer_ready_messages 和 rocketmq_consumer_inflight_messages 这两个指标,结合其他消费相关指标与客户端配置综合分析即可判断出消息堆积的根因: case 1:就绪消息持续上涨,处理中消息达到客户端堆积上限 这是最常见的堆积场景,客户端处理中的消息量 rocketmq_consumer_inflight_messages 达到了客户端配置的阈值,即消费者的消费能力低于消息发送量。如果业务要求尽可能实时的消费消息就需要增加消费者机器数量,如果业务对消息延迟不是很敏感可以等待业务高峰过去后再消化堆积的消息。 case 2:就绪消息几乎为 0,处理中消息持续上涨 这个 case 多出现在使用 RocketMQ 4.x 客户端的场景,此时消费位点是顺序提交的,如果某条消息的消费卡住会导致位点无法提交。看起来的现象是消息在客户端大量堆积,即处理中消息持续上涨。可以结合消费轨迹和 rocketmq_process_time 这个指标抓出消费慢的消息分析上下游链路,找到根因优化消费逻辑。 case 3: 就绪消息持续上涨,处理中消息几乎为 0 此种场景说明客户端没有拉取到消息,一般有如下几种情况: 鉴权问题:检查 ACL 配置,如果使用公有云产品请检查 AK、SK 配置; 消费者 hang 住:尝试打印线程堆栈或 gc 信息判断是否是进程卡死; 服务端响应慢:结合 RPC 相关指标查看拉取消息接口调用量与耗时、硬盘读写延迟。检查是否为服务端问题,如硬盘 IOPS 被打满了等等。 活动推荐 阿里云基于 Apache RocketMQ 构建的企业级产品消息队列RocketMQ 5.0版现开启活动: 1、新用户首次购买包年包月,即可享受全系列 85折优惠! 了解活动详情:
作者:玄珏
#技术探索 #可观测