Getting Started with the MongoDB Kafka Sink Connector

Follow this tutorial to learn how to configure a MongoDB Kafka sink connector to read data from an Apache Kafka topic and write it to a MongoDB collection.

Note
Before You Get Started

Before you start this tutorial, you must complete the steps in the Kafka Connector Tutorial Setup.

1

Create an interactive shell session on the tutorial Docker Container using the following command:

docker run --rm --name SinkTutorialShell --network mongodb-kafka-base_localnet -it robwma/mongokafkatutorial:latest bash

Create a sink configuration file called simplesink.json with the following command:

nano simplesink.json

Paste the following configuration information into the file and save your changes:

{
  "name": "mongo-sink",
  "config": {
    "connector.class": "com.mongodb.kafka.connect.MongoSinkConnector",
    "topics": "Tutorial2.pets",
    "connection.uri": "mongodb://mongo1",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": false,
    "database": "Tutorial2",
    "collection": "pets"
  }
}
Note

The key.converter, value.converter, and value.converter.schemas.enable properties specify converters, which tell the connector how to translate the data it reads from Kafka.
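To make the converter settings concrete, the following Python sketch shows the two JSON shapes a message value can take. With value.converter.schemas.enable set to false, JsonConverter treats the whole message value as the record. When the setting is true, JsonConverter instead expects an envelope with schema and payload fields. The function names and the sample schema are illustrative assumptions and are not part of the connector.

```python
import json


def plain_value(record):
    """Encode a record as bare JSON bytes (the schemas.enable=false shape)."""
    return json.dumps(record).encode("utf-8")


def enveloped_value(record, schema):
    """Encode a record in the schema/payload envelope that JsonConverter
    expects when schemas.enable is true."""
    return json.dumps({"schema": schema, "payload": record}).encode("utf-8")


pet = {"name": "roscoe"}

# Illustrative Kafka Connect JSON schema describing the single string field.
pet_schema = {
    "type": "struct",
    "fields": [{"field": "name", "type": "string", "optional": False}],
    "optional": False,
}

print(plain_value(pet))
print(enveloped_value(pet, pet_schema))
```

This tutorial uses the first shape, which is why the configuration disables schemas for the value converter.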

Run the following command in the shell to start the sink connector using the configuration file you created:

cx simplesink.json
Note

The cx command is a custom script included in the tutorial development environment. This script runs the following equivalent request to the Kafka Connect REST API to create a new connector:

curl -X POST -H "Content-Type: application/json" -d @simplesink.json http://connect:8083/connectors -w "\n"
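For readers who prefer to script this step, the same request can be sketched in Python with only the standard library. This is a minimal sketch that assumes it runs inside the tutorial shell, where the connect hostname resolves. The function names are hypothetical and are not part of the tutorial environment.

```python
import json
import urllib.request

# Same endpoint as the curl command above.
CONNECT_URL = "http://connect:8083/connectors"


def build_create_request(config_path, url=CONNECT_URL):
    """Build the POST request that creates a connector from a JSON config file."""
    with open(config_path, "rb") as config_file:
        body = config_file.read()
    json.loads(body)  # fail early if the file is not valid JSON
    return urllib.request.Request(
        url,
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def create_connector(config_path):
    """Send the request and return the decoded JSON response."""
    with urllib.request.urlopen(build_create_request(config_path)) as response:
        return json.load(response)
```

Calling create_connector("simplesink.json") sends the request. Building the request in a separate function makes it possible to inspect the method, headers, and body before anything goes over the network.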

Run the following command in the shell to check the status of the connectors:

status

If your sink connector started successfully, you should see the following output:

Kafka topics:
...
The status of the connectors:
sink | mongo-sink | RUNNING | RUNNING | com.mongodb.kafka.connect.MongoSinkConnector
Currently configured connectors
[
"mongo-sink"
]
...
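The connector state can also be checked programmatically through the Kafka Connect REST API, which exposes a status document at /connectors/<name>/status. The sketch below assumes the same connect:8083 address used elsewhere in this tutorial. The helper names and the sample payload values are illustrative.

```python
import json
import urllib.request


def fetch_status(name, base_url="http://connect:8083"):
    """Fetch the status document for one connector from Kafka Connect."""
    with urllib.request.urlopen(f"{base_url}/connectors/{name}/status") as response:
        return json.load(response)


def is_healthy(status):
    """Return True when the connector and all of its tasks report RUNNING."""
    states = [status["connector"]["state"]]
    states += [task["state"] for task in status.get("tasks", [])]
    return all(state == "RUNNING" for state in states)


# Sample payload in the shape Kafka Connect returns; the values are illustrative.
sample_status = {
    "name": "mongo-sink",
    "connector": {"state": "RUNNING", "worker_id": "connect:8083"},
    "tasks": [{"id": 0, "state": "RUNNING", "worker_id": "connect:8083"}],
    "type": "sink",
}

print(is_healthy(sample_status))
```

In the tutorial shell, is_healthy(fetch_status("mongo-sink")) performs the same check against the live connector.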
2

In the same shell, create a Python script to write data to a Kafka topic.

nano kafkawrite.py

Paste the following code into the file and save your changes:

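import json

from kafka import KafkaProducer

# Serialize each message value as UTF-8 encoded JSON, which matches the
# sink connector's JsonConverter setting.
producer = KafkaProducer(
    bootstrap_servers=['broker:29092'],
    value_serializer=lambda value: json.dumps(value).encode('utf-8'),
)
data = {'name': 'roscoe'}
producer.send('Tutorial2.pets', value=data)
# Block until the buffered message has been sent to the broker.
producer.flush()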

Run the Python script:

python3 kafkawrite.py
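The script prints nothing when it succeeds. If you want confirmation that the broker accepted the message, one option is to wait on the future that send() returns. The following is a sketch under the same assumptions as kafkawrite.py (the kafka-python package and the broker:29092 address). The function names are hypothetical.

```python
import json


def serialize(value):
    """Encode a message value as UTF-8 JSON, as kafkawrite.py does."""
    return json.dumps(value).encode("utf-8")


def send_and_confirm(topic, value, servers=("broker:29092",), timeout=10):
    """Send one message and wait for the broker to acknowledge it."""
    # Imported here so serialize() can be used without kafka-python installed.
    from kafka import KafkaProducer

    producer = KafkaProducer(
        bootstrap_servers=list(servers),
        value_serializer=serialize,
    )
    try:
        # send() returns a future; get() blocks until the broker responds
        # and raises an exception if delivery failed or timed out.
        metadata = producer.send(topic, value=value).get(timeout=timeout)
        return metadata.topic, metadata.partition, metadata.offset
    finally:
        producer.close()
```

Calling send_and_confirm("Tutorial2.pets", {"name": "roscoe"}) returns the topic, partition, and offset of the written record.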
3

In the same shell, connect to MongoDB using mongosh, the MongoDB shell, by running the following command:

mongosh "mongodb://mongo1"

After you connect successfully, you should see the following MongoDB shell prompt:

rs0 [direct: primary] test>

At the prompt, type the following commands to retrieve all the documents in the Tutorial2.pets MongoDB namespace:

use Tutorial2
db.pets.find()

You should see the following document returned as the result:

{ _id: ObjectId("62659..."), name: 'roscoe' }

Exit the MongoDB shell by entering the command exit.
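The same check can be scripted in Python instead of typed into mongosh. This sketch assumes the pymongo package is available in your environment, which the tutorial does not state. The function names are hypothetical.

```python
def find_pets(uri="mongodb://mongo1"):
    """Return every document in the Tutorial2.pets collection."""
    # Assumes pymongo is installed; imported here so the helper below
    # can be used without it.
    from pymongo import MongoClient

    client = MongoClient(uri)
    try:
        return list(client["Tutorial2"]["pets"].find())
    finally:
        client.close()


def without_id(document):
    """Drop the generated _id so documents can be compared by content."""
    return {key: value for key, value in document.items() if key != "_id"}
```

Because the sink connector generates a new _id for each record, comparing without_id(doc) against {'name': 'roscoe'} is a simple way to confirm that the message arrived.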

4

After you complete this tutorial, you can free resources by stopping the tutorial Docker containers and removing the MongoDB data. Navigate to the tutorial directory "mongodb-kafka-base" that you created in the setup step and run the following command:

To restart the containers, follow the same steps required to start them in the Tutorial Setup.

Summary

In this tutorial, you configured a sink connector to save data from a Kafka topic to a collection in a MongoDB cluster.

Learn More

Read the following resources to learn more about concepts mentioned in this tutorial:


© 2022 MongoDB, Inc.
