Docs Menu
Docs Home
/
MongoDB Kafka Connector
/

Migrate an Existing Collection to a Time Series Collection

On this page

  • Migrate a Collection to a Time Series Collection
  • Summary
  • Learn More

Follow this tutorial to learn how to convert an existing MongoDB collection to a time series collection using the MongoDB Kafka Connector.

Time series collections efficiently store time series data. Time series data consists of measurements taken at time intervals, metadata that describes the measurement, and the time of the measurement.

To convert data from a MongoDB collection to a time series collection using the connector, you must perform the following tasks:

  1. Identify the time field common to all documents in the collection.

  2. Configure a source connector to copy the existing collection data to a Kafka topic.

  3. Configure a sink connector to copy the Kafka topic data to the time series collection.

In this tutorial, you perform these preceding tasks to migrate stock data from a collection to a time series collection. The time series collection stores and indexes the data more efficiently and retains the ability to analyze stock performance over time using aggregation operators.

1

Complete the steps in the Kafka Connector Tutorial Setup to start the the Confluent Kafka Connect and MongoDB environment.

2

Run the following command to start a script in your Docker environment that generates a sample collection containing fabricated stock symbols and their prices in your tutorial MongoDB replica set:

docker exec -ti mongo1 /bin/bash -c "cd /stockgenmongo/ && python3 stockgen.py -db Stocks -col PriceData"

Once the data generator starts running, you should see the generated data that resembles the following:

...
1 _id=528e9... MSP MASSIVE SUBMARINE PARTNERS traded at 31.08 2022-05-25 21:15:15
2 _id=528e9... RWH RESPONSIVE_WHOLESALER HOLDINGS traded at 18.42 2022-05-25 21:15:15
3 _id=528e9... FAV FUZZY ATTACK VENTURES traded at 31.08 2022-05-25 21:15:15
...
3

In a separate terminal window, create an interactive shell session on the tutorial Docker container downloaded for the Tutorial Setup using the following command:

docker exec -it mongo1 /bin/bash

Create a source configuration file called stock-source.json with the following command:

nano stock-source.json

Paste the following configuration information into the file and save your changes:

{
"name": "mongo-source-marketdata",
"config": {
"connector.class": "com.mongodb.kafka.connect.MongoSourceConnector",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"publish.full.document.only": "true",
"connection.uri": "mongodb://mongo1",
"topic.prefix": "marketdata",
"database": "Stocks",
"collection": "PriceData",
"copy.existing": "true"
}
}

This configuration instructs the connector to copy existing data from the PriceData MongoDB collection to the marketdata.Stocks.PriceData Kafka topic, and once complete, any future data inserted in that collection.

Run the following command in the shell to start the source connector using the configuration file you created:

cx stock-source.json

Note

The cx command is a custom script included in the tutorial development environment. This script runs the following equivalent request to the Kafka Connect REST API to create a new connector:

curl -X POST -H "Content-Type: application/json" -d @stock-source.json http://connect:8083/connectors -w "\n"

Run the following command in the shell to check the status of the connectors:

status

If your source connector started successfully, you should see the following output:

Kafka topics:
...
The status of the connectors:
source | mongo-source-marketdata | RUNNING | RUNNING | com.mongodb.kafka.connect.MongoSourceConnector
Currently configured connectors
[
"mongo-source-marketdata"
]
...

Once the source connector starts up, confirm the Kafka topic received the collection data by running the following command:

kafkacat -b broker:29092 -C -t marketdata.Stocks.PriceData

The output should show topic data as it is published by the source connector that resembles the following:

{"schema":{ ... }, "payload": "{ "_id": { "$oid": "628e9..."}, "company_symbol": "MSP", "Company_name": "MASSIVE SUBMARINE PARTNERS", "price": 309.98, "tx_time": { "$date": 16535..." }"}

You can exit kafkacat by typing CTRL+C.

4

Configure a sink connector to read data from the Kafka topic and write it to a time series collection named StockDataMigrate in a database named Stocks.

Create a sink configuration file called stock-sink.json with the following command:

nano stock-sink.json

Paste the following configuration information into the file and save your changes:

{
"name": "mongo-sink-marketdata",
"config": {
"connector.class": "com.mongodb.kafka.connect.MongoSinkConnector",
"topics": "marketdata.Stocks.PriceData",
"connection.uri": "mongodb://mongo1",
"database": "Stocks",
"collection": "StockDataMigrate",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"timeseries.timefield": "tx_time",
"timeseries.timefield.auto.convert": "true",
"timeseries.timefield.auto.convert.date.format": "yyyy-MM-dd'T'HH:mm:ss'Z'"
}
}

Tip

The sink connector configuration above uses the time field date format converter. Alternatively, you can use the TimestampConverter Single Message Transform (SMT) to convert the tx_time field from a String to an ISODate. When using the TimestampConverter SMT, you must define a schema for the data in the Kafka topic.

For information on how to use the TimestampConverter SMT, see the TimestampConverter Confluent documentation.

Run the following command in the shell to start the sink connector using the configuration file you updated:

cx stock-sink.json

After your sink connector finishes processing the topic data, the documents in the StockDataMigrate time series collection contain the tx_time field with an ISODate type value.

5

Once the sink connector completes processing the topic data, the StockDataMigrate time series collection should contain all the market data from your PriceData collection.

To view the data in MongoDB, run the following command to connect to your replica set using mongosh:

mongosh "mongodb://mongo1"

At the prompt, type the following commands to retrieve all the documents in the Stocks.StockDataMigrate MongoDB namespace:

use Stocks
db.StockDataMigrate.find()

You should see a list of documents returned from the command that resemble the following document:

{
tx_time: ISODate("2022-05-25T21:16:35.983Z"),
_id: ObjectId("628e9..."),
symbol: 'FAV',
price: 18.43,
company_name: 'FUZZY ATTACK VENTURES'
}

In this tutorial, you created a stock ticker data generator that periodically wrote data into a MongoDB collection. You configured a source connector to copy the data into a Kafka topic and configured a sink connector to write that data into a new MongoDB time series collection.

Read the following resources to learn more about concepts mentioned in this tutorial:

Back

Replicate Data with a Change Data Capture Handler

Next

Sink Connector