Docs 菜单
Docs 主页
/
Atlas
/

开始使用 Atlas Stream Processing

本教程将引导您完成设置 Atlas Stream Processing 和运行第一个流处理器的步骤。

要完成本教程,您需要:

  • 具有空集群的Atlas项目。此集群用作流处理器的数据接收器。

  • 具有 atlasAdmin 角色的数据库用户,用于创建和运行流处理器

  • mongosh 2.0或更高版本

  • 具有 Project OwnerProject Stream Processing Owner 角色的 Atlas 用户,可管理流处理实例和连接注册表

    注意

    Project Owner角色允许您创建数据库部署、管理项目访问和项目设置、管理 IP 访问列表条目等。

    Project Stream Processing Owner 角色可执行 Atlas Stream Processing 操作,如查看、创建、删除和编辑流处理实例,以及查看、添加、修改和删除连接注册表中的连接。

    要详细了解这两个角色之间的区别,请参阅项目角色

本教程将指导您创建流处理实例,将其连接到现有Atlas 集群,设置流处理器以从太阳能流媒体媒体设备获取示例数据并将数据写入连接的集群。

1
  1. 在Atlas中, Go项目的 Stream Processing 页面。

    警告: 导航改进正在进行中

    我们目前正在逐步推出改进的全新导航体验。如果以下步骤与您在 Atlas UI 中的视图不符,请参阅预览文档。

    1. 如果尚未显示,请从导航栏上的 Organizations 菜单中选择包含项目的组织。

    2. 如果尚未显示,请从导航栏的 Projects 菜单中选择您的项目。

    3. 在侧边栏中,单击 Services 标题下的 Stream Processing

      此时将显示 Stream Processing 页面。

  2. 单击 Create a workspace(连接)。

  3. Create a stream processing instance页面上,按如下方式配置实例:

    • Tier: SP30

    • Provider: AWS

    • Region: us-east-1

    • Instance Name: tutorialInstance

  4. 单击 Create(连接)。

2

将与现有空Atlas 集群的连接添加到连接注册表中。您的流处理器将使用此连接作为流媒体数据接收器。

  1. 在Atlas Stream Processing实例的窗格中,单击 Configure

  2. Connection Registry标签页中,单击右上角的+ Add Connection

  3. Connection Type 下拉列表中,单击 Atlas Database

  4. Connection Name 字段中输入 mongodb1

  5. Atlas Cluster 下拉列表中,选择一个未存储任何数据的Atlas 集群。

  6. Execute as 下拉列表中选择 Read and write to any database

  7. 单击 Add connection(连接)。

3

您的流处理实例预配置了与名为 sample_stream_solar 的示例数据源的连接。该源可生成来自各种太阳能设备的报告流。每个报告都描述了在特定时间点观察到的单个太阳能设备的瓦数和温度,以及该设备的最大瓦数。

以下文档代表来自此数据源的报告:

{
device_id: 'device_8',
group_id: 7,
timestamp: '2024-08-12T21:41:01.788+00:00',
max_watts: 450,
event_type: 0,
obs: {
watts: 252,
temp: 17
}
}

要验证此源是否发出消息,请使用 mongosh: 以交互方式创建流处理器:

  1. 连接到您的Atlas Stream Processing实例。

    使用与您的 实例关联的连接stringAtlas Stream Processing mongosh,通过 进行连接。

    1. 在Atlas Stream Processing实例的窗格中,单击 Connect

    2. Connect to your instance 对话框中,选择 Shell标签页。

    3. 复制对话框中显示的连接字符串。 它采用以下格式,其中 <atlas-stream-processing-url> 是流处理实例的URL ,<username> 是具有 atlasAdmin 角色的数据库用户的用户名:

      mongosh "mongodb://<atlas-stream-processing-url>/"
      --tls --authenticationDatabase admin --username <username>
      --password <password>
    4. 将连接字符串粘贴到终端中,并将 <password> 占位符替换为用户的凭证。

      按 Enter运行该程序并连接到您的流处理实例。

  2. mongosh 提示符中,使用 sp.process() 方法以交互方式创建流处理器。

    sp.process([{"$source": {
    "connectionName": "sample_stream_solar"
    }}])

    验证来自sample_stream_solar连接的数据是否显示在控制台上,并终止该进程。

    使用sp.process()创建的流处理器在终止后不会持续存在。

4

持久流处理器持续摄取、处理流媒体数据并将其写入指定的数据接收器,直到您删除处理器。以下流处理器是一个聚合管道,用于导出每个太阳能设备在 10 秒间隔内的最高温度以及平均瓦数、最大瓦数和最小瓦数,然后将结果写入已连接的空集群。

选择以下标签页之一以使用Atlas用户界面或 mongosh: 创建流处理器:

要在Atlas用户界面中创建流处理器,Go转到Atlas项目的 Stream Processing 页面,然后单击流处理实例窗格中的 Configure。 然后选择使用可视化生成器或JSON编辑器来配置名为 solarDemo 的流处理器:

  1. 单击 Create with visual builder(连接)。

    可视化构建器将打开并显示一个表单,您可以在其中配置流处理器。

  2. Stream processor name 字段中输入 solarDemo

  3. Source字段中,从 Connection 下拉列表中选择 sample_stream_solar

    这会将以下$source阶段添加到聚合管道:

    {
    "$source": {
    "connectionName": "sample_stream_solar"
    }
    }
  4. 配置 $tumblingWindow 阶段。

    Start building your pipeline 窗格中,单击 + Custom stage,然后将以下 JSON 复制并粘贴到出现的文本框中。这定义了一个带有嵌套$tumblingWindow $group阶段的10 阶段,该阶段得出每个太阳能设备在 秒时间间隔内的最高温度以及最大、最小和平均瓦数。

    这意味着,示例,当 $group 阶段计算 max_watts 的值时,它会从前 10 秒内摄取的具有给定 group_id 的所有文档的 obs.watts 值中提取最大值。

    {
    "$tumblingWindow": {
    "interval": {
    "size": 10,
    "unit": "second"
    },
    "pipeline": [ {
    "$group": {
    "_id": "$group_id",
    "max_temp": {
    "$max": "$obs.temp"
    },
    "max_watts": {
    "$max": "$obs.watts"
    },
    "min_watts": {
    "$min": "$obs.watts"
    },
    "avg_watts": {
    "$avg": "$obs.watts"
    }
    }
    }]
    }
    }
  5. Sink字段中,从 Connection 下拉列表中选择 mongodb1

    在出现的文本框中,复制并粘贴以下JSON 。这将配置一个 $merge 阶段,该阶段将处理后的流媒体数据写入到已连接Atlas 集群的 solarDb数据库中名为 solarColl 的集合中:

    {
    "$merge": {
    "into": {
    "connectionName": "mongodb1",
    "db": "solarDb",
    "coll": "solarColl"
    }
    }
    }
  6. 单击 Create stream processor(连接)。

    流处理器已创建并列在 Stream Processing 页面的 Stream Processors标签页上。

  1. 单击 Use JSON editor(连接)。

    JSON编辑器将打开,并显示一个文本框,您可以在其中以JSON格式配置流处理器。

  2. 定义流处理器。

    将以下 JSON 定义复制并粘贴到JSON编辑器文本框中,以定义一个名为 solarDemo 的流处理器。此流处理器使用带有嵌套 阶段的$tumblingWindow $group阶段来推导每个太阳能设备在10 solarCollsolarDb秒时间间隔内的最高温度以及最大瓦数、最小瓦数和平均瓦数,然后将结果写入到集合已连接Atlas 集群的 数据库中的 。

    这意味着,示例,当 $group 阶段计算 max_watts 的值时,它会从前 10 秒内摄取的具有给定 group_id 的所有文档的 obs.watts 值中提取最大值。

    {
    "name": "solarDemo",
    "pipeline": [
    {
    "$source": {
    "connectionName": "sample_stream_solar"
    }
    },
    {
    "$tumblingWindow": {
    "interval": {
    "size": 10,
    "unit": "second"
    },
    "pipeline": [
    {
    "$group": {
    "_id": "$group_id",
    "max_temp": {
    "$max": "$obs.temp"
    },
    "max_watts": {
    "$max": "$obs.watts"
    },
    "min_watts": {
    "$min": "$obs.watts"
    },
    "avg_watts": {
    "$avg": "$obs.watts"
    }
    }
    }
    ]
    }
    },
    {
    "$merge": {
    "into": {
    "connectionName": "mongodb1",
    "db": "solarDb",
    "coll": "solarColl"
    }
    }
    }
    ]
    }
    [
    {
    "$source": {
    "connectionName": "sample_stream_solar"
    }
    },
    {
    "$tumblingWindow": {
    "interval": {
    "size": 10,
    "unit": "second"
    },
    "pipeline": [
    {
    "$group": {
    "_id": "$group_id",
    "avg_watts": {
    "$avg": "$obs.watts"
    },
    "max_temp": {
    "$avg": "$obs.temp"
    },
    "max_watts": {
    "$max": "$obs.watts"
    },
    "min_watts": {
    "$min": "$obs.watts"
    }
    }
    }
    ]
    }
    },
    {
    "$merge": {
    "into": {
    "coll": "solarColl",
    "connectionName": "mongodb1",
    "db": "solarDb"
    }
    }
    }
    ]

mongosh 中运行以下命令以创建名为 solarDemo 的持久流处理器:

  1. 连接到您的Atlas Stream Processing实例。

    使用与您的 实例关联的连接stringAtlas Stream Processing mongosh,通过 进行连接。

    1. 在Atlas Stream Processing实例的窗格中,单击 Connect

    2. Connect to your instance 对话框中,选择 Shell标签页。

    3. 复制对话框中显示的连接字符串。 它采用以下格式,其中 <atlas-stream-processing-url> 是流处理实例的URL ,<username> 是具有 atlasAdmin 角色的数据库用户的用户名:

      mongosh "mongodb://<atlas-stream-processing-url>/"
      --tls --authenticationDatabase admin --username <username>
      --password <password>
    4. 将连接字符串粘贴到终端中,并将 <password> 占位符替换为用户的凭证。

      按 Enter运行该程序并连接到您的流处理实例。

  2. 配置 $source 阶段。

    为从 sample_stream_solar 源摄取数据的 $source 阶段定义一个变量。

    let s = {
    $source: {
    connectionName: "sample_stream_solar"
    }
    }
  3. 配置 $group 阶段。

    $group 阶段定义一个变量,该变量根据其 group_id 导出每个太阳能设备的最高温度以及平均瓦数、最大瓦数和最小瓦数。

    let g = {
    $group: {
    _id: "$group_id",
    max_temp: {
    $max: "$obs.temp"
    },
    avg_watts: {
    $avg: "$obs.watts"
    },
    max_watts: {
    $max: "$obs.watts"
    },
    min_watts: {
    $min: "$obs.watts"
    }
    }
    }
  4. 配置 $tumblingWindow 阶段。

    为了流媒体数据执行 $group 等累加操作, Atlas Stream Processing使用Windows来绑定数据集。为 $tumblingWindow 阶段定义一个变量,将流分成连续的 10 秒间隔。

    这意味着,示例,当 $group 阶段计算 max_watts 的值时,它会从前 10 秒内摄取的具有给定 group_id 的所有文档的 obs.watts 值中提取最大值。

    let t = {
    $tumblingWindow: {
    interval: {
    size: NumberInt(10),
    unit: "second"
    },
    pipeline: [g]
    }
    }
  5. 配置 $merge 阶段。

    $merge 阶段定义一个变量,该阶段将处理后的流媒体数据写入已连接Atlas 集群的 solarDb数据库中名为 solarColl 的集合。

    let m = {
    $merge: {
    into: {
    connectionName: "mongodb1",
    db: "solarDb",
    coll: "solarColl"
    }
    }
    }
  6. 创建流处理器。

    使用 sp.createStreamProcessor() 方法为新的流处理器指定名称并声明其聚合管道。$group 阶段属于 $tumblingWindow 的嵌套管道,不得将其包含在处理器管道定义中。

    sp.createStreamProcessor("solarDemo", [s, t, m])

    这将创建一个名为 solarDemo 的流处理器,该处理器应用之前定义的查询,并将处理后的数据写入您连接到的集群上 solarDb 数据库的 solarColl 集合。它返回从太阳能设备的 10 秒间隔观测得出的各种测量值。

    要详细学习;了解Atlas Stream Processing如何写入静态数据库,请参阅 $merge(流处理)。

5

在流处理实例的流处理器列表中,单击流处理器的 Start 图标。

使用 中的sp.processor.start() mongosh方法:

sp.solarDemo.start()
6

要验证流处理器是否正在将数据写入 Atlas 集群:

  1. 在 Atlas 中,进入项目的 Clusters 页面。

    警告: 导航改进正在进行中

    我们目前正在推出改进的全新导航体验。如果以下步骤与Atlas用户界面中的视图不匹配,请参阅预览文档。

    1. 如果尚未显示,请从导航栏上的 Organizations 菜单中选择包含所需项目的组织。

    2. 如果尚未显示,请从导航栏的Projects菜单中选择所需的项目。

    3. 如果尚未出现,请单击侧边栏中的 Clusters(集群)。

      会显示集群页面。

  2. 单击集群的对应 Browse Collections 按钮。

    显示数据浏览器

  3. 查看 MySolar 集合。

要验证处理器是否处于活动状态,请使用 中的sp.processor.stats() mongosh方法:

sp.solarDemo.stats()

此方法报告 solarDemo流处理器的操作统计信息。

您还可以使用sp.processor.sample() 中的mongosh 方法返回终端中已处理文档的样本。

sp.solarDemo.sample()
{
_id: 10,
max_temp: 16,
avg_watts: 232,
max_watts: 414,
min_watts: 73
}

注意

前面的输出是一个代表性示例。流数据不是静态的,每个用户看到的都是不同的文档。

7

在流处理实例的流处理器列表中,单击流处理器的 Delete () 图标。

在出现的确认对话框中,键入流处理器的名称 (solarDemo) 以确认要将其删除,然后单击 Delete

使用 中的sp.processor.drop() mongosh方法删除solarDemo

sp.solarDemo.drop()

要确认已删除 solarDemo,请使用 sp.listStreamProcessors() 方法列出所有可用的流处理器:

sp.listStreamProcessors()

了解如何:

后退

Overview

在此页面上