Note
Atlas Stream Processing provides MongoDB-native tooling to continuously process streaming data, validate schemas, and materialize views into either Atlas database collections or Apache Kafka topics.
To learn more about Atlas Stream Processing, see Atlas Stream Processing.
Overview
This guide shows you how to configure the MongoDB Kafka Connector to send data between MongoDB and Apache Kafka.
After completing this guide, you should understand how to use the Kafka Connect REST API to configure MongoDB Kafka Connectors to read data from MongoDB and write it to a Kafka topic, and to read data from a Kafka topic and write it to MongoDB.
To complete the steps in this guide, you must download and work in a sandbox, a containerized development environment that includes services required to build a sample data pipeline.
Read the following sections to set up your sandbox and sample data pipeline.
Note
After you complete this guide, you can remove the environment by following the instructions in the Remove the Sandbox section.
Install the Required Packages
Download and install the following packages:
- Docker
- Git
Tip
Read the Docker Documentation
This guide uses the following Docker-specific terminology:
- Container
- Image
Learn more about Docker from the Docker official Get Started Guide.
The sandbox uses Docker for convenience and consistency. To learn more about deployment options for Apache Kafka, see the following resources:
Download the Sandbox
We created a sandbox that includes the services you need in this tutorial to build your sample data pipeline.
To download the sandbox, clone the tutorial repository to your development environment. Then navigate to the directory that corresponds to the quickstart tutorial. If you use bash or a similar shell, use the following commands:
git clone https://github.com/mongodb-university/kafka-edu.git
cd kafka-edu/docs-examples/mongodb-kafka-base/
Start the Sandbox
The sandbox starts the following services in Docker containers:
- MongoDB, configured as a replica set 
- Apache Kafka 
- Kafka Connect with the MongoDB Kafka Connector installed 
- Apache ZooKeeper, which manages the configuration for Apache Kafka 
To start the sandbox, run the following command from the tutorial directory:
docker compose -p mongo-kafka up -d --force-recreate 
When you start the sandbox, Docker downloads any images it needs to run.
Note
How long does the download take?
In total, the Docker images for this tutorial require about 2.4 GB of space. The following list shows approximately how long it takes to download the images at different internet speeds:
- 40 megabits per second: 8 minutes 
- 20 megabits per second: 16 minutes 
- 10 megabits per second: 32 minutes 
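The figures above come from simple arithmetic. 2.4 GB is roughly 19,200 megabits, so at 40 megabits per second the download takes about 480 seconds, or 8 minutes. The following bash sketch reproduces the estimate for other speeds. It assumes decimal units, integer speeds, and no protocol overhead or locally cached layers. The function name `download_minutes` is hypothetical.

```shell
#!/usr/bin/env bash
# Rough download-time estimate for the sandbox images (hypothetical helper).
# Assumes 2.4 GB = 2.4 * 1000 * 8 = 19200 megabits (decimal units, no overhead).
IMAGE_MEGABITS=19200

download_minutes() {
  local mbps="$1"   # integer speed in megabits per second
  # megabits / (megabits per second) = seconds; round up to whole minutes
  echo $(( (IMAGE_MEGABITS / mbps + 59) / 60 ))
}

for speed in 40 20 10; do
  echo "${speed} Mbps: about $(download_minutes "$speed") minutes"
done
```

Actual times vary with server throughput and with any images Docker has already cached.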
After Docker downloads and builds the images, you should see the following output in your development environment:
...
Creating zookeeper       ... done
Creating broker          ... done
Creating schema-registry ... done
Creating connect         ... done
Creating rest-proxy      ... done
Creating mongo1          ... done
Creating mongo1-setup    ... done
Note
Port Mappings
The sandbox maps the following services to ports on your host machine:
- The sandbox MongoDB server maps to port 35001 on your host machine
- The sandbox Kafka Connect JMX server maps to port 35000 on your host machine
These ports must be free to start the sandbox.
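Before you start the sandbox, you can check on the host whether anything is already listening on these ports. The following bash sketch uses bash's `/dev/tcp` redirection and the coreutils `timeout` command. It only probes `127.0.0.1`, so it may miss services bound to other interfaces. The function name `port_in_use` is hypothetical.

```shell
#!/usr/bin/env bash
# Hypothetical pre-flight check for the host ports the sandbox maps.
# Relies on bash's /dev/tcp redirection and coreutils `timeout`.

port_in_use() {
  local port="$1"
  # A successful TCP connection means some process already listens on the port
  timeout 1 bash -c "exec 3<>/dev/tcp/127.0.0.1/${port}" 2>/dev/null
}

for port in 35000 35001; do
  if port_in_use "$port"; then
    echo "port ${port} is in use; free it before starting the sandbox"
  else
    echo "port ${port} appears to be free"
  fi
done
```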
Add Connectors
To complete the sample data pipeline, you must add connectors to Kafka Connect to transfer data between MongoDB and Apache Kafka. Add a source connector to transfer data from MongoDB to Apache Kafka. Add a sink connector to transfer data from Apache Kafka to MongoDB.
To add connectors in the sandbox, first start an interactive bash shell in the mongo1 Docker container using the following command:
docker exec -it mongo1 /bin/bash 
After your shell session starts, you should see the following prompt:
MongoDB Kafka Connector Sandbox $ 
Add a Source Connector
Use the shell in your Docker container to add a source connector using the Kafka Connect REST API.
The following API request adds a source connector configured with the following properties:
- The class Kafka Connect uses to instantiate the connector 
- The connection URI, database, and collection of the MongoDB replica set from which the connector reads data 
- An aggregation pipeline that keeps only insert events and adds a field travel with the value "MongoDB Kafka Connector" to each inserted document the connector reads from MongoDB
curl -X POST \
     -H "Content-Type: application/json" \
     --data '
     {"name": "mongo-source",
      "config": {
         "connector.class":"com.mongodb.kafka.connect.MongoSourceConnector",
         "connection.uri":"mongodb://mongo1:27017/?replicaSet=rs0",
         "database":"quickstart",
         "collection":"sampleData",
         "pipeline":"[{\"$match\": {\"operationType\": \"insert\"}}, {\"$addFields\": {\"fullDocument.travel\":\"MongoDB Kafka Connector\"}}]"
         }
     }
     ' \
     http://connect:8083/connectors -w "\n"
Note
Why do I see the message 'Failed to connect'?
It takes up to three minutes for the Kafka Connect REST API to start. If you receive the following error, wait three minutes and run the preceding command again:
... curl: (7) Failed to connect to connect port 8083: Connection refused 
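Instead of retrying by hand, you could poll the REST API from the container shell until it responds. The following bash sketch does that with `curl`. The function name `wait_for_connect` and its defaults (18 attempts, 10 seconds apart, about three minutes in total) are assumptions chosen to match the startup time mentioned above.

```shell
#!/usr/bin/env bash
# Hypothetical helper: wait until the Kafka Connect REST API answers.
# Run from the container shell, where the hostname `connect` resolves.

wait_for_connect() {
  local url="${1:-http://connect:8083/connectors}"
  local attempts="${2:-18}"   # 18 attempts x 10 s = about 3 minutes
  local delay="${3:-10}"
  local i
  for ((i = 1; i <= attempts; i++)); do
    # -s: silent, -f: fail on HTTP errors, -o /dev/null: discard the body
    if curl -sf -o /dev/null "$url"; then
      echo "Kafka Connect REST API is up (attempt ${i})"
      return 0
    fi
    sleep "$delay"
  done
  echo "Kafka Connect REST API not reachable at ${url}" >&2
  return 1
}

# Example: wait_for_connect && echo "ready to add connectors"
```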
To confirm that you added the source connector, run the following command:
curl -X GET http://connect:8083/connectors 
The preceding command should output the names of the running connectors:
["mongo-source"] 
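A connector that appears in this list has been created, but that does not necessarily mean it is running. The Kafka Connect REST API also exposes a status endpoint, `GET /connectors/<name>/status`. The following bash sketch reads the connector's own state from that endpoint without `jq`. It assumes the response is compact JSON in which the connector's `"state"` field appears before the task states. The function names `connector_state` and `check_connector` are hypothetical.

```shell
#!/usr/bin/env bash
# Hypothetical helpers to read a connector's state from the Kafka Connect
# status endpoint (GET /connectors/<name>/status).

# Print the first "state" value, which is assumed to be the connector's own
# state (task states follow it). Assumes compact JSON without spaces around ':'.
connector_state() {
  grep -o '"state":"[A-Z_]*"' | head -n 1 | cut -d '"' -f 4
}

check_connector() {
  local name="$1"
  curl -s "http://connect:8083/connectors/${name}/status" | connector_state
}

# Example: check_connector mongo-source   # prints a state such as RUNNING
```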
To learn more about source connector properties, see the page on Source Connector Configuration Properties.
To learn more about aggregation pipelines, see the MongoDB manual page on Aggregation Pipelines.
Add a Sink Connector
Use the shell in your Docker container to add a sink connector using the Kafka Connect REST API.
The following API request adds a sink connector configured with the following properties:
- The class Kafka Connect uses to instantiate the connector 
- The connection URI, database, and collection of the MongoDB replica set to which the connector writes data 
- The Apache Kafka topic from which the connector reads data 
- A change data capture handler for MongoDB change event documents 
curl -X POST \
     -H "Content-Type: application/json" \
     --data '
     {"name": "mongo-sink",
      "config": {
         "connector.class":"com.mongodb.kafka.connect.MongoSinkConnector",
         "connection.uri":"mongodb://mongo1:27017/?replicaSet=rs0",
         "database":"quickstart",
         "collection":"topicData",
         "topics":"quickstart.sampleData",
         "change.data.capture.handler": "com.mongodb.kafka.connect.sink.cdc.mongodb.ChangeStreamHandler"
         }
     }
     ' \
     http://connect:8083/connectors -w "\n"
To confirm that you added both the source and sink connectors, run the following command:
curl -X GET http://connect:8083/connectors 
The preceding command should output the names of the running connectors:
["mongo-source", "mongo-sink"] 
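The REST API may list the names in a different order, so comparing the whole response string can fail even when both connectors exist. The following bash sketch checks only that each expected name is present. The function name `has_connectors` is hypothetical.

```shell
#!/usr/bin/env bash
# Hypothetical helper: confirm that every expected connector name appears
# in the JSON array returned by GET /connectors, in any order.

has_connectors() {
  local listing="$1"; shift
  local name
  for name in "$@"; do
    # Match the quoted name anywhere in the array
    if [[ "$listing" != *"\"${name}\""* ]]; then
      echo "missing connector: ${name}" >&2
      return 1
    fi
  done
  return 0
}

# Example:
# has_connectors "$(curl -s http://connect:8083/connectors)" mongo-source mongo-sink
```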
To learn more about sink connector properties, see the page on Sink Connector Configuration Properties.
To learn more about change data capture events, see the Change Data Capture Handlers guide.
Send the Contents of a Document through Your Connectors
To send the contents of a document through your connectors, insert a document into the MongoDB collection from which your source connector reads data.
To insert a new document into your collection, enter the MongoDB shell from the shell in your Docker container using the following command:
mongosh mongodb://mongo1:27017/?replicaSet=rs0 
After you run the preceding command, you should see the following prompt:
rs0 [primary] test> 
From the MongoDB shell, insert a document into the sampleData collection of the quickstart database using the following commands:
use quickstart
db.sampleData.insertOne({"hello":"world"})
After you insert a document into the sampleData collection, confirm that your connectors processed the change. Check the contents of the topicData collection using the following command:
db.topicData.find() 
You should see output that resembles the following:
[
  {
    _id: ObjectId(...),
    hello: 'world',
    travel: 'MongoDB Kafka Connector'
  }
]
Exit the MongoDB shell with the following command:
exit 
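The connectors process changes asynchronously, so topicData might still be empty immediately after the insert. The following bash sketch polls the collection from the container shell with mongosh's `--quiet` and `--eval` options until at least one document arrives. The function name `wait_for_topic_data` and its default retry values are hypothetical.

```shell
#!/usr/bin/env bash
# Hypothetical helper: poll quickstart.topicData until the sink connector
# has written at least one document. Run from the container shell.
MONGO_URI="mongodb://mongo1:27017/?replicaSet=rs0"

wait_for_topic_data() {
  local attempts="${1:-10}" delay="${2:-3}" count i
  for ((i = 1; i <= attempts; i++)); do
    # --quiet suppresses the startup banner so only the count is printed
    count=$(mongosh "$MONGO_URI" --quiet --eval \
      'db.getSiblingDB("quickstart").topicData.countDocuments({})' 2>/dev/null)
    if [[ "$count" =~ ^[0-9]+$ ]] && (( count > 0 )); then
      echo "topicData contains ${count} document(s)"
      return 0
    fi
    sleep "$delay"
  done
  echo "no documents in topicData after ${attempts} attempts" >&2
  return 1
}

# Example: wait_for_topic_data 10 3
```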
Remove the Sandbox
To conserve resources in your development environment, remove the sandbox.
Before you remove the sandbox, exit from the shell session in your Docker container by running the following command:
exit 
You can remove both the Docker containers and images, or only the containers. If you remove the containers and images, you must download the images again, approximately 2.4 GB, before you can restart your sandbox. If you remove only the containers, you can reuse the images and avoid downloading most of the large files in the sample data pipeline.
Run one of the following commands, depending on which removal task you want to perform.
Run the following shell command to remove the Docker containers and images from the sandbox:
docker compose -p mongo-kafka down --rmi all 
Run the following shell command to remove the Docker containers but keep the images for the sandbox:
docker compose -p mongo-kafka down 
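If you tear down the sandbox often, you could wrap the two commands in a small function. The following bash sketch is a hypothetical convenience wrapper, not part of the tutorial repository. Run it on the host from the tutorial directory after you exit the container shell. The function name `remove_sandbox` and its `--keep-images` flag are assumptions.

```shell
#!/usr/bin/env bash
# Hypothetical wrapper around the two sandbox removal commands.
# Run on the host, from the tutorial directory.

remove_sandbox() {
  if [[ "${1:-}" == "--keep-images" ]]; then
    # Remove the containers only; the cached images remain for the next run
    docker compose -p mongo-kafka down
  else
    # Remove the containers and every image the project uses
    docker compose -p mongo-kafka down --rmi all
  fi
}

# Example: remove_sandbox --keep-images
```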
Next Steps
To learn how to install the MongoDB Kafka Connector, see the Install the MongoDB Kafka Connector guide.
To learn more about how to process and move data from Apache Kafka to MongoDB, see the Sink Connector guide.
To learn more about how to process and move data from MongoDB to Apache Kafka, see the Source Connector guide.