An Atlas Stream Processing stream processor applies the logic of a uniquely named stream aggregation pipeline to your streaming data. Atlas Stream Processing saves each stream processor definition to persistent storage so that it can be reused. You can only use a given stream processor in the stream processing workspace in which its definition is stored.
Prerequisites
To create and manage a stream processor, you must have:
A database user with the atlasAdmin role to create and run stream processors
An Atlas cluster
Considerations
Many stream processor commands require you to specify the name of the relevant stream processor in the method invocation. The syntax described in the following sections assumes strictly alphanumeric names. If your stream processor's name includes non-alphanumeric characters such as hyphens (-) or full stops (.), you must enclose the name in square brackets ([]) and double quotes ("") in the method invocation, as in sp["special-name-stream"].stats().
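The naming rule above can be sketched as a small helper that builds the accessor text for a given processor name. The function name processorAccessor is hypothetical and is not part of mongosh; it only illustrates when bracket notation is needed, and it writes the bracket form in standard JavaScript property-access syntax.

```javascript
// Hypothetical helper (not part of mongosh): returns the expression used to
// reach a stream processor by name through the `sp` handle.
function processorAccessor(name) {
  // Dot notation is safe for strictly alphanumeric names that start with a
  // letter. A name that starts with a digit is not a valid JavaScript
  // identifier, so it also falls through to bracket notation.
  if (/^[A-Za-z][A-Za-z0-9]*$/.test(name)) {
    return `sp.${name}`;
  }
  // Names containing hyphens, full stops, or other non-alphanumeric
  // characters need square brackets and double quotes.
  return `sp[${JSON.stringify(name)}]`;
}
```

Appending a method call such as .stats() to the returned text gives the full invocation to type into mongosh.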
Create a Stream Processor Interactively
Create a Stream Processor
Start a Stream Processor
Note
Atlas Stream Processing discards the internal state of stream processors which have been stopped for 45 days or more. When you start such a processor, it operates and reports statistics identically to its initial run.
Stop a Stream Processor
Note
Atlas Stream Processing discards the internal state of stream processors which have been stopped for 45 days or more. When you start such a processor, it operates and reports statistics identically to its initial run.
Modify a Stream Processor
You can modify the following elements of an existing stream processor:
Name
To modify a stream processor, do the following:
Modify the stream processor.
See the procedure for your chosen interface.
By default, modified processors restore from the last checkpoint. Alternatively, you can set resumeFromCheckpoint=false, in which case the processor only retains summary stats. When you modify a processor with open windows, the windows are entirely recomputed on the updated pipeline.
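A minimal sketch of how the checkpoint choice might be passed along, assuming that the mongosh method sp.<name>.modify() takes the new pipeline followed by an options document that accepts resumeFromCheckpoint. The wrapper modifyProcessor and its discardCheckpoint parameter are hypothetical; confirm the exact signature against the mongosh reference for your version. The sp handle is passed in as an argument so that the function can be defined outside mongosh.

```javascript
// Hypothetical wrapper around the modify() call. `sp` is the mongosh stream
// processing handle, passed in explicitly so this can be defined anywhere.
function modifyProcessor(sp, name, pipeline, { discardCheckpoint = false } = {}) {
  // Assumption: modify() accepts (pipeline, options).
  const options = {};
  if (discardCheckpoint) {
    // With resumeFromCheckpoint set to false, the processor does not restore
    // from the last checkpoint and only retains summary stats.
    options.resumeFromCheckpoint = false;
  }
  // Bracket access works for any processor name, alphanumeric or not.
  return sp[name].modify(pipeline, options);
}
```

Leaving discardCheckpoint unset keeps the default behavior, in which the modified processor restores from the last checkpoint.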
Note
If you rename a stream processor for which you configured the Stream Processor State is failed alert by using an Operator (which contains matcher expressions like is, contains, and more), Atlas won't trigger alerts for the renamed stream processor if the matcher expression doesn't match the new name. To monitor the renamed stream processor, reconfigure the alert.
Limitations
When resumeFromCheckpoint is set to true, which is the default, the following limitations apply:
You can't modify the $source stage.
You can't modify the interval of your window.
You can't remove a window.
You can only modify a pipeline with a window if that window has either a $group or $sort stage in its inner pipeline.
You can't change an existing window type. For example, you can't change from a $tumblingWindow to a $hoppingWindow or vice versa.
Processors with windows may reprocess some data as a product of recalculating the windows.
Per-operator statistics are not retained after a modify operation.
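The structural limitations above can be checked before you submit a modified pipeline. The following is a rough sketch, not an Atlas feature: the functions findWindow and checkModification are hypothetical, the check only recognizes the two window stages named above, and it assumes that the $group or $sort requirement applies to the existing window's inner pipeline.

```javascript
// Window stages this sketch recognizes (only those named in the limitations).
const WINDOW_STAGES = ["$tumblingWindow", "$hoppingWindow"];

// Returns the first window stage in a pipeline as { type, spec }, or null.
function findWindow(pipeline) {
  for (const stage of pipeline) {
    const type = WINDOW_STAGES.find((w) => w in stage);
    if (type) return { type, spec: stage[type] };
  }
  return null;
}

// Hypothetical pre-flight check: returns a list of violated limitations.
function checkModification(oldPipeline, newPipeline) {
  const problems = [];
  // Simple structural comparison; note that it is sensitive to key order.
  const same = (a, b) => JSON.stringify(a) === JSON.stringify(b);

  const oldSource = oldPipeline.find((s) => "$source" in s);
  const newSource = newPipeline.find((s) => "$source" in s);
  if (!same(oldSource, newSource)) problems.push("$source stage was modified");

  const oldWin = findWindow(oldPipeline);
  const newWin = findWindow(newPipeline);
  if (oldWin) {
    const inner = oldWin.spec.pipeline || [];
    if (!inner.some((s) => "$group" in s || "$sort" in s)) {
      problems.push("window has no $group or $sort stage");
    }
    if (!newWin) {
      problems.push("window was removed");
    } else {
      if (oldWin.type !== newWin.type) problems.push("window type was changed");
      if (!same(oldWin.spec.interval, newWin.spec.interval)) {
        problems.push("window interval was modified");
      }
    }
  }
  return problems;
}
```

An empty result means that none of these checks failed. It does not guarantee that Atlas accepts the modification.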
Procedure
Initiate Failover for One Stream Processor
Drop a Stream Processor
List Available Stream Processors
List Workspace Defaults
Sample from a Stream Processor
To return an array of sampled results from an existing stream processor to STDOUT with mongosh, use the sp.processor.sample() method. For example, the following command samples from a stream processor named proc01.
sp.proc01.sample()
This command runs continuously until you cancel it by using CTRL-C, or until the returned samples cumulatively reach 40 MB in size. The stream processor reports invalid documents in the sample in a _dlqMessage document of the following form:
{
  _dlqMessage: {
    errInfo: { reason: "<reasonForError>" },
    doc: { _id: ObjectId('<group-id>'), ... },
    processorName: '<procName>',
    workspaceName: '<workspaceName>',
    dlqTime: ISODate('2024-09-19T20:04:34.263+00:00')
  }
}
You can use these messages to diagnose data hygiene issues without defining a dead letter queue collection.
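As an illustration of that kind of diagnosis, the sketch below separates sampled documents from _dlqMessage entries and counts each error reason. The helper summarizeSample is hypothetical, and it assumes you have already collected the sampled output into an array of documents with the shape shown above.

```javascript
// Hypothetical helper: splits sampled output into valid documents and
// _dlqMessage entries, and tallies how often each error reason occurs.
function summarizeSample(samples) {
  const valid = [];
  const reasons = {};
  for (const doc of samples) {
    if (doc && doc._dlqMessage) {
      // Fall back to "unknown" if the message carries no reason.
      const reason = doc._dlqMessage.errInfo?.reason ?? "unknown";
      reasons[reason] = (reasons[reason] || 0) + 1;
    } else {
      valid.push(doc);
    }
  }
  return { valid, reasons };
}
```

A reason that dominates the tally usually points to a single upstream data problem worth fixing first.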
View Statistics of a Stream Processor
Note
Atlas Stream Processing discards the internal state of stream processors which have been stopped for 45 days or more. When you start such a processor, it operates and reports statistics identically to its initial run.