Docs Menu

Docs HomeMongoDB Kafka Connector

All Sink Connector Configuration Properties

On this page

  • Overview
  • MongoDB Connection
  • MongoDB Namespace
  • Connector Topic
  • Connector Message Processing
  • Connector Error Handling
  • Post Processors
  • ID Strategy
  • Write Model Strategy
  • Topic Override
  • Change Data Capture

On this page, you can view all available configuration properties for your MongoDB Kafka sink connector. This page duplicates the content of the other sink connector configuration properties pages.

To view a list of all sink connector configuration properties pages, see the Sink Connector Configuration Properties page.

Use the following configuration settings to specify how your sink connector connects and communicates with your MongoDB cluster.

To view only the options related to configuring your MongoDB connection, see the MongoDB Connection Configuration Properties page.

Name
Description
connection.uri
Required

Type: string

Description:
The MongoDB connection URI string to connect to your MongoDB instance or cluster.
For more information, see the Connect to MongoDB guide

Important

Avoid Exposing Your Authentication Credentials

To avoid exposing your authentication credentials in your connection.uri setting, use a ConfigProvider and set the appropriate configuration parameters.

Default: mongodb://localhost:27017
Accepted Values: A MongoDB connection URI string
max.num.retries
Type: int

Description:
The number of retries to attempt when encountering write errors to MongoDB.

Default: 1
Accepted Values: An integer
retries.defer.timeout
Type: int

Description:
Amount of time (in milliseconds) to defer a retry attempt.

Default: 5000
Accepted Values: An integer

Use the following configuration settings to specify to which MongoDB database and collection your sink connector writes data.

To view only the options related to specifying where the connector writes data, see the MongoDB Namespace Mapping Configuration Properties page.

Name
Description
database
Required

Type: string

Description:
The name of the MongoDB database to which the sink connector writes.

Accepted Values: A MongoDB database name
collection
Required

Type: string

Description:
The name of the MongoDB collection to which the sink connector writes. If your sink connector follows multiple topics, this is the default collection for any writes that are not otherwise specified.

Accepted Values: A MongoDB collection name

Use the following configuration settings to specify which Kafka topics the sink connector should watch for data.

To view only the options related to specifying Kafka topics, see the Kafka Topic Properties page.

Name
Description
topics
Required

Type: list

Description:
A list of Kafka topics that the sink connector watches.

Note

You can define either the topics or the topics.regex setting, but not both.

Accepted Values: A comma-separated list of valid Kafka topics
topics.regex
Required

Type: string

Description:
A regular expression that matches the Kafka topics that the sink connector watches.

Example

topics.regex=activity\\.\\w+\\.clicks$

This regex matches topic names such as "activity.landing.clicks" and "activity.support.clicks". It does not match the topic names "activity.landing.views" and "activity.clicks".

Note

You can define either the topics or the topics.regex setting, but not both.

Accepted Values: A valid regular expression pattern using java.util.regex.Pattern.

Use the settings on this page to configure the message processing behavior of the sink connector including the following:

  • Message batch size

  • Rate limits

  • Number of parallel tasks

To view only the options related to change data capture handlers, see the Connector Message Processing Properties page.

Name
Description
max.batch.size
Type: int

Description:
Maximum number of sink records to batch together for processing.

Consider the batch that contains the following records:
[ 1, 2, 3, 4, 5 ]
When set to 0, the connector performs a single bulk write for the entire batch.

When set to 1, the connector performs one bulk write for each record in the batch, for a total of five bulk writes as shown in the following example:
[1], [2], [3], [4], [5]
Default: 0
Accepted Values: An integer
rate.limiting.every.n
Type: int

Description:
Number of batches of records the sink connector processes in order to trigger the rate limiting timeout. A value of 0 means no rate limiting.

Default: 0
Accepted Values: An integer
rate.limiting.timeout
Type: int

Description:
How long (in milliseconds) to wait before the sink connector should resume processing after reaching the rate limiting threshold.

Default: 0
Accepted Values: An integer
tasks.max
Type: int

Description:
The maximum number of tasks to create for this connector. The connector may create fewer than the maximum tasks specified if it cannot handle the level of parallelism you specify.

Important

Multiple Tasks May Process Messages Out of Order

If you specify a value greater than 1, the connector enables parallel processing of the tasks. If your topic has multiple partition logs, which enables the connector to read from the topic in parallel, the tasks may process the messages out of order.

Default: 1
Accepted Values: An integer.

Use the following configuration settings to specify how the sink connector handles errors and to configure the dead letter queue.

To view only the options related to handling errors, see the Connector Error Handling Properties page.

Name
Description
errors.log.include.messages
Type: boolean

Description:
Whether the connector should include the invalid message when logging an error. An invalid message includes data such as record keys, values, and headers.

Default: false
Accepted Values: true or false
errors.deadletterqueue.topic.name
Type: string

Description:
Name of topic to use as the dead letter queue. If blank, the connector does not send any invalid messages to the dead letter queue.
For more information about the dead letter queue, see the Dead Letter Queue Configuration Example.

Default: ""
Accepted Values: A valid Kafka topic name
errors.deadletterqueue.context.headers.enable
Type: boolean

Description:
Whether the connector should include context headers when it writes messages to the dead letter queue.
For more information about the dead letter queue, see the Dead Letter Queue Configuration Example.

Default: false
Accepted Values: true or false
errors.deadletterqueue.topic.replication.factor
Type: integer

Description:
The number of nodes on which to replicate the dead letter queue topic. If you are running a single-node Kafka cluster, you must set this to 1.
For more information about the dead letter queue, see the Dead Letter Queue Configuration Example.

Default: 3
Accepted Values: A valid number of nodes

Use the following configuration settings to specify how the sink connector should transform Kafka data before inserting it into MongoDB.

To view only the options related to post-processors, see the Sink Connector Post-processor Properties page.

Name
Description
post.processor.chain
Type: list

Description:
A list of post-processor classes the connector should apply to process the data before saving it to MongoDB.

Tip

See also:

For more information on post-processors and examples of their usage, see the section on Post-processors.


Default:
com.mongodb.kafka.connect.sink.processor.DocumentIdAdder
Accepted Values: A comma-separated list of fully qualified Java class names
field.renamer.mapping
Type: string

Description:
A list of field name mappings for key and value fields. Define the mappings in an inline JSON array in the following format:
[ { "oldName":"key.fieldA", "newName":"field1" }, { "oldName":"value.xyz", "newName":"abc" } ]
Default: []
Accepted Values: A valid JSON array
field.renamer.regexp
Type: string

Description:
A list of field name mappings for key and value fields using regular expressions. Define the mappings in an inline JSON array in the following format:
[ {"regexp":"^key\\\\..*my.*$", "pattern":"my", "replace":""}, {"regexp":"^value\\\\..*$", "pattern":"\\\\.", "replace":"_"} ]
Default: []
Accepted Values: A valid JSON array
key.projection.list
Type: string

Description:
A list of field names the connector should include in the key projection.

Default: ""
Accepted Values: A comma-separated list of field names
key.projection.type
Type: string

Description:
The key projection type the connector should use.

Default: none
Accepted Values: none, BlockList, or AllowList (Deprecated: blacklist, whitelist)
value.projection.list
Type: string

Description:
A list of field names the connector should include in the value projection.

Default: ""
Accepted Values: A comma-separated list of field names
value.projection.type
Type: string

Description:
The type of value projection the connector should use.

Default: none
Accepted Values: none, BlockList, or AllowList (Deprecated: blacklist, whitelist)
writemodel.strategy
Type: string

Description:
The class that specifies the WriteModelStrategy the connector should use for Bulk Writes.

Tip

See also:

For information on how to create your own strategy, see Custom Write Model Strategies.


Default:
com.mongodb.kafka.connect.sink.writemodel.strategy.DefaultWriteModelStrategy
Accepted Values: A fully qualified Java class name

Use the following configuration settings to specify how the sink connector should determine the _id value for each document it writes to MongoDB.

To view only the options related to determining the _id field of your documents, see the Sink Connector Id Strategy Properties page.

Name
Description
document.id.strategy
Type: string

Description:
The class the connector should use to generate a unique _id field.

Default:
com.mongodb.kafka.connect.sink.processor.id.strategy.BsonOidStrategy
Accepted Values: An empty string or a fully qualified Java class name
document.id.strategy.overwrite.existing
Type: boolean

Description:
Whether the connector should overwrite existing values in the _id field when it applies the strategy defined by the document.id.strategy property.

Default: false
Accepted Values: true or false
document.id.strategy.uuid.format
Type: string

Description:
Whether the connector should output the UUID in the _id field in string format or in BsonBinary format.

Default: string
Accepted Values: string or binary
delete.on.null.values
Type: boolean

Description:
Whether the connector should delete documents when the key value matches a document in MongoDB and the value field is null. This setting applies when you specify an id generation strategy that operates on the key document such as FullKeyStrategy, PartialKeyStrategy, and ProvidedInKeyStrategy.

Default: false
Accepted Values: true or false

Use the strategies in the following table to specify how the sink connector writes data into MongoDB. You can specify a write strategy with the following configuration:

writemodel.strategy=<a writemodel strategy>

To view only the options related to write model strategies, see the Sink Connector Write Model Strategies page.

Name
Description
DefaultWriteModelStrategy

Description:
This strategy uses the ReplaceOneDefaultStrategy by default, and the InsertOneDefaultStrategy if you set the timeseries.timefield option.

This is the default value for the writemodel.strategy configuration setting.
ReplaceOneDefaultStrategy

Description:
Replaces at most one document in MongoDB that matches a sink record by the _id field. If no documents match, insert the sink record as a new document.
Apply the following configuration to your sink connector to specify this setting:
writemodel.strategy=com.mongodb.kafka.connect.sink.writemodel.strategy.ReplaceOneDefaultStrategy
ReplaceOneBusinessKeyStrategy

Description:
Replaces at most one document that matches a sink record by a specified business key. If no documents match, insert the sink record as a new document.
Apply the following configuration to your sink connector to specify this setting:
writemodel.strategy=com.mongodb.kafka.connect.sink.writemodel.strategy.ReplaceOneBusinessKeyStrategy
To see an example showing how to use this strategy, see our guide on write model strategies.
DeleteOneDefaultStrategy

Description:
Deletes at most one document that matches your sink connector's key structure by the _id field only when the document contains a null value structure.
This is implicitly specified when you set mongodb.delete.on.null.values=true.
You can set this explicitly with the following configuration:
writemodel.strategy=com.mongodb.kafka.connect.sink.writemodel.strategy.DeleteOneDefaultStrategy
UpdateOneTimestampsStrategy

Description:
Add _insertedTS (inserted timestamp) and _modifiedTS (modified timestamp) fields into documents.
Apply the following configuration to your sink connector to specify this setting:
writemodel.strategy=com.mongodb.kafka.connect.sink.writemodel.strategy.UpdateOneTimestampsStrategy
To see an example showing how to use this strategy, see our guide on write model strategies.
UpdateOneBusinessKeyTimestampStrategy

Description:
Add _insertedTS (inserted timestamp) and _modifiedTS (modified timestamp) fields into documents that match a business key.
Apply the following configuration to your sink connector to specify this setting:
writemodel.strategy=com.mongodb.kafka.connect.sink.writemodel.strategy.UpdateOneBusinessKeyTimestampStrategy

Use the following sink connector configuration settings to override global or default property settings for specific topics.

To view only the options related to overriding topic settings, see the Topic Override Properties page.

Name
Description
topic.override.<topicName>.<propertyName>
Type: string

Description:
Specify a topic and property name to override the corresponding global or default property setting.

Example

The topic.override.foo.collection=bar setting instructs the sink connector to store data from the foo topic in the bar collection.

Note

You can specify any valid configuration setting in the <propertyName> segment on a per-topic basis except connection.uri and topics.

Default: ""
Accepted Values: Accepted values specific to the overridden property

Use the following configuration settings to specify a class the sink connector uses to process change data capture (CDC) events.

See the guide on Sink Connector Change Data Capture for examples using the built-in ChangeStreamHandler and Debezium event producers.

To view only the options related to change data capture handlers, see the Change Data Capture Properties page.

Name
Description
change.data.capture.handler
Type: string

Description:
The class name of the CDC handler to use for converting changes into event streams.

Default: ""
Accepted Values: An empty string or a fully qualified Java class name
←  Change Data Capture PropertiesFundamentals →