Overview
Siga este tutorial para aprender a usar un controlador de captura de datos de cambios (CDC) para replicar datos con el conector MongoDB Kafka. Un controlador CDC es una aplicación que traduce eventos CDC en operaciones de escritura de MongoDB. Úselo cuando necesite reproducir los cambios de un almacén de datos en otro.
En este tutorial, configurarás y ejecutarás conectores de origen y sumidero de MongoDB Kafka para que dos colecciones de MongoDB contengan los mismos documentos mediante CDC. El conector de origen escribe datos del flujo de cambios desde la colección original en un tema de Kafka, y el conector de sumidero toma los datos del tema de Kafka y los escribe en la colección de MongoDB de destino.
Si desea obtener más información sobre cómo trabajan los manipuladores de CDC, consulte la Guía de controladores de Capture Data Capture.
Replicar datos con un Manejador CDC
Completa la configuración del tutorial
Completa los pasos en la configurar del tutorial de Kafka Connector para iniciar el entorno de Confluent Kafka Connect y MongoDB.
Iniciar shells interactivos
Inicie dos shells interactivos en el contenedor Docker en ventanas separadas. En el tutorial, puedes utilizar los shells para ejecutar y observar diferentes tareas.
Ejecute el siguiente comando desde una terminal para iniciar un shell interactivo.
docker exec -it mongo1 /bin/bash
Nos referiremos a esta shell interactiva como CDC Shell 1 a lo largo de este tutorial.
Ejecute el siguiente comando en un segundo terminal para iniciar un shell interactivo:
docker exec -it mongo1 /bin/bash
A lo largo de este tutorial nos referiremos a este shell interactivo como CDCShell2.
Organiza las dos ventanas en tu pantalla para mantenerlas visibles y ver actualizaciones en tiempo real.
Usa CDCShell1 para configurar tus conectores y supervisar tu tema de Kafka. Usa CDCShell2 para realizar operaciones de guardar en MongoDB.
Configura el conector de origen
En CDCShell,1 configure un conector de origen para leer desde el
CDCTutorial.Source Espacio de nombres MongoDB y escribir en el tema de Kafka CDCTutorial.Source.
Cree un archivo de configuración llamado cdc-source.json usando el siguiente comando:
nano cdc-source.json
Copie la siguiente información de configuración en el archivo y guarde los cambios:
{ "name": "mongo-cdc-source", "config": { "connector.class": "com.mongodb.kafka.connect.MongoSourceConnector", "connection.uri": "mongodb://mongo1", "database": "CDCTutorial", "collection": "Source" } }
Ejecute el siguiente comando en CDCShell1 para iniciar el conector de origen usando el archivo de configuración que creó:
cx cdc-source.json
Nota
El comando cx es un script personalizado incluido en el entorno de desarrollo del tutorial. Este script ejecuta la siguiente solicitud equivalente a la API REST de Kafka Connect para crear un nuevo conector:
curl -X POST -H "Content-Type: application/json" -d @cdc-source.json http://connect:8083/connectors -w "\n"
Ejecute el siguiente comando en el shell para comprobar el estado de los conectores:
status
Si tu conector de origen se inició correctamente, deberías ver la siguiente salida:
Kafka topics: ... The status of the connectors: source | mongo-cdc-source | RUNNING | RUNNING | com.mongodb.kafka.connect.MongoSourceConnector Currently configured connectors [ "mongo-cdc-source" ] ...
Configura el Sink Connector
En CDCShell1, configure un conector sink para copiar datos del tema Kafka CDCTutorial.Source al namespace MongoDB CDCTutorial.Destination.
Cree un archivo de configuración llamado cdc-sink.json usando el siguiente comando:
nano cdc-sink.json
Copie la siguiente información de configuración en el archivo y guarde los cambios:
{ "name": "mongo-cdc-sink", "config": { "connector.class": "com.mongodb.kafka.connect.MongoSinkConnector", "topics": "CDCTutorial.Source", "change.data.capture.handler": "com.mongodb.kafka.connect.sink.cdc.mongodb.ChangeStreamHandler", "connection.uri": "mongodb://mongo1", "database": "CDCTutorial", "collection": "Destination" } }
Ejecute el siguiente comando en la shell para iniciar el sink connector utilizando el archivo de configuración que creó:
cx cdc-sink.json
Ejecute el siguiente comando en el shell para comprobar el estado de los conectores:
status
Si tu conector sink se inició correctamente, deberías ver la siguiente salida:
Kafka topics: ... The status of the connectors: sink | mongo-cdc-sink | RUNNING | RUNNING | com.mongodb.kafka.connect.MongoSinkConnector source | mongo-cdc-source | RUNNING | RUNNING | com.mongodb.kafka.connect.MongoSourceConnector Currently configured connectors [ "mongo-cdc-sink" "mongo-cdc-source" ] ...
Monitorear el tema de Kafka
En CDCShell1, monitoree el tema de Kafka para detectar eventos entrantes. Ejecute el siguiente comando para iniciar la kafkacat aplicación que genera los datos publicados en el tema:
kc CDCTutorial.Source
Nota
El comando kc es un script personalizado incluido en el entorno de desarrollo del tutorial que llama a la aplicación kafkacat con opciones para conectarse a Kafka y formatear la salida del tema especificado.
Una vez iniciado, debería ver el siguiente resultado que indica que actualmente no hay datos para leer:
% Reached end of topic CDCTutorial.Source [0] at offset 0
Guarda datos en la fuente y observa el flujo de datos.
En CDCShell2, conéctese a MongoDB utilizando mongosh, la shell de MongoDB ejecutando el siguiente comando:
mongosh "mongodb://mongo1"
Luego de conectarse correctamente, deberías ver el siguiente mensaje en la shell de MongoDB:
rs0 [direct: primary] test>
En el indicador, introduce los siguientes comandos para insertar un nuevo documento en el espacio de nombres de MongoDB CDCTutorial.Source:
use CDCTutorial db.Source.insertOne({ proclaim: "Hello World!" });
Una vez que MongoDB completa el comando de inserción, deberías recibir una confirmación que se asemeje al siguiente texto:
{ acknowledged: true, insertedId: ObjectId("600b38ad...") }
El conector de origen toma el cambio y lo publica en el tema de Kafka. Deberías ver el siguiente mensaje de tema en tu ventana CDCShell1:
{ "schema": { "type": "string", "optional": false }, "payload": { "_id": { "_data": "8260..." }, "operationType": "insert", "clusterTime": { "$timestamp": { "t": 1611..., "i": 2 } }, "wallTime": { "$date": "..." }, "fullDocument": { "_id": { "$oid": "600b38ad..." }, "proclaim": "Hello World!" }, "ns": { "db": "CDCTutorial", "coll": "Source" }, "documentKey": { "_id": { "$oid": "600b38a..." } } } }
El conector sink recoge el mensaje de Kafka y hunde los datos en MongoDB. Puedes recuperar el documento del namespace CDCTutorial.Destination en MongoDB ejecutando el siguiente comando en la shell de MongoDB que iniciaste en CDCShell2:
db.Destination.find()
Debería ver que el siguiente documento se devuelve como resultado:
[ { _id: ObjectId("600b38a..."), proclaim: 'Hello World' } ]
(Opcional) Generar cambios adicionales
Prueba remover documentos del namespace CDCTutorial.Source ejecutando el siguiente comando desde la shell de MongoDB:
db.Source.deleteMany({})
Debería ver el siguiente mensaje de tema en su ventana CDCShell1:
{ "schema": { "type": "string", "optional": false }, "payload": { "_id": { "_data": "8261...." }, ... "operationType": "delete", "clusterTime": { "$timestamp": { "t": 1631108282, "i": 1 } }, "ns": { "db": "CDCTutorial", "coll": "Source" }, "documentKey": { "_id": { "$oid": "6138..." } } } }
Ejecuta el siguiente comando para recuperar el número actual de documentos en la colección:
db.Destination.count()
Esto devuelve la siguiente salida, indicando que la colección está vacía:
0
Ejecute el siguiente comando para salir del shell de MongoDB:
exit
Resumen
En este tutorial, se configura un conector de origen para capturar cambios en una colección de MongoDB y enviarlos a Apache Kafka. También configuraste un sink connector con un MongoDB CDC Handler para mover los datos desde Apache Kafka a una colección de MongoDB.
Obtén más información
Lea los siguientes recursos para aprender más sobre los conceptos mencionados en este tutorial: