Docs Menu
Docs Home
/ /

Guía de inicio rápido del conector Kafka

Nota

Atlas Stream Processing proporciona herramientas nativas de MongoDB para procesar continuamente datos de transmisión, validar esquemas y materializar vistas en colecciones de bases de datos Atlas o temas de Apache Kafka.

Para obtener más información sobre Atlas Stream Processing, consulte Atlas Stream Processing.

Esta guía le muestra cómo configurar el conector MongoDB Kafka para enviar datos entre MongoDB y Apache Kafka.

Después de completar esta guía, debería comprender cómo usar la API REST de Kafka Connect para configurar los conectores Kafka de MongoDB para leer datos de MongoDB y escribirlos en un tema de Kafka, y para leer datos de un tema de Kafka y escribirlos en MongoDB.

Para completar los pasos de esta guía, debe descargar y trabajar en un entorno sandbox, un entorno de desarrollo en contenedores que incluye los servicios necesarios para crear una canalización de datos de muestra.

Lea las siguientes secciones para configurar su entorno aislado y su canalización de datos de muestra.

Nota

Después de completar esta guía, puedes remover el entorno siguiendo las instrucciones en la sección Remover el entorno de pruebas.

Descargue e instale los siguientes paquetes:

Tip

Lea la documentación de Docker

Esta guía utiliza la siguiente terminología específica de Docker:

Obtenga más información sobre Docker en la Guía de introducción oficial de Docker.

El entorno de pruebas utiliza Docker para mayor comodidad y consistencia. Para obtener más información sobre las opciones de implementación de Apache Kafka, consulte los siguientes recursos:

Creamos un entorno sandbox que incluye los servicios que necesita en este tutorial para construir su canalización de datos de muestra.

Para descargar el entorno de pruebas, clone el repositorio del tutorial en su entorno de desarrollo. Luego, navegue al directorio correspondiente al tutorial de inicio rápido. Si usa bash o un shell similar, use los siguientes comandos:

git clone https://github.com/mongodb-university/kafka-edu.git
cd kafka-edu/docs-examples/mongodb-kafka-base/

El sandbox inicia los siguientes servicios en contenedores Docker:

  • MongoDB, configurado como un conjunto de réplicas

  • Apache Kafka

  • Kafka Connect con el conector Kafka de MongoDB instalado

  • Apache Zookeeper que administra la configuración de Apache Kafka

Para iniciar el sandbox, ejecute el siguiente comando desde el directorio del tutorial:

docker compose -p mongo-kafka up -d --force-recreate

Cuando inicia el sandbox, Docker descarga todas las imágenes que necesita para ejecutarse.

Nota

¿Cuanto tiempo tarda la descarga?

En total, las imágenes de Docker para este tutorial requieren aproximadamente 2.4 GB de espacio. La siguiente lista muestra el tiempo de descarga de las imágenes con diferentes velocidades de internet:

  • 40 megabits por segundo: 8 minutos

  • 20 megabits por segundo: 16 minutos

  • 10 megabits por segundo: 32 minutos

Después de que Docker descargue y compile las imágenes, debería ver la siguiente salida en su entorno de desarrollo:

...
Creating zookeeper ... done
Creating broker ... done
Creating schema-registry ... done
Creating connect ... done
Creating rest-proxy ... done
Creating mongo1 ... done
Creating mongo1-setup ... done

Nota

Asignaciones de puertos

El entorno sandbox asigna los siguientes servicios a los puertos de su máquina host:

  • El servidor sandbox MongoDB se asigna al puerto 35001 en su máquina host

  • El servidor sandbox Kafka Connect JMX está asignado al puerto 35000 de tu máquina host

Estos puertos deben estar libres para iniciar el sandbox.

Para completar la canalización de datos de ejemplo, debe agregar conectores a Kafka Connect para transferir datos entre Kafka Connect y MongoDB. Agregue un conector de origen para transferir datos de MongoDB a Apache Kafka. Agregue un conector de destino para transferir datos de Apache Kafka a MongoDB.

Para agregar conectores en el entorno sandbox, primero inicie un shell bash interactivo en su contenedor Docker usando el siguiente comando:

docker exec -it mongo1 /bin/bash

Después de que se inicie su sesión de shell, debería ver el siguiente mensaje:

MongoDB Kafka Connector Sandbox $

Usa la shell en tu contenedor Docker para agregar un conector de origen usando la API REST de Kafka Connect.

La siguiente solicitud de API agrega un conector de origen configurado con las siguientes propiedades:

  • La clase que Kafka Connect utiliza para crear el conector

  • La URI de conexión, la base de datos y la colección del conjunto de réplicas de MongoDB desde el que el conector lee los datos

  • Una canalización de agregación que agrega un campo travel con el valor "MongoDB Kafka Connector" a los documentos insertados que el conector lee desde MongoDB

curl -X POST \
-H "Content-Type: application/json" \
--data '
{"name": "mongo-source",
"config": {
"connector.class":"com.mongodb.kafka.connect.MongoSourceConnector",
"connection.uri":"mongodb://mongo1:27017/?replicaSet=rs0",
"database":"quickstart",
"collection":"sampleData",
"pipeline":"[{\"$match\": {\"operationType\": \"insert\"}}, {$addFields : {\"fullDocument.travel\":\"MongoDB Kafka Connector\"}}]"
}
}
' \
http://connect:8083/connectors -w "\n"

Nota

¿Por qué veo el mensaje 'Error al conectar'?

La API REST de Kafka Connect tarda hasta tres minutos en iniciarse. Si recibe el siguiente error, espere tres minutos y vuelva a ejecutar el comando anterior:

...
curl: (7) Failed to connect to connect port 8083: Connection refused

Para confirmar que agregó el conector de origen, ejecute el siguiente comando:

curl -X GET http://connect:8083/connectors

El comando anterior debería mostrar los nombres de los conectores en ejecución:

["mongo-source"]

Para obtener más información sobre las propiedades del conector de origen, consulte la página en Propiedades de configuración del conector de origen.

Para obtener más información sobre las canalizaciones de agregación, consulte la página del manual de MongoDB sobre canalizaciones de agregación.

Utilice el shell en su contenedor Docker para agregar un conector de receptor mediante la API REST de Kafka Connect.

La siguiente solicitud de API agrega un conector de receptor configurado con las siguientes propiedades:

  • La clase que Kafka Connect utiliza para crear el conector

  • La URI de conexión, la base de datos y la colección del conjunto de réplicas de MongoDB en el que el conector escribe datos

  • El tema de Apache Kafka desde el que el conector lee los datos

  • Un controlador de captura de datos modificados para documentos de eventos de cambio de MongoDB

curl -X POST \
-H "Content-Type: application/json" \
--data '
{"name": "mongo-sink",
"config": {
"connector.class":"com.mongodb.kafka.connect.MongoSinkConnector",
"connection.uri":"mongodb://mongo1:27017/?replicaSet=rs0",
"database":"quickstart",
"collection":"topicData",
"topics":"quickstart.sampleData",
"change.data.capture.handler": "com.mongodb.kafka.connect.sink.cdc.mongodb.ChangeStreamHandler"
}
}
' \
http://connect:8083/connectors -w "\n"

Para confirmar que agregó tanto el conector de origen como el de receptor, ejecute el siguiente comando:

curl -X GET http://connect:8083/connectors

El comando anterior debería mostrar los nombres de los conectores en ejecución:

["mongo-source", "mongo-sink"]

Para obtener más información sobre las propiedades del conector de sumidero, consulte la página Propiedades de configuración del conector de sumidero.

Para obtener más información sobre los eventos de captura de datos de cambio, consulte la guía Controladores de captura de datos de cambio.

Para enviar el contenido de un documento a través de sus conectores, inserte un documento en la colección MongoDB desde la cual su conector de origen lee los datos.

Para insertar un nuevo documento en su colección, ingrese al shell de MongoDB desde el shell en su contenedor Docker usando el siguiente comando:

mongosh mongodb://mongo1:27017/?replicaSet=rs0

Después de ejecutar el comando anterior, debería ver el siguiente mensaje:

rs0 [primary] test>

Desde el shell de MongoDB, inserte un documento en la colección sampleData de la base de datos quickstart utilizando los siguientes comandos:

use quickstart
db.sampleData.insertOne({"hello":"world"})

Después de insertar un documento en la colección sampleData, confirme que sus conectores procesaron el cambio. Compruebe el contenido de la colección topicData con el siguiente comando:

db.topicData.find()

Debería ver un resultado similar al siguiente:

[
{
_id: ObjectId(...),
hello: 'world',
travel: 'MongoDB Kafka Connector'
}
]

Salga del shell de MongoDB con el siguiente comando:

exit

Para conservar recursos en su entorno de desarrollo, elimine la zona protegida.

Antes de eliminar el entorno limitado, salga de la sesión de shell en su contenedor Docker ejecutando el siguiente comando:

exit

Puede optar por eliminar tanto los contenedores como las imágenes de Docker, o solo los contenedores. Si elimina los contenedores y las imágenes, deberá descargarlos de nuevo para reiniciar su entorno de pruebas, que tiene un tamaño aproximado de 2.4 GB. Si elimina solo los contenedores, podrá reutilizar las imágenes y evitar la descarga de la mayoría de los archivos grandes en la canalización de datos de muestra.

Seleccione la pestaña que corresponde a la tarea de eliminación que desea ejecutar.

Ejecute el siguiente comando de shell para eliminar los contenedores y las imágenes de Docker del entorno aislado:

docker-compose -p mongo-kafka down --rmi all

Ejecute el siguiente comando de shell para eliminar los contenedores Docker pero conservar las imágenes para el entorno aislado:

docker-compose -p mongo-kafka down

Para aprender cómo instalar el conector MongoDB Kafka, consulte la guía Instalar el conector MongoDB Kafka.

Para obtener más información sobre cómo procesar y mover datos de Apache Kafka a MongoDB, consulte la guía del conector Sink.

Para obtener más información sobre cómo procesar y transferir datos de MongoDB a Apache Kafka, consulta la guía Conector de origen.

Volver

Novedades

En esta página