Kafka to MongoDB Atlas End to End Tutorial
Rate this tutorial
Data and event-driven applications are in high demand in a large variety of industries. With this demand, there is a growing challenge with how to sync the data across different data sources.
The Kafka cluster is designed for streams of data that sequentially write events into commit logs, allowing real-time data movement between your services. Data is grouped into topics inside a Kafka cluster.
MongoDB provides a certified by , one of the largest Kafka providers. With the Kafka connector and Confluent software, you can publish data from a MongoDB cluster into Kafka topics using a source connector. Additionally, with a sink connector, you can consume data from a Kafka topic to persist directly and consistently into a MongoDB collection inside your MongoDB cluster.
In this article, we will provide a simple step-by-step guide on how to connect a remote Kafka cluster—in this case, a Confluent Cloud service—with a cluster. For simplicity purposes, the installation is minimal and designed for a small development environment. However, through the article, we will provide guidance and links for production-related considerations.
Once ready, create a topic to be used in the Kafka cluster. I created one named “orders.”
This “orders” topic will be used by Kafka Sink connector. Any data in this topic will be persisted automatically in the Atlas database.
You will also need another topic called "outsource.kafka.receipts". This topic will be used by the MongoDB Source connector, streaming reciepts from Atlas database.
Obtain the Kafka cluster connection string via
Cluster Overview > Cluster Settings > Identification > Bootstrap serverfor future use. Basic clusters are open to the internet and in production, you will need to amend the access list for your specific hosts to connect to your cluster via advanced cluster ACLs.
To have the binaries to install kafka-connect and all of its dependencies, let’s download the files:
Configure the plugins directory where we will host the MongoDB Kafka Connector plugin:
<confluent-install-dir>/etc/schema-registry/connect-avro-standalone.propertiesusing the content provided below. Ensure that you replace the
<kafka-cluster>:<kafka-port>with information taken from Confluent Cloud bootstrap server earlier.
Additionally, replace the generated
<api-secret>taken from Confluent Cloud in every section.
Important: Place the
plugin.pathto point to our plugin directory with permissions to the user running the kafka-connect process.
The will allow us to read data off a specific Kafka topic and write to a MongoDB collection inside our cluster. Create a MongoDB sink connector properties file in the main working dir:
mongo-sink.propertieswith your Atlas cluster details replacing
<username>:<password>@<atlas-cluster>/<database>from your Atlas connect tab. The working directory can be any directory that the
connect-standalonebinary has access to and its path can be provided to the
kafka-connectcommand shown in section.
With the above configuration, we will listen to the topic called “orders” and publish the input documents into database
kafkaand collection name
The will allow us to read data off a specific MongoDB collection topic and write to a Kafka topic. When data will arrive into a collection called
receipts, we can use a source connector to transfer it to a Kafka predefined topic named “outsource.kafka.receipts” (the configured prefix followed by the
<database>.<collection>name as a topic—it's possible to use advanced mapping to change that).
Let’s create file
mongo-source.propertiesin the main working directory:
The main properties here are the database, collection, and aggregation pipeline used to listen for incoming changes as well as the connection string. The
topic.prefixadds a prefix to the
<database>.<collection>namespace as the Kafka topic on the Confluent side. In this case, the topic name that will receive new MongoDB records is “outsource.kafka.receipts” and was predefined earlier in this tutorial.
I have also added
publish.full.document.only=trueas I only need the actual document changed or inserted without the change stream event wrapping information.
For simplicity reasons, I am running the standalone Kafka Connect in the foreground.
Important: Run with the latest Java version to avoid JDK SSL bugs.
Now every document that will be populated to topic “orders” will be inserted into the
orderscollection using a sink connector. A source connector we configured will transmit every receipt document from
receiptcollection back to another topic called "outsource.kafka.receipts" to showcase a MongoDB consumption to a Kafka topic.
Through the Confluent UI, I have submitted a test document to my “orders” topic.
Looking into my Atlas cluster, I can see a new collection named
Now, let's assume that our application received the order document from the
orderscollection and produced a receipt. We can replicate this by inserting a document in the
This operation will cause the source connector to produce a message into “outsource.kafka.reciepts” topic.
Log lines on kafka-connect will show that the process received and published the document:
In this how-to article, I have covered the fundamentals of building a simple yet powerful integration of MongoDB Atlas to Kafka clusters using MongoDB Kafka Connector and Kafka Connect.
This should be a good starting point to get you going with your next event-driven application stack and a successful integration between MongoDB and Kafka.