Startup Properties

Use the following settings to configure how the MongoDB Kafka source connector starts up and converts existing MongoDB collections into change stream events.

Tip

For an example using the copy existing feature, see the Copy Existing Data Usage Example.

For a list of source connector configuration settings organized by category, see the guide on Source Connector Configuration Properties.

Name
Description
startup.mode
Type: string

Description:
Specifies how the connector starts up when no source offset is available. Resuming a change stream requires a resume token, which the connector reads from the source offset. If no source offset is available, the connector can either ignore all or some of the existing source data, or first copy all existing source data and then continue processing new data.

If startup.mode=latest, the connector ignores all existing source data.

If startup.mode=timestamp, the connector applies the startup.mode.timestamp.* properties. If none of these properties are configured, timestamp is equivalent to latest.

If startup.mode=copy_existing, the connector copies all existing source data to Change Stream events. This setting is equivalent to the deprecated setting copy.existing=true.

Note

Data Copy Can Produce Duplicate Events

If any system changes the data in the database while the source connector converts existing data from it, MongoDB may produce duplicate change stream events to reflect the latest changes. Since the change stream events on which the data copy relies are idempotent, the copied data is eventually consistent.

Default: latest
Accepted Values: latest, timestamp, copy_existing
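
As a minimal sketch of how one of these modes might be selected, the following Python snippet builds a source connector configuration and rejects values outside the accepted list. The helper name build_source_config, the connector name, and the connection URI are hypothetical placeholders; connector.class and connection.uri are general connector settings that this page does not describe.

```python
import json

# Accepted values for startup.mode, as listed above.
ACCEPTED_STARTUP_MODES = ("latest", "timestamp", "copy_existing")


def build_source_config(startup_mode="latest"):
    """Return a source connector config dict (hypothetical helper)."""
    if startup_mode not in ACCEPTED_STARTUP_MODES:
        raise ValueError(f"unsupported startup.mode: {startup_mode!r}")
    return {
        "connector.class": "com.mongodb.kafka.connect.MongoSourceConnector",
        # Placeholder URI; replace it with your deployment's connection string.
        "connection.uri": "mongodb://localhost:27017",
        "startup.mode": startup_mode,
    }


config = build_source_config("copy_existing")
# JSON payload shape used when creating a connector through the Kafka Connect REST API.
payload = json.dumps({"name": "mongo-source-example", "config": config}, indent=2)
print(payload)
```

Validating the mode before submitting the configuration surfaces a typo early instead of at connector startup.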
startup.mode.timestamp.start.at.operation.time
Type: string

Description:
Applies only if startup.mode=timestamp. Specifies the starting point for the change stream. To learn more about Change Stream parameters, see the Server manual entry.
Default: ""
Accepted Values:
  • An integer number of seconds since the Epoch in decimal format (for example, 30)

  • An instant in the ISO-8601 format with one second precision (for example, 1970-01-01T00:00:30Z)

  • A BSON Timestamp in the canonical extended JSON (v2) format (for example, {"$timestamp": {"t": 30, "i": 0}})
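
The three sample values above appear to describe the same instant, 30 seconds after the Epoch. The following Python sketch derives all three spellings from one number of seconds; the helper name operation_time_formats is hypothetical, and the increment "i" is assumed to be 0 as in the sample value.

```python
from datetime import datetime, timezone
import json


def operation_time_formats(epoch_seconds):
    """Return the three accepted spellings of one instant (hypothetical helper)."""
    instant = datetime.fromtimestamp(epoch_seconds, tz=timezone.utc)
    return {
        # Integer number of seconds since the Epoch, in decimal format.
        "epoch_seconds": str(epoch_seconds),
        # ISO-8601 instant with one-second precision.
        "iso_8601": instant.strftime("%Y-%m-%dT%H:%M:%SZ"),
        # BSON Timestamp in canonical extended JSON (v2); "i" assumed to be 0.
        "bson_timestamp": json.dumps({"$timestamp": {"t": epoch_seconds, "i": 0}}),
    }


formats = operation_time_formats(30)
for name, value in formats.items():
    print(name)
    print(f"startup.mode.timestamp.start.at.operation.time={value}")
```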

startup.mode.copy.existing.namespace.regex
Type: string

Description:
Regular expression the connector uses to match namespaces from which to copy data. A namespace describes the MongoDB database name and collection separated by a period (for example, databaseName.collectionName).

Example

In the following example, the regular-expression setting matches collections that start with "page" in the stats database.

startup.mode.copy.existing.namespace.regex=stats\.page.*

The "\" character in the example above escapes the "." character that follows it in the regular expression. For more information on how to build regular expressions, see the Java API documentation on Patterns.

Default: ""
Accepted Values: A valid regular expression
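
To check a namespace pattern before deploying it, you can try it against sample namespaces. The sketch below uses Python's re module purely for illustration; the connector itself uses Java patterns, although a pattern this simple behaves the same in both. It assumes that the whole namespace must match the expression, which this page does not state, and the sample namespaces and the helper name is_copied are hypothetical.

```python
import re

# The pattern from the example above; the backslash escapes the literal period.
NAMESPACE_REGEX = re.compile(r"stats\.page.*")


def is_copied(namespace):
    """Assumption: the whole namespace must match the pattern."""
    return NAMESPACE_REGEX.fullmatch(namespace) is not None


samples = ("stats.pageviews", "stats.page", "stats.users", "statsXpages", "sales.pages")
for namespace in samples:
    print(f"{namespace}: {is_copied(namespace)}")

# Without the escape, "." matches any character, so the pattern is too broad.
unescaped = re.compile(r"stats.page.*")
print(unescaped.fullmatch("statsXpages") is not None)
```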
startup.mode.copy.existing.pipeline
Type: string

Description:
An inline array of pipeline operations the connector runs when copying existing data. You can use this setting to filter the source collection and improve the use of indexes in the copying process.

Example

The following example shows how you can use the $match aggregation operator to instruct the connector to copy only documents that contain a closed field with a value of "false".

startup.mode.copy.existing.pipeline=[ { "$match": { "closed": "false" } } ]
Default: ""
Accepted Values: Valid aggregation pipeline stages
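
Because the setting takes the pipeline as an inline JSON array, one way to avoid quoting mistakes is to build the pipeline as a data structure and serialize it. The following Python sketch does this for the pipeline shown above; it is an illustration, not part of the connector.

```python
import json

# Pipeline from the example above. Note that "false" is a string here, so it
# matches the string "false" rather than the boolean false.
pipeline = [{"$match": {"closed": "false"}}]

# The property value is the pipeline serialized as an inline JSON array.
pipeline_value = json.dumps(pipeline)
print(f"startup.mode.copy.existing.pipeline={pipeline_value}")

# For comparison, a Python False serializes as the JSON boolean false.
boolean_value = json.dumps([{"$match": {"closed": False}}])
print(boolean_value)
```

Whether to match the string or the boolean depends on how the closed field is stored in your documents.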
startup.mode.copy.existing.max.threads
Type: int

Description:
The maximum number of threads the connector can use to copy data.
Default: number of processors available in the environment
Accepted Values: An integer
startup.mode.copy.existing.queue.size
Type: int

Description:
The size of the queue the connector can use when copying data.
Default: 16000
Accepted Values: An integer
startup.mode.copy.existing.allow.disk.use
Type: boolean

Description:
When set to true, the connector uses temporary disk storage for the copy existing aggregation.
Default: true
Accepted Values: true or false