Join us at MongoDB.local London on 7 May to unlock new possibilities for your data. Use WEB50 to save 50%.
Register now >
Docs Menu
Docs Home
/ /

Guía de inicio rápido del conector Kafka

Esta guía le muestra cómo configurar el conector MongoDB Kafka para enviar datos entre MongoDB y Apache Kafka.

Después de completar esta guía, debería comprender cómo usar la API REST de Kafka Connect para configurar los conectores Kafka de MongoDB para leer datos de MongoDB y escribirlos en un tema de Kafka, y para leer datos de un tema de Kafka y escribirlos en MongoDB.

Para completar los pasos de esta guía, debes descargar y trabajar en un sandbox, un entorno de desarrollo de contenedores que incluye los servicios necesarios para compilar una muestra de pipeline de datos.

Lea las siguientes secciones para configurar su entorno aislado y su canalización de datos de muestra.

Nota

Después de completar esta guía, puedes remover el entorno siguiendo las instrucciones en la Eliminar la sección Sandbox.

Descarga e instala los siguientes paquetes:

Tip

Lea la documentación de Docker

Esta guía utiliza la terminología específica de Docker que se describe a continuación:

Obtenga más información sobre Docker en la Guía de introducción oficial de Docker.

El entorno sandbox utiliza Docker por conveniencia y coherencia. Para más información sobre las opciones de implementación de Apache Kafka, consulte los siguientes recursos:

Creamos un entorno sandbox que incluye los servicios que necesita en este tutorial para construir su canalización de datos de muestra.

Para descargar el sandbox, clona el repositorio del tutorial a tu entorno de desarrollo. Luego navega al directorio que corresponde al tutorial de introducción rápida. Si utilizas bash o un shell similar, usa los siguientes comandos:

git clone https://github.com/mongodb-university/kafka-edu.git
cd kafka-edu/docs-examples/mongodb-kafka-base/

El sandbox inicia los siguientes servicios en contenedores Docker:

  • MongoDB, configurado como un set de réplicas

  • Apache Kafka

  • Kafka Connect con el Kafka Connector de MongoDB instalado

  • Apache Zookeeper que administra la configuración de Apache Kafka

Para iniciar el sandbox, ejecute el siguiente comando desde el directorio del tutorial:

docker compose -p mongo-kafka up -d --force-recreate

Cuando inicies el sandbox, Docker descargará todas las imágenes que necesite para ejecutarse.

Nota

¿Cuanto tiempo tarda la descarga?

En total, las imágenes de Docker para este tutorial requieren aproximadamente 2,4 GB de espacio. La siguiente lista muestra el tiempo de descarga de las imágenes con diferentes velocidades de internet:

  • 40 megabits por segundo: 8 minutos

  • 20 megabits por segundo: 16 minutos

  • 10 megabits por segundo: 32 minutos

Después de que Docker descargue y compile las imágenes, debería ver la siguiente salida en su entorno de desarrollo:

...
Creating zookeeper ... done
Creating broker ... done
Creating schema-registry ... done
Creating connect ... done
Creating rest-proxy ... done
Creating mongo1 ... done
Creating mongo1-setup ... done

Nota

Asignaciones de puertos

El sandbox asigna los siguientes servicios a puertos en tu host:

  • El servidor MongoDB sandbox se asigna al puerto 35001 en tu máquina host

  • El servidor sandbox Kafka Connect JMX está asignado al puerto 35000 de tu máquina host

Estos puertos deben estar libres para iniciar el sandbox.

Para completar la canalización de datos de ejemplo, debe agregar conectores a Kafka Connect para transferir datos entre Kafka Connect y MongoDB. Agregue un conector de origen para transferir datos de MongoDB a Apache Kafka. Agregue un conector de destino para transferir datos de Apache Kafka a MongoDB.

Para agregar conectores en el entorno sandbox, primero inicia un shell bash interactivo en tu contenedor Docker usando el siguiente comando:

docker exec -it mongo1 /bin/bash

Después de que se inicie su sesión de shell, debería ver el siguiente mensaje:

MongoDB Kafka Connector Sandbox $

Usa la shell en tu contenedor Docker para agregar un conector de origen usando la API REST de Kafka Connect.

La siguiente solicitud API añade un conector de origen configurado con las siguientes propiedades:

  • La clase que Kafka Connect utiliza para crear el conector

  • El URI de conexión, la base de datos y la colección del set de réplicas de MongoDB desde las que el conector lee datos

  • Una pipeline de agregación que agrega un campo travel con el valor "MongoDB Kafka Connector" a los documentos insertados que el conector lee desde MongoDB

curl -X POST \
-H "Content-Type: application/json" \
--data '
{"name": "mongo-source",
"config": {
"connector.class":"com.mongodb.kafka.connect.MongoSourceConnector",
"connection.uri":"mongodb://mongo1:27017/?replicaSet=rs0",
"database":"quickstart",
"collection":"sampleData",
"pipeline":"[{\"$match\": {\"operationType\": \"insert\"}}, {$addFields : {\"fullDocument.travel\":\"MongoDB Kafka Connector\"}}]"
}
}
' \
http://connect:8083/connectors -w "\n"

Nota

¿Por qué veo el mensaje 'No se pudo conectar'?

Se tarda hasta tres minutos en iniciar la REST API de Kafka Connect. Si recibes el siguiente error, espera tres minutos y ejecuta el comando anterior nuevamente:

...
curl: (7) Failed to connect to connect port 8083: Connection refused

Para confirmar que agregaste el conector de origen, ejecuta el siguiente comando:

curl -X GET http://connect:8083/connectors

El comando anterior debe mostrar los nombres de los conectores en ejecución:

["mongo-source"]

Para aprender más sobre las propiedades del conector de origen, consulte la página sobre Propiedades de configuración del Connector fuente.

Para aprender más sobre pipelines de agregación, ver la página del manual de MongoDB sobre Pipelines de agregación.

Utiliza la shell en tu contenedor Docker para agregar un conector de sumidero usando la REST API de Kafka Connect.

La siguiente solicitud de API agrega un conector de receptor configurado con las siguientes propiedades:

  • La clase que Kafka Connect utiliza para crear el conector

  • La URI de conexión, la base de datos y la colección del conjunto de réplicas de MongoDB en el que el conector escribe datos

  • El tema de Apache Kafka del que el conector lee datos

  • Un controlador de captura de datos modificados para documentos de eventos de cambio de MongoDB

curl -X POST \
-H "Content-Type: application/json" \
--data '
{"name": "mongo-sink",
"config": {
"connector.class":"com.mongodb.kafka.connect.MongoSinkConnector",
"connection.uri":"mongodb://mongo1:27017/?replicaSet=rs0",
"database":"quickstart",
"collection":"topicData",
"topics":"quickstart.sampleData",
"change.data.capture.handler": "com.mongodb.kafka.connect.sink.cdc.mongodb.ChangeStreamHandler"
}
}
' \
http://connect:8083/connectors -w "\n"

Para confirmar que ha agregado tanto el conector de origen como el de destino, ejecute el siguiente comando:

curl -X GET http://connect:8083/connectors

El comando anterior debe mostrar los nombres de los conectores en ejecución:

["mongo-source", "mongo-sink"]

Para obtener más información sobre las propiedades del Connector de destino, consulta la página sobre Propiedades de configuración del Connector de destino.

Para obtener más información sobre los eventos de captura de datos de cambios, consulta la guía Manejadores de captura de datos de cambios.

Para enviar el contenido de un documento a través de tus conectores, inserta un documento en la colección de MongoDB desde la que tu conector de origen lee datos.

Para insertar un nuevo documento en su colección, ingrese al shell de MongoDB desde el shell en su contenedor Docker usando el siguiente comando:

mongosh mongodb://mongo1:27017/?replicaSet=rs0

Después de ejecutar el comando anterior, deberías ver el siguiente prompt:

rs0 [primary] test>

Desde la shell de MongoDB, inserte un documento en la colección sampleData de la base de datos quickstart utilizando los siguientes comandos:

use quickstart
db.sampleData.insertOne({"hello":"world"})

Después de insertar un documento en la colección sampleData, confirme que sus conectores procesaron el cambio. Compruebe el contenido de la colección topicData con el siguiente comando:

db.topicData.find()

Debes ver un resultado que se asemeje al siguiente:

[
{
_id: ObjectId(...),
hello: 'world',
travel: 'MongoDB Kafka Connector'
}
]

Salga del shell de MongoDB con el siguiente comando:

exit

Para conservar los recursos en su entorno de desarrollo, remueva el entorno de pruebas.

Antes de eliminar el entorno limitado, salga de la sesión de shell en su contenedor Docker ejecutando el siguiente comando:

exit

Puedes optar por remover tanto los contenedores de Docker como las imágenes, o únicamente los contenedores. Si remueves los contenedores e imágenes, tendrás que volver a descargarlos para reiniciar tu sandbox, que tiene un tamaño aproximado de 2,4 GB. Si remueves exclusivamente los contenedores, puedes reutilizar las imágenes y evitar descargar la mayoría de los archivos grandes en el pipeline de datos de muestra.

Selecciona la pestaña que corresponde a la tarea de eliminación que deseas ejecutar.

Ejecuta el siguiente comando de shell para eliminar los contenedores e imágenes de Docker del sandbox:

docker-compose -p mongo-kafka down --rmi all

Ejecute el siguiente comando de shell para eliminar los contenedores Docker pero conservar las imágenes para el entorno aislado:

docker-compose -p mongo-kafka down

Para aprender cómo instalar el MongoDB Kafka Connector, consulta la guía Instalar el MongoDB Kafka Connector.

Para obtener más información sobre cómo procesar y mover datos de Apache Kafka a MongoDB, consulte la guía del conector Sink.

Para obtener más información sobre cómo procesar y transferir datos de MongoDB a Apache Kafka, consulta la guía Conector de origen.

Volver

Novedades

En esta página