2022年5月15日

【科普】如果程序员穿越到古代当皇帝,会发生什么?
点击查看科普小视频~ 作为 Gartner 定义的十大战略技术趋势之一,事件驱动架构(EDA)逐渐成为主流技术架构。根据 Gartner 的预估,在新型数字化商业的解决方案中,将有 60% 使用 EDA,在商业组织参与的技术栈中,EDA 有一半的占比。 当下比较成功的企业已然认识到,要想最大限度提升运营效率和客户体验,务必要将业务和技术两方面的举措紧密结合起来。事件或业务形势的变化是时下众多企业关注的焦点,这些变化能够为企业领导者带来切实有用的信息,而架构设计的主旨恰恰是从客户联系人、交易、运营等方面的信息中获取洞见,两者相辅相成。传统技术历来对企业从事件中获取洞见的速度有着诸多限制,比如用于记录、收集和处理此类事件的批处理 ETL(提取、转换、加载)等。基于以上背景,阿里云 EventBridge 应运而生,通过事件的标准化和广泛的事件集成能力,帮助开发者轻松构建松耦合、分布式的事件驱动架构。同时阿里云 EventBridge 即将开源,敬请期待! 想要了解更多 EventBridge 相关信息,扫描下方二维码加入钉钉群~ 活动推荐 阿里云基于 Apache RocketMQ 构建的企业级产品消息队列RocketMQ 5.0版现开启活动: 1、新用户首次购买包年包月,即可享受全系列 85折优惠! 了解活动详情:
#技术探索

2022年4月20日

EventBridge 集成云服务实践
EvenBridge 集成概述 EventBridge 是阿里云所推出了一款无服务器事件总线,其目标是拓展事件生态,打破系统间的数据孤岛,建立事件集成生态。提供统一的事件标准化接入及管理能力,完善集成与被集成通路,帮助客户快速实现事件驱动的核心原子功能,可将 EventBridge 快速集成至 BPM、RPA、CRM 等系统。 EventBridge 通过事件标准化,接入标准化,组件标准化三个方向作为支点拓展 EventBridge 事件生态: 事件标准化:拥抱 CloudEvents 1.0 开源社区标准协议,原生支持 CloudEvents 社区 SDK 和 API,全面拥抱开源社区事件标准生态; 接入标准化:提供标准事件推送协议 PutEvent,并支持 Pull 和 Push 两种事件接入模型,可有效降低事件接入难度,提供云上完善的事件接入标准化流程; 组件标准化:封装标准的事件下游组件工具链体系,包括 Schema 注册、事件分析、事件检索、事件仪表盘等。提供完善的事件工具链生态。 在集成领域 EventBridge 重点打造事件集成和数据集成两类核心场景,下面将围绕这两类场景具体展开描述。 事件集成 目前 EventBridge 已经拥有 80+ 云产品的事件源,800+ 种事件类型。整个事件生态还正在逐步丰富中。 那么,EventBridge 如何实现云产品的事件集成呢? 首先在 EventBridge 控制台可以看见一个名为 default 的事件总线,云产品的事件都会投递到这个总线; 然后点击创建规则,就可以选择所关心的云产品以及它的相关事件进行事件的监听和投递。 下面以两个例子为例,来看下 EventBridge 事件集成的方式。 OSS 事件集成 以 OSS 事件源为例,来讲解一下如何集成 OSS 事件。 OSS 事件现在主要分为 4 类,操作审计相关、云监控相关、配置审计相关、以及云产品相关的事件例如 PutObject 上传文件等等。其他的云产品的事件源也类似,基本都可以分为这几个类型的事件。 下面演示一下事件驱动的在线文件解压服务: 在 OSS Bucket 下面会有一个  zip 文件夹存放需要解压的文件,一个 unzip 文件夹存放解压后的文件; 当上传文件到 OSS Bucket 之后,会触发文件上传的事件并投递到 EventBridge 的云服务专用总线; 然后会使用一个事件规则过滤 zip 这个 bucket 的事件并投递到解压服务的 HTTP Endpoint; 解压服务会在收到事件之后,根据事件里面的文件路径从 OSS 下载文件解压,并在解压之后将文件传到 unzip 目录下; 同时,还会有一个事件规则,监听 unzip 目录的文件上传事件,并将事件转换后推送到钉钉群。 一起来看下是如何实现的: 前往下方链接查看视频: 1)首先创建一个 bucket,下面有一个 zip 目录用于存放上传的压缩文件,一个 unzip 目录用于存放解压后的文件。 2) 部署解压服务,并且暴露公网访问的地址。 解压服务的源码地址为: 也可以使用 ASK 直接部署,yaml 文件地址为: 3)创建一个事件规则监听 zip 目录下的上传文件的事件,并投递到解压服务的 HTTP  Endpoint。 这里使用 subject,匹配 zip 目录。 4)再创建一个事件规则监听 unzip 目录的事件,投递解压事件到钉钉群。 这里同样使用 subject,匹配 unzip 目录。 对于变量和模板的配置可以参考官方文档 : 。 EventBridge 会通过 JSONPath 的方式从事件中提取参数,然后把这些值放到变量中,最后通过模板的定义渲染出最终的输出投递到事件目标。OSS 事件源的事件格式也可以参考官方文档 : _ _,并根据实际的业务需要使用 JSONPath 定义变量。5)最后,通过 oss 控制台上传一个文件进行验证。 可以看到刚刚上传的 eventbridge.zip 已经解压到并上传上来了,也可以在钉钉群里面,收到解压完成的通知。此外,还可以在事件追踪这边查看事件的内容已经投递的轨迹。 可以看到有两个上传事件:一个是通过控制台上传的事件,一个是解压文件后上传的事件。 可以查看轨迹,都成功投递到了解压服务的 HTTP Endpoint 以及钉钉机器人。 以自定义事件源以及云产品事件目标的方式集成云产品 刚才演示的 demo 是集成云服务的事件源,下面再通过一个 demo 看一下如何通过以自定义事件源以及云产品事件目标的方式集成云产品。 前往下方链接查看视频: 这个 demo 的最终效果是通过 EventBridge 自动进行数据的清洗,并投递到 RDS 中去。事件内容是一个 JSON,拥有两个字段一个名字一个年龄,现在希望将把大于 10 岁的用户过滤出来并存储到 RDS 中。 整体的架构如图所示,使用一个 MNS Queue 作为自定义事件源,并通过 EventBridge 过滤并转换事件最终直接输出到 RDS 中去。 1)首先已经创建好了一个 MNS Queue,创建好一个 RDS 实例以及数据库表,表结构如下所示: 2)创建一个自定事件总线,选择事件提供方为 MNS,队列为提前创建好的队列; 创建好了之后,我们就可以在事件源这里看见一个已经正在运行中的事件源; 3)接下来创建规则投递到 RDS 配置的事件模式内容如下: { "source": [ "my.user" ], "data": { "messageBody": { "age": [ { "numeric": [ "", 10 ] } ] } } } 数值匹配可以参考官方文档:   4) 点击下一步,选择事件目标为数据库,填写数据库信息,配置转化规则,完成创建。 5)最后,先用 MNS Queue 发送一个消息,这个的 age 是大于 10 的。 可以看见这条事件就输出到了 RDS 里面了。 下面再发一个小于 10 的消息到 MNS Queue。 这条事件就被过滤掉了,没有输出到 RDS。 也可通过事件追踪查看事件: 可以看到一条事件成功投递到了 RDS,一条事件被过滤掉了,没有进行投递。 数据集成 事件流是 EventBridge 为数据集成提供的一个更为轻量化、实时的端到端的事件流试的通道,主要目标是将事件在两个端点之间进行数据同步,同时提供过滤和转换的功能。目前已经支持阿里云各消息产品之间的事件流转。 不同于事件总线模型,在事件流中,并不需要事件总线,其 1:1 的模型更加的轻量,直接到目标的方式也让事件更加的实时;通过事件流,我们可以实现不同系统之间的协议转换,数据同步,跨地域备份的能力。 下面将通过一个例子讲解如何使用事件流,将 RocketMQ 的消息路由到 MNS Queue,将两个产品集成起来。 整体的结构如图所示,通过EventBridge 将 RocketMQ 中 TAG 为 MNS 的消息路由到 MNQ Queue。 一起看下怎么实现: 前往下方链接查看视频: 首先创建一个事件流,选择源 RocketMQ 实例,填写 Tag 为 mns。 事件模式内容留空表示匹配所有。 目标选择 MNS,选择目标队列完成创建。 完成创建之后,点击启动,启动事件流任务。 事件流启动完成之后,我们就可以通过控制台或者 SDK 发送消息到源 RocketMQ Topic 里面。当有 Tag 为 mns 的时候,我们可以看见消息路由到了 mns;当有 Tag 不为 mns 的时候,消息就不会路由到 mns。 总结 本篇文章主要向大家分享了通过 EventBridge 如何集成云产品事件源,如何集成云产品事件目标以及通过事件流如何集成消息产品. 活动推荐 阿里云基于 Apache RocketMQ 构建的企业级产品消息队列RocketMQ 5.0版现开启活动: 1、新用户首次购买包年包月,即可享受全系列 85折优惠! 了解活动详情:
作者:李凯(凯易)
#行业实践 #生态集成

2022年4月13日

基于 EventBridge 构建数据库应用集成
引言 事件总线 EventBridge 是阿里云提供的一款无服务器事件总线服务,支持将阿里云服务、自定义应用、SaaS 应用以标准化、中心化的方式接入,并能够以标准化的 CloudEvents 1.0 协议在这些应用之间路由事件,帮助您轻松构建松耦合、分布式的事件驱动架构。事件驱动架构是一种松耦合、分布式的驱动架构,收集到某应用产生的事件后实时对事件采取必要的处理,然后路由至下游系统,无需等待系统响应。使用事件总线 EventBridge 可以构建各种简单或复杂的事件驱动架构,以标准化的 CloudEvents 1.0 协议连接云产品和应用、应用和应用等。 事件目标(Target)负责事件的处理终端与消费事件,是 EventBridge 的核心模块。针对市场上其他云厂商和垂直领域的 DB 服务,EventBridge 发布基于事件目标模块的数据库 Sink,提供简单且易于集成的 DB 落库能力,帮助开发者更加高效、便捷地实现业务上云。 数据库 Sink 概述 数据库 Sink 事件目标是 EventBridge 支持的事件目标的一种,主要能力是通过 EventBridge 将数据投递至指定数据库表中。 得益于 EventBridge 生态体系,数据库 Sink 支持众多接入方式: 阿里云云产品事件,EventBridge 支持云服务总线,通过简单配置即可直接对云服务相关事件进行入库操作; SaaS 应用事件,EventBridge 支持三方 SaaS 事件接入,支持对 SaaS 触发事件落库、查询; 用户自定义应用,用户可以使用 EventBridge 官方的 API 接口、多语言客户端、HTTP Source 以及 CloudEvents 社区的开源客户端来完成接入。 数据库 Sink 能力重点聚焦在如何将 EventBridge 业务的半结构化 Json 数据转为结构化 SQL 语句,提供 LowCode 交互接入,帮助开发者一站式完成数据入库。 数据库 Sink 最佳实践 典型案例: 希望把一些 MNS 的消费消息或者 RocketMQ 的消费消息存储到指定的数据库表中,方便后面的数据分析和消息排查,也可以通过这种方式把数据新增到数据库表中; 通过 HTTP 的事件源把一些重要的日志或者是埋点数据直接存储到 DB 中,不需要经过用户业务系统,可以方便后续的客户场景分析。 使用介绍: 首先现阶段数据库 Sink For MySQL 支持两种方式:一种是基于阿里云的 RDS MySQL(VPC),另一种是用户自建的 MySQL(公网),可根据业务场景选择的不同方式接入。 步骤一 :点击事件规则并创建事件规则 步骤二 :选择事件源 可以选择阿里云官方或者自定义事件源 步骤三 :选择事件目标 1)在事件目标下面的服务类型选择数据库,这时会有两个选项就是一个是阿里云的 RDS MySQL,一个是自建 MySQL; 2)如果是阿里云 RDS MySQL,需要创建服务的关联角色。 3)授权以后就可以选择用户自己创建的 RDS MySQL 数据库的实例 ID 和数据库名称。 数据库账号和密码需手动填写,并发配置可以根据实际业务需要进行填写。因为 RDS MySQL 涉及到了跨地域访问,所以需要专有网络 VPC 的支持。 步骤四 :入库配置 入库配置支持快速配置与自定义 SQL 两种方式: 1)快速配置,支持 LowCode 方式快速选择入库内容。 2)自定义 SQL,支持自定义高级 SQL 语法。 步骤五:事件发布 当创建成功以后可以通过控制台进行事件发布: 步骤六 :事件状态追踪和查询 可以通过上个步骤中的事件 ID 可看到轨迹的详细信息,包括事件执行成功与否等信息。如果事件执行失败,会在页面展示异常信息。 通过事件追踪也可以看到详细的事件轨迹 : 总结 本文重点介绍 EventBridge 的新特性:数据库 Sink 事件目标。 作为一款无服务器事件总线服务,EventBridge 已经将阿里云云产品管控链路数据和消息产品业务数据整合到事件源生态中,提高了上云用户业务集成的便捷性,满足 Open API 与多语言 sdk 的支持,在此基础之上,通过 EventBridge 将数据投递至指定的数据库表中,为客户自身业务接入 EventBridge 提供了便利。 相关链接 [2] RDS 官方文档 [3] EventBridge 官方文档 想要了解更多 EventBridge 相关信息,扫描下方二维码加入钉钉群~ 活动推荐 阿里云基于 Apache RocketMQ 构建的企业级产品消息队列RocketMQ 5.0版现开启活动: 1、新用户首次购买包年包月,即可享受全系列 85折优惠! 了解活动详情:
作者:赵海
#行业实践 #生态集成

2022年4月10日

EventBridge 特性介绍|以 IaC 的方式使用 EventBridge
引言 EventBridge 作为构建 EDA 架构的基础设施,通过一些核心概念和特性提供了灵活丰富的事件收集、处理和路由的能力。对于不少用户来说,通过控制台里的便捷的引导来使用 EventBridge 应该是最快的上手方式。此外,也有很多用户面临着大量的云产品的管理,使用控制台管理每一个资源的方式变成了沉重的手工操作负担。 为了解决这个问题,现在已经能够通过 OpenAPI、terraform 等方式将 EventBridge 的能力方便快捷的带给用户。本文将重点介绍 EventBridge 和 IaC 的重点概念和特性,然后演示如何应用 IaC 理念自动化部署 EventBridge 来使用这些概念和特性。 EventBridge 概述 事件驱动架构 事件驱动架构是一种松耦合、分布式的驱动架构,收集到某应用产生的事件后实时对事件采取必要的处理,紧接着路由至下游系统,无需等待系统响应。使用事件总线 EventBridge 可以构建各种简单或复杂的事件驱动架构,以标准化的 CloudEvents 1.0 协议连接云产品和应用、应用和应用等。 事件驱动架构体系架构具备以下三个能力: 事件收集:负责收集各种应用发生的事件,如新建订单,退换货订单等其他状态变更; 事件处理:对事件进行脱敏处理,并对事件进行初步的过滤和筛选; 事件路由:分析事件内容并将事件路由分发至下游产品。 事件驱动架构具有以下优势: 降低耦合:降低事件生产者和订阅者的耦合性。事件生产者只需关注事件的发生,无需关注事件如何处理以及被分发给哪些订阅者;任何一个环节出现故障,都不会影响其他业务正常运行; 异步执行:事件驱动架构适用于异步场景,即便是需求高峰期,收集各种来源的事件后保留在事件总线中,然后逐步分发传递事件,不会造成系统拥塞或资源过剩的情况; 可扩展性:事件驱动架构中路由和过滤能力支持划分服务,便于扩展和路由分发; 敏捷性:事件驱动架构支持与各种阿里云产品和应用集成,支持事件路由至任何系统服务,提供各种敏捷高效的部署方案。 使用 EventBridge 构建 EDA 架构 事件总线 EventBridge 是阿里云提供的一款无服务器事件总线服务。EventBridge 提供的几个核心概念,可以满足构建 EDA 架构的需要。 事件总线 EventBridge 支持以下事件源: 阿里云官方事件源 自定义事件源 事件总线 EventBridge 的事件总线包括以下类型: 云服务专用事件总线:一个无需创建且不可修改的内置事件总线,用于接收您的阿里云官方事件源的事件;阿里云官方事件源的事件只能发布到云服务专用总线; 自定义事件总线:需要您自行创建并管理的事件总线,用于接收自定义应用或存量消息数据的事件;自定义应用或存量消息数据的事件只能发布到自定义总线。 在 EventBridge 中,一个事件规则包含以下内容: 事件模式:用于过滤事件并将事件路由到事件目标; 事件目标:包括事件的转换和处理,负责消费事件。 EventBridge 提供了简洁的事件模式匹配语法,同时具备灵活的事件转换能力,后面将会通过演示来展示一些具体的例子。 此外,EventBridge 还提供了一些增强能力,这些能力使得 EDA 架构中流经的事件更加透明,具备了开箱即用的观测和分析能力: 事件追踪:可以查看发布到事件总线 EventBridge 的事件内容和处理轨迹; 事件分析:对发布到事件总线的各种事件进行查询分析处理和可视化图表展示,以便发现事件内在价值。 IaC 简介 在介绍完事件总线 EventBridge 的相关基础内容后,接下来一起了解下 IaC。在 DevOps 的实践中,IaC 是非常重要的部分,通过将基础设施代码化,版本化,便可以轻松的借助版本控制工具来提供 single source of truth、协调多人合作的变更、实施严格的 review、借助一些 CI/CD pipeline 工具(甚至 GitOps)来自动触发部署。软件系统的开发者仅付出很小的努力去描述需求,就可以在几分钟后得到所需的虚拟机、网络等云上的服务,极大的缩短了部署时间,同时还能够保证多个环境的配置一致性,通过减少人为操作也降低了引入错误的概率。 IaC的代码实践中一般有两种方式,命令式和声明式。 命令式:顾名思义,需要明确发出每一个动作的指令,描述的是 How,比如“创建一台 xx 规格的 ECS”。代码需要对每一步动作的顺序仔细编排,处理各种可能的错误,尤其要注意处理好每次变更对已经存在的资源的影响,否则稍有不慎就可能造成服务中断。举例来说,作为开发者可以通过自己熟悉的编程语言调用阿里云的 OpenAPI 来管理资源,因为这些 API 是类似 Create、Describe、Delete 等操作,这就是一种命令式的 IaC 实践。 声明式:意味着开发者仅描述自己的需求终态是什么样子,即描述 What,比如“一台 xx 规格的 ECS”。熟悉 Kubernetes 的同学应该对这个概念很熟悉了。IaC 工具可以通过描述资源之间的依赖关系自动编排顺序,如果有已经存在的资源,则比对期望的状态和实际状态的差异,并根据差异做出更新;如果不存在,需要进行创建。可以看出,声明式对开发者非常友好,极大的降低了开发者的心智负担。 IaC 带来的优势: 降低成本:有效管理资源,并减少为此投入的人力; 提升效率:加快资源交付和软件部署的速度; 风险控制: 减少错误; 提高基础架构一致性; 消除配置偏移 terraform 作为 IaC 领域的佼佼者,提供了强大的自动化管理基础设施的能力。生态丰富,很多云厂商都提供了官方插件,阿里云的大多数产品(包括 EventBridge)都对 terraform 做了很全面的支持,使得跨多云部署基础设施变得极其简单。既然是 IaC,terraform 提供了自己的语言 HCL(hashicorp configuration language),HCL 具有类似 json 的简洁的语法,通过声明式的资源描述,可以让开发者快速上手。 动手实践 准备工作 安装 terraform cli 工具,可以参见 的内容。 创建一个 tf 文件 terraform.tf,内容如下(需要替换<内的值) provider "alicloud" { access_key = "" secret_key = "" region = "" } 案例1:通过钉钉监控云上资源变化 假设一个用户使用了很多云上的资源作为生产环境,需要感知线上资源的变更操作,一个可行的方案是利用 EventBridge 将来自于 ActionTrail 的审计事件投递到用户的钉钉。 首先根据钉钉官方文档创建一个机器人,记下 webhook url 和加签的秘钥,接下来会用到。 创建一个 tf 文件 1_actiontrail2dingding.tf,内容如下(需要替换<内的值) 案例1:通过钉钉监控云上资源变化 目标: 熟悉部署使用EventBridge的default总线 熟悉EventBridge的事件模式匹配 熟悉EventBridge的事件转换配置 声明一个default总线上的规则 resource "alicloud_event_bridge_rule" "audit_notify" { default总线默认存在,所以这里可以直接使用 event_bus_name = "default" rule_name = "audit_notify" description = "demo" 通过后缀匹配的方式过滤来自所有云产品事件源的ActionTrail:ApiCall事件 其他更多模式匹配的介绍可以查阅文档:https://help.aliyun.com/document_detail/181432.html filter_pattern = jsonencode( { "type" : [ { "suffix" : ":ActionTrail:ApiCall" } ] } ) targets { target_id = "testtarget" endpoint = "" type的取值可以查阅文档:https://registry.terraform.io/providers/aliyun/alicloud/latest/docs/resources/event_bridge_ruletype type = "acs.dingtalk" 每个事件目标都有一组对应的param_list,具体可以查阅文档:https://help.aliyun.com/document_detail/185887.html 每一个param的form关系到事件转换的配置,可以查阅文档:https://help.aliyun.com/document_detail/181429.html param_list { resource_key = "URL" form = "CONSTANT" value = "" } param_list { resource_key = "SecretKey" form = "CONSTANT" value = "" } 这里展示了TEMPLATE类型的事件转换描述 value是使用jsonpath引用事件内容的字典,template则是模板内容,EventBridge最终会根据这两者结合事件本身渲染出这个参数的值 param_list { resource_key = "Body" form = "TEMPLATE" value = jsonencode( { "source": "$.source", "type": "$.type" "region": "$.data.acsRegion", "accountId" : "$.data.userIdentity.accountId", "eventName" : "$.data.eventName", } ) template = jsonencode( { "msgtype" : "text", "text" : { "content": "来自 {source} 的 {type} 审计事件:{accountId} 在 {region} 执行了 {eventName} 操作" } } ) } } } 在命令行窗口依次执行命令: 初始化 terraform init 预览变更 terraform plan 应用变更 terraform apply 在云产品控制台进行操作,这里以 KMS 为例 钉钉上收到消息通知 在 EventBridge 控制台查看事件轨迹 案例 2:自定义总线触发 FunctionCompute 假设一个用户的应用会产生一些事件,其中一个链路是通过 FunctionCompute 对这些事件进行弹性的处理。那么就可以通过 EventBridge 的自定义事件源和函数计算事件目标来实现这个方案。 创建一个模拟对事件进行处理的 python 脚本文件 src/index.py,内容如下: coding: utf8 import logging def handler(event, context): logger = logging.getLogger() logger.info('evt: ' + str(event)) return str(event) 创建一个 tf 文件 2_trigger_function.tf,内容如下(需要替换<内的值) 案例2:自定义总线触发FunctionCompute 目标: 熟悉部署使用EventBridge的自定义总线 熟悉"自定义应用"事件源配置 熟悉“FunctionCompute”事件目标配置 由于用户自己产生的事件需要投递到自定义总线,这里声明一个叫demo_event_bus的自定义总线 resource "alicloud_event_bridge_event_bus" "demo_event_bus" { event_bus_name = "demo_event_bus" description = "demo" } 声明一个在demo_event_bus总线上的自定义事件源,用于通过sdk或者控制台向EventBridge投递事件 resource "alicloud_event_bridge_event_source" "demo_event_source" { event_bus_name = alicloud_event_bridge_event_bus.demo_event_bus.event_bus_name event_source_name = "demo_event_source" description = "demo" linked_external_source = false } 声明一个叫fc_service的函数计算服务,publish=true意味着会立即部署上传的函数代码。 resource "alicloud_fc_service" "fc_service" { name = "ebfcservice" description = "demo" publish = true } 将前面准备的python脚本文件打包成zip用于部署到函数计算 data "archive_file" "code" { type = "zip" source_file = "{path.module}/src/index.py" output_path = "{path.module}/code.zip" } 声明一个fc_service服务中的函数,其中filename引用了上面描述的zip包,会将这个代码包上传。 resource "alicloud_fc_function" "fc_function" { service = alicloud_fc_service.fc_service.name name = "ebfcfunction" description = "demo" filename = data.archive_file.code.output_path memory_size = "128" runtime = "python3" handler = "index.handler" } 声明一个在demo_event_bus总线上的规则 resource "alicloud_event_bridge_rule" "demo_rule" { event_bus_name = alicloud_event_bridge_event_bus.demo_event_bus.event_bus_name rule_name = "demo_rule" description = "demo" 通过匹配source过滤来自于前面创建的自定义事件源的事件 filter_pattern = jsonencode( { "source" : ["{alicloud_event_bridge_event_source.demo_event_source.id}"] } ) targets { target_id = "demofctarget" type的取值可以查阅文档:https://registry.terraform.io/providers/aliyun/alicloud/latest/docs/resources/event_bridge_ruletype type = "acs.fc.function" endpoint = "acs:fc:::services/{alicloud_fc_service.fc_service.name}.LATEST/functions/{alicloud_fc_function.fc_function.name}" param_list { resource_key = "serviceName" form = "CONSTANT" value = alicloud_fc_service.fc_service.name } param_list { resource_key = "functionName" form = "CONSTANT" value = alicloud_fc_function.fc_function.name } param_list { resource_key = "Qualifier" form = "CONSTANT" value = "LATEST" } 注意form=ORIGINAL意味着每次投递事件都会将事件的原始内容作为这个参数的值 param_list { resource_key = "Body" form = "ORIGINAL" } } } 在命令行窗口依次执行命令 初始化 terraform init 预览变更 terraform plan 应用变更 terraform apply 在控制台模拟自定义事件源发布事件 在 FunctionCompute 的控制台页面查看函数调用日志 在 EventBridge 控制台查看事件轨迹 _总结_ EventBridge 作为构建 EDA 架构的基础设施,通过一些核心概念和特性提供了灵活丰富的事件收集、处理和路由的能力,并支持通过 OpenAPI、terraform 等方式将这些能力方便快捷的带给用户。本文介绍了 EventBridge 和 IaC 的重点概念和特性,然后演示了如何应用 IaC 理念自动化部署 EventBridge 来使用这些概念和特性。 期待大家可以发掘更多利用 EventBridge 快速搭建 EDA 架构的 idea,并使用 terraform 快捷的将这些 idea 变为现实。 相关链接 [1] 阿里云 terraform 文档 [2] terraform registry 文档 [3] 钉钉官方文档 活动推荐 阿里云基于 Apache RocketMQ 构建的企业级产品消息队列RocketMQ 5.0版现开启活动: 1、新用户首次购买包年包月,即可享受全系列 85折优惠! 了解活动详情:
作者:王川(弗丁)
#技术探索 #生态集成

2022年4月6日

EventBridge 与 FC 一站式深度集成解析
前言:事件总线 EventBridge 产品和 FC (Serverless 函数计算) 产品全面深度集成,意味着函数计算和阿里云生态各产品及业务 SaaS 系统有了统一标准的接入方式;依托 EventBridge 统一标准的事件源接入能力,结合 Serverless 函数计算高效敏捷的开发特点,能够帮助客户基于丰富的事件,结合 EDA 架构快速构建云上业务系统。为了帮助大家更好的理解,今天的介绍主要分为三部分:为什么需要一站式深度集成、FC 和 EventBridge 产品集成功能演示及场景介绍、EventBridge 和函数计算深度集成下一阶段规划。 为什么需要一站式深度集成? 首先让我们一起来看看什么是 EventBridge,什么是函数计算? 什么是 EventBridge? 阿里云事件总线(EventBridge)是一种无服务器事件总线,支持将用户的应用程序、第三方软件即服务 (SaaS)数据和阿里云服务的数据通过事件的方式轻松的连接到一起,这里汇聚了来自云产品及 SaaS 服务的丰富事件; 从整个架构来看,EventBridge 通过事件总线,事件规则将事件源和事件目标进行连接。首先,让我们快速普及下 EventBridge 架构中涉及的几个核心概念: 事件:状态变化的记录; 事件源:事件的来源,事件的产生者,产生事件的系统和服务, 事件源生产事件并将其发布到事件总线; 事件总线:负责接收来自事件源的事件;EventBridge支持两种类型的事件总线: 云服务专用事件总线:无需创建且不可修改的内置事件总线,用于接收您的阿里云官方事件源的事件。 自定义事件总线:标准存储态总线,用于接收自定义应用或存量消息数据的事件,一般事件驱动可选该总线。 事件规则:用于过滤,转化事件,帮助更好的投递事件; 事件目标:事件的消费者,负责具体事件的处理。 通过上面的流程,完成了事件的产生,事件的投递,事件的处理整个过程。当然事件并不是一个新的概念,事件驱动架构也不是一个新的概念,事件在我们的系统中无处不在,事件驱动架构同样伴随着整个计算机的架构演进,不断地被讨论。对于 EventBridge,采用云原生事件标准 CloudEvents 来描述事件;带来事件的标准化,这样的标准化和事件标准的开放性带来一个最显著的优势:接入的标准化,无论是对于事件源还是事件目标。 什么是函数计算(FC)? 函数计算是事件驱动的全托管计算服务。使用函数计算,您无需采购与管理服务器等基础设施,只需编写并上传代码。函数计算为您准备好计算资源,弹性地、可靠地运行任务,并提供日志查询、性能监控和报警等功能。 通过上面的描述,总结起来大家只需要记住几点: 简单易用:快速上线,极大提升业务研发效率; 无服务器运维:节省运维投入; 按需付费:沉稳应对突发流量场景; 事件驱动:云产品互通,快速联动。 为什么函数计算需要 EventBridge? 函数计算以其轻量,快捷,能够利用事件驱动的方式与其他云产品进行联动的特点, 成为很多客户利用事件驱动架构构建业务系统的首选,随着业务及客户需求的不断增加,客户对于函数计算和更多云产品及服务的连接需求变得越来越多,同时对于其他云产品的客户而言, 也希望能够利用Serverless函数计算的特点帮助处理一些系统任务和事件。 1)事件源多样性挑战 事件驱动作为函数计算产品核心竞争力,打通函数计算和其它云产品,以及用户自定义应用,SaaS 服务的连通成为函数计算生态集成的迫切需求,但系统集成,生态建设从来都不是一件容易的事情。函数计算系统在和 EventBridge 集成之前,已经和 OSS,SLS 等用户典型场景的云产品进行了集成,也和阿里云的其它大概十多款产品进行了集成,不同系统具有不同的事件格式,不同系统的注册通知机制也各不相同,以及上游不同系统的失败处理机制也各不相同;部分系统支持同步的调用方式,部分系统支持异步的调用方式,调用方式的差异主要取决于上游系统在接入函数计算的时候当时面临的产品业务场景,对于新的产品能力和业务场景的扩展支持,在当时并未有太多的考虑。随着和更多云产品的集成,集成的投入,集成的困难度和底层数据管理难度越来越大。面对多种事件源集成的客观困难,函数计算希望提高和其他云产品的集成效率。 2)授权复杂及安全隐患 除此之外, 函数计算希望提升用户体验,保证用户关心事件的处理;同时希望能够在面对大量的云产品时保证系统授权层面的复杂度。用户在使用事件触发的时候, 需要了解不同产品接入函数计算的权限要求, 对于客户使用函数计算带来了非常大的困难,为了加速产品接入,大量用户经常使用 FullAcees 权限,造成较大产品安全隐患。 3)通用能力难以沉淀 面对上游不同的事件源, 如何更好的投递事件、更好的消费事件?如何进行事件的错误处理?函数计算调用方式如何选择?以及函数计算后端错误 Backpressure 能力的反馈、重试策略和上游系统参数设置、触发器数量的限制等问题成为函数计算事件触发不得不面对的问题。为了更好的服务客户,提供可靠的消费处理能力,函数计算希望能够有一个统一的接入层,基于统一的接入层进行消费能力和流控能力的建设。通过沉淀在这样一个标准的层面,在保证调用灵活性的同时,提供可靠的服务质量。 为什么 EventBridge 同样需要函数计算? EventBridge 作为标准的事件中心,目的是希望能够帮助客户把这些事件利用起来,能够通过事件将产品的能力进行联动,为了达成这样的目的,势必需要帮助客户通过更便捷的路径来快速消费处理这些事件。EventBridge 和函数计算的深度集成正是为了这样的共同目标 —— 帮助客户快速的构建基于 EDA 架构的业务系统,促进业务获得成功。 FC 和 EventBridge 产品集成功能演示及场景介绍 EventBridge 具体支持的事件类型, 基本上包括了阿里云所有的官方产品。可以通过 EventBridge 官方主页查看目前支持的阿里云官方产品事件源类型 。 EventBridge 触发器及异步集成 点击下方链接跳转查看: 函数计算异步链路支持将处理结果直接投递到 MQ 和 EventBridge,用户可以利用 EventBridge 将相关的结果投递到 SAAS 服务; 点击下方链接跳转查看: 双向集成的变化 1. 函数计算支持 85+阿里云官方事件源; 2. 函数计算支持整个阿里云消息队列的事件触发,包括 RocketMQ, RabbitMQ,MNS 等; 1. EventBridge 和函数计算控制台数据互通,用户无需在函数计算控制台和事件总线控制台来回跳转; 2. 用户通过触发器详情,快速跳转,利用 EventBridge 事件追踪能力帮助用户快速排查问题; 官方事件源运维场景总结 基于官方事件源的事件驱动场景,大概可以总结抽象成四个场景。 场景一:单账号下某个云产品的运维需求。通常客户希望基于这样的一个事件,包括类似像云服务器事件 ECS,或者容器服务镜像事件,通过这样的事件监听做一些自动化诊断和运维操作。 场景二:实际是在场景一的基础上的一个扩展,针对多个云产品的事件,希望能够进一步分析,做一些故障处理。 场景三:我们观察到,大的一些企业,在使用云产品的时候,实际上是由多个账号去使用阿里云的产品。在多个账号,多个产品的情况下,希望能够对多个账号中的云资源使用情况有一个全局统一的视角进行实践分析,同时进行账号配额的一些调整。那这样的话就是可以利用到 EventBridge 跨账号事件投递的能力,然后再利用函数计算做一个统一处理。 场景四:这个场景实际上是一个账号跨域事件处理场景,EventBridge 目前并没有去提供这样一个跨域的能力,这种情况下,可以借助函数计算提供的 HTTP 函数能力,自动生成 HTTP Endpoint,通过 EventBridge 的 HTTP 事件源,完成事件的跨域消费。 自定义事件源场景总结 1)MNS 队列自定义事件源触发场景:客户在 OSS 中上传文件之后,根据文件上传事件对 ACK 进行扩容,目前通过 OSS 事件发送到 MNS 中,然后由 MNSQueue 消息通过 EventBridge 触发函数计算, 在函数计算中根据一定的逻辑进行 ECI 资源的创建;同时客户希望通过 MNS 进行通知服务;利用 EventBridge 订阅模式,通过事件规则的定义,让通知服务和函数计算共享同一个事件订阅规则,可以大大的简化用户的方案。 2)RabbitMQ 队列自定义事件源触发场景:鉴于 RabbitMQ 在稳定性和可靠性方面的表现,在 IOT 场景具有非常普遍的使用,客户通常会选择使用 RabbitMQ 来进行端设备数据采集和存储, 考虑到 IOT 相关的嵌入式设备性能使用环境,通常端设备采集的数据比较偏向底层裸数据,在实际业务层面,客户需要找到一种快速高效的途径对 RabbitMQ 中的数据进行加工,通过 EventBridge 提供的自定义事件总线,利用函数计算对 RabbitMQ 中的数据快速处理, 实现 ETL 目的。 EventBridge 和函数计算深度集成下一阶段规划 事件过滤高级 ETL 处理 将函数计算和 EventBridge 进行更紧密的集成,由函数计算提供一些高级的 ETL 能力,提升整个事件过滤转换的能力。 提供更丰富的事件目标 目前 EventBridge 整个下游的事件目标相对来说较少,我们希望能够通过函数计算和 EventBridge 的一个密切集成,利用函数计算敏捷的开发能力,分别通过大账号模式和用户自持的这样一个能力,构建一些更丰富的 EventBridge 下游事件目标,帮助丰富整个事件目标的生态。 活动推荐 阿里云基于 Apache RocketMQ 构建的企业级产品消息队列RocketMQ 5.0版现开启活动: 1、新用户首次购买包年包月,即可享受全系列 85折优惠! 了解活动详情:
作者:史明伟(世如)
#行业实践 #生态集成

2022年3月23日

消息驱动、事件驱动、流”基础概念解析
阿里云消息队列 RocketMQ 5.0 实现了全新升级,实现了从“消息”到“消息、事件、流”的大融合,基于此,MessageDriven、EventDriven、Streaming 这三个词是近期消息领域高频词,但由于概念过于新,很多同学其实是不太理解这里的异同。本文把三个概念重新整理下,梳理出比较明确的概念讲给大家。 背景 首先这三个概念具体翻译如下: MessageDriven:消息驱动的通信; Event Driven:事件驱动的通信; Streaming:流模式。 这三个模式都是类似异步通信的模式,发送消息的服务不会等待消费消息服务响应任何数据,做服务解耦是三个模式共同的特性; 只要是在服务通讯领域内,在选型时还要考虑如下特性: 排序:是否可以保证特定的顺序交付; 事务:生产者或消费者是否可以参与分布式事务; 持久化:数据如何被持久化,以及是否可以重放数据; 订阅过滤:是否拥有根据Tag或其他字段做订阅过滤的能力; At – least once(最少交付一次),Atmostonce(最多交付一次),Exactlyonce (精确交付)。 通用背景介绍完,依次来看看各个模型代表的是什么意思。 消息驱动 MessageDriven 在消息驱动通信中,一般链路就是消息生产者(Producer)向消息消费者(Consumer)发送消息。模型如下: 消息驱动模式下通常会用到中间件,比较常见的中间组件有 RocketMQ,Kafka,RabbitMQ 等。这些中间件的目的是缓存生产者投递的消息直到消费者准备接收这些消息,以此将两端系统解耦。 在消息驱动架构中,消息的格式是基于消费者的需求制定的;消息传递可以是一对一,多对多,一对多或多对一。 消息驱动通讯比较常见的一个例子是商品订单推送,上游组件负责生成订单,下游组件负责接收订单并处理。通过这样的通讯方式上游生成组件其实无需关心整个订单的生命周期,更专注于如何快速生成订单,使单个组件的性能得以提升。 消息驱动模式在服务之间提供了轻的耦合(这部分耦合指代 Producer/Consumer SDK),并可以对生产和消费服务根据诉求进行扩展。 事件驱动 EventDriven 首先要申明一个观点:事件驱动其实是对消息驱动方法的改进,它对消息体大小,消息格式做了较为严格的限制,这层基于消息的限制封装其实就称为事件(Event)。 在事件驱动模式中,生产者发布事件来表示系统变更,任何感兴趣且有权限接入的服务都可以订阅这些事件,并将这些事件作为触发器来启动某些逻辑/存储/任务。 事件驱动的模式可以是一对一,多对一,一对多或多对多。通常情况下一般是多个目标根据过滤条件执行不同的事件。 在事件驱动架构中,事件的格式是由生产者根据事件标准协议制定的;由于更规范限制和封装,事件的生产者完全不需要关心有哪些系统正在消费它生成的事件。 事件不是命令,事件不会告诉消费者如何处理信息,他们的作用只是告诉消费者此时此刻有个事件发生了;事件是一份不可变的数据,重要的数据,它与消息的数据价值相同;通常情况下当某个事件发生并执行时,往往伴随着另一个事件的产生。 事件驱动提供了服务间的最小耦合,并允许生产服务和消费服务根据需求进行扩展;事件驱动可以在不影响现有服务的情况下添加各类新增组件。 事件驱动也可以举一个非常贴切的例子,我们以“客户购买完一款商品”为一个事件,举证在事件场景的应用: CRM(客户关系系统)系统接收到客户购买信息,可自行更新客户的购买记录; EMR(库存管理系统) 系统接收到客户购买信息,动态调整库存并及时补货; 快递服务接收到客户购买信息,自行打单并通知快递公司派送。 这么看,事件驱动模式是不是可以应用并出现在任何地方! 在 EventBridge 产品化方向,也正是由于针对消息做了一些标准化封装,才有可能实现譬如针对事件本身的 filter(过滤) ,transform(转换),schema(事件结构),search(查询) 等能力。这些能力也拓展出更多针对事件驱动特有的场景功能及相关特性。 流 Streaming 流是一组有序的无界事件或数据,执行操作通常是固定的某个事件段(e.g. 00:00 – 12:00)或一个相对事件(E.g. 过去 12 小时)。 通常情况下单个事件往往就是使用事件本身,但是对于流可能的操作大概率是过滤,组合,拆分,映射等等。 流的操作可以是无状态也可以是有状态的: 对于单个事件操作是无状态的,包括过滤和映射; 依赖消息在流的时间或位置(e.g. offset,time)是有状态的。有状态操作中,流处理逻辑必须保留一些已被消费消息的内存。有状态包括对数据做 Batch Size,Batch Window 等。 流这里也可以举一个比较简单的例子,比如我们的物流系统在物品通过一个物流节点时会生成一个事件,但是要查到这个物品完整的流转状态事件,则必须是各个物流节点单个事件的聚合,那这个聚合事件就是流事件。 Kafka 是最典型的流式中间件,在流式场景中,事件的位置信息至关重要。通常情况下位置信息(E.g. offset)是由消费者托管的。 事件规范标准 聊完 Event 和 Streaming 是什么,再来补充一点有关于它们的规范。 事件规范存在的目的是为了清晰事件生产者和消费者的关系,目前主要有两部分:AsyncAPI 和 CloudEvents; AsyncAPI:基于事件 API 提供了与之对应的 Open API 和 Swagger 等;CloudEvents:侧重于处理事件的元数据。 下面也重点介绍一些关于 CloudEvents 的相关概念参考:CloudEvents 的核心其实是定义了一组关于不同组件间传输事件的元数据,以及这些元数据应该如何出现在消息体中。 其主旨大抵如下: 事件规范化; 降低平台集成难度; 提高 FaaS 的可移植性; 源事件可追踪; 提升事件关联性 准确的事件体,事件信息才可以做出更稳定的系统架构,永远保持对事件的敬畏。 附 一些术语及定义: Occurrence:发生,指事件逻辑上的发生,基于某种情况,事件出现了; Event:事件,表示事件以及上下文的数据记录。可以根据事件中的信息决定路由,但事件本身并不包含路由信息; Producer:生产者,真正创造事件的实例或组件; Source:源,事件发生的上下文,可以由多个 producer 组成; Consumer:消费者,接收事件并对事件进行消费; Intermediary:中介,接收包含事件的消息(message),并转发给下一个接收方,类似路由器; Context:上下文,上下文元数据被封装到 context attributes 中,用来判断事件与其它系统的关系; Data:数据,也可以叫做 payload; EventFormat:事件格式,例如 json; Message:消息,封装事件并将其从 source 传递到 destination; Protocol:协议,可以是行业标准如 http,开源协议如 Kafka 或者供应商协议如 AWS Kinesis; Protocol Binding:协议绑定,描述如何通过给定的协议收发事件,如何将事件放到消息里。 活动推荐 阿里云基于 Apache RocketMQ 构建的企业级产品消息队列RocketMQ 5.0版现开启活动: 1、新用户首次购买包年包月,即可享受全系列 85折优惠! 了解活动详情:
作者:肯梦
#技术探索 #事件驱动架构

2022年3月18日

EventBridge 事件总线及 EDA 架构解析
作为 Gartner 定义的 10 大战略技术趋势之一,事件驱动架构(EDA)逐渐成为主流技术架构。根据 Gartner 的预估,在新型数字化商业的解决方案中,将有 60%使用 EDA,在商业组织参与的技术栈中,EDA 有一半的占比。 当下比较成功的企业已然认识到,要想最大限度提升运营效率和客户体验,务必要将业务和技术两方面的举措紧密结合起来。运营事件或业务形势的变化是时下众多企业关注的焦点,这些变化能够为企业领导者带来切实有用的信息,而架构设计的主旨恰恰是从客户联系人、交易、运营等方面的信息中获取洞见,两者相辅相成。传统技术历来对企业从事件中获取洞见的速度有着诸多限制,比如用于记录、收集和处理此类事件的批处理 ETL(提取、转换、加载)等。基于以上背景,阿里云 EventBridge 应运而生。 EventBridge 是事件驱动的具体落地产品,也是 EDA 的最佳实践方式。 事件驱动(EDA)是什么 早在 2018 年,Gartner 评估报告将 EventDriven Model 列为 10 大战略技术趋势之一,事件驱动架构(EDA)将成为未来微服务的主流。该报告同时做出了以下断言: 到 2022 年,事件通知的软件模型将成为超过 60% 的新型数字化商业的解决方案; 到 2022 年,超过 50% 的商业组织将参与到事件驱动的数字化商业服务的生态系统当中。 很喜欢 George Santayana 在《 The Life of Reason》说的一句话 Those who fail to learn History are doomed to repeat it.(不懂历史的人注定会重蹈覆辙)。我们以史为鉴,来看看为什么会架构会演进到事件驱动。 上图是关于架构演进时间轴线。架构本身没有优劣之分,它本身就是一组技术决策,决定后续项目的所有功能开发(框架,编码规范,文档,流程….),所以这里不谈选型好坏,只谈为什么会引入某些框架,这个框架解决了软件开发中的什么问题。 单体架构:在单节点服务中,单体应用的所有模块都封装在单个进程运行,通信通过相同堆栈调用完成。这种模式下非常容易导致结构和关系不明确,难以对系统进行更改和重构。就像一个不透明的,粘稠的,脆弱的,僵硬的 Big Ball of Mud! 分层架构:在经典的分层架构中,层以相当谨慎的方式使用。即一个层只能知道它下方层的数据。在随后的实际应用中,更多的方式是一个层可以访问它下面的任何层。分层架构解决了单体架构的的逻辑分离问题,每一层都可以被等效替换,是用层区分也更加标准化,同时一个层可以被几个不同/更高级别的层使用。当然,层也有比较明显的缺点,层不能封装掉一切,比如添加到 UI 的某个字段,可能也需要添加到 DB,而且额外多余的层会严重损害系统性能。 MVC 架构:MVC 架构产生的原因其实很简单,随着业务系统的复杂性增加,之前所谓“全栈工程师”已经不适用大部分场景。为了降低前端和后台的集成复杂性,故而开始推广 MVC 架构。其中,Model 代表业务逻辑;View 代表视图层,比如前端 UI 的某个小组件;Controller 提供 View 和 Model 的协调,比如将用户某项操作转为业务逻辑等。此外还有很多扩展架构,譬如 ModelViewPresenter,ModelViewPresenterViewModel,ResourceMethodRepresentation,ActionDomainResponder 就不在细说了,感兴趣的同学可以 wiki 搜索下。 EBI 架构:即 Entity,Boundary(接口),Interactor (控制)。EBI 架构将系统边界视为完整连接,而不仅仅是视图,控制器或接口。EBI 的实体代表持有数据并结束相关行为的实际实体,很类似阿里云的 POP API。EBI 主要还是后端概念,它是与 MVC 相辅相成的。 洋葱架构:洋葱架构是一种低耦合,高内聚的架构模型。所有的应用程序围绕独立的对象模型构建,内层定义接口,外层实现接口,耦合方向向中心内聚,所有代码都可以独立与基础设施进行编译和运行。 SOA 架构:SOA 是 Service Orientated Architure 的缩写,即面向服务架构。表示每一个功能都是通过一个独立的服务来提供,服务定义了明确的可调用接口,服务之间的编排调用可完成一个完整的业务。其实这个架构也是目前架构中最成熟的,日常使用最多的架构模式。 在介绍完之前全部的架构趋势后,在回过头看看什么是 EDA 架构。 EDA 事件驱动架构( EventDriven Architecture ) 是一种系统架构模型,它的核心能力在于能够发现系统“事件”或重要的业务时刻(例如交易节点、站点访问等)并实时或接近实时地对相应的事件采取必要行动。这种模式取代了传统的“ request/response ”模型,在这种传统架构中,服务必须等待回复才能进入下一个任务。事件驱动架构的流程是由事件提供运行的。 上图其实很好的解释了 EDA 架构的模型,但是其实还不够明确,所以这里我们和单体架构一起对比看看他们之间差异。 在如上对比图中,我们其实可以较为清楚看到它与传统架构的区别。在一般传统架构中,创建订单操作发生后,一系列的操作其实都是通过一个系统完成的。而事件驱动的概念则是将全部操作都转换为 “事件” 概念,下游通过捕获某个 “事件” 来决定调用什么系统完成什么样的操作。 我们回过头来看“事件”,刚刚介绍中比较的重要部分其实是将操作转换为某类事件进行分发。那这的事件我们怎么定义呢? 简单来看,其实事件就是状态的显著变化,当用户采取特定行动时触发。以 4S 店售卖汽车为例: 当客户购买汽车并且其状态从 For Sale 变为 Sold 是一个事件; 成功交易后,从帐户中扣除金额是一个事件; 单击预订试驾后,从将预约信息添加到指定用户就是一个事件; 每个事件都可能触发一个或多个选项作为响应。 事件其实云原生 CNCF 基金会在 2018 年托管了开源 CloudEvents 项目,该项目旨在用统一和规范的格式来描述事件,来加强不同的服务、平台以及系统之间的互操作性。在该项目定义下,通用的事件规范是这样的: 事件主要由 Json 体构成,通过不同字段描述发生的事件。 总结来看,事件驱动其实是将比较重要的业务时刻封装成“事件”,并通过某个 EventBus 将事件路由给下游系统。 了解了 EDA 架构的整个处理过程,但是还没解决这个所谓的“EventBus”到底是什么? 如上图就是 EventBus 的核心逻辑架构,它由 Event Producer 和 Event Consumer 两端组成,通过 Bus 解耦中间环节,是不是非常像某个传统的 MQ 架构?别着急,在接下来的落地实践部分会讲解这个架构的复杂部分。 EDA 架构的落地实践思考 在开始介绍落地实践时,我们先来看一个经典的 EDA 架构模型: 这是一个非常经典 EDA 订单架构,该架构主要使用了 EventBridge 和 FC 函数计算(如果不太熟悉 FaaS 的同学可以把 FC 节点当作 ECS 或 Kubernetes 的某个 POD 节点),通过事件驱动各个业务进行协作。 所以这块的中心节点(EventBridge)其实有三个比较重要的能力: 1. For Event Capturing(事件收集):具备采集事件的能力; 2. For Routing(事件路由):通过事件内容将事件路由分发至于下游的能力; 3. For Event Processing(事件过滤/替换):对事件进行脱敏或初步过滤&筛选的能力。 通常情况下,要实现这三个能力是比较困难的,比如:Event Capturing 可能需要熟悉 Dell Boomi, Snaplogic, MuleSoft, Dataflow, Apache Apex 等,Routing 部分可能通过 RocketMQ、RabbitMQ、ActiveMQ、Apache Kafka,Event Processing 需要了解 Apache Storm, Apache Flink 。所以之前讲的逻辑架构其实非常理想,要想实现完成的 EDA 事件驱动还需要包括这些核心能力。 其实,从刚刚的架构中我们也能窥探到一些信息,EDA 架构其实看起来没有那么简单,那它有何优劣呢? 下面简单罗列下 EDA 架构在实践中的优势: 松耦合:事件驱动架构是高度松耦合且高度分布式的架构模型,事件的创建者(来源)只知道发生的事件,并不知道事件的处理方式,也关心有多少相关方订阅该事件; 异步执行:EDA 架构是异步场景下最适合的执行工具,我们可以将需要事件保留在队列中,直到状态正常后执行; 可扩展性:事件驱动架构可以通过路由&过滤能力快速划分服务,提供更便捷的扩展与路由分发; 敏捷性:事件驱动架构可以通过将事件分发至任何地方,提供更敏捷高效的部署方案。 当然,劣势也很明显: 架构复杂:事件驱动架构复杂,路由节点多,系统结成复杂,功能要求多; 路由分发难:事件路由分发难,灵活的事件路由需要依赖强大的实时计算能力,对整体分发系统要求较高; 无法追踪:事件追踪是整个 EDA 架构的保证,EDA 架构中往往很难追踪到事件处理状态,需要大量的定制化开发; 可靠性差:事件驱动由于需要多系统集成,可靠性通常较差,且交付无法保障。 _ 针对 EDA 场景面临的这些问题,阿里云推出了 EventBridge,一款无服务器事件总线服务,其使命是作为云事件的枢纽,以标准化的 CloudEvents 1.0 协议连接云产品和应用、应用和应用,提供中心化的事件治理和驱动能力,帮助用户轻松构建松耦合、分布式的事件驱动架构;另外,在阿里云之外的云市场上有海量垂直领域的 SaaS 服务,EventBridge 将以出色的跨产品、跨组织以及跨云的集成与被集成能力,助力客户打造一个完整的、事件驱动的、高效可控的上云体验。 阿里云对 EventBridge 做了定义,核心价值包括: 统一事件枢纽:统一事件界面,定义事件标准,打破云产品事件孤岛; 事件驱动引擎:海量事件源,毫秒级触发能力,加速 EDA/Serverless 架构升级; 开放与集成:提供丰富的跨产品、跨平台连接能力,促进云产品、应用程序、SaaS 服务相互集成。 下面从架构层面和功能层面对 EventBridge 进行介绍: 架构层面 针对架构复杂问题,EventBridge 提供业内通用的 Source ,Buses,Rules,Targets 模块管理能力,同时支持 EventBus 和 EventStream 两种模式,大幅度降低事件驱动架构难度。 1)事件总线模型经典 EDA( 事件驱动)场景的 N:N 模型,提供多事件路由,事件匹配,事件转换等核心能力,帮助开发者快速搭建事件驱动架构。 2)事件流模型标准 Streaming(1:1) 流式处理场景,无总线概念,用于端到端的数据转储,数据同步及数据处理等,帮助轻松构建云上端到端的数据管道服务。 功能层面 在功能层面,EventBridge 的核心亮点应用包括: 1)事件规则驱动 针对基于事件的路由分发,EventBridge 通过事件规则驱动,支持 8 大事件模式,4 重转换器,满足路由分发的全部诉求。 2)事件追踪 针对事件无法追踪,独家提供事件追踪能力,事件分析/查询能力。为用户完善的全链路事件查询分析能力。 3)DLQ/重试机制、事件全流程触发 针对可靠性差,支持 DLQ/重试机制,与事件全流程触发,大幅度保证由于用户下游系统导致的事件故障与延迟。 4)Schema 注册中心 针对事件管理复杂,支持 Schema 注册中心,支持事件信息的解释、预览和上下游代码生成能力,帮助用户低代码完成事件的收发处理。解决跨部门信息沟通困难,业务代码冗余等一系列事件管理问题。 5)同时,基于以上功能 EventBridge 支持对接 85 种以上的阿里云产品,847 种事件类型。 更多产品功能介绍,可访问 EventBridge 官网 阿里云 EventBridge 更多场景介绍 经典 EDA 事件驱动 事件总线(EventBridge)最重要的能力是通过连接应用程序、云服务和 Serverless 服务来构建 EDA(Eventdriven Architectures) 事件驱动架构,驱动应用与应用,应用与云的连接。 流式 ETL 场景 EventBridge 另一个核心能力是为流式的数据管道的责任,提供基础的过滤和转换的能力,在不同的数据仓库之间、数据处理程序之间、数据分析和处理系统之间进行数据同步/跨地域备份等场景,连接不同的系统与不同服务。 统一事件通知服务 EventBridge 提供丰富的云产品事件源与事件的全生命周期管理工具,您可以通过总线直接监听云产品产生的数据,并上报至监控,通知等下游服务。 活动推荐 阿里云基于 Apache RocketMQ 构建的企业级产品消息队列RocketMQ 5.0版现开启活动: 1、新用户首次购买包年包月,即可享受全系列 85折优惠! 了解活动详情:
作者:肯梦
#技术探索 #事件驱动架构

2022年3月11日

基于 EventBridge 构建 SaaS 应用集成方案
引言 事件驱动架构(EDA)是一种以事件为纽带,将不同系统进行解耦的异步架构设计模型。在 EDA 中,事件驱动的运行流程天然地划分了各个系统的业务语义,用户可以根据需求对事件与针对此事件做出的响应灵活定制,这使得基于 EDA 架构可以方便地构建出高伸缩性的应用。据 Daitan Group 的调研报告,早在 2017 年,例如 UBER、Deliveroo、Monzo 等公司就已经采用了 EDA 去设计他们的系统。 为了便于用户更加轻松地开发以 EDA 为架构的应用,在 2020 年云栖大会上,阿里云正式推出了 EventBridge。EventBridge 是一款无服务器事件总线服务,能够以标准化的 CloudEvents 1.0 协议在应用之间路由事件。目前,EventBridge 已经集成了众多成熟的阿里云产品,用户可以低代码甚至零代码完成各个阿里云产品和应用之间的打通,轻松高效地构建分布式事件驱动架构。 事件源是事件驱动的基石,如何获取更多事件源也是 EventBridge 一直在探索和尝试的方向。针对市场上其他云厂商和垂直领域的 Saas 服务,EventBridge 发布了 HTTP Source 能力,提供简单且易于集成的三方事件推送 ,帮助客户更加高效、便捷地实现业务上云。 HTTP Source 概述 接入 EventBridge 应用有多种情况:用户自定义应用、阿里云服务、其他云厂商服务或者其他 SaaS 产品。 对于用户自定义应用,用户可以使用 EventBridge 官方的 API 接口、多语言客户端以及 CloudEvents 社区的开源客户端来完成接入。 对于阿里云的云产品,EventBridge 原生支持,用户可以在默认事件总线中选择对应的云产品与其相关的触发事件。 而对于其他云厂商、SaaS 产品,EventBridge 同样也提供便捷的接入方式便于用户进行集成,HTTP Source 事件源便是一种典型的接入方式。 具体而言,HTTP Source 事件源是 EventBridge 支持的事件源的一种,它以 Webhook 形式暴露了发布事件的 HTTP 请求地址,用户可以在有 URL 回调的场景配置 HTTP Source 事件源,或者直接使用最简单的 HTTP 客户端来完成事件的发布。HTTP Source 事件源提供了支持 HTTP 与 HTTPS,公网与阿里云 VPC 等不同请求方式、不同网络环境的 Webhook URL,便于用户将其集成到各类应用中。接入时无需使用客户端,仅需保证应用可以访问到对应 Webhook URL 即可,这使得接入过程变得简单而高效。 在将 HTTP 请求转换为 CloudEvent 的时候,EventBridge 会将请求的头部和消息体部分置于 CloudEvent 字段中,其余字段会依据用户 EventBridge 资源属性以及系统默认规则进行填充。用户可以在事件规则中,对所需的内容进行过滤、提取,最终按照模板拼装成所需的消息内容投递给事件目标。 HTTP Source 事件源目前支持 3 种类型的安全设置,分别是请求方法、源 IP 以及请求来源域名。 请求方法:用户可以配置当前请求此事件源时合法的 HTTP 请求方法,如果方法类型不满足配置规则,请求将被过滤,不会投递到事件总线。 源 IP:用户可以设置允许访问此事件源时合法的源 IP(支持 IP 段和 IP),当请求源 IP 不在设置的范围内时,请求将被过滤,不会投递到事件总线。 请求来源域名:即 HTTP 请求的 referer 字段,当请求的 referer 与用户配置不相符时,请求被过滤,不会投递到事件总线。 抛砖引玉,下面就介绍如何使用 HTTP Source 来构建 SaaS 应用集成的最佳实践,帮助大家快速上手 SaaS 集成方案。 SaaS 集成最佳实践 钉钉监控 GitHub 代码推送事件 GitHub 提供了 Webhook 功能,代码仓库在发生某些特定操作(push、fork等)时,可以通过回调来帮助用户完成特定功能。针对多人开发的项目,将 GitHub 事件推送到特定钉钉群可以帮助成员有效关注代码变更,提高协同效率。 本节我们展示如何通过钉钉监控 GitHub 代码推送事件的最佳实践,主要包含以下几个步骤: 创建一个钉钉机器人; 创建 EventBridge 相关资源:事件总线、事件源(HTTP Source 类型)、事件规则、事件目标(钉钉); 创建自定义事件总线; 选择 GitHub 代码仓库创建 Webhook; 向 GitHub 代码仓库推送代码变更; 钉钉群接收此次代码推送相关信息。 1)创建钉钉机器人 参考钉钉官方文档[1],创建一个群机器人。创建群机器人时,安全设置请勾选“加签”并妥善保管密钥和稍后生成的机器人 Webhook 地址。 2)创建 EventBridge 相关资源 创建 EventBus 事件总线 创建事件源。事件源配置完成之后,点击跳过,我们接下来会专门配置事件规则与目标。 创建完成后,进入事件源详情页,保存刚刚生成的 Webhook URL。 在 EventBridge 控制台页面点击进入刚刚创建的 EventBus 详情页,在左侧一栏中“事件规则”选择“创建规则”。 创建时间目标。选择钉钉,并将钉钉机器人的 Webhook 地址和密钥填入,推送内容侧可以按照需求设计。 我们填写模板变量为: {"repo":"$.data.body.repository.full_name","branch":"$.data.body.ref","pusher":"$.data.body.pusher.name"} 模板为: {"msgtype": "text","text": {"content": "Github push event is triggered. repository: {repo}, git reference: {branch}, pusher: {pusher}." } } 3)在 GitHub 代码仓库创建 Webhook 登陆 GitHub,在 GitHub 代码仓库“setting”中选择左侧“Webhooks”,选择新建 Webhook。 在创建 Webhook 的配置项中填入 HTTP Source 事件源的 Webhook 地址,Content type 部分选择“application/json”,下方触发事件类型选择“Just the push event.”,随后点击“Add Webhook”,创建完成。 4)向 GitHub 代码仓库推送代码变更 本地仓库做一定变更,commit 后推送 GitHub。 5)钉钉群接收此次代码推送相关信息 _异步消费监控报警信息_ 业务上存在异步消费报警信息的场景,例如报警内容备份,根据报警频率自适应调整报警阈值等。而且对于多云业务的用户,如何将跨云服务的报警信息整合起来也是一个麻烦的问题。依托 HTTP Source,用户可以将不同云厂商(腾讯云、华为云等)、不同监控产品(Grafana、Zabbix、Nagios等)统一集成到 EventBridge 平台,以便于实现对报警信息的异步消费。 本节我们介绍如何使用 EventBridge 集成 Grafana,实现异步消费监控报警信息。Grafana 是一款开源数据可视化工具,也同时具有监控报警功能,具体使用可以参阅Grafana 官方文档[2]。本节主要包含以下步骤: 创建 MNS 队列; 创建 EventBridge 相关资源; Grafana 上配置 Webhook; 测试接收结果。 创建 MNS 队列 在 MNS 控制台,选择“队列列表创建队列”。 创建 EventBridge 相关资源 同上文所述,这里仅示例创建事件目标时相关配置。 Grafana 上配置 Webhook 点击 Grafana 控制台左侧“AlertingNotification channels”,选择“Add channel”。 在“type”一栏中选择“Webhook”,url 填写 HTTP Source 事件源的 Webhook 地址,点击下方“Test”。 测试接收结果 登陆 MNS 控制台,进入队列详情页,点击页面右上角“收发消息”,可以看到 MNS 已经接收到刚刚 Grafana 发送的消息。 点击对应消息详情可以看到消息内容,说明消息已经被成功消费。 _更多集成_ HTTP Source 支持的三方集成包括 Prometheus,Zabbix,Skywalking,Grafana,OpenFalcon,Cacti,Nagios,Dynatrace,Salesforce,Shopify,Gitee 等 SaaS 应用。通过简单配置 Webhook 无需开发既可实现事件接收能力。 _总结_ 本文重点介绍 EventBridge 的新特性:HTTP Source 事件源。作为一款无服务器事件总线服务,EventBridge 已经将阿里云云产品管控链路数据、消息产品业务数据整和到事件源生态中,提高了上云用户业务集成的便捷性,Open API 与多语言 sdk 的支持,为客户自身业务接入 EventBridge 提供了便利。 在此基础之上,HTTP Source 事件源更进一步,以 Webhook 形式开放了针对了其他云厂商、SaaS 应用的集成能力,无需代码改动,仅需要简单配置即可完成 EventBridge 集成操作。 _相关链接_ 活动推荐 阿里云基于 Apache RocketMQ 构建的企业级产品消息队列RocketMQ 5.0版现开启活动: 1、新用户首次购买包年包月,即可享受全系列 85折优惠! 了解活动详情:
作者:昶风
#行业实践 #生态集成

2022年2月22日

EventBridge消息路由|高效构建消息路由能力
企业数字化转型过程中,天然会遇到消息路由,异地多活,协议适配,消息备份等场景。本篇主要通过 EventBridge 消息路由的应用场景和应用实验介绍,帮助大家了解如何通过 EventBridge 的消息路由高效构建消息路由能力。 背景知识 EventBridge 消息路由主要涉及以下云产品和服务: 事件总线 EventBridge 事件总线 EventBridge 是阿里云提供的一款无服务器事件总线服务,支持阿里云服务、自定义应用、SaaS 应用以标准化、中心化的方式接入,并能够以标准化的 CloudEvents 1.0 协议在这些应用之间路由事件,帮助您轻松构建松耦合、分布式的事件驱动架构。 消息队列 RabbitMQ 版 阿里云消息队列 RabbitMQ 版支持 AMQP 协议,完全兼容 RabbitMQ 开源生态以及多语言客户端,打造分布式、高吞吐、低延迟、高可扩展的云消息服务。开箱即用,用户无需部署免运维,轻松实现快速上云,阿里云提供全托管服务,更专业、更可靠、更安全。 消息队列 MNS 版 阿里云消息服务 MNS 版是一款高效、可靠、安全、便捷、可弹性扩展的分布式消息通知服务。MNS 能够帮助应用开发者在他们应用的分布式组件上自由的传递数据、通知消息,构建松耦合系统。 场景应用 EventBridge 消息路由功能在构建在构建消息系统过程中主要应用于下面三个场景,一是消息路由场景,二是消息多活场景,三是多协议适配场景,下面对这三个场景进行简要介绍。 消息路由场景 该场景是指希望对消息进行二次分发,通过简单过滤或者筛选将消息分发到其他 Topic 或跨地域 Topic,实现消息共享 & 消息脱敏的场景。 通过一层转发将消息分发给不同的 Topic 消费,是消息路由的核心能力。随着企业转型遇到消息拆分且做业务脱敏的场景会越来越多。如下图是一个较为典型的路由分流场景。 消息多活场景 消息多活场景指每个数据中心均部署了完整、独立的 MQ 集群。数据中心内的应用服务只连接本地的 MQ 集群,不连接其他单元的 MQ 集群。MQ 集群中包含的消息路由模块,负责在不同单元 MQ 集群之间同步指定主题的消息。 根据应用服务是否具有单元化能力,可分为中心服务和单元服务两类。中心服务只在一个数据中心提供服务;单元服务在各个数据中心都提供服务,但只负责符合规则的部分用户,而非全量用户。 所有部署了单元服务的数据中心都是一个单元,所有单元的单元服务同时对外提供服务,从而形成一个异地多活架构或者叫单元化架构。通过多活管控平台可动态调整各个单元服务负责的流量。 多协议适配场景 随着业务团队的逐渐庞大,对消息的建设诉求与日俱增,由于部门技术栈的不同会导致部门间的消息协议也不尽相同。多协议适配是指用一种消息协议平滑迁移到多种消息协议的能力。 架构描述 使用 EventBridge 的事件流能力做消息路由,事件流模型是 EventBridge 在消息领域主打的处理模型,适用标准 Streaming(1:1)流式处理场景,无总线概念。用于端到端的消息路由,消息转储,消息同步及处理等,帮助开发者轻松构建云上数据管道服务。 下面的架构展示了如何通过桥接 EventBridge 实现 MNS 消息路由至 RabbitMQ Queues,MNS Queues。(A/B 链路任选其一进行试验) 应用实验 目标 通过本实验教程的操作,您可以通过阿里云控制台,在事件总线控制台中创建消息路由服务,在 EventBridge 控制台实现消息路由与简单的消息脱敏。 体验此实验后,可以掌握的知识有: 创建消息路由任务; 创建 RabbitMQ 实例、MNS 实例与简单的消息发送。 资源 使用到的资源如下:(本次实验资源遵循最小原则,使用满足场景需求的最小化资源) 资源一:EventBridge 事件总线 资源二:阿里云消息队列 RabbitMQ 版 资源三:阿里云消息队列 MNS 版 步骤 1)创建 MNS 资源 本实验分 A /B 两个可选场景: A 、场景通过 MNS Queues1 投递至 MNS Queues2 B 、场景通过 MNS Queues1 投递至 RabbitMQ Queues 可根据兴趣选择不同场景。 本步骤将指导您如何通过控制台创建消息队列 MNS 版。 使用您自己的阿里云账号登录阿里云控制台,然后访问消息队列MNS版控制台。[1] 在控制台左边导航栏中,单击队列列表。(资源地域为同地域即可,本次引导默认选杭州) 在列表页面,单击创建队列并填写名称信息“testmnsq” 创建完成后点击“详情” 找到 MNS 公网接入点信息,并记住该信息,后续实验会用到。 E.g. 注意:重复如上步骤即可创建 A 实验链路的 “testmnsq2” 2)创建 RabbitMQ 资源(B 实验可选) 本步骤将指导您如何通过控制台创建消息队列 RabbitMQ 版。 使用您自己的阿里云账号登录阿里云控制台,然后访问消息队列RabbitMQ版控制台。[2] 在控制台左边导航栏中,单击实例列表。(资源地域为同地域即可,本次引导默认选杭州) 在列表页面,单击创建实例,并完成创建。 创建完成后点击详情进入实例详情页; 在“Vhost 列表” 创建 “testamqpv”; 在“Queue 列表” ,选择 Vhost 为“testamqpv”,并创建 “testamqpq”; 3)创建 EventBridge 事件流任务   MNS TO MNS(A 实验可选) 本步骤将指导您如何通过控制台创建 EventBridge 事件流。 使用您自己的阿里云账号登录阿里云控制台,然后访问 EventBridge 控制台。[3] 注:第一次使用需开通。 单击“事件流”列表,并在列表创建任务 (资源地域为同地域即可,本次引导默认选杭州) 创建事件流名称为“testamqpmns2mns”,点击下一步; 指定事件源,事件提供方为“消息服务 MNS”,队列名称为“testmnsq”,点击下一步; 指定规则,规则部分可不做筛选,默认匹配全部,直接点击下一步; 注意:规则内容可根据需求自行指定,为降低难度本次实验默认投递全部,更多详情请查阅: 服务类型选择“消息服务 MNS”,队列名称选择“testmnsq2”,消息内容选择“部分事件”,点击创建 注意:消息内容可根据需求自行指定,本次实验默认投递 data 字段,更多详情请查阅: 创建完成后,可点击“启动”来启动事件流 4)创建 EventBridge 事件流任务 MNS TO RabbitMQ(B 实验可选) 本步骤将指导您如何通过控制台创建 EventBridge 事件流。 使用您自己的阿里云账号登录阿里云控制台,然后访问 EventBridge 控制台。[3]注:第一次使用需开通。 单击“事件流”列表,并在列表创建任务 (资源地域为同地域即可,本次引导默认选杭州) 创建事件流名称为“testamqpmns2rabbitmq”,点击下一步 指定事件源,事件提供方为“消息服务 MNS”,队列名称为“testmnsq”,点击下一步 指定规则,规则部分可不做筛选,默认匹配全部,直接点击下一步 注意:规则内容可根据需求自行指定,为降低难度本次实验默认投递全部,更多详情请查阅: 服务类型选择“消息队列 RabbitMQ 版本”,具体配置如下,点击创建 实例ID:选择创建好的RabbitMQ ID Vhost:选择“testamqpv” 目标类型:选择“Queue” Queue:选择“testamqpq” Body:选择“部分事件”,填写“$.data” MessageId:选择“常量”,填写“0” Properties:选择“部分事件”,填写“$.source” 注意:消息内容可根据需求自行指定,本次实验默认投递 data 字段,更多详情请查阅: 创建完成后,可点击“启动”来启动事件流 5)验证路由任务 向 MNS Source  “testmnsq ” 发送实验消息 点击下载 MNS SDK[4] 修改 sample.cfg 在 “sample.cfg ” 填写 AccessKeyId,AccessKeySecret,Endpoint 等信息 AccessKeyId,AccessKeySecret 可在阿里云 RAM 控制台[5]创建 Endpoint 即步骤 1 , MNS 公网接入点地址 AccessKeyId = xxxxxx AccessKeySecret = xxxxxxx Endpoint = http://xxxx.mns.cnhangzhou.aliyuncs.com 填完效果如下,保存 找到 sample 目录的“sendmessage.py” 示例 将循环参数调整为 200,并保存 (可选) 保存并运行 “python sendmessage.py testmnsq” python sendmessage.py testmnsq 在事件流控制台[6],分别点开 “testmnsq2”, “testamqpq” 查看详情转储详情。 注意:MNS Q 仅支持单订阅,不支持广播模式。故该测试需要将 MNS/RabbitMQ 两个实验,任选其一关停后进行实验。 如需广播模式,请创建 MNS Topic 资源。 A 链路实验结果: B 链路实验结果: 优势及总结 EventBridge 事件流提供端到端的消息路由能力,通过简单配置即可完成消息分发,消息同步,跨地域消息备份,跨产品消息同步等能力。具有运维简单,成本低,效率高,使用稳定等优势。同时使用 EventBridge 可以实现基础的数据过滤,数据脱敏等数据处理类能力。是消息路由场景下运维成本最低的解决方案。 相关链接 [1] 消息队列MNS版控制台 [2] 消息队列RabbitMQ版控制台 [3] EventBridge 控制台 [4] 点击下载 MNS SDK [5] 阿里云RAM 控制台 [6] 事件流控制台 活动推荐 阿里云基于 Apache RocketMQ 构建的企业级产品消息队列RocketMQ 5.0版现开启活动: 1、新用户首次购买包年包月,即可享受全系列 85折优惠! 了解活动详情:
作者:肯梦
#技术探索 #生态集成

2022年2月22日

RocketMQ-Streams 首个版本发布,轻量级计算的新选择
RocketMQStreams 聚焦「大数据量高过滤轻窗口计算」场景,核心打造轻资源,高性能优势,在资源敏感场景有很大优势,最低 1Core,1G 可部署。通过大量过滤优化,性能比其他大数据提升 25 倍性能。广泛应用于安全,风控,边缘计算,消息队列流计算。 RocketMQStreams 兼容 Flink 的 SQL,udf/udtf/udaf,将来我们会和 Flink 生态做深度融合,即可以独立运行,也可发布成 Flink 任务,跑在 Flink 集群,对于有 Flink 集群的场景,即能享有轻资源优势,可以做到统一部署和运维。 01 _RocketMQStreams 特点及应用场景_  RocketMQStreams 应用场景 计算场景:适合大数据量高过滤轻窗口计算的场景。不同于主流计算引擎,需要先部署集群,写任务,发布,调优,运行这么复杂的过程。RocketMQStreams 本身就是一个 lib 包,基于 SDK 写完流任务,可以直接运行。支持大数据开发需要的计算特性:ExactlyONCE,灵活窗口(滚动、滑动、会话),双流Join,高吞吐、低延迟、高性能。最低 1Core,1G 可以运行。 SQL引擎:RocketMQStreams 可视作一个 SQL 引擎,兼容 Flink SQL 语法,支持 Flink udf/udtf/udaf 的扩展。支持 SQL 热升级,写完 SQL,通过 SDK 提交 SQL,就可以完成 SQL 的热发布。 ETL引擎:RocketMQStreams 还可视作 ETL 引擎,在很多大数据场景,需要完成数据从一个源经过 ETl,汇聚到统一存储,里面内置了 grok,正则解析等函数,可以结合 SQL 一块完成数据 ETL 。 开发 SDK,它也是一个数据开发 SDK 包,里面的大多数组件都可以单独使用,如 Source/sink,它屏蔽了数据源,数据存储细节,提供统一编程接口,一套代码,切换输入输出,不需要改变代码。  RocketMQStreams 设计思路 设计目标 依赖少,部署简单,1Core,1G 单实例可部署,可随意扩展规模。 实现需要的大数据特性:ExactlyONCE,灵活窗口(滚动、滑动、会话),双流 Join,高吞吐、低延迟、高性能。 实现成本可控,实现低资源,高性能。 兼容 Flink SQL,UDF/UDTF,让非技术人员更易上手。 设计思路 采用 sharednothing 的分布式架构设计,依赖消息队列做负载均衡和容错机制,单实例可启动,增加实例实现能力扩展。并发能力取决于分片数。 利用消息队列的分片做 shuffle,利用消息队列负载均衡实现容错。 利用存储实现状态备份,实现 ExactlyONCE 的语义。用结构化远程存储实现快速启动,不必等本地存储恢复。  RocketMQStreams 特点和创新 02 _RocketMQStreams SDK 详解_  Hello World 按照惯例,我们先从一个例子来了解 RocketMQStreams namespace:相同 namespace 的任务可以跑在一个进程里,可以共享配置 pipelineName:job name DataStreamSource:创建 source 节点 map:用户函数,可以通过实现 MapFunction 扩展功能 toPrint:结果打印出来 start:启动任务 运行上面代码就会启动一个实例。如果想多实例并发,可以启动多个实例,每个实例消费部分 RocketMQ 的数据。 运行结果:把原始消息拼接上“”,并打印出来  RocketMQStreams SDK StreamBuilder 做为起点,通过设置 namespace,jobName 创建一个 DataStreamSource 。 DataStreamSource 通过 from 方法,设置 source,创建 DataStream 对象。 DataStream 提供多种操作,会产生不同的流: to 操作产生 DataStreamAction window 操作产生 WindowStream 配置 window 参数 join 操作产生 JoinStream 配置 join 条件 Split 操作产生 SplitStream 配置 split 条件 其他操作产生 DataStream DataStreamAction 启动整个任务,也可以配置任务的各种策略参数。支持异步启动和同步启动。  RocketMQStreams 算子  RocketMQStreams 算子 SQL 有两种部署模式,1 是直接运行 client 启动 SQL,见第一个红框;2 是搭建 server 集群,通过 client 提交 SQL 实现热部署,见第二个红框。 RocketMQStreams SQL 扩展,支持多种扩展方式: 通过 FlinkUDF,UDTF,UDAF 扩展 SQL 能力,在 SQL 中通过 create function 引入,有个限制条件,即 UDF 在 open 时未用到 Flink FunctionContext 的内容。 通过内置函数扩展 SQL 的函数,语法同 Flink 语法,函数名是内置函数的名称,类名是固定的。如下图,引入了一个 now 的函数,输出当前时间。系统内置了 200 多个函数,可按需引入。 通过扩展函数实现,实现一个函数很简单,只需要在 class 上标注 Function,在需要发布成函数的方法上标注 FunctionMethod,并设置需要发布的函数名即可,如果需要系统信息,前面两个函数可以是 IMessage 和 Abstract,如果不需要,直接写参数即可,参数无格式要求。如下图,创建了一个 now 的函数,两种写法都可以。可以通过 currentTime=now()来调用,会在 Message 中增加一个 key=currentTime,value=当前时间的变量。 把现有 java 代码发布成函数,通过策略配置,把 java 代码的类名,方法名,期望用到的函数名,配置进去,把 java 的 jar 包 copy 到 jar 包目录即可。下图是几种扩展的应用实例。 03 _RocketMQStreams 架构及原理实现_  整体架构  Source 实现 Source 要求实现最少消费一次的语义,系统通过 checkpoint 系统消息实现,在提交 offset 前发送 checkpoint 消息,通知所有算子刷新内存。 Source 支持分片的自动负载均衡和容错。 数据源在分片移除时,发送移除系统消息,让算子完成分片清理工作。 当有新分片时, 发送新增分片消息,让算子完成分片的初始化。 数据源通过 start 方法,启动 consuemr 获取消息。 原始消息经过编码,附加头部信息包装成 Message 投递给后续算子。  Sink 实现 Sink 是实时性和吞吐的一个结合。 实现一个 Sink 只要继承 AbstractSink 类实现 batchInsert 方法即可。batchInsert 的含义是一批数据写入存储,需要子类调用存储接口实现,尽量应用存储的批处理接口,提高吞吐。 常规的使用方式是写 Messagecacheflush存储的方式,系统会严格保证,每次批次写入存储的量不超过 batchsize 的量,如果超了,会拆分成多批写入。 Sink 有一个 cache,数据默认写 cache,批次写入存储,提高吞吐量。(一个分片一个 cache)。 可以开启自动刷新,每个分片会有一个线程,定时刷新 cache 数据到存储,提高实时性。实现类:DataSourceAutoFlushTask 。 也可以通过调用 flush 方法刷新 cache 到存储。 Sink 的 cache 会有内存保护,当 cache 的消息条数batchSize,会强制刷新,释放内存。  RocketMQStreams ExactlyONCE Source 确保在 commit offset 时,会发送 checkpoint 系统消息,收到消息的组件会完成存盘操作。消息至少消费一次。 每条消息会有消息头部,里面封装了 QueueId 和 offset 。 组件在存储数据时,会把 QueueId 和处理的最大 offset 存储下来,当有消息重复时,根据 maxoffset 去重。 内存保护,一个 checkpoint 周期可能有多次 flush(条数触发),保障内存占用可控。  RocketMQStreams Window 支持滚动,滑动和会话窗口。支持事件时间和自然时间(消息进入算子的时间)。 支持高性能模式和高可靠模式,高性能模式不依赖远程存储,但在分片切换时的窗口数据会有丢失。 快速启动,无需等本地存储恢复,在发生错误或分片切换时,异步从远程存储恢复数据,同时直接访问远程存储计算。 利用消息队列负载均衡,实现扩容缩容,每个 Queue 是一个分组,一个分组同一刻只被一台机器消费。 正常计算依赖本地存储,具备 Flink 相似的计算性能。 支持三种触发模式,可以均衡 watermark 延迟和实时性要求 04 _RocketMQStreams 在云安全的应用_  在安全应用的背景 公共云转战专有云,在入侵检测计算方面遇到了资源问题,大数据集群默认不输出,输出最低 6 台高配机器,用户很难接受因为买云盾增配一套大数据集群。 专有云用户升级,运维困难,无法快速升级能力和修复 bug。  流计算在安全的应用 基于安全特点(大数据高过滤轻窗口计算)打造轻量级计算引擎:经过分析所有的规则都会做前置过滤,然后才会做较重的统计,窗口,join 操作,且过滤率比较高,基于此特点,可以用更轻的方案实现统计,join 操作。 通过 RocketMQStreams,覆盖 100%专有云规则(正则,join,统计)。 轻资源,内存是公共云引擎的 1/70,CPU 是 1/6,通过指纹过滤优化,性能提升 5 倍以上,且资源不随规则线性增加,新增规则无资源压力。复用以前的正则引擎资源,可支持 95%以上局点,不需要增加额外物理资源。 通过高压缩维表,支持千万情报。1000 W 数据只需要 330 M 内存。 通过 C/S 部署模式,SQL 和引擎可热发布,尤其护网场景,可快速上线规则。 05 _RocketMQStreams 未来规划_ 新版本下载地址: 活动推荐 阿里云基于 Apache RocketMQ 构建的企业级产品消息队列RocketMQ 5.0版现开启活动: 1、新用户首次购买包年包月,即可享受全系列 85折优惠! 了解活动详情:
作者:袁小栋、程君杰
#社区动态 #流处理