
Copy Existing Data

On this page

  • Examples
  • Copy and Filter Collection Data
  • Copy Data From Multiple Sources

These usage examples demonstrate how to copy data from MongoDB to an Apache Kafka topic using the MongoDB Kafka source connector.

Examples

The following examples show how to configure your source connector to copy existing data from a single collection or from multiple collections.

Copy and Filter Collection Data

Suppose you want to copy a MongoDB collection to Apache Kafka, filtering out some of the data as you copy it.

Your requirements and your solutions are as follows:

Requirement: Copy the customers collection of the shopping database in your MongoDB deployment onto an Apache Kafka topic.
Solution: See the Copy Data section of this guide.

Requirement: Only copy documents that have the value "Mexico" in the country field.
Solution: See the Filter Data section of this guide.

The customers collection contains the following documents:

{
  "_id": 1,
  "country": "Mexico",
  "purchases": 2,
  "last_viewed": { "$date": "2021-10-31T20:30:00.245Z" }
}
{
  "_id": 2,
  "country": "Iceland",
  "purchases": 8,
  "last_viewed": { "$date": "2015-07-20T10:00:00.135Z" }
}

Copy Data

Copy the contents of the customers collection of the shopping database by specifying the following configuration options in your source connector:

database=shopping
collection=customers
startup.mode=copy_existing

Your source connector copies your collection by creating change event documents that describe the insertion of each document into the collection.

Note

Data Copy Can Produce Duplicate Events

If any system changes the data in the database while the source connector copies existing data from it, MongoDB may produce duplicate change stream events to reflect the latest changes. Since the change stream events on which the data copy relies are idempotent, the copied data is eventually consistent.

To learn more about change event documents, see the Change Streams guide.

To learn more about the startup.mode option, see Startup Properties.

Filter Data

You can filter data by specifying an aggregation pipeline in the startup.mode.copy.existing.pipeline option of your source connector configuration. The following configuration specifies an aggregation pipeline that matches all documents with "Mexico" in the country field:

startup.mode.copy.existing.pipeline=[{ "$match": { "country": "Mexico" } }]
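Your pipeline can combine multiple match conditions or stages. As a sketch, the following hypothetical variation matches only Mexico-based customers with more than one purchase (the purchases threshold is an assumption for illustration):

# Hypothetical example: copy only Mexico-based customers with more than one purchase
startup.mode.copy.existing.pipeline=[{ "$match": { "country": "Mexico", "purchases": { "$gt": 1 } } }]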

To learn more about the startup.mode.copy.existing.pipeline option, see Startup Properties.

To learn more about aggregation pipelines, see the following resources:

  • Customize a Pipeline to Filter Change Events Usage Example

  • Aggregation in the MongoDB manual.

Your final source connector configuration to copy the customers collection should look like this:

connector.class=com.mongodb.kafka.connect.MongoSourceConnector
connection.uri=<your production MongoDB connection uri>
database=shopping
collection=customers
startup.mode=copy_existing
startup.mode.copy.existing.pipeline=[{ "$match": { "country": "Mexico" } }]
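If you run Kafka Connect in distributed mode, the same settings go in the config object of the JSON payload that you submit to the Connect REST API. The following is a minimal sketch; the connector name shopping-customers-source is a hypothetical placeholder:

{
  "name": "shopping-customers-source",
  "config": {
    "connector.class": "com.mongodb.kafka.connect.MongoSourceConnector",
    "connection.uri": "<your production MongoDB connection uri>",
    "database": "shopping",
    "collection": "customers",
    "startup.mode": "copy_existing",
    "startup.mode.copy.existing.pipeline": "[{ \"$match\": { \"country\": \"Mexico\" } }]"
  }
}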

Once your connector copies your data, you see the following change event document corresponding to the preceding sample collection in the shopping.customers Apache Kafka topic:

{
  "_id": { "_id": 1, "copyingData": true },
  "operationType": "insert",
  "documentKey": { "_id": 1 },
  "fullDocument": {
    "_id": 1,
    "country": "Mexico",
    "purchases": 2,
    "last_viewed": { "$date": "2021-10-31T20:30:00.245Z" }
  },
  "ns": { "db": "shopping", "coll": "customers" }
}

Note

Write the Data in Your Topic into a Collection

Use a change data capture handler to convert change event documents in an Apache Kafka topic into MongoDB write operations. To learn more, see the Change Data Capture Handlers guide.
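As a minimal sketch, the following sink connector configuration replays the copied change events from the shopping.customers topic into a different MongoDB namespace; the target database name shopping_copy is an assumption for illustration:

# Hypothetical sink configuration: apply change events from the shopping.customers
# topic to the shopping_copy.customers collection using the MongoDB CDC handler
connector.class=com.mongodb.kafka.connect.MongoSinkConnector
connection.uri=<your MongoDB connection uri>
topics=shopping.customers
database=shopping_copy
collection=customers
change.data.capture.handler=com.mongodb.kafka.connect.sink.cdc.mongodb.ChangeStreamHandler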

Copy Data From Multiple Sources

Suppose you want to copy data from another collection in the shopping database named products, which contains the following document:

{
  "_id": 1,
  "item_name": "lipstick",
  "department": "cosmetics",
  "quantity": 45
}

You can copy from both the customers and products collections by using the startup.mode.copy.existing.namespace.regex configuration setting, as shown in the following code:

connector.class=com.mongodb.kafka.connect.MongoSourceConnector
connection.uri=<your production MongoDB connection uri>
database=shopping
startup.mode=copy_existing
startup.mode.copy.existing.namespace.regex=^shopping\.(customers|products)$
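The regular expression matches against the full namespace in database.collection form. As a sketch, you could broaden the pattern to copy every collection in the shopping database instead of naming each one:

# Hypothetical alternative: copy all collections in the shopping database
startup.mode.copy.existing.namespace.regex=^shopping\..*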

In addition to the change event document in the shopping.customers Apache Kafka topic, described in the preceding section, you can see the following document in the shopping.products topic:

{
  "_id": { "_id": 1, "copyingData": true },
  "operationType": "insert",
  "documentKey": { "_id": 1 },
  "fullDocument": {
    "_id": 1,
    "item_name": "lipstick",
    "department": "cosmetics",
    "quantity": 45
  },
  "ns": { "db": "shopping", "coll": "products" }
}

Tip

To learn more about the startup.mode.copy.existing.namespace.regex setting, see the Settings table in the Startup Properties guide.
