Docs Menu

Getting Started with the MongoDB Kafka Source Connector

On this page

  • Get Started with the MongoDB Kafka Source Connector
  • Summary
  • Learn More

Follow this tutorial to learn how to configure a MongoDB Kafka source connector to read data from a change stream and publish it to an Apache Kafka topic.

Note
Before You Get Started

Before you start this tutorial, you must complete the steps in the Kafka Connector Tutorial Setup.

1

Create an interactive shell session on the tutorial Docker container downloaded for the Tutorial Setup using the following command:

docker run --rm --name SourceTutorialShell --network mongodb-kafka-base_localnet -it robwma/mongokafkatutorial:latest bash

Create a source configuration file called simplesource.json with the following command:

nano simplesource.json

Paste the following configuration information into the file and save your changes:

{
"name": "mongo-simple-source",
"config": {
"connector.class": "com.mongodb.kafka.connect.MongoSourceConnector",
"connection.uri": "mongodb://mongo1",
"database": "Tutorial1",
"collection": "orders"
}
}

Run the following command in the shell to start the source connector using the configuration file you created:

cx simplesource.json
Note

The cx command is a custom script included in the tutorial development environment. This script runs the following equivalent request to the Kafka Connect REST API to create a new connector:

curl -X POST -H "Content-Type: application/json" -d @simplesource.json http://connect:8083/connectors -w "\n"

Run the following command in the shell to check the status of the connectors:

status

If your source connector started successfully, you should see the following output:

Kafka topics:
...
The status of the connectors:
source | mongo-simple-source | RUNNING | RUNNING | com.mongodb.kafka.connect.MongoSourceConnector
Currently configured connectors
[
"mongo-simple-source"
]
...
2

In the same shell, connect to MongoDB using mongosh, the MongoDB shell by running the following command:

mongosh "mongodb://mongo1"

After you connect successfully, you should see the following MongoDB shell prompt:

rs0 [direct: primary] test>

At the prompt, type the following commands to insert a new document:

use Tutorial1
db.orders.insertOne( { 'order_id' : 1, 'item' : 'coffee' } )

Once MongoDB completes the insert command, you should receive an acknowledgment that resembles the following text:

{
acknowledged: true,
insertedId: ObjectId("627e7e...")
}

Exit the MongoDB shell by entering the command exit.

Check the status of your Kafka environment using the following command:

status

In the output of the preceding command, you should see the new topic that the source connector created after receiving the change event:

...
"topic": "Tutorial1.orders",
...

Confirm the content of data on the new Kafka topic by running the following command:

kc Tutorial1.orders
Note

The kc command is a helper script that outputs the content of a Kafka topic.

You should see the following Kafka topic data, organized by "Key" and "Value" sections when you run the preceding command:

Partition: 0 Offset: 0
Key (198 bytes):
{"schema":{"type":"string","optional":false},"payload":"{\"_id\": {\"_data\": \"8262655A...\"}}"}
Value (530 bytes):
{"schema":{"type":"string","optional":false},"payload":"{\"_id\": {\"_data\": \"8262655A...\"}, \"operationType\": \"insert\", \"clusterTime\": {\"$timestamp\": {\"t\": 1650809557, \"i\": 2}}, \"fullDocument\": {\"_id\": {\"$oid\": \"62655a...\"}, \"order_id\": 1, \"item\": \"coffee\"}, \"ns\": {\"db\": \"Tutorial1\", \"coll\": \"orders\"}, \"documentKey\": {\"_id\": {\"$oid\": \"62655a...\"}}}"}

From the "Value" section of the output, you can find the part of the payload that includes the fullDocument data as highlighted in the following formatted JSON document:

{
"_id": {
"_data": "8262655A..."
},
"operationType": "insert",
"clusterTime": {
"$timestamp": {
"t": 1650809557,
"i": 2
}
},
"fullDocument": {
"_id": {
"$oid": "62655a..."
},
"order_id": 1,
"item": "coffee"
},
"ns": {
"db": "Tutorial1",
"coll": "orders"
},
"documentKey": {
"_id": {
"$oid": "62655a..."
}
}
}
3

You can omit the metadata from the events created by the change stream by configuring it to only return the fullDocument field.

Stop the connector using the following command:

del mongo-simple-source
Note

The del command is a helper script that calls the Kafka Connect REST API to stop the connector and is equivalent to the following command:

curl -X DELETE connect:8083/connectors/<parameter>

Edit the source configuration file called simplesource.json with the following command:

nano simplesource.json

Remove the existing configuration, add the following configuration, and save the file:

{
"name": "mongo-simple-source",
"config": {
"connector.class": "com.mongodb.kafka.connect.MongoSourceConnector",
"connection.uri": "mongodb://mongo1",
"publish.full.document.only": true,
"database": "Tutorial1",
"collection": "orders"
}
}

Run the following command in the shell to start the source connector using the configuration file you updated:

cx simplesource.json

Connect to MongoDB using mongosh using the following command:

mongosh "mongodb://mongo1"

At the prompt, type the following commands to insert a new document:

use Tutorial1
db.orders.insertOne( { 'order_id' : 2, 'item' : 'oatmeal' } )

Confirm the content of data on the new Kafka topic by running the following command:

kc Tutorial1.orders

The payload field in the "Value" document should contain only the following document data:

{"_id": {"$oid": "626565..."}, "order_id": 2, "item": "oatmeal"}"}
4

After you complete this tutorial, you can free resources by stopping the tutorial Docker containers and MongoDB data. Navigate to the tutorial directory "mongodb-kafka-base" that you created in the setup step and run the following command:

To restart the containers, follow the same steps required to start them in the Tutorial Setup.

In this tutorial, you started a source connector using different configurations to alter the change stream event data published to a Kafka topic.

Read the following resources to learn more about concepts mentioned in this tutorial:

Note

Send your tutorial feedback or ideas for future MongoDB Kafka Connector tutorials through this feedback form.

←  Explore MongoDB Change StreamsGetting Started with the MongoDB Kafka Sink Connector →
Give Feedback
© 2022 MongoDB, Inc.

About

  • Careers
  • Investor Relations
  • Legal Notices
  • Privacy Notices
  • Security Information
  • Trust Center
© 2022 MongoDB, Inc.