Los flujos de cambio permiten que las aplicaciones accedan a cambios de datos en tiempo real sin la complejidad y el riesgo previos de seguir manualmente los cambios. Registro de operaciones. Las aplicaciones pueden usar flujos de cambios para suscribirse a todos los cambios de datos en una sola colección, una base de datos o una implementación completa, y reaccionar inmediatamente a ellos. Dado que los flujos de cambios utilizan el marco de agregación, las aplicaciones también pueden filtrar cambios específicos o transformar las notificaciones a voluntad.
Nota
Los flujos de cambios están limitados a eventos de bases de datos. Atlas Stream Processing ofrece una funcionalidad ampliada, que incluye la gestión de varios tipos de eventos de datos y el procesamiento de flujos de datos complejos mediante la misma API de query que utilizan las bases de datos de Atlas. Para obtener más información, consulta Atlas Stream Processing.
A partir de MongoDB 5.1, los flujos de cambios están optimizados, proporcionando una utilización de recursos más eficiente y una ejecución más rápida de algunas etapas de la canalización de agregación.
Disponibilidad
Los flujos de cambios están disponibles para Sets de réplicas y clústeres fragmentados:
Motor de almacenamiento.
Los set de réplicas y los clústeres fragmentados deben utilizar el motor de almacenamiento WiredTiger. Los flujos de cambios también se pueden utilizar en implementaciones que incluyen la característica de cifrado en reposo de MongoDB.
Versión del protocolo de set de réplicas.
Los conjuntos de réplicas y los clústeres fragmentados deben usar la versión de protocolo del conjunto de réplicas 1 (
pv1).Activación del nivel de consistencia de lectura “majority”.
A partir de MongoDB,4.2 losflujos de cambio están disponibles independientemente del
"majority"soporte de lectura de preocupación; es decir, el soporte de lectura demajoritypreocupación puede estar habilitado (predeterminado) o deshabilitado para usar flujos de cambio.En MongoDB 4.0 y versiones anteriores, las change streams están disponibles sólo si está habilitada la compatibilidad con el nivel de consistencia de lectura
"majority"(por defecto).
Compatibilidad con Stable API
Los flujos de cambios están incluidos en la Stable API V1. Sin embargo, la opción showExpandedEvents no está incluida en la Stable API V1.
Conectar
Las conexiones para un flujo de cambios pueden usar listas de nodos iniciales DNS con la opción de conexión +srv o enumerar los servidores individualmente en la cadena de conexión.
Si el driver pierde la conexión con un flujo de cambios o la conexión se cae, intenta restablecer la conexión con el flujo de cambios a través de otro nodo del clúster que tenga una preferencia de lectura que coincida. Si el driver no puede encontrar un nodo con la preferencia de lectura adecuada, lanzará una excepción.
Para obtener más información, consulta Formato URI de la cadena de conexión.
Supervisa una colección, base de datos o implementación
Puedes abrir flujos de cambio para lo siguiente:
Objetivo | Descripción |
|---|---|
Una colección | Puedes abrir un cursor de flujo de cambios para una sola colección (excepto las colecciones Los ejemplos en esta página utilizan los drivers de MongoDB para abrir y trabajar con un cursor de flujo de cambios para una sola colección. Ve también el |
Una base de datos | Puede abrir un cursor de flujo de cambios para una sola base de datos (excluyendo las bases de datos Para el método del controlador de MongoDB, consulte la documentación de su controlador. Véase también el |
Una implementación | Puedes abrir un cursor de flujo de cambios para una implementación (ya sea un set de réplicas o un clúster particionado) para observar cambios en todas las colecciones ajenas al sistema en todas las bases de datos, excepto Para el método del controlador de MongoDB, consulte la documentación de su controlador. Véase también el |
Nota
Ejemplos de flujos de cambios
En los ejemplos de esta página, se utilizan los drivers de MongoDB para ilustrar cómo abrir un cursor de flujo de cambios para una colección y trabajar con él.
Consideraciones sobre el rendimiento de los flujos de cambios
Si la cantidad de flujos de cambios activos abiertos para una base de datos supera el tamaño del pool de conexiones, puedes experimentar latencia en las notificaciones. Cada flujo de cambios utiliza una conexión y una operación getMore en el flujo de cambios durante el período que espera el siguiente evento. Para evitar problemas de latencia, debes asegurarte de que el tamaño del pool sea mayor que el número de flujos de cambios abiertos. Para obtener más detalles, consulta la configuración maxPoolSize.
Abre un flujo de cambios
Para abrir un flujo de cambios:
Para un set de réplicas, puedes emitir la operación de flujo de cambios abierto desde cualquiera de los nodos que contienen datos.
Para un clúster fragmentado, debes emitir la operación de flujo de cambio abierto desde
mongos.
En el siguiente ejemplo, se abre un flujo de cambios para una colección y se recorre el cursor para recuperar los documentos del flujo de cambios. [1]
➤ Usar el menú desplegable Seleccionar lenguaje en la parte superior derecha para establecer el lenguaje de los ejemplos en esta página.
Los ejemplos en C a continuación asumen que te has conectado a un set de réplicas de MongoDB y has accedido a una base de datos que contiene una colección de inventory.
mongoc_collection_t *collection; bson_t *pipeline = bson_new (); bson_t opts = BSON_INITIALIZER; mongoc_change_stream_t *stream; const bson_t *change; const bson_t *resume_token; bson_error_t error; collection = mongoc_database_get_collection (db, "inventory"); stream = mongoc_collection_watch (collection, pipeline, NULL /* opts */); mongoc_change_stream_next (stream, &change); if (mongoc_change_stream_error_document (stream, &error, NULL)) { MONGOC_ERROR ("%s\n", error.message); } mongoc_change_stream_destroy (stream);
Los ejemplos de C# a continuación suponen que se ha conectado a un conjunto de réplicas de MongoDB y ha accedido a una base de datosque contiene una inventory colección.
var cursor = inventory.Watch(); while (cursor.MoveNext() && cursor.Current.Count() == 0) { } // keep calling MoveNext until we've read the first batch var next = cursor.Current.First(); cursor.Dispose();
Los ejemplos de Go a continuación suponen que te has conectado a un set de réplicas de MongoDB y has accedido a una base de datos que contiene una colección inventory.
cs, err := coll.Watch(ctx, mongo.Pipeline{}) assert.NoError(t, err) defer cs.Close(ctx) ok := cs.Next(ctx) next := cs.Current
Los ejemplos de Java a continuación suponen que se ha realizado la conexión a un set de réplicas de MongoDB y que se ha accedido a una base de datos que contiene una colección inventory.
MongoCursor<ChangeStreamDocument<Document>> cursor = inventory.watch().iterator(); ChangeStreamDocument<Document> next = cursor.next();
Los siguientes ejemplos suponen que te has conectado a un set de réplicas de MongoDB y has accedido a una base de datos que contiene una colección inventory.
cursor = db.inventory.watch() document = await cursor.next()
Los ejemplos de Node.js a continuación suponen que te has conectado a un set de réplicas de MongoDB y has accedido a una base de datos que contiene una colección inventory.
El siguiente ejemplo utiliza transmitir para procesar los eventos de cambio.
const collection = db.collection('inventory'); const changeStream = collection.watch(); changeStream .on('change', next => { // process next document }) .once('error', () => { // handle error });
Alternativamente, también puedes utilizar un iterador para procesar los eventos de cambio:
const collection = db.collection('inventory'); const changeStream = collection.watch(); const next = await changeStream.next();
Los ejemplos siguientes asumen que se ha realizado la conexión a un set de réplicas de MongoDB y que se accedió a una base de datos que contiene una colección de inventory.
$changeStream = $db->inventory->watch(); $changeStream->rewind(); $firstChange = $changeStream->current(); $changeStream->next(); $secondChange = $changeStream->current();
Los ejemplos de Python a continuación asumen que se realizó la conexión a un set de réplicas de MongoDB y se accedió a una base de datos que contiene una colección de inventory.
cursor = db.inventory.watch() next(cursor)
Los ejemplos siguientes asumen que se ha realizado la conexión a un set de réplicas de MongoDB y que se accedió a una base de datos que contiene una colección de inventory.
cursor = inventory.watch.to_enum next_change = cursor.next
Los siguientes ejemplos de Swift (Async) asumen que te has conectado a un set de réplicas de MongoDB y has accedido a una base de datos que contiene una colección de inventory.
let inventory = db.collection("inventory") // Option 1: retrieve next document via next() let next = inventory.watch().flatMap { cursor in cursor.next() } // Option 2: register a callback to execute for each document let result = inventory.watch().flatMap { cursor in cursor.forEach { event in // process event print(event) } }
Los ejemplos de Swift (Sync) a continuación suponen que ya te has conectado a un set de réplicas de MongoDB y has accedido a una base de datos que contiene una colección inventory.
let inventory = db.collection("inventory") let changeStream = try inventory.watch() let next = changeStream.next()
Para recuperar el evento de cambio de datos del cursor, itera el cursor del flujo de cambios. Para obtener información sobre el evento de flujo de cambios, consulta Eventos de cambio.
El cursor de flujo de cambios permanece abierto hasta que ocurra una de las siguientes situaciones:
El cursor se cierra explícitamente.
Se produce un evento de invalidación; por ejemplo, un descarte o cambio de nombre de una colección.
La conexión a la implementación de MongoDB se cierra o se agota el tiempo de espera. Consulta Comportamiento para obtener más información.
Si la implementación es un clúster fragmentado, la eliminación de un fragmento puede provocar que se cierre un cursor de flujo de cambio abierto y que el cursor de flujo de cambio cerrado no pueda reanudarse por completo.
Nota
El ciclo de vida de un cursor sin cerrar depende del lenguaje.
| [1] | Puede especificar un startAtOperationTime para abrir el cursor en un punto en el tiempo determinado. Si el punto de inicio especificado está en el pasado, debe encontrarse dentro del rango de tiempo del OpLog. |
Modifica la salida del flujo de cambios
➤ Usar el menú desplegable Seleccionar lenguaje en la parte superior derecha para establecer el lenguaje de los ejemplos en esta página.
Se puede controlar la salida del flujo de cambios proporcionando un arreglo de una o más de las siguientes etapas del pipeline al configurar el flujo de cambios:
pipeline = BCON_NEW ("pipeline", "[", "{", "$match", "{", "fullDocument.username", BCON_UTF8 ("alice"), "}", "}", "{", "$addFields", "{", "newField", BCON_UTF8 ("this is an added field!"), "}", "}", "]"); stream = mongoc_collection_watch (collection, pipeline, &opts); mongoc_change_stream_next (stream, &change); if (mongoc_change_stream_error_document (stream, &error, NULL)) { MONGOC_ERROR ("%s\n", error.message); } mongoc_change_stream_destroy (stream);
Se puede controlar la salida del flujo de cambios proporcionando un arreglo de una o más de las siguientes etapas del pipeline al configurar el flujo de cambios:
var pipeline = new EmptyPipelineDefinition<ChangeStreamDocument<BsonDocument>>() .Match(change => change.FullDocument["username"] == "alice" || change.OperationType == ChangeStreamOperationType.Delete) .AppendStage<ChangeStreamDocument<BsonDocument>, ChangeStreamDocument<BsonDocument>, BsonDocument>( "{ $addFields : { newField : 'this is an added field!' } }"); var collection = database.GetCollection<BsonDocument>("inventory"); using (var cursor = collection.Watch(pipeline)) { while (cursor.MoveNext() && cursor.Current.Count() == 0) { } // keep calling MoveNext until we've read the first batch var next = cursor.Current.First(); }
Se puede controlar la salida del flujo de cambios proporcionando un arreglo de una o más de las siguientes etapas del pipeline al configurar el flujo de cambios:
pipeline := mongo.Pipeline{bson.D{{ "$match", bson.D{{ "$or", bson.A{ bson.D{{"fullDocument.username", "alice"}}, bson.D{{"operationType", "delete"}}, }, }}, }}} cs, err := coll.Watch(ctx, pipeline) assert.NoError(t, err) defer cs.Close(ctx) ok := cs.Next(ctx) next := cs.Current
Se puede controlar la salida del flujo de cambios proporcionando un arreglo de una o más de las siguientes etapas del pipeline al configurar el flujo de cambios:
MongoClient mongoClient = MongoClients.create("mongodb://<username>:<password>@<host>:<port>"); // Select the MongoDB database and collection to open the change stream against MongoDatabase db = mongoClient.getDatabase("myTargetDatabase"); MongoCollection<Document> collection = db.getCollection("myTargetCollection"); // Create $match pipeline stage. List<Bson> pipeline = singletonList(Aggregates.match(Filters.or( Document.parse("{'fullDocument.username': 'alice'}"), Filters.in("operationType", asList("delete"))))); // Create the change stream cursor, passing the pipeline to the // collection.watch() method MongoCursor<Document> cursor = collection.watch(pipeline).iterator();
La pipeline lista incluye una sola $match etapa de filtro que filtra cualquier operación donde el username alicesea, u operaciones donde el operationType deletesea.
Pasar el pipeline al método watch() dirige al flujo de cambios a devolver notificaciones después de pasarlas a través del pipeline especificado.
Se puede controlar la salida del flujo de cambios proporcionando un arreglo de una o más de las siguientes etapas del pipeline al configurar el flujo de cambios:
pipeline = [ {"$match": {"fullDocument.username": "alice"}}, {"$addFields": {"newField": "this is an added field!"}}, ] cursor = db.inventory.watch(pipeline=pipeline) document = await cursor.next()
Se puede controlar la salida del flujo de cambios proporcionando un arreglo de una o más de las siguientes etapas del pipeline al configurar el flujo de cambios:
El siguiente ejemplo utiliza transmitir para procesar los eventos de cambio.
const pipeline = [ { $match: { 'fullDocument.username': 'alice' } }, { $addFields: { newField: 'this is an added field!' } } ]; const collection = db.collection('inventory'); const changeStream = collection.watch(pipeline); changeStream .on('change', next => { // process next document }) .once('error', error => { // handle error });
Alternativamente, también puedes utilizar un iterador para procesar los eventos de cambio:
const changeStreamIterator = collection.watch(pipeline); const next = await changeStreamIterator.next();
Se puede controlar la salida del flujo de cambios proporcionando un arreglo de una o más de las siguientes etapas del pipeline al configurar el flujo de cambios:
$pipeline = [ ['$match' => ['fullDocument.username' => 'alice']], ['$addFields' => ['newField' => 'this is an added field!']], ]; $changeStream = $db->inventory->watch($pipeline); $changeStream->rewind(); $firstChange = $changeStream->current(); $changeStream->next(); $secondChange = $changeStream->current();
Se puede controlar la salida del flujo de cambios proporcionando un arreglo de una o más de las siguientes etapas del pipeline al configurar el flujo de cambios:
pipeline = [ {"$match": {"fullDocument.username": "alice"}}, {"$addFields": {"newField": "this is an added field!"}}, ] cursor = db.inventory.watch(pipeline=pipeline) next(cursor)
Se puede controlar la salida del flujo de cambios proporcionando un arreglo de una o más de las siguientes etapas del pipeline al configurar el flujo de cambios:
Se puede controlar la salida del flujo de cambios proporcionando un arreglo de una o más de las siguientes etapas del pipeline al configurar el flujo de cambios:
let pipeline: [BSONDocument] = [ ["$match": ["fullDocument.username": "alice"]], ["$addFields": ["newField": "this is an added field!"]] ] let inventory = db.collection("inventory") // Option 1: use next() to iterate let next = inventory.watch(pipeline, withEventType: BSONDocument.self).flatMap { changeStream in changeStream.next() } // Option 2: register a callback to execute for each document let result = inventory.watch(pipeline, withEventType: BSONDocument.self).flatMap { changeStream in changeStream.forEach { event in // process event print(event) } }
Se puede controlar la salida del flujo de cambios proporcionando un arreglo de una o más de las siguientes etapas del pipeline al configurar el flujo de cambios:
let pipeline: [BSONDocument] = [ ["$match": ["fullDocument.username": "alice"]], ["$addFields": ["newField": "this is an added field!"]] ] let inventory = db.collection("inventory") let changeStream = try inventory.watch(pipeline, withEventType: BSONDocument.self) let next = changeStream.next()
Tip
El campo _id del documento de evento de flujo de cambios actúa como token de reanudación. No utilices la pipeline para modificar o eliminar el campo _id del evento de flujo de cambios.
A partir de MongoDB 4.2, los flujos de cambios lanzarán una excepción si la pipeline de agregación del flujo de cambios modifica el campo _id de un evento.
Consulta Eventos de cambio para obtener más información sobre el formato del documento de respuesta del flujo de cambios.
Revisa un documento completo para operaciones de actualización
Por defecto, los flujos de cambios solo devuelven el delta de campos durante la operación de actualización. Sin embargo, puedes configurar el flujo de cambios para que devuelva la versión más reciente confirmada por la mayoría del documento actualizado.
➤ Usar el menú desplegable Seleccionar lenguaje en la parte superior derecha para establecer el lenguaje de los ejemplos en esta página.
Para devolver la versión más reciente comprometida por mayoría del documento actualizado, pasa la opción "fullDocument" con el valor "updateLookup" al método mongoc_collection_watch.
En el ejemplo a continuación, todas las notificaciones de operaciones de actualización incluyen un campo fullDocument que representa la versión actual del documento afectado por la operación de la actualización.
BSON_APPEND_UTF8 (&opts, "fullDocument", "updateLookup"); stream = mongoc_collection_watch (collection, pipeline, &opts); mongoc_change_stream_next (stream, &change); if (mongoc_change_stream_error_document (stream, &error, NULL)) { MONGOC_ERROR ("%s\n", error.message); } mongoc_change_stream_destroy (stream);
Para devolver la versión más reciente confirmada por la mayoría del documento actualizado, pasa "FullDocument = ChangeStreamFullDocumentOption.UpdateLookup" al método db.collection.watch().
En el ejemplo a continuación, todas las notificaciones de operaciones de actualización incluyen un campo FullDocument que representa la versión actual del documento afectado por la operación de la actualización.
var options = new ChangeStreamOptions { FullDocument = ChangeStreamFullDocumentOption.UpdateLookup }; var cursor = inventory.Watch(options); while (cursor.MoveNext() && cursor.Current.Count() == 0) { } // keep calling MoveNext until we've read the first batch var next = cursor.Current.First(); cursor.Dispose();
Para devolver la versión más reciente comprometida por la mayoría del documento actualizado, utiliza la opción de flujo de cambiosSetFullDocument(options.UpdateLookup).
cs, err := coll.Watch(ctx, mongo.Pipeline{}, options.ChangeStream().SetFullDocument(options.UpdateLookup)) assert.NoError(t, err) defer cs.Close(ctx) ok := cs.Next(ctx) next := cs.Current
Para devolver la versión más reciente comprometida por mayoría del documento actualizado, pase FullDocument.UPDATE_LOOKUP al método db.collection.watch.fullDocument().
En el ejemplo a continuación, todas las notificaciones de operaciones de actualización incluyen un campo FullDocument que representa la versión actual del documento afectado por la operación de la actualización.
cursor = inventory.watch().fullDocument(FullDocument.UPDATE_LOOKUP).iterator(); next = cursor.next();
Para devolver la versión más reciente confirmada por la mayoría del documento actualizado, pasa full_document='updateLookup' al método db.collection.watch().
En el ejemplo a continuación, todas las notificaciones de operaciones de actualización incluyen un campo `full_document que representa la versión actual del documento afectado por la operación de la actualización.
cursor = db.inventory.watch(full_document="updateLookup") document = await cursor.next()
Para devolver la versión más reciente confirmada por la mayoría del documento actualizado, pasa { fullDocument: 'updateLookup' } al método db.collection.watch().
En el ejemplo a continuación, todas las notificaciones de operaciones de actualización incluyen un campo fullDocument que representa la versión actual del documento afectado por la operación de la actualización.
El siguiente ejemplo utiliza transmitir para procesar los eventos de cambio.
const collection = db.collection('inventory'); const changeStream = collection.watch([], { fullDocument: 'updateLookup' }); changeStream .on('change', next => { // process next document }) .once('error', error => { // handle error });
Alternativamente, también puedes utilizar un iterador para procesar los eventos de cambio:
const changeStreamIterator = collection.watch([], { fullDocument: 'updateLookup' }); const next = await changeStreamIterator.next();
Para devolver la versión más reciente confirmada por la mayoría del documento actualizado, pasa "fullDocument' => \MongoDB\Operation\ChangeStreamCommand::FULL_DOCUMENT_UPDATE_LOOKUP" al método db.watch().
En el ejemplo a continuación, todas las notificaciones de operaciones de actualización incluyen un campo fullDocument que representa la versión actual del documento afectado por la operación de la actualización.
$changeStream = $db->inventory->watch([], ['fullDocument' => \MongoDB\Operation\Watch::FULL_DOCUMENT_UPDATE_LOOKUP]); $changeStream->rewind(); $firstChange = $changeStream->current(); $changeStream->next(); $secondChange = $changeStream->current();
Para devolver la versión más reciente confirmada por la mayoría del documento actualizado, pasa full_document='updateLookup' al método db.collection.watch().
En el ejemplo a continuación, todas las notificaciones de operaciones de actualización incluyen un campo full_document que representa la versión actual del documento afectado por la operación de la actualización.
cursor = db.inventory.watch(full_document="updateLookup") next(cursor)
Para devolver la versión más reciente confirmada por la mayoría del documento actualizado, pasa full_document: 'updateLookup' al método db.watch().
En el ejemplo a continuación, todas las notificaciones de operaciones de actualización incluyen un campo full_document que representa la versión actual del documento afectado por la operación de la actualización.
cursor = inventory.watch([], full_document: 'updateLookup').to_enum next_change = cursor.next
Para devolver la versión más reciente comprometida por mayoría del documento actualizado, pase options:
ChangeStreamOptions(fullDocument: .updateLookup) al método watch().
let inventory = db.collection("inventory") // Option 1: use next() to iterate let next = inventory.watch(options: ChangeStreamOptions(fullDocument: .updateLookup)) .flatMap { changeStream in changeStream.next() } // Option 2: register a callback to execute for each document let result = inventory.watch(options: ChangeStreamOptions(fullDocument: .updateLookup)) .flatMap { changeStream in changeStream.forEach { event in // process event print(event) } }
Para devolver la versión más reciente comprometida por mayoría del documento actualizado, pase options:
ChangeStreamOptions(fullDocument: .updateLookup) al método watch().
let inventory = db.collection("inventory") let changeStream = try inventory.watch(options: ChangeStreamOptions(fullDocument: .updateLookup)) let next = changeStream.next()
Nota
Si hay una o más operaciones comprometidas por la mayoría que modificaron el documento actualizado después de la operación de actualización pero antes de la búsqueda, el documento completo devuelto puede ser significativamente diferente del documento en el momento de la operación de actualización.
Sin embargo, los deltas incluidos en el documento de flujo de cambios siempre describen correctamente los cambios de la colección supervisada que se aplicaron a ese evento de flujo de cambios.
Consulta Eventos de cambio para obtener más información sobre el formato del documento de respuesta del flujo de cambios.
Reanudar un flujo de cambios
Los flujos de cambios pueden reanudarse si se especifica un token de reanudación para resumeAfter o startAfter cuando se abre el cursor.
resumeAfter para Change Streams
Puede reanudar un flujo de cambios después de un evento específico pasando un token de reanudación a resumeAfter al abrir el cursor.
Consulta Tokens de reanudación para obtener más información sobre el token de reanudación.
Importante
El oplog debe tener suficiente historial para localizar la operación asociada con el token o la marca de tiempo, si la marca de tiempo está en el pasado.
No puedes usar
resumeAfterpara reanudar un flujo de cambios después de que un evento de invalidación (por ejemplo, un descarte o renombramiento de colección) cierre el flujo. En cambio, puedes usar startAfter para iniciar un nuevo flujo de cambios después de un evento de invalidación.
En el ejemplo a continuación, la opción resumeAfter se agrega a las opciones del flujo para recrear el flujo después de que se haya destruido. Al pasar el _id al flujo de cambios, se intenta reanudar las notificaciones comenzando después de la operación especificada.
stream = mongoc_collection_watch (collection, pipeline, NULL); if (mongoc_change_stream_next (stream, &change)) { resume_token = mongoc_change_stream_get_resume_token (stream); BSON_APPEND_DOCUMENT (&opts, "resumeAfter", resume_token); mongoc_change_stream_destroy (stream); stream = mongoc_collection_watch (collection, pipeline, &opts); mongoc_change_stream_next (stream, &change); mongoc_change_stream_destroy (stream); } else { if (mongoc_change_stream_error_document (stream, &error, NULL)) { MONGOC_ERROR ("%s\n", error.message); } mongoc_change_stream_destroy (stream); }
En el ejemplo a continuación, el resumeToken se recupera del último documento de flujo de cambios y se pasa al método Watch() como una opción. Pasar el resumeToken al método Watch() dirige el flujo de cambios a intentar reanudar las notificaciones comenzando después de la operación especificada en el token de reanudación.
var resumeToken = previousCursor.GetResumeToken(); var options = new ChangeStreamOptions { ResumeAfter = resumeToken }; var cursor = inventory.Watch(options); cursor.MoveNext(); var next = cursor.Current.First(); cursor.Dispose();
Puedes utilizar ChangeStreamOptions.SetResumeAfter para especificar el token de reanudación del flujo de cambios. Si se establece la opción resumeAfter, el flujo de cambios reanuda las notificaciones después de la operación especificada en el token de reanudación. El SetResumeAfter toma un valor que debe resolverse en un token de reanudación, como resumeToken en el ejemplo a continuación.
resumeToken := original.ResumeToken() cs, err := coll.Watch(ctx, mongo.Pipeline{}, options.ChangeStream().SetResumeAfter(resumeToken)) assert.NoError(t, err) defer cs.Close(ctx) ok = cs.Next(ctx) result := cs.Current
Puede usar el método resumeAfter() para reanudar las notificaciones después de la operación especificada en el token de reanudación. El método resumeAfter() toma un valor que debe resolverse en un token de reanudación, p. ej. resumeToken en el ejemplo siguiente.
BsonDocument resumeToken = next.getResumeToken(); cursor = inventory.watch().resumeAfter(resumeToken).iterator(); next = cursor.next();
Puedes utilizar el modificador resume_after para reanudar las notificaciones después de la operación especificada en el token de reanudación. El modificador resume_after toma un valor que debe resolverse en un token de reanudación, p. ej. resume_token en el ejemplo siguiente.
resume_token = cursor.resume_token cursor = db.inventory.watch(resume_after=resume_token) document = await cursor.next()
Puedes usar la opción resumeAfter para reanudar las notificaciones después de la operación especificada en el token de reanudación. La opción resumeAfter toma un valor que debe resolverse en un token de reanudación, p. ej. resumeToken en el ejemplo siguiente.
const collection = db.collection('inventory'); const changeStream = collection.watch(); let newChangeStream; changeStream .once('change', next => { const resumeToken = changeStream.resumeToken; changeStream.close(); newChangeStream = collection.watch([], { resumeAfter: resumeToken }); newChangeStream .on('change', next => { processChange(next); }) .once('error', error => { // handle error }); }) .once('error', error => { // handle error });
Puedes usar la opción resumeAfter para reanudar las notificaciones después de la operación especificada en el token de reanudación. La opción resumeAfter toma un valor que debe resolverse en un token de reanudación, p. ej. $resumeToken en el ejemplo siguiente.
$resumeToken = $changeStream->getResumeToken(); if ($resumeToken === null) { throw new \Exception('Resume token was not found'); } $changeStream = $db->inventory->watch([], ['resumeAfter' => $resumeToken]); $changeStream->rewind(); $firstChange = $changeStream->current();
Puedes utilizar el modificador resume_after para reanudar las notificaciones después de la operación especificada en el token de reanudación. El modificador resume_after toma un valor que debe resolverse en un token de reanudación, p. ej. resume_token en el ejemplo siguiente.
resume_token = cursor.resume_token cursor = db.inventory.watch(resume_after=resume_token) next(cursor)
Puedes utilizar el modificador resume_after para reanudar las notificaciones después de la operación especificada en el token de reanudación. El modificador resume_after toma un valor que debe resolverse en un token de reanudación, p. ej. resume_token en el ejemplo siguiente.
change_stream = inventory.watch cursor = change_stream.to_enum next_change = cursor.next resume_token = change_stream.resume_token new_cursor = inventory.watch([], resume_after: resume_token).to_enum resumed_change = new_cursor.next
Puedes usar la opción resumeAfter para reanudar las notificaciones después de la operación especificada en el token de reanudación. La opción resumeAfter toma un valor que debe resolverse en un token de reanudación, p. ej. resumeToken en el ejemplo siguiente.
let inventory = db.collection("inventory") inventory.watch(options: ChangeStreamOptions(fullDocument: .updateLookup)) .flatMap { changeStream in changeStream.next().map { _ in changeStream.resumeToken }.always { _ in _ = changeStream.kill() } }.flatMap { resumeToken in inventory.watch(options: ChangeStreamOptions(resumeAfter: resumeToken)).flatMap { newStream in newStream.forEach { event in // process event print(event) } } }
Puedes usar la opción resumeAfter para reanudar las notificaciones después de la operación especificada en el token de reanudación. La opción resumeAfter toma un valor que debe resolverse en un token de reanudación, p. ej. resumeToken en el ejemplo siguiente.
let inventory = db.collection("inventory") let changeStream = try inventory.watch(options: ChangeStreamOptions(fullDocument: .updateLookup)) let next = changeStream.next() let resumeToken = changeStream.resumeToken let resumedChangeStream = try inventory.watch(options: ChangeStreamOptions(resumeAfter: resumeToken)) let nextAfterResume = resumedChangeStream.next()
startAfter para Change Streams
Puedes iniciar un nuevo flujo de cambios después de un evento específico si pasas un token de reanudación a startAfter cuando se abre el cursor. A diferencia de resumeAfter, startAfter puede reanudar las notificaciones después de un evento de invalidación mediante la creación de un nuevo flujo de cambios.
Consulta Tokens de reanudación para obtener más información sobre el token de reanudación.
Importante
El oplog debe tener suficiente historial para localizar la operación asociada con el token o la marca de tiempo, si la marca de tiempo está en el pasado.
Tokens de reanudación
El token de reanudación está disponible en varias fuentes:
Origen | Descripción |
|---|---|
Cada notificación de evento de cambio incluye un token de reanudación en el campo | |
La etapa de agregación Este campo solo aparece cuando se utiliza el comando | |
El comando |
A partir de MongoDB 4.2, los flujos de cambios lanzarán una excepción si la pipeline de agregación del flujo de cambios modifica el campo _id de un evento.
Tip
MongoDB proporciona un "fragmento", una extensión de mongosh, que descodifica los tokens de reanudación codificados en hexadecimal.
Puedes instalar y ejecutar el fragmento resumetoken desde mongosh:
snippet install resumetoken decodeResumeToken('<RESUME TOKEN>')
También puede ejecutar resumetoken desde la línea de comandos (sin utilizar mongosh) si npm está instalado en su sistema:
npx mongodb-resumetoken-decoder <RESUME TOKEN>
Consulte lo siguiente para más detalles sobre:
Tokens de reanudación de eventos de cambios
Las notificaciones de eventos de cambio incluyen un token de reanudación en el campo _id:
{ "_id": { "_data": "82635019A0000000012B042C0100296E5A1004AB1154ACACD849A48C61756D70D3B21F463C6F7065726174696F6E54797065003C696E736572740046646F63756D656E744B65790046645F69640064635019A078BE67426D7CF4D2000004" }, "operationType": "insert", "clusterTime": Timestamp({ "t": 1666193824, "i": 1 }), "collectionUUID": new UUID("ab1154ac-acd8-49a4-8c61-756d70d3b21f"), "wallTime": ISODate("2022-10-19T15:37:04.604Z"), "fullDocument": { "_id": ObjectId("635019a078be67426d7cf4d2"'), "name": "Giovanni Verga" }, "ns": { "db": "test", "coll": "names" }, "documentKey": { "_id": ObjectId("635019a078be67426d7cf4d2") } }
Reanudar tokens desde aggregate
Al utilizar el comando aggregate, la etapa de agregación $changeStream incluye un token de reanudación en el campo cursor.postBatchResumeToken:
{ "cursor": { "firstBatch": [], "postBatchResumeToken": { "_data": "8263515EAC000000022B0429296E1404" }, "id": Long("4309380460777152828"), "ns": "test.names" }, "ok": 1, "$clusterTime": { "clusterTime": Timestamp({ "t": 1666277036, "i": 1 }), "signature": { "hash": Binary(Buffer.from("0000000000000000000000000000000000000000", "hex"), 0), "keyId": Long("0") } }, "operationTime": Timestamp({ "t": 1666277036, "i": 1 }) }
Reanudar tokens desde getMore
El comando getMore también incluye un token de reanudación en el campo cursor.postBatchResumeToken:
{ "cursor": { "nextBatch": [], "postBatchResumeToken": { "_data": "8263515979000000022B0429296E1404" }, "id": Long("7049907285270685005"), "ns": "test.names" }, "ok": 1, "$clusterTime": { "clusterTime": Timestamp( { "t": 1666275705, "i": 1 } ), "signature": { "hash": Binary(Buffer.from("0000000000000000000000000000000000000000", "hex"), 0), "keyId": Long("0") } }, "operationTime": Timestamp({ "t": 1666275705, "i": 1 }) }
Casos de uso
Los flujos de cambios pueden beneficiar las arquitecturas con sistemas empresariales confiables, ya que modifican los sistemas posteriores una vez que los cambios en los datos son duraderos. Por ejemplo, los flujos de cambio pueden ahorrar tiempo a los desarrolladores cuando se implementan servicios de extracción, transformación y carga (ETL), sincronización multiplataforma, funcionalidad de colaboración y servicios de notificación.
Control de acceso
Para implementaciones que aplican autenticación en implementaciones autogestionadas y autorización:
Para abrir un flujo de cambios en una colección específica, las aplicaciones deben tener privilegios que otorguen acceso a las acciones
changeStreamyfinden la colección correspondiente.{ resource: { db: <dbname>, collection: <collection> }, actions: [ "find", "changeStream" ] } Para abrir un flujo de cambios en una sola base de datos, las aplicaciones deben tener privilegios que otorguen acceso a las acciones
changeStreamyfinden todas las colecciones que no seansystemen la base de datos.{ resource: { db: <dbname>, collection: "" }, actions: [ "find", "changeStream" ] } Para abrir un flujo de cambios en toda una implementación, las aplicaciones deben tener privilegios que otorguen acceso a las acciones
changeStreamyfinden todas las colecciones que no seansystempara todas las bases de datos de la implementación.{ resource: { db: "", collection: "" }, actions: [ "find", "changeStream" ] }
Notificación de evento
Los flujos de cambios solo notifican sobre los cambios de datos que han persistido en la mayoría de los nodos que contienen datos en el set de réplicas. Esto garantiza que las notificaciones se activen solo por cambios comprometidos por la mayoría, que sean duraderos en escenarios de fallo.
Por ejemplo, piensa en un set de réplicas de 3 nodos con un cursor de flujo de cambios abierto contra el primario. Si un cliente emite una operación de inserción, el flujo de cambios solo notifica a la aplicación del cambio de datos una vez que esa inserción haya persistido en la mayoría de los nodos portadores de datos.
Si una operación está asociada con una transacción, el documento de evento de cambio incluye el txnNumber y el lsid.
Intercalación
Los flujos de cambio utilizan comparaciones binarias simple a menos que se proporcione una intercalación explícita.
Change Streams y documentos huérfanos
A partir de MongoDB 5.3, durante la migración de rango, los eventos de flujo de cambios no se generan para las actualizaciones de documentos huérfanos.
Change Streams con imágenes previas y posteriores de los documentos
A partir de MongoDB 6.0, puedes utilizar eventos de flujo de cambios para generar la versión de un documento antes y después de los cambios (las imágenes previas y posteriores del documento):
La imagen previa es el documento antes de reemplazarlo, actualizarlo o borrarlo. No existe una imagen previa para un documento insertado.
La imagen posterior es el documento tras insertarse, sustituirse o actualizarse. No hay imagen posterior para un documento borrado.
Activa
changeStreamPreAndPostImagespara una colección condb.createCollection(),createocollMod.
Las imágenes previas y posteriores no están disponibles para un evento de flujo de cambios si las imágenes fueron:
No está habilitado en la colección en el momento de la operación de actualización o eliminación de un documento.
Eliminado después del tiempo de retención de imágenes previas y posteriores establecido en
expireAfterSeconds.El siguiente ejemplo configura
expireAfterSecondsen100segundos en todo el clúster:use admin db.runCommand( { setClusterParameter: { changeStreamOptions: { preAndPostImages: { expireAfterSeconds: 100 } } } } ) El siguiente ejemplo devuelve la configuración actual de
changeStreamOptions, incluyendoexpireAfterSeconds:db.adminCommand( { getClusterParameter: "changeStreamOptions" } ) Configurar
expireAfterSecondsenoffutiliza la política de retención por defecto: las imágenes previas y posteriores se conservan hasta que los eventos correspondientes del flujo de cambios se eliminan del oplog.Si se elimina un evento de flujo de cambios del oplog, las imágenes previas y posteriores correspondientes también se eliminan independientemente del tiempo de retención de las imágenes previas y posteriores
expireAfterSeconds.
Consideraciones adicionales:
Habilitar las imágenes previas y posteriores consume espacio de almacenamiento y aumenta el tiempo de procesamiento. Habilita solo las imágenes previas y de publicación si las necesitas.
Limita el tamaño del evento del flujo de cambios a menos de 16 mebibytes. Para limitar el tamaño del evento, puedes:
Limita el tamaño del documento a 8 megabytes. Puedes solicitar imágenes previas y posteriores simultáneamente en la salida del flujo de cambios si otros campos de eventos del flujo de cambios como
updateDescriptionno son grandes.Solicita solo imágenes posteriores en la salida del flujo de cambios para documentos de hasta 16 mebibytes si otros campos de eventos del flujo de cambios como
updateDescriptionno son grandes.Solicita solo imágenes previas en la salida del flujo de cambios para documentos de hasta 16 mebibytes si:
las actualizaciones de documento afectan solo a una pequeña fracción de la estructura o el contenido del documento, y
no cause un evento de cambio
replace. Un eventoreplacesiempre incluye la imagen de publicación.
Para realizar una solicitud de imagen previa, debes establecer
fullDocumentBeforeChangeenrequiredowhenAvailableendb.collection.watch(). Para solicitar una imagen posterior, establecefullDocumentmediante el mismo método.Las preimágenes se escriben en la colección
config.system.preimages.La colección
config.system.preimagespuede agrandarse. Para limitar el tamaño de la colección, puedes establecer el tiempo aexpireAfterSecondspara las imágenes previas como se mostró antes.Las imágenes previas se eliminan de forma asincrónica mediante un proceso en segundo plano.
Importante
Característica incompatible con versiones anteriores
A partir de MongoDB 6.0, si utilizas imágenes previas y posteriores de documentos para los flujos de cambios, debes deshabilitar changeStreamPreAndPostImages para cada colección mediante el comando collMod antes de poder volver a una versión anterior de MongoDB.
Tip
Para los eventos y resultados del flujo de cambios, consulta Eventos de cambio.
Para buscar cambios en una colección, consulta
db.collection.watch().Para obtener ejemplos completos con la salida del flujo de cambios, consulta Flujos de cambio con imágenes previas y posteriores de documentos.
Para obtener ejemplos completos con la salida del flujo de cambios, consulta Flujos de cambio con imágenes previas y posteriores de documentos.