For AI agents: a documentation index is available at https://www.mongodb.com/docs/llms.txt — markdown versions of all pages are available by appending .md to any URL path.
On this page, you can view all available configuration properties for your MongoDB Kafka sink connector. This page duplicates the content of the other sink connector configuration properties pages.
IMPORTANT: To avoid exposing your authentication credentials in your connection.uri setting, use a ConfigProvider and set the appropriate configuration parameters.
Default: mongodb://localhost:27017 Accepted Values: A MongoDB connection URI string
server.api.version
Type: string
Description: The Stable API version you want to use with your MongoDB
server. For more information on the Stable API and versions of
the server that support it, see the Stable API
MongoDB server manual guide.
Default: "" Accepted Values: An empty string or a valid Stable API version.
server.api.deprecationErrors
Type: boolean
Description: When set to true, if the connector calls a command on your
MongoDB instance that's deprecated in the declared Stable API
version, it raises an exception.
You can set the API version with the server.api.version
configuration option. For more information on the Stable API, see
the MongoDB manual entry on the
Stable API.
Default: false Accepted Values: true or false
server.api.strict
Type: boolean
Description: When set to true, if the connector calls a command on your
MongoDB instance that's not covered in the declared Stable API
version, it raises an exception.
You can set the API version with the server.api.version
configuration option. For more information on the Stable API, see
the MongoDB manual entry on the
Stable API.
Default: false Accepted Values: true or false
MongoDB Namespace
Use the following configuration settings to specify which MongoDB database and collection that your MongoDB Kafka sink connector writes data to. You can use the default DefaultNamespaceMapper or specify a custom class.
Description: The fully-qualified class name of the class that specifies which
database or collection in which to sink the data. The default
DefaultNamespaceMapper uses values specified in the
database and collection properties.
The connector includes an alternative class for specifying the database and collection called FieldPathNamespaceMapper. See FieldPathNamespaceMapper Settings for more information.
Accepted Values: A fully qualified Java class name of a class that implements the NamespaceMapper interface.
database
Required
Type: string
Description: The name of the MongoDB database to which the sink connector writes.
Accepted Values: A MongoDB database name
collection
Type: string
Description: The name of the MongoDB collection to which the sink connector
writes. If your sink connector follows multiple topics, this
is the default collection for any writes that are not otherwise
specified.
Default: The topic name.
Accepted Values: A MongoDB collection name
FieldPathNamespaceMapper Settings
If you configure your sink connector to use the FieldPathNamespaceMapper, you can specify which database and collection to sink a document based on the data's field values.
To enable this mapping behavior, set your sink connector namespace.mapper configuration property to the fully-qualified class name as shown below:
The FieldPathNamespaceMapper requires you to specify the following settings:
One or both mapping properties to a database and collection
One of the key or value mappings to a database
One of the key or value mappings to a collection
You can use the following settings to customize the behavior of the FieldPathNamespaceMapper:
Name
Description
namespace.mapper.key.database.field
Type: string
Description: The name of the key document field that specifies the name of the
database in which to write.
namespace.mapper.key.collection.field
Type: string
Description: The name of the key document field that specifies the name of the
collection in which to write.
namespace.mapper.value.database.field
Type: string
Description: The name of the value document field that specifies the name of the
database in which to write.
namespace.mapper.value.collection.field
Type: string
Description: The name of the value document field that specifies the name of the
collection in which to write.
namespace.mapper.error.if.invalid
Type: boolean
Description: Whether to throw an exception when either the document is missing the
mapped field or it has an invalid BSON type.
When set to true, the connector does not process documents
missing the mapped field or that contain an invalid BSON type.
The connector may halt or skip processing depending on the related
error-handling configuration settings.
When set to false, if a document is missing the mapped field or
if it has an invalid BSON type, the connector defaults to
writing to the specified database and collection settings.
Default: false Accepted Values: true or false
Connector Topic
Use the following configuration settings to specify which Kafka topics the MongoDB Kafka sink connector should watch for data.
Description: A list of Kafka topics that the sink connector watches.
You can define either the topics or the topics.regex setting, but not both.
Accepted Values: A comma-separated list of valid Kafka topics
topics.regex
Required
Type: string
Description: A regular expression that matches the Kafka topics that the sink
connector watches.
For example, the following regex matches topic names such as "activity.landing.clicks" and "activity.support.clicks". It does not match the topic names "activity.landing.views" and "activity.clicks".
topics.regex=activity\\.\\w+\\.clicks$
You can define either the topics or the topics.regex setting, but not both.
Accepted Values: A valid regular expression pattern using java.util.regex.Pattern.
Connector Message Processing
Use the settings on this page to configure the message processing behavior of the MongoDB Kafka sink connector including the following:
Description: Maximum number of sink records to batch together for processing.
Consider the batch that contains the following records:
[ 1, 2, 3, 4, 5 ]
When set to 0, the connector performs a single bulk write for
the entire batch.
When set to 1, the connector performs one bulk write for each
record in the batch, for a total of five bulk writes as shown in
the following example:
[1], [2], [3], [4], [5]
Default: 0 Accepted Values: An integer
bulk.write.ordered
Type: boolean
Description: Whether the connector writes a batch of records
as an ordered or unordered bulk write operation.
When set to true, the default value, the
connector writes a batch of records as an ordered bulk write
operation.
Description: Number of batches of records the sink connector processes in
order to trigger the rate limiting timeout. A value of 0 means no
rate limiting.
Default: 0 Accepted Values: An integer
rate.limiting.timeout
Type: int
Description: How long (in milliseconds) to wait before the sink connector
should resume processing after reaching the rate limiting
threshold.
Default: 0 Accepted Values: An integer
tasks.max
Type: int
Description: The maximum number of tasks to create for this connector. The
connector may create fewer than the maximum tasks specified if it
cannot handle the level of parallelism you specify.
IMPORTANT: If you specify a value greater than 1,
the connector enables parallel processing of the tasks.
If your topic has multiple partition logs, which enables
the connector to read from the topic in parallel,
the tasks may process the messages out of order.
Default: 1 Accepted Values: An integer
Connector Error Handling
Use the following configuration settings to specify how the MongoDB Kafka sink connector handles errors and to configure the dead letter queue.
Description: Whether to continue processing messages if the connector encounters
an error. Allows the connector to override the errors.tolerance
Kafka cluster setting.
When set to none, the connector reports any error and
blocks further processing of the rest of the messages.
When set to all, the connector ignores any problematic messages.
When set to data, the connector tolerates only data errors and
fails on all other errors.
To learn more about error handling strategies, see the
Handle Errors page.
This property overrides the errors.tolerance property of the Connect Framework.
Default: Inherits the value from the errors.tolerance
setting. Accepted Values: "none" or "all"
mongo.errors.log.enable
Type: boolean
Description: Whether the connector should write details of errors including
failed operations to the log file. The connector classifies
errors as "tolerated" or "not tolerated" using the
errors.tolerance or mongo.errors.tolerance settings.
When set to true, the connector logs both "tolerated" and
"not tolerated" errors. When set to false, the connector logs "not tolerated" errors.
This property overrides the errors.log.enable property of the Connect Framework.
Default:false Accepted Values: true or false
errors.log.include.messages
Type: boolean
Description: Whether the connector should include the invalid message when
logging an error. An invalid message includes data such as record
keys, values, and headers.
Default:false Accepted Values: true or false
errors.deadletterqueue.topic.name
Type: string
Description: Name of topic to use as the dead letter queue. If blank, the
connector does not send any invalid messages to the dead letter
queue.
To learn about the exceptions the connector defines and
reports through context headers, see
Bulk Write Exceptions.
Default:false Accepted Values: true or false
errors.deadletterqueue.topic.replication.factor
Type: integer
Description: The number of nodes on which to replicate the dead letter queue
topic. If you are running a single-node Kafka cluster, you must
set this to 1.
Description: A list of field name mappings for key and value fields using
regular expressions. Define the mappings in an inline JSON array
in the following format:
Accepted Values: A fully qualified Java class name
ID Strategy
Use the following configuration settings to specify how the MongoDB Kafka sink connector should determine the _id value for each document it writes to MongoDB.
Accepted Values: An empty string or a fully qualified Java class name
document.id.strategy.overwrite.existing
Type: boolean
Description: Whether the connector should overwrite existing values in the _id
field when it applies the strategy defined by the
document.id.strategy property.
Default: false Accepted Values: true or false
document.id.strategy.uuid.format
Type: string
Description: Whether the connector should output the UUID in the _id field
in string format or in
BsonBinary
format.
Default: string Accepted Values: string or binary
delete.on.null.values
Type: boolean
Description: Whether the connector should delete documents when the key value
matches a document in MongoDB and the value field is null.
This setting applies when you specify an id generation strategy that
operates on the key document such as FullKeyStrategy,
PartialKeyStrategy, and ProvidedInKeyStrategy.
Default: false Accepted Values: true or false
Write Model Strategy
You can set configuration properties to specify how the MongoDB Kafka sink connector writes data into MongoDB. The following sections describe the configuration properties that you can set to customize this behavior.
Write Model
Set the writemodel.strategy configuration property to specify how the sink connector writes data when it receives a sink record.
You can set the value of writemodel.strategy to any of the fully qualified class names of the write model strategies described in the Strategies section of this page. You can specify a strategy by setting the following configuration:
writemodel.strategy=<a write model strategy>
Delete Write Model
Set the delete.writemodel.strategy configuration property to specify how the sink connector writes data when it receives a tombstone event. A tombstone event is a record that contains a key but no value, which signifies a deleted record.
You can set the value of delete.writemodel.strategy to any of the fully qualified class names of the write model strategies described in the Strategies section of this page. You can specify a strategy by setting the following configuration:
delete.writemodel.strategy=<a write model strategy>
Description: Replaces at most one document in MongoDB that matches a sink
record by the _id field. If no documents match, the
connector inserts the sink record as a new document. To specify this strategy, set the configuration property to the
following class name:
Description: Replaces at most one document that matches a sink record by a
specified business key. If no documents match, the
connector inserts the sink record as a new document. To specify this strategy, set the configuration property to the
following class name:
Description: Deletes at most one document that matches your sink connector's
key structure by the _id field only when the document
contains a null value structure.
This is the default value for the
delete.writemodel.strategy configuration property.
This strategy is set as the default value of the
writemodel.strategy property when you set
delete.on.null.values=true. To specify this strategy, set the configuration property to the
following class name:
Description: Deletes at most one MongoDB document that matches a sink
record by a business key. This strategy requires a
valueDoc for key generation. To leverage the
PartialKeyStrategy for key generation, use the DeleteOneTombstoneBusinessKeyStrategy.
To specify this strategy, set the configuration property to the
following class name:
Description: Deletes at most one MongoDB document that matches a sink
record by a business key. This strategy leverages the
PartialKeyStrategy to create the key used for deletion.
To specify this strategy, set the configuration property to the
following class name:
Description: Updates at most one document in MongoDB that matches a sink
record by the _id field. If no documents match, the
connector inserts the sink record as a new document. To specify this strategy, set the configuration property to the
following class name:
Description: Add _insertedTS (inserted timestamp) and _modifiedTS (modified timestamp) fields into documents. To specify this strategy, set the configuration property to the
following class name:
Description: Add _insertedTS (inserted timestamp) and _modifiedTS
(modified timestamp) fields into documents that match a business
key. To specify this strategy, set the configuration property to the
following class name:
Use the following MongoDB Kafka sink connector configuration settings to override global or default property settings for specific topics.
To view only the options related to overriding topic settings, see the Topic Override Properties page.
Name
Description
topic.override.<topicName>.<propertyName>
Type: string
Description: Specify a topic and property name to override the corresponding
global or default property setting.
For example, the topic.override.foo.collection=bar setting instructs the sink connector to store data from the foo topic in the bar collection.
You can specify any valid configuration setting in the <propertyName> segment on a per-topic basis except connection.uri and topics.
Default: "" Accepted Values: Accepted values specific to the overridden property
Change Data Capture
Use the following configuration settings to specify a class the MongoDB Kafka sink connector uses to process change data capture (CDC) events.
See the guide on Change Data Capture Handlers for examples using the built-in ChangeStreamHandler and handlers for the Debezium and Qlik Replicate event producers.
Description: The class name of the CDC handler to use for converting changes
into event streams. See
Available CDC Handlers
for a list of CDC handlers.
Default: "" Accepted Values: An empty string or a fully qualified Java
class name
Time Series
Use the following configuration settings to specify how the MongoDB Kafka sink connector should sink data to a MongoDB time series collection.
Description: The name of the top-level field in the source data that contains time
information that you want to associate with the new document in the
time series collection.
Default: "" Accepted Values: An empty string or the name of a field
that contains a BSON DateTime value
timeseries.timefield.auto.convert.date.format
Type: string
Description: The date format pattern the connector should use to convert the
source data contained in the field specified by the
timeseries.timefield setting.
If the date value from the source data only contains date information,
the connector sets the time information to the start of the specified
day. If the date value does not contain the timezone offset, the
connector sets the offset to UTC.
Description: Whether to convert the data in the field into the BSON Date
format.
When set to true, the connector uses the milliseconds
after epoch and discards fractional parts if the value is
a number. If the value is a string, the connector uses the
setting in the following configuration to parse the date:
timeseries.timefield.auto.convert.date.format
If the connector fails to convert the value, it sends the
original value to the time series collection.
Description: Which DateTimeFormatter locale language tag to use with the date
format pattern (e.g. "en-US").
To learn more about locales, see the Java SE documentation of Locale.
Default: ROOT Accepted Values: A valid Locale language tag format
timeseries.metafield
Type: string
Description: Which top-level field to read from the source data to describe
a group of related time series documents.
IMPORTANT: This field must not be the _id field nor the field you specified
in the timeseries.timefield setting.
Default: "" Accepted Values: An empty string or the name of a field
that contains any BSON type except BsonArray.
timeseries.expire.after.seconds
Type: int
Description: The number of seconds MongoDB should wait before automatically
removing the time series collection data. The connector disables
timed expiry when the setting value is less than 1.