流处理器 Windows
Atlas Stream Processing 窗口是聚合管道阶段,用于捕获数据流的有时间限制的子集,允许您对流数据执行需要有限输入的操作。
请考虑此处描述的示例流处理器。 $match
阶段可以直接对$source
拉入的数据流进行操作,在流处理器摄取每个文档时对照匹配条件检查每个文档。
相比之下, $group
阶段及其中包含的各种统计计算无法对无界数据进行操作,因为在不首先限制要考虑的值集的情况下,无法确定最小值、最大值、平均值或中值。许多非数学运算符(例如$push和$top )也需要有界数据。
流处理器为这些边界提供了一个窗口。将打开一个窗口,流处理器摄取的所有文档都会以该窗口的状态累积,直到经过预定义的时间间隔并且窗口关闭。该窗口会对该时间间隔内捕获的所有文档进行批处理,并将该集合通过其内部管道传递。在此管道中,批处理文档与静态数据无法区分。
Atlas Stream Processing 提供对Tumbling Windows和Hopping Windows 的支持。
翻滚 Windows
翻滚Windows是完全由其捕获的时间间隔定义的Windows 。 这些时间间隔不重叠。
例子
您可以定义间隔为 3 秒的滚动窗口。 当您启动流处理器时:
窗口会打开 3 秒钟。
第一个窗口捕获流在这 3 秒内生成的所有文档。
3 秒后,窗口关闭并将聚合逻辑应用于该窗口中的所有文档。
如果您配置
allowedLateness
,则 Atlas Stream Processing 会在窗口关闭后将迟到的消息写入死信队列。第一个窗口关闭后会立即打开一个新窗口,并在接下来的 3 秒内捕获数据流中的文档。
滚动窗口可确保全面捕获数据流,而无需重复处理单个文档。
跳跃Windows
跳跃Windows Windows由捕获的时间间隔和打开每个窗口之间的间隔(称为跳跃)定义的窗口。 由于持续时间与频率分离,因此您可以将跳跃Windows配置为重叠或彼此间隔开。
要定义重叠的跳跃窗口,请设置小于间隔的跳跃。
例子
您可以定义间隔为 20 秒、跳跃为 5 秒的跳跃窗口。 当您启动流处理器时:
窗口会打开 20 秒。
第一个窗口捕获流在这 20 秒内生成的所有文档。
5 秒后,另一个窗口将打开并捕获接下来 20 秒内的所有文档。 由于第一个窗口仍处于打开状态,因此流在接下来的 15 秒内生成的所有文档都会被两个窗口捕获。
第一个窗口在打开 20 秒后关闭,并将聚合逻辑应用于该窗口中的所有文档。
5 秒后,第二个窗口将关闭,并将聚合逻辑应用于该窗口中的所有文档,包括在第一个窗口中已受聚合逻辑约束的文档。
如果您配置allowedLateness
,则 Atlas Stream Processing 会在窗口关闭后将迟到的消息写入死信队列。
要定义带间距的跳跃窗口,请设置大于间隔的跳跃。
例子
您可以定义一个间隔为 3 秒、跳跃为 5 秒的跳跃窗口。 启动流处理器时:
窗口会打开 3 秒钟。
第一个窗口捕获接下来 3 秒的所有文档。
3 秒后,窗口关闭并将聚合逻辑应用于该窗口中的所有文档。
再过 2 秒后,下一个窗口将打开。
Atlas Stream Processing 不会处理该流在这两秒内生成的任何文档。
Atlas Stream Processing时序
在流数据处理中,文档受两个计时系统的约束:
事件时间 — 源流生成文档的时间,或消息系统(例如 Apache Kafka )接收文档。这是通过文档的时间戳来确定的。
处理时间 — 流处理器使用文档的时间。 这是通过托管流处理器的系统的时钟来确定的。
网络延迟、上游处理和其他因素不仅会导致给定文档的这些时间之间存在差异,而且还可能导致文档不按事件时间顺序到达流处理器。无论哪种情况,Windows 都可能错过您打算让其捕获的文档。 Atlas Stream Processing 会将此类文档视为延迟到达,并将其发送到死信队列(如果您配置了死信队列)。
Atlas Stream Processing 提供了各种更改窗口行为的机制,以缓解这些问题。
水印
水印会取代处理时间,并且仅当处理器消耗的文档的事件时间晚于任何先前消耗的文档时才会进行更新。 所有流处理器都会在 Atlas Stream Processing 中应用水印。
例子
您可以使用 5 分钟的Windows配置流处理器。 您在 12:00
启动处理器,因此前两个Windows的持续时间分别为 12:00-12:05
和 12:05-12:10
。 下表说明了在不同延迟下(带水印和不带水印),哪些Windows将捕获哪些事件。
活动时间 | 处理时间 | 窗口时间(无水印) | 窗口时间(水印) |
---|---|---|---|
12 : 00 | 12 : 00 | 12 : 00 - 12 : 05 | 12 : 00 - 12 : 05 |
12 : 00 | 12 : 01 | 12 : 00 - 12 : 05 | 12 : 00 - 12 : 05 |
12 : 01 | 12 : 03 | 12 : 00 - 12 : 05 | 12 : 00 - 12 : 05 |
12 : 03 | 12 : 04 | 12 : 00 - 12 : 05 | 12 : 00 - 12 : 05 |
12 : 02 | 12 : 05 | 12 : 05 - 12 : 10 | 12 : 00 - 12 : 05 |
12 : 01 | 12 : 06 | 12 : 05 - 12 : 10 | 12 : 00 - 12 : 05 |
12 : 04 | 12 : 06 | 12 : 05 - 12 : 10 | 12 : 00 - 12 : 05 |
12 : 05 | 12 : 07 | 12 : 05 - 12 : 10 | 12 : 05 - 12 : 10 |
12 : 06 | 12 : 07 | 12 : 05 - 12 : 10 | 12 : 04 - 12 : 10 |
12 : 06 | 12 : 08 | 12 : 05 - 12 : 10 | 12 : 05 - 12 : 10 |
在不应用水印的情况下,12:00-12:05
窗口会根据Atlas Stream Processing实例的系统时钟在 12:05
关闭,并立即打开 12:05-12:10
窗口。 因此,尽管源在12:00-12:05
间隔内生成了其中的七个文档,但相关窗口仅捕获了四个文档。
在水印确实适用的情况下, 12:00-12:05
窗口不会在12:05
关闭,因为在它截至该点摄取的文档中,最新事件时间(因此水印值)为12:03
。 12:00-12:05
窗口直到系统时钟的12:07
才会关闭,此时流处理器接收事件时间为12:05
的文档,将水印提前到该时间,并打开12:05-12:10
窗口。每个窗口都会捕获所有相应的文档。
允许迟到
如果事件时间和处理时间之间的差异足够大,则在水印已提前到足以关闭预期窗口后,文档可能会到达流处理器。 为了缓解这个问题,Atlas Stream Processing 支持“允许延迟”,该设置可将窗口关闭延迟相对于水印的设定时间间隔。
水印是流处理器的属性,而“允许延迟”是窗口的属性,并且仅在该窗口关闭时影响。如果流处理器的水印前进到会触发打开新窗口的程度,则“允许延迟”会使较早的窗口保持打开状态,而不会阻止此操作。
例子
您可以使用 5 分钟的滚动Windows来配置流处理器。 您在 12:00
启动处理器,因此前两个Windows的持续时间分别为 12:00-12:05
和 12:05-12:10
。 您将允许迟到时间设置为2分钟。
下表反映了流处理器摄取所述文档的顺序。
活动时间 | 水印 | 允许迟到时间 | 窗口时间 |
---|---|---|---|
12 : 00 | 12 : 00 | 11 : 58 | 12 : 00 - 12 : 05 |
12 : 01 | 12 : 01 | 11 : 59 | 12 : 00 - 12 : 05 |
12 : 03 | 12 : 03 | 12 : 01 | 12 : 00 - 12 : 05 |
12 : 02 | 12 : 03 | 12 : 01 | 12 : 00 - 12 : 05 |
12 : 04 | 12 : 04 | 12 : 02 | 12 : 00 - 12 : 05 |
12 : 01 | 12 : 04 | 12 : 02 | 12 : 00 - 12 : 05 |
12 : 05 | 12 : 05 | 12 : 03 | 12:00-12:15, 12:05-12:10 |
12 : 06 | 12 : 06 | 12 : 04 | 12:00-12:05, 12:05-12:10 |
12 : 04 | 12 : 06 | 12 : 04 | 12:00-12:05, 12:05-12:10 |
12 : 07 | 12 : 07 | 12 : 05 | 12 : 05 - 12 : 10 |
当水印前进到12:05
时,将打开12:05-12:10
窗口。但是,由于“允许的延迟时间间隔”为2分钟,在12:00-12:05
窗口内,它实际上仅为12:03
分钟,因此它保持打开状态。仅当水印前进到12:07
时,调整后的时间才会达到12:05
。此时, 12:00-12:05
窗口将关闭。
空闲超时
默认情况下,将窗口行为与处理时间解耦可以提高大多数情况下流处理的正确性。但是,流数据源可能会有长时间的空闲状态。在这种情况下,窗口可能会在空闲期之前捕获事件,并且在等待水位线前进到足以关闭时无法返回处理后的结果。
Atlas Stream Processing允许用户为Windows配置空闲超时,以缓解这些使用处理时间的情况。 空闲超时是指当处理时间超过打开窗口时间间隔的终点并且流处理器的源处于空闲状态时开始的时间间隔。 如果源保持空闲的时间间隔等于空闲超时时间,则窗口将关闭,并且水印独立于任何文档摄取而前进。
例子
您可以配置一个滚动窗口,其间隔为3分钟,空闲超时为1分钟。下表说明了窗口间隔期间和之后空闲超时的影响。
处理时间 | 事件时间或状态 | 水印 | 窗口时间 |
---|---|---|---|
12 : 00 | 12 : 00 | 12 : 00 | 12 : 00 - 12 : 03 |
12 : 01 | 源空闲 | 12 : 00 | 12 : 00 - 12 : 03 |
12 : 02 | 源空闲 | 12 : 00 | 12 : 00 - 12 : 03 |
12 : 03 | 源空闲 | 12 : 00 | 12 : 00 - 12 : 03 |
12 : 04 | 12 : 02 | 12 : 02 | 12 : 00 - 12 : 03 |
12 : 05 | 12 : 05 | 12 : 05 | 12 : 03 - 12 : 06 |
12 : 06 | 源空闲 | 12 : 05 | 12 : 03 - 12 : 06 |
12 : 07 | 源空闲 | 12 : 00 | 12 : 06 - 12 : 09 |
12 : 08 | 源空闲 | 12 : 00 | 12 : 06 - 12 : 09 |
12 : 09 | 12 : 09 | 12 : 09 | 12 : 09 - 12 : 12 |
在12:00-12:03
间隔内,源空闲了三分钟,但流处理器不会关闭窗口,因为处理时间尚未超过窗口间隔结束,并且源在窗口间隔结束后不会保持空闲状态。当水印前进到12:05
时,窗口正常关闭,然后12:03-12:06
窗口打开。
当源在12:06
进入空闲状态时,它会在12:07
之前保持空闲状态,从而触发空闲超时并将水印提前到12:06
。