Definición
$changeStreamDevuelve un Cambiar el cursor de flujo en una colección, una base de datos o un clúster completo. Debe usarse como primera etapa de una canalización de agregación.
El
$changeStreamstage tiene la siguiente sintaxis:{ $changeStream: { allChangesForCluster: <boolean>, fullDocument: <string>, fullDocumentBeforeChange: <string>, resumeAfter: <document> showExpandedEvents: <boolean>, startAfter: <document> startAtOperationTime: <timestamp> } } ParameterDescripciónallChangesForClusterOpcional: Establece si el flujo de cambios debe incluir todos los cambios del clúster. Solo se puede abrir en la base de datos
admin.fullDocumentOpcional: especifica si las notificaciones de cambio incluyen una copia del documento completo cuando se modifican mediante operaciones
update.default:Las notificaciones de cambio no incluyen el documento completo para las operacionesupdate.requiredLas notificaciones de cambios incluyen una copia del documento modificado tal como aparecía inmediatamente después del cambio. Si no se encuentra el documento, el flujo de cambios genera un error.Para utilizar esta opción, primero debe utilizar el comando para habilitar
collModlachangeStreamPreAndPostImagesopción.Novedades en la versión 6.0.
updateLookupLas notificaciones de cambio incluyen una copia del documento modificado por el cambio. Este documento es el documento mayoritariamente confirmado onullsi ya no existe.whenAvailable:La notificación de cambio incluye una copia del documento modificado tal como apareció inmediatamente después del cambio onullsi el documento no está disponible.Para utilizar esta opción, primero debe utilizar el comando para habilitar
collModlachangeStreamPreAndPostImagesopción.Novedades en la versión 6.0.
En el caso de actualizaciones parciales, la notificación de cambio también proporciona una descripción del cambio.
fullDocumentBeforeChangeIncluya el documento completo anterior al cambio. Este campo acepta los siguientes valores:
off:Desactiva la inclusión del documento anterior al cambio.whenAvailableIncluye el documento anterior al cambio. La consulta no falla si el documento sin modificar no está disponible.requiredIncluye el documento anterior al cambio. La consulta falla si el documento sin modificar no está disponible.
resumeAfterOpcional. Especifica un token de reanudación como punto de inicio lógico para el flujo de cambios. No se puede usar para reanudar el flujo de cambios después de un
invalidateevento.resumeAfteres mutuamente excluyente constartAfterystartAtOperationTime.showExpandedEventsEspecifica si se deben incluir eventos de cambio adicionales, como operaciones de índice y DDL.
Novedades en la versión 6.0.
startAfterOpcional. Especifica un token de reanudación como el punto de inicio lógico para el flujo de cambios. A diferencia de
resumeAfter,startAfterpuede reanudar las notificaciones después de un eventoinvalidatecreando un nuevo flujo de cambios.startAfteres mutuamente excluyente conresumeAfterystartAtOperationTime.startAtOperationTimeEspecifica una hora como punto de inicio lógico para el flujo de cambios. No se puede usar con los campos
resumeAfterostartAfter.
Compatibilidad con Stable API
Los flujos de cambios están incluidos en la Stable API V1. Sin embargo, la opción showExpandedEvents no está incluida en la Stable API V1.
Ejemplos
Para crear un cursor de flujo de cambio utilizando la etapa de agregación, ejecute el aggregate comando.
var cur = db.names.aggregate( [ { $changeStream: {} } ] )
Para abrir el cursor, ejecute cur.
Cuando el flujo de cambios detecta un cambio, el método next() devuelve una notificación de evento de cambio. Por ejemplo, tras ejecutar cur.next(), MongoDB devuelve un documento similar al siguiente:
{ "_id": { _data: "8262E2EE54000000022B022C0100296E5A100448E5E3DD01364019AE8FE8C6859527E046645F6964006462E2EE54C8756C0D5CF6F0720004" }, "operationType": "insert", "clusterTime": Timestamp({ t: 1659039316, i: 2 }), "wallTime": ISODate("2022-07-28T20:15:16.148Z"), "fullDocument": { "_id": ObjectId("62e2ee54c8756c0d5cf6f072"), "name": "Walker Percy" }, "ns": { "db": "test", "coll": "names" }, "documentKey": { _id: ObjectId("62e2ee54c8756c0d5cf6f072") } }
Para usar el controlador MongoDB.NET/C# para agregar una $changeStream etapa a una canalización de agregación, llame a ChangeStream()método en un PipelineDefinition objeto.
El siguiente ejemplo crea una etapa de canalización que devuelve un cursor de flujo de cambio:
var pipeline = new EmptyPipelineDefinition<Movie>() .ChangeStream();
Puede usar un objeto ChangeStreamStageOptions para personalizar el comportamiento del flujo de cambios. El siguiente ejemplo realiza la misma $changeStream operación que el ejemplo anterior, pero especifica las siguientes opciones:
La opción
FullDocumentespecifica que las notificaciones de cambio no incluyen una copia del documento completo cuando se modifican mediante operaciones de actualización.La opción
StartAtOperationTimeespecifica el punto de inicio lógico para el flujo de cambio.
var changeStreamOptions = new ChangeStreamStageOptions() { FullDocument = ChangeStreamFullDocumentOption.Default, StartAtOperationTime = new BsonTimestamp(300), }; var pipeline = new EmptyPipelineDefinition<Movie>() .ChangeStream(changeStreamOptions);
Nota
Prefiera el método Watch()
Siempre que sea posible, utilice el método IMongoCollection<TDocument>.Watch() en lugar de esta etapa de agregación. Utilice el método ChangeStream() solo si las etapas posteriores proyectan el token de reanudación (_id) o si no desea que el cursor resultante se reanude automáticamente.
Puede utilizar tanto el watch() método como el aggregate() método para ejecutar una $changeStream operación. $changeStream devuelve un ChangeStreamCursor cuando pasa la canalización de agregación al método watch() en un Collection objeto MongoDB. $changeStream devuelve un AggregationCursor cuando pasa la canalización de agregación al método added().
Importante
$changeStream Resumibility
Si se pasa un flujo de cambios al método added(), este no se puede reanudar. Un flujo de cambios solo se reanuda si se pasa al método watch(). Para obtener más información sobre la reanudabilidad, consulte Reanudar un flujo de cambios.
El siguiente ejemplo crea y ejecuta una canalización que devuelve un ChangeStreamCursor:
const pipeline = [{ $changeStream: {} }]; const changeStream = collection.watch(pipeline); return changeStream;
Puede usar un objeto ChangeStreamOptions para personalizar el comportamiento del flujo de cambios. El siguiente ejemplo realiza la misma $changeStream operación que el ejemplo anterior, pero especifica las siguientes opciones:
La opción
fullDocumentespecifica que si una operación de actualización modifica un documento, las notificaciones de cambio incluyen una copia del documento completo.La opción
startAtOperationTimeespecifica el punto de inicio lógico para el flujo de cambio.
const pipeline = [ { $changeStream: { fullDocument: 'updateLookup', startAtOperationTime: 3000 } } ]; const changeStream = collection.watch(pipeline); return changeStream;
Obtén más información
Para obtener más información sobre las notificaciones de flujo de cambios, consulte Eventos de cambio.
Para aprender más sobre las etapas relacionadas del pipeline, consulta la guía $changeStreamSplitLargeEvent.