Architecture Fundamentals
The core abstraction of Atlas Stream Processing is the stream processor. A stream processor is a MongoDB aggregation pipeline that operates continuously on streaming data from a specified source and writes the output to a sink. To learn more, see Structure of a Stream Processor.
Stream processing takes place on stream processing instances. Each stream processing instance is an Atlas namespace that associates the following:
One or more workers, which provide the RAM and CPUs necessary to run your stream processors.
A cloud provider and cloud region.
A connection registry, which stores the list of available sources and sinks of streaming data.
A security context in which to define user authorizations.
A Connection String to the stream processing instance itself.
Workers
When you define a stream processor, it becomes available only for the stream processing instance in which you define it. Each worker can host up to four running stream processors; Atlas Stream Processing automatically scales your stream processing instance up as you start stream processors by provisioning workers as needed. You can deprovision a worker by stopping all stream processors on it. Atlas Stream Processing always prefers to assign a stream processor to an existing worker over provisioning new workers.
Example
You have a stream processing instance running eight stream processors, named proc01 through proc08. proc01 through proc04 run on one worker, and proc05 through proc08 run on a second worker. You start a new stream processor named proc09. Atlas Stream Processing provisions a third worker to host proc09.
Later, you stop proc03 on the first worker. When you stop proc09 and restart it, Atlas Stream Processing reassigns proc09 to the first worker and deprovisions the third worker.
If you start a new stream processor named proc10 before you stop and restart proc09, Atlas Stream Processing assigns proc10 to the first worker in the slot previously allocated to proc03.
When scaling, Atlas Stream Processing only considers the number of currently running stream processors; it doesn't count defined stream processors that aren't running. The tier of the stream processing instance determines the RAM and CPU allotment of its workers.
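The placement rules in this section can be sketched as a small simulation. This is a minimal sketch, not the actual Atlas scheduler: the Instance class and its methods are hypothetical, and picking the lowest-numbered worker with a free slot is an assumption (the text only states that existing workers are preferred over new ones).

```python
MAX_PER_WORKER = 4  # from the text: a worker hosts up to four running processors


class Instance:
    """Hypothetical model of a stream processing instance's workers."""

    def __init__(self):
        self.workers = {}   # worker id -> set of running processor names
        self._next_id = 1

    def start(self, name):
        # Prefer an existing worker with a free slot (lowest id first: assumption).
        for wid in sorted(self.workers):
            if len(self.workers[wid]) < MAX_PER_WORKER:
                self.workers[wid].add(name)
                return wid
        # No free slot anywhere: provision a new worker.
        wid = self._next_id
        self._next_id += 1
        self.workers[wid] = {name}
        return wid

    def stop(self, name):
        for wid, procs in list(self.workers.items()):
            if name in procs:
                procs.remove(name)
                if not procs:
                    del self.workers[wid]   # an empty worker is deprovisioned
                return
        raise KeyError(name)


inst = Instance()
for i in range(1, 9):
    inst.start(f"proc{i:02d}")   # fills two workers
inst.start("proc09")             # provisions a third worker
inst.stop("proc03")              # frees a slot on the first worker
inst.stop("proc09")              # third worker becomes empty and is removed
inst.start("proc09")             # reuses the free slot on the first worker
```

Only running processors occupy slots in this model, which mirrors the statement that defined but stopped processors don't count toward scaling.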
Connection Registry
Connection registries store one or more connections. Each connection assigns a name to the combination of networking and security details that allow a stream processor to interact with external services. Connections exhibit the following behavior:
Only a connection defined in a given stream processing instance's connection registry can service stream processors hosted on that stream processing instance.
Each connection can service an arbitrary number of stream processors.
Only a single connection can serve as a given stream processor's source.
Only a single connection can serve as a given stream processor's sink.
A connection is not innately defined as either a source or a sink. Any given connection can serve either function depending on how a stream processor invokes that connection.
Atlas Stream Processing runs stream processing workers in dedicated customer containers, on multi-tenant infrastructure. For more information on MongoDB security and compliance, see the MongoDB Trust Center.
Checkpoints
Atlas Stream Processing captures the state of a stream processor using checkpoints. Each checkpoint has a unique ID and is subject to the flow of your stream processor logic. After all operators of a stream processor add their state to a checkpoint, Atlas Stream Processing commits the checkpoint, generating two types of records:
A single commit record that validates the checkpoint ID and the stream processor to which it belongs.
A set of records describing the state of each stateful operation in the relevant stream processor at the instant Atlas Stream Processing committed the checkpoint.
When you restart a stream processor after an interruption, Atlas Stream Processing queries the last committed checkpoint and resumes operation from the described state.
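The commit-then-resume flow described above can be illustrated with a toy in-memory store. This is a sketch under stated assumptions: CheckpointStore, its method names, and the record field names are all hypothetical and do not reflect how Atlas Stream Processing stores checkpoints internally.

```python
import itertools


class CheckpointStore:
    """Hypothetical store holding commit records and per-operator state records."""

    def __init__(self):
        self.commits = []    # commit records, oldest first
        self.states = {}     # checkpoint id -> {operator name: state}
        self._ids = itertools.count(1)

    def commit(self, processor, operator_states):
        # Called once every stateful operator has contributed its state.
        checkpoint_id = next(self._ids)
        self.states[checkpoint_id] = dict(operator_states)
        self.commits.append({"checkpointId": checkpoint_id, "processor": processor})
        return checkpoint_id

    def last_committed(self, processor):
        # On restart, resume from the most recent committed checkpoint.
        for record in reversed(self.commits):
            if record["processor"] == processor:
                return self.states[record["checkpointId"]]
        return None


store = CheckpointStore()
store.commit("proc01", {"window": {"count": 3}})
store.commit("proc01", {"window": {"count": 7}})
resumed = store.last_committed("proc01")   # state from the second checkpoint
```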
Dead Letter Queue
Atlas Stream Processing supports the use of an Atlas database collection as a dead letter queue (DLQ). When Atlas Stream Processing cannot process a document from your data stream, it writes the content of the document to the DLQ along with details of the processing failure. You can assign a collection as a DLQ in your stream processor definitions.
To learn more, see Create a Stream Processor.
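As an illustration of the dead letter queue idea, the following sketch routes documents that fail processing to a separate list together with the failure details. It is a conceptual model only: the run function and the shape of the DLQ entries (the doc and error keys) are hypothetical and are not the format Atlas Stream Processing writes.

```python
def run(docs, transform):
    """Apply transform to each document; failed documents go to the DLQ list."""
    output, dlq = [], []
    for doc in docs:
        try:
            output.append(transform(doc))
        except Exception as exc:
            # Keep the original document content plus details of the failure.
            dlq.append({"doc": doc, "error": f"{type(exc).__name__}: {exc}"})
    return output, dlq


def to_celsius(doc):
    return {"sensor": doc["sensor"], "celsius": (doc["fahrenheit"] - 32) * 5 / 9}


readings = [
    {"sensor": "a", "fahrenheit": 212},
    {"sensor": "b"},                      # missing field, cannot be processed
]
output, dlq = run(readings, to_celsius)
```

Processing continues past the failing document, so one malformed record does not stop the stream.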
Stream Processing Timing
In streaming data processing, documents are subject to two timing systems:
event time
processing time
Atlas Stream Processing offers various parameters to control how stream processors interact with these timing systems.
Event Time
Event time is the time at which either the source stream generates a document, or the messaging system (e.g. Apache Kafka) receives the document. This is ascertained by the timestamp of the document.
Network latency, upstream processing, and other factors can not only cause discrepancies between these times for a given document, but can also cause documents to arrive in a stream processor out of event-time order. In either case, windows can miss documents that you intend for them to capture. Atlas Stream Processing considers such documents late-arriving, and sends them to your Dead Letter Queue if you configure one.
Event Time is a configurable option for the boundary field supported in Tumbling Windows and Hopping Windows.
Processing Time
Processing time is the time at which the stream processor consumes a document. This is ascertained by the clock of the system hosting the stream processor.
Processing Time is a configurable option for the boundary field supported in Tumbling Windows and Hopping Windows. It allows you to create a pipeline with a kind of window which accumulates data based on the wall clock time of the server. As opposed to event time windows, processing time windows assign each event a timestamp based on the wall clock time of the server when it arrives at the stream processor.
Document timestamps and window boundary timestamps are in UTC. You can't specify idleTimeout or allowedLateness options when configuring a processingTime window.
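The restriction above can be expressed as a small client-side check on a window definition. This is a sketch, not part of any MongoDB driver: validate_window is a hypothetical helper, and the eventTime value, its use as the default, and the shape of the interval document are assumptions that this section does not state.

```python
def validate_window(window):
    """Check a window definition against the processingTime restriction."""
    boundary = window.get("boundary", "eventTime")   # default is an assumption
    if boundary not in ("eventTime", "processingTime"):
        raise ValueError(f"unknown boundary: {boundary!r}")
    if boundary == "processingTime":
        clash = sorted({"idleTimeout", "allowedLateness"} & window.keys())
        if clash:
            raise ValueError(f"{', '.join(clash)} not allowed with processingTime")
    return boundary


processing_window = {
    "interval": {"size": 5, "unit": "minute"},   # assumed shape
    "boundary": "processingTime",
    "pipeline": [],
}
validate_window(processing_window)
```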
Example
You create a pipeline with a 5-minute window. An event is added to a source Kafka cluster at 09:33. Due to some delay in the Kafka cluster, it arrives at the stream processor at 09:37.
If the pipeline has a 5-minute event time window, then this event is assigned to the 09:30-09:35 window. If the pipeline has a 5-minute processing time window, then the event is instead assigned to the 09:35-09:40 window.
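The arithmetic behind this example is a floor to the window size. The sketch below assumes that tumbling windows are aligned to the Unix epoch in UTC, which this section does not state; tumbling_window is a hypothetical helper and the date is arbitrary.

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def tumbling_window(ts, size=timedelta(minutes=5)):
    """Return the [start, end) window that contains ts (epoch-aligned: assumption)."""
    start = ts - (ts - EPOCH) % size
    return start, start + size


event_time = datetime(2024, 1, 1, 9, 33, tzinfo=timezone.utc)       # written to Kafka
processing_time = datetime(2024, 1, 1, 9, 37, tzinfo=timezone.utc)  # reaches the processor

event_window = tumbling_window(event_time)            # 09:30 to 09:35
processing_window = tumbling_window(processing_time)  # 09:35 to 09:40
```

The same document therefore lands in different windows depending only on which timestamp the window boundary uses.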
Watermarks
A watermark supersedes processing time and updates only when the processor consumes a document with a later event time than any previously-consumed document. All stream processors apply watermarks in Atlas Stream Processing.
Example
You configure a stream processor with 5-minute windows. You start the processor at 12:00, so that the first two windows will have durations of 12:00-12:05 and 12:05-12:10. The following table illustrates which windows will capture which events given varying delays, with and without watermarks.
Event Time | Processing Time | Window Time (No Watermarks) | Window Time (Watermarks) |
---|---|---|---|
12:00 | 12:00 | 12:00-12:05 | 12:00-12:05 |
12:01 | 12:03 | 12:00-12:05 | 12:00-12:05 |
12:02 | 12:05 | 12:05-12:10 | 12:00-12:05 |
12:01 | 12:06 | 12:05-12:10 | 12:00-12:05 |
12:06 | 12:07 | 12:05-12:10 | 12:05-12:10 |
Without watermarks, the 12:00-12:05 window closes at 12:05 according to the system clock of the stream processing instance, and the 12:05-12:10 window opens immediately. Thus, though the source generated four of the documents during the 12:00-12:05 interval, the relevant window captures only two documents.
With watermarks, the 12:00-12:05 window doesn't close at 12:05 because among the documents it ingests up to that point, the latest event time, and thus the watermark value, is 12:02. The 12:00-12:05 window doesn't close until 12:07 on the system clock, when the stream processor ingests a document with an event time of 12:06, advances the watermark to that time, and opens the 12:05-12:10 window. Each window captures all of the appropriate documents.
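The window assignments in the table can be reproduced with a short simulation. This is a simplified model, not the Atlas implementation: times are minutes after 12:00, the watermark is taken to be the highest event time seen so far, and assign_windows is a hypothetical function.

```python
def assign_windows(docs, size=5):
    """docs: (event_minute, processing_minute) pairs in ingestion order."""
    watermark = None
    rows = []
    for event, processing in docs:
        # The watermark only moves forward, to the latest event time seen.
        watermark = event if watermark is None else max(watermark, event)
        by_clock = processing // size * size   # window start without watermarks
        by_event = event // size * size        # window start with watermarks
        is_late = by_event + size <= watermark  # that window has already closed
        rows.append({
            "no_watermark": by_clock,
            "with_watermark": None if is_late else by_event,
            "watermark": watermark,
        })
    return rows


docs = [(0, 0), (1, 3), (2, 5), (1, 6), (6, 7)]   # the five rows of the table
rows = assign_windows(docs)
```

In this model the first window stays open until the fifth document moves the watermark to minute 6, so the four early documents all land in it.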
When reading from Apache Kafka, Atlas waits for all partitions to pass the watermark. If a partition idles and fails to produce events with timestamps later than the watermark, the window doesn't close or output results. To address this, set partitionIdleTimeout to ensure that idle partitions don't halt the progression of watermarks. To learn more, see $source Stage (Stream Processing).
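One common way to model "waiting for all partitions" is to take the minimum of the per-partition watermarks and to skip partitions that have been idle longer than a timeout. The sketch below assumes that model; it is not confirmed as the Atlas implementation, and source_watermark and its parameters are hypothetical names (only partitionIdleTimeout is a real option).

```python
def source_watermark(partitions, now, partition_idle_timeout=None):
    """partitions: name -> (latest_event_time, last_seen_processing_time).

    Returns the slowest active partition's watermark, or None if none is active.
    """
    active = []
    for latest_event_time, last_seen in partitions.values():
        idle = (
            partition_idle_timeout is not None
            and now - last_seen >= partition_idle_timeout
        )
        if not idle:
            active.append(latest_event_time)
    return min(active) if active else None


partitions = {"p0": (10, 10), "p1": (3, 3)}   # p1 stopped producing at minute 3
stalled = source_watermark(partitions, now=10)
advancing = source_watermark(partitions, now=10, partition_idle_timeout=5)
```

Without a timeout, the idle partition p1 holds the combined watermark at 3; with a timeout of 5, p1 is ignored and the watermark follows p0.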
Allowed Lateness
If the differences between event time and processing time vary sufficiently, documents may arrive in a stream processor after the watermark has advanced enough to close the expected window. To mitigate this, Atlas Stream Processing supports Allowed Lateness, a setting which delays a window closing by a set interval relative to the watermark.
While watermarks are properties of stream processors, Allowed Lateness is a property of a window, and only affects when that window closes. If the stream processor's watermark advances to a point that would trigger a new window to open, Allowed Lateness keeps earlier windows open without preventing this.
Example
You configure a stream processor with 5-minute tumbling windows. You start the processor at 12:00, so that the first two windows will have durations of 12:00-12:05 and 12:05-12:10. You set an Allowed Lateness of 2 minutes.
The table below reflects the order in which the stream processor ingests the described documents.
Event Time | Watermark | Allowed Lateness Time | Window Time |
---|---|---|---|
12:00 | 12:00 | 11:58 | 12:00-12:05 |
12:02 | 12:02 | 12:00 | 12:00-12:05 |
12:01 | 12:02 | 12:00 | 12:00-12:05 |
12:05 | 12:05 | 12:03 | 12:00-12:05, 12:05-12:10 |
12:04 | 12:05 | 12:03 | 12:00-12:05, 12:05-12:10 |
12:07 | 12:07 | 12:05 | 12:05-12:10 |
When the watermark advances to 12:05, the 12:05-12:10 window opens. However, because the Allowed Lateness interval is 2 minutes, from within the 12:00-12:05 window, it is effectively only 12:03, so it stays open. Only when the watermark advances to 12:07 does the adjusted time reach 12:05. At this point, the 12:00-12:05 window closes.
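The closing rule in this example reduces to one comparison: a window closes once the watermark minus the Allowed Lateness reaches the end of the window. The sketch below applies that rule with times in minutes after 12:00, taking the watermark to be the highest event time seen so far. The function closing_document is hypothetical.

```python
def closing_document(event_times, window_end, allowed_lateness):
    """Return the index of the document whose arrival closes the window, or None."""
    watermark = None
    for index, event in enumerate(event_times):
        watermark = event if watermark is None else max(watermark, event)
        # From the window's point of view, time lags the watermark by the lateness.
        if watermark - allowed_lateness >= window_end:
            return index
    return None


events = [0, 2, 1, 5, 4, 7]   # event times in ingestion order
with_lateness = closing_document(events, window_end=5, allowed_lateness=2)
without_lateness = closing_document(events, window_end=5, allowed_lateness=0)
```

With 2 minutes of Allowed Lateness the 12:00-12:05 window closes on the last document (event time 12:07), so the 12:04 document that arrives after 12:05 is still captured. With no lateness the window would close as soon as the 12:05 document arrives.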
Idleness Timeout
Decoupling windowing behavior from processing time by default improves stream processing correctness in most cases. However, a streaming data source may have periods of extended idleness. In this scenario, a window may capture events prior to the period of idleness and be unable to return processed results while waiting for the watermark to advance enough to close.
Atlas Stream Processing allows users to configure an idleness timeout for windows to mitigate these scenarios using processing time. An idleness timeout is an interval that begins when processing time passes the end of an open window's interval, and the stream processor's source is idle. If the source remains idle for an interval equal to the idleness timeout, the window closes and the watermark advances independent of any document ingestion.
Example
You configure a tumbling window with a 3-minute interval and a 1-minute idleness timeout. The following table illustrates the effects of the idleness timeout during and after a window's interval.
Processing Time | Event Time or Status | Watermark | Window Time |
---|---|---|---|
12:00 | 12:00 | 12:00 | 12:00-12:03 |
12:01 | Source idle | 12:00 | 12:00-12:03 |
12:02 | Source idle | 12:00 | 12:00-12:03 |
12:03 | Source idle | 12:00 | 12:00-12:03 |
12:04 | 12:02 | 12:02 | 12:00-12:03 |
12:05 | 12:05 | 12:05 | 12:03-12:06 |
12:06 | Source idle | 12:05 | 12:03-12:06 |
12:07 | Source idle | 12:06 | 12:06-12:09 |
12:08 | Source idle | 12:06 | 12:06-12:09 |
12:09 | 12:09 | 12:09 | 12:09-12:12 |
During the 12:00-12:03 interval, the source idles for three minutes, but the stream processor doesn't close the window because processing time is not past the end of the window's interval, and the source does not remain idle after the window's interval ends. When the watermark advances to 12:05, the window closes normally, and the 12:03-12:06 window opens.
When the source goes idle at 12:06, it remains idle through 12:07, triggering the idleness timeout and advancing the watermark to 12:06.
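The idleness timeout behavior can be traced minute by minute with a small simulation. This is a simplified model under stated assumptions: time advances in one-minute ticks counted from 12:00, the timer starts as soon as processing time reaches the end of the open window while the source is idle, and simulate is a hypothetical function.

```python
def simulate(events, ticks, size=3, idle_timeout=1):
    """events: processing minute -> event minute; all other minutes are idle.

    Returns (processing_minute, watermark, window_start) for every tick.
    """
    watermark = 0
    idle_since = None
    rows = []
    for t in range(ticks):
        window_end = watermark // size * size + size
        if t in events:
            watermark = max(watermark, events[t])
            idle_since = None                  # any document resets the timer
        elif t >= window_end:
            if idle_since is None:
                idle_since = t                 # timer starts at the window's end
            elif t - idle_since >= idle_timeout:
                watermark = window_end         # close the window without a document
                idle_since = None
        rows.append((t, watermark, watermark // size * size))
    return rows


rows = simulate({0: 0, 4: 2, 5: 5, 9: 9}, ticks=10)
```

In this trace the timer that starts at minute 3 is cancelled by the document that arrives at minute 4, while the timer that starts at minute 6 expires at minute 7 and moves the watermark to 6.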