
Kafka Connector Quick Start

On this page

  • Overview
  • Install the Required Packages
  • Download the Sandbox
  • Start the Sandbox
  • Add Connectors
  • Add a Source Connector
  • Add a Sink Connector
  • Send the Contents of a Document through Your Connectors
  • Remove the Sandbox
  • Next Steps

Overview

This guide shows you how to configure the MongoDB Kafka Connector to send data between MongoDB and Apache Kafka.

After completing this guide, you should understand how to use the Kafka Connect REST API to configure MongoDB Kafka Connectors to read data from MongoDB and write it to a Kafka topic, and to read data from a Kafka topic and write it to MongoDB.

To complete the steps in this guide, you must download and work in a sandbox, a containerized development environment that includes the services required to build a sample data pipeline.

Read the following sections to set up your sandbox and sample data pipeline.

Note

After you complete this guide, you can remove the environment by following the instructions in the Remove the Sandbox section.

Install the Required Packages

Download and install the following packages:

  • Docker, which runs the containerized services in the sandbox

  • Git, which you use to clone the tutorial repository
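
To verify your installation, you can check that each tool is on your PATH by printing its version. If docker-compose isn't available as a standalone command, recent Docker installations provide the equivalent docker compose plugin:

docker --version
docker-compose --version
git --version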

Tip

Read the Docker Documentation

This guide uses the following Docker-specific terminology:

  • Image: a package that contains everything you need to run a service, including the code, runtime, and configuration

  • Container: a running instance of an image

Learn more about Docker from the Docker official Get Started Guide.

The sandbox uses Docker for convenience and consistency. To learn more about deployment options for Apache Kafka, see the official Apache Kafka documentation.

Download the Sandbox

We created a sandbox that includes the services you need to build the sample data pipeline in this tutorial.

To download the sandbox, clone the tutorial repository to your development environment. Then navigate to the directory that corresponds to the quickstart tutorial. If you use bash or a similar shell, use the following commands:

git clone https://github.com/mongodb-university/kafka-edu.git
cd kafka-edu/docs-examples/mongodb-kafka-base/

Start the Sandbox

The sandbox starts the following services in Docker containers:

  • MongoDB, configured as a replica set

  • Apache Kafka

  • Kafka Connect with the MongoDB Kafka Connector installed

  • Apache ZooKeeper, which manages the configuration for Apache Kafka

To start the sandbox, run the following command from the tutorial directory:

docker-compose -p mongo-kafka up -d --force-recreate

When you start the sandbox, Docker downloads any images it needs to run.

Note

How long does the download take?

In total, the Docker images for this tutorial require about 2.4 GB of space. The following list shows how long it takes to download the images with different internet speeds:

  • 40 megabits per second: 8 minutes

  • 20 megabits per second: 16 minutes

  • 10 megabits per second: 32 minutes

After Docker downloads and builds the images, you should see the following output in your development environment:

...
Creating zookeeper ... done
Creating broker ... done
Creating schema-registry ... done
Creating connect ... done
Creating rest-proxy ... done
Creating mongo1 ... done
Creating mongo1-setup ... done
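
If you want to confirm that all of the containers are running, you can list them with Docker Compose, using the same project name you passed with the -p flag:

docker-compose -p mongo-kafka ps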

Note

Port Mappings

The sandbox maps the following services to ports on your host machine:

  • The sandbox MongoDB server maps to port 35001 on your host machine

  • The sandbox Kafka Connect JMX server maps to port 35000 on your host machine

These ports must be free to start the sandbox.
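
As a minimal sketch, if you have mongosh installed on your host machine, you should be able to reach the sandbox MongoDB server through the mapped port. The directConnection option is needed because your host can't resolve the container hostname (mongo1) that the replica set advertises:

mongosh "mongodb://localhost:35001/?directConnection=true"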

Add Connectors

To complete the sample data pipeline, you must add connectors to Kafka Connect to transfer data between Apache Kafka and MongoDB. Add a source connector to transfer data from MongoDB to Apache Kafka. Add a sink connector to transfer data from Apache Kafka to MongoDB.

To add connectors in the sandbox, first start an interactive bash shell in your Docker container using the following command:

docker exec -it mongo1 /bin/bash

After your shell session starts, you should see the following prompt:

MongoDB Kafka Connector Sandbox $

Add a Source Connector

Use the shell in your Docker container to add a source connector using the Kafka Connect REST API.

The following API request adds a source connector configured with the following properties:

  • The class Kafka Connect uses to instantiate the connector

  • The connection URI, database, and collection of the MongoDB replica set from which the connector reads data

  • An aggregation pipeline that adds a field named travel with the value "MongoDB Kafka Connector" to the inserted documents the connector reads from MongoDB

curl -X POST \
  -H "Content-Type: application/json" \
  --data '
  {
    "name": "mongo-source",
    "config": {
      "connector.class": "com.mongodb.kafka.connect.MongoSourceConnector",
      "connection.uri": "mongodb://mongo1:27017/?replicaSet=rs0",
      "database": "quickstart",
      "collection": "sampleData",
      "pipeline": "[{\"$match\": {\"operationType\": \"insert\"}}, {\"$addFields\": {\"fullDocument.travel\": \"MongoDB Kafka Connector\"}}]"
    }
  }
  ' \
  http://connect:8083/connectors -w "\n"

Note

Why do I see the message 'Failed to connect'?

It takes up to three minutes for the Kafka Connect REST API to start. If you receive the following error, wait three minutes and run the preceding command again:

...
curl: (7) Failed to connect to connect port 8083: Connection refused

To confirm that you added the source connector, run the following command:

curl -X GET http://connect:8083/connectors

The preceding command should output the names of the running connectors:

["mongo-source"]

To learn more about source connector properties, see the page on Source Connector Configuration Properties.

To learn more about aggregation pipelines, see the MongoDB manual page on Aggregation Pipelines.

Add a Sink Connector

Use the shell in your Docker container to add a sink connector using the Kafka Connect REST API.

The following API request adds a sink connector configured with the following properties:

  • The class Kafka Connect uses to instantiate the connector

  • The connection URI, database, and collection of the MongoDB replica set to which the connector writes data

  • The Apache Kafka topic from which the connector reads data

  • A change data capture handler for MongoDB change event documents

curl -X POST \
  -H "Content-Type: application/json" \
  --data '
  {
    "name": "mongo-sink",
    "config": {
      "connector.class": "com.mongodb.kafka.connect.MongoSinkConnector",
      "connection.uri": "mongodb://mongo1:27017/?replicaSet=rs0",
      "database": "quickstart",
      "collection": "topicData",
      "topics": "quickstart.sampleData",
      "change.data.capture.handler": "com.mongodb.kafka.connect.sink.cdc.mongodb.ChangeStreamHandler"
    }
  }
  ' \
  http://connect:8083/connectors -w "\n"

To confirm that you added both the source and sink connectors, run the following command:

curl -X GET http://connect:8083/connectors

The preceding command should output the names of the running connectors:

["mongo-source", "mongo-sink"]

To learn more about sink connector properties, see the page on Sink Connector Configuration Properties.

To learn more about change data capture events, see the Change Data Capture Handlers guide.

Send the Contents of a Document through Your Connectors

To send the contents of a document through your connectors, insert a document into the MongoDB collection from which your source connector reads data.

To insert a new document into your collection, enter the MongoDB shell from the shell in your Docker container using the following command:

mongosh "mongodb://mongo1:27017/?replicaSet=rs0"

After you run the preceding command, you should see the following prompt:

rs0 [primary] test>

From the MongoDB shell, insert a document into the sampleData collection of the quickstart database using the following commands:

use quickstart
db.sampleData.insertOne({"hello":"world"})

After you insert a document into the sampleData collection, confirm that your connectors processed the change. Check the contents of the topicData collection using the following command:

db.topicData.find()

You should see output that resembles the following:

[
  {
    _id: ObjectId(...),
    hello: 'world',
    travel: 'MongoDB Kafka Connector'
  }
]
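
Because the source connector's aggregation pipeline matches only insert events, each document you insert into sampleData should flow through both connectors. For example, inserting a second document and querying topicData again should return two documents, each with the added travel field:

db.sampleData.insertOne({"hello":"kafka"})
db.topicData.find()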

Exit the MongoDB shell with the following command:

exit

Remove the Sandbox

To conserve resources in your development environment, remove the sandbox.

Before you remove the sandbox, exit from the shell session in your Docker container by running the following command:

exit

You can remove both the Docker containers and images, or only the containers. If you remove both the containers and the images, you must download them again (approximately 2.4 GB) to restart your sandbox. If you remove only the containers, you can reuse the images and avoid downloading most of the large files in the sample data pipeline.

Run the command that corresponds to the removal task you want to perform.

Run the following shell command to remove the Docker containers and images from the sandbox:

docker-compose -p mongo-kafka down --rmi all

Run the following shell command to remove the Docker containers but keep the images for the sandbox:

docker-compose -p mongo-kafka down
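
If you kept the images, you can restart the sandbox later by running the same command you used to start it from the tutorial directory:

docker-compose -p mongo-kafka up -d --force-recreate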

Next Steps

To learn how to install the MongoDB Kafka Connector, see the Install the MongoDB Kafka Connector guide.

To learn more about how to process and move data from Apache Kafka to MongoDB, see the Sink Connector guide.

To learn more about how to process and move data from MongoDB to Apache Kafka, see the Source Connector guide.
