2024年7月24日

基于 RocketMQ Connect 构建数据流转处理平台
从问题中来的 RocketMQ Connect 在电商系统、金融系统及物流系统,我们经常可以看到 RocketMQ 的身影。原因不难理解,随着数字化转型范围的扩大及进程的加快,业务系统的数据也在每日暴增,此时为了保证系统的稳定运行,就需要把运行压力分担出去。RocketMQ 就担任着这样的角色,它的异步消息处理与高并发读写能力,决定了系统底层的重构不会影响上层应用的功能。而 RocketMQ 的另一个优势——可伸缩能力,使系统在面临流量的不确定性时,实现对流量的缓冲处理。此外,RocketMQ 的顺序设计特性使其成为一个天然的排队引擎,例如,三个应用同时对一个后台引擎发起请求,确保不引起“撞车”事故。因此,RocketMQ 被用在异步解耦、削峰填谷以及事务消息等场景中。 但是,数字化转型浪潮也带来了更多用户对数据价值的关注——如何让数据产生更大利用价值?RocketMQ 自身不具备数据分析能力,但是有不少用户希望从 RocketMQ Topic 中获取数据并进行在线或离线的数据分析。然而,使用市面上的数据集成或数据同步工具,将 RocketMQ Topic 数据同步到一些分析系统中虽然是一种可行方案,却会引入新的组件,造成数据同步的链路较长,时延相对较高,用户体验不佳。 举个例子,假设业务场景中使用 OceanBase 作为数据存储,同时希望将这些数据同步到 Elasticsearch 进行全文搜索,有两种可行的数据同步方案。 方案一:从 OceanBase 中获取数据,写入 Elasticsearch 组件并进行数据同步,在数据源较少时此方案没什么问题,一旦数据增多,开发和维护都非常复杂,此时就要用到第二种方案。 方案二:引入消息中间件对上下游进行解藕,这能解决第一种方案的问题,但是一些较为复杂的问题还没有完全解决。比如,如何将数据从源数据同步到目标系统并保证高性能,如果保证同步任务的部分节点挂掉,数据同步依然正常进行,节点恢复依然可以断点续传,同时随着数据管道的增多,如何管理数据管道也变得十分困难。 总的来说,数据集成过程中的挑战主要有五个。 挑战一:数据源多,市面上可能有上百个数据源,且各数据源的系统差异较大,实现任意数据源之间的数据同步工作量较大,研发周期很长。 挑战二:高性能问题,如何高效地从源数据系统同步到目的数据系统,并保障其性能。 挑战三:高可用问题,即Failover能力,当一个节点挂掉是否这个节点的任务就停止了,任务重新启动是否还可以断点续传。 挑战四:弹性扩缩容能力,根据系统流量动态增加或减少节点数量,既能通过扩容满足高峰期业务,也能在低峰期缩减节点,节省成本。 挑战五:数据管道的管理运维,随着数据管道的增多,运维监控的数据管道也会变得越来越复杂,如何高效管理监控众多的同步任务。 面对上述挑战 RocketMQ 如何解决? 第一,标准化数据集成 API (Open Messaging Connect API)。在 RocketMQ 生态中增加 Connect 组件,一方面对数据集成过程抽象,抽象标准的数据格式以及描述数据的 Schema,另一方面对同步任务进行抽象,任务的创建、分片都抽象成一套标准化的流程。 第二,基于标准的 API 实现 Connect Runtime。Runtime 提供了集群管理、配置管理、位点管理、负载均衡相关的能力,拥有了这些能力,开发者或者用户就只需要关注数据如何获取或如何写入,从而快速构建数据生态,如与 OceanBase、MySQL、Elasticsearch 等快速建立连接,搭建数据集成平台。整个数据集成平台的构建也非常简单,通过 Runtime 提供的 RESTFull API 进行简单调用即可。 第三,提供完善的运维工具,方便管理同步任务,同时提供丰富的 Metrics 信息,方便查看同步任务的 TPS,流量等信息。 RocketMQ Connect 两大使用场景 这里为大家整理了 RocketMQ Connect 的两大使用场景。 场景一,RocketMQ 作为中间媒介,可以将上下游数据打通。 比如在新旧系统迁移的过程中,如果在业务量不大时使用 MySQL 就可以满足业务需求,而随着业务的增长,MySQL 性能无法满足业务要求时,需要对系统进行升级,选用分布式数据库 OceanBase 提升系统性能。 如何将旧系统数据无缝迁移到 OceanBase 中呢?在这个场景中 RocketMQ Connect 就可以发挥作用,RocketMQ Connect 可以构建一个从 MySQL 到 OceanBase 的数据管道,实现数据的平滑迁移。RocketMQ Connect 还可以用在搭建数据湖、搜索引擎、ETL 平台等场景。例如将各个数据源的数据集成到 RocketMQ Topic 当中,目标存储只需要对接 Elasticsearch 就可以构建一个搜索平台,目标存储如果是数据湖就可以构建一个数据湖平台。 除此之外,RocketMQ 自身也可以作为一个数据源,将一个 RocketMQ 集群的数据同步到另一个集群,可以构建 RocketMQ 多活容灾能力,这是社区正在孵化的 Replicator 可以实现的能力。 场景二,RocketMQ 作为端点。 RocketMQ 的生态中提供了流计算能力组件——RocketMQ Streams,Connector 将各个存储系统的数据集成到RocketMQ Topic 当中,下游使用 RocketMQ Streams 流计算的能力就可以构建一个实时的流计算平台。当然也可以配合业务系统的 Service 实现业务系统快速从其它存储统一快速获取数据的能力。 还可以将 RocketMQ 作为端点的上游,将业务消息发到 Topic 中,使用 Connector 对数据做持久化或转存的操作。 如此一来,RocketMQ 就具备数据集成能力,可以实现任意任意异构数据源之间的数据同步,同时也具备统一的集群管理、监控能力及配置化搭建数据管道搭建能力,开发者或者用户只需要专注于数据拷贝,简单配置就可以得到一个具备配置化、低代码、低延时、高可用,支持故障处理和动态扩缩容数据集成平台。 RocketMQ Connect 实现原理 那么, RocketMQ Connect 是如何实现的呢?在介绍实现原理前,先来了解两个概念。 概念一:什么是 Connector(连接器)? 它定义数据从哪复制到哪,是从源数据系统读取数据写入 RocketMQ,这种是 SourceConnector,或从 RocketMQ 读数据写入到目标系统,这种是 SinkConnector。Connector 决定需要创建任务的数量,从 Worker 接收配置传递给任务。 概念二:什么是 Task ? Task 是 Connector 任务分片的最小分配单位,是实际将源数据源数据复制到 RocketMQ(SourceTask),或者将数据从 RocketMQ 读出写入到目标系统(SinkTask)真正的执行者,Task 是无状态的,可以动态的启停任务,多个 Task 可以并行执行,Connector 复制数据的并行度主要体现在 Task 上。一个 Task 任务可以理解为一个线程,多个 Task 则以多线程的方式运行。 通过 Connect 的 API 也可以看到 Connector 和 Task 各自的职责,Connector 实现时就已经确定数据复制的流向,Connector 接收数据源相关的配置,taskClass 获取需要创建的任务类型,通过 taskConfigs 的数量确定任务数量,并且为 Task 分配好配置。Task 拿到配置以后数据源建立连接并获取数据写入到目标存储。通过下面的两张图可以清楚的看到,Connector 和 Task 处理基本流程。 一个 RocketMQ Connect 集群中会有多个 Connector ,每个 Connector 会对应一个或多个 Task,这些任务运行在 Worker(进程)中。Worker 进程是 Connector 和 Task 运行环境,它提供 RESTFull 能力,接收 HTTP 请求,将获取到的配置传递给 Connector 和 Task,它还负责启动 Connector 和 Task,保存 Connector 配置信息,保存 Task 同步数据的位点信息,除此以外,Worker 还提供负载均衡能力,Connect 集群高可用、扩缩容、故障处理主要依赖 Worker 的负责均衡能力实现的。Worker 提供服务的流程如下: Worker 提供的服务发现及负载均衡的实现原理如下: 服务发现: 用过 RocketMQ 的开发者应该知道,它的使用很简单,就是发送和接收消息。消费模式分为集群模式和广播模式两种,集群消费模式下一个 Topic 可以有多个 Consumer 消费消息,任意一个 Consumer 的上线或下线 RocketMQ 服务端都有感知,并且还可以将客户端上下线信息通知给其它节点,利用 RocketMQ 这个特性就实现了 Worker 的服务发现。 配置 / Offset 同步: Connector 的配置/Offset 信息同步通过每个 Worker 订阅相同的 Topic,不同 Worker 使用不同的 Consumer Group 实现的, Worker 节点可以通过这种方式消费到相同 Topic 的所有数据,即 Connector 配置/ Offset 信息,这类似于广播消费模式,这种数据同步模式可以保证任何一个 Worker 挂掉,该 Worker 上的任务依旧可以在存活的 Worker 正常拉起运行 ,并且可以获取到任务对应的 Offset 信息实现断点续传, 这是故障转移以及高可用能力的基础。 负载均衡: RocketMQ 消费场景中,消费客户端 与 Topic Queue 之间有负载均衡能力,Connector 在这一部分也是类似的,只不过它负载均衡的对象不一样,Connector 是 Worker 节点和 Task 之间的负载均衡,与 RocketMQ 客户端负载均衡一样,可以根据使用场景选择不同负载均衡算法。 上文提到过 RocketMQ Connect 提供 RESTFull API能力。通过 RESTFull AP可以创建 Connector,管理Connector 以及查看 Connector 状态,简单列举: POST /connectors/{connector name} GET /connectors/{connector name}/config GET /connectors/{connector name}/status POST /connectors/{connector name}/stop 目前 Connector 支持单机、集群两种部署模式。集群模式至少要有两个节点,才能保证它的高可用。并且集群可以动态增加或者减少,做到了动态控制提升集群性能和节省成本节省的能力。单机模式更多方便了开发者开发测试 Connector 。 如何实现一个 Connector呢? 还是结合一个具体的场景看一看,例如业务数据当前是写入 MySQL 数据库中的,希望将 MySQL中数据实时同步到数据湖 Hudi 当中。只要实现 MySQL Source Connector 、Hudi Sink Connector 这两个 Connector 即可。 下面就以 MySQLSource Connector 为例,来看一下具体的如何实现。 实现 Connector 最主要的就是实现两个 API 。第一个是 Connector API ,除了实现它生命周期相关的 API 外,还有任务如何分配,是通过 Topic、Table 还是通过数据库的维度去分。第二个API是需要创建的 Task,Connector 通过任务分配将相关的配置信息传递给 Task, Task 拿到这些信息,例如数据库账号,密码,IP,端口后就会创建数据库连接,再通过 MySQL 提供的 BINLOG 机智获取到表的数据,将这些数据写到一个阻塞队列中。Task 有个 Poll 方法,实现 Connector 时只要调用到 Poll 方法时可以获取到数据即可,这样 Connector 就基本写完了。然后打包以 Jar 包的形式提供出来,将它加载到 Worker 的节点中。 创建 Connector 任务后, Worker 中会创建一个或者多个线程,不停的轮询 Poll 方法,从而获取到 MySQL 表中的数据,再通过 RocketMQ Producer 发送到 RocketMQ Broker中,这就是 Connector 从实现到运行的整体过程(见下图)。 RocketMQ Connect 现状与未来 RocketMQ Connect 的发展历程分为三个阶段。 第一阶段:Preview 阶段 RocketMQ Connect 发展的初期也即 Preview 阶段,实现了 Open Messaging Connect API 1.0 版本,基于该版本实现了 RocketMQ Connect Runtime ,同时提供了 10+ Connector 实现(MySQL,Redis,Kafka,Jms,MongoDB……)。在该阶段,RocketMQ Connect 可以简单实现端到端的数据源同步,但功能还不够完善,不支持数据转换,序列化等能力,生态相对还比较贫乏。 第二阶段:1.0 阶段 在 1.0 阶段,Open Messaging Connect API 进行了升级,支持Schema、Transform,Converter等能力,在此基础上对 Connect Runtime 也进行了重大升级,对数据转换,序列化做了支持,复杂Schema也做了完善的支持。该阶段的 API、Runtime 能力已经基本完善,在此基础上,还有30+ Connecotor 实现,覆盖了 CDC、JDBC、SFTP、NoSQL、缓存Redis、HTTP、AMQP、JMS、数据湖、实时数仓、Replicator、等Connector实现,还做了Kafka Connector Adaptor可以运行Kafka生态的Connector。 第三阶段:2.0 阶段 RocketMQ Connect当前处于这个阶段,重点发展Connector生态,当 RocketMQ 的 Connector生态达到 100 + 时,RocketMQ 基本上可以与任意的一个数据系统去做连接。 目前 RocketMQ 社区正在和 OceanBase 社区合作,进行 OceanBase 到 RocketMQ Connect 的研发工作,提供 JDBC 和 CDC 两种模式接入模式,后续会在社区中发布,欢迎感兴趣的同学试用。 总结 RocketMQ 是一个可靠的数据集成组件,具备分布式、伸缩性、故障容错等能力,可以实现 RocketMQ 与其他数据系统之间的数据流入与流出。通过 RocketMQ Connect 可以实现 CDC,构建数据湖,结合流计算可实现数据价值。
作者:周波
#行业实践 #最佳实践 #生态集成

2023年1月13日

RocketMQ 集成生态再升级:轻松构建云上数据管道
阿里云消息队列 RocketMQ 版是阿里云基于 Apache RocketMQ 构建的低延迟、高并发、高可用、高可靠的分布式“消息、事件、流”统一处理平台,面向互联网分布式应用场景提供微服务异步解耦、流式数据处理、事件驱动处理等核心能力。其自诞生以来一直为阿里集团提供稳定可靠的消息服务,历经多年双十一万亿级流量洪峰的验证。 随着业务需求场景日渐丰富,在多年经验积累后,阿里云 RocketMQ 也迎来了革命性的更新,正式发布了阿里云消息队列 RocketMQ 版 5.0,在架构、网络、负载均衡、存储等诸多方面进行了显著优化。其定位不再局限于消息解耦场景,将全新布局事件驱动和消息流式处理场景。 阿里云 EventBridge 作为云上事件枢纽一直以来都保持着对云上事件、数据的友好生态支持。随着 RocketMQ 5.0版本的用户日渐增多,EventBridge 在近期对 RocketMQ Connector 进行了全面升级。升级之后的 RocketMQ Connector 不仅可以支持RocketMQ 5.0 版本,同时也能支持云上自建 RocketMQ 实例。除此之外,基于成熟的事件流能力,用户使用 EventBridge 也能轻松构建消息路由能力,实现对灾备、数据同步的需求。 本文将从业务架构和 API 使用等方面讲解如何使用 EventBridge 创建阿里云 RocketMQ 4.0、5.0 版本,开源自建版本以及消息路由的相关任务。 EventBridgeRocketMQ 4.0 业务架构 RocketMQ 4.0 版本使用较为经典的 clientnameserverbroker 架构,整个应用主要由生产者、消费者、NameServer 和 Broker 组成。 Name Server:是一个几乎无状态节点,可集群部署,在消息队列 RocketMQ 版中提供命名服务,更新和发现 Broker 服务。 Broker:消息中转角色,负责存储消息,转发消息。分为 Master Broker 和 Slave Broker,一个 Master Broker 可以对应多个 Slave Broker,但是一个 Slave Broker 只能对应一个 Master Broker。Broker 启动后需要完成一次将自己注册至 Name Server 的操作;随后每隔 30s 定期向 Name Server 上报 Topic 路由信息。 生产者:与 Name Server 集群中的其中一个节点(随机)建立长连接(Keepalive),定期从 Name Server 读取 Topic 路由信息,并向提供 Topic 服务的 Master Broker 建立长连接,且定时向 Master Broker 发送心跳。 消费者:与 Name Server 集群中的其中一个节点(随机)建立长连接,定期从  Name Server 拉取 Topic 路由信息,并向提供 Topic 服务的 Master Broker、Slave Broker 建立长连接,且定时向 Master Broker、Slave Broker 发送心跳。Consumer 既可以从 Master Broker 订阅消息,也可以从 Slave Broker 订阅消息,订阅规则由 Broker 配置决定。 EventBridge在获取用户授权之后,利用生成的 sts 临时授权对客户的  RocketMQ 实例进行消息读取或写入。 API 使用 在 API 介绍方面,我们以创建「自定义总线自定义事件源」为例,事件目标以及事件流中的API基本一致。 基于 EventBridge 创建 RocketMQ 4.0 任务的 API 和之前基本保持了一致。具体参数如下 版本:代表阿里云消息队列 RocketMQ 版本,可选择 4.x 或 5.x; RocketMQ 实例:RocketMQ 对应的实例 ID。用户在阿里云 RocketMQ控制台每创建一个实例都会有一个对应的实例 ID,如MQ_INST_123456789_BX6zY7ah; Topic:RocketMQ Topic。选择此 topic 作为事件源的读取对象或者事件目标的写入对象; Tag:RocketMQ 消费 Tag,用于消费者过滤消息使用; Group ID:RocketMQ 消费组,标识一组特定的消费者,仅事件源有此参数; 消费位点:初始消费位点。可选择最新位点、最早位点、或者指定时间戳。 EventBridgeRocketMQ 5.0 业务架构 RocketMQ 5.0 版将通用的存储逻辑下沉,集中解决消息存储的多副本、低延迟、海量队列分区等技术问题,将上层的消息处理剥离出完全的无状态计算层,主要完成协议适配、权限管理、消费状态、可观测运维体系支持,Broker 则继续专注于存储能力的持续优化。存算分离的架构设计,使得从 SDK 接入到线上运维全链路带来全面提升: 1. 轻量版 SDK 的开放和全链路可观测系统的提升:同时支持 4.x 通信协议和全新的 gRPC 通信协议,并内置 OpenTelemetry 埋点支持,新版本 SDK 新增了 10 余个指标埋点。 2. 消息级负载均衡:新版本 SDK 不再参与实际存储队列的负载均衡,消息负载均衡将更加轻量,以单条消息为调度最小单元。 3. 多网络访问支持:新版本支持单一实例同时暴露公网、内网等访问形式,方便客户多网络接入访问。 4. 海量分级存储:新版本开放分级存储历史消息保存能力,消息低成本无大小限制,最长保存 30 天。冷热数据进行分离设计,极大降低消费历史消息对实例的性能影响。 RocketMQ 5.0 版本 可以支持 VPC 内部安全识别,用户上云无需修改代码。在用户授予 EventBridge 网络和 RocketMQ 相关权限之后,用户在 EventBridge 创建 MQ 5.0 Source&Sink 任务的时,EventBridge 会根据 RocketMQ 5.0 实例的 VPC 信息,调用网络组件获取相应代理信息。MQ sdk 侧通过配置代理实现消息的收发。 API 使用 相比于 4.0 实例,5.0 实例多了 VPC、交换机和安全组 3 个参数。 5.0 实例新增了 VPC 属性,用户需要在对应 vpc 内去访问 MQ 5.0 实例。EventBridge 在获得用户授权之后,也是经由 5.0 实例对应的 VPC 内进行消息的收发。创建任务时前端会自动填充好实例的 vpc 和交换机信息。 安全组参数限制了 EventBridge 在 vpc 内的访问策略,用户可以选择使用已有安全组也可以选择快速创建,让 EventBridge 快速创建一个安全组供任务使用。安全组策略推荐使用默认的安全组策略。使用上推荐第一次在此vpc内创建任务时,使用 EventBridge 自动创建一个安全组,后续在此 VPC 内再创建其他任务时,在使用已有中选择 EventBridge 创建的安全组。 EventBridge自建 Apache RocketMQ 针对用户在阿里云自建 Apache RocketMQ 集群的场景,EventBridge 也支持了消息导出能力。用户通过配置接入点、topic、groupID、VPC 等信息,即可将自建集群中的消息导入 EventBridge,进而对接 EventBridge 目前支持的大量下游生态。 业务架构 抽象来看,EventBridge 访问自建 MQ 实例的链路和阿里云 5.0 版本基本一致,都是从用户 vpc 发起对 MQ 实例的访问。区别在于接入点的不同,前者是用户自建 MQ 集群的nameserver,而后者为阿里云 RocketMQ 提供的接入点,不需要感知真实的 MQ 集群是部署在用户 vpc 还是阿里云 RocketMQ 自身的生产环境。 API 使用 在 API 使用方面,自建集群的大部分参数需要用户手动填入。 接入点:nameserver 地址。后续会支持 proxy 地址; Topic:RocketMQ Topic。选择此 topic 作为事件源的读取对象或者事件目标的写入对象; Tag:RocketMQ 消费 Tag,用于消费者过滤消息使用; Group ID:RocketMQ 消费组,标识一组特定的消费者,仅事件源有此参数; FilterType:过滤模式,目前支持 Tag 过滤; 认证模式:如果开启 ACL 鉴权,可在此配置鉴权信息; 消费位点:初始消费位点; VPC:自建 MQ 集群对应的 VPC 参数信息; 交换机:自建 MQ 集群对应的交换机信息; 安全组:EventBridge使用此安全组访问用户自建 MQ 集群,安全组规定了 EventBridge 在此 vpc 内的访问策略。 RocketMQ 消息路由 当用户有灾备或者消息同步的需求时,可能就会需要消息路由能力,即将 A region 下某实例 topic 的消息同步到 B region 的某 topic 中。 对于 EventBridge 而言,消息路由并非单独的一个产品能力,用户通过使用事件流即可实现消息路由。 针对非跨境场景的消息路由,如从北京同步消息到上海,跨 region 网络打通能力由 EventBridge 来实现,用户无需关注过多实现细节。 针对跨境场景,如北京同步消息到新加坡,EventBridge 使用的是公网链路完成对目标实例的写入,使用的是目标 MQ 实例的公网接入点。消息出公网的能力需要用户提供,即需要用户提供 VPC、交换机和安全组配置,此VPC须带有NAT等访问公网能力, EventBridge 使用此 VPC 实现写入目标端公网接入点。 在 API 使用方面,创建消息路由任务本质上是创建事件流,API 参数和上面各类型 RocketMQ 实例任务一致,这里以创建一个青岛到呼和浩特的 RocketMQ 消息路由为例。 1.进入 EventBridge 控制台,regionBar 选择到呼和浩特,点击左侧“事件流”,然后选择“创建事件流”。 2.在事件源页面,事件提供方选择“消息队列 RocketMQ 版”,地域选择青岛,剩余 RocketMQ 相关参数按需求选择。 3.规则页面按需填写,这里选择默认内容。 4.在“目标”页面,服务类型选择“消息队列 RocketMQ 版”,剩余参数按需填写。 5.点击“创建”,等待事件流任务启动即可。 总结 本文介绍了 EventBridge 对接各类型 RocketMQ 实例的基本原理与对应的 API 使用说明,便于已经使用了 RocketMQ 5.0 版本和自建 MQ 实例的用户可以借助 EventBridge 的能力实现事件驱动业务架构的搭建。同时针对灾备和业务消息同步的场景,本文也基于事件流讲解了如何基于 EventBridge 创建 RocketMQ 消息路由任务。 活动推荐 阿里云基于 Apache RocketMQ 构建的企业级产品消息队列RocketMQ 5.0版现开启活动: 1、新用户首次购买包年包月,即可享受全系列 85折优惠! 了解活动详情:
作者:昶风
#技术探索 #生态集成

2023年1月6日

基于 EventBridge API Destination 构建 SaaS 集成实践方案
引言 事件总线 EventBridge 是阿里云提供的一款无服务器事件总线服务,支持阿里云服务、自定义应用、SaaS 应用以标准化、中心化的方式接入,并能够以标准化的 CloudEvents 1.0 协议在这些应用之间路由事件,帮助您轻松构建松耦合、分布式的事件驱动架构。事件驱动架构是一种松耦合、分布式的驱动架构,收集到某应用产生的事件后实时对事件采取必要的处理后路由至下游系统,无需等待系统响应。使用事件总线 EventBridge 可以构建各种简单或复杂的事件驱动架构,以标准化的 CloudEvents 1.0 协议连接云产品和应用、应用和应用等。 目前 HTTP 的不足有以下几点: HTTP 的能力较弱,比如:授权方式单一、只支持 Body 传参、网络互通能力未对齐。只能满足客户最简单的场景。 用户无法基于 API 来统一管理(修改/下线)Target,用户体验交叉口; 对于基于 HTTP 实现的 SaaS API,无法简单快捷的引入到 EB 中,作为 Target 给用户使用。 本次新增集成中心(Integration Center)是负责 EventBridge 与外界系统对接的模块,通过抽象与配置快速获取第三方事件并将事件集成到第三方系统。并且优化现有 HTTP Sink 集成方案,为用户下游集成创造更多适配场景。 集成中心重点服务对象包括但不限于 SaaS 系统,对标 IPaaS 平台的能力提供完整的全面的通用系统集成方案。 集成源(Integration Source):指集成到 EventBridge 的第三方源; API 端点(API Destination ):指被集成到 EventBridge 的第三方 API 端点; 连接配置(Connection):是 API 端点模块的子集,与API 端点的平级资源,主要负责记录连接及配置信息,连接配置可被任意 API 端点复用。 针对市场上其他云厂商服务,EventBridge 发布了 API 端点 Sink 能力,主要作用在于承接 EventBridge 下游端数据,帮助用户快速完成下游数据集成。提供简单且易于集成的三方事件推送 ,帮助客户更加高效、便捷地实现业务上云。 API 端点 Sink 概述 接入 EventBridge 应用有多种情况:用户自定义应用、阿里云服务、其他云厂商服务或者其他 DB 产品。 具体而言,API 端点 Sink 事件目标是 EventBridge 支持的事件目标的一种,是通过 EventBridge 将数据投递至指定 Web Server 中。 API 端点 Sink 基本使用 首先现阶段 API 端点的 Sink 支持三种鉴权方式: 同时网络支持公网和专有网络(后续支持)。 1、创建 Connection 添加连接配置基本信息,并配置鉴权。 链接配置支持三种鉴权方式 : Basic 鉴权方式 : OAuth 2.0 鉴权方式: 添加授权接入点、授权请求方式、Client ID、ClientSecret 和授权相关的 Http 请求参数。 API Key 鉴权方式: 2、创建 ApiDestination API 端点配置 :配置需要访问 API 的 URL 地址和 HTTP 调用类型。 添加请求地址和请求方式: 在创建 API 端点时可以直接创建连接配置也可以选择已有的连接配置,例如上面已经创建成功的连接配置。 3、创建 Rule 创建事件规则,用于将事件投递到具体的 API 端点中。 步骤一 :点击事件规则并创建事件规则 步骤二 :是选择事件源,可以选择阿里云官方的或者选择自定义事件源,这里选择的是自定义事件源 步骤三 :第三步是选择 API 端点事件目标 支持自定义创建和使用已有,同时可以添加请求 HTTP 参数。 使用已有 使用选择已有的以后只需要添加请求 HTTP 参数即可: 选择已有的 API 端点来自于集成中心下面的 API 端点: 最佳实践 常见场景案例,比如: 用户可以把 RocketMQ 或者 RabbitMQ 的消息产品的消息动态投递到不同的 Web Server 中,这样可以让不同的 web 平台处理消息数据,实现了跨平台或者跨语言的消息流通。 用户可以把日志服务 SLS 数据投递到指定的 Web Server 或者 ELK 中,方便业务部门或者大数据平台对日志数据处理,可以更好的完善用户画像和用户行为分析,方便给用户打标签,从而可以进一步完善大数据个性化用户推荐系统。 例如下面是访问的国内外 SaaS 生态: 典型场景 :与 Buildkite 集成 场景介绍 :利用 EventBridge 丰富的云产品事件源和目标集成能力,快速与 Buildkite 的持续集成和持续交付(CI / CD)平台进行集成。 集成产品背景描述 :Buildkite 是大型持续集成和持续交付(CI / CD)平台会有各种管理的变更、构建和作业等任务,运维人员需要快速感知、处理这些变更,以便决赛风险。 用户痛点 :构建的事件收集困难,需要手动触发构建和手动创建管道。 方案优势 :EventBridge 支持集成 Buildkite 的持续集成和持续交付平台,用户只需要简单配置即可创建和处理平台的事件。 举例介绍:可以通过 API 文档中提供的接口实现动态的创建管道、创建构建和重试作业等。 文档地址 : 创建 API 端点 创建规则 发布事件,发布完成以后可以到事件轨迹查询详情 典型场景 :与 Freshdesk 集成 场景介绍 :利用 EventBridge 丰富的云产品事件源和目标集成能力,快速与 CRM(Freshdesk)进行集成。 集成产品背景描述 :不同的平台都需要对接 CRM(Freshdesk)管理系统。 用户痛点 :不同的平台的事件收集困难,需要用户自定义实现。 方案优势 :EventBridge 支持集成 CRM(Freshdesk)平台,用户只需要简单配置即可实现动态的创建会话、创建联系人和创建技能等事件。 举例介绍:可以通过 API 文档中提供的接口实现动态的创建会话、创建联系人和创建技能等。 文档地址 : 创建 API 端点 创建事件规则 发布事件,发布完成以后可以到事件轨迹查询详情 典型场景 :与有成财务集成 场景介绍 :利用 EventBridge 丰富的云产品事件源和目标集成能力,快速与有成财务进行集成 集成产品背景描述 :不同的 HR 系统或者 OA 系统需要对接有成财务时 用户痛点 :不同的系统的事件收集困难,需要用户自定义实现 方案优势 :EventBridge 支持集成有成财务平台,用户只需要简单配置即可实现动态生成报销科目和财务凭证等事件 举例介绍:比如用户想把 mns 的消息或者其他消息产品,同步到钉钉产品等接口中,或者也可以利用消息生成报销单据,可以生成报销科目和财务凭证等 地址 : 创建 API 端点 创建规则 发布事件,发布完成以后可以到事件轨迹查询详情。 活动推荐 阿里云基于 Apache RocketMQ 构建的企业级产品消息队列RocketMQ 5.0版现开启活动: 1、新用户首次购买包年包月,即可享受全系列 85折优惠! 了解活动详情:
作者:赵海
#行业实践 #生态集成

2022年10月24日

EventBridge 生态实践:融合 SLS 构建一体化日志服务
引言 阿里云日志服务 SLS 是一款优秀的日志服务产品,提供一站式地数据采集、加工、查询与分析、可视化、告警、消费与投递等服务。对于使用 SLS 的用户业务而言,SLS 上存储的日志信息反映着业务的运行状态,通过适当地流转加工即可创建一定价值。 另一方面,阿里云 EventBridge 作为云上事件枢纽,每天承载着大量事件的流转。云上资源的操作事件、消息队列中的数据、用户业务中的自定义事件等,是否有一站式的配置工具来将这些数据统一收敛到 SLS,进而使用 SLS 强大的加工、分析能力也是一个具有价值的问题。 为了支持上述日志、数据流入流出 SLS 的场景,阿里云 EventBridge 在近期支持了 SLS 能力。用户在 EventBridge 上通过简单地配置,即可实现数据写入 SLS 和将 SLS 中日志路由到不同的 EventBridge 目标端。EventBridge 对 SLS 的支持是全面的,用户既可以在事件总线中使用 SLS,也可以在事件流中使用。本文将从 SLS 在 EventBridge上 的使用以及若干最佳实践场景等方面,为大家介绍如何基于 EventBridge 构建 SLS 相关应用。 基于 EventBridge 使用 SLS 阿里云 SLS 日志服务 SLS[1] 是一款云原生观测与分析平台,为 Log、Metric、Trace 等数据提供大规模、低成本、实时的平台化服务,提供数据采集、加工、查询与分析、可视化、告警、消费与投递等功能。 SLS 在 EventBridge 上的应用 阿里云 EventBridge 提供了事件总线[2]与事件流[3]两款不同应用场景的事件路由服务。 事件总线底层拥有事件的持久化能力,可以按照需要将事件经事件规则路由到多个目标。而事件流则更轻量化,对源端产生的事件实时抽取、转换和分析并加载至目标端,无需创建事件总线,端到端转储效率更高,使用更轻便,适用于端到端的流式数据处理场景。SLS 目前对事件总线与事件流均已支持。 针对 SLS 事件源,EventBridge 会构造一个 SLS source connector,其会实时地从 SLS 服务端拉取日志。数据拉取到 EventBridge 后,会进行一定的结构封装,保留用户日志、SLS 系统参数等数据,同时增加 event 所需要的一些系统属性。 SLS Event 样例可参考如下示例。 data 部分代表用户日志内容,其中以“__”开头和结尾的字段表示日志项的 SLS 系统属性。 { "datacontenttype": "application/json;charset=utf8", "aliyunaccountid": "1756789", "data": { "key1": "value1", "key2": "value2", "__topic__": "TopicCategory", "__source__": "SourceCategory", "__client_ip__": "122.231..", "__receive_time__": "1663487595", "__pack_id__": "59b662b2257796280" }, "subject": "acs:log:cnqingdao:1756789:project/demoproject/logstore/logstore1", "aliyunoriginalaccountid": "1756789", "source": "testSLS", "type": "sls:connector", "aliyunpublishtime": "20220918T07:53:15.387Z", "specversion": "1.0", "aliyuneventbusname": "demoBus", "id": "demoprojectlogstore11MTY2MzExODM5ODY4NjAxOTQyMw==0", "time": "20220918T07:53:12Z", "aliyunregionid": "cnqingdao", "aliyunpublishaddr": "10.50.132.112" } 针对 SLS 事件目标,EventBridge 使用 logProducer 将 event 整体作为一个字段投递到 SLS,字段 key 名称为“content”。 使用介绍 SLS 事件源   在使用 SLS 作为事件源时(这里包含了事件总线中的事件源和事件流中的事件源),需要提供以下参数: 日志项目(SLS Project) 日志库(SLS LogStore) 起始消费位点 调用角色  在创建 SLS 事件源时,EventBridge 会自动在对应 LogStore 下创建一个以“eventbridge”开头的消费组,事件源或事件流被删除时,对应消费组资源也会被清理。 日志项目与日志库参数,用户根据已创建的 Project 和 LogStore 去填写即可。 起始消费位点参数指定了新任务启动时的初始消费位点。这里可以选择“最早位点”、“最新位点”与“指定时间”。“最早位点”即从当前 LogStore 中最早的日志开始消费,会导致大量历史日志被读取,建议结合业务谨慎选择;“最新位点”则表示消费对应 EventBridge 任务启动后的日志;“指定时间”需要用户填写时间戳(以秒为单位),消费从此时刻开始的日志。 针对调用角色,其实是允许 EventBridge 以这个角色的身份去调用读取用户 SLS 日志。用户需要创建一个自定义角色,并将其授信给事件总线 EventBridge。角色的权限方面则可以按照需要去进行设置,在权限最小的原则基础上,权限策略提供的角色应保证事件总线 EventBridge 可以读取对应 LogStore 日志与消费组的增删操作,至少赋予角色 LogStore 消费权限与消费组的增删操作。参考示例: { "Version": "1", "Statement": [ { "Action": [ "log:ListShards", "log:GetCursorOrData", "log:GetConsumerGroupCheckPoint", "log:UpdateConsumerGroup", "log:ConsumerGroupHeartBeat", "log:ConsumerGroupUpdateCheckPoint", "log:ListConsumerGroup", "log:CreateConsumerGroup", "log:DeleteConsumerGroup" ], "Resource": [ "acs:log:::project//logstore/", "acs:log:::project//logstore//" ], "Effect": "Allow" } ] } SLS 事件目标   在使用 SLS 作为事件目标时(这里包含了事件总线中的事件目标和事件流中的事件目标),需要提供以下参数: 日志项目(SLS Project) 日志库(SLS LogStore) Topic 调用角色  日志项目、日志库参数含义同 SLS 事件源。Topic 即 SLS 日志主题,用户可以根据需要进行设置,非必填内容。 在创建 SLS 事件目标时,确保使用的调用角色有写入给定日志库权限即可。参考示例: { "Version":"1", "Statement":[ { "Effect":"Allow", "Action":[ "log:PostLogStoreLogs" ], "Resource":[ "acs:log:::project//logstore/" ] } ] } 使用示例 SLS 事件源和事件目标,其事件总线与事件流的参数配置相同,这里示例了如何创建  SLS 事件源和事件目标的 EventBridge 事件流。 前期准备   1. 开通 EventBridge 服务; 2. 开通 SLS 服务并创建 Project 与 Store。 创建 SLS 事件源   1. 登陆 EventBridge 控制台,点击左侧导航栏,选择“事件流”,在事件流列表页点击“创建事件流”; 2. “基本信息”中“事件流名称”与“描述”按照需要填写即可; 3. 在创建事件流,选择事件提供方时,下拉框选择“日志服务 SLS”; 4. 在“日志服务 SLS”一栏中选配置 SLS Project、LogStore、起始消费位点与角色配置。 创建 SLS 事件目标   1. 在创建事件流的事件目标时,服务类型选择“日志服务”; 2. 配置 SLS Project、LogStore、日志主题、日志内容、角色配置等参数。 3. 保存启动即可创建事件流。 最佳实践示例 异步架构完备性校验 在使用消息队列搭建异步应用架构时,会偶发遇到消息丢失的情况,这种情况下的问题排查通常较为麻烦,需要确定问题到底是出在发送端、消费端还是消息队列上,这种场景可以使用 SLS + EventBridge 来进行相关预警和现场保留。 1. 业务 1 发送消息到消息队列,业务 2 异步消费 MQ 中的消息,实现架构解耦; 2. 消息发送端和消费端,在完成消费发送、消费的相关操作后,均将操作日志打印出来,并采集到 SLS 上,日志中可以包含消息 ID 等字段以确保可溯源; 3. 配置 EventBridge 事件流,事件提供方为 SLS,事件接收方为函数计算 FC; 4. FC 中的服务读取 SLS 中日志内容,若发现针对某条消息,若仅有发送日志无消费日志,则说明可能存在漏消息的可能性,需要相关人员及时介入排查。 异常业务异步处理 部分消息队列如 RocketMQ 有死信队列能力,当用户消费失败达到一定次数时,消息会被投递到死信队列。用户也可以使用 SLS + EventBridge 构建业务死信队列,以完成对异常情况的处理。 例如下图是一个电商平台的订单处理系统,当订单处理成功时,相关信息会被写入 DB 或者进行后续操作。但如果订单处理异常用户又不想要阻塞现有订单处理流程,则可以将处理异常订单的流程异步处理。 1. 用户下单/付款,订单系统进行业务处理,处理成功则将数据变更写入 DB; 2. 订单处理异常,记录相关信息日志; 3. 搭建 EventBridge 事件规则。事件源为 SLS,事件目标为函数计算 FC; 4. 当有异常业务日志产生时,日志内容被 SLS 事件源拉取,随后投递到 FC,由专门的服务来处理异常订单。当然,在架构设计时也可以将异常订单信息直接投递到函数计算,但对于大部分业务系统而言,当有异常出现时通常都会进行相关日志的打印,即异常日志大概率是存在的,这个时候使用 SLS + EventBridge 则无需再使用函数计算的发送客户端,仅按需打印日志即可,对业务的侵入性更小。 消息备份 目前阿里云上的消息队列产品种类丰富,用户在使用消息队列实现业务解耦的同时,也会产生对消息内容进行加工分析的需求。SLS 拥有强大的数据加工能力,使用 EventBridge 将消息路由到 SLS,在实现消息备份的同时也可以利用 SLS 的分析加工能力来提升业务的可观测性。 1. 搭建 EventBridge 事件流。事件提供方为各种云上消息队列,事件目标方为日志服务 SLS; 2. 使用 SLS 的能力完成消息的加工、查询、分析与可视化。 自建 SQL 审计 目前 EventBridge 已经支持了 DTS 作为事件源的能力,使用 EventBridge 可以轻松实现构建自定义 SQL 审计的需求。 1. 用户新建 DTS 数据订阅任务,捕获数据库变更; 2. 搭建 EventBridge 事件流,事件提供方为 DTS,事件接收方为日志服务 SLS; 3. 用户需要对 SQL 进行审计时,通过查询 SLS 进行。 _相关链接_ _[1] 日志服务SLS_ _[2] 事件总线_ _[3] 事件流_ 感 活动推荐 阿里云基于 Apache RocketMQ 构建的企业级产品消息队列RocketMQ 5.0版现开启活动: 1、新用户首次购买包年包月,即可享受全系列 85折优惠! 了解活动详情:
作者:昶风
#行业实践 #生态集成

2022年7月17日

融合数据库生态:利用 EventBridge 构建 CDC 应用
引言 CDC(Change Data Capture)指的是监听上游数据变更,并将变更信息同步到下游业务以供进一步处理的一种应用场景。近年来事件驱动架构(EDA)热度逐步上升,日渐成为项目架构设计者的第一选择。EDA 天然契合 CDC 的底层基础架构,其将数据变更作为事件,各个服务通过监听自己感兴趣的事件来完成一些列业务驱动。阿里云 EventBridge 是阿里云推出的一款无服务器事件总线服务,能够帮助用户轻松快捷地搭建基于 EDA 架构的应用。近期,EventBridge 事件流已经支持了基于阿里云 DTS[1]服务的 CDC 能力。本文将从 CDC、CDC 在 EventBridge 上的应用以及若干最佳实践场景等方面,为大家介绍如何利用 EventBridge 轻松构建 CDC 应用。 CDC 概述 基本原理与应用场景 CDC 从源数据库捕获增量的数据以及数据模式变更,以高可靠、低延时的数据传输将这些变更有序地同步到目标数据库、数据湖或者其他数据分析服务。目前业界主流的开源 CDC 工具包括 Debezium[2]、Canal[3] 以及 Maxwell[4]。 图片来源: 目前业界主要有以下几类 CDC 的实现: 1. 基于时间戳或版本号 基于时间戳的方式要求数据库表有一个字段代表更新时间戳,当存在数据插入或更新时,对应时间戳字段就会随之更新。CDC 组件周期性检索更新时间大于上次同步时间的数据记录,即可捕获本周期内数据的变更。基于版本号跟踪和基于时间戳跟踪原理基本一致,要求开发者变更数据时必须更新数据的版本号。 2. 基于快照 基于快照的 CDC 实现在存储层面使用到了数据源 3 份副本,分别是原始数据、先前快照和当前快照。通过对比 2 次快照之间的差异来获取这之间的数据变更内容。 3. 基于触发器 基于触发器的 CDC 实现方式事实上是在源表上建立触发器将对数据的变更操作(INSERT、UPDATE、DELETE)记录存储下来。例如专门建立一张表记录用户的变更操作,随后创建 INSERT、UPDATE、DELETE 三种类型的触发器将用户变更同步到此表。 4. 基于日志 以上三种方式都对源数据库存在一定侵入性,而基于日志的方式则是一种非侵入性的 CDC 方式。数据库利用事务日志实现灾备,例如 MySQL 的 binlog 就记录了用户对数据库的所有变更操作。基于日志的 CDC 通过持续监听事务日志来实时获取数据库的变化情况。 CDC 的应用场景广泛,包括但不限于这些方面:异地机房数据库同步、异构数据库数据同步、微服务解耦、缓存更新与 CQRS 等。 基于阿里云的 CDC 解决方案:DTS 数据传输服务 DTS(Data Transmission Service)是阿里云提供的实时数据流服务,支持关系型数据库(RDBMS)、非关系型的数据库(NoSQL)、数据多维分析(OLAP)等数据源间的数据交互,集数据同步、迁移、订阅、集成、加工于一体。其中,DTS 数据订阅[5]功能可以帮助用户获取自建 MySQL、RDS MySQL、Oracle 等数据库的实时增量数据。 CDC 在EventBrige上的应用 阿里云 EventBridge 提供了事件总线[6]与事件流[7] 2 款不同应用场景的事件路由服务。 事件总线底层拥有事件的持久化能力,可以按照需要将事件路由到多个事件目标中。 事件流适用于端到端的流式数据处理场景,对源端产生的事件实时抽取、转换和分析并加载至目标端,无需创建事件总线,端到端转储效率更高,使用更轻便。 为了更好地支持用户在 CDC 场景下的需求,EventBridge 在事件流源端支持了阿里云 DTS 的数据订阅功能,用户仅需简单配置,即可将数据库变更信息同步到 EventBridge 事件流。 EventBridge 定制了基于 DTS sdk 的 DTS Source Connector。当用户配置事件提供方为 DTS 的事件流时,source connector 会实时地从 DTS 服务端拉取 DTS record 数据。数据拉取到本地后,会进行一定的结构封装,保留 id、operationType、topicPartition、beforeImage、afterImage 等数据,同时增加 streaming event 所需要的一些系统属性。 DTS Event 样例可参考 EventBridge 官方文档 EventBridge Streaming 保证了 DTS 事件的顺序性,但存在事件重复投递的可能性,EventId 在保证了和每条 DTS record 的一一映射关系,用户可依据此字段来对事件做幂等处理。 创建源为 DTS 的 EventBridge 事件流 下面展示如何在 EventBridge 控制台创建源为 DTS 的事件流 前期准备  1. 开通 EventBridge 服务; 2. 创建 DTS 数据订阅任务; 3. 创建用于消费订阅数据的消费组账号信息。 创建事件流  1. 登陆 EventBridge 控制台,点击左侧导航栏,选择“事件流”,在事件流列表页点击“创建事件流”; 2. “基本信息”中“事件流名称”与“描述”按照需要填写即可; 3. 在创建事件流,选择事件提供方时,下拉框选择“数据库 DTS”; 4. 在“数据订阅任务”一栏中选择已创建的 DTS 数据订阅任务。在消费组一栏,选择要使用哪个消费组消费订阅数据,同时填写消费组密码与初始消费时间。 5. 事件流规则与目标按照需要填写,保存启动即可创建以 DTS 数据订阅为事件源的事件流。 注意事项 使用时有以下几点需要注意: 1. EventBridge 使用的是 SUBSCRIBE 消费模式[8],所以请保证当前 DTS 消费组没有其他客户端实例在运行。如果设置的消费组在之前有运行,则传入的位点失效,会基于此消费组上次消费过的位点继续消费; 2. 创建 DTS 事件源时传入的位点仅在新消费组第一次运行时起效,后续任务重启后会基于上次消费位点继续消费; 3. EventBridge 事件流订阅 OperationType 为 INSERT、DELETE、UPDATE、DDL 类型的 DTS 数据; 4. 使用 DTS  事件源可能会有消息重复,即保证消息不丢,但无法保证仅投递一次,建议用户做好幂等处理; 5.用户如果需要保证顺序消费,则需要将异常容忍策略设置为“NONE”,即不容忍异常。在这种情况下,如果事件流目标端消费消息异常,整个事件流将暂停,直至恢复目标端正常。 最佳实践示例 基于EventBridge 实现 CQRS 在 CQRS(Command Query Responsibility Segregation)模型中,命令模型用于执行写以及更新操作,查询模型用于支持高效的读操作。读操作和写操作使用的数据模型存在一定区别,需要使用一定方式保证数据的同步,基于 EventBridge 事件流的 CDC 可以满足这样的需求。 基于云上服务,用户可以使用如下方式轻松构建基于 EventBridge 的 CQRS: 1. 命令模型操作数据库进行变更,查询模型读取 elasticsearch 获取数据; 2. 开启 DTS 数据订阅任务,捕获 DB 变更内容; 3.配置 EventBridge 事件流,事件提供方为 DTS 数据订阅任务,事件接收方为函数计算 FC; 4. FC 中的服务即为更新 elasticsearch 数据操作。 微服务解耦 CDC 也可以用于微服务解耦。例如下文是一个电商平台的订单处理系统,当有新建的未付款订单产生时,数据库会有一条 INSERT 操作,而当某笔订单状态由“未付款”变为“已付款”时,数据库会有一条 UPDATE 操作。根据订单状态变化的不同,后端会有不同的微服务来对此进行处理。 1. 用户下单/付款,订单系统进行业务处理,将数据变更写入 DB; 2. 新建 DTS 订阅任务捕获 DB 数据变更; 3. 搭建 EventBridge 事件流。事件提供方为 DTS 数据订阅任务,事件接收方为 RocketMQ; 4. 在消费 RocketMQ 数据时,同一个 topic 下启用 3 个 group 代表不同的业务消费逻辑; a. GroupA 将捕获到的 DB 变更用户缓存更新,便于用户查询订单状态; b. GroupB 下游关联财务系统,仅处理新建订单,即处理 DB 操作类型为 INSERT 的事件,丢弃其余类型事件; c. GroupC 仅关心订单状态由“未付款”变为“已付款”的事件,当有符合条件事件到达时,调用下游物流、仓储系统,对订单进行进一步处理。 如果采用接口调用方式,那么用户在下单之后订单系统将分别需要调用缓存更新接口、新建订单接口以及订单付款接口,业务耦合性过高。除此之外,这种模式使得数据消费端不用担心上游订单处理接口返回内容的语义信息,在存储模型不变的情况下,直接从数据层面判断此次数据变更是否需要处理以及需要怎样的处理。同时,消息队列天然的消息堆积能力也可以帮助用户在订单峰值到来时实现业务削峰填谷。 事实上,目前 EventBridge Streaming 支持的消息产品还包括 RabbitMQ、Kafka、MNS 等,在实际操作中用户可以根据自己的需要进行选择。 数据库备份&异构数据库同步 数据库灾备和异构数据库数据同步也是 CDC 重要的应用场景。使用阿里云 EventBridge 亦可以快速搭建此类应用。 1. 新建 DTS 数据订阅任务,捕获用户 MySQL 数据库变更; 2. 搭建 EventBridge 事件流,事件提供方为 DTS 数据订阅任务; 3. 使用 EventBridge 在目的数据库执行指定 sql,实现数据库备份; 4. 数据变更事件投递到函数计算,用户业务根据数据变化内容更新对应异构数据库。 自建 SQL 审计 对于用户有自建 SQL 审计的需求,使用 EventBridge 也可以轻松实现。 1. 新建 DTS 数据订阅任务,捕获数据库变更; 2. 搭建 EventBridge 事件流,事件提供方为 DTS,事件接收方为日志服务 SLS; 3. 用户需要对 SQL 进行审计时,通过查询 SLS 进行。 总结 本文介绍了 CDC 的一些概念、CDC 在 EventBridge 上的应用以及若干最佳实践场景。随着支持产品的不断增加,EventBridge 所承载的生态版图也不断扩大,从消息生态到数据库生态,从日志生态到大数据生态,EventBridge 不断扩大其适用领域,巩固云上事件枢纽的地位,此后也将按照这个方向继续发展,技术做深,生态做广。 _参考链接:_ _ _ _ _ _ _ _ _ 感兴趣的小伙伴们可以扫描下方二维码加入钉钉群讨论(群号:44552972) 点击,进入 EventBridge 官网了解更多信息~ 活动推荐 阿里云基于 Apache RocketMQ 构建的企业级产品消息队列RocketMQ 5.0版现开启活动: 1、新用户首次购买包年包月,即可享受全系列 85折优惠! 了解活动详情:
作者:昶风
#行业实践 #生态集成 #事件驱动架构

2022年6月26日

EventBridge 在 SaaS 企业集成领域的探索与实践
当下降本增效是各行各业的主题,而 SaaS 应用作为更快触达和服务业务场景的方式则被更多企业熟知和采用。随着国内 SaaS 商业环境的逐渐成熟,传统企业中各个部门的工程师和管理者,能迅速决定采购提升效率的 SaaS 产品,然后快速投入生产和使用。但是随着行业 SaaS 越来越多,如何解决各个 SaaS 系统的数据孤岛,如何将SaaS 应用数据与现有系统数据进行打通,已然变成了企业使用 SaaS 的瓶颈。因此,业内也广泛提出 B2B integration 企业集成的概念。 本文将结合实际业务场景讲述在 SaaS 行业的洞察与 SaaS 集成的探索实践。 什么是 SaaS SaaS 概述 SaaS(SoftwareasaService,软件即服务)源自于美国 Salesforce 公司(1999 年创立)创造的新软件服务模式。相比于传统软件,用户使用的 SaaS 软件,其数据保存在云端(国内有很多行业 SaaS 由于其数据敏感会单独部署在客户 IDC)。而且,SaaS 公司提供给客户的服务通常是按需租用的,比如按年缴纳使用费5年,第二年再续费,如果不满意也可以不续费,这会大大激发 SaaS 创业公司持续的打磨产品、持续的为客户提供更大价值的动力。 SaaS 典型分类 SaaS 分类比较繁琐,一般有两个分类维度。一个维度按照使用场景,另一个维度按照商业价值。 SaaS 一般分类为 通用 SaaS 和 行业 SaaS 两个基础类。 通用 SaaS 顾名思义是通用的,跨行业的,比如钉钉即时通讯或者某司的 HR 产品,由于使用场景更广,因而客群也会更多。 行业 SaaS 是在某个行业内使用的产品,比如餐饮企业 SaaS、电商 SaaS 等。 当然,还有第二个维度是工具 SaaS 和 商业 SaaS。 工具 SaaS,为客户企业提供一个提高管理效率的工具;商业 SaaS,除了提供一部分“工具”价值外,还能为客户企业提供增值价值,比如增加营收、获得资金等。 商业 SaaS 产品虽然风险更大,但在国内特色的商业环境、管理水平及人才结构下,更容易快速实现客户价值和自我价值。 SaaS 在中国的发展历程 对于 SaaS 领域来讲,云服务的普及提振了大家对 SaaS 服务稳定性和数据安全性的信心。同时,人口红利消退使得 SaaS 成本优势凸显。当下疫情环境也加快了市场教育,企业主转变思路,降本增效的需求显著上升。随着整个行业的渗透率加快,SaaS 场景和行业越做越深,SaaS 市场可以遇见在未来会有高速的增长。很多企业会在新业务场景使用 SaaS 服务,小步快跑试错,解决活下来的问题,而不是重复造轮子。 什么是 B2B ? B2B 即 BusinesstoBusiness (B2B) integration 是指将两个或者多个组织之间的业务流程和通讯自动化,通过自动化关键业务流程,实现不用应用和组织关系的打通,有效促进应用提供方和客户之间的数据打通与合作。 可以断言,随着 SaaS 行业逐渐渗透,企业集成的诉求会逐渐增多。数据同步、用户同步、接口同步的诉求会逐步增多,包括自建服务与 SaaS 服务的打通,SaaS 服务与 SaaS 服务的打通等。 SaaS 集成领域场景分析 随着行业类 SaaS 的逐渐丰富,在企业生产实践中,应用和应用的数据集成和互通变得至关重要。包括 API 集成,数据集成,流程集成等场景。 API 集成 通过 API 将 SaaS 应用的业务流程串联,现阶段大部分 SaaS 集成对接都是通过标准 API 协议实现的。源端采用 WebHook 机制推送到指定 HTTP 端点,目标端则采用类似 API 接口调用的方式,主动调取执行动作。 实现结构如下: 业内通用方案通几乎均为同步方案,通过 API + 中间网关对调用做解耦和映射。该方案的主要问题是调用追溯难;其次如出现上下游接口限制不一致问题,会导致下游调用大量失败。 数据集成 数据集成场景主要是企业自建系统和 SaaS 系统的打通。当企业使用的行业 SaaS 逐渐增多,数据一致性问题就会变的迫在眉睫。 大部分企业通常会面临云上数据导入/同步到 SaaS 应用的场景,而业内对 SaaS 应用的数据集成方案并没有类似 CDC 场景下的 Debezium 那么标准和通用。 企业在 SaaS 集成领域的痛点 接入成本高 对大量使用 SaaS 应用的企业来讲, SaaS 集成是必须做的基础建设。但是该部分基础建设通常会消耗大量人力,由于各个行业的 SaaS 百花齐放,通常很难使用一套架构满足全部集成场景。意味着通常情况下,企业使用每一款 SaaS 都会面临 SaaS 系统与自身系统集成的困难。 异构数据多 异构数据多是集成领域又一个比较典型的特点,异构数据通常有结构化数据,非机构化数据,半结构化数据。比如企业自建关系型数据库就是典型的结构化数据,但是要被其他 SaaS 系统集成通常是 Json 这种半结构数据入参。当然这部分内容可以通过定制代码搞定,但这个思路一定不是做消息枢纽的思路; 异构数据如何高效的统一处理其实是当前 SaaS 集成亟待解决的问题,也是最大的冲突点。 分发/路由困难 当很多集成需求同时涌现时,如何对已集成数据进行合理分发,会变成集成领域又一个难以解决的问题。每个细分场景甚至每个集成链路所需要的数据内容甚至数据类型都不一样。如果路由/分发无法完成,那么企业统一集成将无法实现。 集成追踪困难 当全部采用同步链路时,这里的集成状态追踪就会变成玄学,除非将链路接入 Tracing ,但是这部分又回产生高额的改造成本,同时多源 Tracing 的复杂相对于单链路会呈几何倍数的增加。 老系统迁移困难 老系统迁移主要是数据集成部分,如果将新老系统对接,并构建统一的应用网是当下企业构建 SaaS 建设的难点。企业迫切的需要一种能将"新"“老”应用联接起来的方式,打破企业应用发展的时间与空间界限,协同企业原有核心数据资产及创新应用,构建可平滑演进的企业IT架构。 EventBridge 一站式企业 SaaS 集成方案 针对业内 SaaS 系统集成的种种痛点,EventBridge 推出一站式企业 SaaS 集成方案。通过收敛 SaaS 集成痛点诉求,EventBridge 推出 API 集成方案和数据集成方案,打通应用与应用,云与应用的连接。 低代码集成平台 提供完全托管的集成平台服务,仅需在控制台进行简单配置即可在几分钟内完成应用集成。客户无需关心底层资源及实现细节即可打通云下到云上,SaaS 到 SaaS 的集成与连接,轻松完成异构数据接入。 金融级稳定性 满足不同客户企业级集成项目的要求,提供高可用性、灾难恢复、安全性、服务等级协议(SLA)和全方面的技术支持。 全方位的集成能力 支持各种集成场景,打通云上云下企业应用、物联网、设备及合作伙伴之间的信息孤岛。支持事件规则,事件路由等多种路由方式,实现跨云跨地域互通和信息共享。同时强大的链路追踪能力可以帮助企业快速排障。 开放的平台 拥抱 CloudEvents 社区,提供标准化的事件集成方案。提供丰富的开发者工具,拥有海量的生态伙伴及开发者,丰富开箱即用的连接器和应用组件可以帮助加速企业业务创新。 EventBridge 在 SaaS 领域的典型应用场景 SaaS 应用同步 应用同步是指在特定时间点将一组特定的事件从一个系统迁移到另一个系统的行为。事件同步模式允许开发人员创建数据自动迁移集成服务;业务人员和开发人员可以通过配置集成应用,自动化的将特定范围内的数据传递到下游应用;创建可重用的服务可以为开发和运营团队节省大量时间。 例如: 把销售机会数据从一个旧式 CRM (客户关系管理) 系统迁移到新的 CRM 实例; 把销售订单数据从一个 CRM 组织迁移到另一个组织; 从 ERP (企业资源计划) 同步产品主数据到 CRM 系统中。 事件广播 事件广播是在连续的、近实时或实时的基础上将事件从单个源系统移动到多个目标系统的行为。本质上,它是一对多的单向同步。通常,“单向同步”表示 1:1 关系。但是,广播模式也可以是 1:n(n 大于 1)的关系。 例如: 当一个销售机会在 CRM 中被标记为成功关单的时候,应在 ERP 中创建销售订单。 SaaS 应用通知 事件通知是指当 SaaS 应用发生某个类型的事件,可以通过钉钉,短信等通知方式告知用户。用户可及时获取到关键事件信息。 例如: 当一个销售机会在 CRM 中被标记为重要商机的时候,会及时通知给其他同事进行跟进并关注。 自建系统到云上迁移 EventBridge 支持云上数据库、云上消息队列、云产品事件对接 SaaS 系统,完善企业用户建设应用一张网的诉求,打破企业应用发展的时间和空间界限,协同企业原有核心资产与 SaaS 系统,构建可演进的企业 IT 架构。 例如: 当引入一个新的 SaaS 应用时,可通过 EventBridge 将数据库/大数据平台的核心资产(如人员信息等)同步至 SaaS 应用。 活动推荐 阿里云基于 Apache RocketMQ 构建的企业级产品消息队列RocketMQ 5.0版现开启活动: 1、新用户首次购买包年包月,即可享受全系列 85折优惠! 了解活动详情:
作者:肯梦
#行业实践 #生态集成

2022年5月17日

云钉一体:EventBridge 联合钉钉连接器打通云钉生态
背景 以事件集成阿里云,从 EventBridge 开始”是 EventBridge 这款云产品的愿景和使命。作为一款无服务器事件总线服务,EventBridge 从发布以来,以标准化的 CloudEvents 1.0 协议连接了大量云产品和云事件,用户可以通过 EventBridge 轻松访问云上事件,驱动云上生态。 截止目前为止,EventBridge 已集成 85+ 阿里云产品,提供了 941+ 事件类型,集成 50+ SaaS产品,通过事件规则可轻松驱动 10+ 阿里系一方云产品的计算力。 另一方面,钉钉生态空前繁荣,拥有 4000+ 家的生态伙伴,包括 ISV 生态伙伴、硬件生态伙伴、服务商、咨询生态和交付生态伙伴等。通过事件将钉钉生态与阿里云生态联通,是践行「云钉一体」战略的重要途径,EventBridge 作为阿里云标准化的事件枢纽,其重要性不言而喻。 今天,EventBridge 联合钉钉连接器,打通了钉钉生态和阿里云生态,钉钉的生态伙伴可以通过通道的能力驱动阿里云上海量的计算力。 关键技术 EventBridge 集成阿里云和钉钉生态的方案,核心能力由钉钉连接器和 EventBridge 的 HTTP 事件源能力提供。 钉钉连接器 钉钉连接平台通过可视化拖拽配置、一键订阅等零代码方式,简单高效的实现钉钉、企业内部系统、知名厂商系统(金蝶、用友、SAP 等)、钉钉第三方企业应用之间数据互通和集成。 近期,钉钉连接器在「连接流」中发布了「HTTP Webhook」的执行动作能力,支持将钉钉生态开放给外部生态,EventBridge 正是通过该能力将钉钉生态接入到阿里云生态。 EventBridge HTTP 事件源 事件源是事件驱动的基石,如何获取更多事件源也是 EventBridge 一直在探索和尝试的方向。针对市场上其他云厂商和垂直领域的 Saas 服务,EventBridge 发布了 HTTP 事件源能力,提供简单且易于集成的三方事件推送 ,帮助客户更加高效、便捷地实现业务上云。 具体而言,HTTP 事件源是 EventBridge 支持的事件源的一种,它以 Webhook 形式暴露了发布事件的 HTTP 请求地址,用户可以在有 URL 回调的场景配置 HTTP  事件源,或者直接使用最简单的 HTTP 客户端来完成事件的发布。HTTP  事件源提供了支持 HTTP 与 HTTPS,公网与阿里云 VPC 等不同请求方式、不同网络环境的 Webhook URL,便于用户将其集成到各类应用中。接入时无需使用客户端,仅需保证应用可以访问到对应 Webhook URL 即可,这使得接入过程变得简单而高效。 在将 HTTP 请求转换为 CloudEvent 的时候,EventBridge 会将请求的头部和消息体部分置于 CloudEvent 字段中,其余字段会依据用户 EventBridge 资源属性以及系统默认规则进行填充。用户可以在事件规则中,对所需的内容进行过滤、提取,最终按照模板拼装成所需的消息内容投递给事件目标。 在安全方面,HTTP 事件源不需要用户进行复杂的签名鉴权,支持 3 种类型开箱即用的安全设置,分别是请求方法、源 IP 以及请求来源域名。 请求方法:用户可以配置当前请求此事件源时合法的 HTTP 请求方法,如果方法类型不满足配置规则,请求将被过滤,不会投递到事件总线。 源 IP:用户可以设置允许访问此事件源时合法的源 IP(支持 IP 段和 IP),当请求源 IP 不在设置的范围内时,请求将被过滤,不会投递到事件总线。 请求来源域名:即 HTTP 请求的 referer 字段,当请求的 referer 与用户配置不相符时,请求被过滤,不会投递到事件总线。 应用场景 钉钉连接器市场有数百款连接器,包含官方连接器和第三方生态连接器。 官方连接器,来源主要是钉钉官方的应用,比如视频会议、日程、通讯录、审批流、钉盘、宜搭等,企业和 SaaS 厂商可以充分利用这些官方应用的事件构建企业级的应用系统,也可以将钉钉的官方数据流与其他系统做深度集成。 第三方连接器,来源主要是钉钉的生态合作伙伴,比如金蝶、行翼云、集简云、用友、易快报、销帮帮等。SaaS 厂商可以通过开放连接器来开放数据,与其它应用互联互通。 如上图所示,借助钉钉连接器,可以将钉钉官方事件源和钉钉 SaaS 事件源连接到阿里云 EventBridge,从而能驱动云上的弹性资源。SaaS 厂商能够借助 EventBridge 连接的能力快速构建云原生的 SaaS 应用,借助云的弹性能力,采用云原生最新的技术栈,快速高效地开发 SaaS 应用,同时利用 EventBridge 获取钉钉和其它 SaaS 应用的数据源,轻松进行业务创新。 当钉钉生态和 EventBridge 联通后,能产生哪些应用场景呢? 分析场景:企业借助 EventBridge 事件分析能力,对钉钉官方事件进行分析,快速洞察企业运转数据。比如审批效率,员工变更趋势、会议效率等。 通知场景:钉钉连接器 + EventBridge  可覆盖绝大多数消息通知场景,帮助企业用户快速感知 审批,员工变动,会议室信息等一些列企业基础支持系统。 集成场景:基于阿里云基础建设,可快速提升钉钉生态和企业内部数据的互通。例如当公司需要对钉钉和企业内部 IT 系统进行数据打通时,EventBridge 解决方案可以毫不费力地将建立在阿里云体系的 IT 系统连通起来,比如函数计算,云数据库,消息队列等连接扩展阿里云生态。 EDA 场景:使用 EventBridge 快速构建 EDA 驱动的自动化业务流程。例如在新员工入职时,获取员工变动信息。并集中推送到邮箱系统,业务支持系统(DB),CRM 系统等。对企业新员工权限账户进行一站式授权,较少重复机械的业务审批流程。 最佳实践:新增员工 0 代码入库 本章节介绍使用钉钉连接器和 EventBridge 的最佳实践,通过一个例子展示如何 0 代码将钉钉的一个新员工入职记录录入到自定义的数据库当中,企业可以根据该数据库搭建各类员工管理系统。 方案简介 整个方案涉及到钉钉、钉钉连接器、EventBridge、阿里云数据库等产品,整个链路如下图所示: 前置条件: 拥有一个钉钉账号,并创建一个团队成为管理员,并能登陆钉钉开放平台。 拥有一个阿里云账号,并开通 EventBridge 和阿里云数据库。 实践步骤 整个实践过程分为以下几个步骤。 1)创建事件总线和 HTTP 事件源 首先登陆 EventBridge 控制台,创建一个事件总线和 HTTP 事件源,如下图所示,可以先跳过规则和目标的创建。 创建完成后,进入事件总线的详情列表,获取 HTTP 事件源的公网「Webhook 地址」,如下图所示: 2)创建钉钉连接流 登陆钉钉开放平台,进入连接平台,在「我的连接」下创建连接流,在创建界面,选择触发器为「官方通讯录通讯录用户增加」。 连接流创建完成后,进入编辑页面,添加一个「HTTP Webhook」的节点,在「请求地址」一栏填入上个步骤获取到的「HTTP 事件源」地址。 3)钉钉触发新增员工事件 打开钉钉,进入团队,邀请另一个账号加入团队,然后进入事件总线的「事件追踪」页面,可以发现该员工新增事件已经投递到了事件总线之上。 该事件被转换成了一个「CloudEvents」格式,其「$.data.body」为事件的详情,包含 dingId, userId, department 等字段。 { "datacontenttype": "application/json", "aliyunaccountid": "1148", "data": { "headers": { }, "path": "/webhook/putEvents", "body": { "syncAction": "user_add_org", "orderInDepts": "{1:1762632512}", "dingId": "$::$5RUQhP/pK+4A==", "active": true, "avatar": "", "isAdmin": false, "userId": "141146379", "isHide": false, "isLeaderInDepts": "{1:false}", "isBoss": false, "isSenior": false, "name": "小明", "department": [ 1 ] }, "httpMethod": "POST", "queryString": {} }, "subject": "acs:eventbridge:cnhangzhou::eventbus//eventsource/my.dingtalk", "aliyunoriginalaccountid": "11848", "source": "my.dingtalk", "type": "eventbridge:Events:HTTPEvent", "aliyunpublishtime": "20220513T07:28:29.505Z", "specversion": "1.0", "aliyuneventbusname": "chenyangbus", "id": "7059131cb232c4c3592120ae", "time": "20220513T15:28:29.504+08:00", "aliyunregionid": "cnhangzhou", "aliyunpublishaddr": "..61.88" } 4)数据库创建员工表 通过 RDS 控制台购买一个实例,并创建好数据库,然后根据上述新增员工事件的格式,提取部分字段对数据库进行建表。 CREATE TABLE 'user_info' ( 'dingId' varchar(256) NULL, 'active' varchar(256) NULL, 'isAdmin' varchar(256) NULL, 'userId' varchar(256) NULL, 'name' varchar(256) NULL ) ENGINE=InnoDB DEFAULT CHARACTER SET=utf8; 5)创建事件规则 数据库准备好后,返回 EventBridge 控制台,为第一步创建的事件总线创建事件规则,对「新增员工事件」进行转换并投递至数据库当中。 首先创建规则,过滤第一步创建的 HTTP 事件源。 然后选择 RDS 目标,做好参数映射。 6)触发事件入库 第三步触发事件时,因未配置规则和目标,事件没有被消费,故需要通过钉钉重新触发一次事件,然后从 EventBridge 控制台观察推送轨迹。 从轨迹中可以看出推送成功,然后通过 RDS 控制台可以查询到该条记录。 至此,一个钉钉团队新员工入职的记录通过 0 代码的方式入库到企业数据库当中,可以非常低的成本开发企业级管理应用。 _参考链接:_ 活动推荐 阿里云基于 Apache RocketMQ 构建的企业级产品消息队列RocketMQ 5.0版现开启活动: 1、新用户首次购买包年包月,即可享受全系列 85折优惠! 了解活动详情:
作者:尘央
#行业实践 #生态集成

2022年4月20日

EventBridge 集成云服务实践
EvenBridge 集成概述 EventBridge 是阿里云所推出了一款无服务器事件总线,其目标是拓展事件生态,打破系统间的数据孤岛,建立事件集成生态。提供统一的事件标准化接入及管理能力,完善集成与被集成通路,帮助客户快速实现事件驱动的核心原子功能,可将 EventBridge 快速集成至 BPM、RPA、CRM 等系统。 EventBridge 通过事件标准化,接入标准化,组件标准化三个方向作为支点拓展 EventBridge 事件生态: 事件标准化:拥抱 CloudEvents 1.0 开源社区标准协议,原生支持 CloudEvents 社区 SDK 和 API,全面拥抱开源社区事件标准生态; 接入标准化:提供标准事件推送协议 PutEvent,并支持 Pull 和 Push 两种事件接入模型,可有效降低事件接入难度,提供云上完善的事件接入标准化流程; 组件标准化:封装标准的事件下游组件工具链体系,包括 Schema 注册、事件分析、事件检索、事件仪表盘等。提供完善的事件工具链生态。 在集成领域 EventBridge 重点打造事件集成和数据集成两类核心场景,下面将围绕这两类场景具体展开描述。 事件集成 目前 EventBridge 已经拥有 80+ 云产品的事件源,800+ 种事件类型。整个事件生态还正在逐步丰富中。 那么,EventBridge 如何实现云产品的事件集成呢? 首先在 EventBridge 控制台可以看见一个名为 default 的事件总线,云产品的事件都会投递到这个总线; 然后点击创建规则,就可以选择所关心的云产品以及它的相关事件进行事件的监听和投递。 下面以两个例子为例,来看下 EventBridge 事件集成的方式。 OSS 事件集成 以 OSS 事件源为例,来讲解一下如何集成 OSS 事件。 OSS 事件现在主要分为 4 类,操作审计相关、云监控相关、配置审计相关、以及云产品相关的事件例如 PutObject 上传文件等等。其他的云产品的事件源也类似,基本都可以分为这几个类型的事件。 下面演示一下事件驱动的在线文件解压服务: 在 OSS Bucket 下面会有一个  zip 文件夹存放需要解压的文件,一个 unzip 文件夹存放解压后的文件; 当上传文件到 OSS Bucket 之后,会触发文件上传的事件并投递到 EventBridge 的云服务专用总线; 然后会使用一个事件规则过滤 zip 这个 bucket 的事件并投递到解压服务的 HTTP Endpoint; 解压服务会在收到事件之后,根据事件里面的文件路径从 OSS 下载文件解压,并在解压之后将文件传到 unzip 目录下; 同时,还会有一个事件规则,监听 unzip 目录的文件上传事件,并将事件转换后推送到钉钉群。 一起来看下是如何实现的: 前往下方链接查看视频: 1)首先创建一个 bucket,下面有一个 zip 目录用于存放上传的压缩文件,一个 unzip 目录用于存放解压后的文件。 2) 部署解压服务,并且暴露公网访问的地址。 解压服务的源码地址为: 也可以使用 ASK 直接部署,yaml 文件地址为: 3)创建一个事件规则监听 zip 目录下的上传文件的事件,并投递到解压服务的 HTTP  Endpoint。 这里使用 subject,匹配 zip 目录。 4)再创建一个事件规则监听 unzip 目录的事件,投递解压事件到钉钉群。 这里同样使用 subject,匹配 unzip 目录。 对于变量和模板的配置可以参考官方文档 : 。 EventBridge 会通过 JSONPath 的方式从事件中提取参数,然后把这些值放到变量中,最后通过模板的定义渲染出最终的输出投递到事件目标。OSS 事件源的事件格式也可以参考官方文档 : _ _,并根据实际的业务需要使用 JSONPath 定义变量。5)最后,通过 oss 控制台上传一个文件进行验证。 可以看到刚刚上传的 eventbridge.zip 已经解压到并上传上来了,也可以在钉钉群里面,收到解压完成的通知。此外,还可以在事件追踪这边查看事件的内容已经投递的轨迹。 可以看到有两个上传事件:一个是通过控制台上传的事件,一个是解压文件后上传的事件。 可以查看轨迹,都成功投递到了解压服务的 HTTP Endpoint 以及钉钉机器人。 以自定义事件源以及云产品事件目标的方式集成云产品 刚才演示的 demo 是集成云服务的事件源,下面再通过一个 demo 看一下如何通过以自定义事件源以及云产品事件目标的方式集成云产品。 前往下方链接查看视频: 这个 demo 的最终效果是通过 EventBridge 自动进行数据的清洗,并投递到 RDS 中去。事件内容是一个 JSON,拥有两个字段一个名字一个年龄,现在希望将把大于 10 岁的用户过滤出来并存储到 RDS 中。 整体的架构如图所示,使用一个 MNS Queue 作为自定义事件源,并通过 EventBridge 过滤并转换事件最终直接输出到 RDS 中去。 1)首先已经创建好了一个 MNS Queue,创建好一个 RDS 实例以及数据库表,表结构如下所示: 2)创建一个自定事件总线,选择事件提供方为 MNS,队列为提前创建好的队列; 创建好了之后,我们就可以在事件源这里看见一个已经正在运行中的事件源; 3)接下来创建规则投递到 RDS 配置的事件模式内容如下: { "source": [ "my.user" ], "data": { "messageBody": { "age": [ { "numeric": [ "", 10 ] } ] } } } 数值匹配可以参考官方文档:   4) 点击下一步,选择事件目标为数据库,填写数据库信息,配置转化规则,完成创建。 5)最后,先用 MNS Queue 发送一个消息,这个的 age 是大于 10 的。 可以看见这条事件就输出到了 RDS 里面了。 下面再发一个小于 10 的消息到 MNS Queue。 这条事件就被过滤掉了,没有输出到 RDS。 也可通过事件追踪查看事件: 可以看到一条事件成功投递到了 RDS,一条事件被过滤掉了,没有进行投递。 数据集成 事件流是 EventBridge 为数据集成提供的一个更为轻量化、实时的端到端的事件流试的通道,主要目标是将事件在两个端点之间进行数据同步,同时提供过滤和转换的功能。目前已经支持阿里云各消息产品之间的事件流转。 不同于事件总线模型,在事件流中,并不需要事件总线,其 1:1 的模型更加的轻量,直接到目标的方式也让事件更加的实时;通过事件流,我们可以实现不同系统之间的协议转换,数据同步,跨地域备份的能力。 下面将通过一个例子讲解如何使用事件流,将 RocketMQ 的消息路由到 MNS Queue,将两个产品集成起来。 整体的结构如图所示,通过EventBridge 将 RocketMQ 中 TAG 为 MNS 的消息路由到 MNQ Queue。 一起看下怎么实现: 前往下方链接查看视频: 首先创建一个事件流,选择源 RocketMQ 实例,填写 Tag 为 mns。 事件模式内容留空表示匹配所有。 目标选择 MNS,选择目标队列完成创建。 完成创建之后,点击启动,启动事件流任务。 事件流启动完成之后,我们就可以通过控制台或者 SDK 发送消息到源 RocketMQ Topic 里面。当有 Tag 为 mns 的时候,我们可以看见消息路由到了 mns;当有 Tag 不为 mns 的时候,消息就不会路由到 mns。 总结 本篇文章主要向大家分享了通过 EventBridge 如何集成云产品事件源,如何集成云产品事件目标以及通过事件流如何集成消息产品. 活动推荐 阿里云基于 Apache RocketMQ 构建的企业级产品消息队列RocketMQ 5.0版现开启活动: 1、新用户首次购买包年包月,即可享受全系列 85折优惠! 了解活动详情:
作者:李凯(凯易)
#行业实践 #生态集成

2022年4月13日

基于 EventBridge 构建数据库应用集成
引言 事件总线 EventBridge 是阿里云提供的一款无服务器事件总线服务,支持将阿里云服务、自定义应用、SaaS 应用以标准化、中心化的方式接入,并能够以标准化的 CloudEvents 1.0 协议在这些应用之间路由事件,帮助您轻松构建松耦合、分布式的事件驱动架构。事件驱动架构是一种松耦合、分布式的驱动架构,收集到某应用产生的事件后实时对事件采取必要的处理,然后路由至下游系统,无需等待系统响应。使用事件总线 EventBridge 可以构建各种简单或复杂的事件驱动架构,以标准化的 CloudEvents 1.0 协议连接云产品和应用、应用和应用等。 事件目标(Target)负责事件的处理终端与消费事件,是 EventBridge 的核心模块。针对市场上其他云厂商和垂直领域的 DB 服务,EventBridge 发布基于事件目标模块的数据库 Sink,提供简单且易于集成的 DB 落库能力,帮助开发者更加高效、便捷地实现业务上云。 数据库 Sink 概述 数据库 Sink 事件目标是 EventBridge 支持的事件目标的一种,主要能力是通过 EventBridge 将数据投递至指定数据库表中。 得益于 EventBridge 生态体系,数据库 Sink 支持众多接入方式: 阿里云云产品事件,EventBridge 支持云服务总线,通过简单配置即可直接对云服务相关事件进行入库操作; SaaS 应用事件,EventBridge 支持三方 SaaS 事件接入,支持对 SaaS 触发事件落库、查询; 用户自定义应用,用户可以使用 EventBridge 官方的 API 接口、多语言客户端、HTTP Source 以及 CloudEvents 社区的开源客户端来完成接入。 数据库 Sink 能力重点聚焦在如何将 EventBridge 业务的半结构化 Json 数据转为结构化 SQL 语句,提供 LowCode 交互接入,帮助开发者一站式完成数据入库。 数据库 Sink 最佳实践 典型案例: 希望把一些 MNS 的消费消息或者 RocketMQ 的消费消息存储到指定的数据库表中,方便后面的数据分析和消息排查,也可以通过这种方式把数据新增到数据库表中; 通过 HTTP 的事件源把一些重要的日志或者是埋点数据直接存储到 DB 中,不需要经过用户业务系统,可以方便后续的客户场景分析。 使用介绍: 首先现阶段数据库 Sink For MySQL 支持两种方式:一种是基于阿里云的 RDS MySQL(VPC),另一种是用户自建的 MySQL(公网),可根据业务场景选择的不同方式接入。 步骤一 :点击事件规则并创建事件规则 步骤二 :选择事件源 可以选择阿里云官方或者自定义事件源 步骤三 :选择事件目标 1)在事件目标下面的服务类型选择数据库,这时会有两个选项就是一个是阿里云的 RDS MySQL,一个是自建 MySQL; 2)如果是阿里云 RDS MySQL,需要创建服务的关联角色。 3)授权以后就可以选择用户自己创建的 RDS MySQL 数据库的实例 ID 和数据库名称。 数据库账号和密码需手动填写,并发配置可以根据实际业务需要进行填写。因为 RDS MySQL 涉及到了跨地域访问,所以需要专有网络 VPC 的支持。 步骤四 :入库配置 入库配置支持快速配置与自定义 SQL 两种方式: 1)快速配置,支持 LowCode 方式快速选择入库内容。 2)自定义 SQL,支持自定义高级 SQL 语法。 步骤五:事件发布 当创建成功以后可以通过控制台进行事件发布: 步骤六 :事件状态追踪和查询 可以通过上个步骤中的事件 ID 可看到轨迹的详细信息,包括事件执行成功与否等信息。如果事件执行失败,会在页面展示异常信息。 通过事件追踪也可以看到详细的事件轨迹 : 总结 本文重点介绍 EventBridge 的新特性:数据库 Sink 事件目标。 作为一款无服务器事件总线服务,EventBridge 已经将阿里云云产品管控链路数据和消息产品业务数据整合到事件源生态中,提高了上云用户业务集成的便捷性,满足 Open API 与多语言 sdk 的支持,在此基础之上,通过 EventBridge 将数据投递至指定的数据库表中,为客户自身业务接入 EventBridge 提供了便利。 相关链接 [2] RDS 官方文档 [3] EventBridge 官方文档 想要了解更多 EventBridge 相关信息,扫描下方二维码加入钉钉群~ 活动推荐 阿里云基于 Apache RocketMQ 构建的企业级产品消息队列RocketMQ 5.0版现开启活动: 1、新用户首次购买包年包月,即可享受全系列 85折优惠! 了解活动详情:
作者:赵海
#行业实践 #生态集成

2022年4月10日

EventBridge 特性介绍|以 IaC 的方式使用 EventBridge
引言 EventBridge 作为构建 EDA 架构的基础设施,通过一些核心概念和特性提供了灵活丰富的事件收集、处理和路由的能力。对于不少用户来说,通过控制台里的便捷的引导来使用 EventBridge 应该是最快的上手方式。此外,也有很多用户面临着大量的云产品的管理,使用控制台管理每一个资源的方式变成了沉重的手工操作负担。 为了解决这个问题,现在已经能够通过 OpenAPI、terraform 等方式将 EventBridge 的能力方便快捷的带给用户。本文将重点介绍 EventBridge 和 IaC 的重点概念和特性,然后演示如何应用 IaC 理念自动化部署 EventBridge 来使用这些概念和特性。 EventBridge 概述 事件驱动架构 事件驱动架构是一种松耦合、分布式的驱动架构,收集到某应用产生的事件后实时对事件采取必要的处理,紧接着路由至下游系统,无需等待系统响应。使用事件总线 EventBridge 可以构建各种简单或复杂的事件驱动架构,以标准化的 CloudEvents 1.0 协议连接云产品和应用、应用和应用等。 事件驱动架构体系架构具备以下三个能力: 事件收集:负责收集各种应用发生的事件,如新建订单,退换货订单等其他状态变更; 事件处理:对事件进行脱敏处理,并对事件进行初步的过滤和筛选; 事件路由:分析事件内容并将事件路由分发至下游产品。 事件驱动架构具有以下优势: 降低耦合:降低事件生产者和订阅者的耦合性。事件生产者只需关注事件的发生,无需关注事件如何处理以及被分发给哪些订阅者;任何一个环节出现故障,都不会影响其他业务正常运行; 异步执行:事件驱动架构适用于异步场景,即便是需求高峰期,收集各种来源的事件后保留在事件总线中,然后逐步分发传递事件,不会造成系统拥塞或资源过剩的情况; 可扩展性:事件驱动架构中路由和过滤能力支持划分服务,便于扩展和路由分发; 敏捷性:事件驱动架构支持与各种阿里云产品和应用集成,支持事件路由至任何系统服务,提供各种敏捷高效的部署方案。 使用 EventBridge 构建 EDA 架构 事件总线 EventBridge 是阿里云提供的一款无服务器事件总线服务。EventBridge 提供的几个核心概念,可以满足构建 EDA 架构的需要。 事件总线 EventBridge 支持以下事件源: 阿里云官方事件源 自定义事件源 事件总线 EventBridge 的事件总线包括以下类型: 云服务专用事件总线:一个无需创建且不可修改的内置事件总线,用于接收您的阿里云官方事件源的事件;阿里云官方事件源的事件只能发布到云服务专用总线; 自定义事件总线:需要您自行创建并管理的事件总线,用于接收自定义应用或存量消息数据的事件;自定义应用或存量消息数据的事件只能发布到自定义总线。 在 EventBridge 中,一个事件规则包含以下内容: 事件模式:用于过滤事件并将事件路由到事件目标; 事件目标:包括事件的转换和处理,负责消费事件。 EventBridge 提供了简洁的事件模式匹配语法,同时具备灵活的事件转换能力,后面将会通过演示来展示一些具体的例子。 此外,EventBridge 还提供了一些增强能力,这些能力使得 EDA 架构中流经的事件更加透明,具备了开箱即用的观测和分析能力: 事件追踪:可以查看发布到事件总线 EventBridge 的事件内容和处理轨迹; 事件分析:对发布到事件总线的各种事件进行查询分析处理和可视化图表展示,以便发现事件内在价值。 IaC 简介 在介绍完事件总线 EventBridge 的相关基础内容后,接下来一起了解下 IaC。在 DevOps 的实践中,IaC 是非常重要的部分,通过将基础设施代码化,版本化,便可以轻松的借助版本控制工具来提供 single source of truth、协调多人合作的变更、实施严格的 review、借助一些 CI/CD pipeline 工具(甚至 GitOps)来自动触发部署。软件系统的开发者仅付出很小的努力去描述需求,就可以在几分钟后得到所需的虚拟机、网络等云上的服务,极大的缩短了部署时间,同时还能够保证多个环境的配置一致性,通过减少人为操作也降低了引入错误的概率。 IaC的代码实践中一般有两种方式,命令式和声明式。 命令式:顾名思义,需要明确发出每一个动作的指令,描述的是 How,比如“创建一台 xx 规格的 ECS”。代码需要对每一步动作的顺序仔细编排,处理各种可能的错误,尤其要注意处理好每次变更对已经存在的资源的影响,否则稍有不慎就可能造成服务中断。举例来说,作为开发者可以通过自己熟悉的编程语言调用阿里云的 OpenAPI 来管理资源,因为这些 API 是类似 Create、Describe、Delete 等操作,这就是一种命令式的 IaC 实践。 声明式:意味着开发者仅描述自己的需求终态是什么样子,即描述 What,比如“一台 xx 规格的 ECS”。熟悉 Kubernetes 的同学应该对这个概念很熟悉了。IaC 工具可以通过描述资源之间的依赖关系自动编排顺序,如果有已经存在的资源,则比对期望的状态和实际状态的差异,并根据差异做出更新;如果不存在,需要进行创建。可以看出,声明式对开发者非常友好,极大的降低了开发者的心智负担。 IaC 带来的优势: 降低成本:有效管理资源,并减少为此投入的人力; 提升效率:加快资源交付和软件部署的速度; 风险控制: 减少错误; 提高基础架构一致性; 消除配置偏移 terraform 作为 IaC 领域的佼佼者,提供了强大的自动化管理基础设施的能力。生态丰富,很多云厂商都提供了官方插件,阿里云的大多数产品(包括 EventBridge)都对 terraform 做了很全面的支持,使得跨多云部署基础设施变得极其简单。既然是 IaC,terraform 提供了自己的语言 HCL(hashicorp configuration language),HCL 具有类似 json 的简洁的语法,通过声明式的资源描述,可以让开发者快速上手。 动手实践 准备工作 安装 terraform cli 工具,可以参见 的内容。 创建一个 tf 文件 terraform.tf,内容如下(需要替换<内的值) provider "alicloud" { access_key = "" secret_key = "" region = "" } 案例1:通过钉钉监控云上资源变化 假设一个用户使用了很多云上的资源作为生产环境,需要感知线上资源的变更操作,一个可行的方案是利用 EventBridge 将来自于 ActionTrail 的审计事件投递到用户的钉钉。 首先根据钉钉官方文档创建一个机器人,记下 webhook url 和加签的秘钥,接下来会用到。 创建一个 tf 文件 1_actiontrail2dingding.tf,内容如下(需要替换<内的值) 案例1:通过钉钉监控云上资源变化 目标: 熟悉部署使用EventBridge的default总线 熟悉EventBridge的事件模式匹配 熟悉EventBridge的事件转换配置 声明一个default总线上的规则 resource "alicloud_event_bridge_rule" "audit_notify" { default总线默认存在,所以这里可以直接使用 event_bus_name = "default" rule_name = "audit_notify" description = "demo" 通过后缀匹配的方式过滤来自所有云产品事件源的ActionTrail:ApiCall事件 其他更多模式匹配的介绍可以查阅文档:https://help.aliyun.com/document_detail/181432.html filter_pattern = jsonencode( { "type" : [ { "suffix" : ":ActionTrail:ApiCall" } ] } ) targets { target_id = "testtarget" endpoint = "" type的取值可以查阅文档:https://registry.terraform.io/providers/aliyun/alicloud/latest/docs/resources/event_bridge_ruletype type = "acs.dingtalk" 每个事件目标都有一组对应的param_list,具体可以查阅文档:https://help.aliyun.com/document_detail/185887.html 每一个param的form关系到事件转换的配置,可以查阅文档:https://help.aliyun.com/document_detail/181429.html param_list { resource_key = "URL" form = "CONSTANT" value = "" } param_list { resource_key = "SecretKey" form = "CONSTANT" value = "" } 这里展示了TEMPLATE类型的事件转换描述 value是使用jsonpath引用事件内容的字典,template则是模板内容,EventBridge最终会根据这两者结合事件本身渲染出这个参数的值 param_list { resource_key = "Body" form = "TEMPLATE" value = jsonencode( { "source": "$.source", "type": "$.type" "region": "$.data.acsRegion", "accountId" : "$.data.userIdentity.accountId", "eventName" : "$.data.eventName", } ) template = jsonencode( { "msgtype" : "text", "text" : { "content": "来自 {source} 的 {type} 审计事件:{accountId} 在 {region} 执行了 {eventName} 操作" } } ) } } } 在命令行窗口依次执行命令: 初始化 terraform init 预览变更 terraform plan 应用变更 terraform apply 在云产品控制台进行操作,这里以 KMS 为例 钉钉上收到消息通知 在 EventBridge 控制台查看事件轨迹 案例 2:自定义总线触发 FunctionCompute 假设一个用户的应用会产生一些事件,其中一个链路是通过 FunctionCompute 对这些事件进行弹性的处理。那么就可以通过 EventBridge 的自定义事件源和函数计算事件目标来实现这个方案。 创建一个模拟对事件进行处理的 python 脚本文件 src/index.py,内容如下: coding: utf8 import logging def handler(event, context): logger = logging.getLogger() logger.info('evt: ' + str(event)) return str(event) 创建一个 tf 文件 2_trigger_function.tf,内容如下(需要替换<内的值) 案例2:自定义总线触发FunctionCompute 目标: 熟悉部署使用EventBridge的自定义总线 熟悉"自定义应用"事件源配置 熟悉“FunctionCompute”事件目标配置 由于用户自己产生的事件需要投递到自定义总线,这里声明一个叫demo_event_bus的自定义总线 resource "alicloud_event_bridge_event_bus" "demo_event_bus" { event_bus_name = "demo_event_bus" description = "demo" } 声明一个在demo_event_bus总线上的自定义事件源,用于通过sdk或者控制台向EventBridge投递事件 resource "alicloud_event_bridge_event_source" "demo_event_source" { event_bus_name = alicloud_event_bridge_event_bus.demo_event_bus.event_bus_name event_source_name = "demo_event_source" description = "demo" linked_external_source = false } 声明一个叫fc_service的函数计算服务,publish=true意味着会立即部署上传的函数代码。 resource "alicloud_fc_service" "fc_service" { name = "ebfcservice" description = "demo" publish = true } 将前面准备的python脚本文件打包成zip用于部署到函数计算 data "archive_file" "code" { type = "zip" source_file = "{path.module}/src/index.py" output_path = "{path.module}/code.zip" } 声明一个fc_service服务中的函数,其中filename引用了上面描述的zip包,会将这个代码包上传。 resource "alicloud_fc_function" "fc_function" { service = alicloud_fc_service.fc_service.name name = "ebfcfunction" description = "demo" filename = data.archive_file.code.output_path memory_size = "128" runtime = "python3" handler = "index.handler" } 声明一个在demo_event_bus总线上的规则 resource "alicloud_event_bridge_rule" "demo_rule" { event_bus_name = alicloud_event_bridge_event_bus.demo_event_bus.event_bus_name rule_name = "demo_rule" description = "demo" 通过匹配source过滤来自于前面创建的自定义事件源的事件 filter_pattern = jsonencode( { "source" : ["{alicloud_event_bridge_event_source.demo_event_source.id}"] } ) targets { target_id = "demofctarget" type的取值可以查阅文档:https://registry.terraform.io/providers/aliyun/alicloud/latest/docs/resources/event_bridge_ruletype type = "acs.fc.function" endpoint = "acs:fc:::services/{alicloud_fc_service.fc_service.name}.LATEST/functions/{alicloud_fc_function.fc_function.name}" param_list { resource_key = "serviceName" form = "CONSTANT" value = alicloud_fc_service.fc_service.name } param_list { resource_key = "functionName" form = "CONSTANT" value = alicloud_fc_function.fc_function.name } param_list { resource_key = "Qualifier" form = "CONSTANT" value = "LATEST" } 注意form=ORIGINAL意味着每次投递事件都会将事件的原始内容作为这个参数的值 param_list { resource_key = "Body" form = "ORIGINAL" } } } 在命令行窗口依次执行命令 初始化 terraform init 预览变更 terraform plan 应用变更 terraform apply 在控制台模拟自定义事件源发布事件 在 FunctionCompute 的控制台页面查看函数调用日志 在 EventBridge 控制台查看事件轨迹 _总结_ EventBridge 作为构建 EDA 架构的基础设施,通过一些核心概念和特性提供了灵活丰富的事件收集、处理和路由的能力,并支持通过 OpenAPI、terraform 等方式将这些能力方便快捷的带给用户。本文介绍了 EventBridge 和 IaC 的重点概念和特性,然后演示了如何应用 IaC 理念自动化部署 EventBridge 来使用这些概念和特性。 期待大家可以发掘更多利用 EventBridge 快速搭建 EDA 架构的 idea,并使用 terraform 快捷的将这些 idea 变为现实。 相关链接 [1] 阿里云 terraform 文档 [2] terraform registry 文档 [3] 钉钉官方文档 活动推荐 阿里云基于 Apache RocketMQ 构建的企业级产品消息队列RocketMQ 5.0版现开启活动: 1、新用户首次购买包年包月,即可享受全系列 85折优惠! 了解活动详情:
作者:王川(弗丁)
#技术探索 #生态集成