Siga este tutorial para aprender a configurar un conector de origen MongoDB Kafka para leer datos de un flujo de cambios y publicarlos en un tema de Apache Kafka.
Introducción al conector de origen de MongoDB Kafka
Completar la configuración del tutorial
Complete los pasos en el Tutorial del conector de Kafka Configuración para iniciar el entorno de Confluent Kafka Connect y MongoDB.
Configurar el conector de origen
Cree una sesión de shell interactiva en el contenedor Docker del tutorial descargado para la configuración del tutorial utilizando el siguiente comando:
docker exec -it mongo1 /bin/bash
Cree un archivo de configuración de origen llamado simplesource.json con el siguiente comando:
nano simplesource.json
Pegue la siguiente información de configuración en el archivo y guarde los cambios:
{ "name": "mongo-simple-source", "config": { "connector.class": "com.mongodb.kafka.connect.MongoSourceConnector", "connection.uri": "mongodb://mongo1", "database": "Tutorial1", "collection": "orders" } }
Ejecute el siguiente comando en el shell para iniciar el conector de origen utilizando el archivo de configuración que creó:
cx simplesource.json
Nota
El comando cx es un script personalizado incluido en el entorno de desarrollo del tutorial. Este script ejecuta la siguiente solicitud equivalente a la API REST de Kafka Connect para crear un nuevo conector:
curl -X POST -H "Content-Type: application/json" -d @simplesource.json http://connect:8083/connectors -w "\n"
Ejecute el siguiente comando en el shell para comprobar el estado de los conectores:
status
Si tu conector de origen se inició correctamente, deberías ver la siguiente salida:
Kafka topics: ... The status of the connectors: source | mongo-simple-source | RUNNING | RUNNING | com.mongodb.kafka.connect.MongoSourceConnector Currently configured connectors [ "mongo-simple-source" ] ...
Crear eventos de cambio
En el mismo shell, conéctese a MongoDB usando mongosh, el shell de MongoDB ejecutando el siguiente comando:
mongosh "mongodb://mongo1"
Luego de conectarse correctamente, deberías ver el siguiente mensaje en la shell de MongoDB:
rs0 [direct: primary] test>
En el indicador, escriba los siguientes comandos para insertar un nuevo documento:
use Tutorial1 db.orders.insertOne( { 'order_id' : 1, 'item' : 'coffee' } )
Una vez que MongoDB completa el comando de inserción, deberías recibir una confirmación que se asemeje al siguiente texto:
{ acknowledged: true, insertedId: ObjectId("627e7e...") }
Salga del shell de MongoDB ingresando el comando exit.
Verifique el estado de su entorno de Kafka utilizando el siguiente comando:
status
En la salida del comando anterior, deberías ver el nuevo tema que el conector de origen creó después de recibir el evento de cambio:
... "topic": "Tutorial1.orders", ...
Confirme el contenido de los datos del nuevo tema de Kafka ejecutando el siguiente comando:
kc Tutorial1.orders
Nota
El comando kc es un script auxiliar que genera el contenido de un tema de Kafka.
Debería ver los siguientes datos del tema de Kafka, organizados por secciones "Clave" y "Valor" cuando ejecute el comando anterior:
Desde la sección "Valor" de la salida, puede encontrar la parte de payload que incluye los datos fullDocument como se resalta en el siguiente documento JSON formateado:
{ "_id": { "_data": "8262655A..." }, "operationType": "insert", "clusterTime": { "$timestamp": { "t": 1650809557, "i": 2 } }, "wallTime": { "$date": "2022-10-13T17:06:23.409Z" }, "fullDocument": { "_id": { "$oid": "62655a..." }, "order_id": 1, "item": "coffee" }, "ns": { "db": "Tutorial1", "coll": "orders" }, "documentKey": { "_id": { "$oid": "62655a..." } } }
Reconfigurar el flujo de cambios
Puede omitir los metadatos de los eventos creados por el flujo de cambios configurándolo para que solo devuelva el campo fullDocument.
Detenga el conector utilizando el siguiente comando:
del mongo-simple-source
Nota
El comando del es un script auxiliar que llama a la API REST de Kafka Connect para detener el conector y es equivalente al siguiente comando:
curl -X DELETE connect:8083/connectors/<parameter>
Edite el archivo de configuración de origen llamado simplesource.json con el siguiente comando:
nano simplesource.json
Elimine la configuración existente, agregue la siguiente configuración y guarde el archivo:
{ "name": "mongo-simple-source", "config": { "connector.class": "com.mongodb.kafka.connect.MongoSourceConnector", "connection.uri": "mongodb://mongo1", "publish.full.document.only": true, "database": "Tutorial1", "collection": "orders" } }
Ejecute el siguiente comando en el shell para iniciar el conector de origen usando el archivo de configuración que actualizó:
cx simplesource.json
Conéctese a MongoDB usando mongosh con el siguiente comando:
mongosh "mongodb://mongo1"
En el indicador, escriba los siguientes comandos para insertar un nuevo documento:
use Tutorial1 db.orders.insertOne( { 'order_id' : 2, 'item' : 'oatmeal' } )
Salga de mongosh ejecutando el siguiente comando:
exit
Confirme el contenido de los datos del nuevo tema de Kafka ejecutando el siguiente comando:
kc Tutorial1.orders
El campo payload del documento "Valor" debe contener únicamente los siguientes datos del documento:
{ "_id": { "$oid": "<your _id value>" }, "order_id": 2, "item": "oatmeal" }
(Opcional) Detener los contenedores Docker
Después de completar este tutorial, libere recursos en su equipo deteniendo o eliminando recursos de Docker. Puede eliminar tanto los contenedores como las imágenes de Docker, o solo los contenedores. Si elimina los contenedores y las imágenes, deberá descargarlos de nuevo para reiniciar su entorno de desarrollo del Conector Kafka de MongoDB, que tiene un tamaño aproximado de 2.4 GB. Si elimina solo los contenedores, puede reutilizar las imágenes y evitar la descarga de la mayoría de los archivos grandes en la canalización de datos de ejemplo.
Tip
Más tutoriales
Si planea completar más tutoriales del Conector Kafka de MongoDB, considere eliminar solo los contenedores. Si no planea completar más tutoriales del Conector Kafka de MongoDB, considere eliminar los contenedores y las imágenes.
Seleccione la pestaña que corresponde a la tarea de eliminación que desea ejecutar.
Ejecute el siguiente comando de shell para eliminar los contenedores y las imágenes de Docker del entorno de desarrollo:
docker-compose -p mongo-kafka down --rmi all
Ejecute el siguiente comando de shell para eliminar los contenedores Docker pero conservar las imágenes para el entorno de desarrollo:
docker-compose -p mongo-kafka down
Para reiniciar los contenedores, siga los mismos pasos necesarios para iniciarlos en la configuración del tutorial.
Resumen
En este tutorial, inició un conector de origen utilizando diferentes configuraciones para alterar los datos de eventos de flujo de cambios publicados en un tema de Kafka.
Obtén más información
Lea los siguientes recursos para obtener más información sobre los conceptos mencionados en este tutorial: