Streaming Time-Series Data Using Apache Kafka and MongoDB

If there is one thing the world agrees on, it is the concept of time. Many applications are heavily time-based: consider solar field power generation, stock trading, and health monitoring. These are just a few of the many applications that produce and use data with a critical time component. In general, time-series applications are insert-heavy, rarely update data, and delete it even less often. They generate a tremendous amount of data and need a robust data platform to manage and query it effectively. With MongoDB, you can easily:

  • Pre-aggregate data using the MongoDB Query Language and window functions

  • Optimally store large amounts of time-series data with MongoDB time-series collections

  • Archive data to cost-effective storage using MongoDB Atlas Online Archive

Apache Kafka is often used as an ingestion point for data because of its scalability. With the MongoDB Connector for Apache Kafka and the Apache Kafka Connect service, it is easy to move data between Kafka topics and MongoDB clusters. Starting with version 1.6 of the MongoDB Connector for Apache Kafka, you can configure Kafka topic data to be written directly into a MongoDB time-series collection. You set this up in the sink connector configuration.

Configuring time-series collections in the sink

With MongoDB, applications do not need to create the database and collection before they start writing data; these objects are created automatically when the first data arrives. A time-series collection, however, must be created explicitly before you write to it. To make it easy to ingest time-series data from Kafka, the time-series collection options are exposed as sink parameters, and the connector creates the time-series collection if it doesn't already exist. Some of the new parameters are defined as follows:

timeseries.timefield: The name of the top-level field used for time.

timeseries.expire.after.seconds: This optional parameter sets how long, in seconds, data remains in MongoDB before it is deleted automatically. If you omit it, data is never deleted automatically. If you are familiar with TTL indexes in MongoDB, setting this parameter provides similar behavior.

timeseries.timefield.auto.convert: This optional parameter tells the connector to convert the value in the time field into a BSON Date. Supported source types include integer, long, and string.
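Because the connector creates the time-series collection for you, it can help to see how these sink parameters line up with the options you would otherwise pass to db.createCollection() in mongosh, which takes a timeseries: { timeField } document and an optional top-level expireAfterSeconds. The following is a minimal sketch, not the connector's code: toCreateCollectionOptions is a hypothetical helper, and it maps only the two parameters described above.

```javascript
// Hypothetical helper (not part of the connector): maps the sink properties
// described above to the options object accepted by db.createCollection().
function toCreateCollectionOptions(sinkConfig) {
  const timeField = sinkConfig["timeseries.timefield"];
  if (!timeField) {
    throw new Error("timeseries.timefield is required for a time-series collection");
  }

  // timeseries.timefield -> timeseries.timeField
  const options = { timeseries: { timeField } };

  // timeseries.expire.after.seconds -> expireAfterSeconds (optional TTL).
  // Leaving it out means documents are never deleted automatically.
  const expire = sinkConfig["timeseries.expire.after.seconds"];
  if (expire !== undefined) {
    const seconds = Number(expire);
    // Sketch-level check only; see the MongoDB docs for the server's exact rules.
    if (!Number.isInteger(seconds) || seconds < 0) {
      throw new Error("timeseries.expire.after.seconds must be a whole number of seconds");
    }
    options.expireAfterSeconds = seconds;
  }
  return options;
}

// Connector properties are given as strings, as in the configurations in this post.
const opts = toCreateCollectionOptions({
  "timeseries.timefield": "tx_time",
  "timeseries.expire.after.seconds": "86400", // keep data for one day
});
console.log(JSON.stringify(opts));
// {"timeseries":{"timeField":"tx_time"},"expireAfterSeconds":86400}
```

Passing the same object to db.createCollection("StockDataTS", opts) in mongosh would create an equivalent collection by hand; the collection name is only illustrative.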

For a complete list of the new time-series parameters, check out the MongoDB Sink connector online documentation. When data is stored in a time-series collection, MongoDB optimizes the storage and bucketing of your data behind the scenes. This can save a significant amount of storage space compared with the typical one-document-per-data-point structure of a regular collection. You can also explore the new time and window functionality in the MongoDB Query Language. For example, consider this sample document structure:

{
  tx_time: 2021-06-30T15:47:31.000Z,
  _id: '60dc921372f0f39e2cd6cba5',
  company_name: 'SILKY CORNERSTONE LLC',
  price: 94.0999984741211,
  company_symbol: 'SCL'
}

The new $setWindowFields pipeline stage lets you define a window of documents and then compute rankings, cumulative totals, and other analytics over complex time-series data. For example, using the data generated in the tutorial, the following pipeline computes a cumulative (running) average of the price:

db.StockDataTS.aggregate([
  {
    $match: { company_symbol: 'SCL' }
  },
  {
    $setWindowFields: {
      partitionBy: '$company_name',
      sortBy: { tx_time: 1 },
      output: {
        averagePrice: {
          $avg: '$price',
          // Cumulative window: from the first document in the partition to the current one
          window: {
            documents: [ 'unbounded', 'current' ]
          }
        }
      }
    }
  }
])

A sample of the result set is as follows:

  {
    tx_time: 2021-06-30T15:47:45.000Z,
    _id: '60dc922172f0f39e2cd6cbeb',
    company_name: 'SILKY CORNERSTONE LLC',
    price: 94.06999969482422,
    company_symbol: 'SCL',
    averagePrice: 94.1346669514974
  },
  {
    tx_time: 2021-06-30T15:47:47.000Z,
    _id: '60dc922372f0f39e2cd6cbf0',
    company_name: 'SILKY CORNERSTONE LLC',
    price: 94.1500015258789,
    company_symbol: 'SCL',
    averagePrice: 94.13562536239624
  },
  {
    tx_time: 2021-06-30T15:47:48.000Z,
    _id: '60dc922472f0f39e2cd6cbf5',
    company_name: 'SILKY CORNERSTONE LLC',
    price: 94.0999984741211,
    company_symbol: 'SCL',
    averagePrice: 94.13352966308594
  }

Notice that each document now includes an “averagePrice” field holding the running average of the price up to and including that document. For more information on time-series collections in MongoDB, check out the online documentation.
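To make the window bounds concrete, here is a plain JavaScript sketch of what the pipeline computes: documents are grouped by company_name, sorted by tx_time, and each one receives the average price over the documents from the start of its partition through itself. The server does this work for you; windowedAverage and its lower argument are hypothetical names, and passing a negative integer instead of "unbounded" mimics a bounded window such as documents: [ -2, 'current' ].

```javascript
// Hypothetical illustration of $setWindowFields with
//   partitionBy: '$company_name', sortBy: { tx_time: 1 },
//   averagePrice: { $avg: '$price', window: { documents: [ lower, 'current' ] } }
// where lower is "unbounded" or a non-positive offset such as -2.
function windowedAverage(docs, lower = "unbounded") {
  if (lower !== "unbounded" && !(Number.isInteger(lower) && lower <= 0)) {
    throw new Error('lower must be "unbounded" or a non-positive integer');
  }

  // partitionBy: group documents by company_name
  const partitions = new Map();
  for (const doc of docs) {
    if (!partitions.has(doc.company_name)) partitions.set(doc.company_name, []);
    partitions.get(doc.company_name).push(doc);
  }

  const out = [];
  for (const group of partitions.values()) {
    // sortBy: ascending tx_time (copy first so the caller's array is untouched)
    const sorted = [...group].sort((a, b) => a.tx_time - b.tx_time);
    sorted.forEach((doc, i) => {
      // Window: from `lower` documents back (or the partition start) to the current one
      const start = lower === "unbounded" ? 0 : Math.max(0, i + lower);
      const frame = sorted.slice(start, i + 1);
      const sum = frame.reduce((acc, d) => acc + d.price, 0);
      out.push({ ...doc, averagePrice: sum / frame.length });
    });
  }
  return out;
}

// Prices rounded from the sample output above; deliberately out of time order.
const ticks = [
  { tx_time: new Date("2021-06-30T15:47:47Z"), company_name: "SILKY CORNERSTONE LLC", price: 94.15 },
  { tx_time: new Date("2021-06-30T15:47:45Z"), company_name: "SILKY CORNERSTONE LLC", price: 94.07 },
  { tx_time: new Date("2021-06-30T15:47:48Z"), company_name: "SILKY CORNERSTONE LLC", price: 94.1 },
];

for (const d of windowedAverage(ticks)) {
  console.log(d.tx_time.toISOString(), d.averagePrice.toFixed(3));
}
```

Because these three documents are only the tail of the sample, the averages will not match the averagePrice values shown above, which also include earlier documents in the partition. Calling windowedAverage(ticks, -2) would instead give a three-document moving average.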

Migrating existing collections

To convert an existing MongoDB collection to a time-series collection, you can use the MongoDB Connector for Apache Kafka. Configure the source connector to read from your existing collection, and configure the sink connector to write to a MongoDB time-series collection by setting the “timeseries.timefield” parameter. To copy the existing data, set the source connector's “copy.existing” parameter to true. This creates insert events for all existing documents in the source. Documents inserted while the copy is in progress are written after the copy finishes. Although it is not always possible, we recommend pausing writes to the source collection while the copy is running. To see when the copy finishes, look in the logs for the message “Finished copying existing data from the collection(s).”

For example, consider a source document that has this structure:

{
  company_symbol: (STRING),
  company_name: (STRING),
  price: (DECIMAL),
  tx_time: (STRING)
}

In the initial release of MongoDB time-series collections, the field that represents time must be stored as a Date. In our example, we store it as a string to showcase the connector's ability to convert a string to a Date automatically. If you choose to perform the conversion outside the connector, you could use a Single Message Transform (SMT) in Kafka Connect to convert the string into a Date at the sink. However, certain SMTs, such as TimestampConverter, require schemas to be defined for the data in the Kafka topic, which can add complexity to the configuration. Instead of using an SMT, you can have the connector convert the value into a Date with the new timeseries.timefield.auto.convert and timeseries.timefield.auto.convert.date.format options.
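If you generate test data for the source collection in JavaScript, note that Date.prototype.toISOString() always includes milliseconds (for example 2021-07-06T12:25:45.000Z), which would not match a second-precision pattern such as the yyyy-MM-dd'T'HH:mm:ss'Z' used in the sink configuration below. A minimal sketch, assuming you want second-precision UTC strings; toTxTimeString and makeStockDoc are hypothetical helper names.

```javascript
// Hypothetical helpers for generating test documents for the StockData collection.
// toISOString() always emits milliseconds ("...:45.123Z"), so they are dropped here
// to produce a second-precision UTC string such as "2021-07-06T12:25:45Z".
function toTxTimeString(date) {
  return date.toISOString().replace(/\.\d{3}Z$/, "Z");
}

// Builds a document shaped like the source structure above, with tx_time as a string.
function makeStockDoc(symbol, name, price, date) {
  return {
    company_symbol: symbol,
    company_name: name,
    price: price, // a JavaScript number (double)
    tx_time: toTxTimeString(date),
  };
}

const doc = makeStockDoc(
  "SCL",
  "SILKY CORNERSTONE LLC",
  94.1,
  new Date(Date.UTC(2021, 6, 6, 12, 25, 45, 123)) // months are 0-based: 6 = July
);
console.log(doc.tx_time); // 2021-07-06T12:25:45Z
```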

Here is a sample source configuration that copies all existing data from the StockData collection and then continues to publish changes to the stockdata.Stocks.StockData topic:

{
  "name": "mongo-source-stockdata",
  "config": {
    "tasks.max": "1",
    "connector.class": "com.mongodb.kafka.connect.MongoSourceConnector",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "publish.full.document.only": "true",
    "connection.uri": "<MONGODB SOURCE CONNECTION STRING>",
    "topic.prefix": "stockdata",
    "database": "Stocks",
    "collection": "StockData",
    "copy.existing": "true"
  }
}
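A connector definition in this name/config shape is what the Kafka Connect REST API accepts in a POST to /connectors. The sketch below assumes Node.js 18 or later for the built-in fetch and a Connect worker reachable at http://localhost:8083 (the default REST port, but an assumption about your deployment); buildConnectorRequest and registerConnector are hypothetical helper names.

```javascript
// Hypothetical helpers for submitting a connector definition (like the JSON above)
// to the Kafka Connect REST API. Assumes Node.js 18+ for the global fetch().
function buildConnectorRequest(baseUrl, connector) {
  if (!connector.name || !connector.config) {
    throw new Error("connector JSON needs both 'name' and 'config'");
  }
  return {
    url: new URL("/connectors", baseUrl).toString(),
    init: {
      method: "POST",
      headers: { "Content-Type": "application/json" },
      body: JSON.stringify(connector),
    },
  };
}

async function registerConnector(baseUrl, connector) {
  const { url, init } = buildConnectorRequest(baseUrl, connector);
  const res = await fetch(url, init);
  if (!res.ok) {
    // For example, a connector with the same name may already be registered.
    throw new Error(`Kafka Connect returned ${res.status}: ${await res.text()}`);
  }
  return res.json(); // information about the created connector
}
```

With a worker running, registerConnector("http://localhost:8083", sourceConnector).then(console.log) would submit the source configuration, where sourceConnector is the object above with a real connection string in place of the placeholder.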

This is a sample sink configuration that writes the data from the stockdata.Stocks.StockData topic to a MongoDB time-series collection:

{
  "name": "mongo-sink-stockdata",
  "config": {
    "connector.class": "com.mongodb.kafka.connect.MongoSinkConnector",
    "tasks.max": "1",
    "topics": "stockdata.Stocks.StockData",
    "connection.uri": "<MONGODB SINK CONNECTION STRING>",
    "database": "Stocks",
    "collection": "StockDataMigrate",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "timeseries.timefield": "tx_time",
    "timeseries.timefield.auto.convert": "true",
    "timeseries.timefield.auto.convert.date.format": "yyyy-MM-dd'T'HH:mm:ss'Z'"
  }
}

In this sink example, the connector parses the string in the “tx_time” field using the pattern yyyy-MM-dd'T'HH:mm:ss'Z' (for example, '2021-07-06T12:25:45Z') and stores the result as a Date.
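As an illustration of what timeseries.timefield.auto.convert does with the tx_time values, here is a JavaScript sketch; it is not the connector's Java implementation. It assumes numeric values are milliseconds since the Unix epoch and that strings follow the fixed pattern configured above, interpreted as UTC and checked with a regular expression; autoConvertTimeField is a hypothetical name.

```javascript
// Hypothetical sketch of the conversion timeseries.timefield.auto.convert performs;
// not the connector's Java code. Assumptions: numbers are milliseconds since the
// Unix epoch, and strings use the fixed pattern yyyy-MM-dd'T'HH:mm:ss'Z' in UTC.
const TX_TIME_PATTERN = /^(\d{4})-(\d{2})-(\d{2})T(\d{2}):(\d{2}):(\d{2})Z$/;

function autoConvertTimeField(value) {
  if (value instanceof Date) return value; // already a Date
  if (typeof value === "number" && Number.isFinite(value)) {
    return new Date(value); // epoch milliseconds
  }
  if (typeof value === "string") {
    const m = TX_TIME_PATTERN.exec(value);
    if (!m) {
      throw new Error(`'${value}' does not match yyyy-MM-dd'T'HH:mm:ss'Z'`);
    }
    const [year, month, day, hour, minute, second] = m.slice(1).map(Number);
    const date = new Date(Date.UTC(year, month - 1, day, hour, minute, second));
    // Date.UTC silently rolls over out-of-range parts (Feb 30 becomes Mar 2),
    // so format the result back and require an exact round trip.
    if (date.toISOString().slice(0, 19) + "Z" !== value) {
      throw new Error(`'${value}' is not a valid date-time`);
    }
    return date;
  }
  throw new Error(`unsupported time field type: ${typeof value}`);
}

console.log(autoConvertTimeField("2021-07-06T12:25:45Z").toISOString());
// 2021-07-06T12:25:45.000Z
```

The round-trip check is a design choice for this sketch: it rejects impossible dates instead of silently shifting them, which is safer than storing a wrong timestamp in a collection that only accepts inserts.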

Note that in the initial version of time-series collections, only inserts are supported. Updates and deletes on the source collection do not propagate to the destination. You also cannot use the MongoDB CDC Handler in this scenario, because the handler issues ReplaceOne operations, which are a type of update. These are limitations of the initial release of time-series collections in MongoDB and may no longer apply by the time you read this post. Check the online documentation for the latest information.

The MongoDB Connector for Apache Kafka version 1.6 is available to download from GitHub. Look for it on the Confluent Hub later this week!