Join us at MongoDB.local London on 7 May to unlock new possibilities for your data. Use WEB50 to save 50%.
Register now >
Docs Menu
Docs Home
/ /

Introducción al conector de origen de MongoDB Kafka

Sigue este tutorial para aprender cómo configurar un conector fuente de MongoDB Kafka para leer datos de un flujo de cambios y publicarlos en un tema de Apache Kafka.

1

Complete los pasos en el Configurar el tutorial del conector Kafka para iniciar el entorno de Confluent Kafka Connect y MongoDB.

2

Crear una sesión de shell interactiva en el contenedor Docker descargado para el Tutorial configurar usando el siguiente comando:

docker exec -it mongo1 /bin/bash

Crea un archivo de configuración de origen denominado simplesource.json con el siguiente comando:

nano simplesource.json

Copie la siguiente información de configuración en el archivo y guarde los cambios:

{
"name": "mongo-simple-source",
"config": {
"connector.class": "com.mongodb.kafka.connect.MongoSourceConnector",
"connection.uri": "mongodb://mongo1",
"database": "Tutorial1",
"collection": "orders"
}
}

Ejecute el siguiente comando en el shell para iniciar el conector de origen utilizando el archivo de configuración que creó:

cx simplesource.json

Nota

El comando cx es un script personalizado incluido en el entorno de desarrollo del tutorial. Este script ejecuta la siguiente solicitud equivalente a la API REST de Kafka Connect para crear un nuevo conector:

curl -X POST -H "Content-Type: application/json" -d @simplesource.json http://connect:8083/connectors -w "\n"

Ejecute el siguiente comando en el shell para comprobar el estado de los conectores:

status

Si tu conector de origen se inició correctamente, deberías ver la siguiente salida:

Kafka topics:
...
The status of the connectors:
source | mongo-simple-source | RUNNING | RUNNING | com.mongodb.kafka.connect.MongoSourceConnector
Currently configured connectors
[
"mongo-simple-source"
]
...
3

En el mismo shell, conecta a MongoDB usando mongosh, el shell de MongoDB, ejecutando el siguiente comando:

mongosh "mongodb://mongo1"

Luego de conectarse correctamente, deberías ver el siguiente mensaje en la shell de MongoDB:

rs0 [direct: primary] test>

En el prompt, introduce los siguientes comandos para insertar un nuevo documento:

use Tutorial1
db.orders.insertOne( { 'order_id' : 1, 'item' : 'coffee' } )

Una vez que MongoDB completa el comando de inserción, deberías recibir una confirmación que se asemeje al siguiente texto:

{
acknowledged: true,
insertedId: ObjectId("627e7e...")
}

Salga de la shell de MongoDB ingresando el comando exit.

Verifica el estado de tu entorno de Kafka utilizando el siguiente comando:

status

En la salida del comando anterior, deberías ver el nuevo tema que el conector de origen creó después de recibir el evento de cambio:

...
"topic": "Tutorial1.orders",
...

Confirme el contenido de los datos del nuevo tema de Kafka ejecutando el siguiente comando:

kc Tutorial1.orders

Nota

El comando kc es un script asistente que muestra el contenido de un tema de Kafka.

Debería ver los siguientes datos del tema de Kafka, organizados por secciones "Clave" y "Valor" cuando ejecute el comando anterior:

En la sección "Valor" del resultado, puede encontrar la parte del payload que incluye los datos de fullDocument, como se destaca en el siguiente documento JSON formateado:

{
"_id": {
"_data": "8262655A..."
},
"operationType": "insert",
"clusterTime": {
"$timestamp": {
"t": 1650809557,
"i": 2
}
},
"wallTime": {
"$date": "2022-10-13T17:06:23.409Z"
},
"fullDocument": {
"_id": {
"$oid": "62655a..."
},
"order_id": 1,
"item": "coffee"
},
"ns": {
"db": "Tutorial1",
"coll": "orders"
},
"documentKey": {
"_id": {
"$oid": "62655a..."
}
}
}
4

Puedes omitir los metadatos de los eventos creados por el flujo de cambios configurándola para que solo devuelva el campo fullDocument .

Detén el conector utilizando el siguiente comando:

del mongo-simple-source

Nota

El comando del es un script asistente que llama a la API REST de Kafka Connect para detener el conector y es equivalente al siguiente comando:

curl -X DELETE connect:8083/connectors/<parameter>

Edite el archivo de configuración fuente llamado simplesource.json con el siguiente comando:

nano simplesource.json

Remueve la configuración existente, agrega la siguiente configuración y guarda el archivo:

{
"name": "mongo-simple-source",
"config": {
"connector.class": "com.mongodb.kafka.connect.MongoSourceConnector",
"connection.uri": "mongodb://mongo1",
"publish.full.document.only": true,
"database": "Tutorial1",
"collection": "orders"
}
}

Ejecuta el siguiente comando en la shell para iniciar el conector de origen utilizando el archivo de configuración que actualizaste:

cx simplesource.json

Conéctese a MongoDB usando mongosh con el siguiente comando:

mongosh "mongodb://mongo1"

En el prompt, introduce los siguientes comandos para insertar un nuevo documento:

use Tutorial1
db.orders.insertOne( { 'order_id' : 2, 'item' : 'oatmeal' } )

Salga de mongosh ejecutando el siguiente comando:

exit

Confirme el contenido de los datos del nuevo tema de Kafka ejecutando el siguiente comando:

kc Tutorial1.orders

El campo payload en el documento "Valor" debe contener solo los siguientes datos del documento:

{ "_id": { "$oid": "<your _id value>" }, "order_id": 2, "item": "oatmeal" }
5

Después de completar este tutorial, libere recursos en su computadora deteniendo o eliminando recursos de Docker. Puede eliminar tanto los contenedores como las imágenes de Docker, o solo los contenedores. Si elimina los contenedores y las imágenes, debe descargarlos de nuevo para reiniciar su entorno de desarrollo de MongoDB Kafka Connector, que ocupa aproximadamente 2,4 GB. Si elimina solo los contenedores, puede reutilizar las imágenes y evitar la descarga de la mayoría de los archivos grandes en la canalización de datos de ejemplo.

Tip

Más tutoriales

Si planea completar más tutoriales sobre el MongoDB Kafka Connector, considere remover solo los contenedores. Si no tienes previsto completar más tutoriales de MongoDB Kafka Connector, considera remover los contenedores e imágenes.

Selecciona la pestaña que corresponde a la tarea de eliminación que deseas ejecutar.

Ejecute el siguiente comando de shell para remover los contenedores y las imágenes de Docker del entorno de desarrollo:

docker-compose -p mongo-kafka down --rmi all

Ejecuta el siguiente comando Shell para remover los contenedores Docker, pero conserva las imágenes para el entorno de desarrollo:

docker-compose -p mongo-kafka down

Para reiniciar los contenedores, siga los mismos pasos necesarios para iniciarlos en la configuración del tutorial.

En este tutorial, inició un conector de origen utilizando diferentes configuraciones para alterar los datos de eventos de flujo de cambios publicados en un tema de Kafka.

Lea los siguientes recursos para aprender más sobre los conceptos mencionados en este tutorial:

  • Propiedades de configuración del conector de origen

  • Kafka Connect REST API

Volver

Descubre los Change Streams de MongoDB

En esta página