Stream Processor Windows

On this page

  • Tumbling Windows
  • Hopping Windows
  • Stream Processing Timing
  • Watermarks
  • Allowed Lateness
  • Idleness Timeout

Atlas Stream Processing windows are aggregation pipeline stages that capture time-bounded subsets of a data stream, allowing you to perform operations that require finite inputs on streaming data.

Consider the example stream processor described here. The $match stage can operate directly on the stream of data pulled in by $source, checking each document against the match criteria as the stream processor ingests it.

By contrast, the $group stage and the various statistical computations contained within it cannot operate on unbounded data, as it is impossible to determine minimum, maximum, average, or median values without first bounding the set of values to consider. Many non-mathematical operators such as $push and $top also require bounded data.

A stream processor provides these bounds with a window. A window opens, and all documents that the stream processor ingests accumulate in that window's state until a predefined interval of time elapses and the window closes. The window batches all documents it captures during that interval, and passes this set through its internal pipeline. From within this pipeline, the batched documents are indistinguishable from data-at-rest.
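
The accumulate-then-aggregate behavior described above can be modeled in a few lines of Python. This is only an illustrative sketch, not how Atlas Stream Processing is implemented: the function name `aggregate_by_window`, the `(arrival_second, value)` tuple shape, and the chosen statistics are assumptions made for the example.

```python
from collections import defaultdict
from statistics import mean


def aggregate_by_window(docs, interval_s):
    """Batch (arrival_second, value) pairs into fixed intervals, then
    aggregate each batch as if it were data at rest."""
    batches = defaultdict(list)
    for arrival_s, value in docs:
        # Every document accumulates in the state of the window that is
        # open at its arrival time.
        window_start = (arrival_s // interval_s) * interval_s
        batches[window_start].append(value)

    # Once a window closes, its batch is finite, so min/max/avg are defined.
    return {
        start: {"min": min(vals), "max": max(vals), "avg": mean(vals)}
        for start, vals in sorted(batches.items())
    }


# Hypothetical stream: (arrival second, sensor reading).
stream = [(0, 10), (1, 30), (2, 20), (3, 50), (5, 70)]
results = aggregate_by_window(stream, interval_s=3)
```

Here `results` holds one summary per 3-second window, keyed by the window's start second. Without the window boundary there would be no point at which `min`, `max`, or `mean` could be evaluated.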

Atlas Stream Processing provides support for Tumbling Windows and Hopping Windows.

Tumbling Windows

Tumbling windows are windows defined entirely by the time intervals they capture. These time intervals don't overlap.

Example

You define a tumbling window with an interval of 3 seconds. When you start your stream processor:

  • A window opens for 3 seconds.

  • The first window captures all documents that the stream generates within those 3 seconds.

  • After 3 seconds elapse, the window closes and applies your aggregation logic to all the documents in that window.

    If you configure allowedLateness, Atlas Stream Processing writes late-arriving messages to the Dead Letter Queue after the close of the window.

  • A new window opens as soon as the first one closes and captures documents from the stream for the next 3 seconds.

Tumbling windows ensure comprehensive capture of data streams without repeated processing of individual documents.
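
Because tumbling windows neither overlap nor leave gaps, every point in time maps to exactly one window. A minimal Python sketch of that mapping follows; the function name `tumbling_window` and the use of whole seconds since processor start are assumptions for illustration only.

```python
def tumbling_window(t_s, interval_s):
    """Return the [start, end) bounds of the one tumbling window that
    contains time t_s, measured in seconds since the processor started."""
    start = (t_s // interval_s) * interval_s
    return start, start + interval_s


# With a 3-second interval, seconds 0-2 fall in the first window and
# second 3 falls in the window that opens as the first one closes.
bounds = [tumbling_window(t, 3) for t in range(6)]
```

Each second appears in exactly one `(start, end)` pair, which is the "no repeated processing" property described above.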

Hopping Windows

Hopping windows are windows defined by the time interval they capture and the interval between opening each window, called the hop. Since duration is decoupled from frequency, you can configure hopping windows to overlap or to space themselves apart.

To define a hopping window with overlap, set a hop smaller than the interval.

Example

You define a hopping window with an interval of 20 seconds and a hop of 5 seconds. When you start your stream processor:

  • A window opens for 20 seconds.

  • The first window captures all documents that the stream generates within those 20 seconds.

  • 5 seconds later, another window opens and captures all documents within the next 20 seconds. Because the first window is still open, all documents that the stream generates for the next 15 seconds are captured by both windows.

  • 20 seconds after the first window opened, it closes and applies your aggregation logic to all the documents in that window.

  • 5 seconds later, the second window closes and applies your aggregation logic to all the documents in that window, including those that were already subject to aggregation logic in the first window.

If you configure allowedLateness, Atlas Stream Processing writes late-arriving messages to the Dead Letter Queue after the close of the window.
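
The overlap in this example can be checked with a short Python sketch that lists every window open at a given moment. The function name `hopping_windows` and the convention that the first window opens at second 0 are assumptions for illustration; this is not the service's implementation.

```python
def hopping_windows(t_s, interval_s, hop_s):
    """Return the start times of every hopping window that is open at t_s.

    Windows open at 0, hop_s, 2 * hop_s, ... and each covers
    [start, start + interval_s).
    """
    starts = []
    start = (t_s // hop_s) * hop_s  # latest window opened at or before t_s
    while start >= 0 and t_s < start + interval_s:
        starts.append(start)
        start -= hop_s
    return sorted(starts)


# Interval of 20 seconds, hop of 5 seconds.
at_3s = hopping_windows(3, 20, 5)    # only the first window is open
at_7s = hopping_windows(7, 20, 5)    # first and second windows overlap
at_22s = hopping_windows(22, 20, 5)  # the first window has closed
```

A document generated 7 seconds in is captured by both the window that opened at 0 and the one that opened at 5, so it is aggregated twice.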

To define a hopping window with spacing, set a hop larger than the interval.

Example

You define a hopping window with an interval of 3 seconds and a hop of 5 seconds. When you start a stream processor:

  • A window opens for 3 seconds.

  • The first window captures all documents for the next 3 seconds.

  • After 3 seconds elapse, the window closes and applies your aggregation logic to all the documents in that window.

  • The next window opens after a further 2 seconds elapse.

  • Atlas Stream Processing does not process any documents that the stream generates during those 2 seconds.
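
The gap between spaced windows can be sketched the same way in Python. The helper `is_captured` below is a hypothetical name, and the model assumes the first window opens at second 0.

```python
def is_captured(t_s, interval_s, hop_s):
    """True if some hopping window is open at t_s (seconds since start)."""
    # Position of t_s inside the current hop cycle; when the hop exceeds the
    # interval, a window is open only for the first interval_s seconds of
    # each cycle.
    return (t_s % hop_s) < interval_s


# Interval of 3 seconds, hop of 5 seconds.
captured = [t for t in range(10) if is_captured(t, interval_s=3, hop_s=5)]
dropped = [t for t in range(10) if not is_captured(t, interval_s=3, hop_s=5)]
```

Seconds 3, 4, 8, and 9 fall between windows, matching the 2-second gap in each 5-second cycle described above.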

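Stream Processing Timing

In streaming data processing, documents are subject to two timing systems:

  • Event time: the time at which the source generates the document.

  • Processing time: the time at which the stream processor ingests the document, according to the system clock of the stream processing instance.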

Network latency, upstream processing, and other factors can not only cause discrepancies between these times for a given document, but can also cause documents to arrive in a stream processor out of event-time order. In either case, windows can miss documents that you intend for them to capture. Atlas Stream Processing considers such documents late-arriving, and sends them to your Dead Letter Queue if you configure one.

Atlas Stream Processing offers various mechanisms which change window behavior to mitigate these issues.

Watermarks

A watermark supersedes processing time and updates only when the processor consumes a document with a later event time than any previously consumed document. All stream processors apply watermarks in Atlas Stream Processing.

Example

You configure a stream processor with 5-minute windows. You start the processor at 12:00, so the first two windows span 12:00-12:05 and 12:05-12:10. The following table shows which window captures each event given varying delays, with and without watermarks.

Event Time | Processing Time | Window Time (No Watermarks) | Window Time (Watermarks)
-----------|-----------------|-----------------------------|-------------------------
12:00      | 12:00           | 12:00-12:05                 | 12:00-12:05
12:00      | 12:01           | 12:00-12:05                 | 12:00-12:05
12:01      | 12:03           | 12:00-12:05                 | 12:00-12:05
12:03      | 12:04           | 12:00-12:05                 | 12:00-12:05
12:02      | 12:05           | 12:05-12:10                 | 12:00-12:05
12:01      | 12:06           | 12:05-12:10                 | 12:00-12:05
12:04      | 12:06           | 12:05-12:10                 | 12:00-12:05
12:05      | 12:07           | 12:05-12:10                 | 12:05-12:10
12:06      | 12:07           | 12:05-12:10                 | 12:05-12:10
12:06      | 12:08           | 12:05-12:10                 | 12:05-12:10

In the scenario where watermarks don't apply, the 12:00-12:05 window closes at 12:05 according to the system clock of the stream processing instance, and the 12:05-12:10 window opens immediately. As a result, though the source generated seven of the documents during the 12:00-12:05 interval, the relevant window captures only four documents.

In the scenario where watermarks do apply, the 12:00-12:05 window doesn't close at 12:05 because among the documents it ingests up to that point, the latest event time—and thus the watermark value—is 12:03. The 12:00-12:05 window doesn't close until 12:07 on the system clock, when the stream processor ingests a document with an event time of 12:05, advances the watermark to that time, and opens the 12:05-12:10 window. Each window captures all of the appropriate documents.
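
The two scenarios can be replayed with a small Python model. It is a simplified sketch under stated assumptions, not the service's implementation: `DOCS` transcribes the example's event and processing times as minutes after 12:00, the watermark is modeled as the highest event time seen so far, and the function names are hypothetical.

```python
from collections import Counter

WINDOW_MIN = 5

# (event_time, processing_time) in minutes after 12:00, in ingestion order.
DOCS = [(0, 0), (0, 1), (1, 3), (3, 4), (2, 5),
        (1, 6), (4, 6), (5, 7), (6, 7), (6, 8)]


def window_start(t_min):
    """Start of the 5-minute window whose interval contains t_min."""
    return (t_min // WINDOW_MIN) * WINDOW_MIN


def by_processing_time(docs):
    """No watermarks: the system clock decides which window is open."""
    return Counter(window_start(proc) for _, proc in docs)


def by_watermark(docs):
    """Watermarks: a window stays open until the highest event time seen
    so far reaches the end of that window's interval."""
    counts, late, watermark = Counter(), 0, 0
    for event, _ in docs:
        start = window_start(event)
        if start + WINDOW_MIN <= watermark:
            late += 1  # the window for this event time has already closed
        else:
            counts[start] += 1
        watermark = max(watermark, event)
    return counts, late


no_wm = by_processing_time(DOCS)
with_wm, late_count = by_watermark(DOCS)
```

In this model the 12:00-12:05 window (key `0`) receives four documents when windows follow the system clock and seven when they follow the watermark, with no late arrivals in either window.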

Allowed Lateness

If the differences between event time and processing time vary sufficiently, documents may arrive in a stream processor after the watermark has advanced enough to close the expected window. To mitigate this, Atlas Stream Processing supports Allowed Lateness, a setting which delays a window closing by a set interval relative to the watermark.

While watermarks are properties of stream processors, Allowed Lateness is a property of a window, and only affects when that window closes. If the stream processor's watermark advances to a point that would trigger a new window to open, Allowed Lateness keeps earlier windows open without preventing this.

Example

You configure a stream processor with 5-minute tumbling windows. You start the processor at 12:00, so the first two windows span 12:00-12:05 and 12:05-12:10. You set an Allowed Lateness of 2 minutes.

The table below lists the documents in the order in which the stream processor ingests them. The Window Time column shows which windows are open at that point.

Event Time | Watermark | Allowed Lateness Time | Window Time
-----------|-----------|-----------------------|-------------------------
12:00      | 12:00     | 11:58                 | 12:00-12:05
12:01      | 12:01     | 11:59                 | 12:00-12:05
12:03      | 12:03     | 12:01                 | 12:00-12:05
12:02      | 12:03     | 12:01                 | 12:00-12:05
12:04      | 12:04     | 12:02                 | 12:00-12:05
12:01      | 12:04     | 12:02                 | 12:00-12:05
12:05      | 12:05     | 12:03                 | 12:00-12:05, 12:05-12:10
12:06      | 12:06     | 12:04                 | 12:00-12:05, 12:05-12:10
12:04      | 12:06     | 12:04                 | 12:00-12:05, 12:05-12:10
12:07      | 12:07     | 12:05                 | 12:05-12:10

When the watermark advances to 12:05, the 12:05-12:10 window opens. However, because the Allowed Lateness interval is 2 minutes, from within the 12:00-12:05 window, it is effectively only 12:03, so it stays open. Only when the watermark advances to 12:07 does the adjusted time reach 12:05. At this point, the 12:00-12:05 window closes.
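
The closing rule in this example can be written as a short Python sketch: a window opens once the watermark reaches its start and closes once the watermark minus the Allowed Lateness reaches its end. The function name `open_windows` and the use of whole minutes after 12:00 are assumptions for illustration, and the model covers only tumbling windows that begin at minute 0.

```python
def open_windows(watermark_min, window_min, lateness_min):
    """Return start times of the tumbling windows open at this watermark.

    Times are minutes after 12:00. A window opens once the watermark
    reaches its start and closes once (watermark - lateness) reaches its end.
    """
    adjusted = watermark_min - lateness_min
    return [
        start
        for start in range(0, watermark_min + 1, window_min)
        if adjusted < start + window_min
    ]


# 5-minute windows with 2 minutes of Allowed Lateness.
at_1204 = open_windows(4, 5, 2)
at_1205 = open_windows(5, 5, 2)  # second window opens, first stays open
at_1207 = open_windows(7, 5, 2)  # adjusted time reaches 12:05, first closes
```

Setting `lateness_min` to 0 in this model reduces it to plain watermark behavior, where the first window closes as soon as the watermark reaches 12:05.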

Idleness Timeout

Decoupling windowing behavior from processing time by default improves stream processing correctness in most cases. However, a streaming data source may have periods of extended idleness. In this scenario, a window may capture events prior to the period of idleness and be unable to return processed results while waiting for the watermark to advance enough to close.

Atlas Stream Processing allows users to configure an idleness timeout for windows to mitigate these scenarios using processing time. An idleness timeout is an interval of time that begins when processing time passes the end of an open window's interval, and the stream processor's source is idle. If the source remains idle for an interval equal to the idleness timeout, the window closes and the watermark advances independent of any document ingestion.

Example

You configure a tumbling window with a 3-minute interval and a 1-minute idleness timeout. The following table illustrates the effects of the idleness timeout during and after a window's interval.

Processing Time | Event Time or Status | Watermark | Window Time
----------------|----------------------|-----------|------------
12:00           | 12:00                | 12:00     | 12:00-12:03
12:01           | Source idle          | 12:00     | 12:00-12:03
12:02           | Source idle          | 12:00     | 12:00-12:03
12:03           | Source idle          | 12:00     | 12:00-12:03
12:04           | 12:02                | 12:02     | 12:00-12:03
12:05           | 12:05                | 12:05     | 12:03-12:06
12:06           | Source idle          | 12:05     | 12:03-12:06
12:07           | Source idle          | 12:06     | 12:06-12:09
12:08           | Source idle          | 12:06     | 12:06-12:09
12:09           | 12:09                | 12:09     | 12:09-12:12

During the 12:00-12:03 interval, the source idles for three minutes, but the stream processor doesn't close the window because processing time is not past the end of the window's interval, and the source does not remain idle after the window's interval ends. When the watermark advances to 12:05, the window closes normally, and the 12:03-12:06 window opens.

When the source goes idle at 12:06, it remains idle through 12:07, triggering the idleness timeout and advancing the watermark to 12:06.
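
The timeout rule can be approximated with a per-minute Python simulation. This is a simplified model built on assumptions, not the service's implementation: time advances in one-minute ticks, `None` marks an idle tick, the idle timer starts only once processing time has reached the end of the open window's interval, and the names `simulate_idle_timeout` and `TICKS` are hypothetical.

```python
def simulate_idle_timeout(ticks, window_min, timeout_min):
    """Return [(watermark, window_start), ...] for each processing-time tick.

    ticks is a list of (processing_time, event_time_or_None) pairs in minutes
    after 12:00, where None means the source was idle at that tick.
    """
    watermark, idle_since, rows = 0, None, []
    for proc, event in ticks:
        if event is not None:
            watermark = max(watermark, event)
            idle_since = None  # any document resets the idle timer
        else:
            window_end = (watermark // window_min + 1) * window_min
            if proc >= window_end:  # timer only runs past the window's end
                if idle_since is None:
                    idle_since = proc
                elif proc - idle_since >= timeout_min:
                    watermark = window_end  # close without a new document
                    idle_since = None
        rows.append((watermark, (watermark // window_min) * window_min))
    return rows


TICKS = [(0, 0), (1, None), (2, None), (3, None), (4, 2),
         (5, 5), (6, None), (7, None), (8, None), (9, 9)]
rows = simulate_idle_timeout(TICKS, window_min=3, timeout_min=1)
```

In this model the idle minutes at 12:01 and 12:02 have no effect because the 12:00-12:03 window's interval has not ended, the document at 12:04 resets the timer that started at 12:03, and the idle minute from 12:06 to 12:07 advances the watermark to 12:06.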
