
$source


The $source stage specifies a connection in the Connection Registry to stream data from. The following connection types are supported:

  • Apache Kafka broker

  • MongoDB collection change stream

  • MongoDB database change stream

You can also stream from an in-line array of documents without a registered connection.

Note

You can't use Atlas serverless instances as a $source.

To operate on streaming data from an Apache Kafka broker, the $source stage has the following prototype form:

{
  "$source": {
    "connectionName": "<registered-connection>",
    "topic": "<source-topic>",
    "timeField": {
      $toDate | $dateFromString: <expression>
    },
    "tsFieldName": "<timestamp>",
    "partitionIdleTimeout": {
      "size": <duration-number>,
      "unit": "<duration-unit>"
    },
    "config": {
      "auto_offset_reset": "<start-event>",
      "group_id": "<group-id>",
      "keyFormat": "<deserialization-type>",
      "keyFormatError": "<error-handling>"
    }
  }
}

The $source stage takes a document with the following fields:

connectionName (string, Required)
Label that identifies the connection in the Connection Registry from which to ingest data.

topic (string, Required)
Name of the Apache Kafka topic to stream messages from.

timeField (document, Optional)

Document that defines an authoritative timestamp for incoming messages.

If you use timeField, you must define it as one of the following:

  • a $toDate expression that takes a source message field as an argument

  • a $dateFromString expression that takes a source message field as an argument.

If you do not declare a timeField, Atlas Stream Processing creates a timestamp from the message timestamp provided by the source.

tsFieldName (string, Optional)

Name that overrides the name of the timestamp field projected by the $source.

The $source stage in an Atlas Stream Processing pipeline projects a field called _ts with the assigned timestamp of the document. Sources of streaming data might also use a field named _ts to store the timestamps of each message. To prevent a conflict between these fields, use tsFieldName to rename any source-provided field named _ts before additional processing takes place.

partitionIdleTimeout (document, Optional)
Document specifying the amount of time that a partition is allowed to be idle before it is ignored in watermark calculations.

partitionIdleTimeout.size (integer, Optional)
Number specifying the duration of the partition idle timeout.

partitionIdleTimeout.unit (string, Optional)

Unit of time for the duration of the partition idle timeout.

The value of unit can be one of the following:

  • "ms" (millisecond)

  • "second"

  • "minute"

  • "hour"

  • "day"

config (document, Optional)
Document containing fields that override various default values.

config.auto_offset_reset (string, Optional)

Specifies which event in the Apache Kafka source topic to begin ingestion with. auto_offset_reset takes the following values:

  • end, latest, or largest: to begin ingestion from the latest event in the topic at the time the aggregation is initialized.

  • earliest, beginning, or smallest: to begin ingestion from the earliest event in the topic.

Defaults to latest.

config.group_id (string, Optional)
ID of the Kafka consumer group to associate with the stream processor. If omitted, Atlas Stream Processing associates the stream processing instance with an auto-generated ID in the following format:

asp-${streamProcessorId}-consumer

Atlas Stream Processing commits partition offsets to the Apache Kafka broker for the specified consumer group ID after a checkpoint is committed. It commits an offset when messages up through that offset are durably recorded in a checkpoint. This allows you to track the offset lag and progress of the stream processor directly from the Kafka broker consumer group metadata.

config.keyFormat (string, Optional)

Data type used to deserialize Apache Kafka key data. Must be one of the following values:

  • "binData"

  • "string"

  • "json"

  • "int"

  • "long"

Defaults to binData.

config.keyFormatError (string, Optional)

How to handle errors encountered when deserializing Apache Kafka key data. Must be one of the following values:

  • dlq, which writes the document to your Dead Letter Queue.

  • passThrough, which sends the document to the next stage without key data.

Note

Atlas Stream Processing requires that documents in the source data stream be valid JSON or EJSON. Atlas Stream Processing sends documents that don't meet this requirement to your dead letter queue if you have configured one.
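
For example, a $source stage reading from an Apache Kafka broker might look like the following sketch. The connection name, topic, and message field names are illustrative placeholders:

{
  "$source": {
    "connectionName": "kafkaProd",
    "topic": "sensorReadings",
    "timeField": {
      "$dateFromString": { "dateString": "$eventTime" }
    },
    "tsFieldName": "ingestedAt",
    "partitionIdleTimeout": {
      "size": 30,
      "unit": "second"
    },
    "config": {
      "auto_offset_reset": "earliest",
      "group_id": "sensor-stream-processor",
      "keyFormat": "string",
      "keyFormatError": "dlq"
    }
  }
}

With auto_offset_reset set to earliest, ingestion starts from the oldest event in the topic, and partition offsets for the sensor-stream-processor consumer group are committed back to the broker after each checkpoint.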

To operate on streaming data from an Atlas collection change stream, the $source stage has the following prototype form:

{
  "$source": {
    "connectionName": "<registered-connection>",
    "timeField": {
      $toDate | $dateFromString: <expression>
    },
    "tsFieldName": "<timestamp>",
    "db": "<source-db>",
    "coll": ["<source-coll>", ...],
    "config": {
      "startAfter": <start-token> | "startAtOperationTime": <timestamp>,
      "fullDocument": "<full-doc-condition>",
      "fullDocumentOnly": <boolean>,
      "fullDocumentBeforeChange": "<before-change-condition>",
      "pipeline": [{
        "<aggregation-stage>": {
          <stage-input>,
          . . .
        },
        . . .
      }]
    }
  }
}

The $source stage takes a document with the following fields:

connectionName (string, Conditional)
Label that identifies the connection in the Connection Registry from which to ingest data.

timeField (document, Optional)

Document that defines an authoritative timestamp for incoming messages.

If you use timeField, you must define it as one of the following:

  • a $toDate expression that takes a source message field as an argument

  • a $dateFromString expression that takes a source message field as an argument.

If you do not declare a timeField, Atlas Stream Processing creates a timestamp from the message timestamp provided by the source.

tsFieldName (string, Optional)

Name that overrides the name of default timestamp fields declared by the source.

Atlas Stream Processing pipelines internally add a field to incoming messages called _ts to store checkpointing information. Sources of streaming data might also use a field named _ts to store the timestamps of each message. To prevent a conflict between these fields, use tsFieldName to rename any source-provided field named _ts before additional processing takes place.

db (string, Required)
Name of a MongoDB database hosted on the Atlas instance specified by connectionName. The change stream of this database acts as the streaming data source.

coll (string or array of strings, Required)
Name of one or more MongoDB collections hosted on the Atlas instance specified by connectionName. The change streams of these collections act as the streaming data source. If you omit this field, your stream processor instead sources from a MongoDB database change stream.

config (document, Optional)
Document containing fields that override various default values.

config.startAfter (token, Conditional)

The change event after which the source begins reporting. This takes the form of a resume token.

You can use only one of config.startAfter or config.startAtOperationTime.

config.startAtOperationTime (timestamp, Conditional)

The operation time after which the source should begin reporting.

You can use only one of config.startAfter or config.startAtOperationTime.

config.fullDocument (string, Conditional)

Setting that controls whether a change stream source should return a full document, or only the changes when an update occurs. Must be one of the following:

  • updateLookup: Returns only changes on update.

  • required: Must return a full document. If a full document is unavailable, returns nothing.

  • whenAvailable: Returns a full document whenever one is available, otherwise returns changes.

If you do not specify a value for fullDocument, it defaults to updateLookup.

To use this field with a collection change stream, you must enable change stream Pre- and Post-Images on that collection.

config.fullDocumentOnly (boolean, Conditional)

Setting that controls whether a change stream source returns the entire change event document including all metadata, or only the contents of fullDocument. If set to true, the source returns only the contents of fullDocument.

To use this field with a collection change stream, you must enable change stream Pre- and Post-Images on that collection.

config.fullDocumentBeforeChange (string, Optional)

Specifies whether a change stream source should include the full document in its original "before changes" state in the output. Must be one of the following:

  • off: Omits the fullDocumentBeforeChange field.

  • required: Must return a full document in its before changes state. If a full document in its before changes state is unavailable, the stream processor fails.

  • whenAvailable: Returns a full document in its before changes state whenever one is available, otherwise omits the fullDocumentBeforeChange field.

If you do not specify a value for fullDocumentBeforeChange, it defaults to off.

To use this field with a collection change stream, you must enable change stream Pre- and Post-Images on that collection.

config.pipeline (document, Optional)
Specifies an aggregation pipeline to filter change stream output at the point of origin. This pipeline must conform to the parameters described in change-stream-modify-output.
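
For example, a $source stage that streams insert and update events from a single collection might look like the following sketch. The connection, database, and collection names are placeholders, and the orders collection is assumed to have change stream Pre- and Post-Images enabled. Because fullDocumentOnly is true, downstream stages receive only the document contents rather than the full change event:

{
  "$source": {
    "connectionName": "atlasProd",
    "db": "sales",
    "coll": ["orders"],
    "config": {
      "fullDocument": "whenAvailable",
      "fullDocumentOnly": true,
      "pipeline": [
        { "$match": { "operationType": { "$in": ["insert", "update"] } } }
      ]
    }
  }
}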

To operate on streaming data from an Atlas database change stream, the $source stage has the following prototype form:

{
  "$source": {
    "connectionName": "<registered-connection>",
    "timeField": {
      $toDate | $dateFromString: <expression>
    },
    "tsFieldName": "<timestamp>",
    "db": "<source-db>",
    "config": {
      "startAfter": <start-token> | "startAtOperationTime": <timestamp>,
      "fullDocument": "<full-doc-condition>",
      "fullDocumentOnly": <boolean>,
      "fullDocumentBeforeChange": "<before-change-condition>",
      "pipeline": [{
        "<aggregation-stage>": {
          <stage-input>,
          . . .
        },
        . . .
      }]
    }
  }
}

The $source stage takes a document with the following fields:

connectionName (string, Conditional)
Label that identifies the connection in the Connection Registry from which to ingest data.

timeField (document, Optional)

Document that defines an authoritative timestamp for incoming messages.

If you use timeField, you must define it as one of the following:

  • a $toDate expression that takes a source message field as an argument

  • a $dateFromString expression that takes a source message field as an argument.

If you do not declare a timeField, Atlas Stream Processing creates a timestamp from the message timestamp provided by the source.

tsFieldName (string, Optional)

Name that overrides the name of default timestamp fields declared by the source.

Atlas Stream Processing pipelines internally add a field to incoming messages called _ts to store checkpointing information. Sources of streaming data might also use a field named _ts to store the timestamps of each message. To prevent a conflict between these fields, use tsFieldName to rename any source-provided field named _ts before additional processing takes place.

db (string, Required)
Name of a MongoDB database hosted on the Atlas instance specified by connectionName. The change stream of this database acts as the streaming data source.

config (document, Optional)
Document containing fields that override various default values.

config.startAfter (token, Conditional)

The change event after which the source begins reporting. This takes the form of a resume token.

You can use only one of config.startAfter or config.startAtOperationTime.

config.startAtOperationTime (timestamp, Conditional)

The operation time after which the source should begin reporting.

You can use only one of config.startAfter or config.startAtOperationTime.

config.fullDocument (string, Conditional)

Setting that controls whether a change stream source should return a full document, or only the changes when an update occurs. Must be one of the following:

  • updateLookup: Returns only changes on update.

  • required: Must return a full document. If a full document is unavailable, returns nothing.

  • whenAvailable: Returns a full document whenever one is available, otherwise returns changes.

If you do not specify a value for fullDocument, it defaults to updateLookup.

To use this field with a database change stream, you must enable change stream Pre- and Post-Images on every collection in that database.

config.fullDocumentOnly (boolean, Conditional)

Setting that controls whether a change stream source returns the entire change event document including all metadata, or only the contents of fullDocument. If set to true, the source returns only the contents of fullDocument.

To use this field with a database change stream, you must enable change stream Pre- and Post-Images on every collection in that database.

config.fullDocumentBeforeChange (string, Optional)

Specifies whether a change stream source should include the full document in its original "before changes" state in the output. Must be one of the following:

  • off: Omits the fullDocumentBeforeChange field.

  • required: Must return a full document in its before changes state. If a full document in its before changes state is unavailable, the stream processor fails.

  • whenAvailable: Returns a full document in its before changes state whenever one is available, otherwise omits the fullDocumentBeforeChange field.

If you do not specify a value for fullDocumentBeforeChange, it defaults to off.

To use this field with a database change stream, you must enable change stream Pre- and Post-Images on every collection in that database.

config.pipeline (document, Optional)
Specifies an aggregation pipeline to filter change stream output at the point of origin. This pipeline must conform to the parameters described in change-stream-modify-output.
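
For example, a $source stage that watches an entire database, but uses config.pipeline to limit the change stream at its origin to two collections, might look like the following sketch. The connection, database, and collection names are placeholders:

{
  "$source": {
    "connectionName": "atlasProd",
    "db": "sales",
    "config": {
      "pipeline": [
        { "$match": { "ns.coll": { "$in": ["orders", "returns"] } } }
      ]
    }
  }
}

Each change event document includes an ns field identifying the database and collection it originated from, which is what the $match stage filters on here.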

To operate on an array of documents, the $source stage has the following prototype form:

{
  "$source": {
    "timeField": {
      $toDate | $dateFromString: <expression>
    },
    "tsFieldName": "<timestamp>",
    "documents": [{source-doc}, ...] | <expression>
  }
}

The $source stage takes a document with the following fields:

timeField (document, Optional)

Document that defines an authoritative timestamp for incoming messages.

If you use timeField, you must define it as one of the following:

  • a $toDate expression that takes a source message field as an argument

  • a $dateFromString expression that takes a source message field as an argument.

If you do not declare a timeField, Atlas Stream Processing creates a timestamp from the message timestamp provided by the source.

tsFieldName (string, Optional)

Name that overrides the name of default timestamp fields declared by the source.

Atlas Stream Processing pipelines internally add a field to incoming messages called _ts to store checkpointing information. Sources of streaming data might also use a field named _ts to store the timestamps of each message. To prevent a conflict between these fields, use tsFieldName to rename any source-provided field named _ts before additional processing takes place.

documents (array, Conditional)
Array of documents to use as a streaming data source. The value of this field can be either an array of objects or an expression that evaluates to an array of objects. Do not use this field together with the connectionName field.
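
For example, the following sketch streams two in-line documents and derives each document's timestamp from its recordedAt string field. The field names and values are illustrative:

{
  "$source": {
    "documents": [
      { "sensorId": 1, "temperature": 20.5, "recordedAt": "2024-05-01T12:00:00Z" },
      { "sensorId": 2, "temperature": 23.1, "recordedAt": "2024-05-01T12:00:05Z" }
    ],
    "timeField": {
      "$toDate": "$recordedAt"
    }
  }
}

This form is useful for testing a pipeline against a small, fixed set of documents before connecting it to a live source.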

$source must be the first stage of any pipeline it appears in. You can use only one $source stage per pipeline.
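
For illustration, the following mongosh sketch builds a complete pipeline that begins with $source, filters the stream, and merges results into an Atlas collection. The connection, topic, database, collection, and processor names are placeholders:

// Sketch only: "kafkaProd" and "atlasProd" are assumed to exist in the
// Connection Registry of the stream processing instance.
let sourceStage = {
  "$source": {
    "connectionName": "kafkaProd",
    "topic": "sensorReadings"
  }
}

let matchStage = {
  "$match": { "temperature": { "$gt": 100 } }
}

let mergeStage = {
  "$merge": {
    "into": {
      "connectionName": "atlasProd",
      "db": "monitoring",
      "coll": "highTemperature"
    }
  }
}

// $source must come first in the pipeline array.
sp.createStreamProcessor("highTempAlerts", [sourceStage, matchStage, mergeStage])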
