
Replicate Data with a Change Data Capture Handler

On this page

  • Overview
  • Replicate Data with a CDC Handler
  • Start Interactive Shells
  • Configure the Source Connector
  • Configure the Sink Connector
  • Monitor the Kafka Topic
  • Write Data into the Source and Watch the Data Flow
  • (Optional) Generate Additional Changes
  • Summary
  • Learn More

Follow this tutorial to learn how to use a change data capture (CDC) handler to replicate data with the MongoDB Kafka Connector. A CDC handler is an application that translates CDC events into MongoDB write operations. Use a CDC handler when you need to reproduce changes made in one datastore in another datastore.

In this tutorial, you configure and run MongoDB Kafka source and sink connectors to make two MongoDB collections contain the same documents using CDC. The source connector writes change stream data from the original collection to a Kafka topic, and the sink connector writes the Kafka topic data to the target MongoDB collection.

If you want to learn more about how CDC handlers work, see the Change Data Capture Handlers guide.
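
To make the idea of a CDC handler concrete, the following Python sketch replays change events against an in-memory dictionary that stands in for the target collection. It is an illustration only, not the connector's implementation: the function name `apply_change_event` and the dictionary-based "collection" are hypothetical, update events are left out for brevity, and the event fields mirror the change stream messages shown later in this tutorial.

```python
from typing import Any, Dict

# Hypothetical stand-in for a target collection: maps a document's _id to the document.
Collection = Dict[str, Dict[str, Any]]


def apply_change_event(target: Collection, event: Dict[str, Any]) -> None:
    """Translate one change event into a write on the in-memory target."""
    op = event["operationType"]
    doc_id = event["documentKey"]["_id"]
    if op in ("insert", "replace"):
        # The full document travels with the event, so store a copy of it.
        target[doc_id] = dict(event["fullDocument"])
    elif op == "delete":
        # Delete events carry only the document key.
        target.pop(doc_id, None)
    else:
        raise ValueError(f"unsupported operation type in this sketch: {op}")


destination: Collection = {}
events = [
    {"operationType": "insert", "documentKey": {"_id": "a1"},
     "fullDocument": {"_id": "a1", "proclaim": "Hello World!"}},
    {"operationType": "delete", "documentKey": {"_id": "a1"}},
]

apply_change_event(destination, events[0])
after_insert = dict(destination)   # snapshot after the insert event
apply_change_event(destination, events[1])
after_delete = dict(destination)   # snapshot after the delete event
```

Replaying the insert and then the delete leaves the stand-in collection empty, which is the same end state you reach in the optional last step of this tutorial.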

Note
Before You Get Started

Before you start this tutorial, you must complete the steps in the Kafka Connector Tutorial Setup.

1. Start Interactive Shells

Start two interactive shells on the Docker container in separate windows. You use these shells to run and observe different tasks throughout the tutorial.

Run the following command in the terminal to start an interactive shell called CDCShell1 in one terminal window:

docker run --rm --name CDCShell1 --network mongodb-kafka-base_localnet -it robwma/mongokafkatutorial:latest bash

Run the following command in the terminal to start an interactive shell called CDCShell2 in the other terminal window:

docker run --rm --name CDCShell2 --network mongodb-kafka-base_localnet -it robwma/mongokafkatutorial:latest bash

Arrange the two windows so that both stay visible and you can watch updates in real time.

Use CDCShell1 to configure your connectors and monitor your Kafka topic. Use CDCShell2 to perform write operations in MongoDB.

2. Configure the Source Connector

In CDCShell1, configure a source connector to read from the CDCTutorial.Source MongoDB namespace and write to the CDCTutorial.Source Kafka topic.

Create a configuration file called cdc-source.json using the following command:

nano cdc-source.json

Paste the following configuration information into the file and save your changes:

{
  "name": "mongo-cdc-source",
  "config": {
    "connector.class": "com.mongodb.kafka.connect.MongoSourceConnector",
    "connection.uri": "mongodb://mongo1",
    "database": "CDCTutorial",
    "collection": "Source"
  }
}

Run the following command in CDCShell1 to start the source connector using the configuration file you created:

cx cdc-source.json
Note

The cx command is a custom script included in the tutorial development environment. This script runs the following equivalent request to the Kafka Connect REST API to create a new connector:

curl -X POST -H "Content-Type: application/json" -d @cdc-source.json http://connect:8083/connectors -w "\n"
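
If you prefer to script the request rather than use curl, the same POST can be built with Python's standard library. This is a minimal sketch, assuming Python 3 is available in the shell where you run it and that the Kafka Connect REST API is reachable at the address used in the curl command above. The helper name `build_create_request` is hypothetical.

```python
import json
import urllib.request

# Assumption: same Kafka Connect endpoint as in the curl command above.
CONNECT_URL = "http://connect:8083/connectors"


def build_create_request(config: dict) -> urllib.request.Request:
    """Build (but do not send) the POST request that creates a connector."""
    body = json.dumps(config).encode("utf-8")
    return urllib.request.Request(
        CONNECT_URL,
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


source_config = {
    "name": "mongo-cdc-source",
    "config": {
        "connector.class": "com.mongodb.kafka.connect.MongoSourceConnector",
        "connection.uri": "mongodb://mongo1",
        "database": "CDCTutorial",
        "collection": "Source",
    },
}

request = build_create_request(source_config)
# To actually send it from a shell on the tutorial network:
#   with urllib.request.urlopen(request) as response:
#       print(response.read().decode("utf-8"))
```

The request is built separately from sending it so that you can inspect the method, URL, and body before anything reaches Kafka Connect.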

Run the following command in the shell to check the status of the connectors:

status

If your source connector started successfully, you should see the following output:

Kafka topics:
...
The status of the connectors:
source | mongo-cdc-source | RUNNING | RUNNING | com.mongodb.kafka.connect.MongoSourceConnector
Currently configured connectors
[
  "mongo-cdc-source"
]
...
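
The status command is specific to the tutorial environment. Outside it, the Kafka Connect REST API reports connector and task states at GET /connectors/<name>/status. The following Python sketch formats such a response into a line similar to the one above. The function name `summarize_status` is hypothetical, and the sample payload is illustrative rather than captured from a running cluster.

```python
from typing import Any, Dict


def summarize_status(status: Dict[str, Any]) -> str:
    """Format a status document as 'type | name | connector state | task states'."""
    task_states = ", ".join(task["state"] for task in status.get("tasks", []))
    return " | ".join([
        status.get("type", "unknown"),
        status["name"],
        status["connector"]["state"],
        task_states or "NO TASKS",
    ])


# Illustrative payload shaped like a GET /connectors/<name>/status response.
# The worker_id values are made up for this example.
sample = {
    "name": "mongo-cdc-source",
    "type": "source",
    "connector": {"state": "RUNNING", "worker_id": "connect:8083"},
    "tasks": [{"id": 0, "state": "RUNNING", "worker_id": "connect:8083"}],
}

summary = summarize_status(sample)
print(summary)
```

A connector can report RUNNING while one of its tasks has failed, which is why the sketch lists the task states separately from the connector state.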
3. Configure the Sink Connector

In CDCShell1, configure a sink connector to copy data from the CDCTutorial.Source Kafka topic to the CDCTutorial.Destination MongoDB namespace.

Create a configuration file called cdc-sink.json using the following command:

nano cdc-sink.json

Paste the following configuration information into the file and save your changes:

{
  "name": "mongo-cdc-sink",
  "config": {
    "connector.class": "com.mongodb.kafka.connect.MongoSinkConnector",
    "topics": "CDCTutorial.Source",
    "change.data.capture.handler": "com.mongodb.kafka.connect.sink.cdc.mongodb.ChangeStreamHandler",
    "connection.uri": "mongodb://mongo1",
    "database": "CDCTutorial",
    "collection": "Destination"
  }
}
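
The sink connector only receives events if its topics setting names the topic that the source connector writes to. By default, the source connector names the topic after the namespace (database, then collection, joined by a period), optionally preceded by a prefix. The following Python sketch checks that the two configurations in this tutorial line up. The helper `source_topic` is hypothetical, and the sketch does not cover other topic-naming settings.

```python
from typing import Optional


def source_topic(database: str, collection: str, prefix: Optional[str] = None) -> str:
    """Return the topic name for a namespace, with an optional prefix."""
    parts = [prefix, database, collection]
    return ".".join(part for part in parts if part)


# Values copied from the source and sink configurations in this tutorial.
source = {"database": "CDCTutorial", "collection": "Source"}
sink = {"topics": "CDCTutorial.Source"}

expected_topic = source_topic(source["database"], source["collection"])
# The sink's "topics" setting is a comma-separated list.
sink_topics = [topic.strip() for topic in sink["topics"].split(",")]
configs_match = expected_topic in sink_topics
print(expected_topic, configs_match)
```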

Run the following command in the shell to start the sink connector using the configuration file you created:

cx cdc-sink.json

Run the following command in the shell to check the status of the connectors:

status

If your sink connector started successfully, you should see the following output:

Kafka topics:
...
The status of the connectors:
sink | mongo-cdc-sink | RUNNING | RUNNING | com.mongodb.kafka.connect.MongoSinkConnector
source | mongo-cdc-source | RUNNING | RUNNING | com.mongodb.kafka.connect.MongoSourceConnector
Currently configured connectors
[
  "mongo-cdc-sink",
  "mongo-cdc-source"
]
...
4. Monitor the Kafka Topic

In CDCShell1, monitor the Kafka topic for incoming events. Run the following command to start the kafkacat application, which outputs data published to the topic:

kc CDCTutorial.Source
Note

The kc command is a custom script included in the tutorial development environment that calls the kafkacat application with options to connect to Kafka and format the output of the specified topic.

Once started, you should see the following output that indicates there is currently no data to read:

% Reached end of topic CDCTutorial.Source [0] at offset 0
5. Write Data into the Source and Watch the Data Flow

In CDCShell2, connect to MongoDB using mongosh, the MongoDB shell, by running the following command:

mongosh "mongodb://mongo1"

After you connect successfully, you should see the following MongoDB shell prompt:

rs0 [direct: primary] test>

At the prompt, type the following commands to insert a new document into the CDCTutorial.Source MongoDB namespace:

use CDCTutorial
db.Source.insertOne({ proclaim: "Hello World!" });

Once MongoDB completes the insert command, you should receive an acknowledgment that resembles the following text:

{
  acknowledged: true,
  insertedId: ObjectId("600b38ad...")
}

The source connector picks up the change and publishes it to the Kafka topic. You should see the following topic message in your CDCShell1 window:

{
  "schema": { "type": "string", "optional": false },
  "payload": {
    "_id": { "_data": "8260..." },
    "operationType": "insert",
    "clusterTime": { "$timestamp": { "t": 1611..., "i": 2 } },
    "fullDocument": {
      "_id": { "$oid": "600b38ad..." },
      "proclaim": "Hello World!"
    },
    "ns": { "db": "CDCTutorial", "coll": "Source" },
    "documentKey": { "_id": { "$oid": "600b38a..." } }
  }
}
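
If you want to process these topic messages in code, the fields that matter most are the operation type, the namespace, and the document key. The following Python sketch extracts them from a message shaped like the one above. The function name `describe_change` is hypothetical, the sample keeps only the fields the function reads, and the `<elided>` identifier is a placeholder rather than a real value. Because the schema declares the payload as a string, the sketch also accepts a payload that arrives as serialized JSON.

```python
import json
from typing import Any, Dict, Tuple


def describe_change(message: Dict[str, Any]) -> Tuple[str, str, Any]:
    """Return (operation type, namespace, document key) for one topic message."""
    payload = message["payload"]
    if isinstance(payload, str):
        # With a string schema, the payload may arrive as serialized JSON.
        payload = json.loads(payload)
    namespace = f'{payload["ns"]["db"]}.{payload["ns"]["coll"]}'
    return payload["operationType"], namespace, payload["documentKey"]["_id"]


# Trimmed-down message; "<elided>" is a placeholder, not a real ObjectId.
sample = {
    "schema": {"type": "string", "optional": False},
    "payload": {
        "operationType": "insert",
        "ns": {"db": "CDCTutorial", "coll": "Source"},
        "documentKey": {"_id": {"$oid": "<elided>"}},
    },
}

operation, namespace, key = describe_change(sample)
print(operation, namespace)
```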

The sink connector picks up the Kafka message and sinks the data into MongoDB. You can retrieve the document from the CDCTutorial.Destination namespace in MongoDB by running the following command in the MongoDB shell you started in CDCShell2:

db.Destination.find()

You should see the following document returned as the result:

[
  {
    _id: ObjectId("600b38a..."),
    proclaim: 'Hello World!'
  }
]
6. (Optional) Generate Additional Changes

Try removing documents from the CDCTutorial.Source namespace by running the following command from the MongoDB shell:

db.Source.deleteMany({})

You should see the following topic message in your CDCShell1 window:

{
  "schema": { "type": "string", "optional": false },
  "payload": {
    "_id": { "_data": "8261...." },
    ...
    "operationType": "delete",
    "clusterTime": { "$timestamp": { "t": 1631108282, "i": 1 } },
    "ns": { "db": "CDCTutorial", "coll": "Source" },
    "documentKey": { "_id": { "$oid": "6138..." } }
  }
}

Run the following command to retrieve the current number of documents in the CDCTutorial.Destination collection:

db.Destination.countDocuments()

This returns the following output, indicating the collection is empty:

0

Run the following command to exit the MongoDB shell:

exit

Summary

In this tutorial, you set up a source connector to capture changes to a MongoDB collection and send them to Apache Kafka. You also configured a sink connector with a MongoDB CDC Handler to move the data from Apache Kafka to a MongoDB collection.

Read the following resources to learn more about concepts mentioned in this tutorial:

© 2022 MongoDB, Inc.