Join us at MongoDB.local London on 7 May to unlock new possibilities for your data. Use WEB50 to save 50%.
Register now >
Docs Menu
Docs Home
/ /

Guía rápida para Kafka Connector

Esta guía muestra cómo configurar el Kafka Connector de MongoDB para enviar datos entre MongoDB y Apache Kafka.

Después de completar esta guía, deberías entender cómo usar la REST API de Kafka Connect para configurar conectores de Kafka de MongoDB para leer datos de MongoDB y escribirlos en un tema de Kafka, así como para leer datos de un tema de Kafka y escribirlos en MongoDB.

Para completar los pasos de esta guía, debes descargar y trabajar en un sandbox, un entorno de desarrollo de contenedores que incluye los servicios necesarios para compilar una muestra de pipeline de datos.

Lee las siguientes secciones para configurar tu área de pruebas y la pipeline de datos de muestra.

Nota

Después de completar esta guía, puedes remover el entorno siguiendo las instrucciones en la Remover el Sandbox sección.

Descarga e instala los siguientes paquetes:

Tip

Lee la documentación de Docker

Esta guía utiliza la terminología específica de Docker que se describe a continuación:

Aprende más sobre Docker en la Guía de Inicio Oficial de Docker.

El entorno sandbox utiliza Docker por conveniencia y coherencia. Para más información sobre las opciones de implementación de Apache Kafka, consulte los siguientes recursos:

Creamos un entorno aislado que incluye los servicios que necesitas en este tutorial para construir tu pipeline de datos de muestra.

Para descargar el sandbox, clona el repositorio del tutorial a tu entorno de desarrollo. Luego navega al directorio que corresponde al tutorial de introducción rápida. Si utilizas bash o un shell similar, usa los siguientes comandos:

git clone https://github.com/mongodb-university/kafka-edu.git
cd kafka-edu/docs-examples/mongodb-kafka-base/

El sandbox inicia los siguientes servicios en contenedores Docker:

  • MongoDB, configurado como un set de réplicas

  • Apache Kafka

  • Kafka Connect con el Kafka Connector de MongoDB instalado

  • Apache Zookeeper, que gestiona la configuración de Apache Kafka

Para iniciar el entorno de pruebas, ejecuta el siguiente comando desde el directorio del tutorial:

docker compose -p mongo-kafka up -d --force-recreate

Cuando inicies el sandbox, Docker descargará todas las imágenes que necesite para ejecutarse.

Nota

¿Cuánto tiempo tarda la descarga?

En total, las imágenes de Docker para este tutorial requieren alrededor de 2,4 GB de espacio. La siguiente lista muestra cuánto tiempo se tarda en descargar las imágenes con diferentes velocidades de Internet:

  • 40 megabits por segundo: 8 minutos

  • 20 megabits por segundo: 16 minutos

  • 10 megabits por segundo: 32 minutos

Después de que Docker descargue y compile las imágenes, debería ver la siguiente salida en su entorno de desarrollo:

...
Creating zookeeper ... done
Creating broker ... done
Creating schema-registry ... done
Creating connect ... done
Creating rest-proxy ... done
Creating mongo1 ... done
Creating mongo1-setup ... done

Nota

Mapeos de puertos

El sandbox asigna los siguientes servicios a puertos en tu host:

  • El servidor MongoDB sandbox se asigna al puerto 35001 en tu máquina host

  • El servidor sandbox Kafka Connect JMX está asignado al puerto 35000 de tu máquina host

Estos puertos deben estar libres para iniciar el sandbox.

Para completar el pipeline de datos de la muestra, debe agregar conectores a Kafka Connect para transferir datos entre Kafka Connect y MongoDB. Agregue un conector fuente para transferir datos desde MongoDB a Apache Kafka. Agrega un conector de sink para transferir datos de Apache Kafka a MongoDB.

Para agregar conectores en el entorno sandbox, primero inicia un shell bash interactivo en tu contenedor Docker usando el siguiente comando:

docker exec -it mongo1 /bin/bash

Después de que se inicie su sesión de shell, debería ver el siguiente mensaje:

MongoDB Kafka Connector Sandbox $

Usa la shell en tu contenedor Docker para agregar un conector de origen usando la API REST de Kafka Connect.

La siguiente solicitud API añade un conector de origen configurado con las siguientes propiedades:

  • La clase que Kafka Connect utiliza para crear el conector

  • El URI de conexión, la base de datos y la colección del set de réplicas de MongoDB desde las que el conector lee datos

  • Una pipeline de agregación que agrega un campo travel con el valor "MongoDB Kafka Connector" a los documentos insertados que el conector lee desde MongoDB

curl -X POST \
-H "Content-Type: application/json" \
--data '
{"name": "mongo-source",
"config": {
"connector.class":"com.mongodb.kafka.connect.MongoSourceConnector",
"connection.uri":"mongodb://mongo1:27017/?replicaSet=rs0",
"database":"quickstart",
"collection":"sampleData",
"pipeline":"[{\"$match\": {\"operationType\": \"insert\"}}, {$addFields : {\"fullDocument.travel\":\"MongoDB Kafka Connector\"}}]"
}
}
' \
http://connect:8083/connectors -w "\n"

Nota

¿Por qué veo el mensaje 'No se pudo conectar'?

Se tarda hasta tres minutos en iniciar la REST API de Kafka Connect. Si recibes el siguiente error, espera tres minutos y ejecuta el comando anterior nuevamente:

...
curl: (7) Failed to connect to connect port 8083: Connection refused

Para confirmar que agregaste el conector de origen, ejecuta el siguiente comando:

curl -X GET http://connect:8083/connectors

El comando anterior debe mostrar los nombres de los conectores en ejecución:

["mongo-source"]

Para aprender más sobre las propiedades del conector de origen, consulte la página sobre Propiedades de configuración del Connector fuente.

Para aprender más sobre pipelines de agregación, ver la página del manual de MongoDB sobre Pipelines de agregación.

Utiliza la shell en tu contenedor Docker para agregar un conector de sumidero usando la REST API de Kafka Connect.

La siguiente solicitud de API agrega un conector de sink configurado con las siguientes propiedades:

  • La clase que Kafka Connect utiliza para crear el conector

  • El URI de conexión, la base de datos y la colección del set de réplicas de MongoDB en el que el conector escribe los datos

  • El tema de Apache Kafka del que el conector lee datos

  • Un controlador de captura de cambios de datos para documentos de eventos de cambios en MongoDB

curl -X POST \
-H "Content-Type: application/json" \
--data '
{"name": "mongo-sink",
"config": {
"connector.class":"com.mongodb.kafka.connect.MongoSinkConnector",
"connection.uri":"mongodb://mongo1:27017/?replicaSet=rs0",
"database":"quickstart",
"collection":"topicData",
"topics":"quickstart.sampleData",
"change.data.capture.handler": "com.mongodb.kafka.connect.sink.cdc.mongodb.ChangeStreamHandler"
}
}
' \
http://connect:8083/connectors -w "\n"

Para confirmar que ha agregado tanto el conector de origen como el de destino, ejecute el siguiente comando:

curl -X GET http://connect:8083/connectors

El comando anterior debe mostrar los nombres de los conectores en ejecución:

["mongo-source", "mongo-sink"]

Para obtener más información sobre las propiedades del Connector de destino, consulta la página sobre Propiedades de configuración del Connector de destino.

Para obtener más información sobre los eventos de captura de datos de cambios, consulta la guía Manejadores de captura de datos de cambios.

Para enviar el contenido de un documento a través de tus conectores, inserta un documento en la colección de MongoDB desde la que tu conector de origen lee datos.

Para insertar un nuevo documento en tu colección, puedes ingresar al shell de MongoDB desde el shell de tu contenedor Docker usando el siguiente comando:

mongosh mongodb://mongo1:27017/?replicaSet=rs0

Después de ejecutar el comando anterior, deberías ver el siguiente prompt:

rs0 [primary] test>

Desde la shell de MongoDB, inserte un documento en la colección sampleData de la base de datos quickstart utilizando los siguientes comandos:

use quickstart
db.sampleData.insertOne({"hello":"world"})

Después de insertar un documento en la colección sampleData, confirme que sus conectores hayan procesado el cambio. Verifique el contenido de la colección topicData usando el siguiente comando:

db.topicData.find()

Debes ver un resultado que se asemeje al siguiente:

[
{
_id: ObjectId(...),
hello: 'world',
travel: 'MongoDB Kafka Connector'
}
]

Sal de la shell de MongoDB con el siguiente comando:

exit

Para conservar los recursos en su entorno de desarrollo, remueva el entorno de pruebas.

Antes de remover el sandbox, sal de la sesión de shell en tu contenedor Docker ejecutando el siguiente comando:

exit

Puedes optar por remover tanto los contenedores de Docker como las imágenes, o únicamente los contenedores. Si remueves los contenedores e imágenes, tendrás que volver a descargarlos para reiniciar tu sandbox, que tiene un tamaño aproximado de 2,4 GB. Si remueves exclusivamente los contenedores, puedes reutilizar las imágenes y evitar descargar la mayoría de los archivos grandes en el pipeline de datos de muestra.

Selecciona la pestaña que corresponde a la tarea de eliminación que deseas ejecutar.

Ejecuta el siguiente comando de shell para eliminar los contenedores e imágenes de Docker del sandbox:

docker-compose -p mongo-kafka down --rmi all

Ejecuta el siguiente comando de shell para remover los contenedores de Docker, pero mantén las imágenes del entorno sandbox:

docker-compose -p mongo-kafka down

Para aprender cómo instalar el MongoDB Kafka Connector, consulta la guía Instalar el MongoDB Kafka Connector.

Para obtener más información sobre cómo procesar y mover datos de Apache Kafka a MongoDB, consulta la guía del Sink Connector.

Para obtener más información sobre cómo procesar y transferir datos de MongoDB a Apache Kafka, consulta la guía Conector de origen.

Volver

Novedades

En esta página