Join us Sept 17 at .local NYC! Use code WEB50 to save 50% on tickets. Learn more >
MongoDB Event
Menu Docs
Página inicial do Docs
/ / /
Driver C#/ .NET
/

Monitorar alterações de dados

Neste guia, você pode aprender como usar um fluxo de alterações para monitorar alterações em tempo real em seus dados. Um change stream é uma funcionalidade do MongoDB Server que permite que seu aplicação se inscreva em alterações de dados em uma collection, banco de dados de dados ou sistema.

Os exemplos deste guia usam a coleção sample_restaurants.restaurants dosconjuntos de dados de amostra do Atlas. Para saber como criar um cluster MongoDB Atlas gratuito e carregar os conjuntos de dados de exemplo, consulte aIntrodução ao driver .NET/C#do .

Os exemplos desta página utilizam as seguintes classes Restaurant, Address e GradeEntry como modelos:

public class Restaurant
{
public ObjectId Id { get; set; }
public string Name { get; set; }
[BsonElement("restaurant_id")]
public string RestaurantId { get; set; }
public string Cuisine { get; set; }
public Address Address { get; set; }
public string Borough { get; set; }
public List<GradeEntry> Grades { get; set; }
}
public class Address
{
public string Building { get; set; }
[BsonElement("coord")]
public double[] Coordinates { get; set; }
public string Street { get; set; }
[BsonElement("zipcode")]
public string ZipCode { get; set; }
}
public class GradeEntry
{
public DateTime Date { get; set; }
public string Grade { get; set; }
public float? Score { get; set; }
}

Observação

Os documentos na collection restaurants usam a convenção de nomenclatura snake-case. Os exemplos neste guia usam um ConventionPack para desserializar os campos na coleção em maiúsculas e minúsculas Pascal e mapeá-los para as propriedades na classe Restaurant .

Para saber mais sobre serialização personalizada, consulte Serialização personalizada.

Para abrir um change stream, chame o método Watch() ou WatchAsync() . A instância na qual você chama o método determina o escopo de eventos que o change stream escuta. Você pode chamar o método Watch() ou WatchAsync() nas seguintes classes:

  • MongoClient: Para monitorar todas as alterações no sistema MongoDB

  • Database: Para monitorar alterações em todas as coleções no banco de dados

  • Collection: Para monitorar alterações na coleção

O exemplo a seguir abre um fluxo de alteração na coleção restaurants e gera as alterações conforme elas ocorrem. Selecione a aba Synchronous ou Asynchronous para ver o código correspondente.

var database = client.GetDatabase("sample_restaurants");
var collection = database.GetCollection<Restaurant>("restaurants");
// Opens a change stream and prints the changes as they're received
using (var cursor = collection.Watch())
{
foreach (var change in cursor.ToEnumerable())
{
Console.WriteLine("Received the following type of change: " + change.BackingDocument);
}
}
var database = client.GetDatabase("sample_restaurants");
var collection = database.GetCollection<Restaurant>("restaurants");
// Opens a change streams and print the changes as they're received
using var cursor = await collection.WatchAsync();
await cursor.ForEachAsync(change =>
{
Console.WriteLine("Received the following type of change: " + change.BackingDocument);
});

Para começar a observar as alterações, execute o aplicação. Em seguida, em um aplicação ou shell separado, modifique a coleção restaurants . Atualizar um documento que tenha um valor de resulta na seguinte saída do "name" "Blarney Castle" :

{ "_id" : { "_data" : "..." }, "operationType" : "update", "clusterTime" : Timestamp(...),
"wallTime" : ISODate("..."), "ns" : { "db" : "sample_restaurants", "coll" : "restaurants" },
"documentKey" : { "_id" : ObjectId("...") }, "updateDescription" : { "updatedFields" : { "cuisine" : "Irish" },
"removedFields" : [], "truncatedArrays" : [] } }

Você pode passar o parâmetro pipeline para os métodos Watch() e WatchAsync() para modificar a saída do change stream. Esse parâmetro permite que você observe somente eventos de alteração especificados. Crie o pipeline usando a classe EmptyPipelineDefinition e anexando os métodos relevantes do estágio de agregação .

Você pode especificar os seguintes estágios de agregação no parâmetro pipeline :

  • $addFields

  • $changeStreamSplitLargeEvent

  • $match

  • $project

  • $replaceRoot

  • $replaceWith

  • $redact

  • $set

  • $unset

Dica

Para saber como criar um agregação pipeline usando a classe PipelineDefinitionBuilder, consulte Estágios do aggregation pipeline no guia Operações com construtores.

Para saber mais sobre como modificar a saída do change stream, consulte a seção Modificar a saída do change stream no manual do MongoDB Server .

O exemplo a seguir usa o parâmetro pipeline para abrir um fluxo de alterações que registra somente operações de atualização. Selecione a aba Synchronous ou Asynchronous para ver o código correspondente.

var pipeline = new EmptyPipelineDefinition<ChangeStreamDocument<Restaurant>>()
.Match(change => change.OperationType == ChangeStreamOperationType.Update);
// Opens a change streams and print the changes as they're received
using (var cursor = collection.Watch(pipeline))
{
foreach (var change in cursor.ToEnumerable())
{
Console.WriteLine("Received the following change: " + change);
}
}
var pipeline = new EmptyPipelineDefinition<ChangeStreamDocument<Restaurant>>()
.Match(change => change.OperationType == ChangeStreamOperationType.Update);
// Opens a change stream and prints the changes as they're received
using (var cursor = await collection.WatchAsync(pipeline))
{
await cursor.ForEachAsync(change =>
{
Console.WriteLine("Received the following change: " + change);
});
}

Se o seu aplicação gerar eventos de alteração que excedam 16 MB, o servidor retornará um erro BSONObjectTooLarge. Para evitar esse erro, você pode usar o estágio de pipeline $changeStreamSplitLargeEvent para divisão os eventos em fragmentos menores. A API de agregação do driver .NET /C# inclui o método ChangeStreamSplitLargeEvent(), que você pode usar para adicionar o estágio $changeStreamSplitLargeEvent ao pipeline do change stream.

Este exemplo instrui o driver a observar as alterações e divisão os eventos de alteração que excedam o limite de 16 MB. O código imprime o documento de alteração para cada evento e chama métodos assistente para reagrupar quaisquer fragmentos de evento :

var pipeline = new EmptyPipelineDefinition<ChangeStreamDocument<Restaurant>>()
.ChangeStreamSplitLargeEvent();
using var cursor = collection.Watch(pipeline);
foreach (var completeEvent in GetNextChangeStreamEvent(cursor.ToEnumerable().GetEnumerator()))
{
Console.WriteLine("Received the following change: " + completeEvent.BackingDocument);
}
var pipeline = new EmptyPipelineDefinition<ChangeStreamDocument<Restaurant>>()
.ChangeStreamSplitLargeEvent();
using var cursor = await collection.WatchAsync(pipeline);
await foreach (var completeEvent in GetNextChangeStreamEventAsync(cursor))
{
Console.WriteLine("Received the following change: " + completeEvent.BackingDocument);
}

Observação

Recomendamos reagrupar fragmentos de evento de alteração, como mostrado no exemplo anterior, mas esta etapa é opcional. Você pode usar a mesma lógica para assistir a eventos de alteração divisão e completos.

O exemplo anterior utiliza os métodos GetNextChangeStreamEvent(), GetNextChangeStreamEventAsync() e MergeFragment() para reagrupar fragmentos de evento de alteração em um único change stream documento. O seguinte código define estes métodos:

// Fetches the next complete change stream event
private static IEnumerable<ChangeStreamDocument<TDocument>> GetNextChangeStreamEvent<TDocument>(
IEnumerator<ChangeStreamDocument<TDocument>> changeStreamEnumerator)
{
while (changeStreamEnumerator.MoveNext())
{
var changeStreamEvent = changeStreamEnumerator.Current;
if (changeStreamEvent.SplitEvent != null)
{
var fragment = changeStreamEvent;
while (fragment.SplitEvent.Fragment < fragment.SplitEvent.Of)
{
changeStreamEnumerator.MoveNext();
fragment = changeStreamEnumerator.Current;
MergeFragment(changeStreamEvent, fragment);
}
}
yield return changeStreamEvent;
}
}
// Merges a fragment into the base event
private static void MergeFragment<TDocument>(
ChangeStreamDocument<TDocument> changeStreamEvent,
ChangeStreamDocument<TDocument> fragment)
{
foreach (var element in fragment.BackingDocument)
{
if (element.Name != "_id" && element.Name != "splitEvent")
{
changeStreamEvent.BackingDocument[element.Name] = element.Value;
}
}
}
// Fetches the next complete change stream event
private static async IAsyncEnumerable<ChangeStreamDocument<TDocument>> GetNextChangeStreamEventAsync<TDocument>(
IAsyncCursor<ChangeStreamDocument<TDocument>> changeStreamCursor)
{
var changeStreamEnumerator = GetNextChangeStreamEventFragmentAsync(changeStreamCursor).GetAsyncEnumerator();
while (await changeStreamEnumerator.MoveNextAsync())
{
var changeStreamEvent = changeStreamEnumerator.Current;
if (changeStreamEvent.SplitEvent != null)
{
var fragment = changeStreamEvent;
while (fragment.SplitEvent.Fragment < fragment.SplitEvent.Of)
{
await changeStreamEnumerator.MoveNextAsync();
fragment = changeStreamEnumerator.Current;
MergeFragment(changeStreamEvent, fragment);
}
}
yield return changeStreamEvent;
}
}
private static async IAsyncEnumerable<ChangeStreamDocument<TDocument>> GetNextChangeStreamEventFragmentAsync<TDocument>(
IAsyncCursor<ChangeStreamDocument<TDocument>> changeStreamCursor)
{
while (await changeStreamCursor.MoveNextAsync())
{
foreach (var changeStreamEvent in changeStreamCursor.Current)
{
yield return changeStreamEvent;
}
}
}
// Merges a fragment into the base event
private static void MergeFragment<TDocument>(
ChangeStreamDocument<TDocument> changeStreamEvent,
ChangeStreamDocument<TDocument> fragment)
{
foreach (var element in fragment.BackingDocument)
{
if (element.Name != "_id" && element.Name != "splitEvent")
{
changeStreamEvent.BackingDocument[element.Name] = element.Value;
}
}
}

Dica

Para saber mais sobre a divisão de grandes eventos de alteração, consulte $changeStreamSplitLargeEvent no manual do MongoDB Server .

Os métodos Watch() e WatchAsync() aceitam parâmetros opcionais, que representam opções que você pode utilizar para configurar a operação. Se você não especificar nenhuma opção, o driver não personalizará a operação.

A tabela a seguir descreve as opções que você pode definir para personalizar o comportamento de Watch() e WatchAsync():

Opção
Descrição

FullDocument

Specifies whether to show the full document after the change, rather than showing only the changes made to the document. To learn more about this option, see Include Pre-Images and Post-Images.

FullDocumentBeforeChange

Specifies whether to show the full document as it was before the change, rather than showing only the changes made to the document. To learn more about this option, see Include Pre-Images and Post-Images.

ResumeAfter

Directs Watch() or WatchAsync() to resume returning changes after the operation specified in the resume token.
Each change stream event document includes a resume token as the _id field. Pass the entire _id field of the change event document that represents the operation you want to resume after.
ResumeAfter is mutually exclusive with StartAfter and StartAtOperationTime.

StartAfter

Directs Watch() or WatchAsync() to start a new change stream after the operation specified in the resume token. Allows notifications to resume after an invalidate event.
Each change stream event document includes a resume token as the _id field. Pass the entire _id field of the change event document that represents the operation you want to resume after.
StartAfter is mutually exclusive with ResumeAfter and StartAtOperationTime.

StartAtOperationTime

Directs Watch() or WatchAsync() to return only events that occur after the specified timestamp.
StartAtOperationTime is mutually exclusive with ResumeAfter and StartAfter.

MaxAwaitTime

Specifies the maximum amount of time, in milliseconds, the server waits for new data changes to report to the change stream cursor before returning an empty batch. Defaults to 1000 milliseconds.

ShowExpandedEvents

Starting in MongoDB Server v6.0, change streams support change notifications for Data Definition Language (DDL) events, such as the createIndexes and dropIndexes events. To include expanded events in a change stream, create the change stream cursor and set this parameter to True.

batchSize

Specifies the maximum number of documents that a change stream can return in each batch, which applies to Watch() or WatchAsync(). If the batchSize option is not set, watch functions have an initial batch size of 101 documents and a maximum size of 16 mebibytes (MiB) for each subsequent batch. This option can enforce a smaller limit than 16 MiB, but not a larger one. If you set batchSize to a limit that result in batches larger than 16 MiB, this option has no effect and Watch() or WatchAsync() uses the default batch size.

Collation

Specifies the collation to use for the change stream cursor. See the Collation section of this page for more information.

Comment

Attaches a comment to the operation.

Para configurar o agrupamento para sua operação, crie uma instância da classe Agrupamento .

A tabela seguinte descreve os parâmetros que o construtor do Collation aceita. Ela também lista a propriedade de classe correspondente que você pode usar para ler o valor de cada configuração.

Parâmetro
Descrição
Propriedade de classe

locale

Specifies the International Components for Unicode (ICU) locale. For a list of supported locales, see Collation Locales and Default Parameters in the MongoDB Server Manual.

If you want to use simple binary comparison, use the Collation.Simple static property to return a Collation object with the locale set to "simple".
Data Type: string

Locale

caseLevel

(Optional) Specifies whether to include case comparison.

When this argument is true, the driver's behavior depends on the value of the strength argument:

- If strength is CollationStrength.Primary, the driver compares base characters and case.
- If strength is CollationStrength.Secondary, the driver compares base characters, diacritics, other secondary differences, and case.
- If strength is any other value, this argument is ignored.

When this argument is false, the driver doesn't include case comparison at strength level Primary or Secondary.

Data Type: boolean
Default: false

CaseLevel

caseFirst

(Optional) Specifies the sort order of case differences during tertiary level comparisons.

Default: CollationCaseFirst.Off

CaseFirst

strength

(Optional) Specifies the level of comparison to perform, as defined in the ICU documentation.

Default: CollationStrength.Tertiary

Strength

numericOrdering

(Optional) Specifies whether the driver compares numeric strings as numbers.

If this argument is true, the driver compares numeric strings as numbers. For example, when comparing the strings "10" and "2", the driver treats the values as 10 and 2, and finds 10 to be greater.

If this argument is false or excluded, the driver compares numeric strings as strings. For example, when comparing the strings "10" and "2", the driver compares one character at a time. Because "1" is less than "2", the driver finds "10" to be less than "2".

For more information, see Collation Restrictions in the MongoDB Server manual.

Data Type: boolean
Default: false

NumericOrdering

alternate

(Optional) Specifies whether the driver considers whitespace and punctuation as base characters for purposes of comparison.

Default: CollationAlternate.NonIgnorable (spaces and punctuation are considered base characters)

Alternate

maxVariable

(Optional) Specifies which characters the driver considers ignorable when the alternate argument is CollationAlternate.Shifted.

Default: CollationMaxVariable.Punctuation (the driver ignores punctuation and spaces)

MaxVariable

normalization

(Optional) Specifies whether the driver normalizes text as needed.

Most text doesn't require normalization. For more information about normalization, see the ICU documentation.

Data Type: boolean
Default: false

Normalization

backwards

(Optional) Specifies whether strings containing diacritics sort from the back of the string to the front.

Data Type: boolean
Default: false

Backwards

Para obter mais informações sobre agrupamento, consulte a página Agrupamento no manual do MongoDB Server.

Importante

Você pode habilitar pré-imagens e pós-imagens em collections somente se seu sistema usar MongoDB v6.0 ou posterior.

Por padrão, quando você executa uma operação em uma collection, o evento de alteração correspondente inclui somente o delta dos campos modificados por essa operação. Para ver o documento completo antes ou depois de uma alteração, crie um objeto ChangeStreamOptions e especifique as opções FullDocumentBeforeChange ou FullDocument . Em seguida, passe o objeto ChangeStreamOptions para o método Watch() ou WatchAsync() .

A pré-imagem é a versão completa de um documento antes de uma alteração. Para incluir a pré-imagem no evento de fluxo de alteração, defina a opção FullDocumentBeforeChange para um dos seguintes valores:

  • ChangeStreamFullDocumentBeforeChangeOption.WhenAvailable: o evento de alteração inclui uma pré-imagem do documento modificado para eventos de alteração somente se a pré-imagem estiver disponível.

  • ChangeStreamFullDocumentBeforeChangeOption.Required: o evento de alteração inclui uma pré-imagem do documento modificado para eventos de alteração. Se a pré-imagem não estiver disponível, o driver gerará um erro.

A pós-imagem é a versão completa de um documento após uma alteração. Para incluir a pós-imagem na alteração de evento de fluxo, defina a opção FullDocument para um dos seguintes valores:

  • ChangeStreamFullDocumentOption.UpdateLookup: o evento de alteração inclui uma cópia de todo o documento alterado de algum tempo após a alteração.

  • ChangeStreamFullDocumentOption.WhenAvailable: O evento de alteração inclui uma pós-imagem do documento modificado para eventos de alteração somente se a pós-imagem estiver disponível.

  • ChangeStreamFullDocumentOption.Required: o evento de alteração inclui uma pós-imagem do documento modificado para eventos de alteração. Se a pós-imagem não estiver disponível, o driver gerará um erro.

O exemplo a seguir abre um fluxo de alteração em uma collection e inclui a pós-imagem de documentos atualizados especificando a opção FullDocument . Selecione a aba Synchronous ou Asynchronous para ver o código correspondente.

var pipeline = new EmptyPipelineDefinition<ChangeStreamDocument<Restaurant>>()
.Match(change => change.OperationType == ChangeStreamOperationType.Update);
var options = new ChangeStreamOptions
{
FullDocument = ChangeStreamFullDocumentOption.UpdateLookup,
};
using (var cursor = collection.Watch(pipeline, options))
{
foreach (var change in cursor.ToEnumerable())
{
Console.WriteLine(change.FullDocument.ToBsonDocument());
}
}
var pipeline = new EmptyPipelineDefinition<ChangeStreamDocument<Restaurant>>()
.Match(change => change.OperationType == ChangeStreamOperationType.Update);
var options = new ChangeStreamOptions
{
FullDocument = ChangeStreamFullDocumentOption.UpdateLookup,
};
using var cursor = await collection.WatchAsync(pipeline, options);
await cursor.ForEachAsync(change =>
{
Console.WriteLine(change.FullDocument.ToBsonDocument());
});

Executar o exemplo de código anterior e atualizar um documento que tenha um valor de resulta na seguinte saída do "name" "Blarney Castle" :

{ "_id" : ObjectId("..."), "name" : "Blarney Castle", "restaurant_id" : "40366356",
"cuisine" : "Traditional Irish", "address" : { "building" : "202-24", "coord" : [-73.925044200000002, 40.5595462],
"street" : "Rockaway Point Boulevard", "zipcode" : "11697" }, "borough" : "Queens", "grades" : [...] }

Para saber mais sobre pré e pós-imagens, consulte Change Streams com pré e pós-imagens de documentos no manual do MongoDB Server .

Para saber mais sobre fluxos de alterações, consulte Change Streams de alterações no manual do MongoDB Server .

Para saber mais sobre qualquer um dos métodos ou tipos discutidos neste guia, consulte a seguinte documentação da API:

Voltar

Monitoramento

Nesta página