All Sink Connector Configuration Properties
Overview
On this page, you can view all available configuration properties for your MongoDB Kafka sink connector. This page duplicates the content of the other sink connector configuration properties pages.
To view a list of all sink connector configuration properties pages, see the Sink Connector Configuration Properties page.
MongoDB Connection
Use the following configuration settings to specify how your sink connector connects and communicates with your MongoDB cluster.
To view only the options related to configuring your MongoDB connection, see the MongoDB Connection Configuration Properties page.
Name | Description |
---|---|
connection.uri | Required Type: string Description: The MongoDB connection URI string to connect to your MongoDB instance or cluster. For more information, see the Connect to MongoDB guide. Important: Avoid Exposing Your Authentication Credentials. To avoid exposing your authentication credentials in your Default: mongodb://localhost:27017 Accepted Values: A MongoDB connection URI string |
max.num.retries | Type: int Description: The number of retries to attempt when encountering write errors to MongoDB. Default: 1 Accepted Values: An integer |
retries.defer.timeout | Type: int Description: Amount of time (in milliseconds) to defer a retry attempt. Default: 5000 Accepted Values: An integer |
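
As a minimal sketch of how the connection settings above might fit together, the fragment below points the connector at a replica set and retries failed writes. The host names, replica set name, and retry values are hypothetical placeholders, not recommendations.

```properties
# Hypothetical hosts and replica set name; replace with your own deployment.
connection.uri=mongodb://mongodb0.example.com:27017,mongodb1.example.com:27017/?replicaSet=rs0
# Attempt up to 3 retries when a write to MongoDB fails (illustrative value).
max.num.retries=3
# Defer each retry attempt by 5000 ms, which is also the documented default.
retries.defer.timeout=5000
```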
MongoDB Namespace
Use the following configuration settings to specify to which MongoDB database and collection your sink connector writes data.
To view only the options related to specifying where the connector writes data, see the MongoDB Namespace Mapping Configuration Properties page.
Name | Description |
---|---|
database | Required Type: string Description: The name of the MongoDB database to which the sink connector writes. Accepted Values: A MongoDB database name |
collection | Required Type: string Description: The name of the MongoDB collection to which the sink connector writes. If your sink connector follows multiple topics, this is the default collection for any writes that are not otherwise specified. Accepted Values: A MongoDB collection name |
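
For illustration, a namespace configuration could look like the following. The database and collection names are hypothetical.

```properties
# Hypothetical names: the connector writes to the "orders" collection
# in the "sales" database unless a topic-specific setting says otherwise.
database=sales
collection=orders
```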
Connector Topic
Use the following configuration settings to specify which Kafka topics the sink connector should watch for data.
To view only the options related to specifying Kafka topics, see the Kafka Topic Properties page.
Name | Description |
---|---|
topics | Required Type: list Description: A list of Kafka topics that the sink connector watches. Note: You can define either the Accepted Values: A comma-separated list of valid Kafka topics |
topics.regex | Required Type: string Description: A regular expression that matches the Kafka topics that the sink connector watches. Example: This regex matches topic names such as "activity.landing.clicks" and "activity.support.clicks". It does not match the topic names "activity.landing.views" and "activity.clicks". Note: You can define either the Accepted Values: A valid regular expression pattern using java.util.regex.Pattern. |
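
The sketch below shows the two ways to select topics. Kafka Connect accepts either topics or topics.regex for a sink connector, not both, so the second option is commented out. The pattern shown is an assumption chosen to be consistent with the matching behavior described in the table; it is not necessarily the exact pattern the original example used.

```properties
# Option A: an explicit, comma-separated list of topics.
topics=activity.landing.clicks,activity.support.clicks

# Option B (use instead of "topics"): a java.util.regex.Pattern expression.
# Backslashes are doubled because a properties file treats "\" as an escape character.
# Assumed pattern: matches "activity.<word>.clicks" but not "activity.clicks"
# or "activity.landing.views".
# topics.regex=activity\\.\\w+\\.clicks$
```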
Connector Message Processing
Use the following configuration settings to configure the message processing behavior of the sink connector, including the message batch size, rate limits, and the number of parallel tasks.
To view only the options related to message processing, see the Connector Message Processing Properties page.
Name | Description |
---|---|
max.batch.size | Type: int Description: Maximum number of sink records to batch together for processing. Consider the batch that contains the following records: When set to 0, the connector performs a single bulk write for the entire batch. When set to 1, the connector performs one bulk write for each record in the batch, for a total of five bulk writes as shown in the following example: Default: 0 Accepted Values: An integer |
rate.limiting.every.n | Type: int Description: Number of batches of records the sink connector processes in order to trigger the rate limiting timeout. A value of 0 means no rate limiting. Default: 0 Accepted Values: An integer |
rate.limiting.timeout | Type: int Description: How long (in milliseconds) to wait before the sink connector should resume processing after reaching the rate limiting threshold. Default: 0 Accepted Values: An integer |
tasks.max | Type: int Description: The maximum number of tasks to create for this connector. The connector may create fewer than the maximum tasks specified if it cannot handle the level of parallelism you specify. Important: Multiple Tasks May Process Messages Out of Order. If you specify a value greater than Default: 1 Accepted Values: An integer |
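
As a rough sketch of how these settings combine, the fragment below caps the batch size and adds a pause between groups of batches. All values are illustrative assumptions.

```properties
# Batch at most 100 sink records together for processing (0 means the whole batch).
max.batch.size=100
# After every 10 batches, trigger the rate limiting timeout...
rate.limiting.every.n=10
# ...and wait 500 ms before the connector resumes processing.
rate.limiting.timeout=500
# Keep the documented default of a single task.
tasks.max=1
```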
Post Processors
Use the following configuration settings to specify how the sink connector should transform Kafka data before inserting it into MongoDB.
To view only the options related to post-processors, see the Sink Connector Post-processor Properties page.
Name | Description |
---|---|
post.processor.chain | Type: list Description: A list of post-processor classes the connector should apply to process the data before saving it to MongoDB. Tip: For more information on post-processors and examples of their usage, see the section on Post-processors. Default: Accepted Values: A comma-separated list of fully qualified Java class names |
field.renamer.mapping | Type: string Description: A list of field name mappings for key and value fields. Define the mappings in an inline JSON array in the following format: Default: [] Accepted Values: A valid JSON array |
field.renamer.regexp | Type: string Description: A list of field name mappings for key and value fields using regular expressions. Define the mappings in an inline JSON array in the following format: Default: [] Accepted Values: A valid JSON array |
key.projection.list | Type: string Description: A list of field names the connector should include in the key projection. Default: "" Accepted Values: A comma-separated list of field names |
key.projection.type | Type: string Description: The key projection type the connector should use. Default: none Accepted Values: none, BlockList, or AllowList (Deprecated: blacklist, whitelist) |
value.projection.list | Type: string Description: A list of field names the connector should include in the value projection. Default: "" Accepted Values: A comma-separated list of field names |
value.projection.type | Type: string Description: The type of value projection the connector should use. Default: none Accepted Values: none, BlockList, or AllowList (Deprecated: blacklist, whitelist) |
writemodel.strategy | Type: string Description: The class that specifies the WriteModelStrategy the connector should use for Bulk Writes. Tip: For information on how to create your own strategy, see Custom Write Model Strategies. Default: Accepted Values: A fully qualified Java class name |
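
The following sketch shows one way a value projection might be configured. The field names are hypothetical, and the fully qualified class names reflect the connector's package layout as commonly documented; confirm them against the connector version you run.

```properties
# Apply the _id adder first, then an allow-list projection on the value document.
post.processor.chain=com.mongodb.kafka.connect.sink.processor.DocumentIdAdder,com.mongodb.kafka.connect.sink.processor.AllowListValueProjector
# Project the value document using an allow list...
value.projection.type=AllowList
# ...built from these hypothetical field names.
value.projection.list=name,age
```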
ID Strategy
Use the following configuration settings to specify how the sink connector should determine the _id value for each document it writes to MongoDB.
To view only the options related to determining the _id field of your documents, see the Sink Connector Id Strategy Properties page.
Name | Description |
---|---|
document.id.strategy | Type: string Description: The class the connector should use to generate a unique _id field. Default: Accepted Values: An empty string or a fully qualified Java class name |
document.id.strategy.overwrite.existing | Type: boolean Description: Whether the connector should overwrite existing values in the _id field when it applies the strategy defined by the document.id.strategy property. Default: false Accepted Values: true or false |
document.id.strategy.uuid.format | Type: string Description: Whether the connector should output the UUID in the _id field in string format or in BsonBinary format. Default: string Accepted Values: string or binary |
delete.on.null.values | Type: boolean Description: Whether the connector should delete documents when the key value matches a document in MongoDB and the value field is null. This setting applies when you specify an id generation strategy that operates on the key document such as FullKeyStrategy, PartialKeyStrategy, and ProvidedInKeyStrategy. Default: false Accepted Values: true or false |
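
As an illustration of the interaction described for delete.on.null.values, the sketch below pairs it with a key-based id strategy. The package prefix of the class name is an assumption based on the connector's commonly documented layout; verify it against your connector version.

```properties
# Take the _id from the Kafka record key (one of the key-based strategies named above).
document.id.strategy=com.mongodb.kafka.connect.sink.processor.id.strategy.ProvidedInKeyStrategy
# Delete the matching MongoDB document when a record arrives with a null value.
delete.on.null.values=true
# Leave any existing _id value in place (documented default).
document.id.strategy.overwrite.existing=false
```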
Write Model Strategy
Use the strategies in the following table to specify how the sink connector writes data into MongoDB. You can specify a write strategy with the following configuration:
writemodel.strategy=<a writemodel strategy>
To view only the options related to write model strategies, see the Sink Connector Write Model Strategies page.
Name | Description |
---|---|
DefaultWriteModelStrategy | Description: This strategy uses the ReplaceOneDefaultStrategy by default, and the InsertOneDefaultStrategy if you set the timeseries.timefield option. This is the default value for the writemodel.strategy configuration setting. |
ReplaceOneDefaultStrategy | Description: Replaces at most one document in MongoDB that matches a sink record by the _id field. If no documents match, the connector inserts the sink record as a new document. Apply the following configuration to your sink connector to specify this setting: |
ReplaceOneBusinessKeyStrategy | Description: Replaces at most one document that matches a sink record by a specified business key. If no documents match, the connector inserts the sink record as a new document. Apply the following configuration to your sink connector to specify this setting: To see an example showing how to use this strategy, see our guide on write model strategies. |
DeleteOneDefaultStrategy | Description: Deletes at most one document that matches your sink connector's key structure by the _id field only when the document contains a null value structure. This is implicitly specified when you set delete.on.null.values=true. You can set this explicitly with the following configuration: |
UpdateOneTimestampsStrategy | Description: Adds _insertedTS (inserted timestamp) and _modifiedTS (modified timestamp) fields into documents. Apply the following configuration to your sink connector to specify this setting: To see an example showing how to use this strategy, see our guide on write model strategies. |
UpdateOneBusinessKeyTimestampStrategy | Description: Adds _insertedTS (inserted timestamp) and _modifiedTS (modified timestamp) fields into documents that match a business key. Apply the following configuration to your sink connector to specify this setting: |
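
A concrete instance of the writemodel.strategy setting might look like the sketch below. The package prefix is an assumption based on the connector's commonly documented class layout; the other strategies in the table would follow the same pattern with a different class name.

```properties
# Select the strategy that adds _insertedTS and _modifiedTS fields to documents.
# Package prefix assumed; confirm it against your connector version.
writemodel.strategy=com.mongodb.kafka.connect.sink.writemodel.strategy.UpdateOneTimestampsStrategy
```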
Topic Override
Use the following sink connector configuration settings to override global or default property settings for specific topics.
To view only the options related to overriding topic settings, see the Topic Override Properties page.
Name | Description |
---|---|
topic.override.<topicName>.<propertyName> | Type: string Description: Specify a topic and property name to override the corresponding global or default property setting. Example: The Note: You can specify any valid configuration setting in the Default: "" Accepted Values: Accepted values specific to the overridden property |
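
To make the override pattern concrete, here is a small sketch with hypothetical topic, database, and collection names. Records from the returns topic go to a dedicated collection, while every other topic uses the default.

```properties
# Hypothetical topics and namespace.
topics=orders,returns
database=sales
# Default collection for topics without an override.
collection=events
# Override the "collection" property for the "returns" topic only.
topic.override.returns.collection=returnEvents
```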
Change Data Capture
Use the following configuration settings to specify a class the sink connector uses to process change data capture (CDC) events.
See the guide on Sink Connector Change Data Capture for examples using the built-in ChangeStreamHandler and Debezium event producers.
To view only the options related to change data capture handlers, see the Change Data Capture Properties page.
Name | Description |
---|---|
change.data.capture.handler | Type: string Description: The class name of the CDC handler to use for converting changes into event streams. Default: "" Accepted Values: An empty string or a fully qualified Java class name |
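
As a brief sketch, enabling the built-in ChangeStreamHandler mentioned above could look like this. The package prefix is an assumption based on the connector's commonly documented class layout; check it against your connector version.

```properties
# Process MongoDB change stream events with the built-in handler.
# Package prefix assumed; confirm it against your connector version.
change.data.capture.handler=com.mongodb.kafka.connect.sink.cdc.mongodb.ChangeStreamHandler
```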