Go to MongoDB Using Kafka Connectors - Ultimate Agent Guide

Pavel Duchovny • Published Feb 08, 2022 • Updated May 09, 2022
Go is a modern, statically typed language that compiles to native code, yet it keeps some of the feel and convenience of dynamic languages. It is fairly simple to install and use, and it produces readable, robust code for many application use cases.
One of those use cases is building agents that report to a centralized data platform via streaming. A widely accepted approach is to publish the agent data to distributed queues such as Kafka. The Kafka topics can then propagate the data to many different destinations, such as a MongoDB Atlas cluster.
Having a Go agent allows us to use the same code base across operating systems. Its good integration with JSON data, together with packages such as the MongoDB driver and the Confluent Go Kafka Client, makes it a compelling candidate for the presented use case.
This article demonstrates how file size data on a host is monitored by a cross-platform agent written in Golang and sent through a Kafka cluster, using a Confluent hosted sink connector, to MongoDB Atlas. MongoDB Atlas stores the data in a time series collection. The MongoDB Charts product is a convenient way to show the gathered data to the user.
Figure: Architecture Overview

Preparing the Golang project, Kafka cluster, and MongoDB Atlas

Configuring a Go project
Our agent is written in Go. Therefore, you will need to install the Go language software on your host.
Once this step is done, create a Go module in your working directory to begin the project, using the go mod init command followed by a module path of your choice.
Next, add the Confluent Kafka client as a dependency of the Golang project with go get.
Configuring a Kafka cluster
Creating a Confluent Kafka cluster is done via the Confluent UI. Start by creating a basic Kafka cluster in Confluent Cloud. Once it is ready, create a topic to be used in the Kafka cluster. I created one named “files.”
Generate an API key and API secret to interact with this Kafka cluster. For the simplicity of this tutorial, I have selected the “Global Access” API key. For production, it is recommended to grant the API key only the minimum permissions it needs. Keep the generated key and secret for future use.
Obtain the Kafka cluster connection string via Cluster Overview > Cluster Settings > Identification > Bootstrap server and keep it for future use. Basic clusters are open to the internet; in production, you will need to restrict access to your specific hosts via advanced cluster ACLs.
Important: The Confluent connector requires that the Kafka cluster and the Atlas cluster are deployed in the same region.
Configuring Atlas project and cluster
Create a project and cluster or use an existing Atlas cluster in your project.
Figure: Atlas Cluster
Since we are using a time series collection, the cluster must run MongoDB version 5.0 or later. Prepare your Atlas cluster for a Confluent Atlas sink connection. Inside your project’s access settings, enable a database user and add the relevant IP addresses of your connector to the access list. The access list IPs should be those associated with the Atlas Sink Connector, which we will configure in a following section. Finally, get a hold of the Atlas connection string and the main cluster DNS. For more information about how best to secure the connection and obtain the relevant IPs from your Confluent connector, please read the following article: MongoDB Atlas Sink Connector for Confluent Cloud.

Adding agent main logic

Now that we have our Kafka cluster and Atlas clusters created and prepared, we can initialize our agent code by building a small main file that will monitor my ./files directory and capture the file names and sizes. I’ve added a file called test.txt with some data in it to bring it to ~200MB.
Let’s create a file named main.go with a small piece of logic that loops continuously, sleeping for one minute between passes, and walks through the files in the files folder.
The agent code imports helper packages to traverse the directory and to build JSON documents from the files found.
Since we need the data to be marked with the time of the sample, it is a great fit for time series data and therefore should eventually be stored in a time series collection on Atlas. If you want to learn more about time series collections and data, please read our article, MongoDB Time Series Data.
We can test this agent by running it from the project directory, for example with go run main.go.
The agent will produce one JSON document per file, carrying the Name, Size, and Time fields.

Creating a Confluent MongoDB connector for Kafka

Now we are going to create a Kafka Sink connector to write the data coming into the “files” topic to our Atlas Cluster’s time series collection.
Confluent Cloud has a very popular integration running MongoDB’s Kafka connector as a hosted solution integrated with their Kafka clusters. Follow these steps to initiate a connector deployment.
The inputs provided to the connector are shown in the setup screens.
Figures: Connector Setup 1, Connector Setup 2, Connector Setup 3
Once you have set it up by following the guide, you will eventually see a similar launch summary page.
Figure: Connector Final Summary
After provisioning, every document published to the files topic will be pushed to the time series collection hostMonitor.files, where the time field is Time and the metadata field is Name.

Pushing data to Kafka

Now let’s edit the main.go file to use a Kafka client and push each file measurement into the “files” topic.
Add the client library, the Confluent Go Kafka Client, to the imports of main.go.
Add the Confluent Cloud credentials and cluster DNS information. Replace <CONFLUENT-SERVER>:<CONFLUENT-PORT> with the bootstrap server found on the Kafka cluster details page, and <ACCESS-KEY> and <SECRET-KEY> with the API key and secret generated for the Kafka cluster.
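As a hedged sketch of the connection settings, the snippet below assembles the client properties typically needed for a Confluent Cloud connection (SASL/PLAIN over TLS). The property names are standard librdkafka settings. The kafkaSettings function and the CONFLUENT_* environment variable names are hypothetical, and reading the values from the environment rather than editing placeholders in the source is a swapped-in choice to keep secrets out of the code.

```go
package main

import (
	"errors"
	"fmt"
	"os"
)

// kafkaSettings returns client properties for a SASL/PLAIN connection over TLS.
// In the agent, these entries could be copied into the Confluent client's
// kafka.ConfigMap before creating the producer (an assumption about wiring,
// not the article's exact code).
func kafkaSettings(bootstrap, apiKey, apiSecret string) (map[string]string, error) {
	if bootstrap == "" || apiKey == "" || apiSecret == "" {
		return nil, errors.New("bootstrap server, API key, and API secret are all required")
	}
	return map[string]string{
		"bootstrap.servers": bootstrap, // <CONFLUENT-SERVER>:<CONFLUENT-PORT>
		"security.protocol": "SASL_SSL",
		"sasl.mechanisms":   "PLAIN",
		"sasl.username":     apiKey,    // <ACCESS-KEY>
		"sasl.password":     apiSecret, // <SECRET-KEY>
	}, nil
}

func main() {
	// Hypothetical environment variable names, chosen for this sketch only.
	settings, err := kafkaSettings(
		os.Getenv("CONFLUENT_BOOTSTRAP"),
		os.Getenv("CONFLUENT_API_KEY"),
		os.Getenv("CONFLUENT_API_SECRET"),
	)
	if err != nil {
		fmt.Println("configuration incomplete:", err)
		return
	}
	fmt.Println("connecting to", settings["bootstrap.servers"])
}
```

Failing early on an empty value makes a missing credential visible at startup instead of surfacing later as an authentication error from the broker.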
The agent then initiates the producer and produces a message from each marshaled JSON document. Together with the directory walk from the previous section, this makes up the entire main.go file.
Now when we run the agent while the Confluent Atlas sink connector is fully provisioned, we will see the produced messages arrive as documents in the hostMonitor.files time series collection.
Figure: Atlas Data

Analyzing the data using MongoDB Charts

To put our data to use, we can create some beautiful charts on top of the time series data. In a line graph, we configure the X axis to use the Time field, the Y axis to use the Size field, and the series to use the Name field. The resulting graph shows one colored line per file, tracing how each file's size evolves over time.
Figure: Line Graph Chart
Now we have an agent and a fully functioning Charts dashboard to analyze file growth trends. This architecture leaves plenty of room for extension: the Go agent can gain further functionality, more subscribers can consume the monitored data and act upon it, and MongoDB Atlas and Charts can be used by various applications and embedded in different platforms.

Wrap Up

Building Go applications is simple, yet it brings big benefits in terms of performance, cross-platform code, and a large number of supported libraries and clients. Adding MongoDB Atlas via a Confluent Cloud Kafka service makes the implementation a robust and extensible stack that streams data, stores it efficiently, and presents it to the end user via Charts.
In this tutorial, we have covered all the basics you need to know in order to start using Go, Kafka, and MongoDB Atlas in your next streaming projects.
Try MongoDB Atlas and Go today!
