Join us at MongoDB.local London on 7 May to unlock new possibilities for your data. Use WEB50 to save 50%.
Register now >
Docs Menu
Docs Home
/ /

Migrar una colección existente a una colección de series de tiempo

Sigue este tutorial para aprender cómo convertir una colección existente de MongoDB en una colección de series de tiempo usando el MongoDB Kafka Connector.

Las colecciones de series temporales almacenan eficientemente datos de series temporales. Estos datos constan de mediciones tomadas a intervalos de tiempo, metadatos que describen la medición y la hora de la medición.

Para convertir datos de una colección de MongoDB a una colección de series de tiempo utilizando el conector, debe realizar las siguientes tareas:

  1. Identificar el campo de tiempo común a todos los documentos de la colección.

  2. Configura un conector de origen para copiar los datos de la colección existente a un tema de Kafka.

  3. Configura un conector de sink para copiar los datos del tema de Kafka a la colección de series de tiempo.

En este tutorial, realizas estas tareas previas para migrar datos de acciones de una colección a una colección de series temporales. La colección de series de tiempo almacena e indexa los datos de manera más eficiente y conserva la capacidad de analizar el rendimiento de las acciones en el tiempo utilizando operadores de agregación.

1

Complete los pasos en el Configurar el tutorial del conector Kafka para iniciar el entorno de Confluent Kafka Connect y MongoDB.

2

Ejecute el siguiente comando para iniciar un script en su entorno Docker que genere una colección de muestra que contenga símbolos bursátiles fabricados y sus precios en su conjunto de réplicas MongoDB del tutorial:

docker exec -ti mongo1 /bin/bash -c "cd /stockgenmongo/ && python3 stockgen.py -db Stocks -col PriceData"

Una vez que el generador de datos comience a funcionar, debería ver datos generados similares a los siguientes:

...
1 _id=528e9... MSP MASSIVE SUBMARINE PARTNERS traded at 31.08 2022-05-25 21:15:15
2 _id=528e9... RWH RESPONSIVE_WHOLESALER HOLDINGS traded at 18.42 2022-05-25 21:15:15
3 _id=528e9... FAV FUZZY ATTACK VENTURES traded at 31.08 2022-05-25 21:15:15
...
3

En una ventana de terminal independiente, cree una sesión de shell interactiva en el contenedor Docker del tutorial descargado para la configuración del tutorial utilizando el siguiente comando:

docker exec -it mongo1 /bin/bash

Crea un archivo de configuración de origen denominado stock-source.json con el siguiente comando:

nano stock-source.json

Copie la siguiente información de configuración en el archivo y guarde los cambios:

{
"name": "mongo-source-marketdata",
"config": {
"connector.class": "com.mongodb.kafka.connect.MongoSourceConnector",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"publish.full.document.only": "true",
"connection.uri": "mongodb://mongo1",
"topic.prefix": "marketdata",
"database": "Stocks",
"collection": "PriceData",
"copy.existing": "true"
}
}

Esta configuración indica al conector que copie los datos existentes de la colección MongoDB PriceData al tema Kafka marketdata.Stocks.PriceData, y una vez completado el traslado, todos los datos futuros insertados en esa colección también serán copiados.

Ejecute el siguiente comando en el shell para iniciar el conector de origen utilizando el archivo de configuración que creó:

cx stock-source.json

Nota

El comando cx es un script personalizado incluido en el entorno de desarrollo del tutorial. Este script ejecuta la siguiente solicitud equivalente a la API REST de Kafka Connect para crear un nuevo conector:

curl -X POST -H "Content-Type: application/json" -d @stock-source.json http://connect:8083/connectors -w "\n"

Ejecute el siguiente comando en el shell para comprobar el estado de los conectores:

status

Si tu conector de origen se inició correctamente, deberías ver la siguiente salida:

Kafka topics:
...
The status of the connectors:
source | mongo-source-marketdata | RUNNING | RUNNING | com.mongodb.kafka.connect.MongoSourceConnector
Currently configured connectors
[
"mongo-source-marketdata"
]
...

Una vez que se inicie el conector de origen, confirmar que el tema de Kafka recibió los datos de la colección ejecutando el siguiente comando:

kafkacat -b broker:29092 -C -t marketdata.Stocks.PriceData

La salida debe mostrar los datos de los temas publicados por el conector de origen que se parezcan a lo siguiente:

{"schema":{ ... }, "payload": "{ "_id": { "$oid": "628e9..."}, "company_symbol": "MSP", "Company_name": "MASSIVE SUBMARINE PARTNERS", "price": 309.98, "tx_time": { "$date": 16535..." }"}

Puedes salir de kafkacat escribiendo CTRL+C.

4

Configura un conector destinatario para leer datos del tema de Kafka y guardarlos en una colección de series de tiempo llamada StockDataMigrate en una base de datos llamada Stocks.

Cree un archivo de configuración de receptor llamado stock-sink.json con el siguiente comando:

nano stock-sink.json

Copie la siguiente información de configuración en el archivo y guarde los cambios:

{
"name": "mongo-sink-marketdata",
"config": {
"connector.class": "com.mongodb.kafka.connect.MongoSinkConnector",
"topics": "marketdata.Stocks.PriceData",
"connection.uri": "mongodb://mongo1",
"database": "Stocks",
"collection": "StockDataMigrate",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"timeseries.timefield": "tx_time",
"timeseries.timefield.auto.convert": "true",
"timeseries.timefield.auto.convert.date.format": "yyyy-MM-dd'T'HH:mm:ss'Z'"
}
}

Tip

La configuración del conector de destino anterior utiliza el convertidor de formato de fecha de campo de tiempo. Como alternativa, se puede utilizar la TimestampConverter Transformación de Mensaje Único (SMT) para convertir el campo tx_time de un String a un ISODate. Al utilizar el TimestampConverter SMT, debes definir un esquema para los datos en el tema de Kafka.

Para obtener información sobre cómo usar la TimestampConverter SMT, consulta la TimestampConverter Documentación de Confluent.

Ejecuta el siguiente comando en el shell para iniciar el conector de sumidero usando el archivo de configuración que actualizaste:

cx stock-sink.json

Después de que el conector de destino termine el procesamiento de los datos del tema, los documentos en la colección de series de tiempo StockDataMigrate contienen el campo tx_time con un valor de tipo ISODate.

5

Una vez que el conector de sumidero complete el procesamiento de los datos del tema, la colección de series de tiempo StockDataMigrate debería contener todos los datos de mercado de tu colección PriceData.

Para ver los datos en MongoDB, ejecute el siguiente comando para conectarse a su conjunto de réplicas usando mongosh:

mongosh "mongodb://mongo1"

En el mensaje, escriba los siguientes comandos para recuperar todos los documentos en el espacio de nombres Stocks.StockDataMigrate de MongoDB:

use Stocks
db.StockDataMigrate.find()

Deberías ver una lista de documentos que se devuelven del comando y que se asemejan al siguiente documento:

{
tx_time: ISODate("2022-05-25T21:16:35.983Z"),
_id: ObjectId("628e9..."),
symbol: 'FAV',
price: 18.43,
company_name: 'FUZZY ATTACK VENTURES'
}

En este tutorial, creó un generador de datos de cotizaciones bursátiles que escribía datos periódicamente en una colección de MongoDB. Configuró un conector de origen para copiar los datos en un tema de Kafka y un conector de destino para escribirlos en una nueva colección de series temporales de MongoDB.

Lea los siguientes recursos para aprender más sobre los conceptos mencionados en este tutorial:

Volver

Replicar datos con un controlador de captura de datos modificados

En esta página