架构基础知识
Atlas Stream Processing的核心抽象是流处理器。流处理器是一个MongoDB 聚合管道,它对来自指定源的流媒体数据持续运行,并将输出写入接收器。要学习;了解详情,请参阅 流处理器的结构。
流处理在流处理实例上进行。每个流处理实例都是一个关联以下内容的 Atlas 命名空间:
一个或多个工作线程,用于提供运行流处理器所需的 RAM 和 CPU。
云提供商和云区域。
连接注册表,用于存储流媒体数据的可用源和接收器列表。
定义用户授权的安全上下文。
实例本身的 string连接 。Atlas Stream Processing
Workers
定义流处理器后,该处理器只能在定义它的流处理实例中使用。每个工作节点最多可以承载四个正在运行的流处理器;当您启动流处理器时,Atlas Stream Processing 会根据需要配置工作节点,从而自动扩展您的流处理实例。您可以通过停止工作节点上的所有流处理器来取消预配该工作节点。Atlas Stream Processing 始终倾向于将流处理器分配给现有工作节点,而不是配置新的工作节点。
例子
您有一个Atlas Stream Processing实例,运行八个流处理器,分别命名为 proc01 到 proc08。 proc01到proc04在一个工作线程上运行, proc05到proc08在第二个工作线程上运行。 您启动一个名为proc09的新流处理器。 Atlas Stream Processing 预配第三个工作线程来托管proc09 。
后来,您在第一个工作线程上停止了 proc03。当您停止 proc09 并重新启动它时,Atlas Stream Processing 会将 proc09 重新分配给第一个工作线程,并取消配置第三个工作线程。
如果您在停止并重新启动 proc09 之前启动名为 proc10 的新流处理器,则 Atlas Stream Processing 会将 proc10 分配给先前分配给 proc03 的槽位中的第一个工作线程。
在扩展时,Atlas Stream Processing 仅考虑当前正在运行的流处理器的数量;它不计算未运行的定义流处理器。流处理实例的层级决定了其工作线程的 RAM 和 CPU 分配。
连接注册表
连接注册表存储一个或多个连接。每个连接都会为网络和安全细节的组合分配一个名称,从而允许流处理器与外部服务进行交互。连接表现出以下行为:
只有在特定流处理实例的连接注册表中定义的连接才能为该流处理实例上托管的流处理器提供服务。
每个连接可以为任意数量的流处理器提供服务
只有一个连接可以作为给定流处理器的源。
只有单个连接可以用作给定流处理器的接收器。
连接并非天生就被定义为源或接收器。任何给定的连接都可以提供任一功能,具体取决于流处理器如何调用该连接。
Atlas Stream Processing在多租户基础架构上的专用客户容器中运行Atlas Stream Processing工作线程。 有关 MongoDB 安全性和合规性的更多信息,请参阅MongoDB 信任中心。
检查点
Atlas Stream Processing使用检查点捕获流处理器的状态。每个检查点都有一个唯一的ID ,并受流处理器逻辑流的约束。在流处理器的所有操作符将其状态添加到检查点后, Atlas Stream Processing提交检查点,生成两种类型的记录:
一条提交记录,用于验证检查点ID及其所属的流处理器
一组记录,描述 Atlas Stream Processing 提交检查点时相关流处理器中每个有状态操作的状态。
当您在中断后重新启动流处理器时,Atlas Stream Processing 会查询最后提交的检查点并从所述状态恢复操作。
死信队列(DLQ)
Atlas Stream Processing 支持使用 Atlas 数据库集合作为死信队列(DLQ)。当 Atlas Stream Processing 无法处理数据流中的文档时,它会将文档的内容和处理失败的详细信息写入 DLQ。您可以在流处理器定义中将集合指定为 DLQ。
要了解更多信息,请参阅创建流处理器。
Atlas Stream Processing时序
在流式数据处理中,文档受两个计时系统的约束:
事件时间
处理时间
Atlas Stream Processing提供各种参数来控制流处理器与这些计时系统的交互。
事件时间
事件时间是指流生成文档或消息传递系统(例如Apache Kafka)接收文档。这是通过文档的时间戳来确定的。
网络延迟、上游处理和其他因素不仅会导致给定文档的这些时间之间存在差异,而且还可能导致文档不按事件时间顺序到达流处理器。无论哪种情况,Windows 都可能错过您打算让其捕获的文档。Atlas Stream Processing会将此类文档视为延迟到达,并将其发送到死信队列(如果您配置了死信队列)。
处理时间
处理时间是流处理器使用文档的时间。这由托管流处理器的系统时钟确定。
处理时间是滚动Windows和跳跃Windows支持的boundary字段的可配置选项。它允许您创建带有一种窗口的管道,该窗口根据服务器的挂钟时间累积数据。与事件时间 Windows不同,处理时间Windows在每个事件到达流处理器时根据服务器的挂钟时间为每个事件分配一个时间戳。
文档时间戳和窗口边界时间戳采用 UTC 时间。配置 窗口时不能指定 idleTimeout 或 allowedLatenessprocessingTime 选项。
例子
您创建一个具有 5 分钟事件时间窗口的管道。将一个事件添加到 09:33 处的源 Kafka 集群。由于 Kafka 集群存在一些延迟,它在 09:37 到达流处理器。
如果管道有一个 5 分钟的事件时间窗口,则此事件将被分配到 09:30-09:35 窗口。如果管道有一个 5 分钟的处理时间窗口,则事件将被分配到 09:35-09:40 窗口。
水印
水印会取代处理时间,并且仅当处理器消耗的文档的事件时间晚于任何先前消耗的文档时进行更新。所有流处理器都会在Atlas Stream Processing中应用水印。
例子
您可以使用 5 分钟的Windows配置流处理器。 您在 12:00 启动处理器,因此前两个Windows的持续时间分别为 12:00-12:05 和 12:05-12:10。 下表说明了在不同延迟下(带水印和不带水印),哪些Windows将捕获哪些事件。
事件时间 | 处理时间 | 窗口时间(无水印) | 窗口时间(水印) |
|---|---|---|---|
12 : 00 | 12 : 00 | 12 : 00 - 12 : 05 | 12 : 00 - 12 : 05 |
12 : 01 | 12 : 03 | 12 : 00 - 12 : 05 | 12 : 00 - 12 : 05 |
12 : 02 | 12 : 05 | 12 : 05 - 12 : 10 | 12 : 00 - 12 : 05 |
12 : 01 | 12 : 06 | 12 : 05 - 12 : 10 | 12 : 00 - 12 : 05 |
12 : 06 | 12 : 07 | 12 : 05 - 12 : 10 | 12 : 05 - 12 : 10 |
如果没有水印,12:00-12:05窗口将根据流处理实例的系统时钟在 12:05 关闭,并立即打开 12:05-12:10窗口。 因此,尽管源在 12:00-12:05 间隔内生成了其中四个文档,但相关窗口仅捕获两个文档。
使用水印时,12:00-12:05窗口不会在 12:05 关闭,因为在它截至该点摄取的文档中,最新事件时间(因此水印值)为 12:03。12:00-12:05窗口直到系统时钟的 12:07 才会关闭,此时流处理器接收事件时间为 12:05 的文档,将水印前进到该时间,并打开 12:05-12:10窗口。每个窗口都会捕获所有相应的文档。
从Apache Kafka读取时, Atlas会等待所有分区都通过水印。如果分区空闲且无法生成时间戳晚于水印的事件,则该窗口不会关闭或输出结果。要解决此问题,设立partitionIdleTimeout 以确保空闲分区不会停止水印的进程。要学习;了解详情,请参阅 $source 阶段(流处理)。
允许迟到
如果事件时间和处理时间之间的差异足够大,则在水印已提前到足以关闭预期窗口后,文档可能会到达流处理器。 为了缓解这个问题,Atlas Stream Processing 支持“允许延迟”,该设置可将窗口关闭延迟相对于水印的设定时间间隔。
水印是流处理器的属性,而“允许延迟”是窗口的属性,并且仅在该窗口关闭时才会产生影响。如果流处理器的水印前进到会触发打开新窗口的点,则“允许延迟”会使较早的窗口保持打开状态,而不会阻止此操作。
例子
您可以使用 5 分钟的滚动Windows来配置流处理器。 您在 12:00 启动处理器,因此前两个Windows的持续时间分别为 12:00-12:05 和 12:05-12:10。 您将允许迟到时间设置为2分钟。
下表反映了流处理器摄取所述文件的顺序。
事件时间 | 水印 | 允许迟到时间 | 窗口时间 |
|---|---|---|---|
12 : 00 | 12 : 00 | 11 : 58 | 12 : 00 - 12 : 05 |
12 : 02 | 12 : 03 | 12 : 01 | 12 : 00 - 12 : 05 |
12 : 01 | 12 : 04 | 12 : 02 | 12 : 00 - 12 : 05 |
12 : 05 | 12 : 05 | 12 : 03 | 12:00-12:15, 12:05-12:10 |
12 : 04 | 12 : 06 | 12 : 04 | 12:00-12:05, 12:05-12:10 |
12 : 07 | 12 : 07 | 12 : 05 | 12 : 05 - 12 : 10 |
当水印前进到 12:05 时,12:05-12:10 窗口将打开。但是,由于“允许延迟”间隔为 2 分钟,在 12:00-12:05 窗口中,它实际上仅为 12:03,因此它保持打开状态。仅当水印前进到 12:07 时,调整后的时间才会达到 12:05 。此时,12:00-12:05 窗口将关闭。
空闲超时
默认情况下将窗口行为与处理时间分离可以在大多数情况下提高流处理的正确性。但是,流数据源可能会有长时间的空闲状态。在这种情况下,窗口可能会在空闲期之前捕获事件,并且在等待水印前进到足以关闭时无法返回处理后的结果。
Atlas Stream Processing允许用户为Windows配置空闲超时,以缓解这些使用处理时间的情况。 空闲超时是指当处理时间超过打开窗口时间间隔的终点并且流处理器的源处于空闲状态时开始的时间间隔。如果源保持空闲的时间间隔等于空闲超时时间,则窗口将关闭,并且水印独立于任何文档摄取而前进。
例子
您可以配置具有 3 分钟间隔和 1 分钟空闲超时的滚动窗口。下表说明了窗口间隔期间和之后空闲超时的影响。
处理时间 | 事件时间或状态 | 水印 | 窗口时间 |
|---|---|---|---|
12 : 00 | 12 : 00 | 12 : 00 | 12 : 00 - 12 : 03 |
12 : 01 | 源空闲 | 12 : 00 | 12 : 00 - 12 : 03 |
12 : 02 | 源空闲 | 12 : 00 | 12 : 00 - 12 : 03 |
12 : 03 | 源空闲 | 12 : 00 | 12 : 00 - 12 : 03 |
12 : 04 | 12 : 02 | 12 : 02 | 12 : 00 - 12 : 03 |
12 : 05 | 12 : 05 | 12 : 05 | 12 : 03 - 12 : 06 |
12 : 06 | 源空闲 | 12 : 05 | 12 : 03 - 12 : 06 |
12 : 07 | 源空闲 | 12 : 00 | 12 : 06 - 12 : 09 |
12 : 08 | 源空闲 | 12 : 00 | 12 : 06 - 12 : 09 |
12 : 09 | 12 : 09 | 12 : 09 | 12 : 09 - 12 : 12 |
在 12:00-12:03 间隔期间,源空闲三分钟,但流处理器不会关闭窗口,因为处理时间未超过窗口间隔的结束时间,并且源在窗口间隔结束后不会保持空闲状态。当水印前进到 12:05 时,窗口将正常关闭, 12:03-12:06 窗口将打开。
当源在12:06进入空闲状态时,它会在12:07之前保持空闲状态,从而触发空闲超时并将水印提前到12:06 。