Docs Menu

Docs HomeMongoDB Kafka Connector

Copy Existing Data

This usage example demonstrates how to copy data from a MongoDB collection to an Apache Kafka topic using the MongoDB Kafka source connector.

Suppose you need to copy a MongoDB collection to Apache Kafka and filter some of the data.

Your requirements and your solutions are as follows:

Requirement
Solution
Copy the customers collection of the shopping database in your MongoDB deployment onto an Apache Kafka topic.
See the Copy Data section of this guide.
Only copy documents that have the value "Mexico" in the country field.
See the Filter Data section of this guide.

The customers collection contains the following documents:

{
"_id": 1,
"country": "Mexico",
"purchases": 2,
"last_viewed": { "$date": "2021-10-31T20:30:00.245Z" }
}
{
"_id": 2,
"country": "Iceland",
"purchases": 8,
"last_viewed": { "$date": "2015-07-20T10:00:00.135Z" }
}

Copy the contents of the customers collection of the shopping database by specifying the following configuration options in your source connector:

database=shopping
collection=customers
startup.mode=copy_existing

Your source connector copies your collection by creating change event documents that describe inserting each document into your collection.

Note

Data Copy Can Produce Duplicate Events

If any system changes the data in the database while the source connector converts existing data from it, MongoDB may produce duplicate change stream events to reflect the latest changes. Since the change stream events on which the data copy relies are idempotent, the copied data is eventually consistent.

To learn more about change event documents, see the Change Streams guide.

To learn more about the startup.mode option, see Startup Properties.

You can filter data by specifying an aggregation pipeline in the startup.mode.copy.existing.pipeline option of your source connector configuration. The following configuration specifies an aggregation pipeline that matches all documents with "Mexico" in the country field:

startup.mode.copy.existing.pipeline=[{ "$match": { "country": "Mexico" } }]

To learn more about the startup.mode.copy.existing.pipeline option, see Startup Properties.

To learn more about aggregation pipelines, see the following resources:

Your final source connector configuration to copy the customers collection should look like this:

connector.class=com.mongodb.kafka.connect.MongoSourceConnector
connection.uri=<your production MongoDB connection uri>
database=shopping
collection=customers
startup.mode=copy_existing
startup.mode.copy.existing.pipeline=[{ "$match": { "country": "Mexico" } }]

Once your connector copies your data, you see the following change event document corresponding to the preceding sample collection in the shopping.customers Apache Kafka topic:

{
"_id": { "_id": 1, "copyingData": true },
"operationType": "insert",
"documentKey": { "_id": 1 },
"fullDocument": {
"_id": 1,
"country": "Mexico",
"purchases": 2,
"last_viewed": { "$date": "2021-10-31T20:30:00.245Z" }
},
"ns": { "db": "shopping", "coll": "customers" }
}

Note

Write the Data in your Topic into a Collection

Use a change data capture handler to convert change event documents in an Apache Kafka topic into MongoDB write operations. To learn more, see the Change Data Capture Handlers guide.

←  Topic NamingSpecify a Schema →