Docs Home → MongoDB Kafka Connector
All Source Connector Configuration Properties
On this page
Overview
On this page, you can view all available configuration properties for your MongoDB Kafka source connector. This page duplicates the content of the other source connector configuration properties pages.
To view a list of all source connector configuration properties pages, see the Source Connector Configuration Properties page.
MongoDB Connection
Use the following configuration settings to specify how your source connector establishes a connection and communicates with your MongoDB cluster.
To view only the options related to your MongoDB connection, see the MongoDB Source Connection Properties page.
Name | Description |
---|---|
connection.uri | Required Type: string Description: The URI connection string
to connect to your MongoDB instance or cluster. For more information, see the Connect to MongoDB guide ImportantAvoid Exposing Your Authentication CredentialsTo avoid exposing your authentication credentials in your
Default: mongodb://localhost:27017,localhost:27018,localhost:27019 Accepted Values: A MongoDB URI connection string |
database | Type: string Description: Name of the database to watch for changes. If not set, the connector
watches all databases for changes. Default: "" Accepted Values: A single database name |
collection | Type: string Description: Name of the collection in the database to watch for changes. If not
set, the connector watches all collections for changes. ImportantIf your Default: "" Accepted Values: A single collection name |
server.api.version | Type: string Description: The Stable API version you want to use with your MongoDB
cluster. For more information on the Stable API and versions of
MongoDB Server that support it, see the Stable API
guide. Default: "" Accepted Values: An empty string or a valid Stable API version. |
server.api.deprecationErrors | Type: boolean Description: When set to true , if the connector calls a command on your
MongoDB instance that's deprecated in the declared Stable API
version, it raises an exception.You can set the API version with the server.api.version
configuration option. For more information on the Stable API, see
the MongoDB manual entry on the
Stable API.Default: false Accepted Values: true or false |
server.api.strict | Type: boolean Description: When set to true , if the connector calls a command on your
MongoDB instance that's not covered in the declared Stable API
version, it raises an exception.You can set the API version with the server.api.version
configuration option. For more information on the Stable API, see
the MongoDB manual entry on the
Stable API.Default: false Accepted Values: true or false |
Kafka Topic
Use the following configuration settings to specify which Kafka topics the source connector should publish data to.
To view only the options related to your Kafka topic, see the Kafka Topic Properties page.
Name | Description | ||||
---|---|---|---|---|---|
topic.prefix | Type: string Description: Specifies the first part of the destination Kafka
topic name to which the connector publishes change stream events.
The destination topic name is composed of the topic.prefix
value followed by the database and collection names, separated by the value
specified in the topic.separator property.TipSee also:Default: "" Accepted Values: A string composed of ASCII alphanumeric
characters including ".", "-", and "_" | ||||
topic.suffix | Type: string Description: Specifies the last part of the destination Kafka
topic name to which the connector publishes change stream events.
The destination topic name is composed of the database and
collection names followed by the topic.suffix value,
separated by the value specified in the topic.separator property.TipSee also:Default: "" Accepted Values: A string composed of ASCII alphanumeric
characters including ".", "-", and "_" | ||||
topic.namespace.map | Type: string Description: Specifies a JSON mapping between change stream document
namespaces
and topic names. ExampleThe following mapping instructs the connector to perform the following actions:
The following examples show which topic the connector sends different change stream documents to using the preceding mapping:
You can use the "*" wildcard character to match change stream
document namespaces.ExampleThe following mapping instructs the connector to publish all change
stream documents to the
Default: "" Accepted Values: A valid JSON object | ||||
topic.separator | Type: string Description: Specifies the string the connector uses to concatenate the values used
to create the name of your topic. The connector publishes records to a
topic with a name formed by concatenating the values of the following fields
in the following order:
ExampleThe following configuration instructs the connector to publish
change stream documents from the
ImportantTopic Separator and Topic Namespace MapWhen you use the Default: "." Accepted Values: A string | ||||
topic.mapper | Type: string Description: The Java class that defines your custom topic mapping logic. Default: com.mongodb.kafka.connect.source.topic.mapping.DefaultTopicMapper Accepted Values: Valid full class name of an implementation
of the TopicMapper
class. |
Change Streams
Use the following configuration settings to specify aggregation pipelines for change streams and read preferences for change stream cursors.
To view only the options related to change streams, see the Change Stream Properties page.
Name | Description | |
---|---|---|
pipeline | Type: string Description: An array of aggregation pipelines to run in your change stream.
You must configure this setting for the change stream
event document, not the fullDocument field.Example
TipDefault: "[]" Accepted Values: Valid aggregation pipeline stage | |
change.stream.full.document | Type: string Description: Determines what values your change stream returns on update
operations. The default setting returns the differences between the original
document and the updated document.The updateLookup setting returns the differences between the
original document and updated document as well as a copy of the
entire updated document at a point in time after the update.TipFor more information on how this change stream option works, see the MongoDB server manual guide on Lookup Full Document for Update Operations. Default: "" Accepted Values: "" or "default" or "updateLookup" | |
publish.full.document.only | Type: boolean Description: Whether to return only the fullDocument field from the
change stream event document produced by any update event. The fullDocument field
contains the most current version of the updated document. To
learn more about the fullDocument field, see the Server
manual page on update events.When set to true , the connector overrides the
change.stream.full.document setting and sets it to
updateLookup so that the fullDocument field contains
updated documents.Default: false Accepted Values: true or false | |
collation | Type: string Description: A JSON collation document
that specifies language-specific ordering rules that MongoDB
applies to the documents returned by the change stream. Default: "" Accepted Values: A valid collation JSON document | |
batch.size | Type: int Description: The change stream cursor batch size. Default: 0 Accepted Values: An integer | |
poll.await.time.ms | Type: long Description: The amount of time in milliseconds to wait before checking the change
stream cursor for new results. Default: 5000 Accepted Values: An integer | |
poll.max.batch.size | Type: int Description: Maximum number of documents to read in a single batch when polling
a change stream cursor for new data. You can use this setting to
limit the amount of data buffered internally in the connector. Default: 1000 Accepted Values: An integer |
Output Format
Use the following configuration settings to specify the format of data the source connector publishes to Kafka topics.
To view only the options related to the format of your output, see the Output Format Properties page.
Name | Description | |||||||||||||||||||||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
output.format.key | Type: string Description: Specifies which data format the source connector outputs the key
document. Default: json Accepted Values: bson , json , schema | |||||||||||||||||||||||||||||||
output.format.value | Type: string Description: Specifies which data format the source connector outputs the value
document. Default: json Accepted Values: bson , json , schema NoteProtobuf Output Format SupportThe connector supports Protobuf as an
output data format. You can enable this format by specifying the
| |||||||||||||||||||||||||||||||
output.json.formatter | Type: string Description: Class name of the JSON formatter the connector should use to
output data. Default:
Accepted Values: Your custom JSON formatter full class name or one of the
following built-in formatter class names:
To learn more about these output formats, see JSON Formatters. | |||||||||||||||||||||||||||||||
output.schema.key | Type: string Description: Specifies an AVRO schema definition for the key document of the
SourceRecord. TipSee also:For more information on AVRO schema, see the Data Formats guide. Default:
Accepted Values: A valid AVRO schema | |||||||||||||||||||||||||||||||
output.schema.value | Type: string Description: Specifies an AVRO schema definition for the value document of the
SourceRecord. TipSee also:For more information on AVRO schema, see the Data Formats guide. Default:
Accepted Values: A valid JSON schema | |||||||||||||||||||||||||||||||
output.schema.infer.value | Type: boolean Description: Whether the connector should infer the schema for the value
document of the SourceRecord.
Since the connector processes each document in isolation, the
connector may generate many schemas. ImportantThe connector only reads this setting when you set your
Default: false Accepted Values: true or false |
Copy Existing
Use the following configuration settings to enable the copy existing feature which converts MongoDB collections into Change Stream events.
To view only the options related to copying data, see the Copy Existing Properties page.
Name | Description | |
---|---|---|
copy.existing | Type: boolean Description: Whether to enable the copy existing feature which converts all
data in a MongoDB collection to Change Stream events and
publishes them on Kafka topics. If MongoDB changes the source
collection data after the connector starts the copy process, the
connector creates events for the changes after it completes the copy
process. NoteData Copy Can Produce Duplicate EventsIf any system changes the data in the database while the source connector converts existing data from it, MongoDB may produce duplicate change stream events to reflect the latest changes. Since the change stream events on which the data copy relies are idempotent, the copied data is eventually consistent. Default: false Accepted Values: true or false | |
copy.existing.namespace.regex | Type: string Description: Regular expression the connector uses to match namespaces from
which to copy data. A namespace describes the MongoDB database name
and collection separated by a period, e.g.
databaseName.collectionName .ExampleIn the following example, the regular expression setting matches collections that start with "page" in the "stats" database.
The "\" character in the example above escapes the "." character that follows it in the regular expression. For more information on how to build regular expressions, see the Java API documentation on Patterns. Default: "" Accepted Values: A valid regular expression | |
copy.existing.pipeline | Type: string Description: An array of pipeline operations
the connector runs when copying existing data. You can use this
setting to filter the source collection and improve the use of
indexes in the copying process. ExampleThe following example shows how you can use the $match
aggregation operator to instruct the connector to copy only
documents that contain a
Default: [] Accepted Values: Valid aggregation pipeline stages | |
copy.existing.max.threads | Type: int Description: The maximum number of threads the connector can use to copy data. Default: number of processors available in the environment Accepted Values: An integer | |
copy.existing.queue.size | Type: int Description: The size of the queue the connector can use when copying data. Default: 16000 Accepted Values: An integer | |
copy.existing.allow.disk.use | Type: boolean Description: When set to true ,
the connector sets the copy existing aggregation
to use temporary disk storage.Default: true |
Error Handling and Resuming from Interruption
Use the following configuration settings to specify how the source connector behaves when it encounters errors and to specify settings related to resuming interrupted reads.
To view only the options related to handling errors, see the Error Handling and Resuming from Interruption Properties page.
Name | Description |
---|---|
mongo.errors.tolerance | Type: string Description: Whether to continue processing messages when the connector encounters
an error. Set this to "none" if you want the connector to stop
processing messages and report the issue if it encounters an
error.Set this to "all" if you want the connector to continue
processing messages and ignore any errors it encounters.ImportantThis property overrides the errors.tolerance Connect Framework property. Default: "none" Accepted Values: "none" or "all" |
mongo.errors.log.enable | Type: boolean Description: Whether the connector should report errors in the log file. Set this to true to log all errors the connector encounters.Set this to false to log errors that are not tolerated by the
connector. You can specify which errors the connector should
tolerate using the errors.tolerance or mongo.errors.tolerance
setting.ImportantThis property overrides the errors.log.enable Connect Framework property. Default: false Accepted Values: true or false |
mongo.errors.deadletterqueue.topic.name | Type: string Description: The name of topic to use as the dead letter queue. If you specify a value, the connector writes invalid messages to the
dead letter queue topic as extended JSON strings. If you leave this setting blank, the connector does not write
invalid messages to any topic. ImportantYou must set Default: "" Accepted Values: A valid Kafka topic name |
offset.partition.name | Type: string Description: The custom offset partition name to use. You can use this option
to instruct the connector to start a new change stream when an
existing offset contains an invalid resume token. If you leave
this setting blank, the connector uses the default partition name
based on the connection details. To view a strategy for naming
offset partitions, see the Reset Stored Offsets
guide. Default: "" Accepted Values: A string. To learn more about naming a partition,
see
SourceRecord
in the Apache Kafka API documentation. |
heartbeat.interval.ms | Type: long Description: The number of milliseconds the connector waits between sending
heartbeat messages. The connector sends heartbeat messages when
source records are not published in the specified interval. Heartbeat messages contain a postBatchResumeToken data field.
The value of this field contains the MongoDB server oplog entry that
the connector last read from the change stream.This mechanism improves resumability of the connector for low volume
namespaces. See the Invalid Resume Token
page for more information on this feature. Set this to 0 to disable heartbeat messages.Default: 0 Accepted Values: An integer |
heartbeat.topic.name | Type: string Description: The name of the topic on which the connector should publish
heartbeat messages. You must provide a positive value in the
heartbeat.interval.ms setting to enable this feature.Default: __mongodb_heartbeats Accepted Values: A valid Kafka topic name |