2024年10月21日

基于 Apache RocketMQ 的 ApsaraMQ Serverless 架构升级
_本文整理于 2024 年云栖大会阿里云智能集团高级技术专家金吉祥(牟羽)带来的主题演讲《ApsaraMQ Serverless 能力再升级,事件驱动架构赋能 AI 应用》_ 云消息队列 ApsaraMQ 全系列产品 Serverless 化,支持按量付费、自适应弹性、跨可用区容灾,帮助客户降低使用和维护成本,专注业务创新。那 ApsaraMQ 是如何一步一步完成 Serverless 能力升级的?在智能化时代,我们的事件驱动架构又是如何拥抱 AI、赋能 AI 的? 本次分享将从以下四个方面展开: + 首先,回顾 ApsaraMQ Serverless 化的完整历程,即 ApsaraMQ 从阿里内部诞生、开源,到捐赠给 Apache 进行孵化,再到完成 Serverless 化升级的不断突破、与时俱进的过程。 + 然后,重点介绍 ApsaraMQ 的存算分离架构,这是全面 Serverless 化进程中不可或缺的前提。 + 接下来,会从技术层面重点解读近一年 ApsaraMQ Serverless 能力的演进升级,包括弹性能力的升级、基于 RDMA 进一步降低存储和计算层之间的 CPU 开销,以及对无感扩缩容的优化。 + 最后,介绍我们在面向 AI 原生应用的事件驱动架构上的探索,以及基于阿里云事件总线定向更新向量数据库,为 AI 应用融入实时最新数据的最佳实践。 ApsaraMQ Serverless 化历程 首先,我们来看 ApsaraMQ Serverless 化的整个发展历程。 + 2012 年,RocketMQ 在阿里巴巴内部诞生,并将代码在 Github 上开源; + 2016 年,云消息队列 RocketMQ 版开启商业化的同时,阿里云将 RocketMQ 捐赠给了 Apache,进入孵化期; + 2017 年,RocketMQ 以较短的时间孵化成为 Apache TLP,并快速迭代新功能特性,顺利发布 4.0 版本,支持了顺序、事务、定时等特殊类型的消息; + 2018 年,随着大数据、互联网技术的发展,数据量爆发式增长,云消息队列 Kafka 版商业化发布; + 2019 年,云消息队列 RabbitMQ 版、云消息队列 MQTT 版两款产品商业化发布,补齐了 ApsaraMQ 在 AMQP、MQTT 协议适配上的缺失; + 2021 年,经过一段时间的孵化,ApsaraMQ 家族中的另一款产品事件总线 EventBridge 开始公测,开始融合事件、消息、流处理; + 2023 年,ApsaraMQ 全系列产品依托存算分离架构,完成 Serverless 化升级,打造事件、消息、流一体的融合型消息处理平台; + 今年,我们专注提升核心技术能力,包括秒级弹性、无感发布、计算存储层之间的 RDMA 等,实现 Serverless 能力的进一步升级,并结合当下客户需求,通过事件驱动架构赋能 AI 原生应用。 存算分离架构全景 第二部分,我们来看 ApsaraMQ 存算分离架构的全景,这是 Serverless 化升级不可或缺的前序准备。 ApsaraMQ 存算分离架构全景:云原生时代的选择 ApsaraMQ 的存算分离架构,是云原生时代的选择。 + 从下往上看,这套架构是完全构建在云原生基础之上的,整个架构 K8s 化,充分利用 IaaS 层的资源弹性能力,同时也基于 OpenTelemetry 的标准实现了metrics、tracing 和 logging 的标准化。 + 再往上一层是基于阿里云飞天盘古构建的存储层,存储节点身份对等,Leaderless 化,副本数量灵活选择,可以在降低存储成本和保证消息可靠性之间实现较好的平衡。 + 再往上一层是在本次架构升级中独立抽出来的计算层,即无状态消息代理层,负责访问控制、模型抽象、协议适配、消息治理等。比较关键的点是,它是无状态的,可独立于存储层进行弹性。 + 在用户接入层,我们基于云原生的通信标准 gRPC 开发了一个全新的轻量级 SDK,与之前的富客户端形成了很好的互补。 “Proxy” 需要代理什么? 接下来我们重点看下这套架构里的核心点,即独立抽出来的 Proxy。 它是一层代理,那到底需要代理什么呢? 在之前的架构中,客户端与 Broker/NameServer 是直连模式,我们需要在中间插入一个代理层,由原先的二层变成三层。然后,将原先客户端侧部分业务逻辑下移,Broker、Namesrv 的部分业务逻辑上移,至中间的代理层,并始终坚持一个原则:往代理层迁移的能力必须是无状态的。 从这张图中,我们将原先客户端里面比较重的负载均衡、流量治理(小黑屋机制)以及 Push/Pull 的消费模型下移至了 Proxy,将原先 Broker 和 Namesrv 侧的访问控制(ACL)、客户端治理、消息路由等无状态能力上移至了 Proxy。然后在 Proxy 上进行模块化设计,抽象出访问控制、多协议适配、通用业务能力、流量治理以及通用的可观测性。 ApsaraMQ 存算分离的技术架构 接下来看 ApsaraMQ 存算分离的技术架构,在原先的架构基础上剥离出了无状态 Proxy 组件,承接客户端侧所有的请求和流量;将无状态 Broker,即消息存储引擎和共享存储层进行剥离,让存储引擎的职责更加聚焦和收敛的同时,充分发挥共享存储层的冷热数据分离、跨可用区容灾的核心优势。 这个架构中无状态 Proxy 承担的职责包括: 1. 多协议适配: 能够识别多种协议的请求,包括 remoting、gRPC 以及 Http 等协议,然后统一翻译成 Proxy 到 Broker、Namesrv 的 remoting 协议。 2. 流量治理、分发: Proxy 具备按照不同维度去识别客户端流量的能力,然后根据分发规则将客户端的流量导到后端正确的 Broker 集群上。 3. 通用业务能力扩展: 包含但不限于访问控制、消息的 Tracing 以及可观测性等。 4. Topic 路由代理: Proxy 还代理了 Namesrv 的 TopicRoute 功能,客户端可以向 Proxy 问询某个 topic 的详细路由信息。 5. 消费模型升级: 使用 Pop 模式来避免单客户端消费卡住导致消息堆积的历史问题。 无状态 Broker,即消息存储引擎的职责更加的聚焦和收敛,包括: 1. 核心存储能力的迭代: 专注消息存储能力的演进,包括消息缓存、索引的构建、消费状态的维护以及高可用的切换等。 2. 存储模型的抽象: 负责冷热存储的模型统一抽象。 共享存储为无状态 Broker 交付了核心的消息读写的能力,包括: 1. 热存储 EBS: 热存储 EBS,提供高性能、高可用存储能力。 2. 冷存储 OSS: 将冷数据卸载到对象存储 OSS,获取无限低成本存储空间。 3. 跨可用区容灾: 基于 OSS 的同城冗余、Regional EBS 的核心能力构建跨可用区容灾,交付一写多读的消息存储能力。 基于云存储的存算分离架构,兼顾消息可靠性和成本 存算分离架构中的存储层是完全构建在阿里云飞天盘古存储体系之上的。基于这套架构,我们能够灵活控制消息的副本数量,在保证消息可靠性和降低存储成本之间做到一个比较好的平衡。 左图是存算分离存储架构中上传和读取的流程。可以看到,我们是在 CommitLog 异步构建 consumeQueue 和 Index 的过程中额外添加了按照 topic 拆分上传到对象存储的过程;在读取过程中智能识别读取消息的存储 Level,然后进行定向化读取。 基于云存储的架构,我们提供了 ApsaraMQ 的核心能力,包括: 1. 超长时间定时消息: 超过一定时间的定时消息所在的时间轮会保存至对象存储上,快到期时时间轮再拉回到本地进行秒级精准定时。 2. 集群缩容自动接管: 消息数据实时同步到对象存储,对象存储的数据能够动态挂载到任意 Broker,实现彻底存算分离,Broker 的无状态化。 3. 按需设置 TTL 成本优化: 支持按需设置消息 TTL,不同重要程度的消息可设置不同的 TTL,满足消息保存时长需求的同时兼顾成本控制。 4. 冷热消息分离、分段预取: 热数据的读取命中本地存储,速度快;冷数据的读取命中远端存储,速率恒定且不会影响热数据的读取。 5. 动态调控云存储的比例: 动态调控 EBS 和 OSS 的比例,大比例的 EBS 能够具备更高的稳定性,应对 OSS 负载过高的背压,提升 EBS 作为 OSS 的前置容灾的能力;小比例的 EBS 则是可以最大化降低消息单位存储成本,让整体的存储成本趋同于 OSS 存储成本。 Serverless 能力再升级 有了存算分离架构的铺垫,ApsaraMQ 全系列产品 Serverless 化就更加顺畅了。接下来展开介绍 ApsaraMQ Serverless 能力的升级。 消息场景下的 Serverless 化 在消息场景下通常会有两个角色:消息服务的使用方和提供方。 在传统架构中通常是这样的流程:使用方会给提供方提需求:我要大促了,你保障下;提供方说给你部署 10 台够不够,使用方说好的;结果真到大促那一刻,真实流量比预估的要大很多倍,整个消息服务被击穿,硬生生挂了半小时。 在 Serverless 化改造前,仍需提前规划容量;但相比传统架构的提升点是,依托 IaaS 层的快速扩容能力,能够极大缩短扩容时间;再演进到当前的 Serverless 架构,我们期望消息服务提供方是能够非常淡定地应对大促。 Serverless 现在已经成为了一个趋势和云的发展方向。ApsaraMQ 全线产品实现弹性灵活的 Serverless 架构,既彰显了技术架构的先进性,也提升了客户的安全感。业务部门减少了容量评估的沟通成本,用多少付多少,更省成本;运维部门免去了容量规划,实现自动弹性扩缩,降低运维人员投入的同时,大大提升了资源的利用率。 Serverless 方案的 Why / What / How Serverless 化预期达到的收益是:省心——秒级弹性,免容量规划;省钱——用多少付多少;省力——少运维、免运维。 要解决的痛点是:用户使用容量模型比较难做精准预估;运维侧手动扩容,是一个非常耗时耗力的过程。 核心目标是: + 弹性要快: 特别是针对一些脉冲型的秒杀业务,需要具备秒级万 TPS 的弹性能力。 + 缩容无感: 应对 MQ 客户端与服务侧 TCP 长连接的特性,缩容、服务端发布时要无感。 + 成本要低: 极致的性能优化,才能进一步降低用户侧的单位 TPS 成本。 通过如下几个关键路径构建 ApsaraMQ Serverless 核心能力: + 多租、安全隔离、业务流量隔离: 是构建 Serverless 核心能力的基础。 + 物理预弹&逻辑弹性: 物理预弹和逻辑弹性结合的极致弹性方案,通过镜像加速、元数据批量创建、主动路由更新等核心技术加速物理弹性,结合逻辑弹性最终交付秒级万 TPS 的极致弹性能力。 + RDMA: 在存储和计算层之间引入 RDMA 技术,充分利用 RDMA 的零拷贝、内核旁路、CPU 卸载等特性进一步降低计算层与存储层之间的通信开销。 + 优雅上下线: 依托 gRPC 协议的 GOAWAY 以及 Remoting 协议的扩展实现 TCP 长连接的优雅上下线。 + 控制资源水位: 通过智能化的集群流量调度以及平滑 Topic 迁移方案,进一步控制单个集群的资源水位在一个合理的值。 这套 Serverless 方案可以同时满足如下几种场景: + 第一种是平稳型:流量上升到一定水位后会平稳运行。 + 第二种是稳中有“进”型:流量平稳运行的同时,偶尔会有一些小脉冲。 + 第三种是周期性“脉冲型”:流量会周期性地变化。 计算、存储与网络:充分利用云的弹性能力 我们前面也有提到,这套架构是完全构建在云原生基础设施之上的,我们在计算、存储、网络上都充分利用了云的弹性能力,具体来看: + 在计算侧,通过 K8s 充分利用 ECS 的弹性能力,通过弹性资源池、HPA 等核心技术支持计算层的快速弹性,并通过跨可用区部署提升可用性。 + 在存储侧,充分利用飞天盘古存储的弹性能力,支持自定义消息的存储时长,冷热数据分离,额外保障冷读的 SLA。 + 在网络侧,公网随开随用,安全和方便兼具,支持多种私网形态接入,并基于 CEN 构建全球互通的消息网络。 在这之上,结合 SRE 平台的智能集群流量调度、集群水位监控、物理预弹性等核心能力,最终交付了一套完整的 ApsaraMQ Serverless 化物理弹性能力。 秒级万 TPS 的极致弹性能力保障 依托于上面的基础物理资源的弹性能力,来看下我们是如何保障秒级万 TPS 的极致弹性能力的? 从两个维度来看用户视角的弹性: + 从限流维度看: 在无损弹性上限以内的 TPS,都不会触发限流;超过无损弹性 TPS 上限后,会有秒级别的限流,通过逻辑弹性调整实例级别的限流阈值后,即可实现最终的 TPS 弹性。 + 从付费角度看: 在预留规格内按规格进行预付费;超过预留规格的上限 TPS,超过部分按量付费,即用多少付多少。 为了满足用户视角的秒级弹性能力,我们通过物理弹性和逻辑弹性相结合的方式来进行保障: + 物理弹性是预弹的机制,弹性时间在分钟级,用户侧无任何感知,由服务侧来 Cover 成本。 + 逻辑弹性是实时生效的,弹性时间在秒级,同时在触发无损弹性 TPS 上限时,用户侧是会有秒级别的限流感知的,另一方面,用户是需要为弹出来的那部分流量进行按量付费的。 两者的关系是:物理弹性是逻辑弹性的支撑,逻辑弹性依赖物理弹性。从弹性流程上看,当用户的流量触发无损弹性上限时,优先判断物理资源是否充足,充足的话就进行秒级逻辑弹性,不充足的话就等待物理资源扩容后再进行逻辑弹性。当然这里会有个预弹的机制,尽量保障逻辑弹性时物理资源都是充足的。 从物理弹性加速来看,通过以下几个方面进行加速: + 计算、存储层按需弹性: 计算层更关注 CPU、客户端连接数等核心指标;存储层更关注内存、磁盘 IO 等性能指标。 + 镜像下载加速: 通过 PlaceHolder + 镜像缓存组件加速新节点的扩容。 + 新增元数据批量创建的机制: 新增存储节点创建 5000 级别的 Topic 下降至 3 秒。 + 新增主动路由更新机制: 降低存储节点物理扩容后承接读写流量的延迟。 我们的系统设计精密,旨在确保用户体验到极致的弹性能力,特别是实现每秒万次事务处理(TPS)的秒级弹性扩展。这一能力的保障策略围绕两个核心维度展开,并深度融合物理与逻辑弹性机制,确保在高吞吐需求下的无缝响应与成本效率。 ApsaraMQ on RDMA:降低计算与存储层之间通信开销 RDMA(全称远程内存直接访问)是一种高性能的网络通信技术,相比 TCP/IP 的网络模式,具有零拷贝、内核旁路、CPU 卸载等核心优势。ApsaraMQ Serverless 化架构具备存算分离的特点,非常适合在计算层和存储层之间引入 RDMA 技术,进一步降低内部组件之间的数据通信开销。 具体来看,计算层 Proxy 在接收到客户端各种协议的消息收发请求以后,通过内置的 Remoting Client 和存储层 Broker 进行数据交换。在原先的 TCP/IP 网络模式中,这些数据是需要从操作系统的用户态拷贝到内核态后,再经由网卡和对端进行交互。依托 RDMA 内核旁路特性,通过实现 RdmaEventLoop 的适配器,消息直接由用户态到 RDMA 网卡和对端进行交互。 从最终 BenchMark 的测试效果来看,引入 RDMA 技术后,存储层 Broker 的 CPU 资源消耗下降了 26.7%,计算层 Proxy 的 CPU 资源消耗下降了 10.1%,计算到存储的发送 RT 下降了 23.8%。 优雅上下线:ApsaraMQ Serverless 弹性如丝般顺滑 在 Serverless 场景下,物理资源的水平弹性扩缩是一个常态化的过程,同时结合 MQ 客户端和计算层 Proxy TCP 长连接的特点,在 Proxy 上下线过程中是比较容易出现连接断开的感知,比如客户端刚发出一个接收消息的请求,连接就被服务侧强行关闭了,是非常容易造成单次请求超时的异常的。 因此,我们通过 gRPC 协议 Http2 的 Go Away 机制以及 Remoting 协议层的扩展,结合 SLB 连接优雅终端的能力来实现 ApsaraMQ 在扩容态、缩容态、以及发布态的无感。 右图展示了缩容态下单台 Proxy 优雅下线的过程: 1. 当收到 Proxy0 需要下线的指令后,SLB 会将 Proxy0 摘除,保障新的连接不再建立到 Proxy0 上,同时 Proxy0 进入 Draining 状态,旧连接进行会话保持。 2. Proxy0 上收到新的请求后,返回的 Response Code 都更新为 Go Away;同时客户单收到 Go Away 的 Response 后,断开原先的连接,向 SLB 发起新建连接的请求。 3. SLB 收到新建连接的请求,会导流至 Proxy1。 4. 等待一段时间后 Proxy0 上的连接会自然消亡,最终达到无感下线的目的。 事件驱动架构赋能 AI 应用 AI 无疑是当今互联网行业的热门话题,同时也是本届云栖大会的核心议题之一。接下来,将阐述面向 AI 原生应用的事件驱动架构如何拥抱 AI,赋能 AI 应用蓬勃发展。 + 面向 AI 应用的实时处理,具备实时 Embedding 至向量数据库、更新私有数据存储的能力,全面提升 AI 应用实时性、个性化和准确度。 + 面向 AI 应用的异步解耦,解耦 AI 推理链路的快慢服务,加快应用响应速度。 + 面向 AI 应用的消息负载,ApsaraMQ 支持主动 Pop 消费模式,允许动态设置每一条消息的个性化消费时长. 同时也支持优先级队列,满足下游多个大模型服务的优先级调度。 + 面向 AI 应用的消息弹性,ApsaraMQ 提供全模式的 Serverless 弹性能力,支持纯按量和预留+弹性、定时弹性等多种流量配置模型; 最后,让我们来看下阿里云事件总线 EventBridge 是如何实现数据实时向量化,提升 AI 应用的实时性和准确度的? 阿里云事件总线的 Event Streaming 事件流处理框架,具备监听多样化数据源,然后经过多次 Transform 之后再投递给下游数据目标的原子能力;依托这个框架,我们是很容易去实现数据的实时向量化的,从 RocketMQ、Kafka、OSS 等多个源监听到实时数据流入之后,经过文本化、切片、向量化等操作,最终存入向量数据库,作为 AI 应用实时问答的多维度数据检索的依据,最终提升了 AI 应用的实时性和准确度。
作者:金吉祥
#行业实践 #云原生

2024年7月24日

Apache RocketMQ ACL 2.0 全新升级
引言 RocketMQ 作为一款流行的分布式消息中间件,被广泛应用于各种大型分布式系统和微服务中,承担着异步通信、系统解耦、削峰填谷和消息通知等重要的角色。随着技术的演进和业务规模的扩大,安全相关的挑战日益突出,消息系统的访问控制也变得尤为重要。然而,RocketMQ 现有的 ACL 1.0 版本已经无法满足未来的发展。因此,我们推出了 RocketMQ ACL 2.0 升级版,进一步提升 RocketMQ 数据的安全性。本文将介绍 RocketMQ ACL 2.0 的新特性、工作原理,以及相关的配置和实践。 升级的背景 ACL 1.0 痛点问题 RocketMQ ACL 1.0 的认证和授权流程如上图所示,在使用过程中,存在着以下痛点问题: 绕过访问控制的 IP 白名单:在标准安全实践中,IP 白名单通常用于限制客户端只能从特定 IP 或 IP 段访问资源。然而,ACL 1.0 中,IP 白名单被异常用于绕过鉴权验证的手段, 偏离了标准实践中的安全意图。这种设计上的偏差可能造成潜在的安全隐患,特别是在公网场景中,多个客户端共享同一个 IP 的情况下,会导致未授权的 IP 地址绕过正常的访问控制检查对集群中的数据进行访问。 缺乏管控 API 精细化控制:RocketMQ 提供了 130 多个管控 API,支持了集群配置,Topic、Group 的元数据管理,以及消息查询、位点重置等操作。这些操作涉及到敏感数据的处理,以及影响系统的稳定性。因此,根据用户不同角色或职责,精确定义可访问的 API 和数据范围变得至关重要。然而,ACL 1.0 仅对其中 9 个 API 进行了支持,包括 Topic、Group 元数据,以及Broker配置,剩下的 API 有可能被攻击者利用,对系统进行攻击,窃取敏感的数据。此外,要实施对这么多的管控 API 进行访问控制,现有的设计会导致大量的编码工作,并且在新增 API 时也增加了遗漏的风险。 缺少集群组件间访问控制:在 RocketMQ 架构中,涵盖了 NameServer、Broker 主从节点、Proxy 等多个关键组件。目前,这些组件之间的互相访问缺失了关键的的权限验证机制。因此,一但旦在集群外自行搭建 Broker 从节点或 Proxy 组件,便可以绕过现有的安全机制,访问并获取集群内的敏感数据,这无疑给系统的数据安全和集群的稳定性造成巨大的威胁。 特性与原理 ACL 2.0 新特性 RocketMQ ACL 2.0 针对 ACL 1.0 中的问题进行了解决,同时还带来了六个主要的新特性,具体如下: 精细的API资源权限定义:ACL 2.0 对 RocketMQ 系统中所有的资源都进行了定义,包括集群、命名空间、主题、消费者组,以实现对所有类型的资源进行独立的访问控制。此外,它将所有的 API 都纳入权限控制范畴,覆盖了包括消息收发、集群管理、元数据等各项操作,确保所有资源的任何操作都施加了严格的权限控制。 授权资源的多种匹配模式:在资源众多的集群环境中,为每个资源进行逐一授权会带来繁复的配置过程和管理负担。因此,ACL 2.0 引入了三种灵活的匹配模式:完全匹配、前缀匹配,以及通配符匹配。这些模式可以让用户根据资源的命名规范和结构特点,快速地进行统一的设定,简化权限的管理操作,提升配置的效率。 支持集群组件间访问控制:由于将所有资源类型和API操作都纳入了访问控制体系,集群内部组件之间的连接和访问也受到了权限控制,包括 Broker 主从之间的 Leader 选举、数据复制的过程,以及 Proxy 到 Broker 的数据访问等环节,这可以有效地避免潜在的数据泄露问题和对系统稳定性的风险,加强了整个集群的安全性和可靠性。 用户认证和权限校验分离:通过对认证和授权这两个关键模块进行解耦,系统可以提供类似“只认证不鉴权”等方式的灵活选择,以适应各种不同场景的需求。此外,两个组件可以分别演进、独立发展,从而诞生出多样的认证方式和先进的鉴权方法。 安全性和性能之间的平衡:当启用 ACL 后,客户端的每次请求都必须会经过完整的认证和授权流程。这确保了系统的安全性,但同时也引入了性能上的开销。在 ACL 2.0 中,提供了无状态认证授权策略和有状态认证授权策略,来分别满足对安全有极致要求,以及安全可控但性能优先这两种不同的安全和性能需求。 灵活可扩展的插件化机制:当前市场上,认证方式存在多种实现,授权方式也有不同场景的定制需求。因此,ACL 2.0 设计了一套插件化的框架,在不同层面上进行接口的定义和抽象,以支持未来对认证和授权进行扩展,满足用户根据自身业务需求定制和实现相应的解决方案。 访问控制模型 基于角色的访问控制(RBAC)和基于属性的访问控制(ABAC)是访问控制体系中两种主要的方法。RocketMQ ACL 2.0 将这两种方法进行了融合,打造出了一套更加灵活和强大的访问控制系统。RBAC 是基于角色的访问控制模型,通过角色进行权限的分配。RocketMQ ACL 2.0 将用户角色划分为超级用户(Super)和普通用户(Normal),超级用户具有最高级别的权限,能够无需授权即可访问资源,这简化了集群初始化及日常运维过程中的权限依赖问题。而普通用户在访问资源之前,需要被赋予相应的权限,适用于业务场景中,对资源进行按需访问。ABAC 是基于属性的访问控制模型,通过用户、资源、环境、操作等多维属性来表达访问控制策略。RocketMQ ACL 2.0 为普通用户提供了这种灵活的访问控制机制。帮助管理员根据业务需求、用户职责等因素,对资源进行更加精细的访问控制。在安全体系中,认证和授权分别扮演着不同的角色,RocetMQ ACL 2.0 将认证和授权进行了模块分离。这可以确保两个组件各司其职,降低系统的复杂度。认证服务致力于验证用户身份的合法性,而授权服务则专注于管理用户权限和访问控制。这样的划分不仅可以让代码更易于管理、维护和扩展,也为用户提供了使用上的灵活性。根据需求,用户可以选择单独启用认证或授权服务,也可以选择同时启用两者。这使得 RocketMQ ACL 既可以满足简单场景的快速部署,也能够适应复杂环境下对安全性的严格要求。 认证(Authentication) 认证作为一种安全机制,旨在验证发起访问请求者的身份真实性。它用于确保只有那些经过身份验证的合法用户或实体才能访问受保护的资源或执行特定的操作。简而言之,认证就是在资源或服务被访问之前回答“你是谁?”这个问题。RocketMQ ACL 2.0 版本维持了与 ACL 1.0 相同的认证机制,即基于 AK/SK 的认证方式。这种方式主要通过对称加密技术来核验客户端的身份,保证敏感的认证信息(如密码)不会在网络上明文传输,从而提升了整体的认证安全性。 主体模型 为了提升 RocketMQ 系统的访问控制和权限管理,ACL 2.0 针对主体模型做了以下改进和扩展: 1. 统一主体模型的抽象:为了实现不同实体的访问控制和权限管理,设计了统一的主体接口,允许系统中多个实例作为资源访问的主体。用户作为访问资源的主体之一,按照该模型实现了主体的接口。这为未来新实体类型的权限适配提供了扩展能力。 2. 角色分级与权限赋予: 超级用户:为了简化管理流程,超级用户被自动授予了全部权限,无需单独配置,从而简化了系统的初始化和日常的运维管理工作。 普通用户:普通用户的权限则需要明确授权。ACL 2.0 提供了相关的权限管理工具,可以根据组织的政策和安全需求,为普通用户赋予合适的权限。 3. 支持用户状态管理:为了应对可能出现的安全风险,比如用户密码泄露,ACL 2.0 提供了用户的启用与禁用功能。当发生安全事件,可以通过禁用用户状态,快速进行止血,从而达到阻止非法访问的目的。 认证流程 客户端流程: 1. 客户端在构建 RPC 请求时,检查是否设置了用户名和密码,若未配置,则直接发送请求; 2. 若已配置,则使用预设的加密算法对请求参数进行加密处理,并生成对应的数字签名(Signature)。 3. 在请求中附加用户名和 Signature,并将其发送至服务端以进行身份验证。 服务端流程: 1. 服务端接收到请求后,首先检查是否开启认证,若未开启,则不校验直接通过;若已开启了,则进入下一步。 2. 服务端对请求进行认证相关的参数进行解析和组装,获取包括用户名和 Signature 等信息。 3. 通过用户名在本地库中查询用户相关信息,用户不存在,则返回处理无;用户存在,则进入下一步。 4. 获取用户密码,采用相同的加密算法对请求进行加密生成 Signature,并和客户端传递的 Signature 进行比对,若两者一致,则认证成功,不一致,则认证失败。 授权(Authorization) 核心概念 授权作为一种安全机制,旨在确定访问请求者是否拥有对特定资源进行操作的权限。简而言之,授权就是在资源被访问之前回答“谁在何种环境下对哪些资源执行何种操作”这个问题。基于“属性的访问控制(ABAC)”模型,RocketMQ ACL 2.0 涵盖了以下一系列的核心概念。在系统实现中,都会以以下概念作为指导,完成整个权限管理和授权机制的设计和实现。 权限模型 基于属性的访问控制(ABAC)模型的核心概念,ACL 2.0 对权限模型做了精心的设计,要点如下: 向后兼容的权限策略:默认情况下,ACL 2.0 只匹配和检验用户自定义的权限,若未找到匹配项,则视为无权限访问资源。但考虑到 ACL 1.0 中,存在默认权限的设置,允许对未匹配资源进行“无权限访问”和“有权限访问”的默认判定。因此,我们针对默认权限策略进行了兼容,确保 ACL 1.0 到 ACL 2.0 的无缝迁移。 灵活的资源匹配模式:在资源类型方面,ACL 2.0 支持了集群(Cluster)、命名空间(Namespace)、主题(Topic)、消费者组(Group)等类型,用于对不同类型的资源进行访问控制。在资源名称方面,引入了完全匹配(LITERAL)、前缀匹配(PREFIXED),以及通配符匹配(ANY)三种模式,方便用户根据资源的命名规范和结构,快速设定统一的访问规则,简化权限的管理。 精细的资源操作类型:在消息的发送和消费的接口方面,分别定义为 PUB 和 SUB 这两种操作。在集群和资源的管理的接口方面,分别定义为 CREATE、UPDATE、DELETE、LIST、GET 五种操作。通过这种操作类型的细化,可以帮助用户在资源的操作层面,无需关心具体的接口定义,简化对操作的理解和配置。 坚实的访问环境校验:在请求访问的环境方面,ACL 2.0 加入了客户端请求 IP 来源的校验,这个校验控制在每个资源的级别,可以精确到对每个资源进行控制。IP 来源可以是特定的 IP 地址或者是一个 IP 段,来满足不同粒度的 IP 访问控制,为系统的安全性增添一道坚实的防线。 授权流程 客户端流程: 1. 客户端在构建 RPC 请求时,构建本次调用的接口入参,接口对应权限背后的操作定义。 2. 客户端在接口入参中设置本次访问的资源信息,然后将用户和资源等参数传递到服务端。 服务端流程: 1. 服务端在收到请求后,首先检查是否开启授权,若未开启,则不校验直接通过;若已开启了,则进入下一步。 2. 服务端对请求中和授权相关的参数进行解析和组装,这些数据包括用户信息、访问的资源、执行的操作,以及请求的环境等。 3. 通过用户名在本地数据存储中查询用户相关信息,若用户不存在,则返回错误;若用户存在,则进入下一步。 4. 判断当前用户是否是超级用户,若超级用户,则直接通过请求,无需做授权检查,若普通用户,则进入下一步进行详细的授权检查。 5. 根据用户名获取相关的授权策略列表,并对本次请求的资源、操作,以及环境进行匹配,同时按照优先级进行排序。 6. 根据优先级最高的授权策略做出决策,若授权策略允许该操作,则返回授权成功,若拒绝该操作,则返回无权限错误。 授权参数的解析 在 ACL 2.0 中,更具操作类型和请求频率,对授权相关参数(包括资源、操作等)的解析进行了优化。1. 硬编码方式解析对于消息发送和消费这类接口,参数相对较为复杂,且请求频次也相对较高。考虑到解析的便捷性和性能上的要求,采用硬编码的方式进行解析。2. 注解方式解析对于大量的管控接口,采用硬编码的方式工作量巨大,且这些接口调用频次较低,对性能要求不高,所以采用注解的方式进行解析,提高编码效率。 权限策略优先级 在权限策略匹配方面,由于支持了模糊的资源匹配模式,可能出现同一个资源对应多个权限策略。因此,需要一套优先级的机制来确定最终使用哪一套权限策略。假设配置了以下授权策略,按照以上优先级资源的匹配情况如下: 认证授权策略 出于安全和性能的权衡和考虑,RocketMQ ACL 2.0 为认证和授权提供了两种策略:无状态认证授权策略(Stateless)和有状态认证授权策略(Stateful)。 无状态认证授权策略(Stateless): 在这种策略下,每个请求都会经过独立的认证和授权过程,不依赖于任何先前的会话和状态信息。这种严格的策略可以保证更高级别的安全保证。对权限进行变更,可以更加实时的反应在随后的请求中,无需任何等待。然而,这种策略在高吞吐的场景中可能会导致显著的性能负担,如增加系统 CPU 的使用率以及请求的耗时。 有状态认证授权策略(Stateful): 在这种策略下,同一个客户端连接,相同资源以及相同的操作下,第一次请求会经过完整的认证和授权,后续请求则不再进行重复认证和授权。这种方法可以有效地降低性能小号,减少请求的耗时,特别适合吞吐量较高的场景。但是,这种策略可能引入了安全上的妥协,对权限的变更也无法做到实时的生效。 在这两者策略的选择上,需要权衡系统的安全性要求和性能需求。如果系统对安全性的要求很高,并且可以容忍一定的性能损耗,那么无状态认证授权策略可能是更好的选择。相反,如果系统需要处理大量的并发请求,且可以在一定程度上放宽安全要求,那么有状态认证授权策略可能更合适。在实际部署时,还应该结合具体的业务场景和安全要求来做出决策。 插件化机制 为了适应未来持续发展的认证鉴权方式,以及满足用户针对特定场景的定制需求,RocketMQ ACL 2.0 在多个环节上提供了灵活性和可扩展性。 认证和授权策略的扩展:默认情况下,RocketMQ ACL 2.0 提供了无状态认证授权策略(Stateless)和有状态认证授权策略(Stateful),以满足绝大多数用户对安全和性能的要求。但是,后续仍然可以探索出更优的策略,来兼顾安全和性能之间的平衡。 认证和授权方式的扩展:当前,在认证方面,市场上已经沉淀了多种成熟的实现,RocketMQ 目前只实现了其中一种,通过插件化的能力进行预留,未来可以轻松的引入更多的认证机制。在授权方面,RocketMQ 基于 ABAC 模型实现了一套主流的授权方式,以适应广泛的用户需求。但也提供了插件化的能力,方便未来能适配出更多贴合未来发展的解决方案。 认证和授权流程的编排:基于责任链设计模式,RocketMQ ACL 2.0 对其默认的认证和授权流程进行了灵活的编排。用户可以扩展或重写这些责任链节点,从而能够定制针对其具体业务场景的认证和授权逻辑。 用户和权限存储的扩展:RocketMQ 默认采用 RocksDB 在 Broker 节点上本地存储用户和权限数据。然而,通过实现预定义的接口,用户可以轻松地将这些数据迁移至任何第三方服务或存储系统中,从而优化其架构设计和操作效率。 审计日志 审计日志,用于记录和监控所有关于认证和授权的访问控制操作。通过升级日志,我们可以追踪到每一个访问的请求,确保系统的可靠性和安全性,同时,它也有助于问题的排查,进行安全的升级和满足合规的要求。RocketMQ ACL 2.0 对认证和授权相关的审计日志都进行了支持,格式如下: 认证日志 ``` 认证成功日志 [AUTHENTICATION] User:rocketmq is authenticated success with Signature = eMX/+tH/7Bc0TObtDYMcK9Ls+gg=. 认证失败日志 [AUTHENTICATION] User:rocketmq is authenticated failed with Signature = eMX/+tH/7Bc0TObtDYMcK9Ls+xx=. ``` 授权日志 ``` 授权成功日志 [AUTHORIZATION] Subject = User:rocketmq is Allow Action = Pub from sourceIp = 192.168.0.2 on resource = Topic:TPTEST for request = 10. 授权失败日志 [AUTHORIZATION] Subject = User:rocketmq is Deny Action = Sub from sourceIp = 192.168.0.2 on resource = Topic:GIDTEST for request = 10. ``` 配置与使用 部署架构 在部署架构方面,RocketMQ 提供了两种部署形态,分别是存算一体架构和存算分离架构。 存算一体架构 在 RocketMQ 存算一体架构中,Broker 组件同时承担了计算和存储的职责,并对外提供服务,接收所有客户端的访问请求。因此,由 Broker 组件承担认证和授权的重要角色。此外,Broker 组件还负责认证和授权相关的元数据的维护和存储。 存算分离架构 在 RocketMQ 存算分离架构中,存储由 Broker 组件负责,计算由 Proxy 组件负责,所有的对外请求都是由 Proxy 对外进行服务。因此,请求的认证和授权都由 Proxy 组件承担。Broker 承担元数据存储,为 Proxy 组件提供所需的认证和授权元数据的查询和管理服务。 集群配置 认证配置 参数列表 想要在服务端开启认证功能,相关的参数和使用案例主要包含如下: Broker 配置 ``` authenticationEnabled = true authenticationProvider = org.apache.rocketmq.auth.authentication.provider.DefaultAuthenticationProvider initAuthenticationUser = {"username":"rocketmq","password":"12345678"} innerClientAuthenticationCredentials = {"accessKey":"rocketmq","secretKey":"12345678"} authenticationMetadataProvider = org.apache.rocketmq.auth.authentication.provider.LocalAuthenticationMetadataProvider ``` Proxy 配置 ``` { "authenticationEnabled": true, "authenticationProvider": "org.apache.rocketmq.auth.authentication.provider.DefaultAuthenticationProvider", "authenticationMetadataProvider": "org.apache.rocketmq.proxy.auth.ProxyAuthenticationMetadataProvider", "innerClientAuthenticationCredentials": "{\"accessKey\":\"rocketmq\", \"secretKey\":\"12345678\"}" } ``` 授权配置 参数列表 想要在服务端开启授权功能,相关的参数和使用案例主要包含如下: Broker 配置 ``` authorizationEnabled = true authorizationProvider = org.apache.rocketmq.auth.authorization.provider.DefaultAuthorizationProvider authorizationMetadataProvider = org.apache.rocketmq.auth.authorization.provider.LocalAuthorizationMetadataProvider ``` Proxy 配置 ``` { "authorizationEnabled": true, "authorizationProvider": "org.apache.rocketmq.auth.authorization.provider.DefaultAuthorizationProvider", "authorizationMetadataProvider": "org.apache.rocketmq.proxy.auth.ProxyAuthorizationMetadataProvider" } ``` 如何使用 命令行使用 用户管理关于 ACL 用户的管理,相关的接口定义和使用案例如下。 接口定义 使用案例 ``` 创建用户 sh mqadmin createUser n 127.0.0.1:9876 c DefaultCluster u rocketmq p rocketmq 创建用户,指定用户类型 sh mqadmin createUser n 127.0.0.1:9876 c DefaultCluster u rocketmq p rocketmq t Super 更新用户 sh mqadmin updateUser n 127.0.0.1:9876 c DefaultCluster u rocketmq p 12345678 删除用户 sh mqadmin deleteUser n 127.0.0.1:9876 c DefaultCluster u rocketmq 查询用户详情 sh mqadmin getUser n 127.0.0.1:9876 c DefaultCluster u rocketmq 查询用户列表 sh mqadmin listUser n 127.0.0.1:9876 c DefaultCluster 查询用户列表,带过滤条件 sh mqadmin listUser n 127.0.0.1:9876 c DefaultCluster f mq ``` ACL 管理关于 ACL 授权的管理,相关的接口定义和使用案例如下。 接口定义 使用案例 ``` 创建授权 sh mqadmin createAcl n 127.0.0.1:9876 c DefaultCluster s User:rocketmq r Topic:,Group: a Pub,Sub i 192.168.1.0/24 d Allow 更新授权 sh mqadmin updateAcl n 127.0.0.1:9876 c DefaultCluster s User:rocketmq r Topic:,Group: a Pub,Sub i 192.168.1.0/24 d Deny 删除授权 sh mqadmin deleteAcl n 127.0.0.1:9876 c DefaultCluster s User:rocketmq 删除授权,指定资源 sh mqadmin deleteAcl n 127.0.0.1:9876 c DefaultCluster s User:rocketmq r Topic: 查询授权列表 sh mqadmin listAcl n 127.0.0.1:9876 c DefaultCluster 查询授权列表,带过滤条件 sh mqadmin listAcl n 127.0.0.1:9876 c DefaultCluster s User:rocketmq r Topic: 查询授权详情 sh mqadmin getAcl n 127.0.0.1:9876 c DefaultCluster s User:rocketmq ``` 客户端使用 关于 ACL 的使用,ACL 2.0 和 ACL 1.0 的使用方式一样,没有任何区别,具体参考官方案例。 消息发送 ``` ClientServiceProvider provider = ClientServiceProvider.loadService(); StaticSessionCredentialsProvider sessionCredentialsProvider = new StaticSessionCredentialsProvider(ACCESS_KEY, SECRET_KEY); ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder() .setEndpoints(ENDPOINTS) .setCredentialProvider(sessionCredentialsProvider) .build(); Producer producer = provider.newProducerBuilder() .setClientConfiguration(clientConfiguration) .setTopics(TOPICS) .build(); ``` 消息消费 ``` ClientServiceProvider provider = ClientServiceProvider.loadService(); ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder() .setEndpoints(ENDPOINTS) .setCredentialProvider(sessionCredentialsProvider) .build(); FilterExpression filterExpression = new FilterExpression(TAG, FilterExpressionType.TAG); PushConsumer pushConsumer = provider.newPushConsumerBuilder() .setClientConfiguration(clientConfiguration) .setConsumerGroup(CONSUMER_GROUP) .setSubscriptionExpressions(Collections.singletonMap(TOPIC, filterExpression)) .setMessageListener(messageView { return ConsumeResult.SUCCESS; }) .build(); ``` 扩容与迁移 扩容 如果想要在运行过程中的集群扩容一台 Broker,就需要将所有的元数据都同步到这台新的 Broker 上,ACL 2.0 提供了相应的拷贝用户和拷贝授权的接口来支持这项操作。 接口定义 使用案例 ``` 拷贝用户 sh mqadmin copyUser n 127.0.0.1:9876 f 192.168.0.1:10911 t 192.168.0.2:10911 拷贝授权 sh mqadmin copyAcl n 127.0.0.1:9876 f 192.168.0.1:10911 t 192.168.0.2:10911 ``` 迁移 如果已经使用上了 ACL 1.0,想要无缝地迁移至 ACL 2.0,也提供了相应的解决方案,只需要做以下配置即可。 配置定义 在 Broker 的配置文件中开启以下配置: ``` migrateAuthFromV1Enabled = true ``` 特别说明 启用以上配置后,将在 Broker 启动过程中自动触发执行。该迁移功能会把 ACL 1.0 中的用户权限信息写入 ACL 2.0 的相应存储结构中。对于在 ACL 2.0 中尚未存在的用户和权限,系统将自动添加。对于已存在的用户和权限,迁移功能不会进行覆盖,以避免重写 ACL 2.0 中已经进行的任何修改。ACL 1.0 中关于 IP 白名单,由于是用于绕过访问控制的检查,和 ACL 2.0 的行为不匹配,所以不会迁移到 ACL 2.0 中。如果已经使用相关的能力,请完成改造后再做迁移。 规划与总结 规划 关于 RocketMQ ACL 的未来规划,可能会体现在以下两个方面: 丰富的认证和授权扩展:市场上存在丰富的认证和授权解决方案,其他的存储或计算产品也都采用了各种各样的实现方式。为了紧跟行业的发展趋势,RocketMQ ACL 未来也将努力创新,以满足更为广泛和多变的客户需求。同时,也将持续深化研究和发展更加出色的认证和授权策略,以达到安全性和性能之间的理想平衡。 可视化的用户权限操作:当前,在 ACL 中进行用户和权限的配置仅能通过命令行工具,不够友好。未来我们希望能在 RocketMQ Dashboard 上提供一个清晰、易用的可视化管理界面,从而简化配置流程并降低管理的技术门槛。另一方面,现有的 Dashboard 尚未集成 ACL 访问控制体系,后续也要将它纳入进来,以实现用户在 Dashboard 上对各项资源进行操作的访问权限。 总结 RocketMQ ACL 2.0 不管是在模型设计、可扩展性方面,还是安全性和性能方面都进行了全新的升级。旨在能够为用户提供精细化的访问控制,同时,简化权限的配置流程。欢迎大家尝试体验新版本,并应用在生产环境中。非常期待大家的在社区中反馈、讨论,和参与贡献,共同推进 RocketMQ 社区的成长和技术进步。
作者:徒钟
#行业实践 #最佳实践 #功能特性

2024年7月24日

基于 RocketMQ Connect 构建数据流转处理平台
从问题中来的 RocketMQ Connect 在电商系统、金融系统及物流系统,我们经常可以看到 RocketMQ 的身影。原因不难理解,随着数字化转型范围的扩大及进程的加快,业务系统的数据也在每日暴增,此时为了保证系统的稳定运行,就需要把运行压力分担出去。RocketMQ 就担任着这样的角色,它的异步消息处理与高并发读写能力,决定了系统底层的重构不会影响上层应用的功能。而 RocketMQ 的另一个优势——可伸缩能力,使系统在面临流量的不确定性时,实现对流量的缓冲处理。此外,RocketMQ 的顺序设计特性使其成为一个天然的排队引擎,例如,三个应用同时对一个后台引擎发起请求,确保不引起“撞车”事故。因此,RocketMQ 被用在异步解耦、削峰填谷以及事务消息等场景中。 但是,数字化转型浪潮也带来了更多用户对数据价值的关注——如何让数据产生更大利用价值?RocketMQ 自身不具备数据分析能力,但是有不少用户希望从 RocketMQ Topic 中获取数据并进行在线或离线的数据分析。然而,使用市面上的数据集成或数据同步工具,将 RocketMQ Topic 数据同步到一些分析系统中虽然是一种可行方案,却会引入新的组件,造成数据同步的链路较长,时延相对较高,用户体验不佳。 举个例子,假设业务场景中使用 OceanBase 作为数据存储,同时希望将这些数据同步到 Elasticsearch 进行全文搜索,有两种可行的数据同步方案。 方案一:从 OceanBase 中获取数据,写入 Elasticsearch 组件并进行数据同步,在数据源较少时此方案没什么问题,一旦数据增多,开发和维护都非常复杂,此时就要用到第二种方案。 方案二:引入消息中间件对上下游进行解藕,这能解决第一种方案的问题,但是一些较为复杂的问题还没有完全解决。比如,如何将数据从源数据同步到目标系统并保证高性能,如果保证同步任务的部分节点挂掉,数据同步依然正常进行,节点恢复依然可以断点续传,同时随着数据管道的增多,如何管理数据管道也变得十分困难。 总的来说,数据集成过程中的挑战主要有五个。 挑战一:数据源多,市面上可能有上百个数据源,且各数据源的系统差异较大,实现任意数据源之间的数据同步工作量较大,研发周期很长。 挑战二:高性能问题,如何高效地从源数据系统同步到目的数据系统,并保障其性能。 挑战三:高可用问题,即Failover能力,当一个节点挂掉是否这个节点的任务就停止了,任务重新启动是否还可以断点续传。 挑战四:弹性扩缩容能力,根据系统流量动态增加或减少节点数量,既能通过扩容满足高峰期业务,也能在低峰期缩减节点,节省成本。 挑战五:数据管道的管理运维,随着数据管道的增多,运维监控的数据管道也会变得越来越复杂,如何高效管理监控众多的同步任务。 面对上述挑战 RocketMQ 如何解决? 第一,标准化数据集成 API (Open Messaging Connect API)。在 RocketMQ 生态中增加 Connect 组件,一方面对数据集成过程抽象,抽象标准的数据格式以及描述数据的 Schema,另一方面对同步任务进行抽象,任务的创建、分片都抽象成一套标准化的流程。 第二,基于标准的 API 实现 Connect Runtime。Runtime 提供了集群管理、配置管理、位点管理、负载均衡相关的能力,拥有了这些能力,开发者或者用户就只需要关注数据如何获取或如何写入,从而快速构建数据生态,如与 OceanBase、MySQL、Elasticsearch 等快速建立连接,搭建数据集成平台。整个数据集成平台的构建也非常简单,通过 Runtime 提供的 RESTFull API 进行简单调用即可。 第三,提供完善的运维工具,方便管理同步任务,同时提供丰富的 Metrics 信息,方便查看同步任务的 TPS,流量等信息。 RocketMQ Connect 两大使用场景 这里为大家整理了 RocketMQ Connect 的两大使用场景。 场景一,RocketMQ 作为中间媒介,可以将上下游数据打通。 比如在新旧系统迁移的过程中,如果在业务量不大时使用 MySQL 就可以满足业务需求,而随着业务的增长,MySQL 性能无法满足业务要求时,需要对系统进行升级,选用分布式数据库 OceanBase 提升系统性能。 如何将旧系统数据无缝迁移到 OceanBase 中呢?在这个场景中 RocketMQ Connect 就可以发挥作用,RocketMQ Connect 可以构建一个从 MySQL 到 OceanBase 的数据管道,实现数据的平滑迁移。RocketMQ Connect 还可以用在搭建数据湖、搜索引擎、ETL 平台等场景。例如将各个数据源的数据集成到 RocketMQ Topic 当中,目标存储只需要对接 Elasticsearch 就可以构建一个搜索平台,目标存储如果是数据湖就可以构建一个数据湖平台。 除此之外,RocketMQ 自身也可以作为一个数据源,将一个 RocketMQ 集群的数据同步到另一个集群,可以构建 RocketMQ 多活容灾能力,这是社区正在孵化的 Replicator 可以实现的能力。 场景二,RocketMQ 作为端点。 RocketMQ 的生态中提供了流计算能力组件——RocketMQ Streams,Connector 将各个存储系统的数据集成到RocketMQ Topic 当中,下游使用 RocketMQ Streams 流计算的能力就可以构建一个实时的流计算平台。当然也可以配合业务系统的 Service 实现业务系统快速从其它存储统一快速获取数据的能力。 还可以将 RocketMQ 作为端点的上游,将业务消息发到 Topic 中,使用 Connector 对数据做持久化或转存的操作。 如此一来,RocketMQ 就具备数据集成能力,可以实现任意任意异构数据源之间的数据同步,同时也具备统一的集群管理、监控能力及配置化搭建数据管道搭建能力,开发者或者用户只需要专注于数据拷贝,简单配置就可以得到一个具备配置化、低代码、低延时、高可用,支持故障处理和动态扩缩容数据集成平台。 RocketMQ Connect 实现原理 那么, RocketMQ Connect 是如何实现的呢?在介绍实现原理前,先来了解两个概念。 概念一:什么是 Connector(连接器)? 它定义数据从哪复制到哪,是从源数据系统读取数据写入 RocketMQ,这种是 SourceConnector,或从 RocketMQ 读数据写入到目标系统,这种是 SinkConnector。Connector 决定需要创建任务的数量,从 Worker 接收配置传递给任务。 概念二:什么是 Task ? Task 是 Connector 任务分片的最小分配单位,是实际将源数据源数据复制到 RocketMQ(SourceTask),或者将数据从 RocketMQ 读出写入到目标系统(SinkTask)真正的执行者,Task 是无状态的,可以动态的启停任务,多个 Task 可以并行执行,Connector 复制数据的并行度主要体现在 Task 上。一个 Task 任务可以理解为一个线程,多个 Task 则以多线程的方式运行。 通过 Connect 的 API 也可以看到 Connector 和 Task 各自的职责,Connector 实现时就已经确定数据复制的流向,Connector 接收数据源相关的配置,taskClass 获取需要创建的任务类型,通过 taskConfigs 的数量确定任务数量,并且为 Task 分配好配置。Task 拿到配置以后数据源建立连接并获取数据写入到目标存储。通过下面的两张图可以清楚的看到,Connector 和 Task 处理基本流程。 一个 RocketMQ Connect 集群中会有多个 Connector ,每个 Connector 会对应一个或多个 Task,这些任务运行在 Worker(进程)中。Worker 进程是 Connector 和 Task 运行环境,它提供 RESTFull 能力,接收 HTTP 请求,将获取到的配置传递给 Connector 和 Task,它还负责启动 Connector 和 Task,保存 Connector 配置信息,保存 Task 同步数据的位点信息,除此以外,Worker 还提供负载均衡能力,Connect 集群高可用、扩缩容、故障处理主要依赖 Worker 的负责均衡能力实现的。Worker 提供服务的流程如下: Worker 提供的服务发现及负载均衡的实现原理如下: 服务发现: 用过 RocketMQ 的开发者应该知道,它的使用很简单,就是发送和接收消息。消费模式分为集群模式和广播模式两种,集群消费模式下一个 Topic 可以有多个 Consumer 消费消息,任意一个 Consumer 的上线或下线 RocketMQ 服务端都有感知,并且还可以将客户端上下线信息通知给其它节点,利用 RocketMQ 这个特性就实现了 Worker 的服务发现。 配置 / Offset 同步: Connector 的配置/Offset 信息同步通过每个 Worker 订阅相同的 Topic,不同 Worker 使用不同的 Consumer Group 实现的, Worker 节点可以通过这种方式消费到相同 Topic 的所有数据,即 Connector 配置/ Offset 信息,这类似于广播消费模式,这种数据同步模式可以保证任何一个 Worker 挂掉,该 Worker 上的任务依旧可以在存活的 Worker 正常拉起运行 ,并且可以获取到任务对应的 Offset 信息实现断点续传, 这是故障转移以及高可用能力的基础。 负载均衡: RocketMQ 消费场景中,消费客户端 与 Topic Queue 之间有负载均衡能力,Connector 在这一部分也是类似的,只不过它负载均衡的对象不一样,Connector 是 Worker 节点和 Task 之间的负载均衡,与 RocketMQ 客户端负载均衡一样,可以根据使用场景选择不同负载均衡算法。 上文提到过 RocketMQ Connect 提供 RESTFull API能力。通过 RESTFull AP可以创建 Connector,管理Connector 以及查看 Connector 状态,简单列举: POST /connectors/{connector name} GET /connectors/{connector name}/config GET /connectors/{connector name}/status POST /connectors/{connector name}/stop 目前 Connector 支持单机、集群两种部署模式。集群模式至少要有两个节点,才能保证它的高可用。并且集群可以动态增加或者减少,做到了动态控制提升集群性能和节省成本节省的能力。单机模式更多方便了开发者开发测试 Connector 。 如何实现一个 Connector呢? 还是结合一个具体的场景看一看,例如业务数据当前是写入 MySQL 数据库中的,希望将 MySQL中数据实时同步到数据湖 Hudi 当中。只要实现 MySQL Source Connector 、Hudi Sink Connector 这两个 Connector 即可。 下面就以 MySQLSource Connector 为例,来看一下具体的如何实现。 实现 Connector 最主要的就是实现两个 API 。第一个是 Connector API ,除了实现它生命周期相关的 API 外,还有任务如何分配,是通过 Topic、Table 还是通过数据库的维度去分。第二个API是需要创建的 Task,Connector 通过任务分配将相关的配置信息传递给 Task, Task 拿到这些信息,例如数据库账号,密码,IP,端口后就会创建数据库连接,再通过 MySQL 提供的 BINLOG 机智获取到表的数据,将这些数据写到一个阻塞队列中。Task 有个 Poll 方法,实现 Connector 时只要调用到 Poll 方法时可以获取到数据即可,这样 Connector 就基本写完了。然后打包以 Jar 包的形式提供出来,将它加载到 Worker 的节点中。 创建 Connector 任务后, Worker 中会创建一个或者多个线程,不停的轮询 Poll 方法,从而获取到 MySQL 表中的数据,再通过 RocketMQ Producer 发送到 RocketMQ Broker中,这就是 Connector 从实现到运行的整体过程(见下图)。 RocketMQ Connect 现状与未来 RocketMQ Connect 的发展历程分为三个阶段。 第一阶段:Preview 阶段 RocketMQ Connect 发展的初期也即 Preview 阶段,实现了 Open Messaging Connect API 1.0 版本,基于该版本实现了 RocketMQ Connect Runtime ,同时提供了 10+ Connector 实现(MySQL,Redis,Kafka,Jms,MongoDB……)。在该阶段,RocketMQ Connect 可以简单实现端到端的数据源同步,但功能还不够完善,不支持数据转换,序列化等能力,生态相对还比较贫乏。 第二阶段:1.0 阶段 在 1.0 阶段,Open Messaging Connect API 进行了升级,支持Schema、Transform,Converter等能力,在此基础上对 Connect Runtime 也进行了重大升级,对数据转换,序列化做了支持,复杂Schema也做了完善的支持。该阶段的 API、Runtime 能力已经基本完善,在此基础上,还有30+ Connecotor 实现,覆盖了 CDC、JDBC、SFTP、NoSQL、缓存Redis、HTTP、AMQP、JMS、数据湖、实时数仓、Replicator、等Connector实现,还做了Kafka Connector Adaptor可以运行Kafka生态的Connector。 第三阶段:2.0 阶段 RocketMQ Connect当前处于这个阶段,重点发展Connector生态,当 RocketMQ 的 Connector生态达到 100 + 时,RocketMQ 基本上可以与任意的一个数据系统去做连接。 目前 RocketMQ 社区正在和 OceanBase 社区合作,进行 OceanBase 到 RocketMQ Connect 的研发工作,提供 JDBC 和 CDC 两种模式接入模式,后续会在社区中发布,欢迎感兴趣的同学试用。 总结 RocketMQ 是一个可靠的数据集成组件,具备分布式、伸缩性、故障容错等能力,可以实现 RocketMQ 与其他数据系统之间的数据流入与流出。通过 RocketMQ Connect 可以实现 CDC,构建数据湖,结合流计算可实现数据价值。
作者:周波
#行业实践 #最佳实践 #生态集成

2023年3月28日

RocketMQ x OpenTelemetry 分布式全链路追踪最佳实践
作者简介:艾阳坤,Apache RocketMQ PMC Member/Committer,CNCF OpenTelemetry Member,CNCF Envoy contributor。 在分布式系统中,多个服务之间的交互涉及到复杂的网络通信和数据传输,其中每个服务可能由不同的团队或组织负责维护和开发。因此,在这样的环境下,当一个请求被发出并经过多个服务的处理后,如果出现了问题或错误,很难快速定位到根因。分布式全链路追踪技术则可以帮助我们解决这个问题,它能够跟踪和记录请求在系统中的传输过程,并提供详细的性能和日志信息,使得开发人员能够快速诊断和定位问题。对于分布式系统的可靠性、性能和可维护性起到了非常重要的作用。 RocketMQ 5.0 与分布式全链路追踪 Apache RocketMQ 5.0 版本作为近几年来最大的一次迭代,在整个可观测性上也进行了诸多改进。其中,支持标准化的分布式全链路追踪就是一个重要的特性。 RocketMQ 5.0 可观测 而由 Google、Microsoft、Uber 和 LightStep 联合发起的 CNCF OpenTelemetry 作为 OpenTracing 和 OpenCensus 的官方继任者,已经成为可观测领域的事实标准,RocketMQ 的分布式全链路追踪也围绕 OpenTelemetry 进行展开。 分布式链路追踪系统的起源可以追溯到 2007 年 Google 发布的论文。这篇论文详细介绍了 Google 内部使用的链路追踪系统 Dapper,其中使用的 span 概念被广泛采用,并成为后来开源链路追踪系统中的基础概念之一。 Dapper Trace Tree 在 Dapper 中,每个请求或事务被追踪时都会创建一个 span,记录整个请求或事务处理过程中的各个组件和操作的时间和状态信息。这些 span 可以嵌套,形成一个树形结构,用于表示整个请求或事务处理过程中各个组件之间的依赖关系和调用关系。后来,很多开源链路追踪系统,如 Zipkin 和 OpenTracing,也采用了类似的 span 概念来描述分布式系统中的链路追踪信息。现在,合并了 OpenTracing 和 OpenCensus 的 CNCF OpenTelemetry 自然也一样采用了 span 概念,并在此基础上进行了进一步发展。 OpenTelemetry 为 messaging 相关的 span 定义了,旨在制定一套与特定消息系统无关的 specification,而 OpenTelmetry 自身的开发其实也都是由 specification 驱动进行展开。 Specification Driven Development Messaging Span 定义 Specifaition 中描述了 messaging span 的拓扑关系,包括代表消息发送、接收和处理的不同 span 之间的父子和链接关系。关于具体的定义可以参考:。对应到 RocketMQ 中,有三种不同的 span: | Span | Description | | | | | send | 消息的发送过程。span 以一次发送行为开始,成功或者失败/抛异常结束。消息发送的内部重试会被记录成多条 span。 | | receive | 消费者中接收消息的长轮询过程,与长轮询的生命周期保持一致。 | | process | 对应 PushConsumer 里 MessageListener 中对消息的处理过程,span 以进入 MessageListener 为开始,离开 MessageListener 为结束。 | 特别地,默认情况下,receive span 是不启用的。在 receive span 启用和不启用的两种情况下,span 之间的组织关系是不同的: 启用 receive span 前后的 span 关系 在没有启用 receive span 的情况下,process span 会作为 send span 的 child;而当 receive span 启用的情况下,process span 会作为 receive span 的 child,同时 link 到 send span。 Messaging Attributes 定义 语义约定中规定了随 span 携带的通用属性的统一名称,这包括但不限于: messaging.message.id: 消息的唯一标识符。 messaging.destination:消息发送的目的地,通常是一个队列或主题名称。 messaging.operation:对消息的操作类型,例如发送、接收、确认等。 具体可以查看 。 特别地,不同的消息系统可能会有自己特定的行为和属性,,这包括: | Attribute | Type | Description | | | | | | messaging.rocketmq.namespace | string | RocketMQ 资源命名空间,暂未启用 | | messaging.rocketmq.client_group | string | RocketMQ producer/consumer 负载均衡组,5.0 只对 consumer 生效 | | messaging.rocketmq.client_id | string | 客户端唯一标识符 | | messaging.rocketmq.message.delivery_timestamp | int | 定时消息定时时间,只对 5.0 生效 | | messaging.rocketmq.message.delay_time_level | int | 定时消息定时级别,只对 4.0 生效 | | messaging.rocketmq.message.group | string | 顺序消息分组,只对 5.0 生效 | | messaging.rocketmq.message.type | string | 消息类型,可能为 normal/fifo/delay/transaction,只对 5.0 生效 | | messaging.rocketmq.message.tag | string | 消息 tag | | messaging.rocketmq.message.keys | string[] | 消息 keys,可以有多个 | | messaging.rocketmq.consumption_model | string | 消息消费模型,可能为 clustering/broadcasting,5.0 broadcasting 被废弃 | 快速开始 在 OpenTelemetry 中有两种不同的方式可以为应用程序添加可观测信息: Automatic Instrumentation:无需编写任何代码,只需进行简单的配置即可自动生成可观测信息,包括应用程序中使用的类库和框架,这样可以更方便地获取基本的性能和行为数据。 Manual Instrumentation:需要编写代码来创建和管理可观测数据,并通过 exporter 导出到指定的目标。这样可以更灵活自由地控制用户想要观测的逻辑和功能。 在 Java 类库中,前者是一种更为常见的使用形式。RocketMQ 5.0 客户端的 trace 也依托于 automatic instrumentation 进行实现。在 Java 程序中,automatic instrumentation 的表现形式为挂载 Java agent。在过去的一年里,我们将 推入了 OpenTelemetry 官方社区。现在,只需要在 Java 程序运行时挂载上 OpenTelemetry agent,即可实现对应用程序透明的分布式全链路追踪。 除此之外,Automatic Instrumentation 和 Manual Instrumentation 并不冲突,Automatic Instrumentation 中所使用的关键对象会被注册成全局对象,在 Manual Instrumentation 的使用方式中也可以非常方便的获取。实现两个 Instrumentation 共用一套配置,非常灵活和方便。 首先准备好 RocketMQ 5.0 Java 客户端,可以参考 进行消息的收发。关于 RocketMQ 5.0 的更多细节,欢迎大家参考和关注 和 。 然后准备好 OpenTelemetry agent jar,可以从 OpenTelemetry 官方,在应用程序启动时增加 javaagent:yourpath/opentelemetryjavaagent.jar 即可。可以通过设置 OTEL_EXPORTER_OTLP_ENDPOINT 环境变量来设置 OpenTelemetry collector 的接入点。 默认情况下,按照 OpenTelemetry 中关于 messaging 的规范,只有 send 和 process 的 span 会被启用,receive 的 span 是默认不启用的,如果想要启用 receive span,需要手动设置 Dotel.instrumentation.messaging.experimental.receivetelemetry.enabled=true。 场景最佳实践 目前,主流的云服务供应商都为 OpenTelemetry 提供了良好的支持,阿里云上的 SLS 和 ARMS 两款可观测产品都提供了基于 OpenTelemetry 的分布式全链路追踪服务。 为了更好地展示分布式全链路追踪的过程,这里提供了一个代码示例: 。在这个代码示例中,会启动三个不同的进程,涉及三种不同类库和业务逻辑之间的相互调用,展示了一个在分布式环境较复杂中间件之间进行交互的典型案例。 请求首先会从 gRPC 客户端发往 gRPC 服务端,在 gRPC 服务端收到请求之后,会向 RocketMQ 5.0 的 Producer 往服务端发送一条消息,然后再回复对应的 response 给客户端。在 RocketMQ 5.0 的 PushConsumer 接受到消息之后,会在 MessageListener 中使用 Apache HttpClient 往淘宝网发送一条 GET 请求。 示例代码调用链路 特别地,gRPC 客户端在发起具体的调用是在一个上游业务 span 的生命周期之内进行的,这个 span 我们称之为 ExampleUpstreamSpan,RocketMQ 5.0 PushConsumer 在收到消息之后,也会在 MessageListener 里执行其他的业务操作,也会有对应的 span,我们称之为 ExampleDownstreamSpan。那么默认在 receive span 没有启用的情况下,按照开始时间的顺序,会先后存在 7 个 span。分别是: ExampleUpstreamSpan。 gRPC 客户端请求 span。 gRPC 服务端响应 span。 RocketMQ 5.0 Producer 的 send span。 RocketMQ 5.0 Producer 的 process span。 HTTP 请求 span。 ExampleDownstreamSpan。 RocketMQ 5.0 对接 SLS Trace 服务 首先在阿里云日志服务中创建 Trace 服务。然后获取接入点,项目和实例名称等信息,具体可以参考。 在补充好信息之后完成接入之后,稍等一会就可以看到对应的 Trace 信息已经被上传到 SLS trace 服务中: SLS Trace 服务分布式全链路展示 Trace 服务其实是将相关数据存储到日志中,因此这些数据也可以通过 SLS 的 SQL 语法查询得到。 通过 Trace 数据,我们可以很方便知道用户的操作系统环境,Java 版本等一系列基础信息。消息的发送延时,失败与否,消息是否准时投递到了客户端,以及客户端本地消费耗时,消费失败与否等一系列有效信息,可以帮助我们十分有效地进行问题排查。 除此之外,SLS Trace 服务的 demo 页也提供了基于 RocketMQ 5.0 定制的消息中间件大盘,生动展示了利用 Trace 数据得到的发送成功率,端到端延时等一系列指标。 :展示利用 Trace 数据得到的包括发送延时、发送成功率、消费成功率、端到端延时在内的一系列指标。 :可以根据上一步得到的差错长 message id 进行进一步的细粒度查询。 消息中间件分析 RocketMQ 5.0 对接应用实时监控服务(ARMS) 首先进入应用实时监控服务 ARMS 控制台,点击接入中心中的 OpenTelemetry,选择 java 应用程序下的自动探测,获取启动参数并修改至自己的 java 应用程序,具体可以参考。 配置好参数之后,启动自己的相关应用程序,稍等一会儿,就可以在 ARMS Trace Explorer 里看到对应的数据了。 Trace Explorer 还可以查看 span 之间的时序关系。 ARMS Trace Explorer 分布式全链路追踪展示 具体地,可以点进每个 span 查看详细的 attributes/resources/events 等信息。除此之外,ARMS 还支持通过使用 OpenTelemetry Collector 转发的形式来收集应用程序的 Trace 数据。 趋势与思考 随着现代应用程序架构的不断演进,可观测性的重要性日益凸显。它不仅可以帮助我们快速发现和解决系统中的问题,还提高应用程序的可靠性和性能,同时也是实现 DevOps 的关键部分。在相关领域,也陆续诞生了像 DataDog 和 Dynatrace 这样的明星公司。 近年来涌现了一些新兴技术,如 eBPF(Extended Berkeley Packet Filter)和 Service Mesh 也为可观测领域提供了一些新的思路: eBPF 可以在内核层面运行,通过动态注入代码来监控系统的行为。它被广泛应用于实时网络和系统性能监控、安全审计和调试等任务,并且性能影响很小,未来也可以作为 continuous profiling 的一种选择。Service Mesh 则通过在应用程序之间注入代理层实现流量管理、安全和可观测性等功能。代理层可以收集和报告有关流量的各种指标和元数据,从而帮助我们了解系统中各个组件的行为和性能。 Service Mesh 中反映出的技术趋势很大一部分已经在 RocketMQ 5.0 proxy 中得到了应用,我们也在更多地将可观测指标往 proxy 进行收敛。当前的 Trace 链路未来也在考虑和服务端一起进行关联,并打造用户侧,运维侧,跨多应用的全方位链路追踪体系。除此之外还可以将 Trace 数据与 Metrics 数据通过 Exemplars 等技术进行联动。实现面到线,线到点的终极排查效果。 在可观测领域,RocketMQ 也在不断探索和摸索更加领先的可观测手段,以帮助开发者和客户更快更省心地发现系统中的隐患。 特别感谢阿里云 SLS 团队的千乘同学和 ARMS 团队的垆皓同学在接入过程提供的帮助和支持! 相关链接 RocketMQ 5.0 客户端: OpenTelemetry Instrumentation for RocketMQ 5.0: RocketMQ OpenTelemetry 示例: 活动推荐 阿里云基于 Apache RocketMQ 构建的企业级产品消息队列 RocketMQ 5.0 版现开启活动: 1、新用户首次购买包年包月,即可享受全系列 85 折优惠! 了解活动详情:
作者:艾阳坤
#行业实践 #可观测

2023年1月6日

基于 EventBridge API Destination 构建 SaaS 集成实践方案
引言 事件总线 EventBridge 是阿里云提供的一款无服务器事件总线服务,支持阿里云服务、自定义应用、SaaS 应用以标准化、中心化的方式接入,并能够以标准化的 CloudEvents 1.0 协议在这些应用之间路由事件,帮助您轻松构建松耦合、分布式的事件驱动架构。事件驱动架构是一种松耦合、分布式的驱动架构,收集到某应用产生的事件后实时对事件采取必要的处理后路由至下游系统,无需等待系统响应。使用事件总线 EventBridge 可以构建各种简单或复杂的事件驱动架构,以标准化的 CloudEvents 1.0 协议连接云产品和应用、应用和应用等。 目前 HTTP 的不足有以下几点: HTTP 的能力较弱,比如:授权方式单一、只支持 Body 传参、网络互通能力未对齐。只能满足客户最简单的场景。 用户无法基于 API 来统一管理(修改/下线)Target,用户体验交叉口; 对于基于 HTTP 实现的 SaaS API,无法简单快捷的引入到 EB 中,作为 Target 给用户使用。 本次新增集成中心(Integration Center)是负责 EventBridge 与外界系统对接的模块,通过抽象与配置快速获取第三方事件并将事件集成到第三方系统。并且优化现有 HTTP Sink 集成方案,为用户下游集成创造更多适配场景。 集成中心重点服务对象包括但不限于 SaaS 系统,对标 IPaaS 平台的能力提供完整的全面的通用系统集成方案。 集成源(Integration Source):指集成到 EventBridge 的第三方源; API 端点(API Destination ):指被集成到 EventBridge 的第三方 API 端点; 连接配置(Connection):是 API 端点模块的子集,与API 端点的平级资源,主要负责记录连接及配置信息,连接配置可被任意 API 端点复用。 针对市场上其他云厂商服务,EventBridge 发布了 API 端点 Sink 能力,主要作用在于承接 EventBridge 下游端数据,帮助用户快速完成下游数据集成。提供简单且易于集成的三方事件推送 ,帮助客户更加高效、便捷地实现业务上云。 API 端点 Sink 概述 接入 EventBridge 应用有多种情况:用户自定义应用、阿里云服务、其他云厂商服务或者其他 DB 产品。 具体而言,API 端点 Sink 事件目标是 EventBridge 支持的事件目标的一种,是通过 EventBridge 将数据投递至指定 Web Server 中。 API 端点 Sink 基本使用 首先现阶段 API 端点的 Sink 支持三种鉴权方式: 同时网络支持公网和专有网络(后续支持)。 1、创建 Connection 添加连接配置基本信息,并配置鉴权。 链接配置支持三种鉴权方式 : Basic 鉴权方式 : OAuth 2.0 鉴权方式: 添加授权接入点、授权请求方式、Client ID、ClientSecret 和授权相关的 Http 请求参数。 API Key 鉴权方式: 2、创建 ApiDestination API 端点配置 :配置需要访问 API 的 URL 地址和 HTTP 调用类型。 添加请求地址和请求方式: 在创建 API 端点时可以直接创建连接配置也可以选择已有的连接配置,例如上面已经创建成功的连接配置。 3、创建 Rule 创建事件规则,用于将事件投递到具体的 API 端点中。 步骤一 :点击事件规则并创建事件规则 步骤二 :是选择事件源,可以选择阿里云官方的或者选择自定义事件源,这里选择的是自定义事件源 步骤三 :第三步是选择 API 端点事件目标 支持自定义创建和使用已有,同时可以添加请求 HTTP 参数。 使用已有 使用选择已有的以后只需要添加请求 HTTP 参数即可: 选择已有的 API 端点来自于集成中心下面的 API 端点: 最佳实践 常见场景案例,比如: 用户可以把 RocketMQ 或者 RabbitMQ 的消息产品的消息动态投递到不同的 Web Server 中,这样可以让不同的 web 平台处理消息数据,实现了跨平台或者跨语言的消息流通。 用户可以把日志服务 SLS 数据投递到指定的 Web Server 或者 ELK 中,方便业务部门或者大数据平台对日志数据处理,可以更好的完善用户画像和用户行为分析,方便给用户打标签,从而可以进一步完善大数据个性化用户推荐系统。 例如下面是访问的国内外 SaaS 生态: 典型场景 :与 Buildkite 集成 场景介绍 :利用 EventBridge 丰富的云产品事件源和目标集成能力,快速与 Buildkite 的持续集成和持续交付(CI / CD)平台进行集成。 集成产品背景描述 :Buildkite 是大型持续集成和持续交付(CI / CD)平台会有各种管理的变更、构建和作业等任务,运维人员需要快速感知、处理这些变更,以便决赛风险。 用户痛点 :构建的事件收集困难,需要手动触发构建和手动创建管道。 方案优势 :EventBridge 支持集成 Buildkite 的持续集成和持续交付平台,用户只需要简单配置即可创建和处理平台的事件。 举例介绍:可以通过 API 文档中提供的接口实现动态的创建管道、创建构建和重试作业等。 文档地址 : 创建 API 端点 创建规则 发布事件,发布完成以后可以到事件轨迹查询详情 典型场景 :与 Freshdesk 集成 场景介绍 :利用 EventBridge 丰富的云产品事件源和目标集成能力,快速与 CRM(Freshdesk)进行集成。 集成产品背景描述 :不同的平台都需要对接 CRM(Freshdesk)管理系统。 用户痛点 :不同的平台的事件收集困难,需要用户自定义实现。 方案优势 :EventBridge 支持集成 CRM(Freshdesk)平台,用户只需要简单配置即可实现动态的创建会话、创建联系人和创建技能等事件。 举例介绍:可以通过 API 文档中提供的接口实现动态的创建会话、创建联系人和创建技能等。 文档地址 : 创建 API 端点 创建事件规则 发布事件,发布完成以后可以到事件轨迹查询详情 典型场景 :与有成财务集成 场景介绍 :利用 EventBridge 丰富的云产品事件源和目标集成能力,快速与有成财务进行集成 集成产品背景描述 :不同的 HR 系统或者 OA 系统需要对接有成财务时 用户痛点 :不同的系统的事件收集困难,需要用户自定义实现 方案优势 :EventBridge 支持集成有成财务平台,用户只需要简单配置即可实现动态生成报销科目和财务凭证等事件 举例介绍:比如用户想把 mns 的消息或者其他消息产品,同步到钉钉产品等接口中,或者也可以利用消息生成报销单据,可以生成报销科目和财务凭证等 地址 : 创建 API 端点 创建规则 发布事件,发布完成以后可以到事件轨迹查询详情。 活动推荐 阿里云基于 Apache RocketMQ 构建的企业级产品消息队列RocketMQ 5.0版现开启活动: 1、新用户首次购买包年包月,即可享受全系列 85折优惠! 了解活动详情:
作者:赵海
#行业实践 #生态集成

2022年12月14日

事件总线 + 函数计算构建云上最佳事件驱动架构应用
距离阿里云事件总线(EventBridge)和 Serverless 函数计算(Function Compute,FC)宣布全面深度集成已经过去一年,站在系统元数据互通,产品深度集成的肩膀上,这一年我们又走过了哪些历程?从事件总线到事件流,从基于 CloudEvents 的事件总线触发到更具个性化的事件流触发,函数计算已成为事件总线生态不可或缺的重要组成部分,承载了 EventBridge 系统架构中越来越多的角色,事件流基础架构的函数 Transform,基于函数计算的多种下游 Sink Connector 投递目标支持,函数作为 EventBridge 端点 API Destination;基于事件总线统一,标准的事件通道能力,和基于函数计算敏捷、轻量、弹性的计算能力,我们将又一次起航探索云上事件驱动架构的最佳实践。 今天的主题围绕事件总线+函数计算,构建云上最佳事件驱动架构应用。希望通过今天的分享,能够帮助大家深入理解 Serverless 函数计算、EventBridge 事件总线对于构建云上事件驱动架构应用的价值和背后的逻辑、 为什么函数计算是云上事件驱动服务最佳实践?为什么我们如此需要事件总线服务?伴随着这些谜题的解开,最后,让我们一起了解应用于实际生产的一些 Serverless 事件驱动客户案例。 事件驱动架构的本质 Back to the Nature of EventDriven 大家可能会疑惑,事件驱动家喻户晓,为什么我们又要重新讨论事件驱动呢?我想这也正是我们需要讨论它的原因,回归本质,重新起航;事件驱动可能是一个比较宽泛的概念,而本文聚焦事件驱动架构的讨论,事件驱动架构作为一种软件设计模式,的确不是一个新的概念,伴随着计算机软件架构的演进,它已经存在了一段很久的时间,大家对它的讨论也从未停止过,当我们需要重新讨论一个已经存在的概念的时候,我想我们有必要重新回到它最开始的定义,一起探索那些本质的东西,重新认识它。 上面的这些内容是我从相关的一些资料上摘录的关于事件驱动的一些描述,“abstract”,“simple”,“asynchronous”,“messagedriven”这些具有代表性的词汇很好的给予事件驱动一个宏观的描述;从事件驱动的抽象概念,到它简洁的架构,以及事件驱动架构要达成的目的,和它在实际的系统架构中所展现的形态。 事件驱动架构基本概念及形态 在了解了关于事件驱动架构的一些基本描述之后,我们需要进一步明确事件驱动架构所涉及的一些基本概念和架构形态。根据维基百科描述,事件驱动架构涉及的核心概念如下所示: 围绕事件的流转,根据事件驱动架构的概念和基本形态,主要涉及以下四个核心部分: Event Producer:负责产生事件,并将产生的事件投递到事件通道; Event Channel:负责接收事件,并将接收的事件持久化存储,投递给订阅该事件的后端处理引擎; Event Processing Engine:负责对于订阅的事件做出响应和处理,根据事件更新系统状态; Downstream eventdriven activity:事件处理完成之后,对于事件处理响应的一种展示; 事件驱动架构要达成的目的 了解了事件驱动架构的基本形态,架构中事件通道的引入,解耦了事件生产和事件处理这两个最基本的系统角色,那么这样的架构模型所要达成的最终目的到底是什么? 系统架构松耦合 事件生产者与事件订阅者在逻辑上是分开的。事件的生成与使用的分离意味着服务具有互操作性,但可以独立扩缩、更新和部署。 只面向事件的松散耦合可以减少系统依赖项,并允许您以不同的语言和框架实现服务。您无需更改任何一个服务的逻辑,即可添加或移除事件生成方和接收方。您无需编写自定义代码来轮询、过滤和路由事件。 系统的可伸缩性 基于事件驱动架构的松耦合特性,意味着可以独立对事件生产者,事件通道服务,以及事件处理引擎进行独立的扩缩容;尤其对于后端事件处理引擎,可以根据消息处理响应 SLA 和后端资源供给进行弹性扩缩容;同时可以基于事件粒度构建不同规格的后端处理服务,实现更细粒度的系统弹性伸缩。 系统的可扩展性 系统的可扩展性,主要表现在当系统需要增加新的功能,如何快速的基于现有系统架构快速构建支持新的业务逻辑,在事件驱动架构应用中,围绕事件粒度的处理模式,能够天然快速支持增加新的基于事件的数据流抽象;当系统中增加新的事件类型的时候,无需调整变更发布整个系统,只需要关注需要订阅的事件进行事件处理逻辑的开发和部署即可,也可以基于原来的系统做很少的代码变更即可实现,也可以在业务初期通过独立的服务定于指定事件完成特定的业务逻辑支持。 为什么函数计算是云上事件驱动服务最佳实践? 在讨论完事件驱动架构基本模型之后,我想关于事件驱动的概念,形态我们有了统一的认识和理解,接下来我们进入议题的第二个部分,为什么函数计算是云上事件驱动服务最佳实践? 函数计算简介 函数计算是一款基于事件驱动的全托管计算服务,相关的产品细节可以见官网介绍。作为一款通用的事件驱动型计算服务,接下来我会从三个方面进行详细的介绍。 编程范式 使用函数计算,用户无需采购与管理服务器等基础设施,只需编写并上传代码。函数计算为你准备好计算资源,弹性地、可靠地运行任务,并提供日志查询、性能监控和报警等开箱即用功能,编程范式带来开发的敏捷性。按照函数粒度进行独立的功能单元开发,快速调试,快速的部署上线,省去了大量资源购买,环境搭建的运维工作;同时函数计算是一个事件驱动的模型,事件驱动,意味着用户不需要关注服务产品数据传递的问题,省去了在编写代码中涉及的大量服务访问连接的逻辑;“事件驱动” + “函数粒度开发” + “免服务器运维”等几个维度特征帮助函数计算支撑“聚焦业务逻辑敏捷开发”的底层逻辑。 计算模型 除了开发模式带来的研发效能提升之外,函数计算提供非常细粒度的计算资源和毫秒级计费模型,支撑按需计算,按量收费;能够支持按用户的请求,根据用户流量的模型为计算付费;当然按用户请求付费存在技术上巨大的挑战,要求函数计算实例的启动小于用户的 RT 要求,冷启动性能尤为重要,这时候极致弹性成为了 Serverless 按需付费,业务降本的底层技术支撑。函数计算通过“极致弹性” + “按需付费”的模型帮助 Serverless 函数计算实现真正的按需计算逻辑。 事件驱动 在基于云的开发环境,云产品承载的服务相对内聚,各自扮演着分布式系统架构中的各个重要角色,云产品之间的事件触发机制能够帮助客户更好的基于多个云产品构建自己的业务系统;否则在云产品之间 Watch 事件是非常复杂,开发代价非常昂贵的一件事;除了产品连接带来的开发效率之外,当用户订阅某个事件,并提供处理逻辑的时候,客户已经潜在的过滤掉了不需要处理的事件请求,事件驱动意味着每一次的事件触发请求都是一次完全有效的计算。 函数计算对于事件驱动架构的价值 为什么函数计算是云上最佳的事件驱动架构服务?函数计算对于事件驱动架构的核心价值到底是什么?事件驱动架构一直存在,在没有函数计算的时候,同样也有事件驱动架构,在微服务的时候也同样有事件驱动架构。如今,当我们重新再来讨论事件驱动架构的时候,到底是什么发生了变化,有哪些本质的区别?在整个事件驱动架构中,函数计算最大的价值在于帮助构建 “Event Processing Engine” 这个角色,我想主要是以下两个方面发生了本质的变化: 系统可扩展性价值 开发模式发生了本质的变化:函数计算提供的框架能力及编程模型,最大化的消除了客户业务逻辑之外的处理内容,极大的加速了客户业务开发,同时基于这样这样的开发模式,用户对于新增事件处理逻辑能够在最短的时间完成处理并上线,细粒度,专注业务的敏捷开发模式能够加速业务快速上线。 系统可伸缩性价值 计算模式发生了本质的变化:基于事件驱动架构事件粒度的处理逻辑和函数计算更细粒度力度计算弹性能力,能够从多个维度实现 “Event Processing Engine” 组件的弹性能力, 这我想这也是函数计算对于事件驱动架构的一个最核心价值。 为什么我们如此需要事件总线服务? 构建云上事件驱动架构挑战 函数计算以其轻量,快捷,能够利用事件驱动的方式与其他云产品进行联动的特点, 成为很多客户利用事件驱动架构构建业务系统的首选,随着业务及客户需求的不断增加,客户对于函数计算和更多云产品及服务的连接需求变得越来越多,同时对于其他云产品的客户而言, 也希望能够利用 Serverless 函数计算的特点帮助处理一些系统任务和事件。 尽管函数计算和云上的众多云产品进行了集成,提供了一些开箱即用的事件触发能力,那么我们为什么还需要事件总线服务来构建事件驱动应用架构呢?围绕函数计算构建事件驱动架构生态的过程中,我们面临主要来自三个方面的挑战。面对这些挑战,基于函数计算和事件总线帮助云上客户构建完备的事件驱动架构生态迫在眉睫。 事件源多样性挑战 事件驱动作为函数计算产品核心竞争力,打通函数计算和其它云产品,以及用户自定义应用,SaaS 服务的连通成为函数计算生态集成的迫切需求,但系统集成,生态建设从来都不是一件容易的事情。函数计算系统在和 EventBridge 集成之前,已经和 OSS,SLS 等用户典型场景的云产品进行了集成,也和阿里云的其它大概十多款产品进行了集成,不同系统具有不同的事件格式,不同系统的注册通知机制也各不相同,以及上游不同系统的失败处理机制也各不相同;部分系统支持同步的调用方式,部分系统支持异步的调用方式,调用方式的差异主要取决于上游系统在接入函数计算的时候当时面临的产品业务场景,对于新的产品能力和业务场景的扩展支持,在当时并未有太多的考虑。随着和更多云产品的集成,集成的投入,集成的困难度和底层数据管理难度越来越大。面对多种事件源集成的客观困难,函数计算急需提高和其他云产品的集成效率。 授权复杂及安全隐患 除此之外, 函数计算希望提升用户体验,保证用户只关心事件的处理;同时希望能够在面对大量的云产品时保证系统授权层面的复杂度。用户在使用事件触发的时候, 需要了解不同产品接入函数计算的权限要求,针对不同的产品需要提供不同的授权策略,对于客户使用函数计算带来了非常大的困难,为了加速产品接入,大量用户经常使用FullAcees权限,造成较大产品安全隐患, 和其它云产品的集成急需统一的授权界面,统一用户体验。 通用能力难以沉淀 面对上游不同的事件源, 如何更好的投递事件、更好的消费事件?如何进行事件的错误处理?函数计算调用方式如何选择?以及函数计算后端错误 Backpressure 能力的反馈、重试策略和上游系统参数设置、触发器数量的限制等问题获成为函数计算事件触发不得不面对的问题。为了更好的服务客户,提供可靠的消费处理能力,函数计算希望能够有一个统一的接入层,基于统一的接入层进行消费能力和流控能力的建设。通过沉淀在这样一个标准的层面,在保证调用灵活性的同时,提供可靠的服务质量。 事件总线简介 阿里云事件总线(EventBridge) 是一种无服务器事件总线,支持将用户的应用程序、第三方软件即服务 (SaaS)数据和阿里云服务的数据通过事件的方式轻松的连接到一起,这里汇聚了来自云产品及 SaaS 服务的丰富事件; 总线模式(EventBus) 从整个架构来看,EventBridge 通过事件总线,事件规则将事件源和事件目标进行连接。首先,让我们快速普及下 EventBridge 架构中涉及的几个核心概念: 事件:状态变化的记录; 事件源:事件的来源,事件的产生者,产生事件的系统和服务, 事件源生产事件并将其发布到事件总线; 事件总线:负责接收来自事件源的事件;EventBridge 支持两种类型的事件总线: 云服务专用事件总线:无需创建且不可修改的内置事件总线,用于接收您的阿里云官方事件源的事件。 自定义事件总线:标准存储态总线,用于接收自定义应用或存量消息数据的事件,一般事件驱动可选该总线。 事件规则:用于过滤,转化事件,帮助更好的投递事件; 事件目标:事件的消费者,负责具体事件的处理。 通过上面的流程,完成了事件的产生,事件的投递,事件的处理整个过程。当然事件并不是一个新的概念,事件驱动架构也不是一个新的概念,事件在我们的系统中无处不在,事件驱动架构同样伴随着整个计算机的架构演进,不断地被讨论。对于 EventBridge,采用云原生事件标准 CloudEvents 来描述事件;带来事件的标准化,这样的标准化和事件标准的开放性带来一个最显著的优势:接入的标准化,无论是对于事件源还是事件目标。 事件流模式(EventStreaming) 消息产品凭借其异步解耦、削峰填谷的特点,成为了互联网分布式架构的必要组成部分,Serverless 函数计算有着与其完全吻合的应用场景,针对消息产品生态集成,函数计算在架构层面做了专门的建设,基于 EventBridge 产品提供的 EventStreaming 通道能力建设了通用的消息消费服务 Poller Service,基于该架构对用户提供了 RocketMQ,Kafka,RabbitMQ,MNS 等多个消息类型触发能力。 将消费的逻辑服务化,从业务逻辑中剥离由平台提供,消费逻辑和处理逻辑的分离,将传统架构的消息拉模型转化成 Serverless 化的事件驱动推模型,能够支撑由函数计算承载消息处理的计算逻辑 ,实现消息处理的 Serverless 化。基于这样的架构,能够帮助客户解决消息客户端的集成连接问题,简化消息处理逻辑的实现,同时对于波峰波谷的业务模型,结合函数计算提供细粒度的计算弹性能力,能够实现资源的动态扩容,降低用户成本。 事件总线对于事件驱动架构的价值 简化统一事件源接入 沉淀通用事件通道能力 提升优化用户集成体验 利用函数计算提供的 HTTP 函数 URL 能力,结合事件总线端点 API 能力,能够快速的帮助客户进行系统扩展和集成。 客户场景案例分享 总线模式 + 函数计算用户案例 利用 ActionTrail 事件触发函数进行多账号审计管理 利用 OSS 文件上传事件触发函数扩容 ACK  集群资源 利用 OSS 文件上传执行 Terraform 文件并访问外部 API 做结果通知 事件流模式 + 函数计算用户案例 利用函数计算细粒度资源弹性特征,结合业务波峰波谷的特点,实现快速的消息清洗和处理。 事件流触发函数计算处理业务消息 事件流触发函数计算进行简单 ETL 数据同步 事件流触发函数进行简单 ETL 数据清洗入库 函数异步+事件流触发函数构建电商运营通知系统 在购物车加购,商品变更通知场景,利用函数计算异步系统(内部自带 Queue 能力),触发大量运营通知,利用函数异步的 Destination 能力将运营通知结果写入 MQ,然后利用事件流能力对 MQ 数据进行再次处理,写入HBase数据库中。 活动推荐 阿里云基于 Apache RocketMQ 构建的企业级产品消息队列RocketMQ 5.0版现开启活动: 1、新用户首次购买包年包月,即可享受全系列 85折优惠! 了解活动详情:
作者:世如
#行业实践 #事件驱动架构 #云原生

2022年11月25日

RocketMQ 的消费者类型详解与最佳实践
在 RocketMQ 5.0 中,更加强调了客户端类型的概念,尤其是消费者类型。为了满足多样的 RocketMQ 中一共有三种不同的消费者类型,分别是 PushConsumer、SimpleConsumer 和 PullConsumer。不同的消费者类型对应着不同的业务场景。 消费者类型概览 本篇文章也会根据不同的消费者类型来进行讲述。在介绍不同的消息类型之前,先明确一下不同 RocketMQ 消费者中的一个通用工作流程:在消费者中,到达客户端的消息都是由客户端主动向服务端请求并挂起长轮询获得的。为了保证消息到达的及时性,客户端需要不断地向服务端发起请求(请求是否需要由客户端主动发起则与具体的客户端类型有关),而新的符合条件的消息一旦到达服务端,就会客户端请求走。最终根据客户端处理的结果不同,服务端对消息的处理结果进行记录。 另外 PushConsumer 和 SimpleConsumer 中还会有一个 ConsumerGroup 的概念,ConsumerGroup 相当于是一组相同订阅关系的消费者的共同身份标识。而服务端也会根据 ConsumerGroup 来记录对应的消费进度。同一个 ConsumerGroup 下的消息消费者将共同消费符合当前订阅组要求的所有消息,而不是独立进行消费。相比较于 PullConsumer,PushConsumer 和 SimpleConsumer 更加适用于业务集成的场景,由服务端来托管消费状态和进度,相对来说更加的轻量与简单。 简单来说: PushConsumer :全托管的消费者类型,用户只需要注册消息监听器即可,符合对应订阅关系的消息就会调用对应的消费方法,是与业务集成最为普遍的消费者类型。 SimpleConsumer:解耦消息消费与进度同步的消费者类型,用户自主接受来自服务端的消息,并对单条消息进行消息确认。和 PushConsumer 一样也由服务端托管消费进度,适用于用户需要自主控制消费速率的业务场景。 PullConsumer:使用流处理框架进行管理的消费者类型,用户按照队列(Topic 的最小逻辑组成单位)来进行消息的接收并可以选择自动或者手动提交消费位点。 PushConsumer PushConsumer 是 RocketMQ 目前使用最为广泛的消费者。用户只需要确认好订阅关系之后,注册相对应的 Listener 即可。符合对应订阅关系的消息在由 Producer 发出后,消费者的 Listener 接口也会被即时调用,那么此时用户需要在 Listener 中去实现对应的业务逻辑。 使用简介 以下是 Push 消费者的使用示例: PushConsumer pushConsumer = provider.newPushConsumerBuilder() .setClientConfiguration(clientConfiguration) // set the consumer group name. .setConsumerGroup(consumerGroup) // set the subscription for the consumer. .setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression)) .setMessageListener(messageView { // handle the received message and return consume result. LOGGER.info("consume message={}", messageView); return ConsumeResult.SUCCESS; }) .build(); // block the main thread, no need for production environment. Thread.sleep(Long.MAX_VALUE); // close the push consumer when you don't need it anymore. pushConsumer.close(); 用户需要根据自己业务处理结果的不同来返回 ConsumeResult.SUCCESS或者 ConsumeResult.FAILURE。当用户返回 ConsumeResult.SUCCESS时,消息则被视为消费成功;当用户返回 ConsumeResult.FAILURE时,则服务端视为消费失败,会进行该条消息的退避重试,消息的退避重试是指,在消息被消费成功之前,当前消息会被多次投递到用户注册的 MessageListener 中直到消费成功,而两次消费之间的时间间隔则是符合退避规律的。 特别的,每个 ConsumerGroup 都会有一个最大消费次数的设置,如果当前消息的消费次数超过了这个设置,则消息不会再被投递,转而被投递进入死信队列。这个消费次数在消息每次被投递到 MessageListener 时都会进行自增。譬如:如果消息的最大消费次数为 1,那么无论对于这条消息,当前是被返回消费成功还是消费失败,都只会被消费这一次。 应用场景与最佳实践 PushConsumer 是一种近乎全托管的消费者,这里的托管的含义在于用户本身并不需要关心消息的接收,而只需要关注消息的消费过程,除此之外的所有逻辑都在 Push 消费者的实现中封装掉了,用户只需要根据每条收到的消息返回不同的消费结果即可,因此也是最为普适的消费者类型。 MessageListener 是针对单条消息设计的监听器接口: / MessageListener is used only for the push consumer to process message consumption synchronously. Refer to {@link PushConsumer}, push consumer will get message from server and dispatch the message to the backend thread pool to consumer message concurrently. / public interface MessageListener { / The callback interface to consume the message. You should process the {@link MessageView} and return the corresponding {@link ConsumeResult}. The consumption is successful only when {@link ConsumeResultSUCCESS } is returned, null pointer is returned or exception is thrown would cause message consumption failure too. / ConsumeResult consume(MessageView messageView); } 绝大多数场景下,使用方应该快速处理消费逻辑并返回消费成功,不宜长时间阻塞消费逻辑。对于消费逻辑比较重的情形,建议可以先行提交消费状态,然后对消息进行异步处理。 实际在 Push 消费者的实现中,为了保证消息消费的及时性,消息是会被预先拉取客户端再进行后续的消费的,因此在客户端中存在对已拉取消息大小的缓存。为了防止缓存的消息过多导致客户端内存泄漏,也提前预留了客户端参数供使用者自行进行设置。 // 设置本地最大缓存消息数目为 16 条 pushConsumer.setMaxCachedMessageCount(16); // 设置本地最大缓存消息占用内存大小为 128 MB pushConsumer.setMaxCachedMessageSizeInBytes(128 1024 1024); SimpleConsumer 相比较 PushConsumer,SimpleConsumer 则暴露了更多的细节给使用者。在 SimpleConsumer 中,用户将自行控制消息的接收与处理。 使用简介 以下是 SimpleConsumer 的使用示例: SimpleConsumer consumer = provider.newSimpleConsumerBuilder() .setClientConfiguration(clientConfiguration) // Set the consumer group name. .setConsumerGroup(consumerGroup) // set await duration for longpolling. .setAwaitDuration(awaitDuration) // Set the subscription for the consumer. .setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression)) .build(); // Max message num for each long polling. int maxMessageNum = 16; // Set message invisible duration after it is received. Duration invisibleDuration = Duration.ofSeconds(15); final List messages = consumer.receive(maxMessageNum, invisibleDuration); LOGGER.info("Received {} message(s)", messages.size()); for (MessageView message : messages) { final MessageId messageId = message.getMessageId(); try { consumer.ack(message); LOGGER.info("Message is acknowledged successfully, messageId={}", messageId); } catch (Throwable t) { LOGGER.error("Message is failed to be acknowledged, messageId={}", messageId, t); } } // Close the simple consumer when you don't need it anymore. consumer.close(); 在 SimpleConsumer 中用户需要自行进行消息的拉取,这一动作通过 SimpleConsumerreceive 这个接口进行,然后再根据自己业务逻辑处理结果的不同再对拉取到的消息进行不同的处理。SimpleConsumerreceive 也是通过长轮询来接受来自服务端的消息,具体的长轮询时间可以使用 SimpleConsumerBuildersetAwaitDuration 来进行设置。 在 SimpleConsumer 中,用户需要通过 SimpleConsumerreceive 设置一个消息不重复的时间窗口(或者说关于通过这个接口收到的消息的一个不可见时间窗口),这个时间窗口从用户接受到这条消息开始计时,在这段时间之内消息是不会重复投递到消费者的,而超出这个时间窗口之后,则会对这条消息进行再一次的投递。在这个过程中,消息的消费次数也会进行递增。与 PushConsumer 类似的是,一旦消费次数超出 ConsumerGroup 的最大次数,也就不会进行重投了。 相比较于 PushConsumer 而言,SimpleConsumer 用户可以自主控制接受消息的节奏。SimpleConsumerreceive 会针对于当前的订阅关系去服务端拉取符合条件的消息。SimpleConsumer 实际上的每次消息接收请求是按照具体 Topic 的分区来 one by one 发起请求的,实际的 Topic 分区可能会比较多,因此为了保证消息接收的及时性,建议综合自己的业务处理能力一定程度上提高 SimpleConsumerreceive 的并发度。 用户在接受到消息之后,可以选择对消息使用 ack 或者 changeInvisibleDuration,前者即对服务端表示对这条消息的确认,与 PushConsumer 中的消费成功类似,而 changeInvisibleDuration 则表示延迟当前消息的可见时间,即需要服务端在当前一段时间之后再向客户端进行投递。值得注意的是,这里消息的再次投递也是需要遵循 ConsumerGroup 的最大消费次数的限制,即一旦消息的最大消费次数超出了最大消费次数(每次消息到达可见时间都会进行消费次数的自增),则不再进行投递,转而进入死信队列。举例来说: 进行 ack,即表示消息消费成功被确认,消费进度被服务端同步。 进行 changeInvisibleDuration, 1)如果消息已经超过当前 ConsumerGroup 的最大消费次数,那么消息后续会被投递进入死信队列 2)如果消息未超过当前 ConsumerGroup 的最大消费次数,若请求在上一次消息可见时间到来之前发起,则修改成功,否则则修改失败。 应用场景与最佳实践 在 PushConsumer 中,消息是单条地被投递进入 MessageListener来处理的,而在 SimpleConsumer 中用户可以同时拿到一批消息,每批消息的最大条数也由 SimpleConsumerreceive 来决定。在一些 IO 密集型的应用中,会是一个更加方便的选择。此时用户可以每次拿到一批消息并集中进行处理从而提高消费速度。 PullConsumer PullConsumer 也是 RocketMQ 一直以来都支持的消费者类型,RocketMQ 5.0 中全新的 PullConsumer API 还在演进中,敬请期待。下文中的 PullConsumer 会使用 4.0 中现存的 LitePullConsumer 进行论述,也是当前推荐的方式。 使用简介 现存的 LitePullConsumer 中的主要接口 // PullConsumer 中的主要接口 public interface LitePullConsumer { // 注册路由变化监听器 void registerTopicMessageQueueChangeListener(String topic, TopicMessageQueueChangeListener topicMessageQueueChangeListener) throws MQClientException; // 将队列 assign 给当前消费者 void assign(Collection messageQueues); // 针对当前 assigned 的队列获取消息 List poll(long timeout); // 查找当前队列在服务端提交的位点 Long committed(MessageQueue messageQueue) throws MQClientException; // 设置是否自动提交队列位点 void setAutoCommit(boolean autoCommit); // 同步提交队列位点 void commitSync(); } 在 RocketMQ 中,无论是消息的发送还是接收,都是通过队列来进行的,一个 Topic 由若干个队列组成,消息本身也是按照队列的形式来一个个进行存储的,同一个队列中的消息拥有不同的位点,且位点的大小是随随消息达到服务端的时间逐次递增的,本质上不同 ConsumerGroup 在服务端的消费进度就是一个个队列中的位点信息,客户端将自己的消费进度同步给服务端本质上其实就是在同步一个个消息的位点。 在 PullConsumer 中将队列这个概念完整地暴露给了用户。用户可以针对自己关心的 topic 设置路由监听器从而感知队列的变化,并将队列 assign 给当前消费者,当用户使用 LitePullConsumerpoll 时会尝试获取已经 assign 好了的队列中的消息。如果设置了 LitePullConsumersetAutoCommit 的话,一旦消息达到了客户端就会自动进行位点的提交,否则则需要使用 LitePullConsumercommitSync 接口来进行手动提交。 应用场景与最佳实践 PullConsumer 中用户拥有对消息位点管理的绝对自主权,可以自行管理消费进度,这是与 PushConsumer 和 SimpleConsumer 最为本质的不同,这也使得 PullConsumer 在流计算这种需要同时自主控制消费速率和消费进度的场景能得到非常广泛的应用。更多情况下,PullConsumer 是与具体的流计算框架进行集成的。 活动推荐 阿里云基于 Apache RocketMQ 构建的企业级产品消息队列RocketMQ 5.0版现开启活动: 1、新用户首次购买包年包月,即可享受全系列 85折优惠! 了解活动详情:
作者:凌楚
#行业实践 #功能特性

2022年11月18日

RocketMQ 客户端负载均衡机制详解及最佳实践
前言 本文介绍 RocketMQ 负载均衡机制,主要涉及负载均衡发生的时机、客户端负载均衡对消费的影响(消息堆积/消费毛刺等)并且给出一些最佳实践的推荐。 负载均衡意义 上图是 RocketMQ 的消息储存模型:消息是按照队列的方式分区有序储存的。RocketMQ 的队列模型使得生产者、消费者和读写队列都是多对多的映射关系,彼此之间都可以无限水平扩展。对比传统的消息队列如 RabbitMQ 是很大的优势。尤其是在流式处理场景下有天然优势,能够保证同一队列的消息被相同的消费者处理,对于批量处理、聚合处理更友好。 消费者消费某个 topic 的消息等同于消费这个 topic 上所有队列的消息(上图中 Consumer A1 消费队列 1,Consumer A2 消费队列 2、3)。 所以,要保证每个消费者的负载尽量均衡,也就是要给这些消费者分配相同数量的队列,并保证在异常情况下(如客户端宕机)队列可以在不同消费者之间迁移。 负载均衡机制解析 负载均衡时机 负载均衡是客户端与服务端互相配合的过程,我们先综合服务端和客户端的职责回答第一个问题:何时会发生负载均衡。 客户端主动负载均衡   上图是 RocketMQ 客户端相关类的结构,其中 MQClientInstance 负责和服务端的交互以及底层服务的协调,这其中就包括负载均衡。 MQClientInstance 中有两个相关的方法 rebalanceImmediately 和 doRebalance,我们分析负载均衡的时机只要找到何时调用这两个方法即可: 1. 启动时立即进行负载均衡; 2. 定时(默认 20s)负载均衡一次。   服务端通知负载均衡   服务端通知客户端进行负载均衡也是通过 MQClientInstancerebalanceImmediately 方法实现的,我们同样在服务端代码中寻找相关调用。 分析以上几个方法可以得出结论,在如下场景服务端会主动通知客户端触发负载均衡: 1. 客户端上下线 上线 1. 新客户端发送心跳到服务端 下线 2. 客户端发送下线请求到服务端 3. 底层连接异常:响应 netty channel 的 IDLE/CLOSE/EXCEPTION 事件 2. 订阅关系变化:订阅新 topic 或有旧的 topic 不再订阅 负载均衡策略 前文已经介绍了负载均衡实际是变更消费者负责处理的队列数量,这里每次需要变更的队列数量和受到影响的客户端数量是由负载均衡策略决定的。 我们来分析一下比较常见的负载均衡策略: 平均分配 平均分配(AllocateMessageQueueAveragely)是默认的负载均衡策略: 如果我们有 4 个客户端,24 个队列,当第二个客户端下线时: 以默认的负载均衡策略(AllocateMessageQueueAveragely)为例,重新分配队列数量为 8。 默认的负载均衡策略能将队列尽量均衡的分配到每个客户端,但是每次负载均衡重新分配队列数量较多,尤其是在客户端数量很多的场景。 | 客户端 | 队列分配变化 | 队列数变化 | | | | | | Client1 | 1~6 1~8 | 6 8 | | Client2 | 7~12 | 6 0 | | Client3 | 13~18 9~16 | 6 8 | | Client4 | 19~24 17~24 | 6 8 | 一致性哈希 基于一致性哈希算法的负载均衡策略(AllocateMessageQueueConsistentHash)每次负载均衡会重新分配尽可能少的队列数量,但是可能会出现负载不均的情况。 | 客户端 | 队列分配变化 | 队列数变化 | | | | | | Client1 | 1~6 1~9 | 6 9 | | Client2 | 7~12 | 6 0 | | Client3 | 13~18 10~18 | 6 9 | | Client4 | 19~24 19~24 | 6 8 | 负载均衡对消费的影响 我们以一个真实的线上场景来举例: 下图中绿色的线代表发送 tps,黄色的线代表消费 tps,我们很容易发现在 21:00 和 21:50 分左右存在消费毛刺。 这两个时间点在进行应用发布,根据我们上文的分析某个消费者下线后同组的其他消费者感知这一变化需要一定时间,导致有秒级的消费延迟产生。在发布结束后消费者快速处理堆积的消息,可以发现消费速度有一个明显的上涨。 这个例子展示了下线时由于负载均衡带来了短暂的消息处理延迟,新的消费者会从服务端获取消费位点继续之前的消费进度。如果消费者异常宕机或者没有调用 shutdown 优雅下线,没有上传自己的最新消费位点,会使得新分配的消费者重复消费。 这里我们总结下负载均衡对消费的影响,当某个客户端触发负载均衡时: 1. 对于新分配的队列可能会重复消费,这也是官方要求消费要做好幂等的原因; 2. 对于不再负责的队列会短时间消费停止,如果原本的消费 TPS 很高或者正好出现生产高峰就会造成消费毛刺。  最佳实践 避免频繁上下线 为了避免负载均衡的影响应该尽量减少客户端的上下线,同时做好消费幂等。 同时在有应用重启或下线前要调用 shutdown 方法,这样服务端在收到客户端的下线请求后会通知客户端及时触发负载均衡,减少消费延迟。 选择合适的负载均衡策略 需要根据业务需要灵活选择负载均衡策略: 需要保证客户端的负载尽可能的均衡:选择默认的平均分配策略; 需要降低应用重启带来的消费延迟:选择一致性哈希的分配策略。  当然还有其他负载均衡策略由于时间关系不一一介绍了,留给读者自行探索。 保证客户端订阅一致 RocketMQ 的负载均衡是每个客户端独立进行计算,所以务必要保证每个客户端的负载均衡算法和订阅语句一致。 负载均衡策略不一致会导致多个客户端分配到相同队列或有客户端分不到队列; 订阅语句不一致会导致有消息未能消费。  RocketMQ 5.0 消息级别负载均衡 为了彻底解决客户端负载均衡导致的重复消费和消费延迟问题,RocketMQ 5.0 提出了消息级别的负载均衡机制。 同一个队列的消息可以由多个消费者消费,服务端会确保消息不重不漏的被客户端消费到: 消息粒度的负载均衡机制,是基于内部的单条消息确认语义实现的。消费者获取某条消息后,服务端会将该消息加锁,保证这条消息对其他消费者不可见,直到该消息消费成功或消费超时。因此,即使多个消费者同时消费同一队列的消息,服务端也可保证消息不会被多个消费者重复消费。 在 4.x 的客户端中,顺序消费的实现强依赖于队列的分配。RocketMQ 5.0 在消息维度的负载均衡的基础上也实现了顺序消费的语意:不同消费者处理同一个消息组内的消息时,会严格按照先后顺序锁定消息状态,确保同一消息组的消息串行消费。 如上图所述,队列 Queue1 中有 4 条顺序消息,这 4 条消息属于同一消息组 G1,存储顺序由 M1 到 M4。在消费过程中,前面的消息 M1、M2 被 消费者Consumer A1 处理时,只要消费状态没有提交,消费者 A2 是无法并行消费后续的 M3、M4 消息的,必须等前面的消息提交消费状态后才能消费后面的消息。 活动推荐 阿里云基于 Apache RocketMQ 构建的企业级产品消息队列RocketMQ 5.0版现开启活动: 1、新用户首次购买包年包月,即可享受全系列 85折优惠! 了解活动详情:
作者:玄珏
#行业实践

2022年10月31日

RocketMQ 重试机制详解及最佳实践
引言 本文主要介绍在使用 RocketMQ 时为什么需要重试与兜底机制,生产者与消费者触发重试的条件和具体行为,如何在 RocketMQ 中合理使用重试机制,帮助构建弹性,高可用系统的最佳实践。 RocketMQ 的重试机制包括三部分,分别是生产者重试,服务端内部数据复制遇到非预期问题时重试,消费者消费重试。本文中仅讨论生产者重试和消费者消费重试两种面向用户侧的实现。 生产者发送重试 RocketMQ 的生产者在发送消息到服务端时,可能会因为网络问题,服务异常等原因导致调用失败,这时候应该怎么办?如何尽可能的保证消息不丢失呢? 1. 生产者重试次数 RocketMQ 在客户端中内置了请求重试逻辑,支持在初始化时配置消息发送最大重试次数(默认为 2 次),失败时会按照设置的重试次数重新发送。直到消息发送成功,或者达到最大重试次数时结束,并在最后一次失败后返回调用错误的响应。对于同步发送和异步发送,均支持消息发送重试。 同步发送:调用线程会一直阻塞,直到某次重试成功或最终重试失败(返回错误码或抛出异常)。 异步发送:调用线程不会阻塞,但调用结果会通过回调的形式,以异常事件或者成功事件返回。  2. 生产者重试间隔 在介绍生产者重试前,我们先来了解下流控的概念,流控一般是指服务端压力过大,容量不足时服务端会限制客户端收发消息的行为,是服务端自我保护的一种设计。RocketMQ 会根据当前是否触发了流控而采用不同的重试策略: 非流控错误场景:其他触发条件触发重试后,均会立即进行重试,无等待间隔。 流控错误场景:系统会按照预设的指数退避策略进行延迟重试。 为什么要引入退避和随机抖动?  如果故障是由过载流控引起的,重试会增加服务端负载,导致情况进一步恶化,因此客户端在遇到流控时会在两次尝试之间等待一段时间。每次尝试后的等待时间都呈指数级延长。指数回退可能导致很长的回退时间,因为指数函数增长很快。指数退避算法通过以下参数控制重试行为,更多信息,请参见 connectionbackoff.md。 INITIAL_BACKOFF:第一次失败重试前后需等待多久,默认值:1 秒; MULTIPLIER :指数退避因子,即退避倍率,默认值:1.6; JITTER :随机抖动因子,默认值:0.2; MAX_BACKOFF :等待间隔时间上限,默认值:120 秒; MIN_CONNECT_TIMEOUT :最短重试间隔,默认值:20 秒。 ConnectWithBackoff() current_backoff = INITIAL_BACKOFF current_deadline = now() + INITIAL_BACKOFF while (TryConnect(Max(current_deadline, now() + MIN_CONNECT_TIMEOUT))!= SUCCESS) SleepUntil(current_deadline) current_backoff = Min(current_backoff MULTIPLIER, MAX_BACKOFF) current_deadline = now() + current_backoff + UniformRandom(JITTER current_backoff, JITTER current_backoff) 特别说明:对于事务消息,只会进行透明重试(transparent retries),网络超时或异常等场景不会进行重试。 3. 重试带来的副作用 不停的重试看起来很美好,但也是有副作用的,主要包括两方面:消息重复,服务端压力增大 远程调用的不确定性,因请求超时触发消息发送重试流程,此时客户端无法感知服务端的处理结果;客户端进行的消息发送重试可能会导致消费方重复消费,应该按照用户ID、业务主键等信息幂等处理消息。  较多的重试次数也会增大服务端的处理压力。  4. 用户的最佳实践是什么 1)合理设置发送超时时间,发送的最大次数 发送的最大次数在初始化客户端时配置在 ClientConfiguration;对于某些实时调用类场景,可能会导致消息发送请求链路被阻塞导致业务请求整体耗时高或耗时;需要合理评估每次调用请求的超时时间以及最大重试次数,避免影响全链路的耗时。 2)如何保证发送消息不丢失 由于分布式环境的复杂性,例如网络不可达时 RocketMQ 客户端发送请求重试机制并不能保证消息发送一定成功。业务方需要捕获异常,并做好冗余保护处理,常见的解决方案有两种: 1. 向调用方返回业务处理失败; 2. 尝试将失败的消息存储到数据库,然后由后台线程定时重试,保证业务逻辑的最终一致性。  3)关注流控异常导致无法重试 触发流控的根本原因是系统容量不足,如果因为突发原因触发消息流控,且客户端内置的重试流程执行失败,则建议执行服务端扩容,将请求调用临时替换到其他系统进行应急处理。 4)早期版本客户端如何使用故障延迟机制进行发送重试? 对于 RocketMQ 4.x 和 3.x 以下客户端开启故障延迟机制可以用: producer.setSendLatencyFaultEnable(true) 配置重试次数使用: producer.setRetryTimesWhenSendFailed() producer.setRetryTimesWhenSendAsyncFailed() 消费者消费重试 消息中间件做异步解耦时的一个典型问题是如果下游服务处理消息事件失败,那应该怎么做呢? RocketMQ 的消息确认机制以及消费重试策略可以帮助分析如下问题: 如何保证业务完整处理消息? 消费重试策略可以在设计实现消费者逻辑时保证每条消息处理的完整性,避免部分消息消费异常导致业务状态不一致。 业务应用异常时处理中的消息状态如何恢复? 当系统出现异常(宕机故障)等场景时,处理中的消息状态如何恢复,消费重试具体行为是什么。 1. 什么是消费重试? 什么时候认为消费失败? 消费者在接收到消息后将调用用户的消费函数执行业务逻辑。如果客户端返回消费失败 ReconsumeLater,抛出非预期异常,或消息处理超时(包括在 PushConsumer 中排队超时),只要服务端服务端一定时间内没收到响应,将认为消费失败。  消费重试是什么? 消费者在消费某条消息失败后,服务端会根据重试策略重新向客户端投递该消息。超过一次定数后若还未消费成功,则该消息将不再继续重试,直接被发送到死信队列中;  重试过程状态机:消息在重试流程中的状态和变化逻辑;  重试间隔:上一次消费失败或超时后,下次重新尝试消费的间隔时间;  最大重试次数:消息可被重试消费的最大次数。   2. 消息重试的场景 需要注意重试是应对异常情况,给予程序再次消费失败消息的机会,不应该被用作常态化的链路。 推荐使用场景: 业务处理失败,失败原因跟当前的消息内容相关,预期一段时间后可执行成功; 是一个小概率事件,对于大批的消息只有很少量的失败,后面的消息大概率会消费成功,是非常态化的。   正例:消费逻辑是扣减库存,极少量商品因为乐观锁版本冲突导致扣减失败,重试一般立刻成功。 错误使用场景: 消费处理逻辑中使用消费失败来做条件判断的结果分流,是不合理的。  反例:订单在数据库中状态已经是已取消,此时如果收到发货的消息,处理时不应返回消费失败,而应该返回成功并标记不用发货。 消费处理中使用消费失败来做处理速率限流,是不合理的。 限流的目的是将超出流量的消息暂时堆积在队列中达到削峰的作用,而不是让消息进入重试链路。 这种做法会让消息反复在服务端和客户端之间传递,增大了系统的开销,主要包括以下方面: RocketMQ 内部重试涉及写放大,每一次重试将生成新的重试消息,大量重试将带来严重的 IO 压力; 重试有复杂的退避逻辑,内部实现为梯度定时器,该定时器本身不具备高吞吐的特性,大量重试将导致重试消息无法及时出队。重试的间隔将不稳定,将导致大量重试消息延后消费,即削峰的周期被大幅度延长。  3. 不要以重试替代限流 上述误用的场景实际上是组合了限流和重试能力来进行削峰,RocketMQ 推荐的削峰最佳手段为组合限流和堆积,业务以保护自身为前提,需要对消费流量进行限流,并利用 RocketMQ 提供的堆积能力将超出业务当前处理的消息滞后消费,以达到削峰的目的。下图中超过处理能力的消息都应该被堆积在服务端,而不是通过消费失败进行重试。 如果不想依赖额外的产品/组件来完成该功能,也可以利用一些本地工具类,比如 Guava 的 RateLimiter 来完成单机限流。如下所示,声明一个 50 QPS 的 RateLimiter,在消费前以阻塞的方式 acquire 一个令牌,获取到即处理消息,未获取到阻塞。 RateLimiter rateLimiter = RateLimiter.create(50); PushConsumer pushConsumer = provider.newPushConsumerBuilder() .setClientConfiguration(clientConfiguration) // 设置订阅组名称 .setConsumerGroup(consumerGroup) // 设置订阅的过滤器 .setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression)) .setMessageListener(messageView { // 阻塞直到获得一个令牌,也可以配置一个超时时间 rateLimiter.acquire(); LOGGER.info("Consume message={}", messageView); return ConsumeResult.SUCCESS; }) .build(); 4. PushConsumer 消费重试策略 PushConsumer 消费消息时,消息的几个主要状态如下: Ready:已就绪状态。消息在消息队列RocketMQ版服务端已就绪,可以被消费者消费; Inflight:处理中状态。消息被消费者客户端获取,处于消费中还未返回消费结果的状态; Commit:提交状态。消费成功的状态,消费者返回成功响应即可结束消息的状态机; DLQ:死信状态 消费逻辑的最终兜底机制,若消息一直处理失败并不断进行重试,直到超过最大重试次数还未成功,此时消息不会再重试。 该消息会被投递至死信队列。您可以通过消费死信队列的消息进行业务恢复。 最大重试次数   PushConsumer 的最大重试次数由创建时决定。 例如,最大重试次数为 3 次,则该消息最多可被投递 4 次,1 次为原始消息,3 次为重试投递次数。 重试间隔时间 无序消息(非顺序消息):重试间隔为阶梯时间,具体时间如下: 说明:若重试次数超过 16 次,后面每次重试间隔都为 2 小时。 顺序消息:重试间隔为固定时间,默认为 3 秒。  5. SimpleConsumer 消费重试策略 和 PushConsumer 消费重试策略不同,SimpleConsumer 消费者的重试间隔是预分配的,每次获取消息消费者会在调用 API 时设置一个不可见时间参数 InvisibleDuration,即消息的最大处理时长。若消息消费失败触发重试,不需要设置下一次重试的时间间隔,直接复用不可见时间参数的取值。 由于不可见时间为预分配的,可能和实际业务中的消息处理时间差别较大,可以通过 API 接口修改不可见时间。 例如,预设消息处理耗时最多 20 ms,但实际业务中 20 ms内消息处理不完,可以修改消息不可见时间,延长消息处理时间,避免消息触发重试机制。 修改消息不可见时间需要满足以下条件: 消息处理未超时 消息处理未提交消费状态  如下图所示,消息不可见时间修改后立即生效,即从调用 API 时刻开始,重新计算消息不可见时间。 最大重试次数 与 PushConsumer 相同。 消息重试间隔   消息重试间隔 = 不可见时间 - 消息实际处理时长 例如:消息不可见时间为 30 ms,实际消息处理用了 10 ms 就返回失败响应,则距下次消息重试还需要 20 ms,此时的消息重试间隔即为 20 ms;若直到 30 ms 消息还未处理完成且未返回结果,则消息超时,立即重试,此时重试间隔即为 0 ms。 SimpleConsumer 的消费重试间隔通过消息的不可见时间控制。 //消费示例:使用SimpleConsumer消费普通消息,主动获取消息处理并提交。 ClientServiceProvider provider1 = ClientServiceProvider.loadService(); String topic1 = "Your Topic"; FilterExpression filterExpression1 = new FilterExpression("Your Filter Tag", FilterExpressionType.TAG); SimpleConsumer simpleConsumer = provider1.newSimpleConsumerBuilder() //设置消费者分组。 .setConsumerGroup("Your ConsumerGroup") //设置接入点。 .setClientConfiguration(ClientConfiguration.newBuilder().setEndpoints("Your Endpoint").build()) //设置预绑定的订阅关系。 .setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression)) .build(); List messageViewList = null; try { //SimpleConsumer需要主动获取消息,并处理。 messageViewList = simpleConsumer.receive(10, Duration.ofSeconds(30)); messageViewList.forEach(messageView { System.out.println(messageView); //消费处理完成后,需要主动调用ACK提交消费结果。 //没有ack会被认为消费失败 try { simpleConsumer.ack(messageView); } catch (ClientException e) { e.printStackTrace(); } }); } catch (ClientException e) { //如果遇到系统流控等原因造成拉取失败,需要重新发起获取消息请求。 e.printStackTrace(); } 修改消息的不可见时间   案例:某产品使用消息队列来发送解耦“视频渲染”的业务逻辑,发送方发送任务编号,消费方收到编号后处理任务。由于消费方的业务逻辑耗时较长,消费者重新消费到同一个任务时,该任务未完成,只能返回消费失败。在这种全新的 API 下,用户可以调用可以通过修改不可见时间给消息续期,实现对单条消息状态的精确控制。 simpleConsumer.changeInvisibleDuration(); simpleConsumer.changeInvisibleDurationAsync(); 6. 功能约束与最佳实践 设置消费的最大超时时间和次数   尽快明确的向服务端返回成功或失败,不要以超时(有时是异常抛出)代替消费失败。 不要用重试机制来进行业务限流  错误示例:如果当前消费速度过高触发限流,则返回消费失败,等待下次重新消费。 正确示例:如果当前消费速度过高触发限流,则延迟获取消息,稍后再消费。 发送重试和消费重试会导致相同的消息重复消费,消费方应该有一个良好的幂等设计  正确示例:某系统中消费的逻辑是为某个用户发送短信,该短信已经发送成功了,当消费者应用重复收到该消息,此时应该返回消费成功。 总结 本文主要介绍重试的基本概念,生产者消费者收发消息时触发重试的条件和具体行为,以及 RocketMQ 收发容错的最佳实践。 重试策略帮助我们从随机的、短暂的瞬态故障中恢复,是在容忍错误时,提高可用性的一种强大机制。但请谨记 “重试是对于分布式系统来说自私的”,因为客户端认为其请求很重要,并要求服务端花费更多资源来处理,盲目的重试设计不可取,合理的使用重试可以帮助我们构建更加弹性且可靠的系统。 活动推荐 阿里云基于 Apache RocketMQ 构建的企业级产品消息队列RocketMQ 5.0版现开启活动: 1、新用户首次购买包年包月,即可享受全系列 85折优惠! 了解活动详情:
作者: 斜阳
#行业实践 #功能特性

2022年10月24日

EventBridge 生态实践:融合 SLS 构建一体化日志服务
引言 阿里云日志服务 SLS 是一款优秀的日志服务产品,提供一站式地数据采集、加工、查询与分析、可视化、告警、消费与投递等服务。对于使用 SLS 的用户业务而言,SLS 上存储的日志信息反映着业务的运行状态,通过适当地流转加工即可创建一定价值。 另一方面,阿里云 EventBridge 作为云上事件枢纽,每天承载着大量事件的流转。云上资源的操作事件、消息队列中的数据、用户业务中的自定义事件等,是否有一站式的配置工具来将这些数据统一收敛到 SLS,进而使用 SLS 强大的加工、分析能力也是一个具有价值的问题。 为了支持上述日志、数据流入流出 SLS 的场景,阿里云 EventBridge 在近期支持了 SLS 能力。用户在 EventBridge 上通过简单地配置,即可实现数据写入 SLS 和将 SLS 中日志路由到不同的 EventBridge 目标端。EventBridge 对 SLS 的支持是全面的,用户既可以在事件总线中使用 SLS,也可以在事件流中使用。本文将从 SLS 在 EventBridge上 的使用以及若干最佳实践场景等方面,为大家介绍如何基于 EventBridge 构建 SLS 相关应用。 基于 EventBridge 使用 SLS 阿里云 SLS 日志服务 SLS[1] 是一款云原生观测与分析平台,为 Log、Metric、Trace 等数据提供大规模、低成本、实时的平台化服务,提供数据采集、加工、查询与分析、可视化、告警、消费与投递等功能。 SLS 在 EventBridge 上的应用 阿里云 EventBridge 提供了事件总线[2]与事件流[3]两款不同应用场景的事件路由服务。 事件总线底层拥有事件的持久化能力,可以按照需要将事件经事件规则路由到多个目标。而事件流则更轻量化,对源端产生的事件实时抽取、转换和分析并加载至目标端,无需创建事件总线,端到端转储效率更高,使用更轻便,适用于端到端的流式数据处理场景。SLS 目前对事件总线与事件流均已支持。 针对 SLS 事件源,EventBridge 会构造一个 SLS source connector,其会实时地从 SLS 服务端拉取日志。数据拉取到 EventBridge 后,会进行一定的结构封装,保留用户日志、SLS 系统参数等数据,同时增加 event 所需要的一些系统属性。 SLS Event 样例可参考如下示例。 data 部分代表用户日志内容,其中以“__”开头和结尾的字段表示日志项的 SLS 系统属性。 { "datacontenttype": "application/json;charset=utf8", "aliyunaccountid": "1756789", "data": { "key1": "value1", "key2": "value2", "__topic__": "TopicCategory", "__source__": "SourceCategory", "__client_ip__": "122.231..", "__receive_time__": "1663487595", "__pack_id__": "59b662b2257796280" }, "subject": "acs:log:cnqingdao:1756789:project/demoproject/logstore/logstore1", "aliyunoriginalaccountid": "1756789", "source": "testSLS", "type": "sls:connector", "aliyunpublishtime": "20220918T07:53:15.387Z", "specversion": "1.0", "aliyuneventbusname": "demoBus", "id": "demoprojectlogstore11MTY2MzExODM5ODY4NjAxOTQyMw==0", "time": "20220918T07:53:12Z", "aliyunregionid": "cnqingdao", "aliyunpublishaddr": "10.50.132.112" } 针对 SLS 事件目标,EventBridge 使用 logProducer 将 event 整体作为一个字段投递到 SLS,字段 key 名称为“content”。 使用介绍 SLS 事件源   在使用 SLS 作为事件源时(这里包含了事件总线中的事件源和事件流中的事件源),需要提供以下参数: 日志项目(SLS Project) 日志库(SLS LogStore) 起始消费位点 调用角色  在创建 SLS 事件源时,EventBridge 会自动在对应 LogStore 下创建一个以“eventbridge”开头的消费组,事件源或事件流被删除时,对应消费组资源也会被清理。 日志项目与日志库参数,用户根据已创建的 Project 和 LogStore 去填写即可。 起始消费位点参数指定了新任务启动时的初始消费位点。这里可以选择“最早位点”、“最新位点”与“指定时间”。“最早位点”即从当前 LogStore 中最早的日志开始消费,会导致大量历史日志被读取,建议结合业务谨慎选择;“最新位点”则表示消费对应 EventBridge 任务启动后的日志;“指定时间”需要用户填写时间戳(以秒为单位),消费从此时刻开始的日志。 针对调用角色,其实是允许 EventBridge 以这个角色的身份去调用读取用户 SLS 日志。用户需要创建一个自定义角色,并将其授信给事件总线 EventBridge。角色的权限方面则可以按照需要去进行设置,在权限最小的原则基础上,权限策略提供的角色应保证事件总线 EventBridge 可以读取对应 LogStore 日志与消费组的增删操作,至少赋予角色 LogStore 消费权限与消费组的增删操作。参考示例: { "Version": "1", "Statement": [ { "Action": [ "log:ListShards", "log:GetCursorOrData", "log:GetConsumerGroupCheckPoint", "log:UpdateConsumerGroup", "log:ConsumerGroupHeartBeat", "log:ConsumerGroupUpdateCheckPoint", "log:ListConsumerGroup", "log:CreateConsumerGroup", "log:DeleteConsumerGroup" ], "Resource": [ "acs:log:::project//logstore/", "acs:log:::project//logstore//" ], "Effect": "Allow" } ] } SLS 事件目标   在使用 SLS 作为事件目标时(这里包含了事件总线中的事件目标和事件流中的事件目标),需要提供以下参数: 日志项目(SLS Project) 日志库(SLS LogStore) Topic 调用角色  日志项目、日志库参数含义同 SLS 事件源。Topic 即 SLS 日志主题,用户可以根据需要进行设置,非必填内容。 在创建 SLS 事件目标时,确保使用的调用角色有写入给定日志库权限即可。参考示例: { "Version":"1", "Statement":[ { "Effect":"Allow", "Action":[ "log:PostLogStoreLogs" ], "Resource":[ "acs:log:::project//logstore/" ] } ] } 使用示例 SLS 事件源和事件目标,其事件总线与事件流的参数配置相同,这里示例了如何创建  SLS 事件源和事件目标的 EventBridge 事件流。 前期准备   1. 开通 EventBridge 服务; 2. 开通 SLS 服务并创建 Project 与 Store。 创建 SLS 事件源   1. 登陆 EventBridge 控制台,点击左侧导航栏,选择“事件流”,在事件流列表页点击“创建事件流”; 2. “基本信息”中“事件流名称”与“描述”按照需要填写即可; 3. 在创建事件流,选择事件提供方时,下拉框选择“日志服务 SLS”; 4. 在“日志服务 SLS”一栏中选配置 SLS Project、LogStore、起始消费位点与角色配置。 创建 SLS 事件目标   1. 在创建事件流的事件目标时,服务类型选择“日志服务”; 2. 配置 SLS Project、LogStore、日志主题、日志内容、角色配置等参数。 3. 保存启动即可创建事件流。 最佳实践示例 异步架构完备性校验 在使用消息队列搭建异步应用架构时,会偶发遇到消息丢失的情况,这种情况下的问题排查通常较为麻烦,需要确定问题到底是出在发送端、消费端还是消息队列上,这种场景可以使用 SLS + EventBridge 来进行相关预警和现场保留。 1. 业务 1 发送消息到消息队列,业务 2 异步消费 MQ 中的消息,实现架构解耦; 2. 消息发送端和消费端,在完成消费发送、消费的相关操作后,均将操作日志打印出来,并采集到 SLS 上,日志中可以包含消息 ID 等字段以确保可溯源; 3. 配置 EventBridge 事件流,事件提供方为 SLS,事件接收方为函数计算 FC; 4. FC 中的服务读取 SLS 中日志内容,若发现针对某条消息,若仅有发送日志无消费日志,则说明可能存在漏消息的可能性,需要相关人员及时介入排查。 异常业务异步处理 部分消息队列如 RocketMQ 有死信队列能力,当用户消费失败达到一定次数时,消息会被投递到死信队列。用户也可以使用 SLS + EventBridge 构建业务死信队列,以完成对异常情况的处理。 例如下图是一个电商平台的订单处理系统,当订单处理成功时,相关信息会被写入 DB 或者进行后续操作。但如果订单处理异常用户又不想要阻塞现有订单处理流程,则可以将处理异常订单的流程异步处理。 1. 用户下单/付款,订单系统进行业务处理,处理成功则将数据变更写入 DB; 2. 订单处理异常,记录相关信息日志; 3. 搭建 EventBridge 事件规则。事件源为 SLS,事件目标为函数计算 FC; 4. 当有异常业务日志产生时,日志内容被 SLS 事件源拉取,随后投递到 FC,由专门的服务来处理异常订单。当然,在架构设计时也可以将异常订单信息直接投递到函数计算,但对于大部分业务系统而言,当有异常出现时通常都会进行相关日志的打印,即异常日志大概率是存在的,这个时候使用 SLS + EventBridge 则无需再使用函数计算的发送客户端,仅按需打印日志即可,对业务的侵入性更小。 消息备份 目前阿里云上的消息队列产品种类丰富,用户在使用消息队列实现业务解耦的同时,也会产生对消息内容进行加工分析的需求。SLS 拥有强大的数据加工能力,使用 EventBridge 将消息路由到 SLS,在实现消息备份的同时也可以利用 SLS 的分析加工能力来提升业务的可观测性。 1. 搭建 EventBridge 事件流。事件提供方为各种云上消息队列,事件目标方为日志服务 SLS; 2. 使用 SLS 的能力完成消息的加工、查询、分析与可视化。 自建 SQL 审计 目前 EventBridge 已经支持了 DTS 作为事件源的能力,使用 EventBridge 可以轻松实现构建自定义 SQL 审计的需求。 1. 用户新建 DTS 数据订阅任务,捕获数据库变更; 2. 搭建 EventBridge 事件流,事件提供方为 DTS,事件接收方为日志服务 SLS; 3. 用户需要对 SQL 进行审计时,通过查询 SLS 进行。 _相关链接_ _[1] 日志服务SLS_ _[2] 事件总线_ _[3] 事件流_ 感 活动推荐 阿里云基于 Apache RocketMQ 构建的企业级产品消息队列RocketMQ 5.0版现开启活动: 1、新用户首次购买包年包月,即可享受全系列 85折优惠! 了解活动详情:
作者:昶风
#行业实践 #生态集成