Replicate Data with a Change Data Capture Handler
Overview
Learn how to use a change data capture (CDC) handler to replicate data with the MongoDB Kafka Connector. A CDC handler is a program that translates CDC events into MongoDB write operations. Use a CDC handler when you need to reproduce the changes in one datastore into another datastore.
In this tutorial, you use a CDC handler to make two MongoDB collections contain the same documents.
If you want to learn more about how CDC handlers work rather than view a tutorial that demonstrates how you use them, see the Change Data Capture Handlers guide.
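The core idea of a CDC handler can be sketched in a few lines of Python. This is an illustrative simplification only, not the connector's actual Java implementation: the `apply_cdc_event` function and the dict standing in for the destination datastore are hypothetical, but the event fields (`operationType`, `documentKey`, `fullDocument`) match the shape of MongoDB change events.

```python
# Illustrative sketch of what a CDC handler does (NOT the connector's
# actual implementation): translate change stream events into
# equivalent write operations on a destination datastore.

def apply_cdc_event(event, destination):
    """Apply one simplified change event to a dict acting as the destination."""
    op = event["operationType"]
    key = event["documentKey"]["_id"]
    if op == "insert":
        destination[key] = event["fullDocument"]
    elif op == "delete":
        destination.pop(key, None)
    # A real handler also covers update, replace, and other operations.

destination = {}
apply_cdc_event(
    {"operationType": "insert",
     "documentKey": {"_id": 1},
     "fullDocument": {"_id": 1, "proclaim": "Hello World!"}},
    destination,
)
apply_cdc_event(
    {"operationType": "delete", "documentKey": {"_id": 1}},
    destination,
)
print(destination)  # -> {}
```

Replaying every event in order leaves the destination holding the same documents as the source, which is exactly what this tutorial demonstrates with two MongoDB collections.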
Requirements
Download and install the following packages:
This guide uses the following Docker-specific terminology:
Learn more about Docker from the Docker official Get Started Guide.
The sandbox uses Docker for convenience and consistency. To learn more about deployment options for Apache Kafka, see the following resources:
Tutorial
Download Source Files
We provide a sandbox that includes the services you need to build your sample data pipeline. To download the sandbox, clone the tutorial repository to your development environment. Then navigate to the directory that corresponds to the Replicate Data with a Change Data Capture Handler tutorial. If you use bash or a similar shell, use the following commands:
git clone https://github.com/mongodb-university/kafka-edu.git
cd kafka-edu/docs-examples/examples/v1.7/cdc-tutorial
Set Up the Environment
The sample pipeline consists of the following tools running in Docker containers on your computer:
- A MongoDB replica set
- An Apache Kafka instance
- A Kafka Connect instance with the MongoDB Kafka Connector installed
- A Zookeeper instance (Zookeeper is a dependency of Apache Kafka)
The pipeline comes with a source connector already installed. The source connector writes change event documents corresponding to the Source collection in the CDCTutorial database to a Kafka topic. The configuration for the source connector is as follows:
name=mongo-source-CDCTutorial-eventroundtrip
connector.class=com.mongodb.kafka.connect.MongoSourceConnector
connection.uri=mongodb://mongo1:27017,mongo2:27017,mongo3:27017
database=CDCTutorial
collection=Source
To download and start the pipeline, execute the following command from within the root directory of your cloned repository:
docker-compose -p cdc-tutorial up -d
In total, the Docker images for this tutorial require about 2.4 GB of space. The following list shows how long it takes to download the images with different internet speeds:
- 40 megabits per second: 8 minutes
- 20 megabits per second: 16 minutes
- 10 megabits per second: 32 minutes
Once the preceding command finishes and the pipeline starts, you should see output that looks like this:
...
Creating mongo1       ... done
Creating zookeeper    ... done
Creating broker       ... done
Creating mongo1-setup ... done
Creating connect      ... done
Creating shell        ... done
Open a second terminal window. You will use one terminal to monitor your topic, and the other terminal to perform write operations on your database. Enter the following command into both terminals:
docker exec -it shell /bin/bash
Once you have entered the preceding command into both terminal windows, arrange the two windows so that both are visible, one above the other.
To monitor your topic, type the following command in your upper terminal window:
kafkacat -b broker:29092 -C -t CDCTutorial.Source
If you receive the following output, run the preceding kafkacat command a second time:
% Error: Topic CDCTutorial.Source error: Broker: Leader not available
Once you enter the preceding command, you should see output that looks like this:
% Reached end of topic CDCTutorial.Source [0] at offset 0
Your upper terminal window is now listening to the CDCTutorial.Source Kafka topic. Changes to your topic will print in this terminal window.
To learn more about kafkacat, see the kcat repository on GitHub.
Configure Sink Connector
To configure your sink connector, execute the following command in your lower terminal window:
curl -X POST -H "Content-Type: application/json" --data '
{
  "name": "mongo-sink-CDCTutorial-eventroundtrip",
  "config": {
    "connector.class": "com.mongodb.kafka.connect.MongoSinkConnector",
    "tasks.max": "1",
    "topics": "CDCTutorial.Source",
    "change.data.capture.handler": "com.mongodb.kafka.connect.sink.cdc.mongodb.ChangeStreamHandler",
    "connection.uri": "mongodb://mongo1:27017,mongo2:27017,mongo3:27017",
    "database": "CDCTutorial",
    "collection": "Destination"
  }
}' http://connect:8083/connectors -w "\n" | jq .
After you run the preceding command, you should see the following output:
...
{
  "name": "mongo-sink-CDCTutorial-eventroundtrip",
  "config": {
    "connector.class": "com.mongodb.kafka.connect.MongoSinkConnector",
    "tasks.max": "1",
    "topics": "CDCTutorial.Source",
    "change.data.capture.handler": "com.mongodb.kafka.connect.sink.cdc.mongodb.ChangeStreamHandler",
    "connection.uri": "mongodb://mongo1:27017,mongo2:27017,mongo3:27017",
    "database": "CDCTutorial",
    "collection": "Destination",
    "name": "mongo-sink-CDCTutorial-eventroundtrip"
  },
  "tasks": [],
  "type": "sink"
}
This configuration makes your sink connector do the following things:
- Listen for events on the CDCTutorial.Source topic
- Apply a change data capture handler to documents it receives from the CDCTutorial.Source topic
- Write received documents to the Destination collection in the CDCTutorial database
Change Data in MongoDB
From your lower terminal, enter the MongoDB Shell with the following command:
mongosh mongodb://mongo1:27017/?replicaSet=rs0
Once you are in the MongoDB Shell, your terminal prompt should look like this:
rs0 [primary] test>
Insert a document into the Source collection of the CDCTutorial database with the following commands:
use CDCTutorial
db.Source.insertOne({proclaim: "Hello World!"});
Once you insert the document, you should see output that resembles the following in your upper shell:
{"schema": {"type": "string", "optional": false},
 "payload": {
   "_id": {"_data": "8260...4"},
   "operationType": "insert",
   "clusterTime": {"$timestamp": {"t": 1611348141, "i": 2}},
   "fullDocument": {"_id": {"$oid": "600b38ad..."}, "proclaim": "Hello World!"},
   "ns": {"db": "CDCTutorial", "coll": "Source"},
   "documentKey": {"_id": {"$oid": "600b38a...."}}}}
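The sink connector can replicate this insert because the event carries everything it needs. The following Python sketch (with hypothetical placeholder `_id` values, since the ones in the output above are truncated) pulls out the fields a change stream handler reads from an insert event:

```python
import json

# Hypothetical change event mirroring the structure shown above; the
# _id values are placeholders, not the truncated ones from the output.
raw = json.dumps({
    "schema": {"type": "string", "optional": False},
    "payload": {
        "operationType": "insert",
        "fullDocument": {"_id": {"$oid": "0" * 24}, "proclaim": "Hello World!"},
        "ns": {"db": "CDCTutorial", "coll": "Source"},
        "documentKey": {"_id": {"$oid": "0" * 24}},
    },
})

payload = json.loads(raw)["payload"]
# For an insert, a handler can replay fullDocument into the destination as-is.
assert payload["operationType"] == "insert"
print(payload["fullDocument"]["proclaim"])  # -> Hello World!
```

Because `operationType` is `insert` and `fullDocument` holds the complete document, replaying the event amounts to inserting `fullDocument` into the Destination collection.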
In your lower shell, inspect the Destination collection with the following command:
db.Destination.find()
You should see output that looks like this:
{ _id: ..., proclaim: 'Hello World!' }
Try deleting your document from your Source collection with the following command:
db.Source.deleteMany({})
Once you delete the document, you should see output that resembles the following in your upper shell:
{"schema":{"type":"string","optional":false},"payload":"{\"_id\": {\"_data\": \"826138BCBA000000012B022C0100296E5A10041FD232D9ECE347FFABA837E9AB05D95046645F696400646138BCAF2A52D9E0D299336F0004\"}, \"operationType\": \"delete\", \"clusterTime\": {\"$timestamp\": {\"t\": 1631108282, \"i\": 1}}, \"ns\": {\"db\": \"CDCTutorial\", \"coll\": \"Source\"}, \"documentKey\": {\"_id\": {\"$oid\": \"6138bcaf2a52d9e0d299336f\"}}}"}
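Note that in this delete event the payload arrives as a JSON string rather than a JSON object, and it contains a documentKey but no fullDocument. A consumer therefore parses twice, as the following Python sketch shows; the `_id` value here is a hypothetical placeholder, not the one from the output above:

```python
import json

# Hypothetical delete event mirroring the structure shown above; the
# payload is a JSON *string*, so a consumer must parse it a second time.
raw = json.dumps({
    "schema": {"type": "string", "optional": False},
    "payload": json.dumps({
        "operationType": "delete",
        "clusterTime": {"$timestamp": {"t": 1631108282, "i": 1}},
        "ns": {"db": "CDCTutorial", "coll": "Source"},
        "documentKey": {"_id": {"$oid": "f" * 24}},
    }),
})

outer = json.loads(raw)
event = json.loads(outer["payload"])   # second parse: payload is a string
# A delete event carries no fullDocument -- documentKey alone is enough
# for a handler to remove the matching document downstream.
assert "fullDocument" not in event
print(event["operationType"])  # -> delete
```

The absence of `fullDocument` is expected: a deleted document no longer exists, so the event only identifies which document to remove.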
Now see how many documents are in your Destination collection:
db.Destination.count()
You should see the following output:
0
Once you have finished exploring the connector, you can exit the MongoDB Shell with the following command:
exit
Explore the sample pipeline on your own. Here are some challenges to get you started:
- Add a new source connector that writes to the CDCTutorial.Source topic. Configure your new connector to write only insert events. To learn how to filter event types in your connector, see the Customize a Pipeline to Filter Change Events guide.
- Add a new source connector configured with the publish.full.document.only=true option that writes to the CDCTutorial.Source topic. Publish a document with your new source connector. This produces an error in your sink connector, and your sink connector stops. Configure your sink connector to write errant messages to a topic rather than stop. To learn how to write errant messages to a topic, see Write Errors and Errant Messages to a Topic.
- Remove the change.data.capture.handler from your sink connector. Add the source connector from the tutorial to Kafka Connect if it's not already added. Insert a document into MongoDB as done in the tutorial. Look at the change event document your sink connector inserts into MongoDB.
Stop the Pipeline
To conserve resources on your computer, make sure to stop the sample pipeline once you are done exploring this example.
Before you stop the sample pipeline, make sure to exit your Docker shell. You can exit your Docker shell by running the following command in your lower terminal:
exit
To stop the sample pipeline and remove containers and images, run the following command:
docker-compose -p cdc-tutorial down --rmi 'all'
Further Reading
To learn more about the topics discussed in this tutorial, see the following MongoDB Kafka Connector guides: