
$sessionWindow

The $sessionWindow stage specifies a session window for aggregation of data. Session windows allow you to run a pipeline on each "session" of activity in an input stream. Two documents are in the same session if they have the same partition, and the difference of their timestamps is less than the session gap. When a window is closed, its results are released to the next stage.

A session window closes when the watermark advances beyond the maximum document timestamp in the session by the gap value plus the allowedLateness value. A closed session window's start time is the timestamp of the first event in the session, and its end time is the timestamp of the last event in the session plus the gap. The window results are the output of $sessionWindow.pipeline on the documents that fit in the window.

$sessionWindow

A $sessionWindow pipeline stage has the following prototype form:

{
  $sessionWindow: {
    partitionBy: "$userId",
    gap: {unit: "minute", size: 5},
    pipeline: [{$match: {ad: true}}, {$group: { _id: null, total: {$sum: "$value"}}}],
    boundary: "eventTime",
    allowedLateness: {unit: "second", size: 5}
  }
}

The $sessionWindow stage takes a document with the following fields:

Field
Type
Necessity
Description

partitionBy

expression

Required

Expression by which the $sessionWindow stage partitions incoming documents. Incoming documents that share the same partitionBy value are processed together.

gap

document

Required

Document that defines, as a combination of a size and a unit, how long to wait for additional records that share partitionBy values before closing the session, where:

  • The value of size must be a non-zero positive integer.

  • The value of unit must be one of the following:

    • "ms" (millisecond)

    • "second"

    • "minute"

    • "hour"

    • "day"

For example, with a size of 1 and a unit of "minute", this stage waits one minute for additional records with the same partitionBy values before closing the session window.

boundary

string

Optional

String specifying whether window boundaries are determined by event time or processing time. Value can be either eventTime or processingTime. See stream processing timing to learn more. If omitted, this field defaults to eventTime.

You can't set the allowedLateness field when boundary is set to processingTime.

pipeline

array

Required

Nested aggregation pipeline evaluated against the messages within the window.

allowedLateness

duration

Optional

Document that specifies, as a combination of a size and a unit, how long to keep windows generated from the source open after the window end time to accept late-arriving data, where:

  • The value of size must be a non-zero positive integer.

  • The value of unit must be one of the following:

    • "ms" (millisecond)

    • "second"

    • "minute"

    • "hour"

    • "day"

For example, a size of 3 and a unit of "second" waits 3 seconds after the gap for late-arriving records before releasing the window results to the next stage.

If omitted, defaults to 3 seconds.
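The field rules in the table above can be checked on the client before a stage is submitted. The following is a minimal sketch only: durationToMs and checkSessionWindowSpec are hypothetical helper names, not part of any MongoDB API, and the server performs its own validation, which may differ.

```javascript
// Units listed in the field table, mapped to milliseconds.
const UNIT_MS = { ms: 1, second: 1000, minute: 60000, hour: 3600000, day: 86400000 };

// Convert a {unit, size} document to milliseconds, enforcing the table's rules.
function durationToMs(duration) {
  const { unit, size } = duration;
  if (!Object.prototype.hasOwnProperty.call(UNIT_MS, unit)) {
    throw new Error(`unsupported unit: ${unit}`);
  }
  if (!Number.isInteger(size) || size <= 0) {
    throw new Error("size must be a non-zero positive integer");
  }
  return size * UNIT_MS[unit];
}

// Hypothetical client-side check; returns a list of problems (empty if none found).
function checkSessionWindowSpec(spec) {
  const errors = [];
  for (const field of ["partitionBy", "gap", "pipeline"]) {
    if (spec[field] === undefined) errors.push(`${field} is required`);
  }
  if (spec.pipeline !== undefined && !Array.isArray(spec.pipeline)) {
    errors.push("pipeline must be an array");
  }
  // boundary defaults to eventTime when omitted.
  const boundary = spec.boundary === undefined ? "eventTime" : spec.boundary;
  if (!["eventTime", "processingTime"].includes(boundary)) {
    errors.push("boundary must be eventTime or processingTime");
  }
  if (boundary === "processingTime" && spec.allowedLateness !== undefined) {
    errors.push("allowedLateness can't be set when boundary is processingTime");
  }
  for (const field of ["gap", "allowedLateness"]) {
    if (spec[field] === undefined) continue;
    try {
      durationToMs(spec[field]);
    } catch (err) {
      errors.push(`${field}: ${err.message}`);
    }
  }
  return errors;
}

// Example: the prototype form, but with boundary switched to processingTime.
const spec = {
  partitionBy: "$userId",
  gap: { unit: "minute", size: 5 },
  pipeline: [{ $match: { ad: true } }],
  boundary: "processingTime",
  allowedLateness: { unit: "second", size: 5 },
};
console.log(checkSessionWindowSpec(spec));
```

In this example the check reports a single problem, because allowedLateness is combined with a processingTime boundary.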

Each input document that reaches the $sessionWindow stage is assigned a partition from the partitionBy expression. Each document is assigned to a session window based on its partition and timestamp. This might be a new or an existing session window. When a window is closed, its results are released to the next stage. The window results are the output of the pipeline on the documents in the window.

A session window is closed when the watermark advances beyond the maximum document timestamp in the session by the sum of the gap and allowedLateness values. A closed session window's start time is the timestamp of the first event in the session. Its end time is the timestamp of the last event in the session plus the gap.

Note

Suppose there is a session window for partition A, with a maximum timestamp of 2024-01-01 00:40:00. With a 1 hour gap, and setting aside allowedLateness, if no later documents arrive for partition A, the session window is closed when the watermark reaches 2024-01-01 01:40:00.
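The arithmetic behind this example can be sketched as follows. This is an illustration, not server code: sessionBounds is a hypothetical helper, the first event at 00:10:00 is invented for the example, and the 3 second lateness is the documented default applied to the closing rule described above.

```javascript
const HOUR_MS = 60 * 60 * 1000;

// Derive a session's boundaries from its event timestamps (ms since epoch).
function sessionBounds(timestamps, gapMs, allowedLatenessMs) {
  const first = Math.min(...timestamps);
  const last = Math.max(...timestamps);
  return {
    start: first,                                         // timestamp of the first event
    end: last + gapMs,                                    // last event plus the gap
    closesAtWatermark: last + gapMs + allowedLatenessMs,  // gap plus allowedLateness
  };
}

const events = [
  Date.parse("2024-01-01T00:10:00Z"), // hypothetical first event
  Date.parse("2024-01-01T00:40:00Z"), // maximum timestamp from the note
];
const bounds = sessionBounds(events, HOUR_MS, 3000);

console.log(new Date(bounds.start).toISOString());             // 2024-01-01T00:10:00.000Z
console.log(new Date(bounds.end).toISOString());               // 2024-01-01T01:40:00.000Z
console.log(new Date(bounds.closesAtWatermark).toISOString()); // 2024-01-01T01:40:03.000Z
```

The window end time matches the 01:40:00 figure in the note. Under the stated closing rule, a 3 second allowedLateness would move the closing watermark 3 seconds later.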

If the partitionBy expression fails for an input document, the document is sent to the dead letter queue (DLQ). When processing out-of-order data, a document can arrive with a timestamp that falls within the gap of multiple existing session windows of the same partition. When this happens, those session windows are merged.

If a document arrives at the $sessionWindow stage with a timestamp that is less than the value of the most recent watermark from the source, and there is no open session for that document, the document is sent to the DLQ. When a document reaches the $sessionWindow stage, its stream meta window.partition, window.start, and window.end fields are set.
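The routing rules above can be summarized in a small decision function. This is a sketch under stated assumptions: routeDocument and partitionOf are hypothetical names, sessions are modeled as plain objects, and "an open session for that document" is interpreted here as an open session in the same partition whose events lie within the gap of the document's timestamp.

```javascript
// Decide where an incoming document goes.
// partitionOf is a hypothetical stand-in for evaluating the partitionBy expression.
// openSessions holds objects of the form {partition, start, last} (ms since epoch).
function routeDocument(doc, openSessions, watermark, gapMs, partitionOf) {
  let partition;
  try {
    partition = partitionOf(doc);
  } catch (err) {
    // A failing partitionBy expression sends the document to the DLQ.
    return { target: "dlq", reason: "partitionBy failed" };
  }
  // Assumed meaning of "open session for that document": same partition,
  // and the timestamp is less than the gap away from the session's events.
  const session = openSessions.find(
    (s) => s.partition === partition && doc.ts > s.start - gapMs && doc.ts < s.last + gapMs
  );
  if (session === undefined && doc.ts < watermark) {
    // Older than the watermark with no session to join.
    return { target: "dlq", reason: "late with no open session" };
  }
  return { target: session === undefined ? "newSession" : "existingSession", partition };
}

// Hypothetical partition function that fails when userId is missing.
const byUserId = (d) => {
  if (d.userId === undefined) throw new Error("missing userId");
  return d.userId;
};
const open = [{ partition: "A", start: 1000, last: 2000 }];

console.log(routeDocument({ userId: "A", ts: 2200 }, open, 2300, 500, byUserId).target); // existingSession
console.log(routeDocument({ userId: "B", ts: 2100 }, open, 2300, 500, byUserId).target); // dlq
console.log(routeDocument({ userId: "B", ts: 2400 }, open, 2300, 500, byUserId).target); // newSession
```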

Events can arrive out of order. Two events with the same partitionBy value might arrive with timestamps separated by more than the gap. Initially, these events are placed in separate session windows. Later, another event could arrive that merges the two session windows.
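The merge behavior can be illustrated with a simplified model of the sessions in a single partition. This is a sketch, not the server's implementation: addEvent is a hypothetical helper, sessions track only their first and last timestamps plus an event count, and timestamps are given in minutes for readability.

```javascript
// Add one event timestamp to the sessions of a single partition.
// Any existing session less than the gap away from the event is absorbed,
// so an event that bridges two sessions merges them into one.
function addEvent(sessions, ts, gap) {
  let merged = { start: ts, last: ts, count: 1 };
  const untouched = [];
  for (const s of sessions) {
    if (ts > s.start - gap && ts < s.last + gap) {
      merged = {
        start: Math.min(merged.start, s.start),
        last: Math.max(merged.last, s.last),
        count: merged.count + s.count,
      };
    } else {
      untouched.push(s);
    }
  }
  return [...untouched, merged].sort((a, b) => a.start - b.start);
}

const gap = 5; // minutes
let sessions = addEvent([], 0, gap);   // one session: [0, 0]
sessions = addEvent(sessions, 8, gap); // 8 minutes later: a second session
console.log(sessions.length);          // 2
sessions = addEvent(sessions, 4, gap); // late event within the gap of both
console.log(sessions);                 // [ { start: 0, last: 8, count: 3 } ]
```

The event at minute 4 is less than 5 minutes from both existing sessions, so the two sessions and the new event end up in a single session spanning minutes 0 to 8.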
