Finding the Moving Average From Heterogeneous Data Sources Using Apache Kafka, MongoDB and R
Overview
- Pre-requisites
- Demo Installation and Configuration
- Demo Script
- MongoDB Connector for Apache Kafka Deep Dive
- Summary
We’ve recently announced the general availability of the MongoDB Connector for Apache Kafka which allows you to easily build robust and reactive data pipelines between datastores, applications, and services in real-time. In this blog post, we’ll be walking through a real-world example and show you how to leverage the Connector to take advantage of stream processing.
Let’s assume you work for a stock trading company. The business has requested that you create a report that shows a moving average of their securities like the one below:
The challenge for you is that financial security information is stored all over the place, in both traditional relational databases (e.g., MySQL) and in modern data platforms (e.g., MongoDB).
How can you collect data from these heterogeneous data platforms and perform analytics on it?
We will solve this problem by using Apache Kafka to collect data from both MySQL and MongoDB. Data from these topics will then be written into a single MongoDB collection in a MongoDB Atlas cluster. Once the data is in MongoDB Atlas, we will use RStudio to calculate the moving average and graph the results.
This demo runs entirely within Docker containers, so you can easily spin it up yourself. A detailed data flow of the demo is as follows:
- Python apps in the StockGenMySQL and StockGenMongo containers generate fictitious stock security data into MySQL and MongoDB, respectively.
- A web page hosted in the Stockportal container shows the randomly created stock symbols.
- The MongoDB Connector for Apache Kafka is added as a Kafka connector and configured as a source, moving data from the local MongoDB replica set into a Kafka topic.
- The Debezium MySQL connector is added as a Kafka connector and configured to move data from the local MySQL database into a Kafka topic.
- The MongoDB Connector for Apache Kafka is added as a Kafka connector and configured as a sink, moving data from both Kafka topics into MongoDB Atlas.
- The client machine uses mongolite (the R driver for MongoDB) and RStudio to query the data in MongoDB Atlas.
Pre-requisites
The following need to be installed or available before running the demo:
- Connection to a MongoDB Atlas cluster (the free tier cluster works for this demo). See step 3 below for more information.
- Docker
- Kafkacat
- RStudio (optional). For this demo, the free RStudio Open Source Edition works well.
- mongolite R driver (optional)
Once RStudio is installed, you can install the mongolite R driver directly from CRAN using the statement install.packages("mongolite"). More details on the driver are located here.
Demo Installation and Configuration
1. Download/clone the Docker files from the GitHub repository
2. Build the demo images
Run the build-images.sh script from the command shell:
sh build-images.sh

Note: Make sure you are in the same directory as the build-images.sh script file. You may also have to add execute permission via chmod +x build-images.sh to execute the script.
This shell script will build the following demo images locally: mysqlimg, stockgenmongo, stockgenmysql, and stockportal. You can confirm these four images were created by issuing a "docker images" command.
3. Copy the Atlas Connection String
If you do not have a MongoDB Atlas cluster, follow these instructions.
After you’ve created a cluster, you will need to define a database user for use by the Kafka Connector to connect to the MongoDB Atlas cluster. You will also have to whitelist the IP address of the docker host.
If you have not created a database user for the Kafka Connector:
Select “Database Access” from the Atlas menu and click the “Add New User” button. This will launch the new user dialog as shown below:

Provide a username and password and select "Read and write to any database". Remember the password.
If your docker host is not whitelisted:
Click "Network Access" from the Atlas menu and click "Add IP Address". This will launch the Add Whitelist Entry dialog as shown below:

Here you can add your current IP address or any IP address or block of addresses in CIDR format. Note: If you do not know or cannot obtain the IP address of the Docker host, you can add 0.0.0.0/0 as an entry, which will allow connections from anywhere on the internet. This is not a recommended configuration.
Copy the MongoDB Atlas Connection String:
To copy the connection string, select the "CONNECT" button on your Atlas cluster, then choose "Connect your application". This will pop up a dialog and display the connection string as follows:

Click the Copy button to copy the connection string to the clipboard.
4. Execute the run.sh script
Pass the Atlas connection string as a parameter to the run.sh script file as follows:
sh run.sh "<<paste in your Atlas Connection String here>>"

Verify the correct password is used in the connection string.
Demo Script
Once the docker images and containers are built and deployed, the demo can be run repeatedly by simply executing:
sh run.sh "<<paste in your Atlas Connection String here>>"

View the Generated Stock Entities
Open a web browser on your Docker host and navigate to http://localhost:8888.
The demo randomly generates 10 securities: 5 for MySQL and 5 for MongoDB. This web page simply connects to MySQL and MongoDB and shows the names of the stocks that will be used within the current iteration of the demo.

View the topic messages
The stockgenmongo and stockgenmysql containers run Python apps that push stock transactions into their respective databases. Messages in the mysqlstock.Stocks.StockData topic arrive via the Debezium MySQL connector. Messages in the stockdata.Stocks.StockData topic come from the MongoDB Connector for Apache Kafka. You can view the messages in these Kafka topics using the Kafkacat tool. The presence of messages in these topics validates that our connectors are set up and working.
View messages from MySQL in mysqlstock.Stocks.StockData topic
kafkacat -b 127.0.0.1:9092 -t mysqlstock.Stocks.StockData -s avro -r 127.0.0.1:8081

...
{"before": null, 
"after": {"Value": {"company_symbol": {"string": "WTP"}, 
"company_name": {"string": "WHIMSICAL TAMBOUR PRODUCTIONS"}, 
"price": {"bytes": "%\u0017"}, 
"tx_time": {"string": "2020-01-28T18:30:34Z"}}}, 
"source": {"version": "0.10.0.Final", 
 "connector": "mysql", 
 "name": "mysqlstock", 
 "ts_ms": 1580236234000, 
 "snapshot": {"string": "false"}, 
"db": "Stocks",
 "table": {"string": "StockData"}, 
"server_id": 223344, 
"gtid": null, 
"file": "mysql-bin.000003",
 "pos": 1906765, 
"row": 0, 
"thread": {"long": 9},
"query": null}, 
"op": "c", 
"ts_ms": {"long": 1580236234223}}

View messages from MongoDB in stockdata.Stocks.StockData topic
kafkacat -b 127.0.0.1:9092 -t stockdata.Stocks.StockData 

…
"{\"_id\": {\"$oid\": \"5e307e3940bacb724265e4a8\"}, 
\"company_symbol\": \"ISH\", 
\"company_name\": \"ITCHY STANCE HOLDINGS\", 
\"price\": 35.02, 
\"tx_time\": \"2020-01-28T18:32:25Z\"}"

View the combined data in MongoDB Atlas
The MongoDB Connector for Apache Kafka is configured as a sink connector and writes data to MongoDB Atlas. Data is written to the StockData collection in the Stocks database. Click on the "Collections" tab in your MongoDB Atlas portal to view the StockData collection. This data comes from both the MySQL and MongoDB databases.

Calculate the moving average using R
The R language has many libraries that are useful for analytics. MongoDB has support for R via the mongolite R driver. The complete script, R-Demo-Script.txt, is located in this GitHub repository.
Launch RStudio, install the ggplot2 and mongolite packages, and load the libraries:
install.packages("mongolite")
install.packages("ggplot2")
library(ggplot2)
library(mongolite)

Next, make a connection to MongoDB, replacing the connection string placeholder with your MongoDB Atlas connection string.
ts=mongo(collection="StockData", db="Stocks", url="<<ATLAS CONNECTION STRING HERE>>")

Load the data set into memory by issuing a query and storing the results of the query in a dataframe.
tss <- ts$find(fields='{"company_symbol":true,"company_name":true,"price":true,"tx_time":true}')

tsdf<-data.frame(tss$tx_time,tss$price, tss$company_symbol)

Verify the data is loaded by typing the tsdf variable name:
tsdf

tss.tx_time tss.price tss.company_symbol
1 2020-01-28T18:12:01Z 74.57 FCI
2 2020-01-28T18:12:01Z 24.49 CCH
3 2020-01-28T18:12:01Z 29.36 FOP
4 2020-01-28T18:12:01Z 14.61 MIF

Now that the data is loaded on our client, we can have fun making graphs. Here is a boxplot that shows all the stock symbols with their max, min, and averages:
ggplot(data=tsdf,mapping=aes(x=tss.company_symbol, 
 y=tss.price, color=tss.company_symbol)) + geom_boxplot()


Pick a stock security from your graph. In our example, we chose "EAV" to replace the <<YOUR_SYMBOL HERE>> placeholder in the script below:
ggplot(data=subset(tsdf,tss.company_symbol == "<<YOUR_SYMBOL HERE>>"),
 mapping=aes(as.POSIXct.default(tss.tx_time,format = "%Y-%m-%dT%H:%M:%SZ", tz = "UTC"), 
 y=tss.price, color=tss.company_symbol)) + geom_point() + geom_smooth(method='loess', color = "#09557f",size = 0.6) + ggtitle("Moving average") + xlab("Time") + ylab("Price")


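Note that geom_smooth with method='loess' fits a local regression curve rather than a strict moving average. If you want an explicit simple moving average, you can compute one yourself. Below is a minimal sketch using base R's stats::filter on top of the tsdf data frame built above; the "EAV" symbol and the 50-observation window are arbitrary choices for illustration.
# Subset one symbol (replace "EAV" with one of your generated symbols) and sort by time
sym <- subset(tsdf, tss.company_symbol == "EAV")
sym$time <- as.POSIXct(sym$tss.tx_time, format = "%Y-%m-%dT%H:%M:%SZ", tz = "UTC")
sym <- sym[order(sym$time), ]

# Trailing simple moving average over a 50-observation window (arbitrary window size)
window <- 50
sym$sma <- as.numeric(stats::filter(sym$tss.price, rep(1 / window, window), sides = 1))

# Plot the raw prices and the moving average
ggplot(sym, aes(x = time)) +
  geom_point(aes(y = tss.price), color = "grey60", size = 0.6) +
  geom_line(aes(y = sma), color = "#09557f", na.rm = TRUE) +
  ggtitle("Simple moving average") + xlab("Time") + ylab("Price")
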
MongoDB Connector for Apache Kafka Deep Dive
Kafka Connectors make it easy to integrate heterogeneous data sources with your Kafka cluster. You can configure these connectors through REST API calls. The MongoDB Connector for Apache Kafka can be a source or sink. In this demo, we use it both ways. The magic of configuring the Kafka connectors in this demo resides in the run.sh file.
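For example, once the demo is running, you can list the connectors that run.sh registered by querying the Kafka Connect REST API on the same endpoint the curl calls below POST to:
curl http://localhost:8083/connectors

This returns a JSON array of connector names, which should include the mongo-source-stockdata and mongo-atlas-sink connectors defined below.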
Using MongoDB as a Source
Within the run.sh file we can see the REST API call defining a MongoDB source connector.
curl -X POST -H "Content-Type: application/json" --data '
 {"name": "mongo-source-stockdata",
 "config": {
 "tasks.max":"1",
 "connector.class":"com.mongodb.kafka.connect.MongoSourceConnector",
 "key.converter":"org.apache.kafka.connect.json.JsonConverter",
 "key.converter.schemas.enable":false,
 "value.converter":"org.apache.kafka.connect.json.JsonConverter",
 "value.converter.schemas.enable":false,
 "publish.full.document.only": true,
 "connection.uri":"mongodb://mongo1:27017,mongo2:27017,mongo3:27017",
 "topic.prefix":"stockdata",
 "database":"Stocks",
 "collection":"StockData"
}}' http://localhost:8083/connectors -w "\n"

As a source, data is obtained from a MongoDB cluster and sent to a Kafka topic. Let’s break down and highlight the interesting parts of this configuration:
"key.converter":"org.apache.kafka.connect.json.JsonConverter", "key.converter.schemas.enable":false, "value.converter":"org.apache.kafka.connect.json.JsonConverter", "Value.converter.schemas.enable":false
Kafka messages are not inherently JSON or plain text; they are simply bytes of data. Thus, Kafka Connect needs to be told how to serialize and deserialize the data, which is the job of the converters. Since we are obtaining data from MongoDB, we can use the JsonConverter. Check out the Confluent blog post, "Kafka Connect Deep Dive – Converters and Serialization Explained", for a deeper discussion on this topic.
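For comparison, a connector producing Avro, as the Debezium source in this demo appears to (note the -s avro flag passed to kafkacat earlier), would instead use the Avro converter together with a schema registry. A minimal sketch of the relevant value-converter settings, with a hypothetical registry hostname:
"value.converter":"io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url":"http://schemaregistry:8081"
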
"publish.full.document.only": true
The MongoDB Connector for Apache Kafka leverages the change stream feature within MongoDB. The document returned as part of a change event contains metadata about the event, such as the operation type, the document key, and the namespace (database and collection). In our case, we only want the actual document data itself to be sent to the Kafka topic. For this reason, we set publish.full.document.only to true.
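To illustrate, reusing the sample document shown earlier (with a hypothetical resume token and metadata), an insert event arrives on the change stream wrapped in an envelope roughly like the sketch below; with publish.full.document.only set to true, only the fullDocument portion is published to the topic.
{"_id": {"_data": "..."},
 "operationType": "insert",
 "ns": {"db": "Stocks", "coll": "StockData"},
 "documentKey": {"_id": {"$oid": "5e307e3940bacb724265e4a8"}},
 "fullDocument": {"_id": {"$oid": "5e307e3940bacb724265e4a8"},
                  "company_symbol": "ISH",
                  "company_name": "ITCHY STANCE HOLDINGS",
                  "price": 35.02,
                  "tx_time": "2020-01-28T18:32:25Z"}}
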
"connection.uri":"mongodb://mongo1:27017,mongo2:27017,mongo3:27017"
In our demo, Docker Compose stands up a three-node MongoDB replica set on the same local network as the other containers, and the URI includes all the nodes of the replica set. Using a replica set is a requirement when using the MongoDB Connector as a source because change streams are only available on replica sets and sharded clusters.
"topic.prefix":"stockdata"
By default, the connector publishes to a topic named in the format "database.collection". By specifying topic.prefix you can ensure a unique topic name; the prefix is prepended to the database and collection names, so in our demo the data is written to the stockdata.Stocks.StockData Kafka topic.
"database":"Stocks", "collection":"StockData"
These are the database and collection we are watching. If we omit the collection, all collections within the database are watched. If we omit the database, all databases within the MongoDB cluster are watched.
Note that in this demo we are using only a subset of the possible configuration values. For a complete list, check out the MongoDB Connector for Apache Kafka documentation. One notable parameter that we did not use in the demo but is worth mentioning is pipeline. It allows you to filter or modify the change stream output by specifying an aggregation pipeline. For example, if we added the following pipeline:
[{"$match": {"operationType": "insert"}}, {"$addFields": {"Kafka": "Rules!"}}]

The connector would configure the MongoDB change stream to only push documents that were inserted into MongoDB (as opposed to updated or deleted). In addition, these documents would include a new custom field. Using a pipeline can be useful for reducing the amount of data flowing into your Kafka topic by pushing only the data relevant to your application.
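As a rough sketch, such a pipeline would be added to the source connector configuration shown earlier as a string value, with the inner quotes escaped:
"pipeline": "[{\"$match\": {\"operationType\": \"insert\"}}, {\"$addFields\": {\"Kafka\": \"Rules!\"}}]"
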
Using MongoDB as a Sink
Within the run.sh file we can see the REST API call defining a MongoDB sink connector.
curl -X POST -H "Content-Type: application/json" --data '
 {"name": "mongo-atlas-sink",
 "config": {
 "connector.class":"com.mongodb.kafka.connect.MongoSinkConnector",
 "tasks.max":"1",
 "topics":"stockdata.Stocks.StockData",
"Connection.uri":"<<connection string to MongoDB Atlas>>",
 "database":"Stocks",
 "collection":"StockData",
 "key.converter":"org.apache.kafka.connect.json.JsonConverter",
 "key.converter.schemas.enable":false,
 "value.converter":"org.apache.kafka.connect.json.JsonConverter",
 "value.converter.schemas.enable":false
}}' http://localhost:8083/connectors -w "\n"

While the parameter names are similar to those in the source configuration, the sink configuration takes the perspective of data flowing from a Kafka topic into a MongoDB cluster.
Highlights of this configuration are as follows:
"topics":"stockdata.Stocks.StockData"
The topics parameter lists the Kafka topics from which we source the data. This data is written to the MongoDB cluster defined in the connection.uri parameter.
"database":"Stocks", "collection":"StockData"
In the sink we define what database and collection to write the data into.
"key.converter":"org.apache.kafka.connect.json.JsonConverter",
"key.converter.schemas.enable":false,
"value.converter":"org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable":false
Summary
For this demo, we showcased the official MongoDB Connector for Apache Kafka as both a source and a sink. We walked through a real-world example that took data from heterogeneous systems, used the power of Kafka to collect that data, and ultimately put it in MongoDB for long-term storage, analysis, and reporting.
The MongoDB Connector for Apache Kafka is being actively developed and maintained by MongoDB. To learn more about the Connector visit our product page.