Sigue este tutorial para aprender a configurar un conector de sink de MongoDB Kafka para leer datos de un tema de Apache Kafka y guardarlos en una colección de MongoDB.
Comienza con el Conector Sink MongoDB Kafka
Completa la configuración del tutorial
Completa los pasos en el Tutorial de configuración del conector Kafka para iniciar el entorno de Confluent Kafka Connect y MongoDB.
Configura el Sink Connector
Crea una sesión de shell interactiva en el tutorial del Contenedor Docker utilizando el siguiente comando:
docker exec -it mongo1 /bin/bash
Crea un archivo de configuración de origen denominado simplesink.json con el siguiente comando:
nano simplesink.json
Copie la siguiente información de configuración en el archivo y guarde los cambios:
{ "name": "mongo-tutorial-sink", "config": { "connector.class": "com.mongodb.kafka.connect.MongoSinkConnector", "topics": "Tutorial2.pets", "connection.uri": "mongodb://mongo1", "key.converter": "org.apache.kafka.connect.storage.StringConverter", "value.converter": "org.apache.kafka.connect.json.JsonConverter", "value.converter.schemas.enable": false, "database": "Tutorial2", "collection": "pets" } }
Nota
Las líneas resaltadas en las propiedades de configuración especifican convertidores que indican al conector cómo traducir los datos de Kafka.
Ejecute el siguiente comando en la shell para iniciar el sink connector utilizando el archivo de configuración que creó:
cx simplesink.json
Nota
El comando cx es un script personalizado incluido en el entorno de desarrollo del tutorial. Este script ejecuta la siguiente solicitud equivalente a la API REST de Kafka Connect para crear un nuevo conector:
curl -X POST -H "Content-Type: application/json" -d @simplesink.json http://connect:8083/connectors -w "\n"
Ejecute el siguiente comando en el shell para verificar el estado de los conectores:
status
Si tu conector sink se inició correctamente, deberías ver la siguiente salida:
Kafka topics: ... The status of the connectors: sink | mongo-tutorial-sink | RUNNING | RUNNING | com.mongodb.kafka.connect.MongoSinkConnector Currently configured connectors [ "mongo-tutorial-sink" ] ...
Guardar datos en un tema de Kafka
En el mismo shell, crea un script de Python para escribir datos en un tema de Kafka.
nano kafkawrite.py
Pega el siguiente código en el archivo y guarda los cambios:
from kafka import KafkaProducer import json from json import dumps p = KafkaProducer(bootstrap_servers = ['broker:29092'], value_serializer = lambda x:dumps(x).encode('utf-8')) data = {'name': 'roscoe'} p.send('Tutorial2.pets', value = data) p.flush()
Ejecute el script de Python:
python3 kafkawrite.py
Ver los datos en la colección de MongoDB
En el mismo shell, conecta a MongoDB usando mongosh, el shell de MongoDB, ejecutando el siguiente comando:
mongosh "mongodb://mongo1"
Luego de conectarse correctamente, deberías ver el siguiente mensaje en la shell de MongoDB:
rs0 [direct: primary] test>
En el mensaje, escriba los siguientes comandos para recuperar todos los documentos en el espacio de nombres Tutorial2.pets de MongoDB:
use Tutorial2 db.pets.find()
Debería ver que el siguiente documento se devuelve como resultado:
{ _id: ObjectId("62659..."), name: 'roscoe' }
Salga de la shell de MongoDB ingresando el comando exit.
(Opcional) Detener los contenedores Docker
Después de completar este tutorial, libera recursos en tu computadora deteniendo o removiendo activos de Docker. Puedes elegir remover tanto los contenedores de Docker como las imágenes, o exclusivamente los contenedores. Si remueve los contenedores y las imágenes, debe volver a descargarlos para reiniciar su entorno de desarrollo de MongoDB Kafka Connector, que tiene un tamaño aproximado de 2,4 GB. Si eliminas exclusivamente los contenedores, puedes reutilizar las imágenes y evitar descargar la mayoría de los archivos grandes en la pipeline de datos de muestra.
Tip
Más tutoriales
Si planea completar más tutoriales sobre el MongoDB Kafka Connector, considere remover solo los contenedores. Si no tienes previsto completar más tutoriales de MongoDB Kafka Connector, considera remover los contenedores e imágenes.
Selecciona la pestaña que corresponde a la tarea de eliminación que deseas ejecutar.
Ejecute el siguiente comando de shell para remover los contenedores y las imágenes de Docker del entorno de desarrollo:
docker-compose -p mongo-kafka down --rmi all
Ejecuta el siguiente comando Shell para remover los contenedores Docker, pero conserva las imágenes para el entorno de desarrollo:
docker-compose -p mongo-kafka down
Para reiniciar los contenedores, sigue los mismos pasos necesarios para ponerlos en marcha en la Configuración del tutorial.
Resumen
En este tutorial, configuraste un conector de destino para guardar datos de un tema de Kafka en una colección en un clúster de MongoDB.
Obtén más información
Lea los siguientes recursos para aprender más sobre los conceptos mencionados en este tutorial: