Join us at MongoDB.local London on 7 May to unlock new possibilities for your data. Use WEB50 to save 50%.
Register now >
Docs Menu
Docs Home
/ /

Replicar datos con un controlador de captura de datos modificados

Siga este tutorial para aprender a usar un controlador de captura de datos de cambios (CDC) para replicar datos con el conector MongoDB Kafka. Un controlador CDC es una aplicación que traduce eventos CDC en operaciones de escritura de MongoDB. Úselo cuando necesite reproducir los cambios de un almacén de datos en otro.

En este tutorial, configurarás y ejecutarás conectores de origen y sumidero de MongoDB Kafka para que dos colecciones de MongoDB contengan los mismos documentos mediante CDC. El conector de origen escribe datos del flujo de cambios desde la colección original en un tema de Kafka, y el conector de sumidero toma los datos del tema de Kafka y los escribe en la colección de MongoDB de destino.

Si desea obtener más información sobre cómo trabajan los manipuladores de CDC, consulte la Guía de controladores de Capture Data Capture.

1

Completa los pasos en la configurar del tutorial de Kafka Connector para iniciar el entorno de Confluent Kafka Connect y MongoDB.

2

Inicie dos shells interactivos en el contenedor Docker en ventanas separadas. En el tutorial, puedes utilizar los shells para ejecutar y observar diferentes tareas.

Ejecute el siguiente comando desde una terminal para iniciar un shell interactivo.

docker exec -it mongo1 /bin/bash

Nos referiremos a esta shell interactiva como CDC Shell 1 a lo largo de este tutorial.

Ejecute el siguiente comando en un segundo terminal para iniciar un shell interactivo:

docker exec -it mongo1 /bin/bash

A lo largo de este tutorial nos referiremos a este shell interactivo como CDCShell2.

Organiza las dos ventanas en tu pantalla para mantenerlas visibles y ver actualizaciones en tiempo real.

Usa CDCShell1 para configurar tus conectores y supervisar tu tema de Kafka. Usa CDCShell2 para realizar operaciones de guardar en MongoDB.

3

En CDCShell,1 configure un conector de origen para leer desde el CDCTutorial.Source Espacio de nombres MongoDB y escribir en el tema de Kafka CDCTutorial.Source.

Cree un archivo de configuración llamado cdc-source.json usando el siguiente comando:

nano cdc-source.json

Copie la siguiente información de configuración en el archivo y guarde los cambios:

{
"name": "mongo-cdc-source",
"config": {
"connector.class": "com.mongodb.kafka.connect.MongoSourceConnector",
"connection.uri": "mongodb://mongo1",
"database": "CDCTutorial",
"collection": "Source"
}
}

Ejecute el siguiente comando en CDCShell1 para iniciar el conector de origen usando el archivo de configuración que creó:

cx cdc-source.json

Nota

El comando cx es un script personalizado incluido en el entorno de desarrollo del tutorial. Este script ejecuta la siguiente solicitud equivalente a la API REST de Kafka Connect para crear un nuevo conector:

curl -X POST -H "Content-Type: application/json" -d @cdc-source.json http://connect:8083/connectors -w "\n"

Ejecute el siguiente comando en el shell para comprobar el estado de los conectores:

status

Si tu conector de origen se inició correctamente, deberías ver la siguiente salida:

Kafka topics:
...
The status of the connectors:
source | mongo-cdc-source | RUNNING | RUNNING | com.mongodb.kafka.connect.MongoSourceConnector
Currently configured connectors
[
"mongo-cdc-source"
]
...
4

En CDCShell1, configure un conector sink para copiar datos del tema Kafka CDCTutorial.Source al namespace MongoDB CDCTutorial.Destination.

Cree un archivo de configuración llamado cdc-sink.json usando el siguiente comando:

nano cdc-sink.json

Copie la siguiente información de configuración en el archivo y guarde los cambios:

{
"name": "mongo-cdc-sink",
"config": {
"connector.class": "com.mongodb.kafka.connect.MongoSinkConnector",
"topics": "CDCTutorial.Source",
"change.data.capture.handler": "com.mongodb.kafka.connect.sink.cdc.mongodb.ChangeStreamHandler",
"connection.uri": "mongodb://mongo1",
"database": "CDCTutorial",
"collection": "Destination"
}
}

Ejecute el siguiente comando en la shell para iniciar el sink connector utilizando el archivo de configuración que creó:

cx cdc-sink.json

Ejecute el siguiente comando en el shell para comprobar el estado de los conectores:

status

Si tu conector sink se inició correctamente, deberías ver la siguiente salida:

Kafka topics:
...
The status of the connectors:
sink | mongo-cdc-sink | RUNNING | RUNNING | com.mongodb.kafka.connect.MongoSinkConnector
source | mongo-cdc-source | RUNNING | RUNNING | com.mongodb.kafka.connect.MongoSourceConnector
Currently configured connectors
[
"mongo-cdc-sink"
"mongo-cdc-source"
]
...
5

En CDCShell1, monitoree el tema de Kafka para detectar eventos entrantes. Ejecute el siguiente comando para iniciar la kafkacat aplicación que genera los datos publicados en el tema:

kc CDCTutorial.Source

Nota

El comando kc es un script personalizado incluido en el entorno de desarrollo del tutorial que llama a la aplicación kafkacat con opciones para conectarse a Kafka y formatear la salida del tema especificado.

Una vez iniciado, debería ver el siguiente resultado que indica que actualmente no hay datos para leer:

% Reached end of topic CDCTutorial.Source [0] at offset 0
6

En CDCShell2, conéctese a MongoDB utilizando mongosh, la shell de MongoDB ejecutando el siguiente comando:

mongosh "mongodb://mongo1"

Luego de conectarse correctamente, deberías ver el siguiente mensaje en la shell de MongoDB:

rs0 [direct: primary] test>

En el indicador, introduce los siguientes comandos para insertar un nuevo documento en el espacio de nombres de MongoDB CDCTutorial.Source:

use CDCTutorial
db.Source.insertOne({ proclaim: "Hello World!" });

Una vez que MongoDB completa el comando de inserción, deberías recibir una confirmación que se asemeje al siguiente texto:

{
acknowledged: true,
insertedId: ObjectId("600b38ad...")
}

El conector de origen toma el cambio y lo publica en el tema de Kafka. Deberías ver el siguiente mensaje de tema en tu ventana CDCShell1:

{
"schema": { "type": "string", "optional": false },
"payload": {
"_id": { "_data": "8260..." },
"operationType": "insert",
"clusterTime": { "$timestamp": { "t": 1611..., "i": 2 } },
"wallTime": { "$date": "..." },
"fullDocument": {
"_id": { "$oid": "600b38ad..." },
"proclaim": "Hello World!"
},
"ns": { "db": "CDCTutorial", "coll": "Source" },
"documentKey": { "_id": { "$oid": "600b38a..." } }
}
}

El conector sink recoge el mensaje de Kafka y hunde los datos en MongoDB. Puedes recuperar el documento del namespace CDCTutorial.Destination en MongoDB ejecutando el siguiente comando en la shell de MongoDB que iniciaste en CDCShell2:

db.Destination.find()

Debería ver que el siguiente documento se devuelve como resultado:

[
{
_id: ObjectId("600b38a..."),
proclaim: 'Hello World'
}
]
7

Prueba remover documentos del namespace CDCTutorial.Source ejecutando el siguiente comando desde la shell de MongoDB:

db.Source.deleteMany({})

Debería ver el siguiente mensaje de tema en su ventana CDCShell1:

{
"schema": { "type": "string", "optional": false },
"payload": {
"_id": { "_data": "8261...." },
...
"operationType": "delete",
"clusterTime": { "$timestamp": { "t": 1631108282, "i": 1 } },
"ns": { "db": "CDCTutorial", "coll": "Source" },
"documentKey": { "_id": { "$oid": "6138..." } }
}
}

Ejecuta el siguiente comando para recuperar el número actual de documentos en la colección:

db.Destination.count()

Esto devuelve la siguiente salida, indicando que la colección está vacía:

0

Ejecute el siguiente comando para salir del shell de MongoDB:

exit

En este tutorial, se configura un conector de origen para capturar cambios en una colección de MongoDB y enviarlos a Apache Kafka. También configuraste un sink connector con un MongoDB CDC Handler para mover los datos desde Apache Kafka a una colección de MongoDB.

Lea los siguientes recursos para aprender más sobre los conceptos mencionados en este tutorial:

Volver

Comenzar con el Connector de salida MongoDB Kafka

En esta página