Siga este tutorial para aprender a crear un flujo de cambios en una colección MongoDB y observar los eventos de cambio que crea.
Explorar los flujos de cambio
Completar la configuración del tutorial
Complete los pasos en el Tutorial del conector de Kafka Configuración para iniciar el entorno de Confluent Kafka Connect y MongoDB.
Conectarse al contenedor Docker
Cree dos sesiones de shell interactivas en el tutorial Docker Container, cada una en una ventana independiente.
Ejecute el siguiente comando desde una terminal para iniciar un shell interactivo.
docker exec -it mongo1 /bin/bash
A lo largo de este tutorial nos referiremos a este shell interactivo como ChangeStreamShell.1
Ejecute el siguiente comando en una segunda terminal para iniciar un shell interactivo:
docker exec -it mongo1 /bin/bash
A lo largo de este tutorial nos referiremos a este shell interactivo como ChangeStreamShell.2
Abre un flujo de cambios
En ChangeStreamShell,1 cree un script de Python para abrir un flujo de cambios utilizando el controlador de PyMongo.
nano openchangestream.py
Pegue el siguiente código en el archivo y guarde los cambios:
import pymongo from bson.json_util import dumps client = pymongo.MongoClient('mongodb://mongo1') db = client.get_database(name='Tutorial1') with db.orders.watch() as stream: print('\nA change stream is open on the Tutorial1.orders namespace. Currently watching ...\n\n') for change in stream: print(dumps(change, indent = 2))
Ejecute el script de Python:
python3 openchangestream.py
El script genera el siguiente mensaje después de iniciarse exitosamente:
Change Stream is opened on the Tutorial1.orders namespace. Currently watching ...
Desencadenar un evento de cambio
En ChangeStreamShell,2 conéctese a MongoDB usando mongosh, el shell de MongoDB, utilizando el siguiente comando:
mongosh "mongodb://mongo1"
Luego de conectarse correctamente, deberías ver el siguiente mensaje en la shell de MongoDB:
rs0 [direct: primary] test>
En el indicador, escriba los siguientes comandos:
use Tutorial1 db.orders.insertOne( { 'test' : 1 } )
Después de ingresar los comandos anteriores, cambie a ChangeStreamShell1 para ver la salida del flujo de cambios, que debería parecerse a lo siguiente:
{ "_id": { "_data": "826264..." }, "operationType": "insert", "clusterTime": { "$timestamp": { "t": 1650754657, "i": 1 } }, "wallTime": { "$date": "2022-10-13T17:06:23.409Z" }, "fullDocument": { "_id": { "$oid": "<_id value of document>" }, "test": 1 }, "ns": { "db": "Tutorial1", "coll": "orders" }, "documentKey": { "_id": { "$oid": "<_id value of document>" } } }
Para detener el script, presione Ctrl+C.
Al finalizar este paso, habrás activado y observado con éxito un evento de flujo de cambios.
Abrir un flujo de cambios filtrado
Puede aplicar un filtro a un flujo de cambios pasándolo a través de una canalización de agregación.
En ChangeStreamShell,1 cree un nuevo script de Python para abrir un flujo de cambios filtrado utilizando el controlador de PyMongo.
nano pipeline.py
Pegue el siguiente código en el archivo y guarde los cambios:
import pymongo from bson.json_util import dumps client = pymongo.MongoClient('mongodb://mongo1') db = client.get_database(name='Tutorial1') pipeline = [ { "$match": { "$and": [ { "fullDocument.type": "temp" }, { "fullDocument.value": { "$gte": 100 } } ] } } ] with db.sensors.watch(pipeline=pipeline) as stream: print('\nChange Stream is opened on the Tutorial1.sensors namespace. Currently watching for values > 100...\n\n') for change in stream: print(dumps(change, indent = 2))
Ejecute el script de Python:
python3 pipeline.py
El script genera el siguiente mensaje después de iniciarse exitosamente:
Change Stream is opened on the Tutorial1.sensors namespace. Currently watching for values > 100...
Observar el flujo de cambios filtrado
Regrese a su sesión ChangeStreamShell2 que debería estar conectada a MongoDB mongosh mediante.
En el indicador, escriba los siguientes comandos:
use Tutorial1 db.sensors.insertOne( { 'type' : 'temp', 'value':101 } )
Como lo indica la salida del script, el flujo de cambio crea un evento de cambio porque coincide con la siguiente canalización:
[ { "$match": { "$and": [ { "fullDocument.type": "temp" }, { "fullDocument.value": { "$gte": 100 } } ] } } ]
Intente insertar los siguientes documentos en ChangeStreamShell2 para verificar que el flujo de cambios solo produzca eventos cuando los documentos coincidan con el filtro:
db.sensors.insertOne( { 'type' : 'temp', 'value': 99 } ) db.sensors.insertOne( { 'type' : 'pressure', 'value': 22 } )
(Opcional) Detener los contenedores Docker
Después de completar este tutorial, libere recursos en su equipo deteniendo o eliminando recursos de Docker. Puede eliminar tanto los contenedores como las imágenes de Docker, o solo los contenedores. Si elimina los contenedores y las imágenes, deberá descargarlos de nuevo para reiniciar su entorno de desarrollo del Conector Kafka de MongoDB, que tiene un tamaño aproximado de 2.4 GB. Si elimina solo los contenedores, puede reutilizar las imágenes y evitar la descarga de la mayoría de los archivos grandes en la canalización de datos de ejemplo.
Tip
Más tutoriales
Si planea completar más tutoriales del Conector Kafka de MongoDB, considere eliminar solo los contenedores. Si no planea completar más tutoriales del Conector Kafka de MongoDB, considere eliminar los contenedores y las imágenes.
Seleccione la pestaña que corresponde a la tarea de eliminación que desea ejecutar.
Ejecute el siguiente comando de shell para eliminar los contenedores y las imágenes de Docker del entorno de desarrollo:
docker-compose -p mongo-kafka down --rmi all
Ejecute el siguiente comando de shell para eliminar los contenedores Docker pero conservar las imágenes para el entorno de desarrollo:
docker-compose -p mongo-kafka down
Para reiniciar los contenedores, siga los mismos pasos necesarios para iniciarlos en la configuración del tutorial.
Resumen
En este tutorial, creó un flujo de cambios en MongoDB y observó el resultado. El conector de origen de MongoDB Kafka lee los eventos de cambio de un flujo de cambios que usted configura y los escribe en un tema de Kafka.
Para aprender a configurar un flujo de cambios y un tema de Kafka para un conector de origen, consulte el tutorial Introducción al conector de origen de Kafka de MongoDB.
Obtén más información
Lea los siguientes recursos para obtener más información sobre los conceptos mencionados en este tutorial: