Docs Menu
Docs Home
/ /

Replicar datos con un controlador de captura de datos modificados

Siga este tutorial para aprender a usar un controlador de captura de datos de cambios (CDC) para replicar datos con el conector MongoDB Kafka. Un controlador CDC es una aplicación que traduce eventos CDC en operaciones de escritura de MongoDB. Úselo cuando necesite reproducir los cambios de un almacén de datos en otro.

En este tutorial, configurarás y ejecutarás conectores de origen y sumidero de MongoDB Kafka para que dos colecciones de MongoDB contengan los mismos documentos mediante CDC. El conector de origen escribe datos del flujo de cambios desde la colección original en un tema de Kafka, y el conector de sumidero toma los datos del tema de Kafka y los escribe en la colección de MongoDB de destino.

Si desea obtener más información sobre cómo trabajan los manipuladores de CDC, consulte la Guía demanejadores de captura de datos modificados.

1

Complete los pasos del Tutorial de configuración de Kafka Connector para iniciar el entorno de Confluent Kafka Connect y MongoDB.

2

Inicie dos shells interactivos en el contenedor Docker en ventanas separadas. En el tutorial, puedes utilizar los shells para ejecutar y observar diferentes tareas.

Ejecute el siguiente comando desde una terminal para iniciar un shell interactivo.

docker exec -it mongo1 /bin/bash

A lo largo de este tutorial nos referiremos a este shell interactivo como CDCShell.1

Ejecute el siguiente comando en una segunda terminal para iniciar un shell interactivo:

docker exec -it mongo1 /bin/bash

A lo largo de este tutorial nos referiremos a este shell interactivo como CDCShell.2

Organiza las dos ventanas en tu pantalla para mantenerlas visibles y ver actualizaciones en tiempo real.

Usa CDCShell1 para configurar tus conectores y supervisar tu tema de Kafka. Usa CDCShell2 para realizar operaciones de guardar en MongoDB.

3

En CDCShell,1 configure un conector de origen para leer desde el CDCTutorial.Source Espacio de nombres MongoDB y escribir en el tema de Kafka CDCTutorial.Source.

Cree un archivo de configuración llamado cdc-source.json usando el siguiente comando:

nano cdc-source.json

Pegue la siguiente información de configuración en el archivo y guarde los cambios:

{
"name": "mongo-cdc-source",
"config": {
"connector.class": "com.mongodb.kafka.connect.MongoSourceConnector",
"connection.uri": "mongodb://mongo1",
"database": "CDCTutorial",
"collection": "Source"
}
}

Ejecute el siguiente comando en CDCShell1 para iniciar el conector de origen utilizando el archivo de configuración que creó:

cx cdc-source.json

Nota

El comando cx es un script personalizado incluido en el entorno de desarrollo del tutorial. Este script ejecuta la siguiente solicitud equivalente a la API REST de Kafka Connect para crear un nuevo conector:

curl -X POST -H "Content-Type: application/json" -d @cdc-source.json http://connect:8083/connectors -w "\n"

Ejecute el siguiente comando en el shell para comprobar el estado de los conectores:

status

Si tu conector de origen se inició correctamente, deberías ver la siguiente salida:

Kafka topics:
...
The status of the connectors:
source | mongo-cdc-source | RUNNING | RUNNING | com.mongodb.kafka.connect.MongoSourceConnector
Currently configured connectors
[
"mongo-cdc-source"
]
...
4

En CDCShell,1 configure un conector de receptor para copiar datos del CDCTutorial.Source tema de Kafka al espacio de CDCTutorial.Destination nombres de MongoDB.

Cree un archivo de configuración llamado cdc-sink.json usando el siguiente comando:

nano cdc-sink.json

Pegue la siguiente información de configuración en el archivo y guarde los cambios:

{
"name": "mongo-cdc-sink",
"config": {
"connector.class": "com.mongodb.kafka.connect.MongoSinkConnector",
"topics": "CDCTutorial.Source",
"change.data.capture.handler": "com.mongodb.kafka.connect.sink.cdc.mongodb.ChangeStreamHandler",
"connection.uri": "mongodb://mongo1",
"database": "CDCTutorial",
"collection": "Destination"
}
}

Ejecute el siguiente comando en el shell para iniciar el conector del receptor usando el archivo de configuración que creó:

cx cdc-sink.json

Ejecute el siguiente comando en el shell para comprobar el estado de los conectores:

status

Si el conector del sumidero se inició correctamente, debería ver el siguiente resultado:

Kafka topics:
...
The status of the connectors:
sink | mongo-cdc-sink | RUNNING | RUNNING | com.mongodb.kafka.connect.MongoSinkConnector
source | mongo-cdc-source | RUNNING | RUNNING | com.mongodb.kafka.connect.MongoSourceConnector
Currently configured connectors
[
"mongo-cdc-sink"
"mongo-cdc-source"
]
...
5

En CDCShell,1 monitoree el tema de Kafka para detectar eventos entrantes. Ejecute el siguiente comando para iniciar la kafkacat aplicación que genera los datos publicados en el tema:

kc CDCTutorial.Source

Nota

El comando kc es un script personalizado incluido en el entorno de desarrollo del tutorial que llama a la aplicación kafkacat con opciones para conectarse a Kafka y formatear la salida del tema especificado.

Una vez iniciado, debería ver el siguiente resultado que indica que actualmente no hay datos para leer:

% Reached end of topic CDCTutorial.Source [0] at offset 0
6

En CDCShell,2 conéctese a MongoDB mongosh usando, el shell de MongoDB, ejecutando el siguiente comando:

mongosh "mongodb://mongo1"

Luego de conectarse correctamente, deberías ver el siguiente mensaje en la shell de MongoDB:

rs0 [direct: primary] test>

En el indicador, introduce los siguientes comandos para insertar un nuevo documento en el espacio de nombres de MongoDB CDCTutorial.Source:

use CDCTutorial
db.Source.insertOne({ proclaim: "Hello World!" });

Una vez que MongoDB completa el comando de inserción, deberías recibir una confirmación que se asemeje al siguiente texto:

{
acknowledged: true,
insertedId: ObjectId("600b38ad...")
}

El conector de origen detecta el cambio y lo publica en el tema de Kafka. Debería ver el siguiente mensaje de tema en la ventana de CDCShell:1

{
"schema": { "type": "string", "optional": false },
"payload": {
"_id": { "_data": "8260..." },
"operationType": "insert",
"clusterTime": { "$timestamp": { "t": 1611..., "i": 2 } },
"wallTime": { "$date": "..." },
"fullDocument": {
"_id": { "$oid": "600b38ad..." },
"proclaim": "Hello World!"
},
"ns": { "db": "CDCTutorial", "coll": "Source" },
"documentKey": { "_id": { "$oid": "600b38a..." } }
}
}

El conector de recepción recibe el mensaje de Kafka y envía los datos a MongoDB. Puede recuperar el documento del espacio de nombres CDCTutorial.Destination en MongoDB ejecutando el siguiente comando en el shell de MongoDB que inició en CDCShell:2

db.Destination.find()

Debería ver el siguiente documento devuelto como resultado:

[
{
_id: ObjectId("600b38a..."),
proclaim: 'Hello World'
}
]
7

Intente eliminar documentos del espacio de nombres CDCTutorial.Source ejecutando el siguiente comando desde el shell de MongoDB:

db.Source.deleteMany({})

Debería ver el siguiente mensaje de tema en su ventana CDCShell:1

{
"schema": { "type": "string", "optional": false },
"payload": {
"_id": { "_data": "8261...." },
...
"operationType": "delete",
"clusterTime": { "$timestamp": { "t": 1631108282, "i": 1 } },
"ns": { "db": "CDCTutorial", "coll": "Source" },
"documentKey": { "_id": { "$oid": "6138..." } }
}
}

Ejecute el siguiente comando para recuperar el número actual de documentos en la colección:

db.Destination.count()

Esto devuelve la siguiente salida, indicando que la colección está vacía:

0

Ejecute el siguiente comando para salir del shell de MongoDB:

exit

En este tutorial, configurará un conector de origen para capturar cambios en una colección de MongoDB y enviarlos a Apache Kafka. También configurará un conector de destino con un controlador CDC de MongoDB para transferir los datos de Apache Kafka a una colección de MongoDB.

Lea los siguientes recursos para obtener más información sobre los conceptos mencionados en este tutorial:

Volver

Introducción al conector de sumidero de Kafka de MongoDB

En esta página