All Source Connector Configuration Properties

On this page

  • Overview
  • MongoDB Connection
  • Kafka Topic
  • Change Streams
  • Output Format
  • Copy Existing
  • Error Handling and Resuming from Interruption

On this page, you can view all available configuration properties for your MongoDB Kafka source connector. This page duplicates the content of the other source connector configuration properties pages.

To view a list of all source connector configuration properties pages, see the Source Connector Configuration Properties page.

Use the following configuration settings to specify how your source connector establishes a connection and communicates with your MongoDB cluster.

To view only the options related to your MongoDB connection, see the MongoDB Source Connection Properties page.

Name
Description
connection.uri
Required

Type: string

Description:
The URI connection string to connect to your MongoDB instance or cluster.
For more information, see the Connect to MongoDB guide.

Important

Avoid Exposing Your Authentication Credentials

To avoid exposing your authentication credentials in your connection.uri setting, use a ConfigProvider and set the appropriate configuration parameters.

Default: mongodb://localhost:27017,localhost:27018,localhost:27019
Accepted Values: A MongoDB URI connection string
database
Type: string

Description:
Name of the database to watch for changes. If not set, the connector watches all databases for changes.

Default: ""
Accepted Values: A single database name
collection
Type: string

Description:
Name of the collection in the database to watch for changes. If not set, the connector watches all collections for changes.

Important

If your database configuration is set to "", the connector ignores the collection setting.


Default: ""
Accepted Values: A single collection name
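
As an illustration of the connection settings above, the following fragment is a minimal sketch of a source connector configuration that watches one collection. The database and collection names, the secrets file path, and the mongodb.uri key are placeholders assumed for this example. The ${file:...} reference assumes the Kafka Connect worker has Apache Kafka's FileConfigProvider registered under the alias "file".

```properties
# Connector class for the MongoDB Kafka source connector
connector.class=com.mongodb.kafka.connect.MongoSourceConnector

# Resolve the URI through a ConfigProvider instead of embedding credentials.
# Assumes the worker configuration registers the provider, for example:
#   config.providers=file
#   config.providers.file.class=org.apache.kafka.common.config.provider.FileConfigProvider
connection.uri=${file:/path/to/secrets.properties:mongodb.uri}

# Watch a single collection in a single database (placeholder names)
database=inventory
collection=orders
```

Leaving both database and collection unset would instead watch all databases, as described above.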

Use the following configuration settings to specify which Kafka topics the source connector should publish data to.

To view only the options related to your Kafka topic, see the Kafka Topic Properties page.

Name
Description
topic.prefix
Type: string

Description:
The prefix to prepend to database and collection names to generate the name of the Kafka topic on which to publish the data.

Default: ""
Accepted Values: A string composed of ASCII alphanumeric characters including ".", "-", and "_"
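
To illustrate how the prefix combines with the namespace, here is a hedged sketch. The names myPrefix, test, and data are placeholders, and the resulting topic name assumes the connector joins the prefix, database name, and collection name with periods.

```properties
# Placeholder prefix, database, and collection names
topic.prefix=myPrefix
database=test
collection=data
# Under the assumption stated above, change events for the test.data
# namespace are published to the topic: myPrefix.test.data
```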

Use the following configuration settings to specify aggregation pipelines for change streams and read preferences for change stream cursors.

To view only the options related to change streams, see the Change Stream Properties page.

Name
Description
pipeline
Type: string

Description:
An array of aggregation pipeline stages to run in your change stream. The stages operate on the change stream event document, not on the fullDocument field, so reference fields of the changed document through fullDocument (for example, fullDocument.eventId).

Example

[{"$match": { "$and": [{"operationType": "insert"}, {"fullDocument.eventId": 1404 }] } }]

Default: "[]"
Accepted Values: Valid aggregation pipeline stages
change.stream.full.document
Type: string

Description:
Determines what values your change stream returns on update operations.
The default setting returns the differences between the original document and the updated document.
The updateLookup setting returns the differences between the original document and updated document as well as a copy of the entire updated document at a point in time after the update.

Tip

For more information on how this change stream option works, see the MongoDB server manual guide on Lookup Full Document for Update Operations.


Default: ""
Accepted Values: "" or "default" or "updateLookup"
publish.full.document.only
Type: boolean

Description:
Whether to return only the fullDocument field from the change stream event document produced by any update event. The fullDocument field contains the most current version of the updated document. To learn more about the fullDocument field, see the Server manual page on update events.
When set to true, the connector overrides the change.stream.full.document setting and sets it to updateLookup so that the fullDocument field contains updated documents.
Default: false
Accepted Values: true or false
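
The interaction between the two full document settings can be sketched as follows. This is an illustrative fragment, not a recommended configuration.

```properties
# Ask the change stream to include the full updated document on update events
change.stream.full.document=updateLookup

# Publish only the fullDocument field rather than the whole event document.
# When this is true, the connector sets change.stream.full.document to
# updateLookup itself, so the line above is redundant and shown only for clarity.
publish.full.document.only=true
```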
collation
Type: string

Description:
A JSON collation document that specifies language-specific ordering rules that MongoDB applies to the documents returned by the change stream.

Default: ""
Accepted Values: A valid collation JSON document
batch.size
Type: int

Description:
The change stream cursor batch size.

Default: 0
Accepted Values: An integer
poll.await.time.ms
Type: long

Description:
The amount of time in milliseconds to wait before checking the change stream cursor for new results.

Default: 5000
Accepted Values: An integer
poll.max.batch.size
Type: int

Description:
Maximum number of documents to read in a single batch when polling a change stream cursor for new data. You can use this setting to limit the amount of data buffered internally in the connector.

Default: 1000
Accepted Values: An integer
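
The cursor and collation settings above might be combined as in the following sketch. The collation document and the numeric values are illustrative assumptions, not tuning advice.

```properties
# Illustrative collation document: English locale, comparison strength 2
collation={"locale": "en", "strength": 2}

# Keep the documented default change stream cursor batch size
batch.size=0

# Wait 2000 ms before checking the change stream cursor for new results
poll.await.time.ms=2000

# Read at most 500 documents per poll to limit data buffered in the connector
poll.max.batch.size=500
```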

Use the following configuration settings to specify the format of data the source connector publishes to Kafka topics.

To view only the options related to the format of your output, see the Output Format Properties page.

Name
Description
output.format.key
Type: string

Description:
Specifies the data format in which the source connector outputs the key document.

Default: json
Accepted Values: bson, json, schema
output.format.value
Type: string

Description:
Specifies the data format in which the source connector outputs the value document.

Default: json
Accepted Values: bson, json, schema

Note

Protobuf Output Format Support

The connector supports Protobuf as an output data format. You can enable this format by specifying the schema value and installing and configuring the Kafka Connect Protobuf Converter.

output.json.formatter
Type: string

Description:
Class name of the JSON formatter the connector should use to output data.

Default:
com.mongodb.kafka.connect.source.json.formatter.DefaultJson
Accepted Values:
Your custom JSON formatter full class name or one of the following built-in formatter class names:
com.mongodb.kafka.connect.source.json.formatter.DefaultJson
com.mongodb.kafka.connect.source.json.formatter.ExtendedJson
com.mongodb.kafka.connect.source.json.formatter.SimplifiedJson
To learn more about these output formats, see JSON Formatters.
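
As a brief sketch of the JSON output settings, the following fragment keeps the default json formats and swaps in one of the built-in formatters listed above.

```properties
# Emit both the key and value documents as JSON (the documented defaults)
output.format.key=json
output.format.value=json

# Use the built-in SimplifiedJson formatter instead of DefaultJson
output.json.formatter=com.mongodb.kafka.connect.source.json.formatter.SimplifiedJson
```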
output.schema.key
Type: string

Description:
Specifies an AVRO schema definition for the key document of the SourceRecord.

Tip

See also:

For more information on AVRO schema, see the Data Formats guide.

Default:
{
"type": "record",
"name": "keySchema",
"fields" : [ { "name": "_id", "type": "string" } ]
}
Accepted Values: A valid AVRO schema
output.schema.value
Type: string

Description:
Specifies an AVRO schema definition for the value document of the SourceRecord.

Tip

See also:

For more information on AVRO schema, see the Data Formats guide.

Default:
{
"name": "ChangeStream",
"type": "record",
"fields": [
{ "name": "_id", "type": "string" },
{ "name": "operationType", "type": ["string", "null"] },
{ "name": "fullDocument", "type": ["string", "null"] },
{ "name": "ns",
"type": [{"name": "ns", "type": "record", "fields": [
{"name": "db", "type": "string"},
{"name": "coll", "type": ["string", "null"] } ]
}, "null" ] },
{ "name": "to",
"type": [{"name": "to", "type": "record", "fields": [
{"name": "db", "type": "string"},
{"name": "coll", "type": ["string", "null"] } ]
}, "null" ] },
{ "name": "documentKey", "type": ["string", "null"] },
{ "name": "updateDescription",
"type": [{"name": "updateDescription", "type": "record", "fields": [
{"name": "updatedFields", "type": ["string", "null"]},
{"name": "removedFields",
"type": [{"type": "array", "items": "string"}, "null"]
}] }, "null"] },
{ "name": "clusterTime", "type": ["string", "null"] },
{ "name": "txnNumber", "type": ["long", "null"]},
{ "name": "lsid", "type": [{"name": "lsid", "type": "record",
"fields": [ {"name": "id", "type": "string"},
{"name": "uid", "type": "string"}] }, "null"] }
]
}
Accepted Values: A valid AVRO schema
output.schema.infer.value
Type: boolean

Description:
Whether the connector should infer the schema for the value document of the SourceRecord. Since the connector processes each document in isolation, the connector may generate many schemas.

Important

The connector only reads this setting when you set your output.format.value setting to schema.

Default: false
Accepted Values: true or false
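
The schema-related settings can be sketched together as follows. The key schema shown is the documented default written on one line; the fragment is an illustration under those assumptions rather than a required setup.

```properties
# Output the key and value documents with a schema instead of as JSON
output.format.key=schema
output.format.value=schema

# Documented default key schema, written on a single line
output.schema.key={"type": "record", "name": "keySchema", "fields": [{"name": "_id", "type": "string"}]}

# Infer the value schema from each document. The connector reads this
# setting only because output.format.value is set to schema.
output.schema.infer.value=true
```

Because each document is processed in isolation, inference may register many distinct schemas when documents in the collection differ in shape.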

Use the following configuration settings to enable the copy existing feature which converts MongoDB collections into Change Stream events.

To view only the options related to copying data, see the Copy Existing Properties page.

Name
Description
copy.existing
Type: boolean

Description:
Whether to enable the copy existing feature which converts all data in a MongoDB collection to Change Stream events and publishes them on Kafka topics. If MongoDB changes the source collection data after the connector starts the copy process, the connector creates events for the changes after it completes the copy process.

Note

Data Copy Can Produce Duplicate Events

If any system changes the data in the database while the source connector converts existing data from it, MongoDB may produce duplicate change stream events to reflect the latest changes. Since the change stream events on which the data copy relies are idempotent, the copied data is eventually consistent.

Default: false
Accepted Values: true or false
copy.existing.namespace.regex
Type: string

Description:
Regular expression the connector uses to match namespaces from which to copy data. A namespace describes the MongoDB database name and collection separated by a period, e.g. databaseName.collectionName.

Example

In the following example, the regular expression setting matches collections that start with "page" in the "stats" database.

copy.existing.namespace.regex=stats\.page.*

The "\" character in the example above escapes the "." character that follows it in the regular expression. For more information on how to build regular expressions, see the Java API documentation on Patterns.

Default: ""
Accepted Values: A valid regular expression
copy.existing.pipeline
Type: string

Description:
An array of pipeline operations the connector runs when copying existing data. You can use this setting to filter the source collection and improve the use of indexes in the copying process.

Example

The following example shows how you can use the $match aggregation operator to instruct the connector to copy only documents that contain a closed field with a value of false.

copy.existing.pipeline=[ { "$match": { "closed": "false" } } ]
Default: []
Accepted Values: Valid aggregation pipeline stages
copy.existing.max.threads
Type: int

Description:
The maximum number of threads the connector can use to copy data.
Default: number of processors available in the environment
Accepted Values: An integer
copy.existing.queue.size
Type: int

Description:
The size of the queue the connector can use when copying data.
Default: 16000
Accepted Values: An integer
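
The copy existing settings above might be combined as in this sketch. The filter field, its value, and the thread count are hypothetical. The regular expression is written with a doubled backslash on the assumption that the fragment is loaded as a Java properties file, where a single backslash acts as an escape character.

```properties
# Convert existing documents to change stream events and publish them
copy.existing=true

# Copy only collections in the stats database whose names start with "page".
# Doubled backslash: assumes Java properties parsing, which consumes one level.
copy.existing.namespace.regex=stats\\.page.*

# Copy only documents that match this filter (hypothetical field and value)
copy.existing.pipeline=[ { "$match": { "status": "active" } } ]

# Illustrative thread count; the default is the number of available processors
copy.existing.max.threads=4

# Documented default queue size
copy.existing.queue.size=16000
```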

Use the following configuration settings to specify how the source connector behaves when it encounters errors and to specify settings related to resuming interrupted reads.

To view only the options related to handling errors, see the Error Handling and Resuming from Interruption Properties page.

Name
Description
offset.partition.name
Type: string

Description:
The custom offset partition name to use. You can use this option to instruct the connector to start a new change stream when an existing offset contains an invalid resume token. If you leave this setting blank, the connector uses the default partition name based on the connection details. To view a strategy for naming offset partitions, see the Reset Stored Offsets guide.
Default: ""
Accepted Values: A string. To learn more about naming a partition, see SourceRecord in the Apache Kafka API documentation.
heartbeat.interval.ms
Type: long

Description:
The number of milliseconds the connector waits between sending heartbeat messages. The connector sends heartbeat messages when source records are not published in the specified interval.
Heartbeat messages contain a postBatchResumeToken data field. The value of this field contains the MongoDB server oplog entry that the connector last read from the change stream.
This mechanism improves resumability of the connector for low volume namespaces. See the Invalid Resume Token page for more information on this feature.
Set this to 0 to disable heartbeat messages.
Default: 0
Accepted Values: An integer
heartbeat.topic.name
Type: string

Description:
The name of the topic on which the connector should publish heartbeat messages. You must provide a positive value in the heartbeat.interval.ms setting to enable this feature.

Default: __mongodb_heartbeats
Accepted Values: A valid Kafka topic name
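
A hedged sketch of the heartbeat and offset settings follows. The interval and the partition name are hypothetical values chosen for illustration.

```properties
# Send a heartbeat message when no source record is published for 10000 ms.
# A value of 0 (the default) disables heartbeat messages.
heartbeat.interval.ms=10000

# Topic that receives heartbeat messages (the documented default name)
heartbeat.topic.name=__mongodb_heartbeats

# Hypothetical custom offset partition name. Setting a new name instructs the
# connector to start a new change stream, for example after an invalid resume token.
offset.partition.name=orders-source.1
```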