Docs Menu
Docs Home
/ /

Supervisar los cambios en los datos

En esta guía, aprenderá a usar un flujo de cambios para supervisar los cambios en sus datos en tiempo real. Un flujo de cambios es una función de MongoDB Server que permite que su aplicación se suscriba a los cambios en los datos de una colección, base de datos o implementación.

Tip

Atlas Stream Processing

Como alternativa a los flujos de cambios, puede utilizar Atlas Stream Processing para procesar y transformar flujos de datos. A diferencia de los flujos de cambios, que solo registran eventos de la base de datos, Atlas Stream Processing gestiona múltiples tipos de eventos de datos y ofrece capacidades de procesamiento de datos ampliadas. Para obtener más información sobre esta función, consulte Procesamiento de flujo de Atlas en la documentación de MongoDB Atlas.

Los ejemplos de esta guía utilizan el sample_restaurants.restaurants Colección de los conjuntos de datos de muestra de Atlas. Para aprender a crear un clúster gratuito de MongoDB Atlas y cargar los conjuntos de datos de muestra, consulte Introducción al controlador .NET/C#.

En los ejemplos de esta página se utilizan las siguientes clases Restaurant, Address y GradeEntry como modelos:

public class Restaurant
{
public ObjectId Id { get; set; }
public string Name { get; set; }
[BsonElement("restaurant_id")]
public string RestaurantId { get; set; }
public string Cuisine { get; set; }
public Address Address { get; set; }
public string Borough { get; set; }
public List<GradeEntry> Grades { get; set; }
}
public class Address
{
public string Building { get; set; }
[BsonElement("coord")]
public double[] Coordinates { get; set; }
public string Street { get; set; }
[BsonElement("zipcode")]
public string ZipCode { get; set; }
}
public class GradeEntry
{
public DateTime Date { get; set; }
public string Grade { get; set; }
public float? Score { get; set; }
}

Nota

Los documentos de la colección restaurants utilizan la convención de nomenclatura snake-case. Los ejemplos de esta guía utilizan un ConventionPack para deserializar los campos de la colección en notación Pascal y asignarlos a las propiedades de la clase Restaurant.

Para aprender más sobre la serialización personalizada, consultar Serialización personalizada.

Para abrir un flujo de cambios, llame al método Watch() o WatchAsync(). La instancia en la que llama al método determina el alcance de los eventos que el flujo de cambios escucha. Puede llamar al método Watch() o WatchAsync() en las siguientes clases:

  • MongoClient:Para monitorear todos los cambios en la implementación de MongoDB

  • Database:Para monitorear los cambios en todas las colecciones de la base de datos

  • Collection:Para monitorear cambios en la colección

El siguiente ejemplo abre un flujo de cambios en la colección restaurants y muestra los cambios a medida que ocurren. Seleccione el Synchronous o la pestaña Asynchronous para ver el código correspondiente.

var database = client.GetDatabase("sample_restaurants");
var collection = database.GetCollection<Restaurant>("restaurants");
// Opens a change stream and prints the changes as they're received
using (var cursor = collection.Watch())
{
foreach (var change in cursor.ToEnumerable())
{
Console.WriteLine("Received the following type of change: " + change.BackingDocument);
}
}
var database = client.GetDatabase("sample_restaurants");
var collection = database.GetCollection<Restaurant>("restaurants");
// Opens a change streams and print the changes as they're received
using var cursor = await collection.WatchAsync();
await cursor.ForEachAsync(change =>
{
Console.WriteLine("Received the following type of change: " + change.BackingDocument);
});

Para empezar a observar los cambios, ejecute la aplicación. Luego, en una aplicación o shell independiente, modifique la colección restaurants. Actualizar un documento con un valor "name" de "Blarney Castle" genera la siguiente salida del flujo de cambios:

{ "_id" : { "_data" : "..." }, "operationType" : "update", "clusterTime" : Timestamp(...),
"wallTime" : ISODate("..."), "ns" : { "db" : "sample_restaurants", "coll" : "restaurants" },
"documentKey" : { "_id" : ObjectId("...") }, "updateDescription" : { "updatedFields" : { "cuisine" : "Irish" },
"removedFields" : [], "truncatedArrays" : [] } }

Puede pasar el parámetro pipeline a los métodos Watch() y WatchAsync() para modificar la salida del flujo de cambios. Este parámetro le permite observar únicamente los eventos de cambio especificados. Cree la canalización usando la clase EmptyPipelineDefinition y añadiendo los métodos de la etapa de agregación correspondientes.

Puede especificar las siguientes etapas de agregación en el parámetro pipeline:

  • $addFields

  • $changeStreamSplitLargeEvent

  • $match

  • $project

  • $replaceRoot

  • $replaceWith

  • $redact

  • $set

  • $unset

Tip

Para aprender a crear una canalización de agregación mediante la PipelineDefinitionBuilder clase,consulte Etapas de la canalización de agregación en la guía Operaciones con constructores.

Para obtener más información sobre cómo modificar la salida de su flujo de cambios, consulte la sección Modificar la salida del flujo de cambios en el manual de MongoDB Server.

El siguiente ejemplo utiliza el parámetro pipeline para abrir un flujo de cambios que registra únicamente las operaciones de actualización. Seleccione la pestaña Synchronous o Asynchronous para ver el código correspondiente.

var pipeline = new EmptyPipelineDefinition<ChangeStreamDocument<Restaurant>>()
.Match(change => change.OperationType == ChangeStreamOperationType.Update);
// Opens a change streams and print the changes as they're received
using (var cursor = collection.Watch(pipeline))
{
foreach (var change in cursor.ToEnumerable())
{
Console.WriteLine("Received the following change: " + change);
}
}
var pipeline = new EmptyPipelineDefinition<ChangeStreamDocument<Restaurant>>()
.Match(change => change.OperationType == ChangeStreamOperationType.Update);
// Opens a change stream and prints the changes as they're received
using (var cursor = await collection.WatchAsync(pipeline))
{
await cursor.ForEachAsync(change =>
{
Console.WriteLine("Received the following change: " + change);
});
}

Si su aplicación genera eventos de cambio que superan los 16 MB, el servidor devuelve un error BSONObjectTooLarge. Para evitar este error, puede usar la etapa de canalización $changeStreamSplitLargeEvent para dividir los eventos en fragmentos más pequeños. La API de agregación de controladores de .NET/C# incluye el método ChangeStreamSplitLargeEvent(), que puede usar para agregar la etapa $changeStreamSplitLargeEvent a la canalización del flujo de cambios.

Este ejemplo indica al controlador que detecte cambios y divida eventos de cambio que superen el límite de 16 MB. El código imprime el documento de cambio para cada evento e invoca métodos auxiliares para reensamblar los fragmentos de evento:

var pipeline = new EmptyPipelineDefinition<ChangeStreamDocument<Restaurant>>()
.ChangeStreamSplitLargeEvent();
using var cursor = collection.Watch(pipeline);
foreach (var completeEvent in GetNextChangeStreamEvent(cursor.ToEnumerable().GetEnumerator()))
{
Console.WriteLine("Received the following change: " + completeEvent.BackingDocument);
}
var pipeline = new EmptyPipelineDefinition<ChangeStreamDocument<Restaurant>>()
.ChangeStreamSplitLargeEvent();
using var cursor = await collection.WatchAsync(pipeline);
await foreach (var completeEvent in GetNextChangeStreamEventAsync(cursor))
{
Console.WriteLine("Received the following change: " + completeEvent.BackingDocument);
}

Nota

Recomendamos reensamblar los fragmentos de eventos de cambio, como se muestra en el ejemplo anterior, pero este paso es opcional. Puede usar la misma lógica para supervisar los eventos de cambio divididos y completos.

El ejemplo anterior utiliza los métodos GetNextChangeStreamEvent(), GetNextChangeStreamEventAsync() y MergeFragment() para reensamblar fragmentos de eventos de cambio en un único documento de flujo de cambios. El siguiente código define estos métodos:

// Fetches the next complete change stream event
private static IEnumerable<ChangeStreamDocument<TDocument>> GetNextChangeStreamEvent<TDocument>(
IEnumerator<ChangeStreamDocument<TDocument>> changeStreamEnumerator)
{
while (changeStreamEnumerator.MoveNext())
{
var changeStreamEvent = changeStreamEnumerator.Current;
if (changeStreamEvent.SplitEvent != null)
{
var fragment = changeStreamEvent;
while (fragment.SplitEvent.Fragment < fragment.SplitEvent.Of)
{
changeStreamEnumerator.MoveNext();
fragment = changeStreamEnumerator.Current;
MergeFragment(changeStreamEvent, fragment);
}
}
yield return changeStreamEvent;
}
}
// Merges a fragment into the base event
private static void MergeFragment<TDocument>(
ChangeStreamDocument<TDocument> changeStreamEvent,
ChangeStreamDocument<TDocument> fragment)
{
foreach (var element in fragment.BackingDocument)
{
if (element.Name != "_id" && element.Name != "splitEvent")
{
changeStreamEvent.BackingDocument[element.Name] = element.Value;
}
}
}
// Fetches the next complete change stream event
private static async IAsyncEnumerable<ChangeStreamDocument<TDocument>> GetNextChangeStreamEventAsync<TDocument>(
IAsyncCursor<ChangeStreamDocument<TDocument>> changeStreamCursor)
{
var changeStreamEnumerator = GetNextChangeStreamEventFragmentAsync(changeStreamCursor).GetAsyncEnumerator();
while (await changeStreamEnumerator.MoveNextAsync())
{
var changeStreamEvent = changeStreamEnumerator.Current;
if (changeStreamEvent.SplitEvent != null)
{
var fragment = changeStreamEvent;
while (fragment.SplitEvent.Fragment < fragment.SplitEvent.Of)
{
await changeStreamEnumerator.MoveNextAsync();
fragment = changeStreamEnumerator.Current;
MergeFragment(changeStreamEvent, fragment);
}
}
yield return changeStreamEvent;
}
}
private static async IAsyncEnumerable<ChangeStreamDocument<TDocument>> GetNextChangeStreamEventFragmentAsync<TDocument>(
IAsyncCursor<ChangeStreamDocument<TDocument>> changeStreamCursor)
{
while (await changeStreamCursor.MoveNextAsync())
{
foreach (var changeStreamEvent in changeStreamCursor.Current)
{
yield return changeStreamEvent;
}
}
}
// Merges a fragment into the base event
private static void MergeFragment<TDocument>(
ChangeStreamDocument<TDocument> changeStreamEvent,
ChangeStreamDocument<TDocument> fragment)
{
foreach (var element in fragment.BackingDocument)
{
if (element.Name != "_id" && element.Name != "splitEvent")
{
changeStreamEvent.BackingDocument[element.Name] = element.Value;
}
}
}

Tip

Para obtener más información sobre cómo dividir eventos de cambio grandes, consulte $changeStreamSplitLargeEvent en el manual de MongoDB Server.

Los métodos Watch() y WatchAsync() aceptan parámetros opcionales, que representan opciones que se pueden usar para configurar la operación. Si no se especifica ninguna opción, el controlador no personaliza la operación.

La siguiente tabla describe las opciones que puede configurar para personalizar el comportamiento de Watch() y WatchAsync():

Opción
Descripción

FullDocument

Specifies whether to show the full document after the change, rather than showing only the changes made to the document. To learn more about this option, see Include Pre-Images and Post-Images.

FullDocumentBeforeChange

Specifies whether to show the full document as it was before the change, rather than showing only the changes made to the document. To learn more about this option, see Include Pre-Images and Post-Images.

ResumeAfter

Directs Watch() or WatchAsync() to resume returning changes after the operation specified in the resume token.
Each change stream event document includes a resume token as the _id field. Pass the entire _id field of the change event document that represents the operation you want to resume after.
ResumeAfter is mutually exclusive with StartAfter and StartAtOperationTime.

StartAfter

Directs Watch() or WatchAsync() to start a new change stream after the operation specified in the resume token. Allows notifications to resume after an invalidate event.
Each change stream event document includes a resume token as the _id field. Pass the entire _id field of the change event document that represents the operation you want to resume after.
StartAfter is mutually exclusive with ResumeAfter and StartAtOperationTime.

StartAtOperationTime

Directs Watch() or WatchAsync() to return only events that occur after the specified timestamp.
StartAtOperationTime is mutually exclusive with ResumeAfter and StartAfter.

MaxAwaitTime

Specifies the maximum amount of time, in milliseconds, the server waits for new data changes to report to the change stream cursor before returning an empty batch. Defaults to 1000 milliseconds.

ShowExpandedEvents

Starting in MongoDB Server v6.0, change streams support change notifications for Data Definition Language (DDL) events, such as the createIndexes and dropIndexes events. To include expanded events in a change stream, create the change stream cursor and set this parameter to True.

batchSize

Specifies the maximum number of documents that a change stream can return in each batch, which applies to Watch() or WatchAsync(). If the batchSize option is not set, watch functions have an initial batch size of 101 documents and a maximum size of 16 mebibytes (MiB) for each subsequent batch. This option can enforce a smaller limit than 16 MiB, but not a larger one. If you set batchSize to a limit that result in batches larger than 16 MiB, this option has no effect and Watch() or WatchAsync() uses the default batch size.

Collation

Specifies the collation to use for the change stream cursor. See the Collation section of this page for more information.

Comment

Attaches a comment to the operation.

Para configurar la intercalación para tu operación, crea una instancia de la clase intercalación.

La siguiente tabla describe los parámetros que acepta el constructor Collation. También enumera la propiedad de clase correspondiente que se puede usar para leer el valor de cada configuración.

Parameter
Descripción
Propiedad de clase

locale

Specifies the International Components for Unicode (ICU) locale. For a list of supported locales, see Collation Locales and Default Parameters in the MongoDB Server Manual.

If you want to use simple binary comparison, use the Collation.Simple static property to return a Collation object with the locale set to "simple".
Data Type: string

Locale

caseLevel

(Optional) Specifies whether to include case comparison.

When this argument is true, the driver's behavior depends on the value of the strength argument:

- If strength is CollationStrength.Primary, the driver compares base characters and case.
- If strength is CollationStrength.Secondary, the driver compares base characters, diacritics, other secondary differences, and case.
- If strength is any other value, this argument is ignored.

When this argument is false, the driver doesn't include case comparison at strength level Primary or Secondary.

Data Type: boolean
Default: false

CaseLevel

caseFirst

(Optional) Specifies the sort order of case differences during tertiary level comparisons.

Default: CollationCaseFirst.Off

CaseFirst

strength

(Optional) Specifies the level of comparison to perform, as defined in the ICU documentation.

Default: CollationStrength.Tertiary

Strength

numericOrdering

(Optional) Specifies whether the driver compares numeric strings as numbers.

If this argument is true, the driver compares numeric strings as numbers. For example, when comparing the strings "10" and "2", the driver treats the values as 10 and 2, and finds 10 to be greater.

If this argument is false or excluded, the driver compares numeric strings as strings. For example, when comparing the strings "10" and "2", the driver compares one character at a time. Because "1" is less than "2", the driver finds "10" to be less than "2".

For more information, see Collation Restrictions in the MongoDB Server manual.

Data Type: boolean
Default: false

NumericOrdering

alternate

(Optional) Specifies whether the driver considers whitespace and punctuation as base characters for purposes of comparison.

Default: CollationAlternate.NonIgnorable (spaces and punctuation are considered base characters)

Alternate

maxVariable

(Optional) Specifies which characters the driver considers ignorable when the alternate argument is CollationAlternate.Shifted.

Default: CollationMaxVariable.Punctuation (the driver ignores punctuation and spaces)

MaxVariable

normalization

(Optional) Specifies whether the driver normalizes text as needed.

Most text doesn't require normalization. For more information about normalization, see the ICU documentation.

Data Type: boolean
Default: false

Normalization

backwards

(Optional) Specifies whether strings containing diacritics sort from the back of the string to the front.

Data Type: boolean
Default: false

Backwards

Más información sobre la intercalación en la página Intercalación en el manual de MongoDB Server.

Importante

Puede habilitar imágenes previas y posteriores en colecciones solo si su implementación usa MongoDB v6.0 o posterior.

De forma predeterminada, al realizar una operación en una colección, el evento de cambio correspondiente incluye solo el delta de los campos modificados por dicha operación. Para ver el documento completo antes o después de un cambio, cree un objeto ChangeStreamOptions y especifique las opciones FullDocumentBeforeChange o FullDocument. A continuación, pase el objeto ChangeStreamOptions al método Watch() o WatchAsync().

La preimagen es la versión completa de un documento antes de un cambio. Para incluir la preimagen en el evento de flujo de cambios, configure la FullDocumentBeforeChange opción con uno de los siguientes valores:

  • ChangeStreamFullDocumentBeforeChangeOption.WhenAvailable:El evento de cambio incluye una imagen previa del documento modificado para eventos de cambio solo si la imagen previa está disponible.

  • ChangeStreamFullDocumentBeforeChangeOption.RequiredEl evento de cambio incluye una preimagen del documento modificado para eventos de cambio. Si la preimagen no está disponible, el controlador genera un error.

La imagen posterior es la versión completa de un documento después de un cambio. Para incluirla en el evento de flujo de cambios, configure la FullDocument opción con uno de los siguientes valores:

  • ChangeStreamFullDocumentOption.UpdateLookup:El evento de cambio incluye una copia de todo el documento modificado desde algún tiempo después del cambio.

  • ChangeStreamFullDocumentOption.WhenAvailable:El evento de cambio incluye una imagen posterior del documento modificado solo para eventos de cambio si la imagen posterior está disponible.

  • ChangeStreamFullDocumentOption.RequiredEl evento de cambio incluye una imagen posterior del documento modificado. Si la imagen posterior no está disponible, el controlador genera un error.

El siguiente ejemplo abre un flujo de cambios en una colección e incluye la imagen posterior de los documentos actualizados mediante la opción FullDocument. Seleccione la pestaña Synchronous o Asynchronous para ver el código correspondiente.

var pipeline = new EmptyPipelineDefinition<ChangeStreamDocument<Restaurant>>()
.Match(change => change.OperationType == ChangeStreamOperationType.Update);
var options = new ChangeStreamOptions
{
FullDocument = ChangeStreamFullDocumentOption.UpdateLookup,
};
using (var cursor = collection.Watch(pipeline, options))
{
foreach (var change in cursor.ToEnumerable())
{
Console.WriteLine(change.FullDocument.ToBsonDocument());
}
}
var pipeline = new EmptyPipelineDefinition<ChangeStreamDocument<Restaurant>>()
.Match(change => change.OperationType == ChangeStreamOperationType.Update);
var options = new ChangeStreamOptions
{
FullDocument = ChangeStreamFullDocumentOption.UpdateLookup,
};
using var cursor = await collection.WatchAsync(pipeline, options);
await cursor.ForEachAsync(change =>
{
Console.WriteLine(change.FullDocument.ToBsonDocument());
});

Ejecutar el ejemplo de código anterior y actualizar un documento que tenga un valor de "name" igual a "Blarney Castle" resulta en la siguiente salida del flujo de cambios:

{ "_id" : ObjectId("..."), "name" : "Blarney Castle", "restaurant_id" : "40366356",
"cuisine" : "Traditional Irish", "address" : { "building" : "202-24", "coord" : [-73.925044200000002, 40.5595462],
"street" : "Rockaway Point Boulevard", "zipcode" : "11697" }, "borough" : "Queens", "grades" : [...] }

Para obtener más información sobre imágenes previas y posteriores, consulte Flujos de cambio con imágenes previas y posteriores de documentos en el manual de MongoDB Server.

Para obtener más información sobre los flujos de cambio, consulte Flujos de cambio en el manual de MongoDB Server.

Para aprender más sobre cualquiera de los métodos o tipos analizados en esta guía, consulta la siguiente documentación de API:

Volver

Monitoring

En esta página