$sessionWindow
Definition
The $sessionWindow
stage specifies a session window for aggregation of
data. Session windows allow you to run a pipeline on each "session" of activity in an
input stream. Two documents are in the same session if they have the same partition, and
the difference of their timestamps is less than the session gap. When a window is closed,
its results are released to the next stage.
A session window closes when the watermark advances a duration of the gap
value plus the allowedLateness
value beyond the maximum document timestamp in the session. A closed session window's start time
is the timestamp of the first event in the session, and the end time is the timestamp
of the last event in the session, plus the gap. Session window end time adds the gap
to the maximum timestamp in the session. The window results are the output of
$sessionWindow.pipeline
on the documents that fit in the window.
$sessionWindow
A
$sessionWindow
pipeline stage has the following prototype form:{ $sessionWindow: { partitionBy: "$userId", gap: {unit: "minute", size: 5}, pipeline: [{$match: {ad: true}}, {$group: { _id: null, total: {$sum: "$value"}}}], boundary: "eventTime", allowedLateness: {unit: "second", size: 5} } }
Syntax
The $sessionWindow
stage takes a document with the following
fields:
Field | Type | Necessity | Description |
---|---|---|---|
| expression | Required | Fields that the |
| document | Required | Document that defines the amount of time as a combination of a
For example, with a |
| string | Optional | String specifying whether window boundaries are determined
by event time or processing time. Value can be either You can't set the |
| array | Required | Nested aggregation pipeline evaluated against the messages within the window. |
| duration | Optional | Document that specifies the amount of time as a combination of a
For example, a If omitted, defaults to 3 seconds. |
Behavior
Each input document that reaches the $sessionWindow
stage is assigned a partition
from the partitionBy
expression. Each document is assigned to a session window
based on its partition and timestamp. This might be a new or an existing session window.
When a window is closed, its results are released to the next stage. The window results
are the output of the pipeline on the documents in the window.
A session window is closed when the watermark advances the gap and the allowedLateness
values beyond the maximum document timestamp in the session. A closed session window's
start time is the timestamp of the first event in the session. A closed session window's end
time is the timestamp of the last event in the session, plus the gap. The session window end
time adds the gap to the maximum timestamp in the session.
Note
Suppose there is a session window for partition A, with a maximum timestamp of
2024-01-01 00:40:00
. With a 1 hour gap, if no later documents arrive for partition A,
the session window is closed when the watermark reaches 2024-01-01 01:40:00
.
If the partitionBy
expression fails for an input document, the document is sent to the DLQ.
When processing out-of-order data, a document can arrive with a timestamp within the gap of
existing session windows of the same partition. This will merge the session windows.
If a document arrives at the $sessionWindow
stage with a timestamp that is less than the value
of the most recent watermark from the source, and there is no open session for that document, the
document is sent to the DLQ. When a document reaches the $sessionWindow
stage, its stream
meta window.partition
, window.start
, and window.end
fields are set.
Events can arrive out of order. Two events that have the same partitionBy
value might arrive,
and their timestamps might be separated by more than the gap. Initially, these events will be placed
in separate session windows. Later, another event could arrive that merges the two session windows.