Docs 菜单
Docs 主页
/ /

Atlas Stream Processing 架构

Atlas Stream Processing的核心抽象是流处理器。流处理器是一个MongoDB 聚合管道,它对来自指定源的流媒体数据持续运行,并将输出写入接收器。要学习;了解详情,请参阅 流处理器的结构。

Stream processing 发生在 stream processing 工作区中。每个流处理工作区都是一个 Atlas 命名空间,关联以下内容:

  • 一个或多个流处理器,每个流处理器运行在自己分配的 RAM 和 CPU 上。

  • 一个默认层级,它确定在不指定层级时可提供给每个流处理器的内存量和计算量。

  • 一个最大层级,用于确定可分配给该 stream processing 工作区中 pod 的最大内存和计算量。

  • 云提供商和云区域。

  • 连接注册表,用于存储流媒体数据的可用源和接收器列表。

  • 定义用户授权的安全上下文。

  • 一个 连接字符串 到 stream processing 工作空间本身。

定义流处理器后,它仅可用于定义它的 stream processing 工作区。每个流处理器都在根据其层级分配的资源上运行。Atlas Stream Processing仅在 Stream Processing 运行时向用户收取费用。

如果您启动流处理器而未声明层级大小,它将运行流处理工作区的默认层级。您可以启动任何层级的流处理器,直至包括流处理工作区的最大层级。

例子

您在名为 myWorkspace 的流处理工作区上定义了一个流处理器,默认层级为 SP10,最大层级为 SP30。如果您启动处理器而未指定层级,Atlas Stream Processing 会将其分配给 SP10 pod。但是,您可以声明从 SP2SP30 的任何层级,Atlas Stream Processing 会将处理器分配到适当大小的 pod 中。

每个工作线程最多可以托管四个运行的流处理器。在旧版工作线程模型上运行的流处理工作区会根据工作线程的数量向用户计费。Atlas Stream Processing在您启动流处理器时,通过按需预配工作线程来自动扩展您的流处理工作区。您可以通过停止工作线程上的所有流处理器来取消配置工作线程。Atlas Stream Processing始终更愿意为现有工作线程分配流处理器,而不是预配新的工作线程。

例子

您有一个流处理工作区,运行八个流处理器,分别名为 proc01proc08proc01proc04 在一个工作线程上运行,proc05proc08 在第二个工作线程上运行。启动名为 proc09 的新流处理器。Atlas Stream Processing 预配第三个工作线程来托管 proc09

后来,您在第一个工作线程上停止了 proc03。当您停止 proc09 并重新启动它时,Atlas Stream Processing 会将 proc09 重新分配给第一个工作线程,并取消配置第三个工作线程。

如果您在停止并重新启动 proc09 之前启动名为 proc10 的新流处理器,则 Atlas Stream Processing 会将 proc10 分配给先前分配给 proc03 的槽位中的第一个工作线程。

在扩展时,Atlas Stream Processing 仅考虑当前正在运行的流处理器的数量;它不计算未运行的定义流处理器。流处理工作区的层级决定了其工作单元的RAM和CPU分配。

重要

SP10SP30 处理器根据旧版工作线程模型进行操作并向用户计费。这些处理器于 12 月 2025更新为按处理器定价模式。3要学习更多信息,请参阅Atlas Stream Processing架构概述中的工作线程模型部分。

连接注册表存储一个或多个连接。每个连接都会为网络和安全细节的组合分配一个名称,从而允许流处理器与外部服务进行交互。连接表现出以下行为:

  • 只有在特定流处理工作区的连接注册表中定义的连接,才能为该流处理工作区上托管的流处理器提供服务。

  • 每个连接可以为任意数量的流处理器提供服务

  • 只有一个连接可以作为给定流处理器的源。

  • 只有单个连接可以用作给定流处理器的接收器。

  • 连接并非天生就被定义为源或接收器。任何给定的连接都可以提供任一功能,具体取决于流处理器如何调用该连接。

Atlas Stream Processing 在多租户基础设施上运行专用客户容器中的 stream processing pods。有关 MongoDB 安全性和合规的更多信息,请参阅MongoDB 信任中心。

Atlas Stream Processing使用检查点捕获流处理器的状态。每个检查点都有一个唯一的ID ,并受流处理器逻辑流的约束。在流处理器的所有操作符将其状态添加到检查点后, Atlas Stream Processing提交检查点,生成两种类型的记录:

  • 一条提交记录,用于验证检查点ID及其所属的流处理器

  • 一组记录,描述 Atlas Stream Processing 提交检查点时相关流处理器中每个有状态操作的状态。

当您在中断后重新启动流处理器时,Atlas Stream Processing 会查询最后提交的检查点并从所述状态恢复操作。

Atlas Stream Processing 支持使用 Atlas 数据库集合作为死信队列(DLQ)。当 Atlas Stream Processing 无法处理数据流中的文档时,它会将文档的内容和处理失败的详细信息写入 DLQ。您可以在流处理器定义中将集合指定为 DLQ。

要了解更多信息,请参阅创建流处理器。

在流式数据处理中,文档受两个计时系统的约束:

  • 事件时间

  • 处理时间

Atlas Stream Processing提供各种参数来控制流处理器与这些计时系统的交互。

事件时间是指流生成文档或消息传递系统(例如Apache Kafka)接收文档。这是通过文档的时间戳来确定的。

网络延迟、上游处理和其他因素不仅会导致给定文档的这些时间之间存在差异,而且还可能导致文档不按事件时间顺序到达流处理器。无论哪种情况,Windows 都可能错过您打算让其捕获的文档。Atlas Stream Processing会将此类文档视为延迟到达,并将其发送到死信队列(如果您配置了死信队列)。

事件时间滚动Windows跳跃Windows支持的 boundary字段的可配置选项。

处理时间是流处理器使用文档的时间。这由托管流处理器的系统时钟确定。

处理时间滚动Windows跳跃Windows支持的boundary字段的可配置选项。它允许您创建带有一种窗口的管道,该窗口根据服务器的挂钟时间累积数据。与事件时间 Windows不同,处理时间Windows在每个事件到达流处理器时根据服务器的挂钟时间为每个事件分配一个时间戳。

文档时间戳和窗口边界时间戳采用 UTC 时间。配置 窗口时不能指定 idleTimeout allowedLatenessprocessingTime 选项。

例子

您创建一个具有 5 分钟事件时间窗口的管道。将一个事件添加到 09:33 处的源 Kafka 集群。由于 Kafka 集群存在一些延迟,它在 09:37 到达流处理器。

如果管道有一个 5 分钟的事件时间窗口,则此事件将被分配到 09:30-09:35 窗口。如果管道有一个 5 分钟的处理时间窗口,则事件将被分配到 09:35-09:40 窗口。

水印会取代处理时间,并且仅当处理器消耗的文档的事件时间晚于任何先前消耗的文档时进行更新。所有流处理器都会在Atlas Stream Processing中应用水印。

您可以使用 5 分钟的Windows配置流处理器。 您在 12:00 启动处理器,因此前两个Windows的持续时间分别为 12:00-12:0512:05-12:10。 下表说明了在不同延迟下(带水印和不带水印),哪些Windows将捕获哪些事件。

事件时间
处理时间
窗口时间(无水印)
窗口时间(水印)

12 : 00

12 : 00

12 : 00 - 12 : 05

12 : 00 - 12 : 05

12 : 01

12 : 03

12 : 00 - 12 : 05

12 : 00 - 12 : 05

12 : 02

12 : 05

12 : 05 - 12 : 10

12 : 00 - 12 : 05

12 : 01

12 : 06

12 : 05 - 12 : 10

12 : 00 - 12 : 05

12 : 06

12 : 07

12 : 05 - 12 : 10

12 : 05 - 12 : 10

在没有水印的情况下,根据流处理工作区的系统时钟,12:00-12:05 窗口会在12:05 关闭,而12:05-12:10 窗口会立即打开。因此,尽管源在12:00-12:05期间生成了 4 个文档,但相关窗口仅捕获了 2 个文档。

使用水印时,12:00-12:05窗口不会在 12:05 关闭,因为在它截至该点摄取的文档中,最新事件时间(因此水印值)为 12:0312:00-12:05窗口直到系统时钟的 12:07 才会关闭,此时流处理器接收事件时间为 12:05 的文档,将水印前进到该时间,并打开 12:05-12:10窗口。每个窗口都会捕获所有相应的文档。

从Apache Kafka读取时, Atlas会等待所有分区都通过水印。如果分区空闲且无法生成时间戳晚于水印的事件,则该窗口不会关闭或输出结果。要解决此问题,设立partitionIdleTimeout 以确保空闲分区不会停止水印的进程。要学习;了解详情,请参阅 $source 阶段(流处理)。

如果事件时间和处理时间之间的差异足够大,则在水印已提前到足以关闭预期窗口后,文档可能会到达流处理器。 为了缓解这个问题,Atlas Stream Processing 支持“允许延迟”,该设置可将窗口关闭延迟相对于水印的设定时间间隔。

水印是流处理器的属性,而“允许延迟”是窗口的属性,并且仅在该窗口关闭时才会产生影响。如果流处理器的水印前进到会触发打开新窗口的点,则“允许延迟”会使较早的窗口保持打开状态,而不会阻止此操作。

您可以使用 5 分钟的滚动Windows来配置流处理器。 您在 12:00 启动处理器,因此前两个Windows的持续时间分别为 12:00-12:0512:05-12:10。 您将允许迟到时间设置为2分钟。

下表反映了流处理器摄取所述文件的顺序。

事件时间
水印
允许迟到时间
窗口时间

12 : 00

12 : 00

11 : 58

12 : 00 - 12 : 05

12 : 02

12 : 03

12 : 01

12 : 00 - 12 : 05

12 : 01

12 : 04

12 : 02

12 : 00 - 12 : 05

12 : 05

12 : 05

12 : 03

12:00-12:15, 12:05-12:10

12 : 04

12 : 06

12 : 04

12:00-12:05, 12:05-12:10

12 : 07

12 : 07

12 : 05

12 : 05 - 12 : 10

当水印前进到 12:05 时,12:05-12:10 窗口将打开。但是,由于“允许延迟”间隔为 2 分钟,在 12:00-12:05 窗口中,它实际上仅为 12:03,因此它保持打开状态。仅当水印前进到 12:07 时,调整后的时间才会达到 12:05 。此时,12:00-12:05 窗口将关闭。

默认情况下将窗口行为与处理时间分离可以在大多数情况下提高流处理的正确性。但是,流数据源可能会有长时间的空闲状态。在这种情况下,窗口可能会在空闲期之前捕获事件,并且在等待水印前进到足以关闭时无法返回处理后的结果。

Atlas Stream Processing允许用户为Windows配置空闲超时,以缓解这些使用处理时间的情况。 空闲超时是指当处理时间超过打开窗口时间间隔的终点并且流处理器的源处于空闲状态时开始的时间间隔。如果源保持空闲的时间间隔等于空闲超时时间,则窗口将关闭,并且水印独立于任何文档摄取而前进。

您可以配置具有 3 分钟间隔和 1 分钟空闲超时的滚动窗口。下表说明了窗口间隔期间和之后空闲超时的影响。

处理时间
事件时间或状态
水印
窗口时间

12 : 00

12 : 00

12 : 00

12 : 00 - 12 : 03

12 : 01

源空闲

12 : 00

12 : 00 - 12 : 03

12 : 02

源空闲

12 : 00

12 : 00 - 12 : 03

12 : 03

源空闲

12 : 00

12 : 00 - 12 : 03

12 : 04

12 : 02

12 : 02

12 : 00 - 12 : 03

12 : 05

12 : 05

12 : 05

12 : 03 - 12 : 06

12 : 06

源空闲

12 : 05

12 : 03 - 12 : 06

12 : 07

源空闲

12 : 00

12 : 06 - 12 : 09

12 : 08

源空闲

12 : 00

12 : 06 - 12 : 09

12 : 09

12 : 09

12 : 09

12 : 09 - 12 : 12

12:00-12:03 间隔期间,源空闲三分钟,但流处理器不会关闭窗口,因为处理时间未超过窗口间隔的结束时间,并且源在窗口间隔结束后不会保持空闲状态。当水印前进到 12:05 时,窗口将正常关闭, 12:03-12:06 窗口将打开。

当源在12:06进入空闲状态时,它会在12:07之前保持空闲状态,从而触发空闲超时并将水印提前到12:06

后退

开始体验

在此页面上