
Atlas Stream Processing Overview

On this page

  • About Atlas Stream Processing
  • What is a Stream?
  • Data Isolation and Security
  • Atlas Stream Processing Key Concepts
  • Windows
  • Tumbling Windows
  • Hopping Windows
  • Recoverability
  • Schema Validation
  • Dead Letter Queue
  • Continuous Merge
  • Supported Aggregation Pipeline Stages

Important

Atlas Stream Processing is currently in Public Preview.

For information on limitations of Atlas Stream Processing Public Preview functionality, see Limitations.

About Atlas Stream Processing

Atlas Stream Processing enables processing streams of complex data using the same data model and Query API used in Atlas databases. Atlas Stream Processing allows you to:

  • Build aggregation pipelines to continuously operate on streaming data without the delays inherent to batch processing.

  • Perform continuous schema validation to check that messages are properly formed, detect message corruption, and detect late-arriving data.

  • Continuously publish results to Atlas collections or Apache Kafka clusters, ensuring up-to-date views and analysis of data.

Atlas Stream Processing components belong directly to Atlas projects and operate independently of Atlas clusters.

What is a Stream?

A stream is a continuous flow of data originating from one or more sources, taking the form of an append-only log, also called a journal. Examples of data streams include temperature or pressure readings from sensors, records of financial transactions, or change data capture events.

Data streams originate from sources such as Apache Kafka Topics or change streams. You can then write processed data to sinks such as Apache Kafka Topics or Atlas collections.

Streams of data originate in systems with rapidly changing state. Atlas Stream Processing provides native stream processing capabilities to operate on continuous data without the time and computational constraints of an at-rest database.

Data Isolation and Security

Atlas Stream Processing runs stream processing workers in dedicated customer containers on multi-tenant infrastructure. For more information on MongoDB security and compliance, see the MongoDB Trust Center.

Atlas Stream Processing Key Concepts

Stream Processor

A stream processor is a MongoDB aggregation pipeline query that runs continuously against your data stream. Each stream processor has a name unique to its stream processing instance, and its pipeline definition is persisted. You can apply a stream processor to any connection defined in the connection registry. You can start or stop stream processors as needed.
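As an illustrative mongosh sketch, you might define and run a stream processor as follows. The processor name solarDemo, the connection registry entries myKafka and mongodb1, and the topic and namespace names are all hypothetical placeholders.

```javascript
// Read from a Kafka topic registered in the connection registry (hypothetical names).
let source = { $source: { connectionName: "myKafka", topic: "sensors" } };

// Continuously write each processed document to an Atlas collection.
let sink = {
  $merge: { into: { connectionName: "mongodb1", db: "solar", coll: "readings" } }
};

// Persist the pipeline under a name unique to this stream processing instance.
sp.createStreamProcessor("solarDemo", [source, sink]);

// Run continuously; stop it later with sp.solarDemo.stop().
sp.solarDemo.start();
```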

Connection Registry

A list of connections to streaming data sources and sinks, with metadata and connection strings for each entry.

Important

After adding an external connection such as an Apache Kafka cluster to your connection registry, you must add Atlas IP addresses to an access list for that external connection. For more information, see Allow Access to or from the Atlas Control Plane.

Stream Processing Instance

A stream processing instance is an Atlas namespace with an associated:

  • Connection string

  • Cloud provider

  • Cloud provider region

  • (Optional) Security context

Each stream processing instance associates one connection registry with one or more stream processors to enable operations on streams of data. You can connect to a stream processing instance to manage stream queries by using the same connection methods that you use for Atlas clusters.

Allowed Lateness

The configurable period of time that a window remains open past its interval to wait for late-arriving data. If data arrives after this allowed lateness and the window has closed, Atlas Stream Processing writes the data to a Dead Letter Queue.

A late-arriving message is a message issued while a given window is open, but that does not reach Atlas Stream Processing until after that same window closes. Network delays and packet corruption are common reasons for late arrival.
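For illustration, allowed lateness is configured per window stage. A minimal sketch, assuming the $tumblingWindow syntax described in the Windows section below; the sizes are arbitrary:

```javascript
{
  $tumblingWindow: {
    interval: { size: NumberInt(10), unit: "second" },
    // Keep the window open 3 extra seconds for late-arriving data;
    // anything arriving after that goes to the Dead Letter Queue, if configured.
    allowedLateness: { size: NumberInt(3), unit: "second" },
    pipeline: [
      { $group: { _id: "$sensorId", maxTemp: { $max: "$temperature" } } }
    ]
  }
}
```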

Processing Guarantee

A guarantee of correctness in streaming data processing. A system can guarantee either:

  • At most once processing. The system either receives and processes the message once or skips it entirely.

  • At least once processing. The system always receives and processes a message, but it might process the same message more than once.

    Atlas Stream Processing guarantees at least once processing.

Windows

Streaming data is infinite, but you can apply queries to finite sets of streaming data documents by setting time intervals for stream sampling. These time-bounded sets of documents from streams are called windows.

Windows in Atlas Stream Processing are aggregation pipeline stages that capture subsets of a data stream beginning with the first document that arrives after the defined start time and ending with the last document that arrives before the defined end time. Windows also contain the aggregation pipeline that defines the processing logic you want to apply to your data stream. This logic applies to the set of documents captured in a given window.

Atlas Stream Processing provides support for Tumbling Windows and Hopping Windows.

Tumbling Windows

Tumbling windows are windows defined by the time intervals they capture. These time intervals don't overlap.

Example

You define a tumbling window with an interval of 3 seconds. When you start your stream processor:

  • A window opens for 3 seconds.

  • The first window captures all documents that the stream generates within those 3 seconds.

  • After 3 seconds elapse, the window closes and applies your aggregation logic to all the documents in that window.

    If you configure allowedLateness, Atlas Stream Processing writes late-arriving messages to the Dead Letter Queue after the close of the window.

  • A new window opens as soon as the first one closes and captures documents from the stream for the next 3 seconds.

Tumbling windows ensure comprehensive capture of data streams without repeated processing of individual documents.
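A minimal mongosh sketch of the 3-second example above, using an ad hoc sp.process call; the connection name and field names are hypothetical:

```javascript
sp.process([
  { $source: { connectionName: "myKafka", topic: "sensors" } },
  {
    // Non-overlapping 3-second windows: each document is captured exactly once.
    $tumblingWindow: {
      interval: { size: NumberInt(3), unit: "second" },
      pipeline: [
        // Applied once per window, to the documents captured in that window.
        { $group: { _id: "$sensorId", count: { $sum: 1 } } }
      ]
    }
  }
])
```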

Hopping Windows

Hopping windows are windows defined by the time interval they capture and by the interval at which each new window opens, called the hop. Since duration is decoupled from frequency, you can configure hopping windows to overlap or to be spaced apart.

To define a hopping window with overlap, set a hop smaller than the interval.

Example

You define a hopping window with an interval of 20 seconds and a hop of 5 seconds. When you start your stream processor:

  • A window opens for 20 seconds.

  • The first window captures all documents that the stream generates within those 20 seconds.

  • 5 seconds later, another window opens and captures all documents within the next 20 seconds. Because the first window is still open, all documents that the stream generates for the next 15 seconds are captured by both windows.

  • 20 seconds after the first window opened, it closes and applies your aggregation logic to all the documents in that window.

  • 5 seconds later, the second window closes and applies your aggregation logic to all the documents in that window, including those that were already subject to aggregation logic in the first window.

If you configure allowedLateness, Atlas Stream Processing writes late-arriving messages to the Dead Letter Queue after the close of the window.

To define a hopping window with spacing, set a hop larger than the interval.

Example

You define a hopping window with an interval of 3 seconds and a hop of 5 seconds. When you start a stream processor:

  • A window opens for 3 seconds.

  • The first window captures all documents for the next 3 seconds.

  • After 3 seconds elapse, the window closes and applies your aggregation logic to all the documents in that window.

  • The next window opens after a further 2 seconds elapse.

  • Atlas Stream Processing does not process any documents that the stream generates during those 2 seconds.
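In mongosh syntax, the two variants differ only in the relationship between interval and hopSize. A hedged sketch of the overlapping 20-second/5-second configuration above, with hypothetical names:

```javascript
{
  // A new 20-second window opens every 5 seconds, so windows overlap and
  // most documents are captured by four consecutive windows.
  // Setting hopSize larger than interval instead leaves unprocessed gaps.
  $hoppingWindow: {
    interval: { size: NumberInt(20), unit: "second" },
    hopSize: { size: NumberInt(5), unit: "second" },
    pipeline: [
      { $group: { _id: "$sensorId", avgTemp: { $avg: "$temperature" } } }
    ]
  }
}
```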

Recoverability

Atlas Stream Processing checkpoints store processing state so that you can resume processing after an interruption. Checkpoints are documents with unique IDs that are subject to the flow of your stream processor logic. When the last operator of your stream processor finishes handling a checkpoint document, Atlas Stream Processing commits that checkpoint. This commit takes the form of two types of documents:

  • A single commit record that validates the ID of the checkpoint and the stream processor being used.

  • Records describing the state of each operator in your stream processor at the instant Atlas Stream Processing committed that checkpoint.

When you resume stream processing after an interruption, Atlas Stream Processing queries internal storage for the latest committed checkpoint and restarts your stream processor with the associated state.
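Checkpointing itself is automatic. From the user's perspective, resuming is simply restarting the processor; a sketch, assuming a processor named solarDemo:

```javascript
// A stop (or an interruption) leaves the committed checkpoints in place.
sp.solarDemo.stop();

// On restart, Atlas Stream Processing reads the latest committed checkpoint
// and restores each operator's state before resuming processing.
sp.solarDemo.start();
```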

Schema Validation

Data streams often contain records from varied sources with varied schemas. Atlas Stream Processing leverages MongoDB's flexible document model to process this data without preprocessing or schema validation. However, for applications with strict schema requirements, Atlas Stream Processing supports optional schema definition and validation in the form of a stream aggregation stage.

For more information, see $validate.
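A minimal sketch of that stage, assuming hypothetical sensorId and temperature fields and a configured Dead Letter Queue; see $validate for the authoritative syntax:

```javascript
{
  $validate: {
    // Only documents matching this schema pass through unchanged.
    validator: {
      $jsonSchema: {
        required: ["sensorId", "temperature"],
        properties: {
          sensorId: { bsonType: "string" },
          temperature: { bsonType: ["int", "double"] }
        }
      }
    },
    // Route documents that fail validation to the Dead Letter Queue.
    validationAction: "dlq"
  }
}
```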

Dead Letter Queue

Atlas Stream Processing supports the use of an Atlas database collection as a dead letter queue (DLQ). When Atlas Stream Processing cannot process a document from your data stream, it writes the content of the document to the DLQ along with details of the processing failure. You can assign a collection as a DLQ in your stream processor definitions.

To learn more, see Create a Stream Processor.
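For example, a hedged sketch of assigning a DLQ collection when creating a stream processor; the connection name mongodb1 and the namespace dlq.failures are placeholders:

```javascript
let pipeline = [
  { $source: { connectionName: "myKafka", topic: "sensors" } },
  { $merge: { into: { connectionName: "mongodb1", db: "solar", coll: "readings" } } }
];

// The optional third argument configures the processor; documents that fail
// processing are written to the hypothetical dlq.failures collection.
sp.createStreamProcessor("solarDemo", pipeline, {
  dlq: { connectionName: "mongodb1", db: "dlq", coll: "failures" }
});
```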

Continuous Merge

Atlas Stream Processing supports continuous merging of streaming data into a collection with the $merge operator, or into an Apache Kafka topic with the $emit operator. You can't use a time series collection as either a source or a sink.
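A sketch of the two sink stages, with hypothetical connection registry entries; a pipeline ends in one or the other:

```javascript
// Continuously merge results into an Atlas collection...
let mergeSink = {
  $merge: { into: { connectionName: "mongodb1", db: "solar", coll: "aggregates" } }
};

// ...or publish them to an Apache Kafka topic.
let emitSink = { $emit: { connectionName: "myKafka", topic: "aggregates" } };
```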

Supported Aggregation Pipeline Stages

Atlas Stream Processing provides a number of extensions to the core MongoDB Aggregation Pipeline syntax. To learn more about these extensions, see Supported Aggregation Pipeline Stages.

Certain core Aggregation Pipeline stages have limited support or are unsupported, as noted in the following table. If a stage is not listed in the table, Atlas Stream Processing supports it.

Aggregation Stage | Support Status

$group | Only supported in a $tumblingWindow or $hoppingWindow.
… (3 more stages) | Only supported in a $tumblingWindow or $hoppingWindow.
… (10 stages) | Unsupported.
$merge | Supported with modified syntax. For more information, see $merge.
$lookup | Supported with modified syntax. For more information, see $lookup.
… (10 more stages) | Unsupported.