Overview
En esta guía, puedes aprender a utilizar un flujo de cambios para supervisar los cambios en tiempo real de tu base de datos. Un flujo de cambios es una funcionalidad de MongoDB Server que permite que la aplicación se suscriba a cambios en los datos de una colección, base de datos o implementación.
Tip
Atlas Stream Processing
Como alternativa a los flujos de cambios, puedes usar Atlas Stream Processing para procesar y transformar flujos de datos. A diferencia de los flujos de cambios, que solo registran eventos de base de datos, Atlas Stream Processing administra múltiples tipos de eventos de datos y ofrece capacidades avanzadas de procesamiento de datos. Para aprender más información sobre esta funcionalidad, consulte Procesamiento de flujo de Atlas en la documentación de MongoDB Atlas.
Datos de muestra
Los ejemplos de esta guía utilizan el sample_restaurants.restaurants colección de los conjuntos de datos de muestra de Atlas. Para aprender cómo crear un clúster gratuito de MongoDB Atlas y cargar los conjuntos de datos de muestra, consulta
Comience a utilizar PyMongo.
Abre un flujo de cambios
Para abrir un flujo de cambios, llama al método watch(). La instancia en la que se llama al método watch() determina el alcance de los eventos para los que el flujo de cambios escucha. Puedes llamar al método watch() en las siguientes clases:
MongoClient:Para monitorear todos los cambios en la implementación de MongoDBDatabase: Para supervisar los cambios en todas las colecciones de la base de datosCollection:Para monitorear cambios en la colección
El siguiente ejemplo abre un flujo de cambios en la colección restaurants y genera los cambios a medida que ocurren. Seleccione el Synchronous o la pestaña Asynchronous para ver el código correspondiente:
database = client["sample_restaurants"] collection = database["restaurants"] with collection.watch() as stream: for change in stream: print(change)
database = client["sample_restaurants"] collection = database["restaurants"] async with await collection.watch() as stream: async for change in stream: print(change)
Para empezar a observar los cambios, ejecute la aplicación. Luego, en una aplicación o shell independiente, modifique la colección restaurants. El siguiente ejemplo actualiza un documento con un valor de campo name de Blarney Castle. Seleccione la pestaña Synchronous o Asynchronous para ver el código correspondiente:
database = client["sample_restaurants"] collection = database["restaurants"] query_filter = { "name": "Blarney Castle" } update_operation = { '$set' : { "cuisine": "Irish" } } result = collection.update_one(query_filter, update_operation)
database = client["sample_restaurants"] collection = database["restaurants"] query_filter = { "name": "Blarney Castle" } update_operation = { '$set' : { "cuisine": "Irish" } } result = await collection.update_one(query_filter, update_operation)
Cuando actualizas la colección, la aplicación del flujo de cambios imprime el cambio a medida que ocurre. El evento de cambio impreso se parece al siguiente:
{'_id': {'_data': '...'}, 'operationType': 'update', 'clusterTime': Timestamp(...), 'wallTime': datetime.datetime(...), 'ns': {'db': 'sample_restaurants', 'coll': 'restaurants'}, 'documentKey': {'_id': ObjectId('...')}, 'updateDescription': {'updatedFields': {'cuisine': 'Irish'}, 'removedFields': [], 'truncatedArrays': []}}
Modificar la salida del flujo de cambios
Puedes pasar el parámetro pipeline al método watch() para modificar la salida del flujo de cambios. Este parámetro te permite monitorear solo los eventos de cambio especificados. Da formato al parámetro como una lista de objetos que representen cada uno una etapa de agregación.
Puede especificar las siguientes etapas en el parámetro pipeline:
$addFields$match$project$replaceRoot$replaceWith$redact$set$unset
El siguiente ejemplo utiliza el parámetro pipeline para abrir un flujo de cambios que registra solo las operaciones de actualización. Seleccione la pestaña Synchronous o Asynchronous para ver el código correspondiente:
change_pipeline = { "$match": { "operationType": "update" }}, with collection.watch(pipeline=change_pipeline) as stream: for change in stream: print(change)
change_pipeline = { "$match": { "operationType": "update" }}, async with await collection.watch(pipeline=change_pipeline) as stream: async for change in stream: print(change)
Para obtener más información sobre cómo modificar la salida de tu flujo de cambios, consulta la sección Modificar salida del flujo de cambio en el manual del MongoDB Server.
Modificar el comportamiento de watch()
El método watch() acepta parámetros opcionales, que representan opciones que puedes usar para configurar la operación. Si no especificas ninguna opción, el driver no personaliza la operación.
La siguiente tabla describe las opciones que puedes configurar para personalizar el comportamiento de watch():
Propiedad | Descripción |
|---|---|
| A list of aggregation pipeline stages that modify the output of the
change stream. |
| Specifies whether to show the full document after the change, rather
than showing only the changes made to the document. To learn more about
this option, see Include Pre-Images and Post-Images. |
| Specifies whether to show the full document as it was before the change, rather
than showing only the changes made to the document. To learn more about
this option, see Include Pre-Images and Post-Images. |
| Directs watch() to resume returning changes after the
operation specified in the resume token.Each change stream event document includes a resume token as the _id
field. Pass the entire _id field of the change event document that
represents the operation you want to resume after.resume_after is mutually exclusive with start_after and start_at_operation_time. |
| Directs watch() to start a new change stream after the
operation specified in the resume token. Allows notifications to
resume after an invalidate event.Each change stream event document includes a resume token as the _id
field. Pass the entire _id field of the change event document that
represents the operation you want to resume after.start_after is mutually exclusive with resume_after and start_at_operation_time. |
| Directs watch() to return only events that occur after the
specified timestamp.start_at_operation_time is mutually exclusive with resume_after and start_after. |
| The maximum amount of time, in milliseconds, the server waits for new
data changes to report to the change stream cursor before returning an
empty batch. Defaults to 1000 milliseconds. |
| Starting in MongoDB Server v6.0, change streams support change notifications
for Data Definition Language (DDL) events, such as the createIndexes and dropIndexes events. To
include expanded events in a change stream, create the change stream
cursor and set this parameter to True. |
| The maximum number of change events to return in each batch of the
response from the MongoDB cluster. |
| The collation to use for the change stream cursor. |
| An instance of ClientSession. |
| A comment to attach to the operation. |
Incluir imágenes preoperativas y postoperatorias
Importante
Puede habilitar pre imágenes y post imágenes en colecciones solo si su implementación utiliza MongoDB v6.0 o posterior.
De forma predeterminada, al realizar una operación en una colección, el evento de cambio correspondiente incluye solo el delta de los campos modificados por dicha operación. Para ver el documento completo antes o después de un cambio, especifique los parámetros full_document_before_change o full_document en el método watch().
La preimagen es la versión completa de un documento antes de un cambio. Para incluir la preimagen en el evento de flujo de cambios, establece el parámetro full_document_before_change en uno de los siguientes valores:
whenAvailable:El evento de cambio incluye una imagen previa del documento modificado para eventos de cambio solo si la imagen previa está disponible.requiredEl evento de cambio incluye una preimagen del documento modificado para los eventos de cambio. Si la preimagen no está disponible, el driver genera un error.
La post-imagen es la versión completa de un documento después de un cambio. Para incluir la imagen posterior en el evento del flujo de cambios, configure el parámetro full_document con uno de los siguientes valores:
updateLookup: El evento de cambio incluye una copia de todo el documento cambiado a partir de cierto tiempo después del cambio.whenAvailable: El evento de cambio incluye una postimagen del documento modificado para eventos de cambio solo si la postimagen está disponible.required: El evento de cambio incluye una imagen posterior del documento modificado para eventos de cambio. Si la imagen posterior no está disponible, el driver genera un error.
El siguiente ejemplo invoca el método watch() en una colección e incluye las versiones posteriores a la actualización de los documentos al especificar el parámetro fullDocument. Selecciona la pestaña Synchronous o Asynchronous para ver el código correspondiente:
database = client["sample_restaurants"] collection = database["restaurants"] with collection.watch(full_document='updateLookup') as stream: for change in stream: print(change)
database = client["sample_restaurants"] collection = database["restaurants"] async with await collection.watch(full_document='updateLookup') as stream: async for change in stream: print(change)
Con la aplicación de flujo de cambios ejecutándose, actualizar un documento en la colección restaurants usando el ejemplo anterior de actualización imprime un evento de cambio semejante al siguiente:
{'_id': {'_data': '...'}, 'operationType': 'update', 'clusterTime': Timestamp(...), 'wallTime': datetime.datetime(...), 'fullDocument': {'_id': ObjectId('...'), 'address': {...}, 'borough': 'Queens', 'cuisine': 'Irish', 'grades': [...], 'name': 'Blarney Castle', 'restaurant_id': '40366356'}, 'ns': {'db': 'sample_restaurants', 'coll': 'restaurants'}, 'documentKey': {'_id': ObjectId('...')}, 'updateDescription': {'updatedFields': {'cuisine': 'Irish'}, 'removedFields': [], 'truncatedArrays': []}}
Para obtener más información sobre pre-imágenes y post-imágenes, consulta Change Streams con preimágenes y postimágenes de documentos en el manual de MongoDB Server.
Información Adicional
Para saber más sobre las change streams, consulta Change Streams en el manual de MongoDB Server.
Documentación de la API
Para aprender más sobre cualquiera de los métodos o tipos analizados en esta guía, consulta la siguiente documentación de API: