Docs Menu

Docs HomeMongoDB Spark Connector

Streaming Read Configuration Options

On this page

  • Overview
  • Change Stream Configuration
  • Specifying Properties in connection.uri

You can configure the following properties when reading data from MongoDB in streaming mode.

Note

If you use SparkConf to set the connector's read configurations, prefix spark.mongodb.read. to each property.

Property name
Description
connection.uri
Required.
The connection string configuration key.

Default: mongodb://localhost:27017/
database
Required.
The database name configuration.
collection
Required.
The collection name configuration.
comment
The comment to append to the read operation. Comments appear in the output of the Database Profiler.

Default: None
mongoClientFactory
MongoClientFactory configuration key.
You can specify a custom implementation, which must implement the com.mongodb.spark.sql.connector.connection.MongoClientFactory interface.

Default: com.mongodb.spark.sql.connector.connection.DefaultMongoClientFactory
aggregation.pipeline
Specifies a custom aggregation pipeline to apply to the collection before sending data to Spark.
The value must be either an extended JSON single document or list of documents.
A single document resembles the following:
{"$match": {"closed": false}}
A list of documents resembles the following:
[{"$match": {"closed": false}}, {"$project": {"status": 1, "name": 1, "description": 1}}]

Important

Custom aggregation pipelines must be compatible with the partitioner strategy. For example, aggregation stages such as $group do not work with any partitioner that creates more than one partition.

aggregation.allowDiskUse
Specifies whether to allow storage to disk when running the aggregation.

Default: true
change.stream.
Change stream configuration prefix.
See the Change Stream Configuration section for more information about change streams.
outputExtendedJson
When true, the connector converts BSON types not supported by Spark into extended JSON strings. When false, the connector uses the original relaxed JSON format for unsupported types.

Default: false

You can configure the following properties when reading a change stream from MongoDB:

Property name
Description
change.stream.lookup.full.document

Determines what values your change stream returns on update operations.

The default setting returns the differences between the original document and the updated document.

The updateLookup setting also returns the differences between the original document and updated document, but it also includes a copy of the entire updated document.

Default: "default"

Tip

For more information on how this change stream option works, see the MongoDB server manual guide Lookup Full Document for Update Operation.

change.stream.micro.batch.max.partition.count
The maximum number of partitions the Spark Connector divides each micro-batch into. Spark workers can process these partitions in parallel.

This setting applies only when using micro-batch streams.

Default: 1

Warning

Event Order

Specifying a value larger than 1 can alter the order in which the Spark Connector processes change events. Avoid this setting if out-of-order processing could create data inconsistencies downstream.

change.stream.publish.full.document.only
Specifies whether to publish the changed document or the full change stream document.

When this setting is false, you must specify a schema. The schema must include all fields that you want to read from the change stream. You can use optional fields to ensure that the schema is valid for all change-stream events.

When this setting is true, the connector exhibits the following behavior:
  • The connector filters out messages that omit the fullDocument field and publishes only the value of the field.

  • If you don't specify a schema, the connector infers the schema from the change stream document rather than from the underlying collection.

Default: false

Note

This setting overrides the change.stream.lookup.full.document setting.

change.stream.startup.mode
Specifies how the connector starts up when no offset is available.
This setting accepts the following values:

If you use SparkConf to specify any of the previous settings, you can either include them in the connection.uri setting or list them individually.

The following code example shows how to specify the database, collection, and read preference as part of the connection.uri setting:

spark.mongodb.read.connection.uri=mongodb://127.0.0.1/myDB.myCollection?readPreference=primaryPreferred

To keep the connection.uri shorter and make the settings easier to read, you can specify them individually instead:

spark.mongodb.read.connection.uri=mongodb://127.0.0.1/
spark.mongodb.read.database=myDB
spark.mongodb.read.collection=myCollection
spark.mongodb.read.readPreference.name=primaryPreferred

Important

If you specify a setting in both the connection.uri and on its own line, the connection.uri setting takes precedence. For example, in the following configuration, the connection database is foobar, because it's the value in the connection.uri setting:

spark.mongodb.read.connection.uri=mongodb://127.0.0.1/foobar
spark.mongodb.read.database=bar
←  Read from MongoDB in Streaming ModeWrite to MongoDB in Streaming Mode →