Overview
Esta guía le muestra cómo configurar el conector MongoDB Kafka para enviar datos entre MongoDB y Apache Kafka.
Después de completar esta guía, debería comprender cómo usar la API REST de Kafka Connect para configurar los conectores Kafka de MongoDB para leer datos de MongoDB y escribirlos en un tema de Kafka, y para leer datos de un tema de Kafka y escribirlos en MongoDB.
Para completar los pasos de esta guía, debes descargar y trabajar en un sandbox, un entorno de desarrollo de contenedores que incluye los servicios necesarios para compilar una muestra de pipeline de datos.
Lea las siguientes secciones para configurar su entorno aislado y su canalización de datos de muestra.
Nota
Después de completar esta guía, puedes remover el entorno siguiendo las instrucciones en la Eliminar la sección Sandbox.
Instalar los paquetes necesarios
Descarga e instala los siguientes paquetes:
Tip
Lea la documentación de Docker
Esta guía utiliza la terminología específica de Docker que se describe a continuación:
Obtenga más información sobre Docker en la Guía de introducción oficial de Docker.
El entorno sandbox utiliza Docker por conveniencia y coherencia. Para más información sobre las opciones de implementación de Apache Kafka, consulte los siguientes recursos:
Descarga el Sandbox
Creamos un entorno sandbox que incluye los servicios que necesita en este tutorial para construir su canalización de datos de muestra.
Para descargar el sandbox, clona el repositorio del tutorial a tu entorno de desarrollo. Luego navega al directorio que corresponde al tutorial de introducción rápida. Si utilizas bash o un shell similar, usa los siguientes comandos:
git clone https://github.com/mongodb-university/kafka-edu.git cd kafka-edu/docs-examples/mongodb-kafka-base/
Iniciar el Sandbox
El sandbox inicia los siguientes servicios en contenedores Docker:
MongoDB, configurado como un set de réplicas
Apache Kafka
Kafka Connect con el Kafka Connector de MongoDB instalado
Apache Zookeeper que administra la configuración de Apache Kafka
Para iniciar el sandbox, ejecute el siguiente comando desde el directorio del tutorial:
docker compose -p mongo-kafka up -d --force-recreate
Cuando inicies el sandbox, Docker descargará todas las imágenes que necesite para ejecutarse.
Nota
¿Cuanto tiempo tarda la descarga?
En total, las imágenes de Docker para este tutorial requieren aproximadamente 2,4 GB de espacio. La siguiente lista muestra el tiempo de descarga de las imágenes con diferentes velocidades de internet:
40 megabits por segundo: 8 minutos
20 megabits por segundo: 16 minutos
10 megabits por segundo: 32 minutos
Después de que Docker descargue y compile las imágenes, debería ver la siguiente salida en su entorno de desarrollo:
... Creating zookeeper ... done Creating broker ... done Creating schema-registry ... done Creating connect ... done Creating rest-proxy ... done Creating mongo1 ... done Creating mongo1-setup ... done
Nota
Asignaciones de puertos
El sandbox asigna los siguientes servicios a puertos en tu host:
El servidor MongoDB sandbox se asigna al puerto
35001en tu máquina hostEl servidor sandbox Kafka Connect JMX está asignado al puerto
35000de tu máquina host
Estos puertos deben estar libres para iniciar el sandbox.
Agregar conectores
Para completar la canalización de datos de ejemplo, debe agregar conectores a Kafka Connect para transferir datos entre Kafka Connect y MongoDB. Agregue un conector de origen para transferir datos de MongoDB a Apache Kafka. Agregue un conector de destino para transferir datos de Apache Kafka a MongoDB.
Para agregar conectores en el entorno sandbox, primero inicia un shell bash interactivo en tu contenedor Docker usando el siguiente comando:
docker exec -it mongo1 /bin/bash
Después de que se inicie su sesión de shell, debería ver el siguiente mensaje:
MongoDB Kafka Connector Sandbox $
Agregar un Connector de origen
Usa la shell en tu contenedor Docker para agregar un conector de origen usando la API REST de Kafka Connect.
La siguiente solicitud API añade un conector de origen configurado con las siguientes propiedades:
La clase que Kafka Connect utiliza para crear el conector
El URI de conexión, la base de datos y la colección del set de réplicas de MongoDB desde las que el conector lee datos
Una pipeline de agregación que agrega un campo
travelcon el valor"MongoDB Kafka Connector"a los documentos insertados que el conector lee desde MongoDB
curl -X POST \ -H "Content-Type: application/json" \ --data ' {"name": "mongo-source", "config": { "connector.class":"com.mongodb.kafka.connect.MongoSourceConnector", "connection.uri":"mongodb://mongo1:27017/?replicaSet=rs0", "database":"quickstart", "collection":"sampleData", "pipeline":"[{\"$match\": {\"operationType\": \"insert\"}}, {$addFields : {\"fullDocument.travel\":\"MongoDB Kafka Connector\"}}]" } } ' \ http://connect:8083/connectors -w "\n"
Nota
¿Por qué veo el mensaje 'No se pudo conectar'?
Se tarda hasta tres minutos en iniciar la REST API de Kafka Connect. Si recibes el siguiente error, espera tres minutos y ejecuta el comando anterior nuevamente:
... curl: (7) Failed to connect to connect port 8083: Connection refused
Para confirmar que agregaste el conector de origen, ejecuta el siguiente comando:
curl -X GET http://connect:8083/connectors
El comando anterior debe mostrar los nombres de los conectores en ejecución:
["mongo-source"]
Para aprender más sobre las propiedades del conector de origen, consulte la página sobre Propiedades de configuración del Connector fuente.
Para aprender más sobre pipelines de agregación, ver la página del manual de MongoDB sobre Pipelines de agregación.
Agregar un Connector de sumidero
Utiliza la shell en tu contenedor Docker para agregar un conector de sumidero usando la REST API de Kafka Connect.
La siguiente solicitud de API agrega un conector de receptor configurado con las siguientes propiedades:
La clase que Kafka Connect utiliza para crear el conector
La URI de conexión, la base de datos y la colección del conjunto de réplicas de MongoDB en el que el conector escribe datos
El tema de Apache Kafka del que el conector lee datos
Un controlador de captura de datos modificados para documentos de eventos de cambio de MongoDB
curl -X POST \ -H "Content-Type: application/json" \ --data ' {"name": "mongo-sink", "config": { "connector.class":"com.mongodb.kafka.connect.MongoSinkConnector", "connection.uri":"mongodb://mongo1:27017/?replicaSet=rs0", "database":"quickstart", "collection":"topicData", "topics":"quickstart.sampleData", "change.data.capture.handler": "com.mongodb.kafka.connect.sink.cdc.mongodb.ChangeStreamHandler" } } ' \ http://connect:8083/connectors -w "\n"
Para confirmar que ha agregado tanto el conector de origen como el de destino, ejecute el siguiente comando:
curl -X GET http://connect:8083/connectors
El comando anterior debe mostrar los nombres de los conectores en ejecución:
["mongo-source", "mongo-sink"]
Para obtener más información sobre las propiedades del Connector de destino, consulta la página sobre Propiedades de configuración del Connector de destino.
Para obtener más información sobre los eventos de captura de datos de cambios, consulta la guía Manejadores de captura de datos de cambios.
Enviar el contenido de un documento a través de sus conectores
Para enviar el contenido de un documento a través de tus conectores, inserta un documento en la colección de MongoDB desde la que tu conector de origen lee datos.
Para insertar un nuevo documento en su colección, ingrese al shell de MongoDB desde el shell en su contenedor Docker usando el siguiente comando:
mongosh mongodb://mongo1:27017/?replicaSet=rs0
Después de ejecutar el comando anterior, deberías ver el siguiente prompt:
rs0 [primary] test>
Desde la shell de MongoDB, inserte un documento en la colección sampleData de la base de datos quickstart utilizando los siguientes comandos:
use quickstart db.sampleData.insertOne({"hello":"world"})
Después de insertar un documento en la colección sampleData, confirme que sus conectores procesaron el cambio. Compruebe el contenido de la colección topicData con el siguiente comando:
db.topicData.find()
Debes ver un resultado que se asemeje al siguiente:
[ { _id: ObjectId(...), hello: 'world', travel: 'MongoDB Kafka Connector' } ]
Salga del shell de MongoDB con el siguiente comando:
exit
Remover la Sandbox
Para conservar los recursos en su entorno de desarrollo, remueva el entorno de pruebas.
Antes de eliminar el entorno limitado, salga de la sesión de shell en su contenedor Docker ejecutando el siguiente comando:
exit
Puedes optar por remover tanto los contenedores de Docker como las imágenes, o únicamente los contenedores. Si remueves los contenedores e imágenes, tendrás que volver a descargarlos para reiniciar tu sandbox, que tiene un tamaño aproximado de 2,4 GB. Si remueves exclusivamente los contenedores, puedes reutilizar las imágenes y evitar descargar la mayoría de los archivos grandes en el pipeline de datos de muestra.
Selecciona la pestaña que corresponde a la tarea de eliminación que deseas ejecutar.
Ejecuta el siguiente comando de shell para eliminar los contenedores e imágenes de Docker del sandbox:
docker-compose -p mongo-kafka down --rmi all
Ejecute el siguiente comando de shell para eliminar los contenedores Docker pero conservar las imágenes para el entorno aislado:
docker-compose -p mongo-kafka down
Próximos pasos
Para aprender cómo instalar el MongoDB Kafka Connector, consulta la guía Instalar el MongoDB Kafka Connector.
Para obtener más información sobre cómo procesar y mover datos de Apache Kafka a MongoDB, consulte la guía del conector Sink.
Para obtener más información sobre cómo procesar y transferir datos de MongoDB a Apache Kafka, consulta la guía Conector de origen.