Overview
En esta guía, aprenderá a usar un flujo de cambios para supervisar los cambios en tiempo real en su base de datos. Un flujo de cambios es una función de MongoDB Server que permite que su aplicación se suscriba a los cambios en los datos de una colección, base de datos o implementación.
Tip
Atlas Stream Processing
Como alternativa a los flujos de cambios, puede utilizar Atlas Stream Processing para procesar y transformar flujos de datos. A diferencia de los flujos de cambios, que solo registran eventos de la base de datos, Atlas Stream Processing gestiona múltiples tipos de eventos de datos y ofrece capacidades de procesamiento de datos ampliadas. Para obtener más información sobre esta función, consulte Procesamiento de flujo de Atlas en la documentación de MongoDB Atlas.
Datos de muestra
Los ejemplos de esta guía utilizan el sample_restaurants.restaurants Colección de los conjuntos de datos de muestra de Atlas. Para aprender a crear un clúster gratuito de MongoDB Atlas y cargar los conjuntos de datos de muestra, consulte
Comience a utilizar PyMongo.
Abre un flujo de cambios
Para abrir un flujo de cambios, llame al método watch(). La instancia en la que llama al método watch() determina el alcance de los eventos que el flujo de cambios escucha. Puede llamar al método watch() en las siguientes clases:
MongoClient:Para monitorear todos los cambios en la implementación de MongoDBDatabase:Para monitorear los cambios en todas las colecciones de la base de datosCollection:Para monitorear cambios en la colección
El siguiente ejemplo abre un flujo de cambios en la colección restaurants y genera los cambios a medida que ocurren. Seleccione el Synchronous o pestaña Asynchronous para ver el código correspondiente:
database = client["sample_restaurants"] collection = database["restaurants"] with collection.watch() as stream: for change in stream: print(change)
database = client["sample_restaurants"] collection = database["restaurants"] async with await collection.watch() as stream: async for change in stream: print(change)
Para empezar a observar los cambios, ejecute la aplicación. Luego, en una aplicación o shell independiente, modifique la colección restaurants. El siguiente ejemplo actualiza un documento con un valor de campo name de Blarney Castle. Seleccione la pestaña Synchronous o Asynchronous para ver el código correspondiente:
database = client["sample_restaurants"] collection = database["restaurants"] query_filter = { "name": "Blarney Castle" } update_operation = { '$set' : { "cuisine": "Irish" } } result = collection.update_one(query_filter, update_operation)
database = client["sample_restaurants"] collection = database["restaurants"] query_filter = { "name": "Blarney Castle" } update_operation = { '$set' : { "cuisine": "Irish" } } result = await collection.update_one(query_filter, update_operation)
Al actualizar la colección, la aplicación de flujo de cambios imprime el cambio a medida que se produce. El evento de cambio impreso se parece al siguiente:
{'_id': {'_data': '...'}, 'operationType': 'update', 'clusterTime': Timestamp(...), 'wallTime': datetime.datetime(...), 'ns': {'db': 'sample_restaurants', 'coll': 'restaurants'}, 'documentKey': {'_id': ObjectId('...')}, 'updateDescription': {'updatedFields': {'cuisine': 'Irish'}, 'removedFields': [], 'truncatedArrays': []}}
Modificar la salida del flujo de cambios
Puede pasar el parámetro pipeline al método watch() para modificar la salida del flujo de cambios. Este parámetro le permite observar únicamente los eventos de cambio especificados. Formatee el parámetro como una lista de objetos que representen cada uno una etapa de agregación.
Puede especificar las siguientes etapas en el parámetro pipeline:
$addFields$match$project$replaceRoot$replaceWith$redact$set$unset
El siguiente ejemplo utiliza el parámetro pipeline para abrir un flujo de cambios que registra únicamente las operaciones de actualización. Seleccione la pestaña Synchronous o Asynchronous para ver el código correspondiente:
change_pipeline = { "$match": { "operationType": "update" }}, with collection.watch(pipeline=change_pipeline) as stream: for change in stream: print(change)
change_pipeline = { "$match": { "operationType": "update" }}, async with await collection.watch(pipeline=change_pipeline) as stream: async for change in stream: print(change)
Para obtener más información sobre cómo modificar la salida de su flujo de cambios, consulte la sección Modificar la salida del flujo de cambios en el manual de MongoDB Server.
Modificar watch() el comportamiento
El método watch() acepta parámetros opcionales, que representan opciones que se pueden usar para configurar la operación. Si no se especifica ninguna opción, el controlador no personaliza la operación.
La siguiente tabla describe las opciones que puede configurar para personalizar el comportamiento de watch():
Propiedad | Descripción |
|---|---|
| A list of aggregation pipeline stages that modify the output of the
change stream. |
| Specifies whether to show the full document after the change, rather
than showing only the changes made to the document. To learn more about
this option, see Include Pre-Images and Post-Images. |
| Specifies whether to show the full document as it was before the change, rather
than showing only the changes made to the document. To learn more about
this option, see Include Pre-Images and Post-Images. |
| Directs watch() to resume returning changes after the
operation specified in the resume token.Each change stream event document includes a resume token as the _id
field. Pass the entire _id field of the change event document that
represents the operation you want to resume after.resume_after is mutually exclusive with start_after and start_at_operation_time. |
| Directs watch() to start a new change stream after the
operation specified in the resume token. Allows notifications to
resume after an invalidate event.Each change stream event document includes a resume token as the _id
field. Pass the entire _id field of the change event document that
represents the operation you want to resume after.start_after is mutually exclusive with resume_after and start_at_operation_time. |
| Directs watch() to return only events that occur after the
specified timestamp.start_at_operation_time is mutually exclusive with resume_after and start_after. |
| The maximum amount of time, in milliseconds, the server waits for new
data changes to report to the change stream cursor before returning an
empty batch. Defaults to 1000 milliseconds. |
| Starting in MongoDB Server v6.0, change streams support change notifications
for Data Definition Language (DDL) events, such as the createIndexes and dropIndexes events. To
include expanded events in a change stream, create the change stream
cursor and set this parameter to True. |
| The maximum number of change events to return in each batch of the
response from the MongoDB cluster. |
| The collation to use for the change stream cursor. |
| An instance of ClientSession. |
| A comment to attach to the operation. |
Incluir imágenes previas y posteriores
Importante
Puede habilitar imágenes previas y posteriores en colecciones solo si su implementación usa MongoDB v6.0 o posterior.
De forma predeterminada, al realizar una operación en una colección, el evento de cambio correspondiente incluye solo el delta de los campos modificados por dicha operación. Para ver el documento completo antes o después de un cambio, especifique los parámetros full_document_before_change o full_document en el método watch().
La preimagen es la versión completa de un documento antes de un cambio. Para incluirla en el evento de flujo de cambios, configure el full_document_before_change parámetro con uno de los siguientes valores:
whenAvailable:El evento de cambio incluye una imagen previa del documento modificado para eventos de cambio solo si la imagen previa está disponible.requiredEl evento de cambio incluye una preimagen del documento modificado para eventos de cambio. Si la preimagen no está disponible, el controlador genera un error.
La imagen posterior es la versión completa de un documento tras un cambio. Para incluirla en el evento de flujo de cambios, configure el full_document parámetro con uno de los siguientes valores:
updateLookup:El evento de cambio incluye una copia de todo el documento modificado desde algún tiempo después del cambio.whenAvailable:El evento de cambio incluye una imagen posterior del documento modificado solo para eventos de cambio si la imagen posterior está disponible.requiredEl evento de cambio incluye una imagen posterior del documento modificado. Si la imagen posterior no está disponible, el controlador genera un error.
El siguiente ejemplo llama al método watch() en una colección e incluye la imagen posterior de los documentos actualizados mediante el parámetro fullDocument. Seleccione la pestaña Synchronous o Asynchronous para ver el código correspondiente:
database = client["sample_restaurants"] collection = database["restaurants"] with collection.watch(full_document='updateLookup') as stream: for change in stream: print(change)
database = client["sample_restaurants"] collection = database["restaurants"] async with await collection.watch(full_document='updateLookup') as stream: async for change in stream: print(change)
Con la aplicación de flujo de cambios en ejecución, actualizar un documento en la restaurants colección mediante el ejemplo de actualización anterior imprime un evento de cambio similar al siguiente:
{'_id': {'_data': '...'}, 'operationType': 'update', 'clusterTime': Timestamp(...), 'wallTime': datetime.datetime(...), 'fullDocument': {'_id': ObjectId('...'), 'address': {...}, 'borough': 'Queens', 'cuisine': 'Irish', 'grades': [...], 'name': 'Blarney Castle', 'restaurant_id': '40366356'}, 'ns': {'db': 'sample_restaurants', 'coll': 'restaurants'}, 'documentKey': {'_id': ObjectId('...')}, 'updateDescription': {'updatedFields': {'cuisine': 'Irish'}, 'removedFields': [], 'truncatedArrays': []}}
Para obtener más información sobre imágenes previas y posteriores, consulte Flujos de cambio con imágenes previas y posteriores de documentos en el manual de MongoDB Server.
Información Adicional
Para obtener más información sobre los flujos de cambio, consulte Flujos de cambio en el manual de MongoDB Server.
Documentación de la API
Para aprender más sobre cualquiera de los métodos o tipos analizados en esta guía, consulta la siguiente documentación de API: