Getting started with the MongoDB Connector for Apache Kafka and MongoDB Atlas

Robert Walters

#Kafka

Apache Kafka is an event streaming platform designed for boundless streams of data, sequentially writing events into commit logs and allowing real-time data movement between your services. MongoDB is the world’s most popular modern database built for handling massive volumes of heterogeneous data. Together, MongoDB and Kafka make up the heart of many modern data architectures today.

Integrating Kafka with external systems like MongoDB is done through the use of Kafka Connect. This API enables users to leverage ready-to-use components that can stream data from external systems into Kafka topics, and stream data from Kafka topics into external systems. The official MongoDB Connector for Apache Kafka® is developed and supported by MongoDB engineers and verified by Confluent. The connector, now released in Beta, enables MongoDB to be configured as both a sink and a source for Apache Kafka.

Figure 1: MongoDB and Kafka working together

Getting Started

In the following sections, we will walk you through installing and configuring the MongoDB Connector for Apache Kafka, followed by two scenarios. First, we will show MongoDB used as a source to Kafka, with data flowing from a MongoDB collection to a Kafka topic. Next, we will show MongoDB used as a sink, where data flows from a Kafka topic to MongoDB. To get started, you will need access to a Kafka deployment as well as a MongoDB database. The easiest and fastest way to spin up a MongoDB database is to use the managed service, MongoDB Atlas. No more fumbling around with Docker containers, config files, and replica sets: just pick a cloud provider and a cluster size, and get a connection string! It is that easy.

Figure 2: MongoDB Atlas free cluster

If you do not have an existing MongoDB Atlas database, you can easily provision one by visiting https://www.mongodb.com/cloud and clicking “Get started free.” For a detailed walkthrough of creating a MongoDB Atlas cluster see Getting started with MongoDB Atlas.

Deploying Apache Kafka

If you do not already have an Apache Kafka deployment, you can download one either from Confluent or from the Apache Kafka Downloads page. Confluent, the company founded by the original creators of Apache Kafka, provides a download that includes the same Kafka distribution found on the Apache website, along with additional tooling that is beneficial for enterprise and production deployments of Kafka. The MongoDB Connector for Apache Kafka can be used with any of these Kafka deployments. Follow the instructions provided on either page to download and install Kafka for your target platform.
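If you are working from the Apache Kafka download, a minimal local setup can be started with the scripts shipped in the distribution. As a sketch, assuming the default configuration files and running each command in its own terminal:

bin/zookeeper-server-start.sh config/zookeeper.properties
bin/kafka-server-start.sh config/server.properties
bin/connect-distributed.sh config/connect-distributed.properties

The last command starts a Kafka Connect worker in distributed mode; its REST API listens on port 8083 by default, which is the port used by the curl commands later in this post. If you downloaded the Confluent Platform instead, it ships its own tooling for starting these services, so follow the Confluent documentation.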

Install the MongoDB Connector for Apache Kafka

At this point, you should have access to a MongoDB database and a Kafka deployment, and we are now ready to install the MongoDB Connector for Apache Kafka from the Confluent Hub website. There are two options for installing the connector: automated, through the Confluent Hub client, and manual, by downloading the .zip file.

Let’s take a look at both of these options.

Option 1: Automated install through the Confluent Hub

Navigate to the Confluent Hub - MongoDB Connector website. At the time of this post, the most recent version of the connector is 0.1; check the Confluent Hub page for the latest version.

To install using the Confluent Hub client, issue the following command:

confluent-hub install mongodb/kafka-connect-mongodb:latest

This will install the connector files to /usr/share/confluent-hub-components.
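You can verify the install by listing that directory; the exact subdirectory name will include the connector version that was installed:

ls /usr/share/confluent-hub-components/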

Option 2: Manual install

To manually install the connector, perform the following steps:

  1. Download the MongoDB Connector for Apache Kafka .zip file from the Confluent Hub website.
  2. Extract the ZIP file contents and copy them to the desired location. For example, you can create a directory named <path-to-confluent>/share/kafka/plugins and then copy the connector plugin contents into it.
  3. Add this location to the plugin path in your Connect worker properties file, for example, plugin.path=/usr/local/share/kafka/plugins. Kafka Connect finds plugins using this plugin path, a comma-separated list of directories defined in the Kafka Connect worker configuration.
  4. Start the Connect workers with that configuration. Connect will discover all connectors defined within those plugins (you can verify this as shown after these steps).
  5. Repeat these steps for each machine where Connect is running. Each connector must be available on each worker.
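Once the workers have been restarted, you can confirm that the connector was picked up by asking the Kafka Connect REST API which plugins are installed; the MongoDB source and sink connector classes should appear in the list:

curl http://localhost:8083/connector-plugins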

You will need your connection string to MongoDB in order to configure the connector. To obtain the connection string if you are using MongoDB Atlas, click on the “Connect” button for your cluster. This will show a page with connection strings that are premade for each driver combination. For the Kafka connector, select Java and Version 3.4 or later. Copy the Connection String Only and use it for the connection.uri setting in the MongoSinkConnector.properties file, being sure to replace the template with the actual password for this account. Remember that if your password includes any symbols such as @, use percent encoding as recommended in the MongoDB manual.

Figure 3: Connect page in MongoDB Atlas
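For example, assuming a hypothetical database user named kafkauser whose password is p@ssw0rd, the @ must be percent-encoded as %40, so the value in MongoSinkConnector.properties would look something like this (the cluster host below is a placeholder for your own cluster address):

connection.uri=mongodb+srv://kafkauser:p%40ssw0rd@<your-cluster>.mongodb.net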

Using MongoDB as a source to a Kafka topic

Consider the use case of an ecommerce website where the inventory data is stored in MongoDB. When the inventory of any product goes below a certain threshold, the company would like more product to be automatically ordered. This ordering is done by other systems outside of MongoDB, and using Kafka as the messaging system to notify these other systems is a great example of the power of MongoDB and Kafka when used together.

Let’s set up the connector to monitor the quantity field and raise a change stream event when the quantity is less than or equal to 5. Under the covers the connector is using MongoDB Change Streams and the pipeline parameter defines the filter used to generate the event notifications.

curl -X PUT http://localhost:8083/connectors/source-mongodb-inventory/config -H "Content-Type: application/json" -d '{
      "tasks.max":1,
      "connector.class":"com.mongodb.kafka.connect.MongoSourceConnector",
      "key.converter":"org.apache.kafka.connect.storage.StringConverter",
      "value.converter":"org.apache.kafka.connect.storage.StringConverter",
      "connection.uri":<<YOUR MONGODB CONNECTION STRING GOES HERE>>,
      "database":"BigBoxStore",
      "collection":"inventory",
      "pipeline":"[{\"$match\": { \"$and\": [ { \"updateDescription.updatedFields.quantity\" : { \"$lte\": 5 } }, {\"operationType\": \"update\"}]}}]", 
      "topic.prefix": ""
}'

In the example, we provided a pipeline as a parameter. This defines the criteria for documents that are to be consumed by the connector. Since the pipeline contains quotation marks, we need to escape them in the example above so they work within our curl statement. For clarity, the pipeline value is as follows:

[
  {
    "$match": {
      "$and": [
        {
          "updatedDescription.updatedFields.quantity": {
            "$lte": 5
          }
        },
        {
          "operationType": "update"
        }
      ]
    }
  }
]

For a complete list of the connector’s configuration options, check out the documentation at https://github.com/mongodb/mongo-kafka/tree/master/docs.
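Before testing, you can also confirm that the connector was created and that its task is running by querying the Kafka Connect REST API:

curl http://localhost:8083/connectors/source-mongodb-inventory/status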

To test out our scenario, we will use the open source tool kafkacat, available at https://github.com/edenhill/kafkacat. The parameters below tell the tool to connect to the BigBoxStore.inventory topic as a Kafka consumer:

kafkacat -b localhost:9092 -t BigBoxStore.inventory -C

Next, we want to connect to the MongoDB cluster and insert an item into the inventory collection.
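If you are using the mongo shell, you can connect with the same Atlas connection string used to configure the connector:

mongo "<<YOUR MONGODB CONNECTION STRING GOES HERE>>"

Once connected, insert a sample document: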

db.inventory.insert( { "SKU" : 1, "item_name" : "Tickle Me Elmo", "quantity" : 10 } )

Now imagine the holiday season has come, Tickle Me Elmo has made a comeback, and it is flying off the shelves. The backend inventory system updates the quantity as follows:

db.inventory.updateOne({"SKU":1},{ $set: { "quantity" : 2} } )

Now look at the kafkacat output and you will see the change stream event made it into the Kafka topic:

{
  "_id": {
    "_data": "825D1640BF000000012B022C0100296E5A1004E407DAB9B92B498CBFF2B621AAD032C046645F696400645D163AA63827D21F38DA958E0004"
  },
  "operationType": "update",
  "clusterTime": {
    "$timestamp": {
      "t": 1561739455,
      "i": 1
    }
  },
  "ns": {
    "db": "BigBoxStore",
    "coll": "inventory"
  },
  "documentKey": {
    "_id": {
      "$oid": "5d163aa63827d21f38da958e"
    }
  },
  "updateDescription": {
    "updatedFields": {
      "quantity": 2
    },
    "removedFields": []
  }
}

With this message in the Kafka Topic, other systems can be notified and process the ordering of more inventory to satisfy the shopping demand for Elmo.

Using MongoDB as a sink from a Kafka topic

Continuing the ecommerce scenario, suppose that when a new user is created on the website, their contact information is needed by multiple business systems. This contact information is placed in a Kafka topic, “newuser”, for shared use, and we configure MongoDB as a sink to that Kafka topic. This allows new user information to propagate to a “users” collection in MongoDB. To configure the connector for this scenario, we can issue a REST API call to the Connect service as follows:

curl -X PUT http://localhost:8083/connectors/sink-mongodb-users/config -H "Content-Type: application/json" -d ' {
      "connector.class":"com.mongodb.kafka.connect.MongoSinkConnector",
      "tasks.max":"1",
      "topics":"newuser",
      "connection.uri":"<<YOUR MONGODB CONNECTION STRING GOES HERE>>",
      "database":"BigBoxStore",
      "collection":"users",
      "key.converter":"org.apache.kafka.connect.json.JsonConverter",
      "key.converter.schemas.enable":false,
      "value.converter":"org.apache.kafka.connect.json.JsonConverter",
      "value.converter.schemas.enable":false
}' 
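As with the source connector, you can verify that the sink connector was registered by listing the connectors known to the Connect worker:

curl http://localhost:8083/connectors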

To test our scenario, let’s use kafkacat to push a message that simulates the website creating a new user.

kafkacat -b localhost:9092 -t newuser -P <<EOF
{ "name": "Rob Walters", "Twitter" : "@RobsCranium" } 
EOF

To confirm the message made it all the way through to your MongoDB database, make a connection to MongoDB using your client tool of choice and issue a db.users.find() command. If using MongoDB Atlas, you can click on the Collections tab to show the databases and collections that are in your cluster.

Figure 4: Collections tab in MongoDB Atlas
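For example, from the mongo shell connected to your cluster:

use BigBoxStore
db.users.find().pretty()

You should see the document containing the name and Twitter handle that was pushed to the newuser topic.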

Summary

MongoDB and Apache Kafka together make up the heart of many modern data architectures today. The MongoDB Connector for Apache Kafka is the official Kafka connector for MongoDB. The sink connector was originally written by H.P. Grahsl, and the source connector was originally developed by MongoDB. These efforts were combined into a single connector that is now maintained by MongoDB Inc. This single connector allows MongoDB to be used as both a sink and a source for Apache Kafka, opening the door to many scenarios ranging from event-driven architectures to microservices patterns. For more information on the connector, check out the MongoDB Connector for Apache Kafka page.