Joy Ike

MongoDB and IIoT: Data Streaming With Kafka

Event streaming has become a cornerstone of the industrial internet of things (IIoT) because it allows people to unleash the power of real-time operational data to drive applications and analytics. In this article, we share how MongoDB Atlas helps you move data seamlessly from the MQTT protocol into MongoDB time series collections using the Apache Kafka MQTT source and MongoDB sink connectors deployed in a cloud environment.

Read the first and second articles in this four-part series on MongoDB and IIoT.

Data streaming is the second step in our framework for end-to-end data integration in the manufacturing sector. The preceding “connect” step of this framework deals with establishing an interface for interaction with IoT devices. The methodology discussed in this blog was developed and tested using a model factory created by Fischertechnik, but these steps are applicable to any environment that uses the standard MQTT protocol. All the source code for this project, along with a detailed deployment guide, can be found on our public GitHub repository.

Figure 1. Step 2 of the end-to-end data integration framework.

The challenge of collecting data

On the shop floor, devices and components continuously generate data about their activity and environmental conditions at regular time intervals, typically known as time series data. In our factory model production line, a variety of sensors collect data about temperature, pressure, humidity, brightness, camera positions, device/inventory status, and movements. This data is vital for monitoring the health and effectiveness of factory equipment and its ability to continue to function without failure. The resulting datasets are often huge and must be efficiently stored and analyzed to detect anomalies or provide insight into overall equipment efficiency.
With the advent of powerful event streaming platforms like Apache Kafka, and the wide variety of connectors for all sorts of protocols, it has become increasingly simple to handle the consolidation and export of real-time data feeds. However, dealing with such large volumes of data comes with added challenges regarding scalable storage, cost implications, and data archiving. This is where MongoDB’s time series collections come into play. Time series collections are a distinct type of MongoDB collection, optimized to efficiently store and process time series data by leveraging clustered indexes, columnar compression, and aggregation pipeline stages to facilitate real-time analytics. Learn more about time series collections on our tutorial page.

Dream team: MQTT + Kafka + MongoDB

Our recipe for collecting real-time sensor data (using the MQTT protocol) combines an MQTT source connector developed by Confluent and a native MongoDB sink connector deployed in a containerized environment.

Figure 2. The components of the data streaming methodology.

In this instance, we used a similar stack that includes Kafka Connect, a Kafka broker, and ZooKeeper deployed as containers in a single Docker compose file. This setup can be deployed locally, on a serverless backend, or even on Confluent Cloud. In our case, we have it deployed on an AWS EC2 Linux instance. Read our tutorial on how to set up a Kafka development environment with MongoDB connectors.

Here’s a brief explanation of what each container does in this environment:

ZooKeeper: Acts as a centralized controller that manages and organizes all the Kafka brokers.
Kafka broker: Allows Kafka consumers to fetch messages by topic, partition, and offset. Kafka brokers can form a Kafka cluster by sharing information with each other.
Kafka Connect: Serves as the runtime environment where you can configure connectors to ingest data into Kafka topics, making the data available for stream processing with low latency.
It is worth noting that Kafka allows any number of sink and source connectors to be created in its environment as long as there are no server resource restrictions. Once the development environment is set up, all the necessary parameters are configured in the source and sink connectors.

The source connector

The source connector allows the Kafka broker to subscribe to MQTT topics. It maps the MQTT topics that contain the desired data parameters to a chosen Kafka topic. For simplicity, we’ve used Confluent’s MQTT source connector, which supports any kind of MQTT broker connection (self-hosted or otherwise). We’ve also used a managed MQTT service from HiveMQ as our remote broker.

In the sample source connector configuration below, we’ve streamed sensor readings from multiple MQTT topics on the factory to a single Kafka topic called sensors using a string list of MQTT topics. We added the access details for the remote broker, from which Kafka consumes the messages on those MQTT topics and saves them as JSON values. Mapping several MQTT topics to the same Kafka topic does not affect the performance of the connector.

{
  "name": "mqtt-source",
  "config": {
    "connector.class": "io.confluent.connect.mqtt.MqttSourceConnector",
    "tasks.max": "1",
    "mqtt.server.uri": "ssl://<REMOTE BROKER ADDRESS>:8883",
    "mqtt.username": "<REMOTE BROKER CLIENT>",
    "mqtt.password": "<REMOTE BROKER CLIENT PASSWORD>",
    "mqtt.topics": "i/ldr,i/bme680,i/cam",
    "kafka.topic": "sensors",
    "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
    "confluent.topic.bootstrap.servers": "broker:9092",
    "confluent.license": "",
    "topic.creation.enable": true,
    "topic.creation.default.replication.factor": -1,
    "topic.creation.default.partitions": -1
  }
}

Figure 3. Sensor readings from multiple MQTT topics to a single Kafka topic.
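A configuration like the one in Figure 3 is typically registered through the Kafka Connect REST interface with a POST to /connectors. The following is a minimal Python sketch of that step, not the deployment method used in this project. The address http://localhost:8083 and the helper name build_connector_request are assumptions for illustration, and the request is only built here, not sent.

```python
import json
import urllib.request


def build_connector_request(connect_url, name, config):
    """Build (but do not send) a POST /connectors request for Kafka Connect."""
    body = json.dumps({"name": name, "config": config}).encode("utf-8")
    return urllib.request.Request(
        url=f"{connect_url.rstrip('/')}/connectors",
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


# A subset of the source connector settings from Figure 3.
source_config = {
    "connector.class": "io.confluent.connect.mqtt.MqttSourceConnector",
    "tasks.max": "1",
    "mqtt.server.uri": "ssl://<REMOTE BROKER ADDRESS>:8883",
    # Several MQTT topics are passed as one comma-separated string.
    "mqtt.topics": ",".join(["i/ldr", "i/bme680", "i/cam"]),
    "kafka.topic": "sensors",
}

# Assumed Kafka Connect address; adjust to wherever your Connect worker listens.
request = build_connector_request("http://localhost:8083", "mqtt-source", source_config)

# To actually register the connector you would call:
#   urllib.request.urlopen(request)
print(request.get_method(), request.full_url)
```

Building the request separately from sending it makes it easy to inspect the payload before touching a running Kafka Connect worker.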
The sink connector

While the source connector specifies the location from which data is retrieved, the sink connector specifies the destination to which data is sent. We used the MongoDB Kafka Sink Connector, which allowed us to connect to a MongoDB Atlas cluster with the right access information and choose the database and collection in which the streaming data is stored. To receive the brightness readings captured in the source connector, the topics property in this connector must be set to match the value of the kafka.topic property in the former.

{
  "name": "mongodb-sink",
  "config": {
    "connector.class": "com.mongodb.kafka.connect.MongoSinkConnector",
    "tasks.max": 1,
    "topics": "sensors",
    "connection.uri": "mongodb+srv://user:password@address.mongodb.net/database?retryWrites=true&w=majority",
    "database": "<database name>",
    "collection": "<collection name>",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "false",
    "timeseries.timefield": "ts",
    "timeseries.timefield.auto.convert": "true",
    "timeseries.timefield.auto.convert.date.format": "yyyy-MM-dd'T'HH:mm:ss'Z'",
    "transforms": "RenameField,InsertTopic",
    "transforms.RenameField.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
    "transforms.RenameField.renames": "h:humidity, p:pressure, t:temperature",
    "transforms.InsertTopic.type": "org.apache.kafka.connect.transforms.InsertField$Value",
    "transforms.InsertTopic.topic.field": "Source"
  }
}

Figure 4. The converter properties instruct the connector on how to translate data from Kafka.

The converter properties in Figure 4 instruct the connector on how to translate data from Kafka.
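To make the effect of the transforms and timeseries settings in Figure 4 more concrete, here is a small Python simulation of what happens to one message value on its way into the collection. It is a sketch for illustration only and not the connector's own code. The sample payload and the function name to_sink_document are hypothetical, and the date pattern is the Python equivalent of the configured yyyy-MM-dd'T'HH:mm:ss'Z' format.

```python
from datetime import datetime, timezone

# Mirrors "transforms.RenameField.renames" in the sink configuration.
RENAMES = {"h": "humidity", "p": "pressure", "t": "temperature"}


def to_sink_document(value, kafka_topic):
    """Simulate the rename, topic-insert, and time field conversion steps."""
    # ReplaceField$Value: rename the short sensor keys to readable names.
    doc = {RENAMES.get(key, key): val for key, val in value.items()}
    # InsertField$Value with topic.field=Source: record the Kafka topic name.
    doc["Source"] = kafka_topic
    # timeseries.timefield.auto.convert: parse the string timestamp into a date.
    doc["ts"] = datetime.strptime(doc["ts"], "%Y-%m-%dT%H:%M:%SZ").replace(
        tzinfo=timezone.utc
    )
    return doc


# Hypothetical sensor reading, shaped like the fields named in the renames list.
sample = {"ts": "2022-08-10T09:15:30Z", "t": 24.1, "h": 41.5, "p": 1008.2}
doc = to_sink_document(sample, "sensors")
print(doc)
```

Note that the inserted Source field carries the Kafka topic (sensors in this setup), not the original MQTT topic.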
This configuration also automatically creates a time series collection in the requested database using the timeseries.timefield properties, which allowed us to choose which field in the original MQTT message qualifies as the timestamp and auto-convert it to a MongoDB-compatible date/time value. Find out more about the configurable properties of our Kafka connectors in our detailed documentation.

Smooth sailing with MongoDB Atlas

Once the connectors have been configured and launched, Kafka listens on the mapped topics for any change events and translates them into documents in a time series collection. As long as the Kafka environment is running and the connection with the MQTT broker remains unbroken, the time series collection is updated in real time and highly compressed (often more than 90%) to accommodate the continuous influx of data. See a sample of the time series collection we created in Figure 5.

Figure 5. Streamed data saved in a MongoDB Atlas time series collection.

As the expectations of consumability vary across organizations and personas, the underlying data structure can be further tailored for different use cases by using materialized views and simple aggregations.

Since time series data is hardly ever changed and “cools down” over time, storing it in a hot data tier can become costly. To optimize costs, MongoDB Atlas provides Online Archive, which allows you to configure filter criteria to trigger automatic offloading of “cold” data to cheaper storage while maintaining its queryability.

Once you start to receive accurate real-time data from the factory floor, a world of opportunity opens up in terms of getting insights from the collected data.
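As one example of the simple aggregations mentioned earlier, the sketch below builds an aggregation pipeline that averages temperature readings per time bucket. It assumes documents with the ts and temperature fields produced by the sink configuration and a MongoDB version that supports the $dateTrunc operator (5.0 or later). The function name and output field names are hypothetical, and the pipeline is only constructed here, not run against a cluster.

```python
def avg_temperature_pipeline(unit="minute"):
    """Return an aggregation pipeline averaging temperature per time bucket."""
    return [
        # Keep only documents that carry a temperature reading.
        {"$match": {"temperature": {"$exists": True}}},
        # Bucket readings by truncating the timestamp to the chosen unit.
        {
            "$group": {
                "_id": {"$dateTrunc": {"date": "$ts", "unit": unit}},
                "avgTemperature": {"$avg": "$temperature"},
                "readings": {"$sum": 1},
            }
        },
        # Return the buckets in chronological order.
        {"$sort": {"_id": 1}},
    ]


pipeline = avg_temperature_pipeline()

# With a driver such as PyMongo, this could be passed to
#   db["<collection name>"].aggregate(pipeline)
for stage in pipeline:
    print(stage)
```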
In our next post, we will show you how to leverage the rest of the MongoDB Atlas product suite to run analytics on operational data, including using Atlas Charts for instant, seamless data visualizations (see Figure 6).

Figure 6. A sample dashboard created from factory sensor data in Atlas Charts.

All the source code used in this project, along with a detailed deployment guide, is available on our public GitHub repo. Feel free to clone it and play around with configuring Kafka and its connectors. Most of the principles discussed in this post are applicable to any device that uses MQTT as a communication protocol.

To learn more, watch our webinar session to see the full reference architecture, get tips for configuring your Kafka connectors, and see a live demonstration. If you have questions about other communication protocols or would like to consult with someone from our team about your project, please contact us.

Read Part 2 of this series on a 4-Step Data Integration.

August 10, 2022

MongoDB & IIoT: A 4-Step Data Integration

The Industrial Internet of Things (IIoT) is driving a new era of manufacturing, unlocking powerful new use cases to forge new revenue streams, create holistic business insights, and provide agility based on global and consumer demands. In Part 1 of this series, “Manufacturing at Scale: MongoDB & IIoT,” we gave an overview of the adoption and implementation of IIoT in manufacturing processes, testing various use cases with a model-size smart factory (Figure 1). In this post, we’ll look at how MongoDB’s flexible, highly available, and scalable data platform allows for end-to-end data integration using a four-step framework.

Figure 1: Architecture diagram of MongoDB's platform with MQTT-enabled devices.

4-step framework for end-to-end data integration

The four stages of this framework (Figure 2) are:

Connect: Establish an interface to “listen” and “talk” to the device(s).
Collect: Gather and store data from devices in an efficient and reliable manner.
Compute: Process and analyze data generated by IoT devices.
Create: Create unique solutions (or applications) through access to transformational data.

Figure 2: The four-step framework for shop floor data integration

During the course of this series, we will explore each of the four steps in detail, covering the tools and methodology and providing a walkthrough of our implementation process, using the Fischertechnik model as a basis for testing and development. All of the steps, however, are applicable to any environment that uses the Message Queuing Telemetry Transport (MQTT) protocol. The first step of the process is Connect.

The first step: Connect

The model factory contains a variety of sensors that generate data on everything from the camera angle to the air quality and temperature, all in real time. The factory uses the MQTT protocol to send and receive input, output, and status messages related to the different factory components. You may wonder why we don’t immediately jump to the data collection stage.
The reason is simple: We must first be able to “see” all of the data coming from the factory, which will allow us to select the metrics we are interested in capturing and configure our database appropriately.

As a quick refresher on the architecture diagram of the factory, we see in Figure 3 that any messages transmitted in or out of the factory are routed through the Remote MQTT Broker. The challenge is to successfully read messages from the factory and write messages to it.

Figure 3: Architecture diagram of the model smart factory

It is important to remember that the method of making this connection between the devices and MongoDB depends on the communication protocols the device is equipped with. On the shop floor, multiple protocols are used for device communication, such as MQTT and OPC-UA, which may require different connector technologies, such as Kafka, among other off-the-shelf IoT connectors. In most scenarios, MongoDB can be integrated easily, regardless of the communication protocol, by adding the appropriate connector configuration. (We will discuss more about that implementation in our next blog post.) For this specific scenario, we will focus on MQTT. Figure 4 shows a simplified version of our connection diagram.

Figure 4: Connecting the factory's data to MongoDB Atlas and Realm

Because the available communication protocol for the factory is MQTT, we will do the following:

Set up a remote MQTT broker and test its connectivity.
Create an MQTT bridge.
Send MQTT messages to the device(s).

Note that these steps can be applied to any devices, machinery, or environment that come equipped with MQTT, so you can adapt this methodology to your specific project. Let’s get started.

1. Set up a remote MQTT broker

To focus on the connection of the brokers, we used a managed service from HiveMQ to create a broker and the necessary hosting environment. However, this setup would work just as well with any self-managed MQTT broker.
HiveMQ Cloud has a free tier, which is a great option for practice and for testing the desired configuration. You can create an account to set up a free cluster and add users to it. These users will function as clients of the remote broker. We recommend using different users for different purposes.

Test the remote broker connectivity

We used the Mosquitto CLI client to directly access the broker(s) from the command line. Then, we connected to the same network used by the factory, opened a terminal window, and started a listener on the local TXT broker using this command:

mosquitto_sub -h 192.168.0.10 -p 1883 -u txt -P xtx -t "f/o/#"

Next, in a new terminal window, we published a message to the remote broker on a topic covered by the listener. MQTT does not allow publishing to a wildcard such as f/o/#, so the command needs a concrete topic name under f/o/. A complete list of all topics configured on the factory can be found in the Fischertechnik documentation. You can fill in the command below with the information of your remote broker.

mosquitto_pub -h <hivemq-cloud-host-address> -p 8883 -u <hivemq-client-username> -P <hivemq-client-password> -t f/o/<topic-name> -m "Hello"

If the bridge has been configured correctly, you will see the message “Hello” displayed in the first terminal window, which contains your local broker listener.

Now we get to the good part. We want to see all the messages that the factory is generating for all of the topics. Because we are a bit more familiar with the Mosquitto CLI, we started a listener on the local TXT broker using this command:

mosquitto_sub -h 192.168.0.10 -p 1883 -u txt -P xtx -t "#"

Here, the topic “#” essentially means “everything.” It is quoted so that the shell does not treat it as the start of a comment. And just like that, we can get a sense of which parameters we can hope to extract from the factory into our database. As an added bonus, the data is already in JSON. This will simplify the process of streaming the data into MongoDB Atlas once we reach the data collection stage, because MongoDB runs on the document model, which is also JSON-based.
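The wildcard topics used in the commands above follow MQTT's topic filter rules: “#” matches any number of remaining levels and “+” matches exactly one level. The following Python sketch illustrates those rules so you can check which topics a given filter would cover. It is a simplified illustration rather than a broker implementation; for example, it ignores the special handling of topics that start with “$”, and the function name topic_matches is our own.

```python
def topic_matches(topic_filter, topic):
    """Return True if an MQTT topic filter covers a concrete topic name."""
    filter_levels = topic_filter.split("/")
    topic_levels = topic.split("/")

    for index, level in enumerate(filter_levels):
        if level == "#":
            # The multi-level wildcard is only valid as the last level and
            # matches the parent level plus everything below it.
            return index == len(filter_levels) - 1
        if index >= len(topic_levels):
            # The filter is more specific than the topic.
            return False
        if level != "+" and level != topic_levels[index]:
            # A literal level must match exactly; "+" matches any one level.
            return False

    # Without a trailing "#", both must have the same number of levels.
    return len(filter_levels) == len(topic_levels)


for candidate in ["f/o/order", "f/i/state", "i/bme680"]:
    print(candidate, topic_matches("f/o/#", candidate), topic_matches("#", candidate))
```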
The following screen recording shows the data stream that results from starting a listener on all topics to which the devices publish while running. You will notice giant blocks of data, which are the encoding of the factory camera images taken every second, as well as other metrics, such as stock item positions in the warehouse and temperature sensor data, all of which are sent at regular time intervals. This is a prime example of time series data, and we will describe how to store and process it in a future article.

Video: Results of viewing all device messages on all topics

2. Create an MQTT bridge

An MQTT bridge (Figure 5) is a unidirectional or bidirectional binding of topics between two MQTT brokers, such that messages published to one broker are relayed seamlessly to clients subscribed to that same topic on the other broker.

Figure 5: Message relays between MQTT brokers

In our case, the MQTT broker on the main controller is configured to forward/receive messages to/from the remote MQTT broker via the following MQTT bridge configuration:

connection remote-broker
address <YOUR REMOTE MQTT BROKER IP ADDRESS:PORT>
bridge_capath /etc/ssl/certs
notifications false
cleansession true
remote_username <HIVEMQ CLIENT USERNAME>
remote_password <HIVEMQ CLIENT PASSWORD>
local_username txt
local_password xtx
topic i/# out 1 "" ""
topic o/# in 1 "" ""
topic c/# out 1 "" ""
topic f/i/# out 1 "" ""
topic f/o/# in 1 "" ""
try_private false
bridge_attempt_unsubscribe false

This configuration file is created and loaded directly into the factory broker via SSH.

3. Send MQTT messages to the device(s)

We can test our bridge configuration by sending a meaningful MQTT message to the factory through the HiveMQ websocket client (Figure 6). We signed into the console with one of the users (clients) previously created and sent an order message to the “f/o/order” topic used in the previous step.
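To see why a message published to f/o/order on the remote broker reaches the factory, the topic lines of the bridge configuration can be read as a small routing table: “out” relays messages from the local broker to the remote one, and “in” relays them from the remote broker to the local one. The Python sketch below parses those lines and looks up the direction for a topic. The helper names are hypothetical, and the matching only handles the trailing “/#” patterns used in this configuration.

```python
import shlex

# The topic rules from the bridge configuration above.
BRIDGE_TOPICS = '''
topic i/# out 1 "" ""
topic o/# in 1 "" ""
topic c/# out 1 "" ""
topic f/i/# out 1 "" ""
topic f/o/# in 1 "" ""
'''


def parse_bridge_topics(text):
    """Extract (pattern, direction) pairs from bridge 'topic' lines."""
    rules = []
    for line in text.splitlines():
        parts = shlex.split(line)
        if len(parts) >= 3 and parts[0] == "topic":
            rules.append((parts[1], parts[2]))
    return rules


def relay_direction(rules, topic):
    """Return 'in', 'out', or None for a concrete topic name."""
    for pattern, direction in rules:
        if pattern.endswith("/#"):
            base = pattern[:-2]
            if topic == base or topic.startswith(base + "/"):
                return direction
        elif pattern == topic:
            return direction
    return None


rules = parse_bridge_topics(BRIDGE_TOPICS)
print("f/o/order ->", relay_direction(rules, "f/o/order"))
print("i/bme680  ->", relay_direction(rules, "i/bme680"))
```

Under these rules, order messages on f/o/order flow into the factory, while sensor readings on i/bme680 flow out to the remote broker.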
Figure 6: Sending a test message using the bridged broker

The format for the order message is:

{"type":"WHITE","ts":"2022-03-23T13:54:02.085Z"}

“Type” refers to the color of the workpiece to order. We have a choice of three workpiece colors: RED, WHITE, BLUE. “ts” refers to the timestamp of when the message is published. This determines its place in the message queue and when the order process will actually be started.

Once the bridge is configured correctly, the factory will start to process the order according to the workpiece color specified in the message.

Thanks for sticking with us through to the end of this process. We hope this methodology provides fresh insight for your IoT projects. Find a detailed tutorial and all the source code for this project on GitHub.

Read Part 1 of this series on manufacturing and IIoT.

May 20, 2022