开始使用 Atlas Stream Processing
在此页面上
本教程将引导您完成设置 Atlas Stream Processing 和运行第一个流处理器的步骤。
先决条件
要完成本教程,您需要:
Atlas 项目
mongosh
2.0或更高版本具有
Project Owner
或Project Stream Processing Owner
角色的 Atlas 用户,用于管理流处理实例和连接注册表注意
Project Owner
角色允许您创建数据库部署、管理项目访问和项目设置、管理 IP 访问列表条目等。Project Stream Processing Owner
角色支持 Atlas Stream Processing 操作,例如查看、创建、删除和编辑流处理实例,以及查看、添加、修改和删除连接注册表中的连接。请参阅项目角色,详细了解这两个角色之间的区别。
具有
atlasAdmin
角色的数据库用户,用于创建和运行流处理器Atlas 集群
步骤
在 AtlasStream Processing 中,转到项目的 页面。
如果尚未显示,请从导航栏上的 Organizations 菜单中选择包含项目的组织。
如果尚未显示,请从导航栏的 Projects 菜单中选择您的项目。
在侧边栏中,单击Services标题下的Stream Processing 。
此时会显示“流处理”页面。
获取Atlas Stream Processing实例连接string 。
找到Atlas Stream Processing实例的概述面板,然后单击 Connect。
选择 I have the MongoDB shell installed 。
从Select your mongo shell version下拉菜单中,选择最新版本的
mongosh
。复制 Run your connection string in your command line 下提供的连接string 。 您将在后续步骤中用到它。
单击 Close(连接)。
验证您的流数据源是否发出消息。
您的流处理实例预先配置了与名为sample_stream_solar
的示例数据源的连接。该源生成来自各种太阳能设备的一系列报告。每份报告都描述了在特定时间点观察到的单个太阳能设备的瓦数和温度,以及该设备的最大瓦数。
以下文档是一个代表性示例。
{ device_id: 'device_8', group_id: 7, timestamp: '2024-08-12T21:41:01.788+00:00', max_watts: 450, event_type: 0, obs: { watts: 252, temp: 17 }, _ts: ISODate('2024-08-12T21:41:01.788Z'), _stream_meta: { source: { type: 'generated' } } }
要验证此源是否发出消息,请以交互方式创建流处理器。
打开您选择的终端应用程序。
使用 连接到Atlas Stream Processing
mongosh
实例。将您在上一步中复制的 连接
mongosh
粘贴到终端,其中string<atlas-stream-processing-url>
URL是Atlas Stream Processing 实例的 ,<username>
是具有atlasAdmin
角色的用户。mongosh "mongodb://<atlas-stream-processing-url>/" --tls --authenticationDatabase admin --username <username> 根据提示输入密码。
创建流处理器。
将以下代码复制到
mongosh
提示符中:sp.process([{"$source": { "connectionName": "sample_stream_solar" }}]) 验证来自
sample_stream_solar
连接的数据是否显示在控制台上,并终止该进程。使用
sp.process()
创建的流处理器在终止后不会持续存在。
创建持久流处理器。
使用聚合管道,您可以在摄取每个文档时对其进行转换。以下聚合管道以一秒为间隔导出每个太阳能设备的最高温度以及平均值、中值、最大和最小瓦数。
配置
$source
阶段。以下
$source
阶段从sample_stream_solar
源摄取数据,并将 Atlas Stream Processing 时间字段值设置为等于源的timestamp
字段的值。let s = { source: { connectionName: "sample_stream_solar", timeField: { $dateFromString: { dateString: '$timestamp' } } } } 配置
$group
阶段。以下
$group
阶段根据group_id
组织所有传入数据,为每个group_id
累加所有文档的obs.temp
和obs.watts
字段的值,然后派生所需的数据。let g = { group: { _id: "$group_id", max_temp: { $avg: "$obs.temp" }, avg_watts: { $min: "$obs.watts" }, median_watts: { $min: "$obs.watts" }, max_watts: { $max: "$obs.watts" }, min_watts: { $min: "$obs.watts" } } } 配置
$tumblingWindow
阶段。为了对流数据执行
$group
等累加操作,Atlas Stream Processing 使用窗口来限制数据集。以下$tumblingWindow
阶段将流分成连续的10秒间隔。这意味着,举例来说,当
$group
阶段计算median_watts
的值时,它会获取在前10秒内摄取的具有给定group_id
的所有文档的obs.watts
值。let t = { $tumblingWindow: { interval: { size: NumberInt(10), unit: "second" }, pipeline: [g] } } 配置$merge阶段。
$merge
允许您将处理后的流数据写入 Atlas 数据库。let m = { merge: { into: { connectionName: "mongodb1", db: "solarDb", coll: "solarColl" } } } 创建流处理器。
为新的流处理器指定名称,并通过按顺序列出每个阶段来声明其聚合管道。
$group
阶段属于$tumblingWindow
的嵌套管道,您不得将其包含在处理器管道定义中。sp.createStreamProcessor("solarDemo", [s, t, m])
这将创建一个名为solarDemo
的流处理器,它应用之前定义的查询并将处理后的数据写入您连接到的集群上的solarDb
数据库的solarColl
集合。它会返回从太阳能设备的10秒观测间隔中获得的各种测量值。
要详细了解 Atlas Stream Processing 如何写入静态数据库,请参阅$merge
。
验证流处理器的输出。
要验证处理器是否处于活动状态,请在mongosh
中运行以下命令:
sp.solarDemo.stats()
此命令报告solarDemo
流处理器的操作统计信息。
要验证流处理器是否正在将数据写入 Atlas 集群,请执行以下操作:
在 Atlas 中,进入项目的 Clusters 页面。
如果尚未显示,请选择包含所需项目的组织导航栏中的Organizations菜单。
如果尚未显示,请从导航栏的Projects菜单中选择所需的项目。
如果 Clusters(数据库部署)页面尚未出现,请单击侧边栏中的 Database(数据库)。
此时会显示“集群”页面。
单击集群的对应 Browse Collections 按钮。
显示数据浏览器。
查看
MySolar
集合。
或者,您可以使用mongosh
在终端中显示已处理文档的样本:
sp.solarDemo.sample()
{ _id: 10, max_watts: 136, min_watts: 130, avg_watts: 133, median_watts: 130, max_temp: 7, _stream_meta: { source: { type: 'generated' }, window: { start: ISODate('2024-08-12T22:49:05.000Z'), end: ISODate('2024-08-12T22:49:10.000Z') } } }
注意
以上是一个具有代表性的示例。流数据不是静态的,每个用户看到的都是不同的文档。
后续步骤
了解如何: