RocketMQ 快速上手体验

2024年7月26日

体验准备

体验开源 RocketMQ 准备

本教程旨在让开发者们快速体验开源 RocketMQ 环境配置、集群搭建、消息快速收发体验等环节。为降低部署门槛,提升上手体验,故仅在单机上部署整个 RocketMQ 集群,暂不考虑高可用部署形态。其它拓展部署形式,请参见第四章的参考资料自行尝试部署。
因此,本部分需要您准备:

  • 运行机器,64 位操作系统,推荐 Linux/Unix/macOS
  • 64 位 JDK 1.8+

具体体验内容请参见第二章。

体验阿里云云消息队列 RocketMQ 版准备

本部分教程旨在让开发者快速上手阿里云云消息队列 RocketMQ 版的资源创建、消息收发等流程。为降低使用门槛,优化上手体验,本教程将在阿里云云消息队列 RocketMQ 版推出的按量付费实例上进行。该类实例随买随用,用后即可释放,使用过程可能会产生极少量费用。
因此,本部分需要您准备:

  • 阿里云账号
  • 确保账号余额能够支持按量付费类型实例的体验费用

具体体验内容请参见第三章。

快速体验开源 RocketMQ

环境配置

RocketMQ 的安装包分为两种,二进制包和源码包。二进制包是已经编译完成后可以直接运行的,源码包是需要编译后运行的。为提升体验过程的流畅性,这边建议下载二进制包,直接运行 RocketMQ。
您可以点击这里下载二进制包。也可以点击这里下载 Apache RocketMQ 5.2.0 的源码包。
若您想从源码包开始进行上手体验,需要您安装maven,进入下载源码的目录,执行如下编译命令:

Terminal window
$ unzip rocketmq-all-5.2.0-source-release.zip
$ cd rocketmq-all-5.2.0-source-release/
$ mvn -Prelease-all -DskipTests -Dspotbugs.skip=true clean install -U

集群部署

若您直接下载 RocketMQ 的二进制包,则可以直接进入二进制包的目录中:

Terminal window
$ cd rocketmq-all-5.2.0-bin-release

若您选择从源码开始体验,且已经在本地自行编译完成了二进制文件,则可进入源码目录下的 distribution/target 中的二进制文件目录:

Terminal window
$ cd distribution/target/rocketmq-5.2.0/rocketmq-5.2.0

后续体验中的所有指令均在上述目录下执行。

启动 NameServer

安装完 RocketMQ 包后,我们执行下面的指令启动 NameServer:

Terminal window
### 启动namesrv
$ nohup sh bin/mqnamesrv &
### 验证namesrv是否启动成功
$ cat ~/logs/rocketmqlogs/namesrv.log

若一切正常,则会在执行完上述命令后,输出如下内容:

Terminal window
The Name Server boot success...

启动 Broker + Proxy

NameServer 成功启动后,我们启动 Broker 和 Proxy。这里我们使用 Local 模式部署,即 Broker 和 Proxy 同进程部署。5.x 版本也支持 Broker 和 Proxy 分离部署以实现更灵活的集群能力。详情参考部署教程

Terminal window
### 先启动broker
$ nohup sh bin/mqbroker -n localhost:9876 --enable-proxy &
### 验证broker是否启动成功, 比如, broker的ip是192.168.1.2 然后名字是broker-a
$ cat ~/logs/rocketmqlogs/proxy.log

我们可以在执行完上述命令后看到 proxy.log 中的内容,若看到如下信息,则表明 broker 已成功启动:

Terminal window
The broker[broker-a,192.169.1.2:10911] boot success...

至此,一个单节点副本的 RocketMQ 集群已经部署起来了,我们可以利用脚本进行简单的消息收发。

消息收发

工具测试消息收发

在进行工具测试消息收发之前,我们需要告诉客户端 NameServer 的地址,RocketMQ 有多种方式在客户端中设置 NameServer 地址,这里我们利用环境变量 NAMESRV_ADDR。

Terminal window
$ export NAMESRV_ADDR=localhost:9876

完成环境变量配置后,可以在命令行输入如下指令,启动生产者:

Terminal window
$ sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer

若生产成功,会输出如下内容:

Terminal window
SendResult [sendStatus=SEND_OK, msgId= ...

消息生产完成后,该消息便已经保存在本地 Broker 的存储中了。接下去再输入命令,启动消费者:

Terminal window
$ sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer

若消费成功,则会出现如下的输出内容:

Terminal window
ConsumeMessageThread_%d Receive New Messages: [MessageExt...

SDK 测试消息收发

工具测试完成后,我们可以尝试使用 SDK 收发消息。使用 SDK 进行消息收发的教程较为复杂,若有一定工程代码编写、运行经验,可以参考该教程自行尝试,本文不再赘述。

配套运维能力

mqadmin 工具介绍

mqadmin 是 RocketMQ 配套的运维工具,能够非常简便的查看集群状态,创建、修改 topic 等元数据。
该工具的使用方式可以参考该说明文档。本文档仅举例如何使用该工具进行集群状态查看。

查看集群状态

对于刚刚启动的 Broker,我们可以尝试使用 mqadmin 工具对它状态进行查看,在二进制包目录下输入如下命令:

Terminal window
sh bin/mqadmin clusterlist -n localhost:9876

若集群运行正常,则输出如下:
image.png
在该输出中,您可以看到该 NameServer 下的集群名称、Broker 名称、对应 IP 地址、Broker 代码版本、消息生产速度、消息消费速度、定时消息总数、刷盘等待时长、消息保留时长、磁盘使用率等信息。
善用 mqadmin 工具,将能在集群故障时快速定位问题所在,并有能力人工介入作恢复。

关闭集群

当上述测试均完成后,您需要将集群进程(NameServer, Proxy, Broker)进行关闭,关闭方法如下:

Terminal window
# 关闭Broker
$ sh bin/mqshutdown broker
# 若一切正常,则会输出如下内容:
# The mqbroker(36695) is running...
# Send shutdown request to mqbroker with proxy enable OK(36695)
# 关闭NameServer
$ sh bin/mqshutdown namesrv
# 若一切正常,则会输出如下内容:
# The mqnamesrv(36664) is running...
# Send shutdown request to mqnamesrv(36664) OK

快速体验阿里云云消息队列 RocketMQ 版

体验阿里云云消息队列 RocketMQ 版主要需要如下图所示的几个步骤。本文将按照下面的流程,分三部分引导您快速体验。

创建账号 & 授权

注意:若您的账号为阿里云账号,则默认拥有云消息队列 RocketMQ 版服务的所有权限,无需进行授权操作。
账号角色查看方法如下:

登录阿里云控制台,页面右上角区域显示账号基本信息,若账号 ID 下显示主账号,表示该账号为阿里云账号,无需授权;若显示 RAM 用户,则该账号需要进行授权。

image.pngimage.png
(左图为主账号,无需授权;右图为 RAM 角色账号,需要授权)
若您使用的是 RAM 账号,则需要按该文档进行授权。考虑到大部分体验者应是个人开发者,授权过程故不在本文中展开说明。

创建资源

在调用 SDK 收发消息前,您需要提前创建云消息队列 RocketMQ 版的相关资源,包括创建云消息队列 RocketMQ 版实例、获取实例的接入点、创建 Topic、创建 ConsumerGroup。调用 SDK 时,需要将这些资源信息填写到 SDK 代码中。
需要注意的是,由于云消息队列 RocketMQ 版需要您预先准备网络、安全组等资源,所以在开通云消息队列 RocketMQ 版实例前,请尽量先参考如下教程做好准备工作:

当然,若您觉得跟着文档走比较复杂,云消息队列 RocketMQ 版在开通过程中也提供了全面的引导,辅助您在开通过程中自查资源准备情况,并立即补齐资源。下面我们直接以“零准备”的主账号进行云消息队列 RocketMQ 版实例购买、配置。

创建实例

  1. 进入云消息队列 RocketMQ 版产品控制台。可以直接从阿里云官网的产品下拉框中进入,选择“中间件”,并从中找到“云消息队列 RocketMQ 版”。

image.png

  1. 进入控制台后,点击“创建实例”按钮。

image.png

  1. 选择“Serverless 按累积量”的实例类型,进入创建配置页面。请注意,若您要创建 Serverless 类型实例,请确保您的购买地域支持该实例类型。

image.png

  1. 确认您在该地域是否已经有 VPC 资源。若无,则点击创建 VPC 专有网络。

image.png

  1. 进入专有网络的创建页面后,请输入专有网络名称、网段、交换机名称等信息:

image.png

  1. 请注意,由于云消息队列 RocketMQ 版的多可用区容灾高可用设计,需要您至少在两个可用区创建交换机。点击图中的“添加(1/10)”,可以同时创建多个交换机。

image.png
若您已经创建完成,仍可以进入专有网络控制台独立进行交换机的创建:
image.png
需要注意的是,创建交换机时,请选取和已创建交换机不同可用区进行创建。
image.png

  1. 创建完成后,重新返回云消息队列 RocketMQ 版控制台,即可在此处进行 VPC 专有网络的选择,以及 VSwitch(交换机)的选择。此处我们勾选两个可用区进行配置。

image.png

  1. 若您未创建安全组,则可以在安全组选择栏下直接进入“创建安全组”的流程。

image.png
进入创建页面后,选择创建安全组。
image.png
在网络栏选择刚刚配置的专有网络,其余安全组规则按默认即可,即可完成创建。
image.png

  1. 返回云消息队列 RocketMQ 版控制台,查看“服务关联角色”是否已经创建。若未创建,则可点击该按钮直接进行创建。

image.png
创建完成的效果如下:
image.png

  1. 若上述均已配置完成,但是购买按钮仍然显示灰色,且显示 PrivateLink 未开通,则点击进行开通即可。

image.png
image.png
注意,此处开通完成后,返回云消息队列 RocketMQ 版控制台,页面需要进行刷新才可正常购买。刷新可通过页面中“选择 VPC”等下拉框后面的“刷新”小按钮完成。

  1. 刷新完成后,即可正常购买云消息队列 RocketMQ 版实例了,创建若干分钟后,您就拥有了一个按量付费的云消息队列 RocketMQ 版实例。

image.png

获取接入点

  1. 实例列表页面中单击目标实例名称。

image.png

  1. 实例详情页面的 TCP 协议接入点区域即可查看实例的接入点信息。
    • VPC 专有网络接入点:使用 VPC 专有网络访问云消息队列 RocketMQ 版时使用。云消息队列 RocketMQ 版默认提供的接入点。
    • 公网接入点:使用公网访问云消息队列 RocketMQ 版时使用该接入点。仅当开启公网访问时显示。

image.png

获取账号密码

客户端接入云消息队列 RocketMQ 版服务端时,需要根据接入方式配置实例用户名密码。

  • 使用公网访问云消息队列 RocketMQ 版服务端:需要配置实例的用户名密码。
  • 使用VPC网络访问云消息队列 RocketMQ 版服务端:无需配置实例的用户名密码,系统会根据VPC接入点智能识别用户身份。

此处我们以公网访问为例,查看如何获取 Serverless 系列实例的账号密码:
image.png
如上图所示,在您实例下点击“访问控制”按钮,进入“智能身份识别”一栏,下面便是您的实例账号、密码。
后续若您需要用公网操作您的实例便需要填入此处的内容。

创建 Topic

现在我们已经拥有了一个 RocketMQ 实例,下面我们便在该实例下创建 Topic 资源。

  1. 实例列表页面中单击目标实例名称。
  2. 在左侧导航栏单击 Topic 管理,然后在 Topic 管理页面单击创建 Topic

image.png

  1. 创建 Topic 面板中填写Topic名称和描述,此处我们将 Topic 命名为”test”, 选择消息类型普通消息,然后单击确定,一个 Topic 便创建完成了。

image.png

创建订阅组(Group)

拥有一个 Topic 后,我们再创建一个订阅组(Group)。订阅组将被用于消息消费过程。

  1. 实例列表页面中单击目标实例名称。
  2. 在左侧导航栏单击 Group 管理,然后在 Group 管理页面单击创建 Group

image.png

  1. 创建 Group 面板填写Group ID,此处我们将 Group ID 设置为”test-group”。其他参数可使用默认配置,然后单击确定。此时,一个订阅组便创建完成了。

image.png

收发消息

为方便体验,我们选择在控制台进行消息的发送,编写消费者代码并运行,以消费控制台发送的那条消息。

  1. 控制台发送消息。首先进入 Topic 详情页面,点击右侧“快速体验”按钮。

image.png

  1. 填入消息内容,即可点击发送。发送成功后,这条消息便已进入您实例所在的存储中,您可点击查看其消息轨迹。

image.png

  1. 编写消费者代码,本教程将说明如何在 IntelliJ IDEA 中完成消费者的启动。本教程将从 0 开始教您从零开始构建一个 Java 项目。若您已有一定开发经验,请您根据真实情况选择性跳过。
    1. 首先,安装 IntelliJ IDEA。点击该链接,下滑页面,选择社区版(Community)进行下载。
    2. 新建 Java 工程:

image.png

  1. 在运行代码前,请在您的工程中添加 pom 依赖:
<dependencies>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client-java</artifactId>
<version>5.0.7</version>
</dependency>
</dependencies>

添加完成后,pom 文件如下所示:
image.png

  1. 完成依赖添加后,您可以直接复制下面的代码并运行,但是需要注意的是,您要在代码中填入您的实例相关信息,这些信息均已经使用中括号({})框起。
import org.apache.rocketmq.client.apis.*;
import org.apache.rocketmq.client.apis.consumer.ConsumeResult;
import org.apache.rocketmq.client.apis.consumer.FilterExpression;
import org.apache.rocketmq.client.apis.consumer.FilterExpressionType;
import org.apache.rocketmq.client.apis.consumer.PushConsumer;
import org.apache.rocketmq.shaded.org.slf4j.Logger;
import org.apache.rocketmq.shaded.org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.Collections;
public class PushConsumerExample {
private static final Logger LOGGER = LoggerFactory.getLogger(PushConsumerExample.class);
private PushConsumerExample() {
}
public static void main(String[] args) throws ClientException, IOException, InterruptedException {
/**
* 实例接入点,从控制台实例详情页的接入点页签中获取。
* 如果是在阿里云ECS内网访问,建议填写VPC接入点。
* 如果是在本地公网访问,或者是线下IDC环境访问,可以使用公网接入点。使用公网接入点访问,必须开启实例的公网访问功能。
*/
String endpoints = "{实例接入点,如rmq-cn-xxx.cn-zhangjiakou.rmq.aliyuncs.com:8080}";
//指定需要订阅哪个目标Topic,Topic需要提前在控制台创建,如果不创建直接使用会返回报错。
String topic = "{Topic名称,如test}";
//为消费者指定所属的消费者分组,Group需要提前在控制台创建,如果不创建直接使用会返回报错。
String consumerGroup = "{Group ID, 如test-group}";
String instanceId = "{实例id,如rmq-cn-xxx}";
String userName = "{账号名}";
String passWord = "{密码}";
final ClientServiceProvider provider = ClientServiceProvider.loadService();
ClientConfigurationBuilder builder = ClientConfiguration.newBuilder().setEndpoints(endpoints);
ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder()
.setEndpoints(endpoints)
.setNamespace(instanceId)
.setCredentialProvider(new StaticSessionCredentialsProvider(userName, passWord))
.build();
//订阅消息的过滤规则,表示订阅所有Tag的消息。
String tag = "*";
FilterExpression filterExpression = new FilterExpression(tag, FilterExpressionType.TAG);
//初始化PushConsumer,需要绑定消费者分组ConsumerGroup、通信参数以及订阅关系。
PushConsumer pushConsumer = provider.newPushConsumerBuilder()
.setClientConfiguration(clientConfiguration)
//设置消费者分组。
.setConsumerGroup(consumerGroup)
//设置预绑定的订阅关系。
.setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression))
//设置消费监听器。
.setMessageListener(messageView -> {
//处理消息并返回消费结果。
// LOGGER.info("Consume message={}", messageView);
System.out.println("Consume Message: " + messageView);
return ConsumeResult.SUCCESS;
})
.build();
Thread.sleep(Long.MAX_VALUE);
//如果不需要再使用PushConsumer,可关闭该进程。
//pushConsumer.close();
}
}

启动后,消费成功即可拿到之前在控制台发送的消息:
image.png

可观测能力

刚刚发送消息后,我们可以在控制台进行消息轨迹的查看。进入仪表盘时,会提示您创建服务关联角色,点击创建、授权即可。阿里云云消息队列 RocketMQ 版的可观测能力多样,细粒度的有消息级别的查询、轨迹查询。粗粒度的有仪表盘,能够在实例维度查看消息的生产、发送、堆积等情况。

消息查询 & 轨迹

针对我们刚刚发送的消息,可以点击“消息查询”功能,查询该消息的具体内容、查看消息轨迹,并可指定消费者进行消费能力验证等。image.png
尤其是消息轨迹功能,我们能够支持对特定消息进行全生命周期的展示,包括其生产者、存储时间、存储 ID、投递事件、消费者等信息。通过该可观测能力,我们能够十分清晰地了解消息收发的细节。
image.png

仪表盘

相对于消息查询功能,仪表盘属于粗粒度的可观测能力。该能力可以展现实例维度、Topic 维度、Group 维度的整体情况,包括但不限于收发速率、堆积情况等数据。且依托于 Grafana 的可视化能力,这些指标的展示都是十分直观且灵活的。如下图,我们可以看到刚刚测试的消息在何时进入实例,消费延迟时间等信息。
image.png

其它拓展能力以及参考文档

开源 RocketMQ 在 GitHub 社区中不断迭代成长,定期发布版本,您可以在社区内查看最新特性、提出 Bug,甚至参与 Bug 的修复。

此外,阿里云云消息队列 RocketMQ 版的更多特性、教程、最佳实践均可在官方文档中找到。基于 Serverless 系列可以让体验成本可控,若对其它消息队列特性感兴趣,请自行上手尝试。