Definición
$changeStreamDevuelve un(a) Change Stream cursor en una colección, una base de datos o en todo un clúster. Debe usarse como la primera etapa en un pipeline de agregación.
La
$changeStreamla etapa tiene la siguiente sintaxis:{ $changeStream: { allChangesForCluster: <boolean>, fullDocument: <string>, fullDocumentBeforeChange: <string>, resumeAfter: <document> showExpandedEvents: <boolean>, startAfter: <document> startAtOperationTime: <timestamp> } } ParameterDescripciónallChangesForClusterOpcional: establece si el flujo de cambios debe incluir todos los cambios en el clúster. Solo puede abrirse en la base de datos
admin.fullDocumentOpcional: especifica si las notificaciones de cambios incluyen una copia del documento completo cuando es modificado por operaciones
update.default: Las notificaciones de cambios no incluyen el documento completo para las operaciones deupdate.requiredLas notificaciones de cambio incluyen una copia del documento modificado tal como apareció inmediatamente después del cambio. Si no se puede encontrar el documento, el flujo de cambios genera un error.Para usar esta opción, primero debes usar el comando
collModpara habilitar la opciónchangeStreamPreAndPostImages.Novedades en la versión 6.0.
updateLookup: Las notificaciones de cambio incluyen una copia del documento modificado por el cambio. Este documento es el documento actual comprometido por mayoría onullsi ya no existe.whenAvailableNotificación de cambio incluye una copia del documento modificado tal como apareció inmediatamente después del cambio onullsi el documento no está disponible.Para usar esta opción, primero debes usar el comando
collModpara habilitar la opciónchangeStreamPreAndPostImages.Novedades en la versión 6.0.
En el caso de actualizaciones parciales, la notificación de cambio también proporciona una descripción del cambio.
fullDocumentBeforeChangeIncluya el documento completo anterior al cambio. Este campo acepta los siguientes valores:
off: Desactiva la inclusión del documento de antes del cambio.whenAvailable: Incluye el documento del antes del cambio. La query no falla si el documento no modificado no está disponible.requiredIncluye documentos anteriores al cambio. La query falla si el documento sin modificar no está disponible.
resumeAfteropcional. Especifica un token de reanudación (resume token) como el punto de partida lógico para el flujo de cambios. No se puede usar para reanudar el flujo de cambios después de un
invalidateevento.resumeAfteres mutuamente excluyente constartAfterystartAtOperationTime.showExpandedEventsEspecifica si se deben incluir eventos de cambios adicionales, como las operaciones de DDL e índices.
Novedades en la versión 6.0.
startAfterOpcional. Especifica un token de reanudación como el punto de inicio lógico para el flujo de cambios. A diferencia de
resumeAfter,startAfterpuede reanudar las notificaciones después de un eventoinvalidatecreando un nuevo flujo de cambios.startAfteres mutuamente excluyente conresumeAfterystartAtOperationTime.startAtOperationTimeEspecifica un momento como punto de partida lógico para el flujo de cambios. No puede usarse con los campos
resumeAfterostartAfter.
Compatibilidad con Stable API
Los flujos de cambios están incluidos en la Stable API V1. Sin embargo, la opción showExpandedEvents no está incluida en la Stable API V1.
Ejemplos
Para crear un cursor de flujo de cambios usando la etapa de agregación, ejecuta el comando aggregate.
var cur = db.names.aggregate( [ { $changeStream: {} } ] )
Para abrir el cursor, ejecuta cur.
Cuando el flujo de cambios detecta un cambio, el método next() devuelve una notificación de evento de cambio. Por ejemplo, tras ejecutar cur.next(), MongoDB devuelve un documento similar al siguiente:
{ "_id": { _data: "8262E2EE54000000022B022C0100296E5A100448E5E3DD01364019AE8FE8C6859527E046645F6964006462E2EE54C8756C0D5CF6F0720004" }, "operationType": "insert", "clusterTime": Timestamp({ t: 1659039316, i: 2 }), "wallTime": ISODate("2022-07-28T20:15:16.148Z"), "fullDocument": { "_id": ObjectId("62e2ee54c8756c0d5cf6f072"), "name": "Walker Percy" }, "ns": { "db": "test", "coll": "names" }, "documentKey": { _id: ObjectId("62e2ee54c8756c0d5cf6f072") } }
Para usar el controlador MongoDB.NET/C# para agregar una $changeStream etapa a una canalización de agregación, llame a ChangeStream()método en un PipelineDefinition objeto.
El siguiente ejemplo crea una etapa de canalización que devuelve un cursor de flujo de cambio:
var pipeline = new EmptyPipelineDefinition<Movie>() .ChangeStream();
Puedes usar un objeto ChangeStreamStageOptions para personalizar el comportamiento del flujo de cambios. El siguiente ejemplo realiza la misma operación de $changeStream que el ejemplo anterior, pero especifica las siguientes opciones:
La opción
FullDocumentespecifica que las notificaciones de cambio no incluyen una copia del documento completo cuando se modifican mediante operaciones de actualización.La opción
StartAtOperationTimeespecifica el punto de inicio lógico para el flujo de cambio.
var changeStreamOptions = new ChangeStreamStageOptions() { FullDocument = ChangeStreamFullDocumentOption.Default, StartAtOperationTime = new BsonTimestamp(300), }; var pipeline = new EmptyPipelineDefinition<Movie>() .ChangeStream(changeStreamOptions);
Nota
Prefiera el método Watch()
Cuando sea posible, utiliza el método IMongoCollection<TDocument>.Watch() en lugar de esta etapa de agregación. Utiliza el método ChangeStream() solo si las etapas posteriores descartan el token de reanudación (_id) o si no deseas que el cursor resultante se reanude automáticamente.
Puedes utilizar tanto el método watch() como el método aggregate() para ejecutar una operación $changeStream. $changeStream devuelve un ChangeStreamCursor cuando se pasa el pipeline de agregación al método watch() en un objeto de MongoDB Collection. $changeStream devuelve un AggregationCursor cuando pasas el pipeline de agregación al método aggregate().
Importante
$changeStream Resumibility
Si pasas un flujo de cambios al método aggregate(), el flujo de cambios no puede reanudarse. Un flujo de cambios solo se reanuda si lo pasas al método watch(). Para obtener más información sobre la reanudabilidad, consulta Reanudar una Change Stream (flujo de cambios).
El siguiente ejemplo crea y ejecuta un pipeline que devuelve un ChangeStreamCursor:
const pipeline = [{ $changeStream: {} }]; const changeStream = collection.watch(pipeline); return changeStream;
Puedes usar un ChangeStreamOptions objeto para personalizar el comportamiento del flujo de cambios. El siguiente ejemplo realiza la misma operación de $changeStream que el ejemplo anterior, pero especifica las siguientes opciones:
La opción
fullDocumentespecifica que si una operación de actualización modifica un documento, las notificaciones de cambio incluyen una copia del documento completo.La opción
startAtOperationTimeespecifica el punto de inicio lógico para el flujo de cambio.
const pipeline = [ { $changeStream: { fullDocument: 'updateLookup', startAtOperationTime: 3000 } } ]; const changeStream = collection.watch(pipeline); return changeStream;
Obtén más información
Para obtener más información sobre las notificaciones de flujos de cambios, consulta Eventos de cambio.
Para aprender más sobre las etapas relacionadas del pipeline, consulta la guía $changeStreamSplitLargeEvent.