Siga este tutorial para aprender a convertir una colección MongoDB existente en una colección de series de tiempo utilizando el conector MongoDB Kafka.
Las colecciones de series temporales almacenan eficientemente datos de series temporales. Estos datos constan de mediciones tomadas a intervalos de tiempo, metadatos que describen la medición y la hora de la medición.
Para convertir datos de una colección MongoDB a una colección de series temporales mediante el conector, debe realizar las siguientes tareas:
Identifique el campo de tiempo común a todos los documentos de la colección.
Configure un conector de origen para copiar los datos de recopilación existentes a un tema de Kafka.
Configure un conector de sumidero para copiar los datos del tema de Kafka a la colección de series de tiempo.
En este tutorial, realizará las tareas anteriores para migrar datos bursátiles de una colección a una colección de series temporales. La colección de series temporales almacena e indexa los datos de forma más eficiente y conserva la capacidad de analizar el rendimiento bursátil a lo largo del tiempo mediante operadores de agregación.
Migrar una colección a una colección de series temporales
Completar la configuración del tutorial
Complete los pasos en el Tutorial del conector de Kafka Configuración para iniciar el entorno de Confluent Kafka Connect y MongoDB.
Generar datos de muestra
Ejecute el siguiente comando para iniciar un script en su entorno Docker que genere una colección de muestra que contenga símbolos bursátiles fabricados y sus precios en su conjunto de réplicas MongoDB del tutorial:
docker exec -ti mongo1 /bin/bash -c "cd /stockgenmongo/ && python3 stockgen.py -db Stocks -col PriceData"
Una vez que el generador de datos comience a funcionar, debería ver datos generados similares a los siguientes:
... 1 _id=528e9... MSP MASSIVE SUBMARINE PARTNERS traded at 31.08 2022-05-25 21:15:15 2 _id=528e9... RWH RESPONSIVE_WHOLESALER HOLDINGS traded at 18.42 2022-05-25 21:15:15 3 _id=528e9... FAV FUZZY ATTACK VENTURES traded at 31.08 2022-05-25 21:15:15 ...
Configurar el conector de origen
En una ventana de terminal independiente, cree una sesión de shell interactiva en el contenedor Docker del tutorial descargado para la configuración del tutorial utilizando el siguiente comando:
docker exec -it mongo1 /bin/bash
Cree un archivo de configuración de origen llamado stock-source.json con el siguiente comando:
nano stock-source.json
Pegue la siguiente información de configuración en el archivo y guarde los cambios:
{ "name": "mongo-source-marketdata", "config": { "connector.class": "com.mongodb.kafka.connect.MongoSourceConnector", "key.converter": "org.apache.kafka.connect.storage.StringConverter", "value.converter": "org.apache.kafka.connect.json.JsonConverter", "publish.full.document.only": "true", "connection.uri": "mongodb://mongo1", "topic.prefix": "marketdata", "database": "Stocks", "collection": "PriceData", "copy.existing": "true" } }
Esta configuración indica al conector que copie los datos existentes de la colección MongoDB PriceData al tema Kafka marketdata.Stocks.PriceData, y una vez completado el traslado, todos los datos futuros insertados en esa colección también serán copiados.
Ejecute el siguiente comando en el shell para iniciar el conector de origen utilizando el archivo de configuración que creó:
cx stock-source.json
Nota
El comando cx es un script personalizado incluido en el entorno de desarrollo del tutorial. Este script ejecuta la siguiente solicitud equivalente a la API REST de Kafka Connect para crear un nuevo conector:
curl -X POST -H "Content-Type: application/json" -d @stock-source.json http://connect:8083/connectors -w "\n"
Ejecute el siguiente comando en el shell para comprobar el estado de los conectores:
status
Si tu conector de origen se inició correctamente, deberías ver la siguiente salida:
Kafka topics: ... The status of the connectors: source | mongo-source-marketdata | RUNNING | RUNNING | com.mongodb.kafka.connect.MongoSourceConnector Currently configured connectors [ "mongo-source-marketdata" ] ...
Una vez que se inicia el conector de origen, confirme que el tema de Kafka recibió los datos de recopilación ejecutando el siguiente comando:
kafkacat -b broker:29092 -C -t marketdata.Stocks.PriceData
La salida debe mostrar los datos de los temas publicados por el conector de origen que se parezcan a lo siguiente:
{"schema":{ ... }, "payload": "{ "_id": { "$oid": "628e9..."}, "company_symbol": "MSP", "Company_name": "MASSIVE SUBMARINE PARTNERS", "price": 309.98, "tx_time": { "$date": 16535..." }"}
Puedes salir de kafkacat escribiendo CTRL+C.
Configurar el conector del sumidero
Configura un conector destinatario para leer datos del tema de Kafka y guardarlos en una colección de series de tiempo llamada StockDataMigrate en una base de datos llamada Stocks.
Cree un archivo de configuración de receptor llamado stock-sink.json con el siguiente comando:
nano stock-sink.json
Pegue la siguiente información de configuración en el archivo y guarde los cambios:
{ "name": "mongo-sink-marketdata", "config": { "connector.class": "com.mongodb.kafka.connect.MongoSinkConnector", "topics": "marketdata.Stocks.PriceData", "connection.uri": "mongodb://mongo1", "database": "Stocks", "collection": "StockDataMigrate", "key.converter": "org.apache.kafka.connect.storage.StringConverter", "value.converter": "org.apache.kafka.connect.json.JsonConverter", "timeseries.timefield": "tx_time", "timeseries.timefield.auto.convert": "true", "timeseries.timefield.auto.convert.date.format": "yyyy-MM-dd'T'HH:mm:ss'Z'" } }
Tip
La configuración del conector de receptor anterior utiliza el convertidor de formato de fecha del campo de hora. Como alternativa, puede usar la Transformación de Mensaje Único (SMT) TimestampConverter para convertir el campo tx_time de String a ISODate. Al usar la SMT TimestampConverter, debe definir un esquema para los datos en el tema de Kafka.
Para obtener información sobre cómo utilizar el SMT TimestampConverter, consulte la
Convertidor de marcas de tiempo
Documentación confluente.
Ejecute el siguiente comando en el shell para iniciar el conector del receptor usando el archivo de configuración que actualizó:
cx stock-sink.json
Una vez que el conector del receptor termina de procesar los datos del tema, los documentos en la colección de series de tiempo StockDataMigrate contienen el campo tx_time con un valor de tipo ISODate.
Verificar los datos de la recopilación de series temporales
Una vez que el conector del sumidero termina de procesar los datos del tema, la colección de series de tiempo StockDataMigrate debe contener todos los datos de mercado de su colección PriceData.
Para ver los datos en MongoDB, ejecute el siguiente comando para conectarse a su conjunto de réplicas usando mongosh:
mongosh "mongodb://mongo1"
En el indicador, escriba los siguientes comandos para recuperar todos los documentos en el espacio de nombres Stocks.StockDataMigrate de MongoDB:
use Stocks db.StockDataMigrate.find()
Debería ver una lista de documentos devueltos por el comando que se parecen al siguiente documento:
{ tx_time: ISODate("2022-05-25T21:16:35.983Z"), _id: ObjectId("628e9..."), symbol: 'FAV', price: 18.43, company_name: 'FUZZY ATTACK VENTURES' }
Resumen
En este tutorial, creó un generador de datos de cotizaciones bursátiles que escribía datos periódicamente en una colección de MongoDB. Configuró un conector de origen para copiar los datos en un tema de Kafka y un conector de destino para escribirlos en una nueva colección de series temporales de MongoDB.
Obtén más información
Lea los siguientes recursos para obtener más información sobre los conceptos mencionados en este tutorial: