MongoDB and IIoT: Data Streaming With Kafka

Joy Ike

Event streaming has become a cornerstone of the industrial internet of things (IIoT) because it allows people to unleash the power of real-time operational data to drive applications and analytics. In this article, we share how MongoDB Atlas helps you move data seamlessly from MQTT brokers into MongoDB time series collections, using the Apache Kafka MQTT source and MongoDB sink connectors deployed in a cloud environment.

Read the first and second articles in this four-part series on MongoDB and IIoT.

Data streaming is the second step in our framework for end-to-end data integration in the manufacturing sector. It builds on the “connect” step of this framework, which deals with establishing an interface for interaction with IoT devices. The methodology discussed in this blog was developed and tested using a model factory created by Fischertechnik, but these steps apply to any environment that uses the standard MQTT protocol. All the source code for this project, along with a detailed deployment guide, can be found in our public GitHub repository.

Figure 1. Step 2 of the end-to-end data integration framework.

The challenge of collecting data

On the shop floor, devices and components continuously generate data about their activity and environmental conditions at regular time intervals. Data of this kind is known as time series data. On our model factory's production line, a variety of sensors collect data about temperature, pressure, humidity, brightness, camera positions, device/inventory status, and movements. This data is vital for monitoring the health and effectiveness of factory equipment and its ability to keep functioning without failure. The resulting datasets are often huge and must be stored and analyzed efficiently to detect anomalies or provide insight into overall equipment effectiveness (OEE).

With the advent of powerful event streaming platforms like Apache Kafka — and the wide variety of connectors for all sorts of protocols — it has become increasingly simple to handle the consolidation and export of real-time data feeds. However, dealing with such large volumes of data comes with added challenges regarding scalable storage, cost implications, and data archiving.

This is where MongoDB’s time series collections come into play. Time series collections are a distinct type of MongoDB collection, optimized to store and process time series data efficiently by leveraging clustered indexes, columnar compression, and aggregation pipeline stages that facilitate real-time analytics.
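A time series collection is declared when it is created by naming the field that holds the timestamp and, optionally, a metadata field and a granularity. The Python sketch below only builds the document for MongoDB's `create` command, so it runs without a server. The collection name `sensors`, the metadata field `Source`, and the `seconds` granularity are assumptions for illustration. They are not taken from the project.

```python
import json
from typing import Optional

# Granularity values accepted by MongoDB time series collections.
VALID_GRANULARITIES = {"seconds", "minutes", "hours"}


def timeseries_create_command(collection: str, time_field: str,
                              meta_field: Optional[str] = None,
                              granularity: str = "seconds") -> dict:
    """Return a `create` command document for a time series collection."""
    if granularity not in VALID_GRANULARITIES:
        raise ValueError(f"unsupported granularity: {granularity}")
    timeseries = {"timeField": time_field, "granularity": granularity}
    if meta_field is not None:
        # The meta field should hold values that rarely change per series,
        # such as the source of a reading (an assumption in this sketch).
        timeseries["metaField"] = meta_field
    return {"create": collection, "timeseries": timeseries}


cmd = timeseries_create_command("sensors", "ts", meta_field="Source")
print(json.dumps(cmd))
```

With a driver such as PyMongo, the same `timeseries` options can be passed when creating the collection. In this project, however, the MongoDB sink connector creates the collection automatically.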

Learn more about time series collections on our tutorial page.

Dream team: MQTT + Kafka + MongoDB

Our recipe for collecting real-time sensor data (using the MQTT protocol) combines an MQTT source connector developed by Confluent and a native MongoDB sink connector deployed in a containerized environment.

Figure 2. The components of the data streaming methodology.

We deployed the connectors on a stack that includes Kafka Connect, a Kafka broker, and ZooKeeper, running as containers defined in a single Docker Compose file. This setup can be deployed locally, on a serverless backend, or even on Confluent Cloud. In our case, we deployed it on an AWS EC2 Linux instance.

Here’s a brief explanation of what each container does in this environment:

  1. ZooKeeper: Acts as a centralized coordination service that manages and organizes all the Kafka brokers.

  2. Kafka broker: Stores messages and allows Kafka consumers to fetch them by topic, partition, and offset. Multiple Kafka brokers form a Kafka cluster by sharing information with each other.

  3. Kafka Connect: Serves as the runtime environment where you can configure connectors to ingest data into Kafka topics, making the data available for stream processing with low latency.

It is worth noting that Kafka Connect can run any number of source and sink connectors, limited only by the resources of the server. Once the development environment is set up, each source and sink connector is configured with the parameters it needs.

The source connector

The source connector subscribes to MQTT topics and maps the ones that contain the desired data parameters to a chosen Kafka topic. For simplicity, we used Confluent’s MQTT source connector, which supports any kind of MQTT broker connection (self-hosted or otherwise). We also used a managed MQTT service from HiveMQ as our remote broker.

In the sample source connector configuration below, we streamed sensor readings from multiple MQTT topics on the factory to a single Kafka topic called sensors, using a comma-separated list of MQTT topics. We also added the access details for the remote broker from which Kafka consumes messages on those MQTT topics. The ByteArrayConverter passes each message's JSON payload through to the Kafka topic unchanged. Mapping several MQTT topics to the same Kafka topic does not affect the performance of the connector.

{
  "name": "mqtt-source",
  "config": {
    "connector.class": "io.confluent.connect.mqtt.MqttSourceConnector",
    "tasks.max": "1",
    "mqtt.server.uri": "ssl://<REMOTE BROKER ADDRESS>:8883",
    "mqtt.username": "<REMOTE BROKER CLIENT>",
    "mqtt.password": "<REMOTE BROKER CLIENT PASSWORD>",
    "mqtt.topics": "i/ldr,i/bme680,i/cam",
    "kafka.topic": "sensors",
    "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
    "confluent.topic.bootstrap.servers": "broker:9092",
    "confluent.license": "",
    "topic.creation.enable": true,
    "topic.creation.default.replication.factor": -1,
    "topic.creation.default.partitions": -1
  }
}
Figure 3. Sensor readings from multiple MQTT topics to a single Kafka topic.
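Kafka Connect exposes a REST API. A common way to launch a connector is to POST its configuration to the worker's `/connectors` endpoint. The Python sketch below builds that request from the configuration in Figure 3, using only the standard library. The worker address `http://connect:8083` is an assumption: 8083 is Kafka Connect's default REST port, and `connect` is a hypothetical Docker Compose service name. The sketch builds the request but does not send it, so it runs without a live cluster.

```python
import json
import urllib.request

# Assumption: hypothetical host name for the Kafka Connect container; 8083 is the default REST port.
CONNECT_URL = "http://connect:8083"

# Same values as the source connector configuration in Figure 3.
source_connector = {
    "name": "mqtt-source",
    "config": {
        "connector.class": "io.confluent.connect.mqtt.MqttSourceConnector",
        "tasks.max": "1",
        "mqtt.server.uri": "ssl://<REMOTE BROKER ADDRESS>:8883",
        "mqtt.username": "<REMOTE BROKER CLIENT>",
        "mqtt.password": "<REMOTE BROKER CLIENT PASSWORD>",
        "mqtt.topics": "i/ldr,i/bme680,i/cam",
        "kafka.topic": "sensors",
        "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
        "confluent.topic.bootstrap.servers": "broker:9092",
        "confluent.license": "",
        "topic.creation.enable": True,
        "topic.creation.default.replication.factor": -1,
        "topic.creation.default.partitions": -1,
    },
}


def build_create_request(base_url: str, connector: dict) -> urllib.request.Request:
    """Build (but do not send) a POST /connectors request for Kafka Connect."""
    body = json.dumps(connector).encode("utf-8")
    return urllib.request.Request(
        url=f"{base_url}/connectors",
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


req = build_create_request(CONNECT_URL, source_connector)
# To submit it against a running worker: urllib.request.urlopen(req)
```

The same helper works for the sink connector. Only the dictionary passed to `build_create_request` changes.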

The sink connector

While the source connector specifies where data is retrieved from, the sink connector specifies where it is sent. We used the MongoDB Kafka Sink Connector, which let us connect to a MongoDB Atlas cluster with the right access information and choose the database and collection in which the streaming data is stored. To receive the sensor readings captured by the source connector, the topics property of the sink connector must match the kafka.topic property of the source connector.
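The pairing rule can be checked before either connector is deployed. The following minimal Python sketch assumes both configurations are available as dictionaries, for example loaded from the JSON files in the repository. It confirms that the sink's comma-separated `topics` list includes the source's `kafka.topic`. The helper name is hypothetical.

```python
def sink_receives_source_topic(source_config: dict, sink_config: dict) -> bool:
    """Return True if the sink's `topics` list includes the source's `kafka.topic`."""
    # `topics` may list several Kafka topics separated by commas.
    sink_topics = {t.strip() for t in sink_config.get("topics", "").split(",") if t.strip()}
    return source_config.get("kafka.topic") in sink_topics


source_cfg = {"kafka.topic": "sensors"}
sink_cfg = {"topics": "sensors"}
ok = sink_receives_source_topic(source_cfg, sink_cfg)
```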

{
  "name": "mongodb-sink",
  "config": {
    "connector.class": "com.mongodb.kafka.connect.MongoSinkConnector",
    "tasks.max": 1,
    "topics": "sensors",
    "connection.uri": "mongodb+srv://user:password@address.mongodb.net/database?retryWrites=true&w=majority",
    "database": "<database name>",
    "collection": "<collection name>",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "false",
    "timeseries.timefield": "ts",
    "timeseries.timefield.auto.convert": "true",
    "timeseries.timefield.auto.convert.date.format": "yyyy-MM-dd'T'HH:mm:ss'Z'",
    "transforms": "RenameField,InsertTopic",
    "transforms.RenameField.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
    "transforms.RenameField.renames": "h:humidity, p:pressure, t:temperature",
    "transforms.InsertTopic.type": "org.apache.kafka.connect.transforms.InsertField$Value",
    "transforms.InsertTopic.topic.field": "Source"
  }
}
Figure 4. The converter properties instruct the connector on how to translate data from Kafka.
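The two transforms in Figure 4 run on each record before it is written to MongoDB. ReplaceField renames the abbreviated fields, and InsertField adds the Kafka topic name in a field called Source. The Python sketch below only simulates that chain on a schemaless JSON value to show its effect. It is not Kafka Connect's implementation. The sample payload, including its field values, is hypothetical.

```python
import json


def parse_renames(spec: str) -> dict:
    """Parse a ReplaceField-style 'old:new, old:new' list into a dict."""
    renames = {}
    for item in spec.split(","):
        item = item.strip()
        if item:
            old, new = item.split(":", 1)
            renames[old.strip()] = new.strip()
    return renames


def rename_fields(value: dict, renames: dict) -> dict:
    """Rename keys listed in `renames`; keep all other keys unchanged."""
    return {renames.get(k, k): v for k, v in value.items()}


def insert_topic(value: dict, topic: str, field: str = "Source") -> dict:
    """Add the Kafka topic name under `field`, like InsertField's topic.field."""
    out = dict(value)
    out[field] = topic
    return out


def apply_transforms(topic: str, raw_value: bytes, renames_spec: str) -> dict:
    value = json.loads(raw_value)  # schemaless JSON, as with schemas.enable=false
    value = rename_fields(value, parse_renames(renames_spec))
    return insert_topic(value, topic)


# Hypothetical BME680 reading as it might arrive on the "sensors" topic.
raw = b'{"ts": "2022-08-01T10:15:30Z", "h": 45.1, "p": 1013.2, "t": 22.4}'
doc = apply_transforms("sensors", raw, "h:humidity, p:pressure, t:temperature")
print(doc)
```

Because every MQTT topic in Figure 3 is mapped to the single Kafka topic sensors, the Source field records that Kafka topic name rather than the original MQTT topic.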

The converter properties in Figure 4 instruct the connector on how to translate data from Kafka: keys are read as plain strings, and values are parsed as schemaless JSON. This configuration also automatically creates a time series collection in the requested database. The timeseries.timefield properties let us choose which field in the original MQTT message (ts) serves as the timestamp and auto-convert that string into a MongoDB-compatible date/time value. The two transforms rename the abbreviated h, p, and t fields to humidity, pressure, and temperature and add a Source field holding the Kafka topic name. Find out more about the configurable properties of our Kafka connectors in our detailed documentation.
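The auto-convert setting parses the ts string with the Java date pattern yyyy-MM-dd'T'HH:mm:ss'Z', in which the trailing Z is a quoted literal rather than a parsed time zone. The Python sketch below mirrors that pattern with `strptime` to show what the conversion expects. It assumes the timestamps are in UTC. That is an assumption in this sketch, not a documented behavior of the connector.

```python
from datetime import datetime, timezone

# Python equivalent of the Java pattern yyyy-MM-dd'T'HH:mm:ss'Z':
# the trailing Z is matched as a literal character, not parsed as a zone.
TS_FORMAT = "%Y-%m-%dT%H:%M:%SZ"


def to_datetime(ts: str) -> datetime:
    """Parse a string timestamp and attach UTC (assumed) to get a real date value."""
    return datetime.strptime(ts, TS_FORMAT).replace(tzinfo=timezone.utc)


converted = to_datetime("2022-08-01T10:15:30Z")
print(converted.isoformat())  # 2022-08-01T10:15:30+00:00
```

A timestamp in any other layout, such as one with a space instead of T, fails to parse. That is why the pattern must match what the devices actually publish.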

Smooth sailing with MongoDB Atlas

Once the connectors have been configured and launched, they listen on the mapped topics for new messages and write each one as a document to the time series collection. As long as the Kafka environment is running and the connection to the MQTT broker stays up, the time series collection is updated in real time, and its data is highly compressed (often by more than 90%) to accommodate the continuous influx of data. See a sample of the time series collection we created in Figure 5.

Figure 5. Streamed data saved in a MongoDB Atlas time series collection.

Because different organizations and personas expect to consume data in different ways, the underlying data structure can be further tailored to each use case with materialized views and simple aggregations.
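As one illustration of such tailoring, an on-demand materialized view can be produced by an aggregation pipeline that ends in `$merge`. The Python sketch below only builds the pipeline definition. It assumes the field names produced by the sink connector in Figure 4 (ts, Source, temperature, humidity, pressure), and the target collection name `sensors_hourly` is hypothetical. `$dateTrunc` requires MongoDB 5.0 or later.

```python
def hourly_averages_pipeline(target: str) -> list:
    """Build a pipeline that averages readings per source and hour, then
    upserts the results into `target` (an on-demand materialized view)."""
    return [
        {"$group": {
            "_id": {
                "source": "$Source",
                # Truncate each timestamp to the start of its hour.
                "hour": {"$dateTrunc": {"date": "$ts", "unit": "hour"}},
            },
            "avgTemperature": {"$avg": "$temperature"},
            "avgHumidity": {"$avg": "$humidity"},
            "avgPressure": {"$avg": "$pressure"},
            "readings": {"$sum": 1},
        }},
        # Replace existing hourly documents and insert new ones.
        {"$merge": {"into": target, "whenMatched": "replace", "whenNotMatched": "insert"}},
    ]


pipeline = hourly_averages_pipeline("sensors_hourly")
```

With a driver such as PyMongo, running this pipeline through the collection's `aggregate` method on a schedule would keep the view refreshed. The target is a regular collection, because `$merge` cannot write into a time series collection.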

Since time series data rarely changes and “cools down” over time, keeping all of it in a hot data tier can become costly. To optimize costs, MongoDB Atlas provides Online Archive, which lets you configure filter criteria that trigger automatic offloading of “cold” data to cheaper storage while keeping it queryable.
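A common kind of archiving criterion is age-based: documents whose timestamp is older than a set number of days count as cold. The Python sketch below only simulates that rule on in-memory documents to make the idea concrete. Online Archive itself is configured in Atlas, not in application code. The 30-day threshold and the sample documents are hypothetical.

```python
from datetime import datetime, timedelta, timezone


def split_by_age(docs, now, expire_after_days, date_field="ts"):
    """Split documents into (hot, cold) lists; cold means older than the cutoff."""
    cutoff = now - timedelta(days=expire_after_days)
    hot, cold = [], []
    for doc in docs:
        (cold if doc[date_field] < cutoff else hot).append(doc)
    return hot, cold


now = datetime(2022, 9, 1, tzinfo=timezone.utc)
docs = [
    {"ts": now - timedelta(days=45), "temperature": 21.8},  # older than 30 days
    {"ts": now - timedelta(days=2), "temperature": 22.4},   # recent
]
hot, cold = split_by_age(docs, now, expire_after_days=30)
```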

Once you start to receive accurate real-time data from the factory floor, a world of opportunity opens up in terms of getting insights from the collected data.

In our next post, we will show you how to leverage the rest of the MongoDB Atlas product suite to run analytics on operational data, including using Atlas Charts for instant, seamless data visualizations (see Figure 6).

Figure 6. A sample dashboard created from factory sensor data in Atlas Charts.

All the source code used in this project, along with a detailed deployment guide, is available in our public GitHub repository. Feel free to clone it and experiment with configuring Kafka and its connectors. Most of the principles discussed in this post apply to any device that uses MQTT as a communication protocol.

To learn more, watch our webinar session to see the full reference architecture, get tips for configuring your Kafka connectors, and see a live demonstration. If you have questions about other communication protocols or would like to consult with someone from our team about your project, please contact us.