Migrate an Existing Collection to a Time Series Collection
In this tutorial, you can learn how to convert an existing MongoDB collection to a time series collection using the MongoDB Kafka Connector.
Time series collections efficiently store sequences of measurements over a period of time. Time series data consists of measurement data collected over time, metadata that describes the measurement, and the time of the measurement.
You can configure the source connector to read your existing MongoDB collection and the sink connector to write Kafka topic data into a MongoDB time series collection.
To learn more about MongoDB time series collections, see the MongoDB manual page on Time Series Collections.
Scenario
Suppose you accumulated stock price data in a MongoDB collection and have the following needs:
- Store the price data more efficiently
- Maintain the ability to analyze stock performance over time using aggregation operators
After reading about MongoDB time series collections, you decide to migrate your existing collection into a time series one. Learn how to perform this migration in the following sections.
Steps to Migrate to a Time Series Collection
To migrate an existing MongoDB collection to a time series collection, you need to perform the following tasks:
- Identify the time field in the existing stock price data document.
- Configure a source connector to copy the existing collection data to a Kafka topic.
- Configure a sink connector to copy the Kafka topic data to the time series collection.
- Verify the connector migrated the data to the time series collection.
Identify the Time Field
Before you create a time series collection, you need to identify the time field. The time field is the document field that MongoDB uses to identify the time of each measurement. The value of this field can be a string, an integer, or an ISO date. If the value is a string or an integer, make sure to set the timeseries.timefield.auto.convert setting to instruct the connector to automatically convert the value to a date.
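To illustrate what this conversion does, the following Python sketch parses the same string format that the connector's timeseries.timefield.auto.convert.date.format setting describes with the Java date pattern yyyy-MM-dd'T'HH:mm:ss'Z'; the strptime pattern below is its Python analog, shown here only for illustration:

```python
from datetime import datetime, timezone

# A string time field as it appears in the existing documents
raw = "2021-07-12T05:20:35Z"

# Python analog of the Java date pattern yyyy-MM-dd'T'HH:mm:ss'Z'
converted = datetime.strptime(raw, "%Y-%m-%dT%H:%M:%SZ").replace(tzinfo=timezone.utc)
```

The connector performs this conversion itself; you never need to transform the documents by hand.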
The following document shows the format of stock price data documents in the existing MongoDB collection:
```
{
  tx_time: 2021-07-12T05:20:35Z,
  symbol: 'WSV',
  company_name: 'WORRIED SNAKEBITE VENTURES',
  price: 21.22,
  _id: ObjectId("...")
}
```
For this scenario, assume you stored these documents in a collection named PriceData in the Stocks database.

You identify that the tx_time field distinguishes the time of the measurements, and specify it as the time field in your sink connector configuration.
Learn how to set your time field and field conversion in the time series configuration guide.
Configure the Source Connector
To copy data from the PriceData MongoDB collection and publish it to the marketdata.Stocks.PriceData Kafka topic, create a source connector with the following configuration:
```json
{
  "name": "mongo-source-marketdata",
  "config": {
    "tasks.max": "1",
    "connector.class": "com.mongodb.kafka.connect.MongoSourceConnector",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "publish.full.document.only": "true",
    "connection.uri": "<your connection uri>",
    "topic.prefix": "marketdata",
    "database": "Stocks",
    "collection": "PriceData",
    "copy.existing": "true"
  }
}
```
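Connector creation is typically done through the Kafka Connect REST API. As a sketch, the following Python snippet serializes a minimal version of the preceding configuration and shows how you might submit it; the worker address localhost:8083 is an assumption for this example, not something the tutorial prescribes:

```python
import json

# Minimal subset of the source connector configuration shown above
config = {
    "name": "mongo-source-marketdata",
    "config": {
        "connector.class": "com.mongodb.kafka.connect.MongoSourceConnector",
        "connection.uri": "<your connection uri>",
        "topic.prefix": "marketdata",
        "database": "Stocks",
        "collection": "PriceData",
        "copy.existing": "true",
    },
}
payload = json.dumps(config).encode("utf-8")

# Uncomment to POST against a running Kafka Connect worker
# (localhost:8083 is an assumed address; adjust for your deployment):
# import urllib.request
# req = urllib.request.Request(
#     "http://localhost:8083/connectors",
#     data=payload,
#     headers={"Content-Type": "application/json"},
# )
# urllib.request.urlopen(req)
```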
If you insert documents into a collection during the copying process, the connector inserts them after the process is complete.
After you start your source connector with the preceding configuration, the connector starts the copying process. Once the process is complete, you should see the following message in the log:
Finished copying existing data from the collection(s).
Your data from the PriceData MongoDB collection is now available in the marketdata.Stocks.PriceData Kafka topic.
Configure the Sink Connector
To consume data from the marketdata.Stocks.PriceData Kafka topic and write it to a time series collection named StockDataMigrate in a database named Stocks, create a sink connector with the following configuration:
```json
{
  "name": "mongo-sink-marketdata",
  "config": {
    "connector.class": "com.mongodb.kafka.connect.MongoSinkConnector",
    "tasks.max": "1",
    "topics": "marketdata.Stocks.PriceData",
    "connection.uri": "<your connection uri>",
    "database": "Stocks",
    "collection": "StockDataMigrate",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "timeseries.timefield": "tx_time",
    "timeseries.timefield.auto.convert": "true",
    "timeseries.timefield.auto.convert.date.format": "yyyy-MM-dd'T'HH:mm:ss'Z'"
  }
}
```
The preceding sink connector configuration uses the time field date format converter. Alternatively, you can use the TimestampConverter Single Message Transform (SMT) to convert the tx_time field from a String to an ISODate. When using the TimestampConverter SMT, you must define a schema for the data in the Kafka topic.

For information on how to use the TimestampConverter SMT, see the TimestampConverter Confluent documentation.
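As a sketch of that alternative, the following properties added to the sink connector's config map apply the TimestampConverter SMT to tx_time; the transform alias convertTime is an arbitrary name chosen for this example, and the settings assume your topic data carries a schema:

```json
"transforms": "convertTime",
"transforms.convertTime.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
"transforms.convertTime.field": "tx_time",
"transforms.convertTime.target.type": "Timestamp",
"transforms.convertTime.format": "yyyy-MM-dd'T'HH:mm:ss'Z'"
```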
After your sink connector finishes processing the topic data, the documents in the StockDataMigrate time series collection contain the tx_time field with an ISODate type value.
Verify the Collection Data
At this point, your time series collection should contain all the market data from your PriceData collection. The following shows the format of the documents in the StockDataMigrate time series collection:
```
{
  tx_time: 2021-07-12T20:05:35.000Z,
  symbol: 'WSV',
  company_name: 'WORRIED SNAKEBITE VENTURES',
  price: 21.22,
  _id: ObjectId("...")
}
```
To learn how to verify a collection is of type timeseries, see the instructions on how to Check if a Collection is of Type Time Series in the MongoDB manual.
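As a quick check in mongosh (a sketch using the database and collection names from this tutorial), you can list the collection's metadata and confirm its type:

```javascript
// Switch to the tutorial's database and inspect the migrated collection
use Stocks
db.getCollectionInfos({ name: "StockDataMigrate" })
// The returned collection info should report type: 'timeseries'
```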