Join us at MongoDB.local London on 7 May to unlock new possibilities for your data. Use WEB50 to save 50%.
Register now >
Docs Menu
Docs Home
/ /

Descubre los Change Streams de MongoDB

Sigue este tutorial para aprender a crear un flujo de cambios en una colección de MongoDB y observar los eventos de cambio que crea.

1

Complete los pasos en el Configurar el tutorial del conector Kafka para iniciar el entorno de Confluent Kafka Connect y MongoDB.

2

Crea dos sesiones interactivas de shell en el contenedor Docker del tutorial, cada una en una ventana independiente.

Ejecute el siguiente comando desde una terminal para iniciar un shell interactivo.

docker exec -it mongo1 /bin/bash

Nos referiremos a esta shell interactiva como ChangeStreamShell1 a lo largo de este tutorial.

Ejecute el siguiente comando en un segundo terminal para iniciar un shell interactivo:

docker exec -it mongo1 /bin/bash

Nos referiremos a esta shell interactiva como ChangeStreamShell2 a lo largo de este tutorial.

3

En ChangeStreamShell1, crea un script en Python para abrir un flujo de cambios usando el controlador PyMongo.

nano openchangestream.py

Pegué el siguiente código en el archivo y guardé los cambios:

import pymongo
from bson.json_util import dumps
client = pymongo.MongoClient('mongodb://mongo1')
db = client.get_database(name='Tutorial1')
with db.orders.watch() as stream:
print('\nA change stream is open on the Tutorial1.orders namespace. Currently watching ...\n\n')
for change in stream:
print(dumps(change, indent = 2))

Ejecute el script de Python:

python3 openchangestream.py

El script muestra el siguiente mensaje después de iniciarse correctamente:

Change Stream is opened on the Tutorial1.orders namespace. Currently watching ...
4

En ChangeStreamShell2, conecta a MongoDB utilizando mongosh, el shell de MongoDB, utilizando el siguiente comando:

mongosh "mongodb://mongo1"

Luego de conectarse correctamente, deberías ver el siguiente mensaje en la shell de MongoDB:

rs0 [direct: primary] test>

En el aviso, escribe los siguientes comandos:

use Tutorial1
db.orders.insertOne( { 'test' : 1 } )

Después de ingresar los comandos anteriores, cambia a ChangeStreamShell1 para ver la salida del flujo de cambios, que debe parecerse a lo siguiente:

{
"_id": {
"_data": "826264..."
},
"operationType": "insert",
"clusterTime": {
"$timestamp": {
"t": 1650754657,
"i": 1
}
},
"wallTime": {
"$date": "2022-10-13T17:06:23.409Z"
},
"fullDocument": {
"_id": {
"$oid": "<_id value of document>"
},
"test": 1
},
"ns": {
"db": "Tutorial1",
"coll": "orders"
},
"documentKey": {
"_id": {
"$oid": "<_id value of document>"
}
}
}

Para detener el script, pulse Ctrl+C.

Al final de este paso, habrás activado y observado con éxito un evento de flujo de cambios.

5

Puede aplicar un filtro a un flujo de cambios pasándolo a través de una canalización de agregación.

En ChangeStreamShell1, crea un nuevo script en Python para abrir un flujo de cambios filtrado utilizando el driver PyMongo.

nano pipeline.py

Pegué el siguiente código en el archivo y guardé los cambios:

import pymongo
from bson.json_util import dumps
client = pymongo.MongoClient('mongodb://mongo1')
db = client.get_database(name='Tutorial1')
pipeline = [ { "$match": { "$and": [ { "fullDocument.type": "temp" }, { "fullDocument.value": { "$gte": 100 } } ] } } ]
with db.sensors.watch(pipeline=pipeline) as stream:
print('\nChange Stream is opened on the Tutorial1.sensors namespace. Currently watching for values > 100...\n\n')
for change in stream:
print(dumps(change, indent = 2))

Ejecute el script de Python:

python3 pipeline.py

El script muestra el siguiente mensaje después de iniciarse correctamente:

Change Stream is opened on the Tutorial1.sensors namespace. Currently watching for values > 100...
6

Vuelve a tu sesión de ChangeStreamShell2 que debería estar conectada a MongoDB usando mongosh.

En el aviso, escribe los siguientes comandos:

use Tutorial1
db.sensors.insertOne( { 'type' : 'temp', 'value':101 } )

Como lo indica la salida del script, el flujo de cambios crea un evento de cambio porque coincide con la siguiente pipeline:

[ { "$match": { "$and": [ { "fullDocument.type": "temp" }, { "fullDocument.value": { "$gte": 100 } } ] } } ]

Intente insertar los siguientes documentos en CambioStreamShell2 para verificar que el flujo de cambios solo produce eventos cuando los documentos coinciden con el filtro:

db.sensors.insertOne( { 'type' : 'temp', 'value': 99 } )
db.sensors.insertOne( { 'type' : 'pressure', 'value': 22 } )
7

Después de completar este tutorial, libere recursos en su computadora deteniendo o eliminando recursos de Docker. Puede eliminar tanto los contenedores como las imágenes de Docker, o solo los contenedores. Si elimina los contenedores y las imágenes, debe descargarlos de nuevo para reiniciar su entorno de desarrollo de MongoDB Kafka Connector, que ocupa aproximadamente 2,4 GB. Si elimina solo los contenedores, puede reutilizar las imágenes y evitar la descarga de la mayoría de los archivos grandes en la canalización de datos de ejemplo.

Tip

Más tutoriales

Si planea completar más tutoriales sobre el MongoDB Kafka Connector, considere remover solo los contenedores. Si no tienes previsto completar más tutoriales de MongoDB Kafka Connector, considera remover los contenedores e imágenes.

Selecciona la pestaña que corresponde a la tarea de eliminación que deseas ejecutar.

Ejecute el siguiente comando de shell para remover los contenedores y las imágenes de Docker del entorno de desarrollo:

docker-compose -p mongo-kafka down --rmi all

Ejecuta el siguiente comando Shell para remover los contenedores Docker, pero conserva las imágenes para el entorno de desarrollo:

docker-compose -p mongo-kafka down

Para reiniciar los contenedores, siga los mismos pasos necesarios para iniciarlos en la configuración del tutorial.

En este tutorial, creaste un flujo de cambios en MongoDB y observaste el resultado. El MongoDB Kafka source connector lee los eventos de cambio desde un flujo de cambios que configuras y los escribe en un tema de Kafka.

Para aprender a configurar un flujo de cambios y un tema de Kafka para un Connector fuente, continúa con el tutorial Comenzar con el Connector fuente MongoDB Kafka.

Lea los siguientes recursos para aprender más sobre los conceptos mencionados en este tutorial:

  • Change Streams y el Connector de origen

  • Modifica la salida del flujo de cambios

  • Shell de MongoDB (mongosh)

Volver

Configuración del tutorial

En esta página