2023年4月11日

RocketMQ 多级存储设计与实现
设计总览 RocketMQ 多级存储旨在不影响热数据读写的前提下将数据卸载到其他存储介质中,适用于两种场景: 1. 冷热数据分离:RocketMQ 新近产生的消息会缓存在 page cache 中,我们称之为热数据;当缓存超过了内存的容量就会有热数据被换出成为冷数据。如果有少许消费者尝试消费冷数据就会从硬盘中重新加载冷数据到 page cache,这会导致读写 IO 竞争并挤压 page cache 的空间。而将冷数据的读取链路切换为多级存储就可以避免这个问题; 2. 延长消息保留时间:将消息卸载到更大更便宜的存储介质中,可以用较低的成本实现更长的消息保存时间。同时多级存储支持为 topic 指定不同的消息保留时间,可以根据业务需要灵活配置消息 TTL。 RocketMQ 多级存储对比 Kafka 和 Pulsar 的实现最大的不同是我们使用准实时的方式上传消息,而不是等一个 CommitLog 写满后再上传,主要基于以下几点考虑: 1. 均摊成本:RocketMQ 多级存储需要将全局 CommitLog 转换为 topic 维度并重新构建消息索引,一次性处理整个 CommitLog 文件会带来性能毛刺; 2. 对小规格实例更友好:小规格实例往往配置较小的内存,这意味着热数据会更快换出成为冷数据,等待 CommitLog 写满再上传本身就有冷读风险。采取准实时上传的方式既能规避消息上传时的冷读风险,又能尽快使得冷数据可以从多级存储读取。 Quick Start 多级存储在设计上希望降低用户心智负担:用户无需变更客户端就能实现无感切换冷热数据读写链路,通过简单的修改服务端配置即可具备多级存储的能力,只需以下两步: 1. 修改 Broker 配置,指定使用 org.apache.rocketmq.tieredstore.TieredMessageStore 作为 messageStorePlugIn 2. 配置你想使用的储存介质,以卸载消息到其他硬盘为例:配置 tieredBackendServiceProvider 为 org.apache.rocketmq.tieredstore.provider.posix.PosixFileSegment,同时指定新储存的文件路径:tieredStoreFilepath 可选项:支持修改 tieredMetadataServiceProvider 切换元数据存储的实现,默认是基于 json 的文件存储 更多使用说明和配置项可以在 GitHub 上查看多级存储的 技术架构 architecture 接入层:TieredMessageStore/TieredDispatcher/TieredMessageFetcher 接入层实现 MessageStore 中的部分读写接口,并为他们增加了异步语意。TieredDispatcher 和 TieredMessageFetcher 分别实现了多级存储的上传/下载逻辑,相比于底层接口这里做了较多的性能优化:包括使用独立的线程池,避免慢 IO 阻塞访问热数据;使用预读缓存优化性能等。 容器层:TieredCommitLog/TieredConsumeQueue/TieredIndexFile/TieredFileQueue 容器层实现了和 DefaultMessageStore 类似的逻辑文件抽象,同样将文件划分为 CommitLog、ConsumeQueue、IndexFile,并且每种逻辑文件类型都通过 FileQueue 持有底层物理文件的引用。有所不同的是多级存储的 CommitLog 改为 queue 维度。 驱动层:TieredFileSegment 驱动层负责维护逻辑文件到物理文件的映射,通过实现 TieredStoreProvider 对接底层文件系统读写接口(Posix、S3、OSS、MinIO 等)。目前提供了 PosixFileSegment 的实现,可以将数据转移到其他硬盘或通过 fuse 挂载的对象存储上。 消息上传 RocketMQ 多级存储的消息上传是由 dispatch 机制触发的:初始化多级存储时会将 TieredDispatcher 注册为 CommitLog 的 dispacher。这样每当有消息发送到 Broker 会调用 TieredDispatcher 进行消息分发,TieredDispatcher 将该消息写入到 upload buffer 后立即返回成功。整个 dispatch 流程中不会有任何阻塞逻辑,确保不会影响本地 ConsumeQueue 的构建。 TieredDispatcher TieredDispatcher 写入 upload buffer 的内容仅为消息的引用,不会将消息的 body 读入内存。因为多级储存以 queue 维度构建 CommitLog,此时需要重新生成 commitLog offset 字段 upload buffer 触发 upload buffer 上传时读取到每条消息的 commitLog offset 字段时采用拼接的方式将新的 offset 嵌入到原消息中 上传进度控制 每个队列都会有两个关键位点控制上传进度: 1. dispatch offset:已经写入缓存但是未上传的消息位点 2. commit offset:已上传的消息位点 upload progress 类比消费者,dispatch offset 相当于拉取消息的位点,commit offset 相当于确认消费的位点。commit offset 到 dispatch offset 之间的部分相当于已拉取未消费的消息 消息读取 TieredMessageStore 实现了 MessageStore 中的消息读取相关接口,通过请求中的逻辑位点(queue offset)判断是否从多级存储中读取消息,根据配置(tieredStorageLevel)有四种策略: DISABLE:禁止从多级存储中读取消息; NOT_IN_DISK:不在 DefaultMessageStore 中的消息从多级存储中读取; NOT_IN_MEM:不在 page cache 中的消息即冷数据从多级存储读取; FORCE:强制所有消息从多级存储中读取,目前仅供测试使用。 ${a} 需要从多级存储中读取的消息会交由 TieredMessageFetcher 处理:首先校验参数是否合法,然后按照逻辑位点(queue offset)发起拉取请求。TieredConsumeQueue/TieredCommitLog 将逻辑位点换算为对应文件的物理位点从 TieredFileSegment 读取消息。 ${b} TieredFileSegment 维护每个储存在文件系统中的物理文件位点,并通过为不同存储介质实现的接口从中读取所需的数据。 ${c} 预读缓存 TieredMessageFetcher 读取消息时会预读一部分消息供下次使用,这些消息暂存在预读缓存中 ${d} 预读缓存的设计参考了 TCP Tahoe 拥塞控制算法,每次预读的消息量类似拥塞窗口采用加法增、乘法减的机制控制: 加法增:从最小窗口开始,每次增加等同于客户端 batchSize 的消息量。 乘法减:当缓存的消息超过了缓存过期时间仍未被全部拉取,在清理缓存的同时会将下次预读消息量减半。 预读缓存支持在读取消息量较大时分片并发请求,以取得更大带宽和更小的延迟。 某个 topic 消息的预读缓存由消费这个 topic 的所有 group 共享,缓存失效策略为: 1. 所有订阅这个 topic 的 group 都访问了缓存 2. 到达缓存过期时间 故障恢复 上文中我们介绍上传进度由 commit offset 和 dispatch offset 控制。多级存储会为每个 topic、queue、fileSegment 创建元数据并持久化这两种位点。当 Broker 重启后会从元数据中恢复,继续从 commit offset 开始上传消息,之前缓存的消息会重新上传并不会丢失。 开发计划 面向云原生的存储系统要最大化利用云上存储的价值,而对象存储正是云计算红利的体现。 RocketMQ 多级存储希望一方面利用对象存储低成本的优势延长消息存储时间、拓展数据的价值;另一方面利用其共享存储的特性在多副本架构中兼得成本和数据可靠性,以及未来向 Serverless 架构演进。 tag 过滤 多级存储拉取消息时没有计算消息的 tag 是否匹配,tag 过滤交给客户端处理。这样会带来额外的网络开销,计划后续在服务端增加 tag 过滤能力。 广播消费以及多个消费进度不同的消费者 预读缓存失效需要所有订阅这个 topic 的 group 都访问了缓存,这在多个 group 消费进度不一致的情况下很难触发,导致无用的消息在缓存中堆积。 需要计算出每个 group 的消费 qps 来估算某个 group 能否在缓存失效前用上缓存的消息。如果缓存的消息预期在失效前都不会被再次访问,那么它应该被立即过期。相应的对于广播消费,消息的过期策略应被优化为所有 Client 都读取这条消息后才失效。 和高可用架构的融合 目前主要面临以下三个问题: 1. 元数据同步:如何可靠的在多个节点间同步元数据,slave 晋升时如何校准和补全缺失的元数据; 2. 禁止上传超过 confirm offset 的消息:为了避免消息回退,上传的最大 offset 不能超过 confirm offset; 3. slave 晋升时快速启动多级存储:只有 master 节点具有写权限,在 slave 节点晋升后需要快速拉起多级存储断点续传。 活动推荐 阿里云基于 Apache RocketMQ 构建的企业级产品消息队列RocketMQ 5.0版现开启活动: 1、新用户首次购买包年包月,即可享受全系列 85折优惠! 了解活动详情:
作者:张森泽
#技术探索 #云原生

2023年3月28日

RocketMQ x OpenTelemetry 分布式全链路追踪最佳实践
作者简介:艾阳坤,Apache RocketMQ PMC Member/Committer,CNCF OpenTelemetry Member,CNCF Envoy contributor。 在分布式系统中,多个服务之间的交互涉及到复杂的网络通信和数据传输,其中每个服务可能由不同的团队或组织负责维护和开发。因此,在这样的环境下,当一个请求被发出并经过多个服务的处理后,如果出现了问题或错误,很难快速定位到根因。分布式全链路追踪技术则可以帮助我们解决这个问题,它能够跟踪和记录请求在系统中的传输过程,并提供详细的性能和日志信息,使得开发人员能够快速诊断和定位问题。对于分布式系统的可靠性、性能和可维护性起到了非常重要的作用。 RocketMQ 5.0 与分布式全链路追踪 Apache RocketMQ 5.0 版本作为近几年来最大的一次迭代,在整个可观测性上也进行了诸多改进。其中,支持标准化的分布式全链路追踪就是一个重要的特性。 RocketMQ 5.0 可观测 而由 Google、Microsoft、Uber 和 LightStep 联合发起的 CNCF OpenTelemetry 作为 OpenTracing 和 OpenCensus 的官方继任者,已经成为可观测领域的事实标准,RocketMQ 的分布式全链路追踪也围绕 OpenTelemetry 进行展开。 分布式链路追踪系统的起源可以追溯到 2007 年 Google 发布的论文。这篇论文详细介绍了 Google 内部使用的链路追踪系统 Dapper,其中使用的 span 概念被广泛采用,并成为后来开源链路追踪系统中的基础概念之一。 Dapper Trace Tree 在 Dapper 中,每个请求或事务被追踪时都会创建一个 span,记录整个请求或事务处理过程中的各个组件和操作的时间和状态信息。这些 span 可以嵌套,形成一个树形结构,用于表示整个请求或事务处理过程中各个组件之间的依赖关系和调用关系。后来,很多开源链路追踪系统,如 Zipkin 和 OpenTracing,也采用了类似的 span 概念来描述分布式系统中的链路追踪信息。现在,合并了 OpenTracing 和 OpenCensus 的 CNCF OpenTelemetry 自然也一样采用了 span 概念,并在此基础上进行了进一步发展。 OpenTelemetry 为 messaging 相关的 span 定义了,旨在制定一套与特定消息系统无关的 specification,而 OpenTelmetry 自身的开发其实也都是由 specification 驱动进行展开。 Specification Driven Development Messaging Span 定义 Specifaition 中描述了 messaging span 的拓扑关系,包括代表消息发送、接收和处理的不同 span 之间的父子和链接关系。关于具体的定义可以参考:。对应到 RocketMQ 中,有三种不同的 span: | Span | Description | | | | | send | 消息的发送过程。span 以一次发送行为开始,成功或者失败/抛异常结束。消息发送的内部重试会被记录成多条 span。 | | receive | 消费者中接收消息的长轮询过程,与长轮询的生命周期保持一致。 | | process | 对应 PushConsumer 里 MessageListener 中对消息的处理过程,span 以进入 MessageListener 为开始,离开 MessageListener 为结束。 | 特别地,默认情况下,receive span 是不启用的。在 receive span 启用和不启用的两种情况下,span 之间的组织关系是不同的: 启用 receive span 前后的 span 关系 在没有启用 receive span 的情况下,process span 会作为 send span 的 child;而当 receive span 启用的情况下,process span 会作为 receive span 的 child,同时 link 到 send span。 Messaging Attributes 定义 语义约定中规定了随 span 携带的通用属性的统一名称,这包括但不限于: messaging.message.id: 消息的唯一标识符。 messaging.destination:消息发送的目的地,通常是一个队列或主题名称。 messaging.operation:对消息的操作类型,例如发送、接收、确认等。 具体可以查看 。 特别地,不同的消息系统可能会有自己特定的行为和属性,,这包括: | Attribute | Type | Description | | | | | | messaging.rocketmq.namespace | string | RocketMQ 资源命名空间,暂未启用 | | messaging.rocketmq.client_group | string | RocketMQ producer/consumer 负载均衡组,5.0 只对 consumer 生效 | | messaging.rocketmq.client_id | string | 客户端唯一标识符 | | messaging.rocketmq.message.delivery_timestamp | int | 定时消息定时时间,只对 5.0 生效 | | messaging.rocketmq.message.delay_time_level | int | 定时消息定时级别,只对 4.0 生效 | | messaging.rocketmq.message.group | string | 顺序消息分组,只对 5.0 生效 | | messaging.rocketmq.message.type | string | 消息类型,可能为 normal/fifo/delay/transaction,只对 5.0 生效 | | messaging.rocketmq.message.tag | string | 消息 tag | | messaging.rocketmq.message.keys | string[] | 消息 keys,可以有多个 | | messaging.rocketmq.consumption_model | string | 消息消费模型,可能为 clustering/broadcasting,5.0 broadcasting 被废弃 | 快速开始 在 OpenTelemetry 中有两种不同的方式可以为应用程序添加可观测信息: Automatic Instrumentation:无需编写任何代码,只需进行简单的配置即可自动生成可观测信息,包括应用程序中使用的类库和框架,这样可以更方便地获取基本的性能和行为数据。 Manual Instrumentation:需要编写代码来创建和管理可观测数据,并通过 exporter 导出到指定的目标。这样可以更灵活自由地控制用户想要观测的逻辑和功能。 在 Java 类库中,前者是一种更为常见的使用形式。RocketMQ 5.0 客户端的 trace 也依托于 automatic instrumentation 进行实现。在 Java 程序中,automatic instrumentation 的表现形式为挂载 Java agent。在过去的一年里,我们将 推入了 OpenTelemetry 官方社区。现在,只需要在 Java 程序运行时挂载上 OpenTelemetry agent,即可实现对应用程序透明的分布式全链路追踪。 除此之外,Automatic Instrumentation 和 Manual Instrumentation 并不冲突,Automatic Instrumentation 中所使用的关键对象会被注册成全局对象,在 Manual Instrumentation 的使用方式中也可以非常方便的获取。实现两个 Instrumentation 共用一套配置,非常灵活和方便。 首先准备好 RocketMQ 5.0 Java 客户端,可以参考 进行消息的收发。关于 RocketMQ 5.0 的更多细节,欢迎大家参考和关注 和 。 然后准备好 OpenTelemetry agent jar,可以从 OpenTelemetry 官方,在应用程序启动时增加 javaagent:yourpath/opentelemetryjavaagent.jar 即可。可以通过设置 OTEL_EXPORTER_OTLP_ENDPOINT 环境变量来设置 OpenTelemetry collector 的接入点。 默认情况下,按照 OpenTelemetry 中关于 messaging 的规范,只有 send 和 process 的 span 会被启用,receive 的 span 是默认不启用的,如果想要启用 receive span,需要手动设置 Dotel.instrumentation.messaging.experimental.receivetelemetry.enabled=true。 场景最佳实践 目前,主流的云服务供应商都为 OpenTelemetry 提供了良好的支持,阿里云上的 SLS 和 ARMS 两款可观测产品都提供了基于 OpenTelemetry 的分布式全链路追踪服务。 为了更好地展示分布式全链路追踪的过程,这里提供了一个代码示例: 。在这个代码示例中,会启动三个不同的进程,涉及三种不同类库和业务逻辑之间的相互调用,展示了一个在分布式环境较复杂中间件之间进行交互的典型案例。 请求首先会从 gRPC 客户端发往 gRPC 服务端,在 gRPC 服务端收到请求之后,会向 RocketMQ 5.0 的 Producer 往服务端发送一条消息,然后再回复对应的 response 给客户端。在 RocketMQ 5.0 的 PushConsumer 接受到消息之后,会在 MessageListener 中使用 Apache HttpClient 往淘宝网发送一条 GET 请求。 示例代码调用链路 特别地,gRPC 客户端在发起具体的调用是在一个上游业务 span 的生命周期之内进行的,这个 span 我们称之为 ExampleUpstreamSpan,RocketMQ 5.0 PushConsumer 在收到消息之后,也会在 MessageListener 里执行其他的业务操作,也会有对应的 span,我们称之为 ExampleDownstreamSpan。那么默认在 receive span 没有启用的情况下,按照开始时间的顺序,会先后存在 7 个 span。分别是: ExampleUpstreamSpan。 gRPC 客户端请求 span。 gRPC 服务端响应 span。 RocketMQ 5.0 Producer 的 send span。 RocketMQ 5.0 Producer 的 process span。 HTTP 请求 span。 ExampleDownstreamSpan。 RocketMQ 5.0 对接 SLS Trace 服务 首先在阿里云日志服务中创建 Trace 服务。然后获取接入点,项目和实例名称等信息,具体可以参考。 在补充好信息之后完成接入之后,稍等一会就可以看到对应的 Trace 信息已经被上传到 SLS trace 服务中: SLS Trace 服务分布式全链路展示 Trace 服务其实是将相关数据存储到日志中,因此这些数据也可以通过 SLS 的 SQL 语法查询得到。 通过 Trace 数据,我们可以很方便知道用户的操作系统环境,Java 版本等一系列基础信息。消息的发送延时,失败与否,消息是否准时投递到了客户端,以及客户端本地消费耗时,消费失败与否等一系列有效信息,可以帮助我们十分有效地进行问题排查。 除此之外,SLS Trace 服务的 demo 页也提供了基于 RocketMQ 5.0 定制的消息中间件大盘,生动展示了利用 Trace 数据得到的发送成功率,端到端延时等一系列指标。 :展示利用 Trace 数据得到的包括发送延时、发送成功率、消费成功率、端到端延时在内的一系列指标。 :可以根据上一步得到的差错长 message id 进行进一步的细粒度查询。 消息中间件分析 RocketMQ 5.0 对接应用实时监控服务(ARMS) 首先进入应用实时监控服务 ARMS 控制台,点击接入中心中的 OpenTelemetry,选择 java 应用程序下的自动探测,获取启动参数并修改至自己的 java 应用程序,具体可以参考。 配置好参数之后,启动自己的相关应用程序,稍等一会儿,就可以在 ARMS Trace Explorer 里看到对应的数据了。 Trace Explorer 还可以查看 span 之间的时序关系。 ARMS Trace Explorer 分布式全链路追踪展示 具体地,可以点进每个 span 查看详细的 attributes/resources/events 等信息。除此之外,ARMS 还支持通过使用 OpenTelemetry Collector 转发的形式来收集应用程序的 Trace 数据。 趋势与思考 随着现代应用程序架构的不断演进,可观测性的重要性日益凸显。它不仅可以帮助我们快速发现和解决系统中的问题,还提高应用程序的可靠性和性能,同时也是实现 DevOps 的关键部分。在相关领域,也陆续诞生了像 DataDog 和 Dynatrace 这样的明星公司。 近年来涌现了一些新兴技术,如 eBPF(Extended Berkeley Packet Filter)和 Service Mesh 也为可观测领域提供了一些新的思路: eBPF 可以在内核层面运行,通过动态注入代码来监控系统的行为。它被广泛应用于实时网络和系统性能监控、安全审计和调试等任务,并且性能影响很小,未来也可以作为 continuous profiling 的一种选择。Service Mesh 则通过在应用程序之间注入代理层实现流量管理、安全和可观测性等功能。代理层可以收集和报告有关流量的各种指标和元数据,从而帮助我们了解系统中各个组件的行为和性能。 Service Mesh 中反映出的技术趋势很大一部分已经在 RocketMQ 5.0 proxy 中得到了应用,我们也在更多地将可观测指标往 proxy 进行收敛。当前的 Trace 链路未来也在考虑和服务端一起进行关联,并打造用户侧,运维侧,跨多应用的全方位链路追踪体系。除此之外还可以将 Trace 数据与 Metrics 数据通过 Exemplars 等技术进行联动。实现面到线,线到点的终极排查效果。 在可观测领域,RocketMQ 也在不断探索和摸索更加领先的可观测手段,以帮助开发者和客户更快更省心地发现系统中的隐患。 特别感谢阿里云 SLS 团队的千乘同学和 ARMS 团队的垆皓同学在接入过程提供的帮助和支持! 相关链接 RocketMQ 5.0 客户端: OpenTelemetry Instrumentation for RocketMQ 5.0: RocketMQ OpenTelemetry 示例: 活动推荐 阿里云基于 Apache RocketMQ 构建的企业级产品消息队列 RocketMQ 5.0 版现开启活动: 1、新用户首次购买包年包月,即可享受全系列 85 折优惠! 了解活动详情:
作者:艾阳坤
#行业实践 #可观测

2023年1月13日

RocketMQ 集成生态再升级:轻松构建云上数据管道
阿里云消息队列 RocketMQ 版是阿里云基于 Apache RocketMQ 构建的低延迟、高并发、高可用、高可靠的分布式“消息、事件、流”统一处理平台,面向互联网分布式应用场景提供微服务异步解耦、流式数据处理、事件驱动处理等核心能力。其自诞生以来一直为阿里集团提供稳定可靠的消息服务,历经多年双十一万亿级流量洪峰的验证。 随着业务需求场景日渐丰富,在多年经验积累后,阿里云 RocketMQ 也迎来了革命性的更新,正式发布了阿里云消息队列 RocketMQ 版 5.0,在架构、网络、负载均衡、存储等诸多方面进行了显著优化。其定位不再局限于消息解耦场景,将全新布局事件驱动和消息流式处理场景。 阿里云 EventBridge 作为云上事件枢纽一直以来都保持着对云上事件、数据的友好生态支持。随着 RocketMQ 5.0版本的用户日渐增多,EventBridge 在近期对 RocketMQ Connector 进行了全面升级。升级之后的 RocketMQ Connector 不仅可以支持RocketMQ 5.0 版本,同时也能支持云上自建 RocketMQ 实例。除此之外,基于成熟的事件流能力,用户使用 EventBridge 也能轻松构建消息路由能力,实现对灾备、数据同步的需求。 本文将从业务架构和 API 使用等方面讲解如何使用 EventBridge 创建阿里云 RocketMQ 4.0、5.0 版本,开源自建版本以及消息路由的相关任务。 EventBridgeRocketMQ 4.0 业务架构 RocketMQ 4.0 版本使用较为经典的 clientnameserverbroker 架构,整个应用主要由生产者、消费者、NameServer 和 Broker 组成。 Name Server:是一个几乎无状态节点,可集群部署,在消息队列 RocketMQ 版中提供命名服务,更新和发现 Broker 服务。 Broker:消息中转角色,负责存储消息,转发消息。分为 Master Broker 和 Slave Broker,一个 Master Broker 可以对应多个 Slave Broker,但是一个 Slave Broker 只能对应一个 Master Broker。Broker 启动后需要完成一次将自己注册至 Name Server 的操作;随后每隔 30s 定期向 Name Server 上报 Topic 路由信息。 生产者:与 Name Server 集群中的其中一个节点(随机)建立长连接(Keepalive),定期从 Name Server 读取 Topic 路由信息,并向提供 Topic 服务的 Master Broker 建立长连接,且定时向 Master Broker 发送心跳。 消费者:与 Name Server 集群中的其中一个节点(随机)建立长连接,定期从  Name Server 拉取 Topic 路由信息,并向提供 Topic 服务的 Master Broker、Slave Broker 建立长连接,且定时向 Master Broker、Slave Broker 发送心跳。Consumer 既可以从 Master Broker 订阅消息,也可以从 Slave Broker 订阅消息,订阅规则由 Broker 配置决定。 EventBridge在获取用户授权之后,利用生成的 sts 临时授权对客户的  RocketMQ 实例进行消息读取或写入。 API 使用 在 API 介绍方面,我们以创建「自定义总线自定义事件源」为例,事件目标以及事件流中的API基本一致。 基于 EventBridge 创建 RocketMQ 4.0 任务的 API 和之前基本保持了一致。具体参数如下 版本:代表阿里云消息队列 RocketMQ 版本,可选择 4.x 或 5.x; RocketMQ 实例:RocketMQ 对应的实例 ID。用户在阿里云 RocketMQ控制台每创建一个实例都会有一个对应的实例 ID,如MQ_INST_123456789_BX6zY7ah; Topic:RocketMQ Topic。选择此 topic 作为事件源的读取对象或者事件目标的写入对象; Tag:RocketMQ 消费 Tag,用于消费者过滤消息使用; Group ID:RocketMQ 消费组,标识一组特定的消费者,仅事件源有此参数; 消费位点:初始消费位点。可选择最新位点、最早位点、或者指定时间戳。 EventBridgeRocketMQ 5.0 业务架构 RocketMQ 5.0 版将通用的存储逻辑下沉,集中解决消息存储的多副本、低延迟、海量队列分区等技术问题,将上层的消息处理剥离出完全的无状态计算层,主要完成协议适配、权限管理、消费状态、可观测运维体系支持,Broker 则继续专注于存储能力的持续优化。存算分离的架构设计,使得从 SDK 接入到线上运维全链路带来全面提升: 1. 轻量版 SDK 的开放和全链路可观测系统的提升:同时支持 4.x 通信协议和全新的 gRPC 通信协议,并内置 OpenTelemetry 埋点支持,新版本 SDK 新增了 10 余个指标埋点。 2. 消息级负载均衡:新版本 SDK 不再参与实际存储队列的负载均衡,消息负载均衡将更加轻量,以单条消息为调度最小单元。 3. 多网络访问支持:新版本支持单一实例同时暴露公网、内网等访问形式,方便客户多网络接入访问。 4. 海量分级存储:新版本开放分级存储历史消息保存能力,消息低成本无大小限制,最长保存 30 天。冷热数据进行分离设计,极大降低消费历史消息对实例的性能影响。 RocketMQ 5.0 版本 可以支持 VPC 内部安全识别,用户上云无需修改代码。在用户授予 EventBridge 网络和 RocketMQ 相关权限之后,用户在 EventBridge 创建 MQ 5.0 Source&Sink 任务的时,EventBridge 会根据 RocketMQ 5.0 实例的 VPC 信息,调用网络组件获取相应代理信息。MQ sdk 侧通过配置代理实现消息的收发。 API 使用 相比于 4.0 实例,5.0 实例多了 VPC、交换机和安全组 3 个参数。 5.0 实例新增了 VPC 属性,用户需要在对应 vpc 内去访问 MQ 5.0 实例。EventBridge 在获得用户授权之后,也是经由 5.0 实例对应的 VPC 内进行消息的收发。创建任务时前端会自动填充好实例的 vpc 和交换机信息。 安全组参数限制了 EventBridge 在 vpc 内的访问策略,用户可以选择使用已有安全组也可以选择快速创建,让 EventBridge 快速创建一个安全组供任务使用。安全组策略推荐使用默认的安全组策略。使用上推荐第一次在此vpc内创建任务时,使用 EventBridge 自动创建一个安全组,后续在此 VPC 内再创建其他任务时,在使用已有中选择 EventBridge 创建的安全组。 EventBridge自建 Apache RocketMQ 针对用户在阿里云自建 Apache RocketMQ 集群的场景,EventBridge 也支持了消息导出能力。用户通过配置接入点、topic、groupID、VPC 等信息,即可将自建集群中的消息导入 EventBridge,进而对接 EventBridge 目前支持的大量下游生态。 业务架构 抽象来看,EventBridge 访问自建 MQ 实例的链路和阿里云 5.0 版本基本一致,都是从用户 vpc 发起对 MQ 实例的访问。区别在于接入点的不同,前者是用户自建 MQ 集群的nameserver,而后者为阿里云 RocketMQ 提供的接入点,不需要感知真实的 MQ 集群是部署在用户 vpc 还是阿里云 RocketMQ 自身的生产环境。 API 使用 在 API 使用方面,自建集群的大部分参数需要用户手动填入。 接入点:nameserver 地址。后续会支持 proxy 地址; Topic:RocketMQ Topic。选择此 topic 作为事件源的读取对象或者事件目标的写入对象; Tag:RocketMQ 消费 Tag,用于消费者过滤消息使用; Group ID:RocketMQ 消费组,标识一组特定的消费者,仅事件源有此参数; FilterType:过滤模式,目前支持 Tag 过滤; 认证模式:如果开启 ACL 鉴权,可在此配置鉴权信息; 消费位点:初始消费位点; VPC:自建 MQ 集群对应的 VPC 参数信息; 交换机:自建 MQ 集群对应的交换机信息; 安全组:EventBridge使用此安全组访问用户自建 MQ 集群,安全组规定了 EventBridge 在此 vpc 内的访问策略。 RocketMQ 消息路由 当用户有灾备或者消息同步的需求时,可能就会需要消息路由能力,即将 A region 下某实例 topic 的消息同步到 B region 的某 topic 中。 对于 EventBridge 而言,消息路由并非单独的一个产品能力,用户通过使用事件流即可实现消息路由。 针对非跨境场景的消息路由,如从北京同步消息到上海,跨 region 网络打通能力由 EventBridge 来实现,用户无需关注过多实现细节。 针对跨境场景,如北京同步消息到新加坡,EventBridge 使用的是公网链路完成对目标实例的写入,使用的是目标 MQ 实例的公网接入点。消息出公网的能力需要用户提供,即需要用户提供 VPC、交换机和安全组配置,此VPC须带有NAT等访问公网能力, EventBridge 使用此 VPC 实现写入目标端公网接入点。 在 API 使用方面,创建消息路由任务本质上是创建事件流,API 参数和上面各类型 RocketMQ 实例任务一致,这里以创建一个青岛到呼和浩特的 RocketMQ 消息路由为例。 1.进入 EventBridge 控制台,regionBar 选择到呼和浩特,点击左侧“事件流”,然后选择“创建事件流”。 2.在事件源页面,事件提供方选择“消息队列 RocketMQ 版”,地域选择青岛,剩余 RocketMQ 相关参数按需求选择。 3.规则页面按需填写,这里选择默认内容。 4.在“目标”页面,服务类型选择“消息队列 RocketMQ 版”,剩余参数按需填写。 5.点击“创建”,等待事件流任务启动即可。 总结 本文介绍了 EventBridge 对接各类型 RocketMQ 实例的基本原理与对应的 API 使用说明,便于已经使用了 RocketMQ 5.0 版本和自建 MQ 实例的用户可以借助 EventBridge 的能力实现事件驱动业务架构的搭建。同时针对灾备和业务消息同步的场景,本文也基于事件流讲解了如何基于 EventBridge 创建 RocketMQ 消息路由任务。 活动推荐 阿里云基于 Apache RocketMQ 构建的企业级产品消息队列RocketMQ 5.0版现开启活动: 1、新用户首次购买包年包月,即可享受全系列 85折优惠! 了解活动详情:
作者:昶风
#技术探索 #生态集成

2023年1月6日

基于 EventBridge API Destination 构建 SaaS 集成实践方案
引言 事件总线 EventBridge 是阿里云提供的一款无服务器事件总线服务,支持阿里云服务、自定义应用、SaaS 应用以标准化、中心化的方式接入,并能够以标准化的 CloudEvents 1.0 协议在这些应用之间路由事件,帮助您轻松构建松耦合、分布式的事件驱动架构。事件驱动架构是一种松耦合、分布式的驱动架构,收集到某应用产生的事件后实时对事件采取必要的处理后路由至下游系统,无需等待系统响应。使用事件总线 EventBridge 可以构建各种简单或复杂的事件驱动架构,以标准化的 CloudEvents 1.0 协议连接云产品和应用、应用和应用等。 目前 HTTP 的不足有以下几点: HTTP 的能力较弱,比如:授权方式单一、只支持 Body 传参、网络互通能力未对齐。只能满足客户最简单的场景。 用户无法基于 API 来统一管理(修改/下线)Target,用户体验交叉口; 对于基于 HTTP 实现的 SaaS API,无法简单快捷的引入到 EB 中,作为 Target 给用户使用。 本次新增集成中心(Integration Center)是负责 EventBridge 与外界系统对接的模块,通过抽象与配置快速获取第三方事件并将事件集成到第三方系统。并且优化现有 HTTP Sink 集成方案,为用户下游集成创造更多适配场景。 集成中心重点服务对象包括但不限于 SaaS 系统,对标 IPaaS 平台的能力提供完整的全面的通用系统集成方案。 集成源(Integration Source):指集成到 EventBridge 的第三方源; API 端点(API Destination ):指被集成到 EventBridge 的第三方 API 端点; 连接配置(Connection):是 API 端点模块的子集,与API 端点的平级资源,主要负责记录连接及配置信息,连接配置可被任意 API 端点复用。 针对市场上其他云厂商服务,EventBridge 发布了 API 端点 Sink 能力,主要作用在于承接 EventBridge 下游端数据,帮助用户快速完成下游数据集成。提供简单且易于集成的三方事件推送 ,帮助客户更加高效、便捷地实现业务上云。 API 端点 Sink 概述 接入 EventBridge 应用有多种情况:用户自定义应用、阿里云服务、其他云厂商服务或者其他 DB 产品。 具体而言,API 端点 Sink 事件目标是 EventBridge 支持的事件目标的一种,是通过 EventBridge 将数据投递至指定 Web Server 中。 API 端点 Sink 基本使用 首先现阶段 API 端点的 Sink 支持三种鉴权方式: 同时网络支持公网和专有网络(后续支持)。 1、创建 Connection 添加连接配置基本信息,并配置鉴权。 链接配置支持三种鉴权方式 : Basic 鉴权方式 : OAuth 2.0 鉴权方式: 添加授权接入点、授权请求方式、Client ID、ClientSecret 和授权相关的 Http 请求参数。 API Key 鉴权方式: 2、创建 ApiDestination API 端点配置 :配置需要访问 API 的 URL 地址和 HTTP 调用类型。 添加请求地址和请求方式: 在创建 API 端点时可以直接创建连接配置也可以选择已有的连接配置,例如上面已经创建成功的连接配置。 3、创建 Rule 创建事件规则,用于将事件投递到具体的 API 端点中。 步骤一 :点击事件规则并创建事件规则 步骤二 :是选择事件源,可以选择阿里云官方的或者选择自定义事件源,这里选择的是自定义事件源 步骤三 :第三步是选择 API 端点事件目标 支持自定义创建和使用已有,同时可以添加请求 HTTP 参数。 使用已有 使用选择已有的以后只需要添加请求 HTTP 参数即可: 选择已有的 API 端点来自于集成中心下面的 API 端点: 最佳实践 常见场景案例,比如: 用户可以把 RocketMQ 或者 RabbitMQ 的消息产品的消息动态投递到不同的 Web Server 中,这样可以让不同的 web 平台处理消息数据,实现了跨平台或者跨语言的消息流通。 用户可以把日志服务 SLS 数据投递到指定的 Web Server 或者 ELK 中,方便业务部门或者大数据平台对日志数据处理,可以更好的完善用户画像和用户行为分析,方便给用户打标签,从而可以进一步完善大数据个性化用户推荐系统。 例如下面是访问的国内外 SaaS 生态: 典型场景 :与 Buildkite 集成 场景介绍 :利用 EventBridge 丰富的云产品事件源和目标集成能力,快速与 Buildkite 的持续集成和持续交付(CI / CD)平台进行集成。 集成产品背景描述 :Buildkite 是大型持续集成和持续交付(CI / CD)平台会有各种管理的变更、构建和作业等任务,运维人员需要快速感知、处理这些变更,以便决赛风险。 用户痛点 :构建的事件收集困难,需要手动触发构建和手动创建管道。 方案优势 :EventBridge 支持集成 Buildkite 的持续集成和持续交付平台,用户只需要简单配置即可创建和处理平台的事件。 举例介绍:可以通过 API 文档中提供的接口实现动态的创建管道、创建构建和重试作业等。 文档地址 : 创建 API 端点 创建规则 发布事件,发布完成以后可以到事件轨迹查询详情 典型场景 :与 Freshdesk 集成 场景介绍 :利用 EventBridge 丰富的云产品事件源和目标集成能力,快速与 CRM(Freshdesk)进行集成。 集成产品背景描述 :不同的平台都需要对接 CRM(Freshdesk)管理系统。 用户痛点 :不同的平台的事件收集困难,需要用户自定义实现。 方案优势 :EventBridge 支持集成 CRM(Freshdesk)平台,用户只需要简单配置即可实现动态的创建会话、创建联系人和创建技能等事件。 举例介绍:可以通过 API 文档中提供的接口实现动态的创建会话、创建联系人和创建技能等。 文档地址 : 创建 API 端点 创建事件规则 发布事件,发布完成以后可以到事件轨迹查询详情 典型场景 :与有成财务集成 场景介绍 :利用 EventBridge 丰富的云产品事件源和目标集成能力,快速与有成财务进行集成 集成产品背景描述 :不同的 HR 系统或者 OA 系统需要对接有成财务时 用户痛点 :不同的系统的事件收集困难,需要用户自定义实现 方案优势 :EventBridge 支持集成有成财务平台,用户只需要简单配置即可实现动态生成报销科目和财务凭证等事件 举例介绍:比如用户想把 mns 的消息或者其他消息产品,同步到钉钉产品等接口中,或者也可以利用消息生成报销单据,可以生成报销科目和财务凭证等 地址 : 创建 API 端点 创建规则 发布事件,发布完成以后可以到事件轨迹查询详情。 活动推荐 阿里云基于 Apache RocketMQ 构建的企业级产品消息队列RocketMQ 5.0版现开启活动: 1、新用户首次购买包年包月,即可享受全系列 85折优惠! 了解活动详情:
作者:赵海
#行业实践 #生态集成

2023年1月6日

RocketMQ 监控告警:生产环境如何快速通过监控预警发现堆积、收发失败等问题?
本文主要向大家介绍如何利用 RocketMQ 可观测体系中的指标监控,对生产环境中典型场景:消息堆积、消息收发失败等场景配置合理的监控预警,快速发现问题,定位问题。 RocketMQ 可观测体系 作为一款典型的分布式中间件产品,RocketMQ 被广泛应用于业务核心链路中,每条消息都关联着核心业务数据的变化。业务链路有其明显的复杂性: 生产者、消费者多对多:业务调用链路网状结构,上下游梳理困难 上下游解耦、异步链路:异步化调用,信息收集不完整 消息是状态数据:未消费成功、定时中等状态增加排查的复杂度 消息链路耦合复杂的业务处理逻辑:无法快速定位问题边界 鉴于消息链路耦合业务系统,复杂带状态,RocketMQ 通过强大的可观测系统和经验支撑,及时发现问题、定位问题、解决问题有助于提升运维效率,对于业务运行是一项重要的保障能力。 RocketMQ 的可观测体系主要由指标(Metrics)、轨迹(Tracing)和日志(Logging)组成。 指标 RocketMQ中定义了详细的Metrics指标,这些指标覆盖生产者、消费者、服务端及消息收发关键接口和流程的统计数据,并支持从实例、Topic和Group等多个维度进行聚合展示,帮助您实时监控消息业务或RocketMQ服务的运行状态。和4.x版本相比,RocketMQ服务端5.x版本增加了消息堆积场景相关指标、关键接口的耗时指标、错误分布指标、存储读写流量等指标,帮助您更好地监控异常场景。 消息轨迹 在分布式应用中,RocketMQ作为全链路中异步解耦的关键服务,提供的Tracing数据可有效将业务上下游信息串联起来,帮助您更好地排查异常,定位问题。和4.x版本相比,RocketMQ服务端5.x版本支持OpenTelemetry开源标准,提供更加丰富的轨迹指标,针对消费场景、高级消息类型场景等细化轨迹内容,为问题定位提供更多关键信息。 日志 RocketMQ为不同的异常情况定义唯一的错误码及错误信息,并划分不同的错误级别,您可以根据客户端返回的错误码信息快速获取异常原因。和4.x版本相比,RocketMQ服务端5.x版本统一了ErrorCode和ErrorMessage,异常日志中增加了RequestID、资源信息,细化了错误信息,保证日志内容明确靠。 RocketMQ 监控告警介绍 RocketMQ 联合阿里云云监控提供了开箱即用且免费的监控报警服务,可帮助您解决如下问题: 实例规格水位监控预警 若您实际使用的指标值超过实例的规格限制,RocketMQ会进行强制限流。提前配置实例规格水位告警可以提前发现规格超限风险并及时升配,避免因限流导致的业务故障。 业务逻辑错误监控预警 您在消息收发时可能会收到异常报错,配置调用错误告警可以提前在业务反馈前发现异常,帮助您提前判断异常来源并及时修复。 业务性能指标监控预警 如果您的消息链路有相关性能指标要求,例如RT耗时、消息延迟等,提前配置业务指标告警可以帮助您提前治理业务风险。 RocketMQ 版提供了丰富的 Metric 指标和告警监控项。各监控项可分为运行水位、收发性能、异常错误事件三类告警。根据大量生产环境实践经验,建议您根据以下原则配置如下告警 接下来重点通过消息堆积和消息收发失败这两个典型场景来阐述基于可观测体系中的指标(Metrics),RocketMQ 如何通过云监控创建监控规则,将关键的 Metrics 指标作为告警项,帮助您自动监控服务的运行状态,并自动发送报警通知, 便于您及时预警服务的异常信息,提高运维效率。 应用场景1:消息堆积问题 消息堆积指标及监控配置 业界通用指标:使用消息堆积量(ready + inflight)来度量消费健康度,表示未处理完成的消息量;部分产品额外增加已就绪消息量来度量消息拉取的及时性;使用上述 2 个指标直接来配置报警有以下缺点: 有误报或无法触发报警的问题 及时性的间接指标,不直观 RocketMQ 指标:额外支持延时时间来度量消费健康度,涵盖了所有业务场景,根据业务容忍延迟度直接配置时间告警阈值。 消息处理延迟时间:表示业务处理完成及时度 已就绪消息排队时间:表示拉取消息及时度 建议对消息堆积敏感的用户,都在 RocketMQ 实例页的监控报警,添加如下报警指标,并设置符合业务需求的阈值。 如何定位和处理堆积问题 假如收到堆积报警,确认消息出现堆积情况,可参考以下措施进行定位和处理。 1. 判断消息堆积在 RocketMQ 服务端还是客户端 查看客户端本地日志文件 ons.log,搜索是否出现如下信息:the cached message count exceeds the threshold 出现相关日志信息,说明客户端本地缓冲队列已满,消息堆积在客户端,请执行步骤2。 若未出现相关日志,说明消息堆积不在客户端,若出现这种特殊情况,请直接提交工单联系阿里云技术支持。 2. 确认消息的消费耗时是否合理 若查看到消费耗时较长,则需要查看客户端堆栈信息排查具体业务逻辑,请执行步骤3。 若查看到消费耗时正常,则有可能是因为消费并发度不够导致消息堆积,需要逐步调大消费线程或扩容节点来解决。 消息的消费耗时可以通过以下方式查看: 查看消费者状态,在客户端连接信息中查看业务处理时间,获取消费耗时的平均值。 3. 查看客户端堆栈信息。只需要关注线程名为 ConsumeMessageThread 的线程,这些都是业务消费消息的逻辑。 客户端堆栈信息可以通过以下方式获取:查看消费者状态,在客户端连接信息中查看 Java 客户端堆栈信息 使用 Jstack 工具打印堆栈信息。 常见的异常堆栈信息如下: 消费逻辑有抢锁休眠等待等情况。消费线程阻塞在内部的一个睡眠等待上,导致消费缓慢。 示例一: 消费逻辑操作数据库等外部存储卡住。消费线程阻塞在外部的 HTTP 调用上,导致消费缓慢。 示例二: 4. 针对某些特殊业务场景,如果消息堆积已经影响到业务运行,且堆积的消息本身可以跳过不消费,您可以通过重置消费位点跳过这些堆积的消息从最新位点开始消费,快速恢复业务。 如何避免消息堆积 为了避免在业务使用时出现非预期的消息堆积和延迟问题,需要在前期设计阶段对整个业务逻辑进行完善的排查和梳理。整理出正常业务运行场景下的性能基线,才能在故障场景下迅速定位到阻塞点。其中最重要的就是梳理消息的消费耗时和消息消费的并发度。 梳理消息的消费耗时通过压测获取消息的消费耗时,并对耗时较高的操作的代码逻辑进行分析。梳理消息的消费耗时需要关注以下信息: 消息消费逻辑的计算复杂度是否过高,代码是否存在无限循环和递归等缺陷。 消息消费逻辑中的 I/O 操作(如:外部调用、读写存储等)是否是必须的,能否用本地缓存等方案规避。外部 I/O 操作通常包括如下业务逻辑: 读写外部数据库,例如 MySQL 数据库读写。 读写外部缓存等系统,例如 Redis 读写。 下游系统调用,例如 Dubbo 调用或者下游 HTTP 接口调用。 消费逻辑中的复杂耗时的操作是否可以做异步化处理,如果可以是否会造成逻辑错乱(消费完成但异步操作未完成)。 设置消息的消费并发度 逐步调大线程的单个节点的线程数,并观测节点的系统指标,得到单个节点最优的消费线程数和消息吞吐量。 得到单个节点的最优线程数和消息吞吐量后,根据上下游链路的流量峰值计算出需要设置的节点数,节点数=流量峰值/单线程消息吞吐量。 应用场景2:消息收发失败问题 消息收发的核心流程 从上图中可以看出消息收发都要先从 NameServer 返回路由,再通过 broker 的鉴权以及实例规格是否超限的判断,才能进行正常收发消息。根据经验检消息收发失败的原因有如下情况: API 请求频率是否超过实例规格限制 查网络是否正常 服务端是否是有重启造成的短期收发失败 操作资源是否有权限 常见的消息收发失败异常 在无论开发阶段还是生产运行阶段,遇到收发失败问题,我们都可以从客户端日志出发进行排查。以下列举下常见的消息收发失败异常场景: 1. 在客户端日志中出现ClusterName consumer groupId consumer topic messages flow control, flow limit threshold is , remainMs 异常信息 原因:RocketMQ 每个实例都明确了消息收发 API 调用 TPS,例如,标准版实例支持每秒 5000 次 API 调用,若实例消息收发 API 调用频率超过规格限制,会导致实例被限流。实例被限流后,导致部分消息收发请求失败。 建议措施: 1. 配置实例 API 调用频率监控告警 建议设置为规格上限的 70%。例如,您购买的实例消息收发 TPS 上限为 10000,则告警阈值建议设置为 7000。 1. 配置限流次数告警 RocketMQ 支持将指定实例触发限流的事件作为监控项,通过对限流次数的监控,可以帮助您了解当前业务的受损情况。 2. 在客户端日志中出现RemotingConnectException: connect to failed 或者 RemotingTimeoutException 等异常信息。 可能有如下原因: MQ 服务升级过程中 , 会出现短暂的网络闪断,查看官网公告看是否在服务升级窗口 检查应用服务器到broker的网络是否通畅,是否有网络延迟 检查应用的网络带宽情况,是否被打满 确认下应用是否出现 FGC 现象,FGC 会造成一定的网络延迟 3. 在客户端日志当中出现 system busy, start flow control for a while 或者 broker busy, start flow control for a while等异常信息。 可能原因:共享集群 broker(出现网络,磁盘,IO 等抖动)压力大,造成消息收发出现排队现象;若是偶尔短暂抖动,此类错误 SDK 会自动重试,但建议在自己的业务代码做好异常处理,当自动重试次数超限仍失败情况下,业务根据需要做好容灾。若长时间持续出现,可以提工单让技术人员跟进排查。 活动推荐 阿里云基于 Apache RocketMQ 构建的企业级产品消息队列RocketMQ 5.0版现开启活动: 1、新用户首次购买包年包月,即可享受全系列 85折优惠! 了解活动详情:
作者:合伯
#技术探索 #可观测

2023年1月5日

Apache RocketMQ 斩获 InfoQ 2022 年度十大开源新锐项目
以“深入数字经济·洞见技术价值”为主题的【InfoQ 2022 中国技术力量年终榜单】正式公布获奖名单。其中,Apache RocketMQ以其卓越的易用性、社区活跃性、成熟度、产品优越性、代码健康度等荣获【2022 年度十大开源新锐项目】。 作为主流的分布式消息中间件,RocketMQ于 2012 年开源,并在 2017 年正式成为 Apache 顶级项目,持续迸发出旺盛的生命力。 伴随着云原生时代的到来以及实时计算的兴起, 生于云、长于云的 RocketMQ 5.0 应运而生,全新升级为云原生消息、事件、流融合处理平台,帮助用户更容易地构建下一代事件驱动和流处理应用。RocketMQ 5.0 专注于消息基础架构的云原生化演进,聚焦在消息领域的后处理场景,支持消息的流式处理和轻计算,帮助用户实现消息的就近计算和分析,并全面拥抱 Serverless 和 EDA。 在技术迎来重要革新的同时,回顾 Apache RocketMQ 社区这些年的成长历程。目前,全球 Apache RocketMQ Contributors  700+,促进整个社区长期和健康发展。同时,为了帮助社区开发者更好地找到感兴趣的技术方向,快速参与到社区并推动相关特性优化的快速演进,RocketMQ 还成立内核、批处理、Connect、Streaming、多语言客户端、RocketMQFlink、Operator、Exporter 等不同兴趣小组。 为更好聚集本地开发者,我们在北京、深圳、苏州等城市相继成立当地社区,定期举行线下活动,共同讨论 RocketMQ 相关的落地实践与新特性需求,大量创新从社区的各类活动中产生并且落地。除此之外,RocketMQ 还非常重视社区间的合作,先后与 Apache DolphinScheduler,Apache Hudi 等社区组织了多次联合 Meetup,在打造 RocketMQ 上下游生态的同时,也为不同社区开发者近距离讨论提供了平台。 在社区成员以及众多的开发者共同推动下,全球超过数万家企业在使用 Apache RocketMQ,这其中不仅有字节跳动、快手、小米、滴滴、同城艺龙等互联网头部企业,还有众多银行、券商、保险,基金公司等金融公司。经过多年发展,RocketMQ 已成为微服务领域业务消息首选。 本次获奖离不开全体社区成员的共同努力,是全体社区成员的共同荣誉!社区将再接再厉,不忘初心,持续促进  Apache RocketMQ 项目和社区的持续发展。 活动推荐 阿里云基于 Apache RocketMQ 构建的企业级产品消息队列RocketMQ 5.0版现开启活动: 1、新用户首次购买包年包月,即可享受全系列 85折优惠! 了解活动详情:
#社区动态

2022年12月27日

RocketMQ 多语言 SDK 开源贡献召集令
目前 Apache RocketMQ 5.0 SDK [1]正在社区开源,开发与迭代也在火热进行中,欢迎广大社区的朋友们能够参与其中。我们欢迎任何形式的贡献,包括但不限于新 feature、bugfix、代码优化、生态集成、测试工作、文档撰写。更加欢迎能够认领一个完整的特定语言实现的同学,踏出第一步,你就是 contributor!更有惊喜礼品和成为 committer 的机会等着你! 写在前面 Apache RocketMQ 是由阿里巴巴集团捐赠给 Apache 开源基金会的一款低延迟、高并发、高可用、高可靠的分布式消息中间件,并于 2017 年正式从 Apache 社区毕业,成为 Apache 顶级项目(TLP)。也是国内首个非 Hadoop 生态体系的互联网中间件顶级项目。 面向过去,RocketMQ 经过多年淘宝双十一的洗礼和考验,赢得了诸多客户的认可和青睐。面向未来,RocketMQ 历久弥新,为了更好地迎接云原生时代的来临,基于存算分离架构的 RocketMQ 5.0 应运而生。 RocketMQ 5.0 中引入了全新的无状态 Proxy 组件,在水平拓展,故障应急,多协议等方面都进行了诸多支持与改进(关于 RocketMQ 5.0 的详细介绍,欢迎关注 Rocketmq 官网[2])。同时也为接下来多语言客户端的实现打下了良好基础。 新的多语言SDK RocketMQ 5.0 客户端相比较于 4.x 的版本进行了诸多改进,会是未来社区客户端演进的主流方向。RocketMQ 4.x SDK 的多语言支持并不完美,协议的较高复杂度和语言绑定的实现细节使得多语言的支持与维护都变得棘手,而用户对多语言的诉求是强烈的。值此契机,RocketMQ 5.0 基于 gRPC 正式推出了全新的多语言 SDK。 相比较于 RocketMQ 4.x 的 SDK。RocketMQ 5.0 展现出了一副全新的面貌: 采用全新极简的,immutable 的 API 设计,使得 API 上手更简单,跨语言的对齐也变得更加简单; 完善的错误处理体系和错误码设计,开发者和用户对错误的处理可以更加得心应手; 在 PushConsumer/PullConsumer 之外新推出无状态 SimpleConsumer,实现逻辑轻量,用户可以自行管理消费侧消息的接收与应答,同时也对有更多定制化需求的客户提供了便利。 实现轻量化,代码量相比较旧有实现缩减 3/4 以上,开发和维护的成本更低; 标准化的 logging/tracing/metrics 输出,降低实现复杂度的同时,可观测性的提升会使得生产环境下的问题更容易被捕捉; gRPC 多语言特性,为 RocketMQ 5.0 客户端的多语言实现提供了支撑。RocketMQ 全新的客户端的协议层被替换,语言无关的 IDL 使得协议的维护和实现都更为极为简单。同时得益于 gRPC 强大的生态体系,使得 RocketMQ 与周边的集成也变得更为简便。 RocketMQ 5.0 中引入了新的的 pop 消费,创造性地在原生的队列模型之上支持了这种无状态的消费模式。不同于原始的更适用于流场景的队列模型,pop 机制更面向于业务消息的场景,使得开发者和用户可以只关心消息本身,可以通过「SimpleConsumer」提供单条消息级别的接受/重试/修改不可见时间以及删除等 API 能力。 Roadmap 目前 5.0 多语言 SDK 的 Java/C++ 已经有了相对比较完整的实现。 Go/C 已经提供了基础的 Producer/SimpleConsumer 的实现,其余的语言实现(PHP/Python/JavaScript/Rust 等)还在社区进行中,欢迎大家广泛参与。 对于一个从零开始的特定语言实现,一个大概的步骤如下: 部署 rocketmqnamesrv[3]和 rocketmqproxy[4] 方便与客户端进行调试,为降低部署成本,rocketmqproxy 可以采用 LOCAL 模式进行部署。 熟悉 rocketmqapis中的 IDL,适配新的 gRPC/Protobuf 协议:IDL 中描述了 5.0 SDK 中的语言无关的协议描述,通过 gRPC protoc 工具自动生成协议层代码。 应用新的 API 规范和设计:可以参考 Java 的 API 设计[5],总体指导思想是不可变性且行为明确。 实现 Producer/SimpleConsumer:Producer 提供最基本的四种不同类型消息的发送功能(普通/顺序/定时/事务),SimpleConsumer 提供基于 pop 语义的无状态消息接受/重试/修改不可见时间等能力。 统一的错误处理体系:由服务端产生的异常与错误均有完善的异常错误码和异常信息,各个语言实现需要以最适合的方式暴露给客户。 实现 PushConsumer:RocketMQ 4.x 中最为常用的消费者类型,用户侧只需要明确订阅关系和定义消息监听器行为即可,客户端实现中需要自动帮用户从远端获取消息。 客户端全方位可观测性:规范的日志输出,实现基于 OpenTelemetry/OpenCensus 的客户端 metrics 体系。 按照以上流程开发者在开发过程中出现的任何问题,都欢迎以 issue/pull request 的形式反馈到社区。 如何参与贡献 我们欢迎任何形式的贡献,包括且不限于新 feature、bugfix、代码优化、生态集成、测试工作、文档撰写。更加欢迎能够认领一个完整的特定语言实现的同学!不要犹豫,欢迎大家以 issue/pull request 的形式将你的想法反馈到社区,一起来建设更好的 RocketMQ! 相关资料 rocketmqclients: RocketMQ 5.0 多语言客户端实现 rocketmq: RocketMQ 主仓库(内置 5.0 proxy 实现) rocketmqapis: RocketMQ 5.0 协议具体定义 《RIP37: RocketMQ 全新统一 API 设计》 《RIP39: RocketMQ gRPC 协议支持》 相关链接 [1] Apache RocketMQ 5.0 SDK [2] rocketmq 官网 [3] rocketmqnamesrv [4] rocketmqproxy [5] Java 的 API 设计 活动推荐 阿里云基于 Apache RocketMQ 构建的企业级产品消息队列RocketMQ 5.0版现开启活动: 1、新用户首次购买包年包月,即可享受全系列 85折优惠! 了解活动详情:
作者:艾阳坤
#社区动态

2022年12月21日

消息收发弹性——生产集群如何解决大促场景消息收发的弹性&降本诉求
产品介绍—什么是消息收发弹性 大家好,我是来自阿里云云原生消息团队的赖福智,花名宸罡,今天来给大家分享下阿里云 RocketMQ5.0 实例的消息弹性收发功能,并且通过该功能生产集群是如果解决大促场景消息收发的弹性以及降本诉求的。 阿里云弹性策略 本次将会从产品介绍,产品使用及限制,使用方式及演示三个方面来介绍。在介绍 Rocketmq5.0 实例的消息首发弹性之前,先从整体上看下阿里云的弹性策略。我们通常认为业务方往往存在预期外的突发业务热点和毛刺流量,常规扩容无法及时应对,这样一来服务会有不确定性的风险。因此为了应对突发流量,我们设计了一套处理机制,最基本的是要满足规格内的预期流量,然后是应对弹性区间内的突发流量可以随时开启的弹性能力,最后是要有对完全超过弹性上限流量的限流限流能力。针对弹性区间的突发流量,传统自建集群通过常规扩容方式应对,需要分钟级的处理时间,在这段时间内业务会受损,并且为了这部分偶尔的突发流量扩容到一个较大的规格并不划算。云上5.0实例的消息收发弹性能力对弹性区间内的突发流量可以做到秒级响应,针对大促这种预期内的短期突发流量可以按量收费更加实惠,仅当用户真正用到这部分弹性能力才收费。 消息收发弹性简介 接下来我们就看具体看下 5.0 实例的消息收发弹性,消息收发弹性最直观的感受就是在 5.0 实例的详情页面的自适应弹性 TPS 这部分,可以看到在正常消息收发 TPS 的旁边额外有一个自适应弹性 TPS。通过这部分弹性 TPS 的设置,用户可以快速、低成本的应对大促这种短时间突发流量的场景。 这时可能有小伙伴会问为什么我不直接升级规格提高标准收发 TPS,而是使用弹性 TPS 呢?让我们假设一个典型的大促场景,比如在今晚 0 点有大促活动,使用消息弹性功能的用户完全可以提前几天就把弹性功能打开,大促结束等流量恢复后再把弹性功能关闭,实际上不关闭也不会有什么问题,不使用则不收费。 如果通过升级规格来提升标准 TPS 应对大促流量,用户同样是提前几天就把规格升高了,那么在大促前这几天按照高规格收费但实际又跑不到高规格的 TPS,实际上花了更多的钱但是确造成了资源的浪费。如果用户为了避免资源浪费在大促当天 0 点前升级规格,一个是需要用户付出额外的精力来关注 RocketMQ 按时升配,再就是实例的升配是一个重资源操作,扩容耗时长,无法做到即开即用秒级生效,很有可能已经到 0 点了但是升配还没有完成。 使用消息弹性功能的话可以做到秒级生效开箱即用,并且如果没有使用到这部分额外的弹性 TPS 是不会收费的。但是弹性 TPS 也不是个解决问题的万能银弹,也是有上限的,基本上可以在规格标准 TPS 的基础上额外有一半的弹性 TPS,如果标准 TPS+ 弹性 TPS 仍然无法满足用户业务需求,此时意味着仅扩容弹性节点已经无法满足需求,同时需要扩容存储节点,所以需要升配规格,这部分的原理后面会详细解释。 也有用户会问,如果我的日常 TPS 在 2500 左右,可不可以购买一个 2000 标准 TPS 的实例并且一直开着 1000 的弹性 TPS 满足需求呢?这种情况我们建议直接使用标准 TPS 大于 2500 的实例,因为弹性 TPS 这部分的使用会额外计费,如果一天 24 小时都在大量使用弹性 TPS,从计费上来说直接使用更高规格的实例更实惠。 5.0 实例消息收发弹性的实现方式,和传统自建方式的对比 接下来我们看下阿里云 RocketMQ5.0 实例是怎么实现消息收发弹性的,并且在扩容场景和自建 RocketMQ 有什么优势。传统自建 RocketMQ 集群如左图显示,是一个存储计算不分离的架构,这种架构下 Broker 是一个很重的组件,因为它同时需要处理客户端的请求,也要负责数据的读取写入,Broker 同时负责计算和存储。作为一个有状态的节点,Broker 扩容是一个很重的操作,时间会很慢,而且在很多时候我们并不需要扩容存储能力,仅仅需要应对高 TPS 请求的计算能力,此时随着 Broker 扩容的存储扩容实际上被浪费了。 再来看下 RocketMQ5.0 实例消息收发弹性是怎么做的,首先 5.0 实例的架构是存储计算分离的模式,用户的客户端仅会请求计算层的计算节点,计算节点操作存储节点读写消息,客户端并不会直接访问存储节点。开启消息收发弹性功能意味着开启了计算层的弹性能力。得益于这种存储计算分离的架构,可以让我们快速低成本的扩容计算层节点,计算层节点作为无状态应用可以做到秒级扩容,十分便捷。而且在云厂商拥有大量资源池的前提下可以做到资源的弹性扩容。可以说 RocketMQ5.0 实例的消息收发弹性能力依赖于阿里云作为云厂商的弹性能力和存算分离的技术方案得以实现。 在大促这种短时间大流量的场景下,大部分都是不需要扩容存储节点的,此时就可以通过开通消息收发弹性的能力满足需求。 产品使用及限制:消息收发弹性的使用及限制 支持版本 消息收发弹性的功能仅在专业版和铂金版支持,标准版实例不支持,并且专业版的单机规格作为给用户使用的测试版本也不支持。 弹性上限 不同规格实例的弹性 TPS 上限不同,基本上在标准 TPS 的基础上额外有一半的弹性 TPS,下图所示为专业版的弹性 TPS 上限。受篇幅所限,其他规格的弹性上限可以参考官方文档,就不再列出了。 计费方式 弹性 TPS 是额外计费的,计费周期按小时计费,不足 1 小时,按 1 小时计算。计费方式为超过限制的 TPS× 使用时长(小时)× 弹性规格单价(元/TPS/小时)弹性规格单价如下图,不同地域的单价会有略微差异。 SLA 可能有小伙伴会担心使用到这部分额外的弹性 TPS 会不会有问题,毕竟这部分是在标准 TPS 之上额外的能力,有一种自己实例超负荷运转的感觉。这个是完全不用担心的,不同规格的弹性上限已经经过压测验证,和规格标准 TPS 享受一样的稳定性 SLA 保证。 使用方式及演示:结合业务场景的最佳使用方式 开启方式、生效时间、收发比例 最后我们来实际操作下开启弹性收发能力并且验证该功能。RocketMQ5.0实例依然支持使用 RocketMQ4.0 实例的 1.x 客户端访问,所以这里分别提供了 1.x 客户端和 5.x 客户端的测试代码实例。 该程序开启了 200 个线程的线程池通过 ratelimiter 根据输入参数设置每秒最大的发送消息条数,打印失败的原因,并且每秒统计成功发送的消息量 在这里我已经提前购买好了一个专业版的实例,默认是不会开启消息收发弹性能力的。我们可以点击这里的开启弹性按钮进入实例修改页面开启弹性功能。这里要注意的是开启之后的弹性 TPS 依然受实例整体的消息收发占比设置,用户可以根据自己的消息收发场景设置该比例。 再开启之前我们来尝试下每秒发送 2300 个消息会怎么样,可以看到已经被限流了,并且每秒成功发送的量要比 2000 多一些。接着我们将弹性开启,并且将默认的收发比 1:1 改为 4:5,这是修改后的实例状态,现在让我们继续每秒发送 2300 个消息来验证下,可以看到已经都成功发送了。 活动推荐 阿里云基于 Apache RocketMQ 构建的企业级产品消息队列RocketMQ 5.0版现开启活动: 1、新用户首次购买包年包月,即可享受全系列 85折优惠! 了解活动详情:
作者:宸罡
#技术探索

2022年12月14日

事件总线 + 函数计算构建云上最佳事件驱动架构应用
距离阿里云事件总线(EventBridge)和 Serverless 函数计算(Function Compute,FC)宣布全面深度集成已经过去一年,站在系统元数据互通,产品深度集成的肩膀上,这一年我们又走过了哪些历程?从事件总线到事件流,从基于 CloudEvents 的事件总线触发到更具个性化的事件流触发,函数计算已成为事件总线生态不可或缺的重要组成部分,承载了 EventBridge 系统架构中越来越多的角色,事件流基础架构的函数 Transform,基于函数计算的多种下游 Sink Connector 投递目标支持,函数作为 EventBridge 端点 API Destination;基于事件总线统一,标准的事件通道能力,和基于函数计算敏捷、轻量、弹性的计算能力,我们将又一次起航探索云上事件驱动架构的最佳实践。 今天的主题围绕事件总线+函数计算,构建云上最佳事件驱动架构应用。希望通过今天的分享,能够帮助大家深入理解 Serverless 函数计算、EventBridge 事件总线对于构建云上事件驱动架构应用的价值和背后的逻辑、 为什么函数计算是云上事件驱动服务最佳实践?为什么我们如此需要事件总线服务?伴随着这些谜题的解开,最后,让我们一起了解应用于实际生产的一些 Serverless 事件驱动客户案例。 事件驱动架构的本质 Back to the Nature of EventDriven 大家可能会疑惑,事件驱动家喻户晓,为什么我们又要重新讨论事件驱动呢?我想这也正是我们需要讨论它的原因,回归本质,重新起航;事件驱动可能是一个比较宽泛的概念,而本文聚焦事件驱动架构的讨论,事件驱动架构作为一种软件设计模式,的确不是一个新的概念,伴随着计算机软件架构的演进,它已经存在了一段很久的时间,大家对它的讨论也从未停止过,当我们需要重新讨论一个已经存在的概念的时候,我想我们有必要重新回到它最开始的定义,一起探索那些本质的东西,重新认识它。 上面的这些内容是我从相关的一些资料上摘录的关于事件驱动的一些描述,“abstract”,“simple”,“asynchronous”,“messagedriven”这些具有代表性的词汇很好的给予事件驱动一个宏观的描述;从事件驱动的抽象概念,到它简洁的架构,以及事件驱动架构要达成的目的,和它在实际的系统架构中所展现的形态。 事件驱动架构基本概念及形态 在了解了关于事件驱动架构的一些基本描述之后,我们需要进一步明确事件驱动架构所涉及的一些基本概念和架构形态。根据维基百科描述,事件驱动架构涉及的核心概念如下所示: 围绕事件的流转,根据事件驱动架构的概念和基本形态,主要涉及以下四个核心部分: Event Producer:负责产生事件,并将产生的事件投递到事件通道; Event Channel:负责接收事件,并将接收的事件持久化存储,投递给订阅该事件的后端处理引擎; Event Processing Engine:负责对于订阅的事件做出响应和处理,根据事件更新系统状态; Downstream eventdriven activity:事件处理完成之后,对于事件处理响应的一种展示; 事件驱动架构要达成的目的 了解了事件驱动架构的基本形态,架构中事件通道的引入,解耦了事件生产和事件处理这两个最基本的系统角色,那么这样的架构模型所要达成的最终目的到底是什么? 系统架构松耦合 事件生产者与事件订阅者在逻辑上是分开的。事件的生成与使用的分离意味着服务具有互操作性,但可以独立扩缩、更新和部署。 只面向事件的松散耦合可以减少系统依赖项,并允许您以不同的语言和框架实现服务。您无需更改任何一个服务的逻辑,即可添加或移除事件生成方和接收方。您无需编写自定义代码来轮询、过滤和路由事件。 系统的可伸缩性 基于事件驱动架构的松耦合特性,意味着可以独立对事件生产者,事件通道服务,以及事件处理引擎进行独立的扩缩容;尤其对于后端事件处理引擎,可以根据消息处理响应 SLA 和后端资源供给进行弹性扩缩容;同时可以基于事件粒度构建不同规格的后端处理服务,实现更细粒度的系统弹性伸缩。 系统的可扩展性 系统的可扩展性,主要表现在当系统需要增加新的功能,如何快速的基于现有系统架构快速构建支持新的业务逻辑,在事件驱动架构应用中,围绕事件粒度的处理模式,能够天然快速支持增加新的基于事件的数据流抽象;当系统中增加新的事件类型的时候,无需调整变更发布整个系统,只需要关注需要订阅的事件进行事件处理逻辑的开发和部署即可,也可以基于原来的系统做很少的代码变更即可实现,也可以在业务初期通过独立的服务定于指定事件完成特定的业务逻辑支持。 为什么函数计算是云上事件驱动服务最佳实践? 在讨论完事件驱动架构基本模型之后,我想关于事件驱动的概念,形态我们有了统一的认识和理解,接下来我们进入议题的第二个部分,为什么函数计算是云上事件驱动服务最佳实践? 函数计算简介 函数计算是一款基于事件驱动的全托管计算服务,相关的产品细节可以见官网介绍。作为一款通用的事件驱动型计算服务,接下来我会从三个方面进行详细的介绍。 编程范式 使用函数计算,用户无需采购与管理服务器等基础设施,只需编写并上传代码。函数计算为你准备好计算资源,弹性地、可靠地运行任务,并提供日志查询、性能监控和报警等开箱即用功能,编程范式带来开发的敏捷性。按照函数粒度进行独立的功能单元开发,快速调试,快速的部署上线,省去了大量资源购买,环境搭建的运维工作;同时函数计算是一个事件驱动的模型,事件驱动,意味着用户不需要关注服务产品数据传递的问题,省去了在编写代码中涉及的大量服务访问连接的逻辑;“事件驱动” + “函数粒度开发” + “免服务器运维”等几个维度特征帮助函数计算支撑“聚焦业务逻辑敏捷开发”的底层逻辑。 计算模型 除了开发模式带来的研发效能提升之外,函数计算提供非常细粒度的计算资源和毫秒级计费模型,支撑按需计算,按量收费;能够支持按用户的请求,根据用户流量的模型为计算付费;当然按用户请求付费存在技术上巨大的挑战,要求函数计算实例的启动小于用户的 RT 要求,冷启动性能尤为重要,这时候极致弹性成为了 Serverless 按需付费,业务降本的底层技术支撑。函数计算通过“极致弹性” + “按需付费”的模型帮助 Serverless 函数计算实现真正的按需计算逻辑。 事件驱动 在基于云的开发环境,云产品承载的服务相对内聚,各自扮演着分布式系统架构中的各个重要角色,云产品之间的事件触发机制能够帮助客户更好的基于多个云产品构建自己的业务系统;否则在云产品之间 Watch 事件是非常复杂,开发代价非常昂贵的一件事;除了产品连接带来的开发效率之外,当用户订阅某个事件,并提供处理逻辑的时候,客户已经潜在的过滤掉了不需要处理的事件请求,事件驱动意味着每一次的事件触发请求都是一次完全有效的计算。 函数计算对于事件驱动架构的价值 为什么函数计算是云上最佳的事件驱动架构服务?函数计算对于事件驱动架构的核心价值到底是什么?事件驱动架构一直存在,在没有函数计算的时候,同样也有事件驱动架构,在微服务的时候也同样有事件驱动架构。如今,当我们重新再来讨论事件驱动架构的时候,到底是什么发生了变化,有哪些本质的区别?在整个事件驱动架构中,函数计算最大的价值在于帮助构建 “Event Processing Engine” 这个角色,我想主要是以下两个方面发生了本质的变化: 系统可扩展性价值 开发模式发生了本质的变化:函数计算提供的框架能力及编程模型,最大化的消除了客户业务逻辑之外的处理内容,极大的加速了客户业务开发,同时基于这样这样的开发模式,用户对于新增事件处理逻辑能够在最短的时间完成处理并上线,细粒度,专注业务的敏捷开发模式能够加速业务快速上线。 系统可伸缩性价值 计算模式发生了本质的变化:基于事件驱动架构事件粒度的处理逻辑和函数计算更细粒度力度计算弹性能力,能够从多个维度实现 “Event Processing Engine” 组件的弹性能力, 这我想这也是函数计算对于事件驱动架构的一个最核心价值。 为什么我们如此需要事件总线服务? 构建云上事件驱动架构挑战 函数计算以其轻量,快捷,能够利用事件驱动的方式与其他云产品进行联动的特点, 成为很多客户利用事件驱动架构构建业务系统的首选,随着业务及客户需求的不断增加,客户对于函数计算和更多云产品及服务的连接需求变得越来越多,同时对于其他云产品的客户而言, 也希望能够利用 Serverless 函数计算的特点帮助处理一些系统任务和事件。 尽管函数计算和云上的众多云产品进行了集成,提供了一些开箱即用的事件触发能力,那么我们为什么还需要事件总线服务来构建事件驱动应用架构呢?围绕函数计算构建事件驱动架构生态的过程中,我们面临主要来自三个方面的挑战。面对这些挑战,基于函数计算和事件总线帮助云上客户构建完备的事件驱动架构生态迫在眉睫。 事件源多样性挑战 事件驱动作为函数计算产品核心竞争力,打通函数计算和其它云产品,以及用户自定义应用,SaaS 服务的连通成为函数计算生态集成的迫切需求,但系统集成,生态建设从来都不是一件容易的事情。函数计算系统在和 EventBridge 集成之前,已经和 OSS,SLS 等用户典型场景的云产品进行了集成,也和阿里云的其它大概十多款产品进行了集成,不同系统具有不同的事件格式,不同系统的注册通知机制也各不相同,以及上游不同系统的失败处理机制也各不相同;部分系统支持同步的调用方式,部分系统支持异步的调用方式,调用方式的差异主要取决于上游系统在接入函数计算的时候当时面临的产品业务场景,对于新的产品能力和业务场景的扩展支持,在当时并未有太多的考虑。随着和更多云产品的集成,集成的投入,集成的困难度和底层数据管理难度越来越大。面对多种事件源集成的客观困难,函数计算急需提高和其他云产品的集成效率。 授权复杂及安全隐患 除此之外, 函数计算希望提升用户体验,保证用户只关心事件的处理;同时希望能够在面对大量的云产品时保证系统授权层面的复杂度。用户在使用事件触发的时候, 需要了解不同产品接入函数计算的权限要求,针对不同的产品需要提供不同的授权策略,对于客户使用函数计算带来了非常大的困难,为了加速产品接入,大量用户经常使用FullAcees权限,造成较大产品安全隐患, 和其它云产品的集成急需统一的授权界面,统一用户体验。 通用能力难以沉淀 面对上游不同的事件源, 如何更好的投递事件、更好的消费事件?如何进行事件的错误处理?函数计算调用方式如何选择?以及函数计算后端错误 Backpressure 能力的反馈、重试策略和上游系统参数设置、触发器数量的限制等问题获成为函数计算事件触发不得不面对的问题。为了更好的服务客户,提供可靠的消费处理能力,函数计算希望能够有一个统一的接入层,基于统一的接入层进行消费能力和流控能力的建设。通过沉淀在这样一个标准的层面,在保证调用灵活性的同时,提供可靠的服务质量。 事件总线简介 阿里云事件总线(EventBridge) 是一种无服务器事件总线,支持将用户的应用程序、第三方软件即服务 (SaaS)数据和阿里云服务的数据通过事件的方式轻松的连接到一起,这里汇聚了来自云产品及 SaaS 服务的丰富事件; 总线模式(EventBus) 从整个架构来看,EventBridge 通过事件总线,事件规则将事件源和事件目标进行连接。首先,让我们快速普及下 EventBridge 架构中涉及的几个核心概念: 事件:状态变化的记录; 事件源:事件的来源,事件的产生者,产生事件的系统和服务, 事件源生产事件并将其发布到事件总线; 事件总线:负责接收来自事件源的事件;EventBridge 支持两种类型的事件总线: 云服务专用事件总线:无需创建且不可修改的内置事件总线,用于接收您的阿里云官方事件源的事件。 自定义事件总线:标准存储态总线,用于接收自定义应用或存量消息数据的事件,一般事件驱动可选该总线。 事件规则:用于过滤,转化事件,帮助更好的投递事件; 事件目标:事件的消费者,负责具体事件的处理。 通过上面的流程,完成了事件的产生,事件的投递,事件的处理整个过程。当然事件并不是一个新的概念,事件驱动架构也不是一个新的概念,事件在我们的系统中无处不在,事件驱动架构同样伴随着整个计算机的架构演进,不断地被讨论。对于 EventBridge,采用云原生事件标准 CloudEvents 来描述事件;带来事件的标准化,这样的标准化和事件标准的开放性带来一个最显著的优势:接入的标准化,无论是对于事件源还是事件目标。 事件流模式(EventStreaming) 消息产品凭借其异步解耦、削峰填谷的特点,成为了互联网分布式架构的必要组成部分,Serverless 函数计算有着与其完全吻合的应用场景,针对消息产品生态集成,函数计算在架构层面做了专门的建设,基于 EventBridge 产品提供的 EventStreaming 通道能力建设了通用的消息消费服务 Poller Service,基于该架构对用户提供了 RocketMQ,Kafka,RabbitMQ,MNS 等多个消息类型触发能力。 将消费的逻辑服务化,从业务逻辑中剥离由平台提供,消费逻辑和处理逻辑的分离,将传统架构的消息拉模型转化成 Serverless 化的事件驱动推模型,能够支撑由函数计算承载消息处理的计算逻辑 ,实现消息处理的 Serverless 化。基于这样的架构,能够帮助客户解决消息客户端的集成连接问题,简化消息处理逻辑的实现,同时对于波峰波谷的业务模型,结合函数计算提供细粒度的计算弹性能力,能够实现资源的动态扩容,降低用户成本。 事件总线对于事件驱动架构的价值 简化统一事件源接入 沉淀通用事件通道能力 提升优化用户集成体验 利用函数计算提供的 HTTP 函数 URL 能力,结合事件总线端点 API 能力,能够快速的帮助客户进行系统扩展和集成。 客户场景案例分享 总线模式 + 函数计算用户案例 利用 ActionTrail 事件触发函数进行多账号审计管理 利用 OSS 文件上传事件触发函数扩容 ACK  集群资源 利用 OSS 文件上传执行 Terraform 文件并访问外部 API 做结果通知 事件流模式 + 函数计算用户案例 利用函数计算细粒度资源弹性特征,结合业务波峰波谷的特点,实现快速的消息清洗和处理。 事件流触发函数计算处理业务消息 事件流触发函数计算进行简单 ETL 数据同步 事件流触发函数进行简单 ETL 数据清洗入库 函数异步+事件流触发函数构建电商运营通知系统 在购物车加购,商品变更通知场景,利用函数计算异步系统(内部自带 Queue 能力),触发大量运营通知,利用函数异步的 Destination 能力将运营通知结果写入 MQ,然后利用事件流能力对 MQ 数据进行再次处理,写入HBase数据库中。 活动推荐 阿里云基于 Apache RocketMQ 构建的企业级产品消息队列RocketMQ 5.0版现开启活动: 1、新用户首次购买包年包月,即可享受全系列 85折优惠! 了解活动详情:
作者:世如
#行业实践 #事件驱动架构 #云原生

2022年11月30日

RocketMQ 5.0 可观测能力升级:Metrics 指标分析
从消息的生命周期看可观测能力 在进入主题之前先来看一下 RocketMQ 生产者、消费者和服务端交互的流程: message produce and consume process RocketMQ 的消息是按照队列的方式分区有序储存的,这种队列模型使得生产者、消费者和读写队列都是多对多的映射关系,彼此之间可以无限水平扩展。对比传统的消息队列如 RabbitMQ 是很大的优势,尤其是在流式处理场景下能够保证同一队列的消息被相同的消费者处理,对于批量处理、聚合处理更友好。 接下来我们来看一下消息的整个生命周期中需要关注的重要节点: message life cycle 首先是消息发送:发送耗时是指一条消息从生产者开始发送到服务端接收到并储存在硬盘上的时间。如果是定时消息,需要到达指定的定时时间才能被消费者可见。 服务端收到消息后需要根据消息类型进行处理,对于定时/事务消息只有到了定时时间/事务提交才对消费者可见。RocketMQ 提供了消息堆积的特性,即消息发送到服务端后并不一定立即被拉取,可以按照客户端的消费能力进行投递。 从消费者的角度上看,有三个需要关注的阶段: 拉取消息:消息从开始拉取到抵达客户端的网络和服务端处理耗时; 消息排队:等待处理资源,即从消息抵达客户端到开始处理消息; 消息消费:从开始处理消息到最后提交位点/返回 ACK。 消息在生命周期的任何一个阶段,都可以清晰地被定义并且被观测到,这就是 RocketMQ 可观测的核心理念。而本文要介绍的 Metrics 就践行了这种理念,提供覆盖消息生命周期各个阶段的监控埋点。借助 Metrics 提供的原子能力我们可以搭建适合业务需要的监控系统: 日常巡检与监控预警; 宏观趋势/集群容量分析; 故障问题诊断。 RocketMQ 4.x Metrics 实现 – Exporter RocketMQ 团队贡献的 RocketMQ exporter 已被 Prometheus 官方的开源 Exporter 生态所收录,提供了 Broker、Producer、Consumer 各个阶段丰富的监控指标。 exporter metrics spec Exporter 原理解析 RocketMQ expoter 获取监控指标的流程如下图所示,Expoter 通过 MQAdminExt 向 RocketMQ 集群请求数据。获取的数据转换成 Prometheus 需要的格式,然后通过 /metics 接口暴露出来。 rocketmq exporter 随着 RocketMQ 的演进,exporter 模式逐渐暴露出一些缺陷: 无法支持 RocketMQ 5.x 中新加入的 Proxy 等模块的可观测需求; 指标定义不符合开源规范,难以和其他开源可观测组件搭配使用; 大量 RPC 调用给 Broker 带来额外的压力; 拓展性差,增加/修改指标需要先修改 Broker 的 admin 接口。 为解决以上问题,RocketMQ 社区决定拥抱社区标准,在 RocketMQ 5.x 中推出了基于 OpenTelemtry 的 Metrics 方案。 RocketMQ 5.x 原生 Metrics 实现 基于 OpenTelemtry 的 Metrics OpenTelemetry 是 CNCF 的一个可观测性项目,旨在提供可观测性领域的标准化方案,解决观测数据的数据模型、采集、处理、导出等的标准化问题,提供与三方 vendor 无关的服务。 在讨论新的 Metrics 方案时 RocketMQ 社区决定遵守 OpenTelemetry 规范,完全重新设计新 metrics 的指标定义:数据类型选用兼容 Promethues 的 Counter、Guage、Histogram,并且遵循 Promethues 推荐的指标命名规范,不兼容旧有的 rocketmqexporter 指标。新指标覆盖 broker、proxy、producer、consumer 等各个 module,对消息生命周期的全阶段提供监控能力。 指标上报方式 我们提供了三种指标上报的方式: Pull 模式:适合自运维 K8s 和 Promethues 集群的用户; Push 模式:适合希望对 metrics 数据做后处理或接入云厂商的可观测服务的用户; Exporter 兼容模式:适合已经在使用 Exporter 和有跨数据中心(或其他网络隔离环境)传输 metrics 数据需求的用户。 Pull Pull 模式旨在与 Prometheus 兼容。在 K8s 部署环境中无需部署额外的组件,prometheus 可以通过社区提供的 K8s 服务发现机制(创建 PodMonitor、ServiceMonitor CDR)自动获取要拉取的 broker/proxy 列表,并从他们提供的 endpoint 中拉取 metrics 数据。 pull mode Push OpenTelemetry 推荐使用 Push 模式,这意味着它需要部署一个 collector 来传输指标数据。 push mode OpenTelemetry 官方提供了 collector 的实现,支持对指标做自定义操作如过滤、富化,可以利用社区提供的插件实现自己的 collector。并且云厂商提供的可观测服务(如 AWS CloudWatch、阿里云 SLS)大多已经拥抱了 OpenTelemetry 社区,可以直接将数据推送到它们提供的 collector 中,无需额外的组件进行桥接。 OpenTelemetry collector 兼容 RocketMQ Exporter 新的 Metrics 也提供对 RocketMQ Exporter 的兼容,现在使用 exporter 的用户无需变更部署架构即可接入新 Metrics。而且控制面应用(如 Promethues)和数据面应用(如 RocketMQ)有可能隔离部署。因此借助 Exporter 作为代理来获取新的 Metrics 数据也不失为一种好的选择。 RocketMQ 社区在 Exporter 中嵌入了一个 OpenTelemetry collector 实现,Broker 将 Metrics 数据导出到 Exporter,Exporter 提供了一个新的 endpoint(下图中的 metricsv2)供 Prometheus 拉取。 exporter mode 构建监控体系最佳实践 丰富的指标覆盖与对社区标准的遵循使得可以轻而易举的借助 RocketMQ 的 Metrics 能力构建出适合业务需求的监控体系,这个章节主要以一个典型的流程介绍构建监控体系的最佳实践: 集群监控/巡检 触发告警 排查分析。 集群状态监控与巡检 我们将指标采集到 Promethues 后就可以基于这些指标配置监控,这里给出一些示例: 接口监控: 监控接口调用情况,可以据此快速抓出异常的请求对症下药 下图给出一些相关示例:所有 RPC 的耗时(avg、pt90、pt99 等)、成功率、失败原因、接口调用与返回值分布情况等。 rpc metrics 客户端监控: 监控客户端的使用情况,发现非预期的客户端使用如超大消息发送、客户端上下线、客户端版本治理等。 下图给出一些相关示例:客户端连接数、客户端语言/版本分布、发送的消息大小/类型分布。 client metrics Broker 监控: 监控 Broker 的水位和服务质量,及时发现集群容量瓶颈。 下图给出一些相关示例:Dispatch 延迟、消息保留时间、线程池排队、消息堆积情况。 broker metrics 以上的示例只是 Metrics 的冰山一角,需要根据业务需要灵活组合不同的指标配置监控与巡检。 告警配置 有了完善的监控就可以对需要关注的指标配置告警,比如可以配置 Broker 监控中 Dispatch 延迟这个指标的告警: broker alert 收到告警后可以联动监控查看具体原因,关联发送接口的失败率可以发现有 1.7% 的消费发送失败,对应的报错是没有创建订阅组: promblem analysis 问题排查分析 最后以消息堆积这个场景为例来看一下如何基于 Metrics 分析线上问题。 从消息生命周期看堆积问题 正如本文开篇所述,排查 RocketMQ 的问题需要结合消息的生命周期综合分析,如果片面的认定是服务端/客户端的故障未免会误入歧途。 对于堆积问题,我们主要关注消息生命周期中的两个阶段: 就绪消息:就绪消息是可供消费但还未被拉取的消息,即在服务端堆积的消息; 处理中消息:处理中的消息是被客户端拉取但是还未被消费的消息。 consume lag 多维度指标分析堆积问题 对于堆积问题,RocketMQ 提供了消费延迟相关指标 rocketmq_consumer_lag_latency 可以基于这个指标配置告警。告警的阈值需要根据当前业务对消费延迟的容忍程度灵活指定。 触发告警后就需要对消息堆积在还是就绪消息和处理中消息进行分析,RocketMQ 提供了 rocketmq_consumer_ready_messages 和 rocketmq_consumer_inflight_messages 这两个指标,结合其他消费相关指标与客户端配置综合分析即可判断出消息堆积的根因: case 1:就绪消息持续上涨,处理中消息达到客户端堆积上限 这是最常见的堆积场景,客户端处理中的消息量 rocketmq_consumer_inflight_messages 达到了客户端配置的阈值,即消费者的消费能力低于消息发送量。如果业务要求尽可能实时的消费消息就需要增加消费者机器数量,如果业务对消息延迟不是很敏感可以等待业务高峰过去后再消化堆积的消息。 case 2:就绪消息几乎为 0,处理中消息持续上涨 这个 case 多出现在使用 RocketMQ 4.x 客户端的场景,此时消费位点是顺序提交的,如果某条消息的消费卡住会导致位点无法提交。看起来的现象是消息在客户端大量堆积,即处理中消息持续上涨。可以结合消费轨迹和 rocketmq_process_time 这个指标抓出消费慢的消息分析上下游链路,找到根因优化消费逻辑。 case 3: 就绪消息持续上涨,处理中消息几乎为 0 此种场景说明客户端没有拉取到消息,一般有如下几种情况: 鉴权问题:检查 ACL 配置,如果使用公有云产品请检查 AK、SK 配置; 消费者 hang 住:尝试打印线程堆栈或 gc 信息判断是否是进程卡死; 服务端响应慢:结合 RPC 相关指标查看拉取消息接口调用量与耗时、硬盘读写延迟。检查是否为服务端问题,如硬盘 IOPS 被打满了等等。 活动推荐 阿里云基于 Apache RocketMQ 构建的企业级产品消息队列RocketMQ 5.0版现开启活动: 1、新用户首次购买包年包月,即可享受全系列 85折优惠! 了解活动详情:
作者:玄珏
#技术探索 #可观测