本教程将引导您完成设置 Atlas Stream Processing 和运行第一个流处理器的步骤。
先决条件
要完成本教程,您需要:
具有空集群的Atlas项目。此集群用作流处理器的数据接收器。
具有
atlasAdmin
角色的数据库用户,用于创建和运行流处理器mongosh
2.0或更高版本具有
Project Owner
或Project Stream Processing Owner
角色的 Atlas 用户,可管理流处理实例和连接注册表注意
Project Owner
角色允许您创建数据库部署、管理项目访问和项目设置、管理 IP 访问列表条目等。Project Stream Processing Owner
角色可执行 Atlas Stream Processing 操作,如查看、创建、删除和编辑流处理实例,以及查看、添加、修改和删除连接注册表中的连接。要详细了解这两个角色之间的区别,请参阅项目角色。
步骤
本教程将指导您创建流处理实例,将其连接到现有Atlas 集群,设置流处理器以从太阳能流媒体媒体设备获取示例数据并将数据写入连接的集群。
创建Atlas Stream Processing实例。
在Atlas中, Go项目的 Stream Processing 页面。
警告: 导航改进正在进行中
我们目前正在逐步推出改进的全新导航体验。如果以下步骤与您在 Atlas UI 中的视图不符,请参阅预览文档。
如果尚未显示,请从导航栏上的 Organizations 菜单中选择包含项目的组织。
如果尚未显示,请从导航栏的 Projects 菜单中选择您的项目。
在侧边栏中,单击 Services 标题下的 Stream Processing。
此时将显示 Stream Processing 页面。
单击 Create a workspace(连接)。
在Create a stream processing instance页面上,按如下方式配置实例:
Tier:
SP30
Provider:
AWS
Region:
us-east-1
Instance Name:
tutorialInstance
单击 Create(连接)。
将接收器连接添加到连接注册表。
将与现有空Atlas 集群的连接添加到连接注册表中。您的流处理器将使用此连接作为流媒体数据接收器。
在Atlas Stream Processing实例的窗格中,单击 Configure。
在Connection Registry标签页中,单击右上角的+ Add Connection 。
从 Connection Type 下拉列表中,单击 Atlas Database。
在 Connection Name 字段中输入
mongodb1
。从 Atlas Cluster 下拉列表中,选择一个未存储任何数据的Atlas 集群。
从 Execute as 下拉列表中选择 Read and write to any database。
单击 Add connection(连接)。
验证您的流数据源是否发出消息。
您的流处理实例预配置了与名为 sample_stream_solar
的示例数据源的连接。该源可生成来自各种太阳能设备的报告流。每个报告都描述了在特定时间点观察到的单个太阳能设备的瓦数和温度,以及该设备的最大瓦数。
以下文档代表来自此数据源的报告:
{ device_id: 'device_8', group_id: 7, timestamp: '2024-08-12T21:41:01.788+00:00', max_watts: 450, event_type: 0, obs: { watts: 252, temp: 17 } }
要验证此源是否发出消息,请使用 mongosh
: 以交互方式创建流处理器:
连接到您的Atlas Stream Processing实例。
使用与您的 实例关联的连接stringAtlas Stream Processing
mongosh
,通过 进行连接。在Atlas Stream Processing实例的窗格中,单击 Connect。
在 Connect to your instance 对话框中,选择 Shell标签页。
复制对话框中显示的连接字符串。 它采用以下格式,其中
<atlas-stream-processing-url>
是流处理实例的URL ,<username>
是具有atlasAdmin
角色的数据库用户的用户名:mongosh "mongodb://<atlas-stream-processing-url>/" --tls --authenticationDatabase admin --username <username> --password <password> 将连接字符串粘贴到终端中,并将
<password>
占位符替换为用户的凭证。按 Enter运行该程序并连接到您的流处理实例。
在
mongosh
提示符中,使用sp.process()
方法以交互方式创建流处理器。sp.process([{"$source": { "connectionName": "sample_stream_solar" }}]) 验证来自
sample_stream_solar
连接的数据是否显示在控制台上,并终止该进程。使用
sp.process()
创建的流处理器在终止后不会持续存在。
创建持久流处理器。
持久流处理器持续摄取、处理流媒体数据并将其写入指定的数据接收器,直到您删除处理器。以下流处理器是一个聚合管道,用于导出每个太阳能设备在 10 秒间隔内的最高温度以及平均瓦数、最大瓦数和最小瓦数,然后将结果写入已连接的空集群。
选择以下标签页之一以使用Atlas用户界面或 mongosh
: 创建流处理器:
要在Atlas用户界面中创建流处理器,Go转到Atlas项目的 Stream Processing 页面,然后单击流处理实例窗格中的 Configure。 然后选择使用可视化生成器或JSON编辑器来配置名为 solarDemo
的流处理器:
单击 Create with visual builder(连接)。
可视化构建器将打开并显示一个表单,您可以在其中配置流处理器。
在 Stream processor name 字段中输入
solarDemo
。在 Source字段中,从 Connection 下拉列表中选择
sample_stream_solar
。这会将以下
$source
阶段添加到聚合管道:{ "$source": { "connectionName": "sample_stream_solar" } } 配置
$tumblingWindow
阶段。在 Start building your pipeline 窗格中,单击 + Custom stage,然后将以下 JSON 复制并粘贴到出现的文本框中。这定义了一个带有嵌套
$tumblingWindow
$group
阶段的10 阶段,该阶段得出每个太阳能设备在 秒时间间隔内的最高温度以及最大、最小和平均瓦数。这意味着,示例,当
$group
阶段计算max_watts
的值时,它会从前 10 秒内摄取的具有给定group_id
的所有文档的obs.watts
值中提取最大值。{ "$tumblingWindow": { "interval": { "size": 10, "unit": "second" }, "pipeline": [ { "$group": { "_id": "$group_id", "max_temp": { "$max": "$obs.temp" }, "max_watts": { "$max": "$obs.watts" }, "min_watts": { "$min": "$obs.watts" }, "avg_watts": { "$avg": "$obs.watts" } } }] } } 在 Sink字段中,从 Connection 下拉列表中选择
mongodb1
。在出现的文本框中,复制并粘贴以下JSON 。这将配置一个
$merge
阶段,该阶段将处理后的流媒体数据写入到已连接Atlas 集群的solarDb
数据库中名为solarColl
的集合中:{ "$merge": { "into": { "connectionName": "mongodb1", "db": "solarDb", "coll": "solarColl" } } } 单击 Create stream processor(连接)。
流处理器已创建并列在 Stream Processing 页面的 Stream Processors标签页上。
单击 Use JSON editor(连接)。
JSON编辑器将打开,并显示一个文本框,您可以在其中以JSON格式配置流处理器。
定义流处理器。
将以下 JSON 定义复制并粘贴到JSON编辑器文本框中,以定义一个名为
solarDemo
的流处理器。此流处理器使用带有嵌套 阶段的$tumblingWindow
$group
阶段来推导每个太阳能设备在10solarColl
solarDb
秒时间间隔内的最高温度以及最大瓦数、最小瓦数和平均瓦数,然后将结果写入到集合已连接Atlas 集群的 数据库中的 。这意味着,示例,当
$group
阶段计算max_watts
的值时,它会从前 10 秒内摄取的具有给定group_id
的所有文档的obs.watts
值中提取最大值。{ "name": "solarDemo", "pipeline": [ { "$source": { "connectionName": "sample_stream_solar" } }, { "$tumblingWindow": { "interval": { "size": 10, "unit": "second" }, "pipeline": [ { "$group": { "_id": "$group_id", "max_temp": { "$max": "$obs.temp" }, "max_watts": { "$max": "$obs.watts" }, "min_watts": { "$min": "$obs.watts" }, "avg_watts": { "$avg": "$obs.watts" } } } ] } }, { "$merge": { "into": { "connectionName": "mongodb1", "db": "solarDb", "coll": "solarColl" } } } ] } [ { "$source": { "connectionName": "sample_stream_solar" } }, { "$tumblingWindow": { "interval": { "size": 10, "unit": "second" }, "pipeline": [ { "$group": { "_id": "$group_id", "avg_watts": { "$avg": "$obs.watts" }, "max_temp": { "$avg": "$obs.temp" }, "max_watts": { "$max": "$obs.watts" }, "min_watts": { "$min": "$obs.watts" } } } ] } }, { "$merge": { "into": { "coll": "solarColl", "connectionName": "mongodb1", "db": "solarDb" } } } ]
在 mongosh
中运行以下命令以创建名为 solarDemo
的持久流处理器:
连接到您的Atlas Stream Processing实例。
使用与您的 实例关联的连接stringAtlas Stream Processing
mongosh
,通过 进行连接。在Atlas Stream Processing实例的窗格中,单击 Connect。
在 Connect to your instance 对话框中,选择 Shell标签页。
复制对话框中显示的连接字符串。 它采用以下格式,其中
<atlas-stream-processing-url>
是流处理实例的URL ,<username>
是具有atlasAdmin
角色的数据库用户的用户名:mongosh "mongodb://<atlas-stream-processing-url>/" --tls --authenticationDatabase admin --username <username> --password <password> 将连接字符串粘贴到终端中,并将
<password>
占位符替换为用户的凭证。按 Enter运行该程序并连接到您的流处理实例。
配置
$source
阶段。为从
sample_stream_solar
源摄取数据的$source
阶段定义一个变量。let s = { source: { connectionName: "sample_stream_solar" } } 配置
$group
阶段。为
$group
阶段定义一个变量,该变量根据其group_id
导出每个太阳能设备的最高温度以及平均瓦数、最大瓦数和最小瓦数。let g = { group: { _id: "$group_id", max_temp: { $max: "$obs.temp" }, avg_watts: { $avg: "$obs.watts" }, max_watts: { $max: "$obs.watts" }, min_watts: { $min: "$obs.watts" } } } 配置
$tumblingWindow
阶段。为了流媒体数据执行
$group
等累加操作, Atlas Stream Processing使用Windows来绑定数据集。为$tumblingWindow
阶段定义一个变量,将流分成连续的 10 秒间隔。这意味着,示例,当
$group
阶段计算max_watts
的值时,它会从前 10 秒内摄取的具有给定group_id
的所有文档的obs.watts
值中提取最大值。let t = { $tumblingWindow: { interval: { size: NumberInt(10), unit: "second" }, pipeline: [g] } } 配置 $merge 阶段。
为
$merge
阶段定义一个变量,该阶段将处理后的流媒体数据写入已连接Atlas 集群的solarDb
数据库中名为solarColl
的集合。let m = { merge: { into: { connectionName: "mongodb1", db: "solarDb", coll: "solarColl" } } } 创建流处理器。
使用
sp.createStreamProcessor()
方法为新的流处理器指定名称并声明其聚合管道。$group
阶段属于$tumblingWindow
的嵌套管道,不得将其包含在处理器管道定义中。sp.createStreamProcessor("solarDemo", [s, t, m]) 这将创建一个名为
solarDemo
的流处理器,该处理器应用之前定义的查询,并将处理后的数据写入您连接到的集群上solarDb
数据库的solarColl
集合。它返回从太阳能设备的 10 秒间隔观测得出的各种测量值。要详细学习;了解Atlas Stream Processing如何写入静态数据库,请参阅
$merge
(流处理)。
启动流处理器。
在流处理实例的流处理器列表中,单击流处理器的 Start 图标。
使用 中的sp.processor.start()
mongosh
方法:
sp.solarDemo.start()
验证流处理器的输出。
要验证处理器是否处于活动状态,请使用 中的sp.processor.stats()
mongosh
方法:
sp.solarDemo.stats()
此方法报告 solarDemo
流处理器的操作统计信息。
您还可以使用sp.processor.sample()
中的mongosh
方法返回终端中已处理文档的样本。
sp.solarDemo.sample()
{ _id: 10, max_temp: 16, avg_watts: 232, max_watts: 414, min_watts: 73 }
注意
前面的输出是一个代表性示例。流数据不是静态的,每个用户看到的都是不同的文档。
删除流处理器。
在流处理实例的流处理器列表中,单击流处理器的 Delete () 图标。
在出现的确认对话框中,键入流处理器的名称 (solarDemo
) 以确认要将其删除,然后单击 Delete。
使用 中的sp.processor.drop()
mongosh
方法删除solarDemo
:
sp.solarDemo.drop()
要确认已删除 solarDemo
,请使用 sp.listStreamProcessors()
方法列出所有可用的流处理器:
sp.listStreamProcessors()
后续步骤
了解如何: