Develop Stream Processors

An Atlas Stream Processing stream processor applies the logic of a uniquely named stream aggregation pipeline to your streaming data. Atlas Stream Processing saves each stream processor definition to persistent storage so that it can be reused. You can only use a given stream processor in the stream processing workspace in which its definition is stored.

To create and manage a stream processor, you must have:

Many stream processor commands require you to specify the name of the relevant stream processor in the method invocation. The syntax described in the following sections assumes strictly alphanumeric names. If your stream processor's name includes non-alphanumeric characters such as hyphens (-) or full stops (.), you must enclose the name in square brackets ([]) and double quotes ("") in the method invocation, as in sp.["special-name-stream"].stats().
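As a minimal sketch of the naming rule above, the following hypothetical helper (not part of mongosh or Atlas Stream Processing) reports whether a processor name is strictly alphanumeric. It assumes that "alphanumeric" means ASCII letters and digits only.

```javascript
// Hypothetical helper: not part of mongosh or Atlas Stream Processing.
// Returns true when a name contains only ASCII letters and digits (an
// assumption about what "strictly alphanumeric" means), so that plain
// dot notation such as sp.proc01.stats() applies.
function isStrictlyAlphanumeric(name) {
  return /^[A-Za-z0-9]+$/.test(name);
}

// Example names; the second and third contain a hyphen and a full stop.
const exampleNames = ["proc01", "special-name-stream", "my.processor"];
for (const name of exampleNames) {
  const form = isStrictlyAlphanumeric(name)
    ? "dot notation"
    : "square brackets and double quotes";
  console.log(`${name}: ${form}`);
}
```

Names that fail the check need the bracket-and-quote form described above.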

Note

Atlas Stream Processing discards the internal state of stream processors which have been stopped for 45 days or more. When you start such a processor, it operates and reports statistics identically to its initial run.

You can modify the following elements of an existing stream processor:

To modify a stream processor, do the following:

  1. Stop the stream processor. See Stop a Stream Processor.

  2. Update the stream processor. See the procedure for your chosen interface.

  3. Restart the stream processor. See Start a Stream Processor.

By default, modified processors restore from the last checkpoint. Alternatively, you can set resumeFromCheckpoint=false, in which case the processor only retains summary stats. When you modify a processor with open windows, the windows are entirely recomputed on the updated pipeline.
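The stop, modify, and start sequence might be sketched as follows. This is an illustration under assumptions, not the definitive procedure: it assumes that the mongosh processor object exposes stop(), modify(), and start() methods and that modify() accepts an options document containing resumeFromCheckpoint. The modifyProcessor function and the fakeSp stand-in are hypothetical names that let the sketch run outside mongosh.

```javascript
// Sketch of the stop -> modify -> start sequence.
// `sp` is expected to behave like the mongosh stream processing object.
// The modify() call and its options document are assumptions; check them
// against the reference for your mongosh version.
function modifyProcessor(sp, name, newPipeline, resumeFromCheckpoint = true) {
  const processor = sp[name];
  processor.stop();
  processor.modify(newPipeline, { resumeFromCheckpoint });
  processor.start();
}

// Stand-in for `sp` that only records the calls it receives.
const calls = [];
const fakeSp = {
  proc01: {
    stop: () => calls.push("stop"),
    modify: (pipeline, options) =>
      calls.push(`modify resumeFromCheckpoint=${options.resumeFromCheckpoint}`),
    start: () => calls.push("start"),
  },
};

// Pass false to discard checkpointed state instead of resuming from it.
modifyProcessor(fakeSp, "proc01", [{ $match: { status: "active" } }], false);
console.log(calls.join(" -> "));
```

In mongosh you would pass the real sp object instead of the stand-in.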

Note

If you configured the Stream Processor State is failed alert with an Operator (a matcher expression such as is or contains) and you then rename the stream processor, Atlas won't trigger alerts for the renamed stream processor unless the matcher expression also matches the new name. To keep monitoring the renamed stream processor, reconfigure the alert.

When resumeFromCheckpoint is set to true (the default), the following limitations apply:

  • You can't modify the $source stage.

  • You can't modify the interval of your window.

  • You can't remove a window.

  • You can only modify a pipeline with a window if that window has either a $group or $sort stage in its inner pipeline.

  • You can't change an existing window type. For example, you can't change from a $tumblingWindow to a $hoppingWindow or vice versa.

  • Processors with windows may reprocess some data as a product of recalculating the windows.

  • Per-operator statistics are not retained after a modify operation.
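Several of the limitations above can be checked before you attempt a modification. The following is a hypothetical pre-flight check, not an Atlas API. It assumes that each pipeline contains at most one window stage and that window stages carry interval and pipeline fields. It compares documents with JSON.stringify, so it is sensitive to key order.

```javascript
// Hypothetical pre-flight check; not part of Atlas Stream Processing.
// Assumed shapes: one operator key per stage, and window stages with
// `interval` and `pipeline` fields.
const WINDOW_STAGES = ["$tumblingWindow", "$hoppingWindow"];

// Name of the single operator key in a stage document, e.g. "$source".
const stageName = (stage) => Object.keys(stage)[0];

// First stage in the pipeline whose operator is one of `names`.
const findStage = (pipeline, names) =>
  pipeline.find((stage) => names.includes(stageName(stage)));

// Structural comparison; sensitive to key order, which is enough for a sketch.
const same = (a, b) => JSON.stringify(a) === JSON.stringify(b);

function checkModification(oldPipeline, newPipeline) {
  const problems = [];
  if (!same(findStage(oldPipeline, ["$source"]), findStage(newPipeline, ["$source"]))) {
    problems.push("$source stage was modified");
  }
  const oldWindow = findStage(oldPipeline, WINDOW_STAGES);
  if (!oldWindow) return problems; // the remaining rules only concern windows
  const oldType = stageName(oldWindow);
  const inner = oldWindow[oldType].pipeline || [];
  if (!inner.some((s) => ["$group", "$sort"].includes(stageName(s)))) {
    problems.push("window has no $group or $sort stage in its inner pipeline");
  }
  const newWindow = findStage(newPipeline, WINDOW_STAGES);
  if (!newWindow) {
    problems.push("window was removed");
  } else if (stageName(newWindow) !== oldType) {
    problems.push("window type was changed");
  } else if (!same(oldWindow[oldType].interval, newWindow[oldType].interval)) {
    problems.push("window interval was modified");
  }
  return problems;
}

// Illustrative pipelines; the connection and field names are made up.
const source = { $source: { connectionName: "sample", topic: "events" } };
const group = { $group: { _id: "$device", count: { $sum: 1 } } };
const tenSeconds = { size: 10, unit: "second" };
const before = [source, { $tumblingWindow: { interval: tenSeconds, pipeline: [group] } }];
const after = [
  source,
  { $hoppingWindow: { interval: tenSeconds, hopSize: { size: 5, unit: "second" }, pipeline: [group] } },
];
console.log(checkModification(before, after));
```

An empty array means that none of the checked limitations were violated. The check does not cover reprocessing or the loss of per-operator statistics, which are consequences rather than restrictions.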

To return an array of sampled results from an existing stream processor to STDOUT with mongosh, use the sp.processor.sample() method. For example, the following command samples from a stream processor named proc01:

sp.proc01.sample()

This command runs continuously until you cancel it by using CTRL-C, or until the returned samples cumulatively reach 40 MB in size. The stream processor reports invalid documents in the sample in a _dlqMessage document of the following form:

{
  _dlqMessage: {
    errInfo: {
      reason: "<reasonForError>"
    },
    doc: {
      _id: ObjectId('<group-id>'),
      ...
    },
    processorName: '<procName>',
    workspaceName: '<workspaceName>',
    dlqTime: ISODate('2024-09-19T20:04:34.263+00:00')
  }
}

You can use these messages to diagnose data hygiene issues without defining a dead letter queue collection.
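One way to use these messages is to tally the error reasons in a batch of sampled documents. The sketch below assumes that you have already collected the sampled documents into an array. The summarizeSample function is a hypothetical helper, and the example documents and reason strings are made up for illustration.

```javascript
// Hypothetical helper; not part of mongosh. Separates valid documents from
// those wrapped in a _dlqMessage and counts how often each reason occurs.
function summarizeSample(docs) {
  const valid = [];
  const reasons = {};
  for (const doc of docs) {
    if (doc && doc._dlqMessage) {
      // Fall back to "unknown" if the message carries no reason.
      const reason = doc._dlqMessage.errInfo?.reason ?? "unknown";
      reasons[reason] = (reasons[reason] || 0) + 1;
    } else {
      valid.push(doc);
    }
  }
  return { valid, reasons };
}

// Made-up sample: two valid documents and two invalid ones.
const sampled = [
  { device: "a", temp: 21 },
  { _dlqMessage: { errInfo: { reason: "missing timestamp" }, doc: { device: "b" } } },
  { device: "c", temp: 19 },
  { _dlqMessage: { errInfo: { reason: "missing timestamp" }, doc: { device: "d" } } },
];
const summary = summarizeSample(sampled);
console.log(`${summary.valid.length} valid documents`);
console.log(summary.reasons);
```

A reason that appears repeatedly usually points at a systematic problem in the source data rather than a one-off malformed document.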
