2021年4月22日

使用 rocketmq-spring-boot-starter 来配置、发送和消费 RocketMQ 消息
导读:本文将 rocktmqspringboot 的设计实现做一个简单的介绍,读者可以通过本文了解将 RocketMQ Client 端集成为 springbootstarter 框架的开发细节,然后通过一个简单的示例来一步一步的讲解如何使用这个 springbootstarter 工具包来配置,发送和消费 RocketMQ 消息。 在 Spring 生态中玩转 RocketMQ 系列文章: 本文配套可交互教程已登录阿里云知行动手实验室,PC 端登录 在浏览器中立即体验。 通过本文,您将了解到: Spring 的消息框架介绍 rocketmqspringboot 具体实现 使用示例 前言 上世纪 90 年代末,随着 Java EE(Enterprise Edition) 的出现,特别是 Enterprise Java Beans 的使用需要复杂的描述符配置和死板复杂的代码实现,增加了广大开发者的学习曲线和开发成本,由此基于简单的 XML 配置和普通 Java 对象(Plain Old Java Objects)的 Spring 技术应运而生,依赖注入(Dependency Injection), 控制反转(Inversion of Control)和面向切面编程(AOP)的技术更加敏捷地解决了传统 Java 企业及版本的不足。 随着 Spring 的持续演进,基于注解(Annotation)的配置逐渐取代了 XML 文件配置,2014 年 4 月 1 日,Spring Boot 1.0.0 正式发布,它基于“约定大于配置”(Convention over configuration)这一理念来快速地开发、测试、运行和部署 Spring 应用,并能通过简单地与各种启动器(如 springbootwebstarter)结合,让应用直接以命令行的方式运行,不需再部署到独立容器中。这种简便直接快速构建和开发应用的过程,可以使用约定的配置并且简化部署,受到越来越多的开发者的欢迎。 Apache RocketMQ 是业界知名的分布式消息和流处理中间件,简单地理解,它由 Broker 服务器和客户端两部分组成: 其中客户端一个是消息发布者客户端(Producer),它负责向 Broker 服务器发送消息;另外一个是消息的消费者客户端(Consumer),多个消费者可以组成一个消费组,来订阅和拉取消费 Broker 服务器上存储的消息。 为了利用 Spring Boot 的快速开发和让用户能够更灵活地使用 RocketMQ 消息客户端,Apache RocketMQ 社区推出了 springbootstarter 实现。随着分布式事务消息功能在 RocketMQ 4.3.0 版本的发布,近期升级了相关的 springboot 代码,通过注解方式支持分布式事务的回查和事务消息的发送。 本文将对当前的设计实现做一个简单的介绍,读者可以通过本文了解将 RocketMQ Client 端集成为 springbootstarter 框架的开发细节,然后通过一个简单的示例来一步一步的讲解如何使用这个 springbootstarter 工具包来配置,发送和消费 RocketMQ 消息。 Spring 中的消息框架 顺便在这里讨论一下在 Spring 中关于消息的两个主要的框架,即 Spring Messaging 和 Spring Cloud Stream。它们都能够与 Spring Boot 整合并提供了一些参考的实现。和所有的实现框架一样,消息框架的目的是实现轻量级的消息驱动的微服务,可以有效地简化开发人员对消息中间件的使用复杂度,让系统开发人员可以有更多的精力关注于核心业务逻辑的处理。 1. Spring Messaging Spring Messaging 是 Spring Framework 4 中添加的模块,是 Spring 与消息系统集成的一个扩展性的支持。它实现了从基于 JmsTemplate 的简单的使用 JMS 接口到异步接收消息的一整套完整的基础架构,Spring AMQP 提供了该协议所要求的类似的功能集。在与 Spring Boot 的集成后,它拥有了自动配置能力,能够在测试和运行时与相应的消息传递系统进行集成。 单纯对于客户端而言,Spring Messaging 提供了一套抽象的 API 或者说是约定的标准,对消息发送端和消息接收端的模式进行规定,不同的消息中间件提供商可以在这个模式下提供自己的 Spring 实现:在消息发送端需要实现的是一个 XXXTemplate 形式的 Java Bean,结合 Spring Boot 的自动化配置选项提供多个不同的发送消息方法;在消息的消费端是一个 XXXMessageListener 接口(实现方式通常会使用一个注解来声明一个消息驱动的 POJO),提供回调方法来监听和消费消息,这个接口同样可以使用 Spring Boot 的自动化选项和一些定制化的属性。 如果有兴趣深入的了解 Spring Messaging 及针对不同的消息产品的使用,推荐阅读这个文件。参考 Spring Messaging 的既有实现,RocketMQ 的 springbootstarter 中遵循了相关的设计模式并结合 RocketMQ 自身的功能特点提供了相应的 API(如顺序、异步和事务半消息等)。 2. Spring Cloud Stream Spring Cloud Stream 结合了 Spring Integration 的注解和功能,它的应用模型如下: 该图片引自 spring cloud stream Spring Cloud Stream 框架中提供一个独立的应用内核,它通过输入(@Input)和输出(@Output)通道与外部世界进行通信,消息源端(Source)通过输入通道发送消息,消费目标端(Sink)通过监听输出通道来获取消费的消息。这些通道通过专用的 Binder 实现与外部代理连接。开发人员的代码只需要针对应用内核提供的固定的接口和注解方式进行编程,而不需要关心运行时具体的 Binder 绑定的消息中间件。在运行时,Spring Cloud Stream 能够自动探测并使用在 classpath 下找到的Binder。 这样开发人员可以轻松地在相同的代码中使用不同类型的中间件:仅仅需要在构建时包含进不同的 Binder。在更加复杂的使用场景中,也可以在应用中打包多个 Binder 并让它自己选择 Binder,甚至在运行时为不同的通道使用不同的 Binder。 Binder 抽象使得 Spring Cloud Stream 应用可以灵活的连接到中间件,加之 Spring Cloud Stream 使用利用了 Spring Boot 的灵活配置配置能力,这样的配置可以通过外部配置的属性和 Spring Boot 支持的任何形式来提供(包括应用启动参数、环境变量和 application.yml 或者 application.properties 文件),部署人员可以在运行时动态选择通道连接 destination(例如,Kafka 的 topic 或者 RabbitMQ 的 exchange)。 Binder SPI 的方式来让消息中间件产品使用可扩展的 API 来编写相应的 Binder,并集成到 Spring Cloud Steam 环境,目前 RocketMQ 还没有提供相关的 Binder,我们计划在下一步将完善这一功能,也希望社区里有这方面经验的同学积极尝试,贡献 PR 或建议。 springbootstarter的实现 在开始的时候我们已经知道,spring boot starter 构造的启动器对于使用者是非常方便的,使用者只要在 pom.xml引入starter 的依赖定义,相应的编译,运行和部署功能就全部自动引入。因此常用的开源组件都会为 Spring 的用户提供一个 springbootstarter 封装给开发者,让开发者非常方便集成和使用,这里我们详细的介绍一下 RocketMQ(客户端)的 starter 实现过程。 1. springbootstarter 的实现步骤 对于一个 springbootstarter 实现需要包含如下几个部分: 1)在 pom.xml 的定义 定义最终要生成的 starter 组件信息 org.apache.rocketmq springbootstarterrocketmq 1.0.0SNAPSHOT 定义依赖包 它分为两个部分:Spring 自身的依赖包和 RocketMQ 的依赖包。 2)配置文件类 定义应用属性配置文件类 RocketMQProperties,这个 Bean 定义一组默认的属性值。用户在使用最终的 starter 时,可以根据这个类定义的属性来修改取值,当然不是直接修改这个类的配置,而是 springboot 应用中对应的配置文件:src/main/resources/application.properties。 3)定义自动加载类 定义 src/resources/METAINF/spring.factories 文件中的自动加载类, 其目的是让 spring boot 更具文中中所指定的自动化配置类来自动初始化相关的 Bean、Component 或 Service,它的内容如下: org.springframework.boot.autoconfigure.EnableAutoConfiguration=\ org.apache.rocketmq.spring.starter.RocketMQAutoConfiguration 在 RocketMQAutoConfiguration 类的具体实现中,定义开放给用户直接使用的 Bean 对象包括: RocketMQProperties 加载应用属性配置文件的处理类; RocketMQTemplate 发送端用户发送消息的发送模板类; ListenerContainerConfiguration 容器 Bean 负责发现和注册消费端消费实现接口类,这个类要求:由 @RocketMQMessageListener 注解标注;实现 RocketMQListener 泛化接口。 4)最后具体地进行 RpcketMQ 相关的封装 在发送端(producer)和消费端(consumer)客户端分别进行封装,在当前的实现版本提供了对 Spring Messaging 接口的兼容方式。 2. 消息发送端实现 1)普通发送端 发送端的代码封装在 RocketMQTemplate POJO 中,下图是发送端的相关代码的调用关系图: 为了与 Spring Messaging 的发送模板兼容,在 RocketMQTemplate 集成了 AbstractMessageSendingTemplate 抽象类,来支持相关的消息转换和发送方法,这些方法最终会代理给 doSend() 方法、doSend() 以及 RocoketMQ 所特有的一些方法如异步,单向和顺序等方法直接添加到 RoketMQTempalte 中,这些方法直接代理调用到 RocketMQ 的 Producer API 来进行消息的发送。 2)事务消息发送端 对于事务消息的处理,在消息发送端进行了部分的扩展,参考上面的调用关系类图。 RocketMQTemplate 里加入了一个发送事务消息的方法 sendMessageInTransaction(),并且最终这个方法会代理到 RocketMQ 的 TransactionProducer 进行调用,在这个 Producer 上会注册其关联的 TransactionListener 实现类,以便在发送消息后能够对 TransactionListener 里的方法实现进行调用。 3. 消息消费端实现 在消费端 SpringBoot 应用启动后,会扫描所有包含 @RocketMQMessageListener 注解的类(这些类需要集成 RocketMQListener 接口,并实现 onMessage()方法),这个 Listener 会一对一的被放置到。 DefaultRocketMQListenerContainer 容器对象中,容器对象会根据消费的方式(并发或顺序),将 RocketMQListener 封装到具体的 RocketMQ 内部的并发或者顺序接口实现。在容器中创建 RocketMQ Consumer 对象,启动并监听定制的 Topic 消息,如果有消费消息,则回调到 Listener 的 onMessage() 方法。 使用示例 上面的一章介绍了 RocketMQ 在 springbootstarter 方式的实现,这里通过一个最简单的消息发送和消费的例子来介绍如何使这个 rocketmqspringbootstarter。 1. RocketMQ 服务端的准备 1)启动 NameServer 和 Broker 要验证 RocketMQ 的 SpringBoot 客户端,首先要确保 RocketMQ 服务正确的下载并启动。可以参考 RocketMQ 主站的快速开始来进行操作。确保启动 NameServer 和 Broker 已经正确启动。 2)创建实例中所需要的 Topics 在执行启动命令的目录下执行下面的命令行操作: bash bin/mqadmin updateTopic c DefaultCluster t stringtopic 2. 编译 rocketmqspringbootstarter 目前的 springbootstarter 依赖还没有提交的 Maven 的中心库,用户使用前需要自行下载 git 源码,然后执行 mvn clean install 安装到本地仓库。 git clone https://github.com/apache/rocketmqexternals.git cd rocketmqspringbootstarter mvn clean install 3. 编写客户端代码 用户如果使用它,需要在消息的发布和消费客户端的 maven 配置文件 pom.xml 中添加如下的依赖: 属性 springbootstarterrocketmqversion 的取值为:1.0.0SNAPSHOT, 这与上一步骤中执行安装到本地仓库的版本一致。 1)消息发送端的代码 发送端的配置文件 application.properties: 发送端的 Java 代码: 2)消息消费端代码 消费端的配置文件 application.properties: 消费端的 Java 代码: 这里只是简单的介绍了使用 springboot 来编写最基本的消息发送和接收的代码,如果需要了解更多的调用方式,如: 异步发送,对象消息体,指定 tag 标签以及指定事务消息,请参看 github 的说明文档和详细的代码。我们后续还会对这些高级功能进行陆续的介绍。 作者简介 辽天,阿里巴巴技术专家,Apache RocketMQ 内核控,拥有多年分布式系统研发经验,对 Microservice、Messaging 和 Storage 等领域有深刻理解, 目前专注 RocketMQ 内核优化以及 Messaging 生态建设。 在 PC 端登录 start.aliyun.com 知行动手实验室,沉浸式体验在线交互教程。 活动推荐 阿里云基于 Apache RocketMQ 构建的企业级产品消息队列RocketMQ 5.0版现开启活动: 1、新用户首次购买包年包月,即可享受全系列 85折优惠! 了解活动详情:
作者:辽天
#技术探索 #微服务

2021年4月6日

基于 RocketMQ Prometheus Exporter 打造定制化 DevOps 平台
导读:本文将对 RocketMQExporter 的设计实现做一个简单的介绍,读者可通过本文了解到 RocketMQExporter 的实现过程,以及通过 RocketMQExporter 来搭建自己的 RocketMQ 监控系统。RocketMQ 在线可交互教程现已登录知行动手实验室,PC 端登录 start.aliyun.com 即可直达。 RocketMQ 云原生系列文章: (本文) RocketMQExporter 项目的 GitHub 地址: 文章主要内容包含以下几个方面: 1. RocketMQ 介绍 2. Prometheus 简介 3. RocketMQExporter 的具体实现 4. RocketMQExporter 的监控指标和告警指标 5. RocketMQExporter 使用示例 RocketMQ 介绍 RocketMQ 是一个分布式消息和流数据平台,具有低延迟、高性能、高可靠性、万亿级容量和灵活的可扩展性。简单的来说,它由 Broker 服务器和客户端两部分组成,其中客户端一个是消息发布者客户端(Producer),它负责向 Broker 服务器发送消息;另外一个是消息的消费者客户端(Consumer),多个消费者可以组成一个消费组,来订阅和拉取消费 Broker 服务器上存储的消息。 正由于它具有高性能、高可靠性和高实时性的特点,与其他协议组件在 MQTT 等各种消息场景中的结合也越来越多,应用越来越广泛。而对于这样一个强大的消息中间件平台,在实际使用的时候还缺少一个监控管理平台。 当前在开源界,使用最广泛监控解决方案的就是 Prometheus。与其它传统监控系统相比较,Prometheus 具有易于管理,监控服务的内部运行状态,强大的数据模型,强大的查询语言 PromQL,高效的数据处理,可扩展,易于集成,可视化,开放性等优点。并且借助于 Prometheus 可以很快速的构建出一个能够监控 RocketMQ 的监控平台。 Prometheus 简介 下图展示了 Prometheus 的基本架构: 1. Prometheus Server Prometheus Server 是 Prometheus 组件中的核心部分,负责实现对监控数据的获取,存储以及查询。Prometheus Server 可以通过静态配置管理监控目标,也可以配合使用 Service Discovery 的方式动态管理监控目标,并从这些监控目标中获取数据。其次 Prometheus Server 需要对采集到的监控数据进行存储,Prometheus Server 本身就是一个时序数据库,将采集到的监控数据按照时间序列的方式存储在本地磁盘当中。最后 Prometheus Server 对外提供了自定义的 PromQL 语言,实现对数据的查询以及分析。 2. Exporters Exporter 将监控数据采集的端点通过 HTTP 服务的形式暴露给 Prometheus Server,Prometheus Server 通过访问该 Exporter 提供的 Endpoint 端点,即可获取到需要采集的监控数据。RocketMQExporter 就是这样一个 Exporter,它首先从 RocketMQ 集群采集数据,然后借助 Prometheus 提供的第三方客户端库将采集的数据规范化成符合 Prometheus 系统要求的数据,Prometheus 定时去从 Exporter 拉取数据即可。 当前 RocketMQ Exporter 已被 Prometheus 官方收录,其地址为:。 RocketMQExporter 的具体实现 当前在 Exporter 当中,实现原理如下图所示: 整个系统基于 spring boot 框架来实现。由于 MQ 内部本身提供了比较全面的数据统计信息,所以对于 Exporter 而言,只需要将 MQ 集群提供的统计信息取出然后进行加工而已。所以 RocketMQExporter 的基本逻辑是内部启动多个定时任务周期性的从 MQ 集群拉取数据,然后将数据规范化后通过端点暴露给 Prometheus 即可。其中主要包含如下主要的三个功能部分: MQAdminExt 模块通过封装 MQ 系统客户端提供的接口来获取 MQ 集群内部的统计信息。 MetricService 负责将 MQ 集群返回的结果数据进行加工,使其符合 Prometheus 要求的格式化数据。 Collect 模块负责存储规范化后的数据,最后当 Prometheus 定时从 Exporter 拉取数据的时候,Exporter 就将 Collector 收集的数据通过 HTTP 的形式在/metrics 端点进行暴露。 RocketMQExporter 的监控指标和告警指标 RocketMQExporter 主要是配合 Prometheus 来做监控,下面来看看当前在 Expoter 中定义了哪些监控指标和告警指标。 监控指标 rocketmq_message_accumulation 是一个聚合指标,需要根据其它上报指标聚合生成。 告警指标 消费者堆积告警指标也是一个聚合指标,它根据消费堆积的聚合指标生成,value 这个阈值对每个消费者是不固定的,当前是根据过去 5 分钟生产者生产的消息数量来定,用户也可以根据实际情况自行设定该阈值。告警指标设置的值只是个阈值只是象征性的值,用户可根据在实际使用 RocketMQ 的情况下自行设定。这里重点介绍一下消费者堆积告警指标,在以往的监控系统中,由于没有像 Prometheus 那样有强大的 PromQL 语言,在处理消费者告警问题时势必需要为每个消费者设置告警,那这样就需要 RocketMQ 系统的维护人员为每个消费者添加,要么在系统后台检测到有新的消费者创建时自动添加。在 Prometheus 中,这可以通过一条如下的语句来实现: (sum(rocketmq_producer_offset) by (topic) on(topic) group_right sum(rocketmq_consumer_offset) by (group,topic)) ignoring(group) group_left sum (avg_over_time(rocketmq_producer_tps[5m])) by (topic)560 0 借助 PromQL 这一条语句不仅可以实现为任意一个消费者创建消费告警堆积告警,而且还可以使消费堆积的阈值取一个跟生产者发送速度相关的阈值。这样大大增加了消费堆积告警的准确性。 RocketMQExporter 使用示例 1. 启动 NameServer 和 Broker 要验证 RocketMQ 的 SpringBoot 客户端,首先要确保 RocketMQ 服务正确的下载并启动。可以参考 RocketMQ 主站的快速开始来进行操作。确保启动 NameServer 和 Broker 已经正确启动。 2. 编译 RocketMQExporter 用户当前使用,需要自行下载 git 源码编译: git clone https://github.com/apache/rocketmqexporter cd rocketmqexporter mvn clean install 3. 配置和运行 RocketMQExporter 有如下的运行选项: 以上的运行选项既可以在下载代码后在配置文件中更改,也可以通过命令行来设置。 编译出来的 jar 包就叫 rocketmqexporter0.0.1SNAPSHOT.jar,可以通过如下的方式来运行。 java jar rocketmqexporter0.0.1SNAPSHOT.jar [rocketmq.config.namesrvAddr="127.0.0.1:9876" ...] 4. 安装 Prometheus 首先到 Prometheus去下载 Prometheus 安装包,当前以 linux 系统安装为例,选择的安装包为 prometheus2.7.0rc.1.linuxamd64.tar.gz,经过如下的操作步骤就可以启动 prometheus 进程。 tar xzf prometheus2.7.0rc.1.linuxamd64.tar.gzcd prometheus2.7.0rc.1.linuxamd64/./prometheus config.file=prometheus.yml web.listenaddress=:5555 Prometheus 默认监听端口号为 9090,为了不与系统上的其它进程监听端口冲突,我们在启动参数里面重新设置了监听端口号为 5555。然后通过浏览器访问 ;服务器 IP 地址:5555,就可以验证 Prometheus 是否已成功安装,显示界面如下: 由于 RocketMQExporter 进程已启动,这个时候可以通过 Prometheus 来抓取 RocketMQExporter 的数据,这个时候只需要更改 Prometheus 启动的配置文件即可。 整体配置文件如下: my global config global: scrape_interval: 15s Set the scrape interval to every 15 seconds. Default is every 1 minute. evaluation_interval: 15s Evaluate rules every 15 seconds. The default is every 1 minute. scrape_timeout is set to the global default (10s). Load rules once and periodically evaluate them according to the global 'evaluation_interval'. rule_files: "first_rules.yml" "second_rules.yml" scrape_configs: job_name: 'prometheus' static_configs: targets: ['localhost:5555'] job_name: 'exporter' static_configs: targets: ['localhost:5557'] 更改配置文件后,重启服务即可。重启后就可以在 Prometheus 界面查询 RocketMQExporter 上报的指标,例如查询 rocketmq_broker_tps 指标,其结果如下: 5. 告警规则添加 在 Prometheus 可以展示 RocketMQExporter 的指标后,就可以在 Prometheus 中配置 RocketMQ 的告警指标了。在 Prometheus 的配置文件中添加如下的告警配置项,.rules 表示可以匹配多个后缀为 rules 的文件。 rule_files: "first_rules.yml" "second_rules.yml" /home/prometheus/prometheus2.7.0rc.1.linuxamd64/rules/.rules 当前设置的告警配置文件为 warn.rules,其文件具体内容如下所示。其中的阈值只起一个示例的作用,具体的阈值还需用户根据实际使用情况来自行设定。 Sample prometheus rules/alerts for rocketmq. Galera Alerts groups: name: GaleraAlerts rules: alert: RocketMQClusterProduceHigh expr: sum(rocketmq_producer_tps) by (cluster) = 10 for: 3m labels: severity: warning annotations: description: '{{$labels.cluster}} Sending tps too high.' summary: cluster send tps too high alert: RocketMQClusterProduceLow expr: sum(rocketmq_producer_tps) by (cluster) = 10 for: 3m labels: severity: warning annotations: description: '{{$labels.cluster}} consuming tps too high.' summary: cluster consume tps too high alert: RocketMQClusterConsumeLow expr: sum(rocketmq_consumer_tps) by (cluster) 0 for: 3m labels: severity: warning annotations: description: 'consumer {{$labels.group}} on {{$labels.topic}} lag behind and is falling behind (behind value {{$value}}).' summary: consumer lag behind alert: GroupGetLatencyByStoretime expr: rocketmq_group_get_latency_by_storetime 1000 for: 3m labels: severity: warning annotations: description: 'consumer {{$labels.group}} on {{$labels.broker}}, {{$labels.topic}} consume time lag behind message store time and (behind value is {{$value}}).' summary: message consumes time lag behind message store time too much 最终,可以在 Prometheus 的看一下告警展示效果,红色表示当前处于告警状态的项,绿色表示正常状态。 6. Grafana dashboard for RocketMQ Prometheus 自身的指标展示平台没有当前流行的展示平台 Grafana 好, 为了更好的展示 RocketMQ 的指标,可以使用 Grafana 来展示 Prometheus 获取的指标。 首先到官网去下载:,这里仍以二进制文件安装为例进行介绍。 wget https://dl.grafana.com/oss/release/grafana6.2.5.linuxamd64.tar.gz tar zxvf grafana6.2.5.linuxamd64.tar.gz cd grafana5.4.3/ 同样为了不与其它进程的使用端口冲突,可以修改 conf 目录下的 defaults.ini 文件的监听端口,当前将 grafana 的监听端口改为 55555,然后使用如下的命令启动即可: ./bin/grafanaserver web 然后通过浏览器访问 ;服务器 IP 地址:55555,就可以验证 grafana 是否已成功安装。系统默认用户名和密码为 admin/admin,第一次登陆系统会要求修改密码,修改密码后登陆,界面显示如下: 点击 Add data source 按钮,会要求选择数据源。 选择数据源为 Prometheus,设置数据源的地址为前面步骤启动的 Prometheus 的地址。 回到主界面会要求创建新的 Dashboard。 点击创建 dashboard,创建 dashboard 可以自己手动创建,也可以以配置文件导入的方式创建,当前已将 RocketMQ 的 dashboard 配置文件上传到 Grafana 的官网,这里以配置文件导入的方式进行创建。 点击 New dashboard 下拉按钮。 选择 import dashboard。 这个时候可以到 Grafana 官网去下载当前已为 RocketMQ 创建好的配置文件,地址为:,如下图所示: 点击 download 就可以下载配置文件,下载配置文件然后,复制配置文件中的内容粘贴到上图的粘贴内容处。 最后按上述方式就将配置文件导入到 Grafana 了。 最终的效果如下所示: 作者简介 陈厚道,曾就职于腾讯、盛大、斗鱼等互联网公司。目前就职于尚德机构,在尚德机构负责基础架构方面的设计和开发工作。对分布式消息队列、微服务架构和落地、DevOps 和监控平台有比较深入的研究。 冯庆,曾就职于华为。目前就职于尚德机构,在尚德机构基础架构团队负责基础组件的开发工作。 在 PC 端登录 知行动手实验室,沉浸式体验在线交互教程。 活动推荐 阿里云基于 Apache RocketMQ 构建的企业级产品消息队列RocketMQ 5.0版现开启活动: 1、新用户首次购买包年包月,即可享受全系列 85折优惠! 了解活动详情:
作者:陈厚道、冯庆
#技术探索 #可观测

2021年4月3日

阿里的 RocketMQ 如何让双十一峰值之下 0 故障?
2020 年双十一交易峰值达到 58.3 W 笔/秒,消息中间件 RocketMQ 继续数年 0 故障丝般顺滑地完美支持了整个集团大促的各类业务平稳。2020 年双十一大促中,消息中间件 RocketMQ 发生了以下几个方面的变化: 云原生化实践:完成运维层面的云原生化改造,实现 Kubernetes 化。 性能优化:消息过滤优化交易集群性能提升 30%。 全新的消费模型:对于延迟敏感业务提供新的消费模式,降低因发布、重启等场景下导致的消费延迟。 云原生化实践 1. 背景 Kubernetes 作为目前云原生化技术栈实践中重要的一环,其生态已经逐步建立并日益丰富。目前,服务于集团内部的 RocketMQ 集群拥有巨大的规模以及各种历史因素,因此在运维方面存在相当一部分痛点,我们希望能够通过云原生技术栈来尝试找到对应解决方案,并同时实现降本提效,达到无人值守的自动化运维。 消息中间件早在 2016 年,通过内部团队提供的中间件部署平台实现了容器化和自动化发布,整体的运维比 2016 年前已经有了很大的提高,但是作为一个有状态的服务,在运维层面仍然存在较多的问题。 中间件部署平台帮我们完成了资源的申请,容器的创建、初始化、镜像安装等一系列的基础工作,但是因为中间件各个产品都有自己不同的部署逻辑,所以在应用的发布上,就是各应用自己的定制化了。中间件部署平台的开发也不完全了解集团内 RocketMQ 的部署过程是怎样的。 因此在 2016 年的时候,部署平台需要我们去亲自实现消息中间件的应用发布代码。虽然部署平台大大提升了我们的运维效率,甚至还能实现一键发布,但是这样的方案也有不少的问题。比较明显的就是,当我们的发布逻辑有变化的时候,还需要去修改部署平台对应的代码,需要部署平台升级来支持我们,用最近比较流行的一个说法,就是相当不云原生。 同样在故障机替换、集群缩容等操作中,存在部分人工参与的工作,如切流,堆积数据的确认等。我们尝试过在部署平台中集成更多消息中间件自己的运维逻辑,不过在其他团队的工程里写自己的业务代码,确实也是一个不太友好的实现方案,因此我们希望通过 Kubernetes 来实现消息中间件自己的 operator 。我们同样希望利用云化后云盘的多副本能力来降低我们的机器成本并降低主备运维的复杂程度。 经过一段时间的跟进与探讨,最终再次由内部团队承担了建设云原生应用运维平台的任务,并依托于中间件部署平台的经验,借助云原生技术栈,实现对有状态应用自动化运维的突破。 2. 实现 整体的实现方案如上图所示,通过自定义的 CRD 对消息中间件的业务模型进行抽象,将原有的在中间件部署平台的业务发布部署逻辑下沉到消息中间件自己的 operator 中,托管在内部 Kubernetes 平台上。该平台负责所有的容器生产、初始化以及集团内一切线上环境的基线部署,屏蔽掉 IaaS 层的所有细节。 Operator 承担了所有的新建集群、扩容、缩容、迁移的全部逻辑,包括每个 pod 对应的 brokerName 自动生成、配置文件,根据集群不同功能而配置的各种开关,元数据的同步复制等等。同时之前一些人工的相关操作,比如切流时候的流量观察,下线前的堆积数据观察等也全部集成到了 operator 中。当我们有需求重新修改各种运维逻辑的时候,也再也不用去依赖通用的具体实现,修改自己的 operator 即可。 最后线上的实际部署情况去掉了图中的所有的 replica 备机。在 Kubernetes 的理念中,一个集群中每个实例的状态是一致的,没有依赖关系,而如果按照消息中间件原有的主备成对部署的方案,主备之间是有严格的对应关系,并且在上下线发布过程中有严格的顺序要求,这种部署模式在 Kubernetes 的体系下是并不提倡的。若依然采用以上老的架构方式,会导致实例控制的复杂性和不可控性,同时我们也希望能更多的遵循 Kubernetes 的运维理念。 云化后的 ECS 使用的是高速云盘,底层将对数据做了多备份,因此数据的可用性得到了保障。并且高速云盘在性能上完全满足 MQ 同步刷盘,因此,此时就可以把之前的异步刷盘改为同步,保证消息写入时的不丢失问题。云原生模式下,所有的实例环境均是一致性的,依托容器技术和 Kubernetes 的技术,可实现任何实例挂掉(包含宕机引起的挂掉),都能自动自愈,快速恢复。 解决了数据的可靠性和服务的可用性后,整个云原生化后的架构可以变得更加简单,只有 broker 的概念,再无主备之分。 3. 大促验证 上图是 Kubernetes 上线后双十一大促当天的发送 RT 统计,可见大促期间的发送 RT 较为平稳,整体符合预期,云原生化实践完成了关键性的里程碑。 性能优化 1. 背景 RocketMQ 至今已经连续七年 0 故障支持集团的双十一大促。自从 RocketMQ 诞生以来,为了能够完全承载包括集团业务中台交易消息等核心链路在内的各类关键业务,复用了原有的上层协议逻辑,使得各类业务方完全无感知的切换到 RocketMQ 上,并同时充分享受了更为稳定和强大的 RocketMQ 消息中间件的各类特性。 当前,申请订阅业务中台的核心交易消息的业务方一直都在不断持续增加,并且随着各类业务复杂度提升,业务方的消息订阅配置也变得更加复杂繁琐,从而使得交易集群的进行过滤的计算逻辑也变得更为复杂。这些业务方部分沿用旧的协议逻辑(Header 过滤),部分使用 RocketMQ 特有的 SQL 过滤。 2. 主要成本 目前集团内部 RocketMQ 的大促机器成本绝大部分都是交易消息相关的集群,在双十一零点峰值期间,交易集群的峰值和交易峰值成正比,叠加每年新增的复杂订阅带来了额外 CPU 过滤计算逻辑,交易集群都是大促中机器成本增长最大的地方。 3. 优化过程 由于历史原因,大部分的业务方主要还是使用 Header 过滤,内部实现其实是aviator 表达式。仔细观察交易消息集群的业务方过滤表达式,可以发现绝大部分都指定类似MessageType == xxxx这样的条件。翻看 aviator 的源码可以发现这样的条件最终会调用 Java 的字符串比较String.compareTo()。 由于交易消息包括大量不同业务的 MessageType,光是有记录的起码有几千个,随着交易业务流程复杂化,MessageType 的增长更是繁多。随着交易峰值的提高,交易消息峰值正比增长,叠加这部分更加复杂的过滤,持续增长的将来,交易集群的成本极可能和交易峰值指数增长,因此决心对这部分进行优化。 原有的过滤流程如下,每个交易消息需要逐个匹配不同 group 的订阅关系表达式,如果符合表达式,则选取对应的 group 的机器进行投递。如下图所示: 对此流程进行优化的思路需要一定的灵感,在这里借助数据库索引的思路:原有流程可以把所有订阅方的过滤表达式看作数据库的记录,每次消息过滤就相当于一个带有特定条件的数据库查询,把所有匹配查询(消息)的记录(过滤表达式)选取出来作为结果。为了加快查询结果,可以选择 MessageType 作为一个索引字段进行索引化,每次查询变为先匹配 MessageType 主索引,然后把匹配上主索引的记录再进行其它条件(如下图的 sellerId 和 testA )匹配,优化流程如下图所示: 以上优化流程确定后,要关注的技术点有两个: 如何抽取每个表达式中的 MessageType 字段? 如何对 MessageType 字段进行索引化? 对于技术点 1 ,需要针对 aviator 的编译流程进行 hook ,深入 aviator 源码后,可以发现 aviator 的编译是典型的Recursive descent,同时需要考虑到提取后父表达式的短路问题。 在编译过程中针对 messageType==XXX 这种类型进行提取后,把原有的 message==XXX 转变为 true/false 两种情况,然后针对 true、false 进行表达式的短路即可得出表达式优化提取后的情况。例如: 表达式: messageType=='200tradepaiddone' && buyerId==123456 提取为两个子表达式: 子表达式1(messageType==200tradepaiddone):buyerId==123456 子表达式2(messageType!=200tradepaiddone):false 具体到 aviator 的实现里,表达式编译会把每个 token 构建一个 List ,类似如下图所示(为方便理解,绿色方框的是 token ,其它框表示表达式的具体条件组合): 提取了 messageType ,有两种情况: 情况一:messageType == '200tradepaiddone',则把之前 token 的位置合并成true,然后进行表达式短路计算,最后优化成 buyerId==123456 ,具体如下: 情况二:messageType != '200tradepaiddone',则把之前 token 的位置合并成 false ,表达式短路计算后,最后优化成 false ,具体如下: 这样就完成 messageType 的提取。这里可能有人就有一个疑问,为什么要考虑到上面的情况二,messageType != '200tradepaiddone',这是因为必须要考虑到多个条件的时候,比如: (messageType=='200tradepaiddone' && buyerId==123456) || (messageType=='200tradesuccess' && buyerId==3333) 就必须考虑到不等于的情况了。同理,如果考虑到多个表达式嵌套,需要逐步进行短路计算。但整体逻辑是类似的,这里就不再赘述。 说完技术点 1,我们继续关注技术点 2,考虑到高效过滤,直接使用 HashMap 结构进行索引化即可,即把 messageType 的值作为 HashMap 的 key ,把提取后的子表达式作为 HashMap 的 value ,这样每次过滤直接通过一次 hash 计算即可过滤掉绝大部分不适合的表达式,大大提高了过滤效率。 3. 优化效果 该优化最主要降低了 CPU 计算逻辑,根据优化前后的性能情况对比,我们发现不同的交易集群中的订阅方订阅表达式复杂度越高,优化效果越好,这个是符合我们的预期的,其中最大的 CPU 优化有32%的提升,大大降低了本年度 RocketMQ 的部署机器成本。 全新的消费模型 —— POP 消费 1. 背景 RocketMQ 的 PULL 消费对于机器异常 hang 时并不十分友好。如果遇到客户端机器 hang 住,但处于半死不活的状态,与 broker 的心跳没有断掉的时候,客户端 rebalance 依然会分配消费队列到 hang 机器上,并且 hang 机器消费速度很慢甚至无法消费的时候,这样会导致消费堆积。另外类似还有服务端 Broker 发布时,也会由于客户端多次 rebalance 导致消费延迟影响等无法避免的问题。如下图所示: 当 Pull Client 2 发生 hang 机器的时候,它所分配到的三个 Broker 上的 Q2 都出现严重的红色堆积。对于此,我们增加了一种新的消费模型 —— POP 消费,能够解决此类稳定性问题。如下图所示: POP 消费中,三个客户端并不需要 rebalance 去分配消费队列,取而代之的是,它们都会使用 POP 请求所有的 broker 获取消息进行消费。broker 内部会把自身的三个队列的消息根据一定的算法分配给请求的 POP Client。即使 Pop Client 2 出现 hang,但内部队列的消息也会让 Pop Client1 和 Pop Client2 进行消费。这样就 hang 机器造成的避免了消费堆积。 2. 实现 POP 消费和原来 PULL 消费对比,最大的一点就是弱化了队列这个概念,PULL 消费需要客户端通过 rebalance 把 broker 的队列分配好,从而去消费分配到自己专属的队列,新的 POP 消费中,客户端的机器会直接到每个 broker 的队列进行请求消费, broker 会把消息分配返回给等待的机器。随后客户端消费结束后返回对应的 Ack 结果通知 broker,broker 再标记消息消费结果,如果超时没响应或者消费失败,再会进行重试。 POP 消费的架构图如上图所示。Broker 对于每次 POP 的请求,都会有以下三个操作: 对应的队列进行加锁,然后从 store 层获取该队列的消息; 然后写入 CK 消息,表明获取的消息要被 POP 消费; 最后提交当前位点,并释放锁。 CK 消息实际上是记录了 POP 消息具体位点的定时消息,当客户端超时没响应的时候,CK 消息就会重新被 broker 消费,然后把 CK 消息的位点的消息写入重试队列。如果 broker 收到客户端的消费结果的 Ack ,删除对应的 CK 消息,然后根据具体结果判断是否需要重试。 从整体流程可见,POP 消费并不需要 reblance ,可以避免 rebalance 带来的消费延时,同时客户端可以消费 broker 的所有队列,这样就可以避免机器 hang 而导致堆积的问题。 活动推荐 阿里云基于 Apache RocketMQ 构建的企业级产品消息队列RocketMQ 5.0版现开启活动: 1、新用户首次购买包年包月,即可享受全系列 85折优惠! 了解活动详情:
作者:愈安
#行业实践 #高可用

2021年3月18日

如何在 Spring 生态中玩转 RocketMQ?
在 Spring 生态中玩转 RocketMQ 系列教程现已登陆知行动手实验室,立即体验! 移动端同学,需要在PC端登录 start.aliyun.com 进行体验。 RocketMQ 作为业务消息的首选,在消息和流处理领域被广泛应用。而微服务生态 Spring 框架也是业务开发中最受欢迎的框架,两者的完美契合使得 RocketMQ 成为 Spring Messaging 实现中最受欢迎的消息实现。本文展示了 5 种在 Spring 生态中文玩转 RocketMQ 的方式,并描述了每个项目的特点和使用场景。 一、前言 上世纪 90 年代末,随着 Java EE(Enterprise Edition)的出现,特别是 Enterprise Java Beans 的使用需要复杂的描述符配置和死板复杂的代码实现,增加了广大开发者的学习曲线和开发成本,由此基于简单的 XML 配置和普通 Java 对象(Plain Old Java Objects)的 Spring 技术应运而生,依赖注入(Dependency Injection),控制反转(Inversion of Control)和面向切面编程(AOP)的技术更加敏捷地解决了传统 Java 企业及版本的不足。随着 Spring 的持续演进,基于注解(Annotation)的配置逐渐取代了 XML 文件配置。除了依赖注入、控制翻转、AOP 这些技术,Spring 后续衍生出 AMQP、Transactional、Security、Batch、Data Access 等模块,涉及开发的各个领域。 2014 年 4 月 1 日,Spring Boot 1.0.0 正式发布。它基于“约定大于配置”(Convention over configuration)这一理念来快速地开发,测试,运行和部署 Spring 应用,并能通过简单地与各种启动器(如springbootwebstarter)结合,让应用直接以命令行的方式运行,不需再部署到独立容器中。Spring Boot 的出现可以说是 Spring 框架的第二春,它不但简化了开发的流程,目前更是事实标准。下面这幅图可以看出相同功能的 Spring 和 Spring Boot 的代码实现对比。 Apache RocketMQ 是一款是业界知名的分布式消息和流处理中间件,它主要功能是消息分发、异步解耦、削峰填谷等。RocketMQ 是一款金融级消息及流数据平台,RocketMQ 在交易、支付链路上用的很多,主要是对消息链路质量要求非常高的场景,能够支持万亿级消息洪峰。RocketMQ 在业务消息中被广泛应用,并衍生出顺序消息、事务消息、延迟消息等匹配各类业务场景的特殊消息。 本文的主角就是 Spring 和 RocketMQ,那几乎每个 Java 程序员都会使用 Spring 框架与支持丰富业务场景的 RocketMQ 会碰撞出怎么样的火花? 二、RocketMQ 与 Spring 的碰撞 在介绍 RocketMQ 与 Spring 故事之前,不得不提到 Spring 中的两个关于消息的框架,Spring Messaging 和 Spring Cloud Stream。它们都能够与 Spring Boot 整合并提供了一些参考的实现。和所有的实现框架一样,消息框架的目的是实现轻量级的消息驱动的微服务,可以有效地简化开发人员对消息中间件的使用复杂度,让系统开发人员可以有更多的精力关注于核心业务逻辑的处理。 1. Spring Messaging Spring Messaging 是 Spring Framework 4 中添加的模块,是 Spring 与消息系统集成的一个扩展性的支持。它实现了从基于 JmsTemplate 的简单的使用 JMS 接口到异步接收消息的一整套完整的基础架构,Spring AMQP 提供了该协议所要求的类似的功能集。在与 Spring Boot 的集成后,它拥有了自动配置能力,能够在测试和运行时与相应的消息传递系统进行集成。单纯对于客户端而言,Spring Messaging 提供了一套抽象的 API 或者说是约定的标准,对消息发送端和消息接收端的模式进行规定,比如消息 Messaging 对应的模型就包括一个消息体 Payload 和消息头 Header。不同的消息中间件提供商可以在这个模式下提供自己的 Spring 实现:在消息发送端需要实现的是一个 XXXTemplate 形式的 Java Bean,结合 Spring Boot 的自动化配置选项提供多个不同的发送消息方法;在消息的消费端是一个 XXXMessageListener 接口(实现方式通常会使用一个注解来声明一个消息驱动的 POJO),提供回调方法来监听和消费消息,这个接口同样可以使用 Spring Boot 的自动化选项和一些定制化的属性。 在 Apache RocketMQ 生态中,RocketMQSpringBootStarter(下文简称 RocketMQSpring)就是一个支持 Spring Messaging API 标准的项目。该项目把 RocketMQ 的客户端使用 Spring Boot 的方式进行了封装,可以让用户通过简单的 annotation 和标准的 Spring Messaging API 编写代码来进行消息的发送和消费,也支持扩展出 RocketMQ 原生 API 来支持更加丰富的消息类型。在 RocketMQSpring 毕业初期,RocketMQ 社区同学请 Spring 社区的同学对 RocketMQSpring 代码进行 review,引出一段罗美琪(RocketMQ)和春波特(Spring Boot)故事的佳话[1],著名 Spring 布道师 Josh Long 向国外同学介绍如何使用 RocketMQSpring 收发消息[2]。RocketMQSpring 也在短短两年时间超越 SpringKafka 和 SpringAMQP(注:两者均由 Spring 社区维护),成为 Spring Messaging 生态中最活跃的消息项目。 2. Spring Cloud Stream Spring Cloud Stream 结合了 Spring Integration 的注解和功能,它的应用模型如下: Spring Cloud Stream 框架中提供一个独立的应用内核,它通过输入(@Input)和输出(@Output)通道与外部世界进行通信,消息源端(Source)通过输入通道发送消息,消费目标端(Sink)通过监听输出通道来获取消费的消息。这些通道通过专用的 Binder 实现与外部代理连接。开发人员的代码只需要针对应用内核提供的固定的接口和注解方式进行编程,而不需要关心运行时具体的 Binder 绑定的消息中间件。 在运行时,Spring Cloud Stream 能够自动探测并使用在 classpath 下找到的 Binder。这样开发人员可以轻松地在相同的代码中使用不同类型的中间件:仅仅需要在构建时包含进不同的 Binder。在更加复杂的使用场景中,也可以在应用中打包多个 Binder 并让它自己选择 Binder,甚至在运行时为不同的通道使用不同的 Binder。 Binder 抽象使得 Spring Cloud Stream 应用可以灵活的连接到中间件,加之 Spring Cloud Stream 使用利用了 Spring Boot 的灵活配置配置能力,这样的配置可以通过外部配置的属性和 Spring Boot 支持的任何形式来提供(包括应用启动参数、环境变量和 application.yml 或者 application.properties 文件),部署人员可以在运行时动态选择通道连接 destination(例如,RocketMQ 的 topic 或者 RabbitMQ 的 exchange)。 Spring Cloud Stream 屏蔽了底层消息中间件的实现细节,希望以统一的一套 API 来进行消息的发送/消费,底层消息中间件的实现细节由各消息中间件的 Binder 完成。Spring 官方实现了 Rabbit binder 和 Kafka Binder。Spring Cloud Alibaba 实现了 RocketMQ Binder[3],其主要实现原理是把发送消息最终代理给了 RocketMQSpring 的 RocketMQTemplate,在消费端则内部会启动 RocketMQSpring Consumer Container 来接收消息。以此为基础,Spring Cloud Alibaba 还实现了 Spring Cloud Bus RocketMQ, 用户可以使用 RocketMQ 作为 Spring Cloud 体系内的消息总线,来连接分布式系统的所有节点。通过 Spring Cloud Stream RocketMQ Binder,RocketMQ 可以与 Spring Cloud 生态更好的结合。比如与 Spring Cloud Data Flow、Spring Cloud Funtion 结合,让 RocketMQ 可以在 Spring 流计算生态、Serverless(FaaS) 项目中被使用。 如今 Spring Cloud Stream RocketMQ Binder 和 Spring Cloud Bus RocketMQ 做为 Spring Cloud Alibaba 的实现已登陆 Spring 的官网[4],Spring Cloud Alibaba 也成为 Spring Cloud 最活跃的实现。 三、如何在 Spring 生态中选择 RocketMQ 实现? 通过介绍 Spring 中的消息框架,介绍了以 RocketMQ 为基础与 Spring 消息框架结合的几个项目,主要是 RocketMQSpring、Spring Cloud Stream RocketMQ Binder、Spring Cloud Bus RocketMQ、Spring Data Flow 和 Spring Cloud Function。它们之间的关系可以如下图表示。 如何在实际业务开发中选择相应项目进行使用?下面分别列出每个项目的特点和使用场景。 1. RocketMQSpring 特点: 作为起步依赖,简单引入一个包就能在 Spring 生态用到 RocketMQ 客户端的所有功能。 利用了大量自动配置和注解简化了编程模型,并且支持 Spring Messaging API。 与 RocketMQ 原生 Java SDK 的功能完全对齐。 使用场景: 适合在 Spring Boot 中使用 RocketMQ 的用户,希望能用到 RocketMQ 原生 java 客户端的所有功能,并通过 Spring 注解和自动配置简化编程模型。 2. Spring Cloud Stream RocketMQ Binder 特点: 屏蔽底层 MQ 实现细节,上层 Spring Cloud Stream 的 API 是统一的。如果想从 Kafka 切到 RocketMQ,直接改个配置即可。 与 Spring Cloud 生态整合更加方便。比如 Spring Cloud Data Flow,这上面的流计算都是基于 Spring Cloud Stream;Spring Cloud Bus 消息总线内部也是用的 Spring Cloud Stream。 Spring Cloud Stream 提供的注解,编程体验都是非常棒。 使用场景: 在代码层面能完全屏蔽底层消息中间件的用户,并且希望能项目能更好的接入 Spring Cloud 生态(Spring Cloud Data Flow、Spring Cloud Funtcion 等)。 3. Spring Cloud Bus RocketMQ 特点: 将 RocketMQ 作为事件的“传输器”,通过发送事件(消息)到消息队列上,从而广播到订阅该事件(消息)的所有节点上,完成事件的分发和通知。 使用场景: 在 Spring 生态中希望用 RocketMQ 做消息总线的用户,可以用在应用间事件的通信,配置中心客户端刷新等场景。 4. Spring Cloud Data Flow 特点: 以 Source/Processor/Sink 组件进行流式任务处理。RocketMQ 作为流处理过程中的中间存储组件。 使用场景: 流处理,大数据处理场景。 5. Spring Cloud Function 特点: 消息的消费/生产/处理都是一次函数调用,融合 Java 生态的 Function 模型。 使用场景: Serverless 场景。 本文整体介绍了在 Spring 生态中接入 RockeMQ 的 5 种方法,让各位开发者对几种经典场景有宏观的了解。后续会有专栏详细介绍上述各个项目的具体使用方法和应用场景,真正地在 Spring 生态中玩转 RocketMQ! 在 Spring 生态中玩转 RocketMQ 系列教程现已登陆知行动手实验室,立即体验! 移动端同学,需要在PC端登录 start.aliyun.com 进行体验。 相关链接: 活动推荐 阿里云基于 Apache RocketMQ 构建的企业级产品消息队列RocketMQ 5.0版现开启活动: 1、新用户首次购买包年包月,即可享受全系列 85折优惠! 了解活动详情:
#社区动态 #微服务

2021年2月2日

RocketMQ-Spring 毕业两周年,为什么能成为 Spring 生态中最受欢迎的 messaging 实现?
2019 年 1 月,孵化 6 个月的 RocketMQSpring 作为 Apache RocketMQ 的子项目正式毕业,发布了第一个 Release 版本 2.0.1。该项目是把 RocketMQ 的客户端使用 Spring Boot 的方式进行了封装,可以让用户通过简单的 annotation 和标准的 Spring Messaging API 编写代码来进行消息的发送和消费。当时 RocketMQ 社区同学请 Spring 社区的同学对 RocketMQSpring 代码进行 review,引出一段。 时隔两年,RocketMQSpring 正式发布 2.2.0。在这期间,RocketMQSpring 迭代了数个版本,以 RocketMQSpring 为基础实现的 Spring Cloud Stream RocketMQ Binder、Spring Cloud Bus RocketMQ 登上了 ,Spring 布道师 baeldung 向国外同学介绍,越来越多国内外的同学开始使用 RocketMQSpring 收发消息,RocketMQSpring 仓库的 star 数也在短短两年时间内超越了 SpringKafka 和 SpringAMQP(注:两者均由 Spring 社区维护),成为 Apache RocketMQ 最受欢迎的生态项目之一。 RocketMQSpring 的受欢迎一方面得益于支持丰富业务场景的 RocketMQ 与微服务生态 Spring 的完美契合,另一方面也与 RocketMQSpring 本身严格遵循 Spring Messaging API 规范,支持丰富的消息类型分不开。 遵循 Spring Messaging API 规范 Spring Messaging 提供了一套抽象的 API,对消息发送端和消息接收端的模式进行规定,不同的消息中间件提供商可以在这个模式下提供自己的 Spring 实现:在消息发送端需要实现的是一个 XXXTemplate 形式的 Java Bean,结合 Spring Boot 的自动化配置选项提供多个不同的发送消息方法;在消息的消费端是一个 XXXMessageListener 接口(实现方式通常会使用一个注解来声明一个消息驱动的 POJO),提供回调方法来监听和消费消息,这个接口同样可以使用 Spring Boot 的自动化选项和一些定制化的属性。 1. 发送端 RocketMQSpring 在遵循 Spring Messaging API 规范的基础上结合 RocketMQ 自身的功能特点提供了相应的 API。在消息的发送端,RocketMQSpring 通过实现 RocketMQTemplate 完成消息的发送。如下图所示,RocketMQTemplate 继承 AbstractMessageSendingTemplate 抽象类,来支持 Spring Messaging API 标准的消息转换和发送方法,这些方法最终会代理给 doSend 方法,doSend 方法会最终调用 syncSend,由 DefaultMQProducer 实现。 除 Spring Messaging API 规范中的方法,RocketMQTemplate 还实现了 RocketMQ 原生客户端的一些方法,来支持更加丰富的消息类型。值得注意的是,相比于原生客户端需要自己去构建 RocketMQ Message(比如将对象序列化成 byte 数组放入 Message 对象),RocketMQTemplate 可以直接将对象、字符串或者 byte 数组作为参数发送出去(对象序列化操作由 RocketMQSpring 内置完成),在消费端约定好对应的 Schema 即可正常收发。 RocketMQTemplate Send API: SendResult syncSend(String destination, Object payload) SendResult syncSend(String destination, Message message) void asyncSend(String destination, Message message, SendCallback sendCallback) void asyncSend(String destination, Message message, SendCallback sendCallback) …… 2. 消费端 在消费端,需要实现一个包含 @RocketMQMessageListener 注解的类(需要实现 RocketMQListener 接口,并实现 onMessage 方法,在注解中进行 topic、consumerGroup 等属性配置),这个 Listener 会一对一的被放置到 DefaultRocketMQListenerContainer 容器对象中,容器对象会根据消费的方式(并发或顺序),将 RocketMQListener 封装到具体的 RocketMQ 内部的并发或者顺序接口实现。在容器中创建 RocketMQ DefaultPushConsumer 对象,启动并监听定制的 Topic 消息,完成约定 Schema 对象的转换,回调到 Listener 的 onMessage 方法。 @Service @RocketMQMessageListener(topic = "demo.rocketmq.topic", consumerGroup = "string_consumer", selectorExpression = "{demo.rocketmq.tag}") public class StringConsumer implements RocketMQListener { @Override public void onMessage(String message) { System.out.printf(" StringConsumer received: %s \n", message); } } 除此 Push 接口之外,在最新的 2.2.0 版本中,RocketMQSpring 实现了 RocketMQ Lite Pull Consumer。通过在配置文件中进行 consumer 的配置,利用 RocketMQTemplate 的 Recevie 方法即可主动 Pull 消息。 配置文件resource/application.properties: rocketmq.nameserver=localhost:9876 rocketmq.consumer.group=mygroup1 rocketmq.consumer.topic=test Pull Consumer代码: while(!isStop) { List messages = rocketMQTemplate.receive(String.class); System.out.println(messages); } 丰富的消息类型 RocketMQ Spring 消息类型支持方面与 RocketMQ 原生客户端完全对齐,包括同步/异步/oneway、顺序、延迟、批量、事务以及 RequestReply 消息。在这里,主要介绍较为特殊的事务消息和 requestreply 消息。 1. 事务消息 RocketMQ 的事务消息不同于 Spring Messaging 中的事务消息,依然采用 RocketMQ 原生事务消息的方案。如下所示,发送事务消息时需要实现一个包含 @RocketMQTransactionListener 注解的类,并实现 executeLocalTransaction 和 checkLocalTransaction 方法,从而来完成执行本地事务以及检查本地事务执行结果。 // Build a SpringMessage for sending in transaction Message msg = MessageBuilder.withPayload(..)...; // In sendMessageInTransaction(), the first parameter transaction name ("test") // must be same with the @RocketMQTransactionListener's member field 'transName' rocketMQTemplate.sendMessageInTransaction("testtopic", msg, null); // Define transaction listener with the annotation @RocketMQTransactionListener @RocketMQTransactionListener class TransactionListenerImpl implements RocketMQLocalTransactionListener { @Override public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) { // ... local transaction process, return bollback, commit or unknown return RocketMQLocalTransactionState.UNKNOWN; } @Override public RocketMQLocalTransactionState checkLocalTransaction(Message msg) { // ... check transaction status and return bollback, commit or unknown return RocketMQLocalTransactionState.COMMIT; } } 在 2.1.0 版本中,RocketMQSpring 重构了事务消息的实现,如下图所示,旧版本中每一个 group 对应一个 TransactionProducer,而在新版本中改为每一个 RocketMQTemplate 对应一个 TransationProducer,从而解决了并发使用多个事务消息的问题。当用户需要在单进程使用多个事务消息时,可以使用 ExtRocketMQTemplate 来完成(一般情况下,推荐一个进程使用一个 RocketMQTemplate,ExtRocketMQTemplate 可以使用在同进程中需要使用多个 Producer / LitePullConsumer 的场景,可以为 ExtRocketMQTemplate 指定与标准模版 RocketMQTemplate 不同的 nameserver、group 等配置),并在对应的 RocketMQTransactionListener 注解中指定 rocketMQTemplateBeanName 为 ExtRocketMQTemplate 的 BeanName。 2. RequestReply 消息 在 2.1.0 版本中,RocketMQSpring 开始支持 RequestReply 消息。RequestReply 消息指的是上游服务投递消息后进入等待被通知的状态,直到消费端返回结果并返回给发送端。在 RocketMQSpring 中,发送端通过 RocketMQTemplate 的 sendAndReceivce 方法进行发送,如下所示,主要有同步和异步两种方式。异步方式中通过实现 RocketMQLocalRequestCallback 进行回调。 // 同步发送request并且等待String类型的返回值 String replyString = rocketMQTemplate.sendAndReceive("stringRequestTopic", "request string", String.class); // 异步发送request并且等待User类型的返回值 rocketMQTemplate.sendAndReceive("objectRequestTopic", new User("requestUserName",(byte) 9), new RocketMQLocalRequestCallback() { @Override public void onSuccess(User message) { …… } @Override public void onException(Throwable e) { …… } }); 在消费端,仍然需要实现一个包含 @RocketMQMessageListener 注解的类,但需要实现的接口是 RocketMQReplyListener 接口(普通消息为 RocketMQListener 接口),其中 T 表示接收值的类型,R 表示返回值的类型,接口需要实现带返回值的 onMessage 方法,返回值的内容返回给对应的 Producer。 @Service @RocketMQMessageListener(topic = "stringRequestTopic", consumerGroup = "stringRequestConsumer") public class StringConsumerWithReplyString implements RocketMQReplyListener { @Override public String onMessage(String message) { …… return "reply string"; } } RocketMQSpring 遵循 Spring 约定大于配置(Convention over configuration)的理念,通过启动器(Spring Boot Starter)的方式,在 pom 文件引入依赖(groupId:org.apache.rocketmq,artifactId:rocketmqspringbootstarter)便可以在 Spring Boot 中集成所有 RocketMQ 客户端的所有功能,通过简单的注解使用即可完成消息的收发。在 中有更加详细的用法和常见问题解答。 据统计,从 RocketMQSpring 发布第一个正式版本以来,RocketMQSpring 完成 16 个 bug 修复,37 个 imporvement,其中包括事务消息重构,消息过滤、消息序列化、多实例 RocketMQTemplate 优化等重要优化,欢迎更多的小伙伴能参与到 RocketMQ 社区的建设中来,罗美琪(RocketMQ)和春波特(Spring Boot)的故事还在继续...钉钉搜索群号:21982288,即可进群和众多开发者交流! 活动推荐 阿里云基于 Apache RocketMQ 构建的企业级产品消息队列RocketMQ 5.0版现开启活动: 1、新用户首次购买包年包月,即可享受全系列 85折优惠! 了解活动详情:
作者:RocketMQ 官微
#社区动态 #微服务

2021年1月6日

再见 2020!Apache RocketMQ 发布 4.8.0,DLedger 模式全面提升!
“童年的雨天最是泥泞,却是记忆里最干净的曾经。凛冬散尽,星河长明,新的一年,万事顺遂,再见,2020!” 走过这个岁末,万众期待的 了,在这个版本中社区对 RocketMQ 完成大量的优化和问题修复。更重要的是,该版本从性能、稳定性、功能三个方面大幅度提升 DLedger 模式能力。 是 中一个基于 Raft 的 CommitLog 存储库实现,从 RocketMQ 4.5.0 版本开始,RocketMQ 引入 DLedger 模式来解决了 Broker 组内自动故障转移的问题,而在 4.8.0 版本中社区也对 RocketMQ DLedger 模式进行了全面升级。 性能升级 异步化 pipeline 模式 RocketMQ 4.7.0 重新升级了同步双写的架构,利用异步化 pipeline 模式大幅提升了同步双写的性能。在 RocketMQ 4.8.0 中,社区将这一改进应用到 DLedger 模式中, 下图展示了 DLedger 模式下 broker 处理发送消息的过程。在原本的架构中, SendMessageProcessor 线程对每一个消息的处理,都需要等待多数派复制成功确认,才会返回给客户端,而在新版本中,利用 CompletableFuture 对线程处理消息的过程进行异步化改造,不再等待多数派的确认即可对下一个请求进行处理,Ack 操作由其他线程确认之后再进行结果处理并返回给客户端。通过对复制过程进行切分并将其流水线化,减少线程的长时间等待,充分利用 CPU,从而大幅提高吞吐量。 批量日志复制 Batch 一直是性能优化的重要方式,在新版本中,可以通过设置 isEnableBatchPush=true 来开启 DLedger 模式的批量复制。通过将多条数据聚合在一个包中进行发送,可以降低收发包的个数,从而降低系统调用和上下文的切换。在数据发送压力比较大,并且可能达到系统收发包瓶颈的情况下,批量复制能显著提高吞吐量。值得注意的是,DLedger 模式下的批量复制并不会对单个包进行延时的攒批处理,因此不会影响单个消息的发送时延。 除了上述的性能优化,社区还对 DLedger 模式下影响性能的锁、缓存等做了数项性能优化,使 DLedger 模式下的性能提升数倍。 稳定性升级 为了验证和测试 Dledger 模式的可靠性,除了本地对 DLedger 模式进行了各种各样的测试,社区利用 框架对 RocketMQ DLedger 模式进行了大量 Chaos 测试。OpenMessagingChaos 是一个利用故障注入来验证各种消息平台一致性和高可用性的测试框架,在 OpenMessagingChaos 的测试中,客户端并发地向待测试集群发送和接收消息,中间会间隔性地对集群进行故障注入,最后给出测试结果,包括是否丢消息,是否有重复消息,集群平均故障恢复时间等。利用 OpenMessagingChaos,我们验证了 DLedger 模式在以下故障注入场景下的表现: randompartition(fixedpartition)故障随机挑选节点进行网络隔离,模拟常见的对称网络分区。 randomloss 故障随机挑选节点并对这些节点接收和发送的网络包进行按比例丢弃,模拟一些节点网络较差的情况。 randomkill(minorkill、majorkill、fixedkill)故障模拟常见的进程崩溃情况。 randomsuspend(minorsuspend、majorsuspend、fixedsuspend)故障模拟一些慢节点的情况,比如发生 Full GC、OOM 等。 bridge 和 partitionmajoritiesring 故障模拟比较极端的非对称网络分区。 以 minorkill 故障注入为例,我们部署 5 个节点组成一组 DLedger 模式的 RocketMQ broker 进行 Chaos 测试。minorkill 故障注入会随机挑选集群中少数节点进程杀死,由于杀死少数节点,即使集群不可用也能在一段时间内恢复,方便测试集群平均故障恢复时间。 测试过程中我们设置四个客户端并发向 RocketMQ DLedger 集群发送和接收消息,故障注入时间间隔为 100s,即 100s 正常运行,100s 故障注入,一直循环,整个阶段一共持续 2400s。测试结束后,OpenMessagingChaos 会给出测试结果和时延图。下图展示了整个测试过程中入队操作的时延情况。 图中纵坐标是是时延,横坐标是测试时间,绿色框表示数据发送成功,红色框表示数据发送失败,蓝色框表示不确定是否数据添加成功,灰色部分表示故障注入的时间段。可以看出一些故障注入时间段造成了集群短暂的不可用,一些故障时间段则没有,这是合理的。由于是随机网络分区,所以只有杀死的少数节点包含 leader 节点时才会造成集群重新选举,但即使造成集群重新选举, DLedger 模式在一段时间后也会恢复可用性。 下图是测试结束后 OpenMessagingChaos 给出的统计结果,可以看到一共成功发送了 11W 个数据,没有数据丢失,这表明即使在故障下,RocketMQ DLedger 模式仍旧满足 At Least Once 的投递语义。此外,RTOTestResult 表明 12 次故障时间段一共发生了 3 次集群不可用的情况(与上述时延图一致),但每次集群都能在 30 秒以内恢复,平均故障恢复时间在 22 秒左右。 在 OpenMessaging Chaos 测试过程中,我们发现了 DLedger 模式存在的数个隐性问题并进行了修复,提高了 DLedger 模式下对进程异常崩溃、慢节点、对称/非对称网络分区、网络丢包的容错能力,也进一步验证了 DLedger 模式的可靠性。 功能升级 DLedger 模式支持 Preferred Leader 在旧版本中一组 Broker 中选谁作为 Leader 完全是随机的。但是在新版中我们可以通过配置 preferredLeaderId 来指定优先选举某个节点为 Leader,如下图所示,通过在三个机房部署 DLedger 模式的 broker 组,利用 preferred leader 可以更好的实现机房对齐的功能,图中 DC1 中服务更好,我们可以优先选择在 DC1 中部署 Master。此外,利用 preferred leader 还可以更好实现 DLedger 集群混部,从而充分利用机器资源。 DLedger 模式支持批量消息 从 RocketMQ 4.8.0 开始,DLedger 模式支持批量消息发送,从而在消息类型的支持上完全与 MasterSlave 部署形态保持一致。 除了对 DLedger 模式的大量优化,本次 RocketMQ 版本一共包含 Improvement 25 个,Bug Fix 11 个,文档和代码格式优化 16 个。据不完全统计,这些贡献来自近 40 位 RocketMQ 社区的 Contributor,感谢大家的积极参与。也非常期待有更多的用户、厂商、开发者参与到 RocketMQ 建设中来,加入 Apache RocketMQ 社区,永远不会太迟! 活动推荐 阿里云基于 Apache RocketMQ 构建的企业级产品消息队列RocketMQ 5.0版现开启活动: 1、新用户免费试用(2000TPS,1个月),优惠金额2000元!点击立即领取: 2、新用户首次购买包年包月,即可享受全系列 85折优惠! 了解活动详情:
作者:RocketMQ社区
#社区动态 #高可用