Interested in speaking at MongoDB World 2022? Click here to become a speaker.
HomeLearnHow-toKafka to MongoDB Atlas End to End Tutorial

Kafka to MongoDB Atlas End to End Tutorial

Updated: Jan 25, 2022 |

Published: Jan 25, 2022

  • Atlas
  • MongoDB
  • Java
  • ...

By Pavel Duchovny

Rate this article

Data and event-driven applications are in high demand in a large variety of industries. With this demand, there is a growing challenge with how to sync the data across different data sources.

A widely adopted solution for communicating real-time data transfer across multiple components in organization systems is implemented via clustered queues. One of the popular and proven solutions is Apache Kafka.

The Kafka cluster is designed for streams of data that sequentially write events into commit logs, allowing real-time data movement between your services. Data is grouped into topics inside a Kafka cluster.

MongoDB provides a Kafka connector certified by Confluent, one of the largest Kafka providers. With the Kafka connector and Confluent software, you can publish data from a MongoDB cluster into Kafka topics using a source connector. Additionally, with a sink connector, you can consume data from a Kafka topic to persist directly and consistently into a MongoDB collection inside your MongoDB cluster.

In this article, we will provide a simple step-by-step guide on how to connect a remote Kafka cluster—in this case, a Confluent Cloud service—with a MongoDB Atlas cluster. For simplicity purposes, the installation is minimal and designed for a small development environment. However, through the article, we will provide guidance and links for production-related considerations.

Pre-requisite: To avoid JDK known certificate issues please update your JDK to one of the following patch versions or newer:

  • JDK 11.0.7+
  • JDK 13.0.3+
  • JDK 14.0.2+

#Table of Contents

#
Create a Basic Confluent Cloud Cluster

We will start by creating a basic Kafka cluster in the Confluent Cloud.

Once ready, create a topic to be used in the Kafka cluster. I created one named “orders.”

This “orders” topic will be used by Kafka Sink connector. Any data in this topic will be persisted automatically in the Atlas database.

Kafka Cluster Topics

You will also need another topic called "outsource.kafka.receipts". This topic will be used by the MongoDB Source connector, streaming reciepts from Atlas database.

Generate an api-key and api-secret to interact with this Kafka cluster. For the simplicity of this tutorial, I have selected the “Global Access” api-key. For production, it is recommended to give as minimum permissions as possible for the api-key used. Get a hold of the generated keys for future use.

Obtain the Kafka cluster connection string via Cluster Overview > Cluster Settings > Identification > Bootstrap server for future use. Basic clusters are open to the internet and in production, you will need to amend the access list for your specific hosts to connect to your cluster via advanced cluster ACLs.

#
Create a MongoDB Atlas Project and Cluster

Create a project and cluster or use an existing Atlas cluster in your project.

M0 Atlas Cluster

Prepare your Atlas cluster for a kafka-connect connection. Inside your project’s access list, enable user and relevant IP addresses of your local host, the one used for Kafka Connect binaries. Finally, get a hold of the Atlas connection string for future use.

#
Install a Kafka Connect Worker

Kafka Connect is one of the mechanisms to reliably stream data between different data systems and a Kafka cluster. For production use, we recommend using a distributed deployment for high availability, fault tolerance, and scalability. There is also a cloud version to install the connector on the Confluent Cloud.

For this simple tutorial, we will use a standalone local Kafka Connect installation.

To have the binaries to install kafka-connect and all of its dependencies, let’s download the files:

1curl -O http://packages.confluent.io/archive/7.0/confluent-community-7.0.1.tar.gz
2tar -xvf confluent-community-7.0.1.tar.gz

#
Configure Kafka Connect

Configure the plugins directory where we will host the MongoDB Kafka Connector plugin:

1mkdir -p /usr/local/share/kafka/plugins

Edit the <confluent-install-dir>/etc/schema-registry/connect-avro-standalone.properties using the content provided below. Ensure that you replace the <kafka-cluster>:<kafka-port> with information taken from Confluent Cloud bootstrap server earlier.

Additionally, replace the generated <api-key> and <api-secret> taken from Confluent Cloud in every section.

1bootstrap.servers=<kafka-cluster>:<kafka-port>
2
3Connect data. Every Connect user will
4# need to configure these based on the format they want their data in when loaded from or stored into Kafka
5key.converter=org.apache.kafka.connect.json.JsonConverter
6value.converter=org.apache.kafka.connect.json.JsonConverter
7# Converter-specific settings can be passed in by prefixing the Converter's setting with the converter you want to apply
8# it to
9key.converter.schemas.enable=false
10value.converter.schemas.enable=false
11
12# The internal converter used for offsets and config data is configurable and must be specified, but most users will
13# always want to use the built-in default. Offset and config data is never visible outside of Kafka Connect in this format.
14internal.key.converter=org.apache.kafka.connect.json.JsonConverter
15internal.value.converter=org.apache.kafka.connect.json.JsonConverter
16internal.key.converter.schemas.enable=false
17internal.value.converter.schemas.enable=false
18
19# Store offsets on local filesystem
20offset.storage.file.filename=/tmp/connect.offsets
21# Flush much faster than normal, which is useful for testing/debugging
22offset.flush.interval.ms=10000
23
24ssl.endpoint.identification.algorithm=https
25
26
27sasl.mechanism=PLAIN
28request.timeout.ms=20000
29retry.backoff.ms=500
30sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
31username="<api-key>" password="<api-secret>";
32security.protocol=SASL_SSL
33
34consumer.ssl.endpoint.identification.algorithm=https
35consumer.sasl.mechanism=PLAIN
36consumer.request.timeout.ms=20000
37consumer.retry.backoff.ms=500
38consumer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
39username="<api-key>" password="<api-secret>";
40consumer.security.protocol=SASL_SSL
41
42producer.ssl.endpoint.identification.algorithm=https
43producer.sasl.mechanism=PLAIN
44producer.request.timeout.ms=20000
45producer.retry.backoff.ms=500
46producer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
47username="<api-key>" password="<api-secret>";
48producer.security.protocol=SASL_SSL
49
50plugin.path=/usr/local/share/kafka/plugins

Important: Place the plugin.path to point to our plugin directory with permissions to the user running the kafka-connect process.

#Install the MongoDB connector JAR:

Download the “all” jar and place it inside the plugin directory.

1cp ~/Downloads/mongo-kafka-connect-1.6.1-all.jar /usr/local/share/kafka/plugins/

#Configure a MongoDB Sink Connector

The MongoDB Sink connector will allow us to read data off a specific Kafka topic and write to a MongoDB collection inside our cluster. Create a MongoDB sink connector properties file in the main working dir: mongo-sink.properties with your Atlas cluster details replacing <username>:<password>@<atlas-cluster>/<database> from your Atlas connect tab. The working directory can be any directory that the connect-standalone binary has access to and its path can be provided to the kafka-connect command shown in "Start Kafka Connect and Connectors" section.

1name=mongo-sink
2topics=orders
3connector.class=com.mongodb.kafka.connect.MongoSinkConnector
4tasks.max=1
5connection.uri=mongodb+srv://<username>:<password>@<atlas-cluster>/<database>?retryWrites=true&w=majority
6database=kafka
7collection=orders
8max.num.retries=1
9retries.defer.timeout=5000

With the above configuration, we will listen to the topic called “orders” and publish the input documents into database kafka and collection name orders.

#Configure Mongo Source Connector

The MongoDB Source connector will allow us to read data off a specific MongoDB collection topic and write to a Kafka topic. When data will arrive into a collection called receipts, we can use a source connector to transfer it to a Kafka predefined topic named “outsource.kafka.receipts” (the configured prefix followed by the <database>.<collection> name as a topic—it's possible to use advanced mapping to change that).

Let’s create file mongo-source.properties in the main working directory:

1name=mongo-source
2connector.class=com.mongodb.kafka.connect.MongoSourceConnector
3tasks.max=1
4
5# Connection and source configuration
6connection.uri=mongodb+srv://<username>:<password>@<atlas-cluster>/<database>?retryWrites=true&w=majority
7database=kafka
8collection=receipts
9
10topic.prefix=outsource
11topic.suffix=
12poll.max.batch.size=1000
13poll.await.time.ms=5000
14
15# Change stream options
16pipeline=[]
17batch.size=0
18change.stream.full.document=updateLookup
19publish.full.document.only=true
20collation=

The main properties here are the database, collection, and aggregation pipeline used to listen for incoming changes as well as the connection string. The topic.prefix adds a prefix to the <database>.<collection> namespace as the Kafka topic on the Confluent side. In this case, the topic name that will receive new MongoDB records is “outsource.kafka.receipts” and was predefined earlier in this tutorial.

I have also added publish.full.document.only=true as I only need the actual document changed or inserted without the change stream event wrapping information.

#Start Kafka Connect and Connectors

For simplicity reasons, I am running the standalone Kafka Connect in the foreground.

1 ./confluent-7.0.1/bin/connect-standalone ./confluent-7.0.1/etc/schema-registry/connect-avro-standalone.properties mongo-sink.properties mongo-source.properties

Important: Run with the latest Java version to avoid JDK SSL bugs.

Now every document that will be populated to topic “orders” will be inserted into the orders collection using a sink connector. A source connector we configured will transmit every receipt document from receipt collection back to another topic called "outsource.kafka.receipts" to showcase a MongoDB consumption to a Kafka topic.

#
Publish Documents to the Kafka Queue

Through the Confluent UI, I have submitted a test document to my “orders” topic. Produce data into "orders" topic

#Atlas Cluster is Being Automatically Populated with the Data

Looking into my Atlas cluster, I can see a new collection named orders in the kafka database.

Orders collection

Now, let's assume that our application received the order document from the orders collection and produced a receipt. We can replicate this by inserting a document in the kafka.reciepts collection:

Recipets collection

This operation will cause the source connector to produce a message into “outsource.kafka.reciepts” topic.

#Kafka "outsource.kafka.reciepts" Topic

Recieved data into "outsource.kafka.reciepts" topic

Log lines on kafka-connect will show that the process received and published the document:

1[2021-12-14 15:31:18,376] INFO [mongo-source|task-0] [Producer clientId=connector-producer-mongo-source-0] Cluster ID: lkc-65rmj (org.apache.kafka.clients.Metadata:287)
2[2021-12-14 15:31:18,675] INFO [mongo-source|task-0] Opened connection [connectionId{localValue:21, serverValue:99712}] to dev-shard-00-02.uvwhr.mongodb.net:27017 (org.mongodb.driver.connection:71)
3[2021-12-14 15:31:18,773] INFO [mongo-source|task-0] Started MongoDB source task (com.mongodb.kafka.connect.source.MongoSourceTask:203)
4[2021-12-14 15:31:18,773] INFO [mongo-source|task-0] WorkerSourceTask{id=mongo-source-0} Source task finished initialization and start (org.apache.kafka.connect.runtime.WorkerSourceTask:233)
5[2021-12-14 15:31:27,671] INFO [mongo-source|task-0|offsets] WorkerSourceTask{id=mongo-source-0} flushing 0 outstanding messages for offset commit (org.apache.kafka.connect.runtime.WorkerSourceTask:505
6[2021-12-14 15:31:37,673] INFO [mongo-source|task-0|offsets] WorkerSourceTask{id=mongo-source-0} flushing 1 outstanding messages for offset commit (org.apache.kafka.connect.runtime.WorkerSourceTask:505)

#
Summary

In this how-to article, I have covered the fundamentals of building a simple yet powerful integration of MongoDB Atlas to Kafka clusters using MongoDB Kafka Connector and Kafka Connect.

This should be a good starting point to get you going with your next event-driven application stack and a successful integration between MongoDB and Kafka.

Try out MongoDB Atlas and Kafka connector today!

Rate this article

Related

Migrate from Azure CosmosDB to MongoDB Atlas Using Apache Kafka
MongoDB logo
© 2021 MongoDB, Inc.

About

  • Careers
  • Investor Relations
  • Legal Notices
  • Privacy Notices
  • Security Information
  • Trust Center
© 2021 MongoDB, Inc.