Nota
Atlas Stream Processing proporciona herramientas nativas de MongoDB para procesar continuamente datos de transmisión, validar esquemas y materializar vistas en colecciones de bases de datos Atlas o temas de Apache Kafka.
Para obtener más información sobre Atlas Stream Processing, consulte Atlas Stream Processing.
Overview
Esta guía le muestra cómo configurar el conector MongoDB Kafka para enviar datos entre MongoDB y Apache Kafka.
Después de completar esta guía, debería comprender cómo usar la API REST de Kafka Connect para configurar los conectores Kafka de MongoDB para leer datos de MongoDB y escribirlos en un tema de Kafka, y para leer datos de un tema de Kafka y escribirlos en MongoDB.
Para completar los pasos de esta guía, debe descargar y trabajar en un entorno sandbox, un entorno de desarrollo en contenedores que incluye los servicios necesarios para crear una canalización de datos de muestra.
Lea las siguientes secciones para configurar su entorno aislado y su canalización de datos de muestra.
Nota
Después de completar esta guía, puedes remover el entorno siguiendo las instrucciones en la sección Remover el entorno de pruebas.
Instalar los paquetes necesarios
Descargue e instale los siguientes paquetes:
Tip
Lea la documentación de Docker
Esta guía utiliza la siguiente terminología específica de Docker:
Obtenga más información sobre Docker en la Guía de introducción oficial de Docker.
El entorno de pruebas utiliza Docker para mayor comodidad y consistencia. Para obtener más información sobre las opciones de implementación de Apache Kafka, consulte los siguientes recursos:
Descargar el Sandbox
Creamos un entorno sandbox que incluye los servicios que necesita en este tutorial para construir su canalización de datos de muestra.
Para descargar el entorno de pruebas, clone el repositorio del tutorial en su entorno de desarrollo. Luego, navegue al directorio correspondiente al tutorial de inicio rápido. Si usa bash o un shell similar, use los siguientes comandos:
git clone https://github.com/mongodb-university/kafka-edu.git cd kafka-edu/docs-examples/mongodb-kafka-base/
Iniciar el Sandbox
El sandbox inicia los siguientes servicios en contenedores Docker:
MongoDB, configurado como un conjunto de réplicas
Apache Kafka
Kafka Connect con el conector Kafka de MongoDB instalado
Apache Zookeeper que administra la configuración de Apache Kafka
Para iniciar el sandbox, ejecute el siguiente comando desde el directorio del tutorial:
docker compose -p mongo-kafka up -d --force-recreate
Cuando inicia el sandbox, Docker descarga todas las imágenes que necesita para ejecutarse.
Nota
¿Cuanto tiempo tarda la descarga?
En total, las imágenes de Docker para este tutorial requieren aproximadamente 2.4 GB de espacio. La siguiente lista muestra el tiempo de descarga de las imágenes con diferentes velocidades de internet:
40 megabits por segundo: 8 minutos
20 megabits por segundo: 16 minutos
10 megabits por segundo: 32 minutos
Después de que Docker descargue y compile las imágenes, debería ver la siguiente salida en su entorno de desarrollo:
... Creating zookeeper ... done Creating broker ... done Creating schema-registry ... done Creating connect ... done Creating rest-proxy ... done Creating mongo1 ... done Creating mongo1-setup ... done
Nota
Asignaciones de puertos
El entorno sandbox asigna los siguientes servicios a los puertos de su máquina host:
El servidor sandbox MongoDB se asigna al puerto
35001en su máquina hostEl servidor sandbox Kafka Connect JMX está asignado al puerto
35000de tu máquina host
Estos puertos deben estar libres para iniciar el sandbox.
Agregar conectores
Para completar la canalización de datos de ejemplo, debe agregar conectores a Kafka Connect para transferir datos entre Kafka Connect y MongoDB. Agregue un conector de origen para transferir datos de MongoDB a Apache Kafka. Agregue un conector de destino para transferir datos de Apache Kafka a MongoDB.
Para agregar conectores en el entorno sandbox, primero inicie un shell bash interactivo en su contenedor Docker usando el siguiente comando:
docker exec -it mongo1 /bin/bash
Después de que se inicie su sesión de shell, debería ver el siguiente mensaje:
MongoDB Kafka Connector Sandbox $
Agregar un conector de origen
Usa la shell en tu contenedor Docker para agregar un conector de origen usando la API REST de Kafka Connect.
La siguiente solicitud de API agrega un conector de origen configurado con las siguientes propiedades:
La clase que Kafka Connect utiliza para crear el conector
La URI de conexión, la base de datos y la colección del conjunto de réplicas de MongoDB desde el que el conector lee los datos
Una canalización de agregación que agrega un campo
travelcon el valor"MongoDB Kafka Connector"a los documentos insertados que el conector lee desde MongoDB
curl -X POST \ -H "Content-Type: application/json" \ --data ' {"name": "mongo-source", "config": { "connector.class":"com.mongodb.kafka.connect.MongoSourceConnector", "connection.uri":"mongodb://mongo1:27017/?replicaSet=rs0", "database":"quickstart", "collection":"sampleData", "pipeline":"[{\"$match\": {\"operationType\": \"insert\"}}, {$addFields : {\"fullDocument.travel\":\"MongoDB Kafka Connector\"}}]" } } ' \ http://connect:8083/connectors -w "\n"
Nota
¿Por qué veo el mensaje 'Error al conectar'?
La API REST de Kafka Connect tarda hasta tres minutos en iniciarse. Si recibe el siguiente error, espere tres minutos y vuelva a ejecutar el comando anterior:
... curl: (7) Failed to connect to connect port 8083: Connection refused
Para confirmar que agregó el conector de origen, ejecute el siguiente comando:
curl -X GET http://connect:8083/connectors
El comando anterior debería mostrar los nombres de los conectores en ejecución:
["mongo-source"]
Para obtener más información sobre las propiedades del conector de origen, consulte la página en Propiedades de configuración del conector de origen.
Para obtener más información sobre las canalizaciones de agregación, consulte la página del manual de MongoDB sobre canalizaciones de agregación.
Agregar un conector de lavabo
Utilice el shell en su contenedor Docker para agregar un conector de receptor mediante la API REST de Kafka Connect.
La siguiente solicitud de API agrega un conector de receptor configurado con las siguientes propiedades:
La clase que Kafka Connect utiliza para crear el conector
La URI de conexión, la base de datos y la colección del conjunto de réplicas de MongoDB en el que el conector escribe datos
El tema de Apache Kafka desde el que el conector lee los datos
Un controlador de captura de datos modificados para documentos de eventos de cambio de MongoDB
curl -X POST \ -H "Content-Type: application/json" \ --data ' {"name": "mongo-sink", "config": { "connector.class":"com.mongodb.kafka.connect.MongoSinkConnector", "connection.uri":"mongodb://mongo1:27017/?replicaSet=rs0", "database":"quickstart", "collection":"topicData", "topics":"quickstart.sampleData", "change.data.capture.handler": "com.mongodb.kafka.connect.sink.cdc.mongodb.ChangeStreamHandler" } } ' \ http://connect:8083/connectors -w "\n"
Para confirmar que agregó tanto el conector de origen como el de receptor, ejecute el siguiente comando:
curl -X GET http://connect:8083/connectors
El comando anterior debería mostrar los nombres de los conectores en ejecución:
["mongo-source", "mongo-sink"]
Para obtener más información sobre las propiedades del conector de sumidero, consulte la página Propiedades de configuración del conector de sumidero.
Para obtener más información sobre los eventos de captura de datos de cambio, consulte la guía Controladores de captura de datos de cambio.
Enviar el contenido de un documento a través de sus conectores
Para enviar el contenido de un documento a través de sus conectores, inserte un documento en la colección MongoDB desde la cual su conector de origen lee los datos.
Para insertar un nuevo documento en su colección, ingrese al shell de MongoDB desde el shell en su contenedor Docker usando el siguiente comando:
mongosh mongodb://mongo1:27017/?replicaSet=rs0
Después de ejecutar el comando anterior, debería ver el siguiente mensaje:
rs0 [primary] test>
Desde el shell de MongoDB, inserte un documento en la colección sampleData de la base de datos quickstart utilizando los siguientes comandos:
use quickstart db.sampleData.insertOne({"hello":"world"})
Después de insertar un documento en la colección sampleData, confirme que sus conectores procesaron el cambio. Compruebe el contenido de la colección topicData con el siguiente comando:
db.topicData.find()
Debería ver un resultado similar al siguiente:
[ { _id: ObjectId(...), hello: 'world', travel: 'MongoDB Kafka Connector' } ]
Salga del shell de MongoDB con el siguiente comando:
exit
Quitar la zona de pruebas
Para conservar recursos en su entorno de desarrollo, elimine la zona protegida.
Antes de eliminar el entorno limitado, salga de la sesión de shell en su contenedor Docker ejecutando el siguiente comando:
exit
Puede optar por eliminar tanto los contenedores como las imágenes de Docker, o solo los contenedores. Si elimina los contenedores y las imágenes, deberá descargarlos de nuevo para reiniciar su entorno de pruebas, que tiene un tamaño aproximado de 2.4 GB. Si elimina solo los contenedores, podrá reutilizar las imágenes y evitar la descarga de la mayoría de los archivos grandes en la canalización de datos de muestra.
Seleccione la pestaña que corresponde a la tarea de eliminación que desea ejecutar.
Ejecute el siguiente comando de shell para eliminar los contenedores y las imágenes de Docker del entorno aislado:
docker-compose -p mongo-kafka down --rmi all
Ejecute el siguiente comando de shell para eliminar los contenedores Docker pero conservar las imágenes para el entorno aislado:
docker-compose -p mongo-kafka down
Próximos pasos
Para aprender cómo instalar el conector MongoDB Kafka, consulte la guía Instalar el conector MongoDB Kafka.
Para obtener más información sobre cómo procesar y mover datos de Apache Kafka a MongoDB, consulte la guía del conector Sink.
Para obtener más información sobre cómo procesar y transferir datos de MongoDB a Apache Kafka, consulta la guía Conector de origen.