An Atlas Stream Processing stream processor applies the logic of a uniquely named stream aggregation pipeline to your streaming data. Atlas Stream Processing saves each stream processor definition to persistent storage so that it can be reused. You can only use a given stream processor in the stream processing instance in which its definition is stored. Atlas Stream Processing supports up to 4 stream processors per worker. For additional processors beyond this limit, Atlas Stream Processing allocates a new resource.
Prerequisites
To create and manage a stream processor, you must have:
- mongosh version 2.0 or higher
- A database user with the atlasAdmin role to create and run stream processors
- An Atlas cluster
Considerations
Many stream processor commands require you to specify the name of the relevant stream processor in the method invocation. The syntax described in the following sections assumes strictly alphanumeric names. If your stream processor's name includes non-alphanumeric characters such as hyphens (-) or full stops (.), you must enclose the name in square brackets ([]) and double quotes ("") in the method invocation, as in sp.["special-name-stream"].stats().
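For example, both of the following invocations are valid; the processor names are illustrative:
// Alphanumeric name: dot notation works.
sp.solarDemo.stats()
// Name containing a hyphen: bracket-and-quote notation is required.
sp.["special-name-stream"].stats()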
Create a Stream Processor Interactively
You can create a stream processor interactively with the sp.process() method in mongosh. Stream processors that you create interactively exhibit the following behavior:
- Write output and dead letter queue documents to the shell
- Begin running immediately upon creation
- Run for either 10 minutes or until the user stops them
- Don't persist after stopping
Stream processors that you create interactively are intended for prototyping. To create a persistent stream processor, see Create a Stream Processor.
sp.process() has the following syntax:
sp.process(<pipeline>)
Field | Type | Necessity | Description |
---|---|---|---|
pipeline | array | Required | Stream aggregation pipeline you want to apply to your streaming data. |
To create a stream processor interactively:
Connect to your stream processing instance.
Use the connection string associated with your stream processing instance to connect using mongosh.
Example
The following command connects to a stream processing instance as a user named streamOwner using X.509 authentication:
mongosh "mongodb://atlas-stream-78xq9gi1v4b4288x06a73e9f-zi30g.virginia-usa.a.query.mongodb-qa.net/?authSource=%24external&authMechanism=MONGODB-X509" \\ --tls --authenticationDatabase admin --username streamOwner
Provide your user password when prompted.
Define a pipeline.
In the mongosh prompt, assign an array containing the aggregation stages you want to apply to a variable named pipeline.
The following example uses the stuff topic in the myKafka connection in the connection registry as the $source, matches records where the temperature field has a value of 46, and emits the processed messages to the output topic of the mySink connection in the connection registry:
pipeline = [
  { $source: { "connectionName": "myKafka", "topic": "stuff" } },
  { $match: { temperature: 46 } },
  { $emit: { "connectionName": "mySink", "topic": "output" } }
]
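Create the stream processor.
With the pipeline variable defined, pass it to sp.process() to create the interactive stream processor and begin running it immediately:
sp.process(pipeline)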
Create a Stream Processor
To create a stream processor that persists until you drop it:
The Atlas Administration API provides an endpoint for creating a stream processor.
To create a stream processor in the Atlas UI, go to the Stream Processing page for your Atlas project and click Configure in the pane for your stream processing instance.
You can choose between using the Visual Builder or the JSON editor to configure your stream processor:
Add a source connection.
In the Source field, select a connection from the Connection drop-down list to use as the source for your stream processor.
This opens a JSON text box where you can configure the source stage for your stream processor. To learn more about source stage syntax, see $source.
Example
The following source stage operates on real-time data from the pre-configured sample_stream_solar connection:
{ "$source": { "connectionName": "sample_stream_solar" } }
Add aggregation stages to the stream processor pipeline.
In the Start building your pipeline pane, click the button for the aggregation stage that you want to add to your pipeline. This opens a text box where you can configure the selected aggregation stage in JSON format.
If your aggregation stage isn't listed, click + Custom stage to define a supported aggregation stage in JSON format. To learn more about stream processing aggregation stages and their syntax, see Aggregation Pipeline Stages.
Example
The following $match stage matches all documents in the pre-configured sample_stream_solar stream where the obs.watts field is greater than 300:
{ "$match": { "obs.watts": { "$gt": 300 } } }
(Optional) Configure additional aggregation stages.
To add additional aggregation stages to your pipeline, click the + Add stage below button below the last stage in your pipeline, and select the aggregation stage that you want to add or click Custom stage to define a different supported aggregation stage. This opens a text box where you can configure the new stage in JSON format.
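For example, a windowed aggregation could be added as a custom stage. The following sketch reuses the $tumblingWindow and nested $group stages from the solarDemo example later on this page, applied to the sample_stream_solar fields:
{
  "$tumblingWindow": {
    "interval": { "size": 10, "unit": "second" },
    "pipeline": [
      {
        "$group": {
          "_id": "$group_id",
          "max_watts": { "$max": "$obs.watts" }
        }
      }
    ]
  }
}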
Add a sink connection.
In the Sink field, select a connection from the Connection drop-down list to write your processed data to.
This opens a JSON text box where you can configure the merge stage for your stream processor. To learn more about merge stage syntax, see $merge.
Example
The following sink stage writes processed data to the demoDb.demoColl collection in a connection named demoConnection:
{ "$merge": { "into": { "connectionName": "demoConnection", "db": "demoDb", "coll": "demoColl" } } }
Define the stream processor.
Specify the JSON definition for your stream processor in the JSON editor text box. This definition must include a name for your stream processor and an aggregation pipeline that starts with a $source stage and ends with the $merge stage. You can include any number of additional aggregation stages between the $source and $merge stages.
To learn more about stream processing aggregation stages and their syntax, see Aggregation Pipeline Stages.
Example
The following JSON definition creates a stream processor named solarDemo that uses a $tumblingWindow stage with a nested $group stage to aggregate real-time data from the pre-configured sample_stream_solar connection over 10-second intervals, and writes the processed data to a collection in a connection named mongodb1:
{ "name": "solarDemo", "pipeline": [ { "$source": { "connectionName": "sample_stream_solar" } }, { "$tumblingWindow": { "interval": { "size": 10, "unit": "second" }, "pipeline": [ { "$group": { "_id": "$group_id", "max_watts": { "$max": "$obs.watts" }, "min_watts": { "$min": "$obs.watts" } } } ] } }, { "$merge": { "into": { "connectionName": "mongodb1", "db": "solarDb", "coll": "solarColl" } } } ] }
To create a new stream processor with mongosh, use the sp.createStreamProcessor() method. It has the following syntax:
sp.createStreamProcessor(<name>, <pipeline>, <options>)
Argument | Type | Necessity | Description |
---|---|---|---|
name | string | Required | Logical name for the stream processor. This must be unique within the stream processing instance. This name should contain only alphanumeric characters. |
pipeline | array | Required | Stream aggregation pipeline you want to apply to your streaming data. |
options | object | Optional | Object defining various optional settings for your stream processor. |
options.dlq | object | Conditional | Object assigning a dead letter queue for your stream processing instance. This field is necessary if you define the options field. |
options.dlq.connectionName | string | Conditional | Human-readable label that identifies a connection in your connection registry. This connection must reference an Atlas cluster. This field is necessary if you define the options.dlq field. |
options.dlq.db | string | Conditional | Name of an Atlas database on the cluster specified in options.dlq.connectionName. |
options.dlq.coll | string | Conditional | Name of a collection in the database specified in options.dlq.db. |
Connect to your stream processing instance.
Use the connection string associated with your stream processing instance to connect using mongosh.
In the pane for your stream processing instance, click Connect.
In the Connect to your instance dialog, select the Shell tab.
Copy the connection string displayed in the dialog. It has the following format, where <atlas-stream-processing-url> is the URL of your stream processing instance and <username> is the username of a database user with the atlasAdmin role:
mongosh "mongodb://<atlas-stream-processing-url>/" --tls --authenticationDatabase admin --username <username> --password <password>
Paste the connection string into your terminal and replace the <password> placeholder with the credentials for the user. Press Enter to run it and connect to your stream processing instance.
Example
The following command connects to a stream processing instance as a user named streamOwner using X.509 authentication:
mongosh "mongodb://atlas-stream-78xq9gi1v4b4288x06a73e9f-zi30g.virginia-usa.a.query.mongodb-qa.net/?authSource=%24external&authMechanism=MONGODB-X509" \\ --tls --authenticationDatabase admin --username streamOwner
Provide your user password when prompted.
Define a pipeline.
In the mongosh prompt, assign an array containing the aggregation stages you want to apply to a variable named pipeline.
The following example pipeline uses the stuff topic in the myKafka connection in the connection registry as the $source, matches records where the temperature field has a value of 46, and emits the processed messages to the output topic of the mySink connection in the connection registry:
pipeline = [
  { $source: { "connectionName": "myKafka", "topic": "stuff" } },
  { $match: { temperature: 46 } },
  { $emit: { "connectionName": "mySink", "topic": "output" } }
]
(Optional) Define a DLQ.
In the mongosh prompt, assign an object containing the following properties of your DLQ:
- Connection name
- Database name
- Collection name
The following example defines a DLQ over the cluster01 connection, in the dlq collection of the metadata database:
deadLetter = {
  dlq: {
    connectionName: "cluster01",
    db: "metadata",
    coll: "dlq"
  }
}
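Create the stream processor.
With the pipeline and deadLetter variables defined, use sp.createStreamProcessor() to create a persistent stream processor. The name proc01 below is illustrative; omit the third argument if you don't want to configure a DLQ:
sp.createStreamProcessor("proc01", pipeline, deadLetter)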
Start a Stream Processor
Note
Atlas Stream Processing discards the internal state of stream processors that have been stopped for 45 days or more. When you start such a processor, it operates and reports statistics identically to its initial run.
To start a stream processor:
The Atlas Administration API provides an endpoint for starting a stream processor.
To start a stream processor in the Atlas UI, go to the Stream Processing page for your Atlas project and click Configure in the pane for your stream processing instance to view the list of stream processors defined for it.
Then, click the Start icon for your stream processor.
To start an existing stream processor with mongosh, use the sp.processor.start() method.
For example, to start a stream processor named proc01, run the following command:
sp.proc01.start()
{ "ok" : 1 }
This method returns { "ok": 1 } if the stream processor exists and isn't currently running. If you invoke sp.processor.start() for a stream processor that is not STOPPED, mongosh returns an error.
Stop a Stream Processor
Note
Atlas Stream Processing discards the internal state of stream processors that have been stopped for 45 days or more. When you start such a processor, it operates and reports statistics identically to its initial run.
To stop a stream processor:
The Atlas Administration API provides an endpoint for stopping a stream processor.
To pause a stream processor in the Atlas UI, go to the Stream Processing page for your Atlas project and click Configure in the pane for your stream processing instance to view the list of stream processors defined for it.
Then, click the Pause icon for your stream processor.
To stop an existing stream processor with mongosh, use the sp.processor.stop() method.
For example, to stop a stream processor named proc01, run the following command:
sp.proc01.stop()
{ "ok" : 1 }
This method returns { "ok": 1 } if the stream processor exists and is currently running. If you invoke sp.processor.stop() for a stream processor that is not running, mongosh returns an error.
Modify a Stream Processor
You can modify the pipeline, name, and dead letter queue (DLQ) of an existing stream processor.
To modify a stream processor, do the following:
- Stop the stream processor.
- Apply your modifications.
- Restart the stream processor.
By default, modified processors restore from the last checkpoint. Alternatively, you can set resumeFromCheckpoint=false, in which case the processor only retains summary stats. When you modify a processor with open windows, the windows are entirely recomputed on the updated pipeline.
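For example, the following minimal sketch of the stop-modify-start flow discards checkpoint state; the processor name proc01 and the updatedPipeline variable are illustrative:
sp.proc01.stop();
sp.proc01.modify(updatedPipeline, { resumeFromCheckpoint: false });
sp.proc01.start();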
Note
If you change the name of a stream processor for which you had configured the Stream Processor State is failed alert by using an Operator (which contains matcher expressions like is, contains, and more), Atlas won't trigger alerts for the renamed stream processor if the matcher expression doesn't match the new name. To monitor the renamed stream processor, reconfigure the alert.
Limitations
When the default setting resumeFromCheckpoint=true is enabled, the following limitations apply:
- You can't modify the $source stage.
- You can't modify the interval of your window.
- You can't remove a window.
- You can only modify a pipeline with a window if that window has either a $group or $sort stage in its inner pipeline.
- You can't change an existing window type. For example, you can't change from a $tumblingWindow to a $hoppingWindow or vice versa.
- Processors with windows may reprocess some data as a product of recalculating the windows.
To modify a stream processor:
The Atlas Administration API provides an endpoint for modifying a stream processor.
Requires mongosh v2.3.4+.
Use the sp.<streamprocessor>.modify() command to modify an existing stream processor. <streamprocessor> must be the name of a stopped stream processor defined for the current stream processing instance.
For example, to modify a stream processor named proc01, run the following command:
sp.proc01.modify(<pipeline>, {
  resumeFromCheckpoint: bool, // optional
  name: string, // optional
  dlq: string, // optional
})
Add a Stage to an Existing Pipeline
sp.createStreamProcessor("foo", [ {$source: { connectionName: "StreamsAtlasConnection", db: "test", coll: "test" }}, {$merge: { into: { connectionName: "StreamsAtlasConnection", db: "testout", coll: "testout" } }} ]) sp.foo.start();
sp.foo.stop();
sp.foo.modify([
  { $source: {
      connectionName: "StreamsAtlasConnection",
      db: "test",
      coll: "test"
  }},
  { $match: { operationType: "insert" } },
  { $merge: {
      into: {
        connectionName: "StreamsAtlasConnection",
        db: "testout",
        coll: "testout2"
      }
  }}
]);
sp.foo.start();
Modify the Input Source of a Stream Processor
sp.foo.modify([
  { $source: {
      connectionName: "StreamsAtlasConnection",
      db: "test",
      coll: "test",
      config: { startAtOperationTime: new Date(Date.now() - 5 * 60 * 1000) }
  }},
  { $match: { operationType: "insert" } },
  { $merge: {
      into: {
        connectionName: "StreamsAtlasConnection",
        db: "testout",
        coll: "testout2"
      }
  }}
], { resumeFromCheckpoint: false });
Remove a Dead Letter Queue from a Stream Processor
sp.foo.stop();
sp.foo.modify({ dlq: {} });
sp.foo.start();
Modify a Stream Processor with a Window
sp.foo.stop();
sp.foo.modify([
  { $source: {
      connectionName: "StreamsAtlasConnection",
      db: "test",
      coll: "test"
  }},
  { $replaceRoot: { newRoot: "$fullDocument" } },
  { $match: { cost: { $gt: 500 } } },
  { $tumblingWindow: {
      interval: { unit: "day", size: 1 },
      pipeline: [
        { $group: { _id: "$customerId", sum: { $sum: "$cost" }, avg: { $avg: "$cost" } } }
      ]
  }},
  { $merge: {
      into: {
        connectionName: "StreamsAtlasConnection",
        db: "testout",
        coll: "testout"
      }
  }}
], { resumeFromCheckpoint: false });
sp.foo.start();
Drop a Stream Processor
To drop a stream processor:
The Atlas Administration API provides an endpoint for deleting a stream processor.
To delete a stream processor in the Atlas UI, go to the Stream Processing page for your Atlas project and click Configure in the pane for your stream processing instance to view the list of stream processors defined for it.
Then, click the Delete icon for your stream processor. In the confirmation dialog that appears, type the name of the stream processor (solarDemo) to confirm that you want to delete it, and then click Delete.
To delete an existing stream processor with mongosh, use the sp.processor.drop() method.
For example, to drop a stream processor named proc01, run the following command:
sp.proc01.drop()
This method returns:
- true if the stream processor exists.
- false if the stream processor doesn't exist.
When you drop a stream processor, all resources that Atlas Stream Processing provisioned for it are destroyed, along with all saved state.
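For example, if the proc01 processor from the earlier examples exists, dropping it returns true:
sp.proc01.drop()
true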
List Available Stream Processors
To list all available stream processors:
The Atlas Administration API provides an endpoint for listing all available stream processors.
To view the list of stream processors defined for your stream processing instance in the Atlas UI, go to the Stream Processing page for your Atlas project and click Configure in the pane for your stream processing instance.
The list of stream processors and their statuses displays.
To list all available stream processors on the current stream processing instance with mongosh, use the sp.listStreamProcessors() method. This returns a list of documents containing the name, start time, current state, and pipeline associated with each stream processor. It has the following syntax:
sp.listStreamProcessors(<filter>)
<filter> is a document specifying which field(s) to filter the list by.
Example
The following example shows a return value for an unfiltered request:
sp.listStreamProcessors()
{
  id: '0135',
  name: "proc01",
  last_modified: ISODate("2023-03-20T20:15:54.601Z"),
  state: "RUNNING",
  error_msg: '',
  pipeline: [
    {
      $source: {
        connectionName: "myKafka",
        topic: "stuff"
      }
    },
    {
      $match: {
        temperature: 46
      }
    },
    {
      $emit: {
        connectionName: "mySink",
        topic: "output"
      }
    }
  ],
  lastStateChange: ISODate("2023-03-20T20:15:59.442Z")
},
{
  id: '0218',
  name: "proc02",
  last_modified: ISODate("2023-03-21T20:17:33.601Z"),
  state: "STOPPED",
  error_msg: '',
  pipeline: [
    {
      $source: {
        connectionName: "myKafka",
        topic: "things"
      }
    },
    {
      $match: {
        temperature: 41
      }
    },
    {
      $emit: {
        connectionName: "mySink",
        topic: "results"
      }
    }
  ],
  lastStateChange: ISODate("2023-03-21T20:18:26.139Z")
}
If you run the command again on the same stream processing instance, filtering for a "state" of "running", you see the following output:
sp.listStreamProcessors({"state": "running"})
{
  id: '0135',
  name: "proc01",
  last_modified: ISODate("2023-03-20T20:15:54.601Z"),
  state: "RUNNING",
  error_msg: '',
  pipeline: [
    {
      $source: {
        connectionName: "myKafka",
        topic: "stuff"
      }
    },
    {
      $match: {
        temperature: 46
      }
    },
    {
      $emit: {
        connectionName: "mySink",
        topic: "output"
      }
    }
  ],
  lastStateChange: ISODate("2023-03-20T20:15:59.442Z")
}
Sample from a Stream Processor
To return an array of sampled results from an existing stream processor to STDOUT with mongosh, use the sp.processor.sample() method. For example, the following command samples from a stream processor named proc01:
sp.proc01.sample()
This command runs continuously until you cancel it by using CTRL-C, or until the returned samples cumulatively reach 40 MB in size. The stream processor reports invalid documents in the sample in a _dlqMessage document of the following form:
{
  _dlqMessage: {
    _stream_meta: {
      source: {
        type: "<type>"
      }
    },
    errInfo: {
      reason: "<reasonForError>"
    },
    doc: {
      _id: ObjectId('<group-id>'),
      ...
    },
    processorName: '<procName>',
    instanceName: '<instanceName>',
    dlqTime: ISODate('2024-09-19T20:04:34.263+00:00')
  }
}
You can use these messages to diagnose data hygiene issues without defining a dead letter queue collection.
View Statistics of a Stream Processor
Note
Atlas Stream Processing discards the internal state of stream processors that have been stopped for 45 days or more. When you start such a processor, it operates and reports statistics identically to its initial run.
To view statistics of a stream processor:
The Atlas Administration API provides an endpoint for viewing the statistics of a stream processor.
To view monitoring for your stream processor, go to the Stream Processing page for your Atlas project and open the Monitoring tab. Then, select your stream processor from the Stream processor drop-down list at the top left of the page.
To return a document summarizing the current status of an existing stream processor with mongosh, use the sp.processor.stats() method. It has the following syntax:
sp.<streamprocessor>.stats({options: {<options>}})
Where options is an optional document with the following fields:
Field | Type | Description |
---|---|---|
scale | integer | Unit to use for the size of items in the output. By default, Atlas Stream Processing displays item size in bytes. To display in KB, specify a scale of 1024. |
verbose | boolean | Flag that specifies the verbosity level of the output document. If set to true, the output document includes statistics for each individual operator in the pipeline. |
The output document has the following fields:
Field | Type | Description |
---|---|---|
ns | string | The namespace the stream processor is defined in. |
stats | object | A document describing the operational state of the stream processor. |
stats.name | string | The name of the stream processor. |
stats.status | string | The status of the stream processor (for example, running or stopped). |
stats.scaleFactor | integer | The scale in which the size field displays. If set to 1, sizes display in bytes; if set to 1024, sizes display in kilobytes. |
stats.inputMessageCount | integer | The number of documents published to the stream. A document is considered 'published' to the stream once it passes through the $source stage. |
stats.inputMessageSize | integer | The number of bytes or kilobytes published to the stream. Bytes are considered 'published' to the stream once they pass through the $source stage. |
stats.outputMessageCount | integer | The number of documents processed by the stream. A document is considered 'processed' by the stream once it passes through the entire pipeline. |
stats.outputMessageSize | integer | The number of bytes or kilobytes processed by the stream. Bytes are considered 'processed' by the stream once they pass through the entire pipeline. |
stats.dlqMessageCount | integer | The number of documents sent to the Dead Letter Queue. |
stats.dlqMessageSize | integer | The number of bytes or kilobytes sent to the Dead Letter Queue. |
| integer | The difference, in seconds, between the event time represented by the most recent change stream resume token and the latest event in the oplog. |
| token | The most recent change stream resume token. Only applies to stream processors with a change stream source. |
| document | Latency statistics for the stream processor as a whole. Atlas Stream Processing returns this field only if you pass in the verbose option. |
| integer | The estimated 50th percentile latency of all documents processed in the past 30 seconds. If your pipeline includes a window stage, latency measurements include the interval of the window. |
| integer | The estimated 99th percentile latency of all documents processed in the past 30 seconds. If your pipeline includes a window stage, latency measurements include the interval of the window. |
| datetime | Wall time at which the most recent 30 second measurement window began. |
| datetime | Wall time at which the most recent 30 second measurement window ended. |
| string | Unit of time in which latency is counted. This value is always microseconds. |
| integer | Number of documents the stream processor has processed in the most recent 30 second measurement window. |
| integer | Sum of all individual latency measurements, in microseconds, taken in the most recent 30 second measurement window. |
stats.stateSize | integer | The number of bytes used by windows to store processor state. |
stats.watermark | integer | The timestamp of the current watermark. |
| array | The statistics for each operator in the processor pipeline. Atlas Stream Processing returns this field only if you pass in the verbose option. |
| integer | The maximum memory usage of the operator in bytes or kilobytes. |
| integer | The total execution time of the operator in seconds. |
| date | The start time of the minimum open window. This value is optional. |
| date | The start time of the maximum open window. This value is optional. |
| array | Offset information for an Apache Kafka broker's partitions. |
| integer | The Apache Kafka topic partition number. |
| integer | The offset that the stream processor is on for the specified partition. This value equals the previous offset that the stream processor processed plus 1. |
| integer | The offset that the stream processor last committed to the Apache Kafka broker and the checkpoint for the specified partition. All messages through this offset are recorded in the last checkpoint. |
| boolean | The flag that indicates whether the partition is idle. This value defaults to false. |
For example, the following shows the status of a stream processor named proc01 on a stream processing instance named inst01, with item sizes displayed in KB:
sp.proc01.stats(1024)
{
  ok: 1,
  ns: 'inst01',
  stats: {
    name: 'proc01',
    status: 'running',
    scaleFactor: Long("1"),
    inputMessageCount: Long("706028"),
    inputMessageSize: 958685236,
    outputMessageCount: Long("46322"),
    outputMessageSize: 85666332,
    dlqMessageCount: Long("0"),
    dlqMessageSize: Long("0"),
    stateSize: Long("2747968"),
    watermark: ISODate("2023-12-14T14:35:32.417Z"),
    ok: 1
  },
}