Join us at MongoDB.local London on 7 May to unlock new possibilities for your data. Use WEB50 to save 50%.
Register now >
Docs Menu
Docs Home
/ /

$changeStream (etapa de agregación)

$changeStream

Devuelve un(a) Change Stream cursor en una colección, una base de datos o en todo un clúster. Debe usarse como la primera etapa en un pipeline de agregación.

La $changeStream la etapa tiene la siguiente sintaxis:

{
$changeStream: {
allChangesForCluster: <boolean>,
fullDocument: <string>,
fullDocumentBeforeChange: <string>,
resumeAfter: <document>
showExpandedEvents: <boolean>,
startAfter: <document>
startAtOperationTime: <timestamp>
}
}
Parameter
Descripción

allChangesForCluster

Opcional: establece si el flujo de cambios debe incluir todos los cambios en el clúster. Solo puede abrirse en la base de datos admin.

fullDocument

Opcional: especifica si las notificaciones de cambios incluyen una copia del documento completo cuando es modificado por operaciones update.

  • default: Las notificaciones de cambios no incluyen el documento completo para las operaciones de update.

  • requiredLas notificaciones de cambio incluyen una copia del documento modificado tal como apareció inmediatamente después del cambio. Si no se puede encontrar el documento, el flujo de cambios genera un error.

    Para usar esta opción, primero debes usar el comando collMod para habilitar la opción changeStreamPreAndPostImages.

    Novedades en la versión 6.0.

  • updateLookup: Las notificaciones de cambio incluyen una copia del documento modificado por el cambio. Este documento es el documento actual comprometido por mayoría o null si ya no existe.

  • whenAvailableNotificación de cambio incluye una copia del documento modificado tal como apareció inmediatamente después del cambio o null si el documento no está disponible.

    Para usar esta opción, primero debes usar el comando collMod para habilitar la opción changeStreamPreAndPostImages.

    Novedades en la versión 6.0.

En el caso de actualizaciones parciales, la notificación de cambio también proporciona una descripción del cambio.

fullDocumentBeforeChange

Incluya el documento completo anterior al cambio. Este campo acepta los siguientes valores:

  • off: Desactiva la inclusión del documento de antes del cambio.

  • whenAvailable: Incluye el documento del antes del cambio. La query no falla si el documento no modificado no está disponible.

  • requiredIncluye documentos anteriores al cambio. La query falla si el documento sin modificar no está disponible.

resumeAfter

opcional. Especifica un token de reanudación (resume token) como el punto de partida lógico para el flujo de cambios. No se puede usar para reanudar el flujo de cambios después de un invalidate evento.

resumeAfter es mutuamente excluyente con startAfter y startAtOperationTime.

showExpandedEvents

Especifica si se deben incluir eventos de cambios adicionales, como las operaciones de DDL e índices.

Novedades en la versión 6.0.

startAfter

Opcional. Especifica un token de reanudación como el punto de inicio lógico para el flujo de cambios. A diferencia de resumeAfter, startAfter puede reanudar las notificaciones después de un evento invalidate creando un nuevo flujo de cambios.

startAfter es mutuamente excluyente con resumeAfter y startAtOperationTime.

startAtOperationTime

Especifica un momento como punto de partida lógico para el flujo de cambios. No puede usarse con los campos resumeAfter o startAfter.

Los flujos de cambios están incluidos en la Stable API V1. Sin embargo, la opción showExpandedEvents no está incluida en la Stable API V1.

Para crear un cursor de flujo de cambios usando la etapa de agregación, ejecuta el comando aggregate.

var cur = db.names.aggregate( [
{ $changeStream: {} }
] )

Para abrir el cursor, ejecuta cur.

Cuando el flujo de cambios detecta un cambio, el método next() devuelve una notificación de evento de cambio. Por ejemplo, tras ejecutar cur.next(), MongoDB devuelve un documento similar al siguiente:

{
"_id": {
_data: "8262E2EE54000000022B022C0100296E5A100448E5E3DD01364019AE8FE8C6859527E046645F6964006462E2EE54C8756C0D5CF6F0720004"
},
"operationType": "insert",
"clusterTime": Timestamp({ t: 1659039316, i: 2 }),
"wallTime": ISODate("2022-07-28T20:15:16.148Z"),
"fullDocument": {
"_id": ObjectId("62e2ee54c8756c0d5cf6f072"),
"name": "Walker Percy"
},
"ns": {
"db": "test",
"coll": "names"
},
"documentKey": { _id: ObjectId("62e2ee54c8756c0d5cf6f072") }
}

Para usar el controlador MongoDB.NET/C# para agregar una $changeStream etapa a una canalización de agregación, llame a ChangeStream()método en un PipelineDefinition objeto.

El siguiente ejemplo crea una etapa de canalización que devuelve un cursor de flujo de cambio:

var pipeline = new EmptyPipelineDefinition<Movie>()
.ChangeStream();

Puedes usar un objeto ChangeStreamStageOptions para personalizar el comportamiento del flujo de cambios. El siguiente ejemplo realiza la misma operación de $changeStream que el ejemplo anterior, pero especifica las siguientes opciones:

  • La opción FullDocument especifica que las notificaciones de cambio no incluyen una copia del documento completo cuando se modifican mediante operaciones de actualización.

  • La opción StartAtOperationTime especifica el punto de inicio lógico para el flujo de cambio.

var changeStreamOptions = new ChangeStreamStageOptions()
{
FullDocument = ChangeStreamFullDocumentOption.Default,
StartAtOperationTime = new BsonTimestamp(300),
};
var pipeline = new EmptyPipelineDefinition<Movie>()
.ChangeStream(changeStreamOptions);

Nota

Prefiera el método Watch()

Cuando sea posible, utiliza el método IMongoCollection<TDocument>.Watch() en lugar de esta etapa de agregación. Utiliza el método ChangeStream() solo si las etapas posteriores descartan el token de reanudación (_id) o si no deseas que el cursor resultante se reanude automáticamente.

Puedes utilizar tanto el método watch() como el método aggregate() para ejecutar una operación $changeStream. $changeStream devuelve un ChangeStreamCursor cuando se pasa el pipeline de agregación al método watch() en un objeto de MongoDB Collection. $changeStream devuelve un AggregationCursor cuando pasas el pipeline de agregación al método aggregate().

Importante

$changeStream Resumibility

Si pasas un flujo de cambios al método aggregate(), el flujo de cambios no puede reanudarse. Un flujo de cambios solo se reanuda si lo pasas al método watch(). Para obtener más información sobre la reanudabilidad, consulta Reanudar una Change Stream (flujo de cambios).

El siguiente ejemplo crea y ejecuta un pipeline que devuelve un ChangeStreamCursor:

const pipeline = [{ $changeStream: {} }];
const changeStream = collection.watch(pipeline);
return changeStream;

Puedes usar un ChangeStreamOptions objeto para personalizar el comportamiento del flujo de cambios. El siguiente ejemplo realiza la misma operación de $changeStream que el ejemplo anterior, pero especifica las siguientes opciones:

  • La opción fullDocument especifica que si una operación de actualización modifica un documento, las notificaciones de cambio incluyen una copia del documento completo.

  • La opción startAtOperationTime especifica el punto de inicio lógico para el flujo de cambio.

const pipeline = [
{
$changeStream: {
fullDocument: 'updateLookup',
startAtOperationTime: 3000
}
}
];
const changeStream = collection.watch(pipeline);
return changeStream;

Para obtener más información sobre las notificaciones de flujos de cambios, consulta Eventos de cambio.

Para aprender más sobre las etapas relacionadas del pipeline, consulta la guía $changeStreamSplitLargeEvent.

Volver

$bucketAuto

En esta página