Menu Docs
Página inicial do Docs
/ /

$changeStreamSplitLargeEvent (estágio de agregação )

$changeStreamSplitLargeEvent

Novidade no MongoDB 7.0 (e 6.0.9).

Se umchange stream tiver eventos grandes que excedam 16 MB, uma exceção BSONObjectTooLarge será retornada. A partir do MongoDB 7.0 (e 6.0.9), você pode utilizar um estágio $changeStreamSplitLargeEvent para divisão os eventos em fragmentos menores.

Você só deve usar $changeStreamSplitLargeEvent quando estritamente necessário. Por exemplo, se seu aplicativo exigir imagens pré ou pós-documento completo e gerar eventos grandes que excedam 16 MB, use $changeStreamSplitLargeEvent.

Antes de decidir usar $changeStreamSplitLargeEvent, você deve primeiro tentar reduzir o tamanho do evento de alteração. Por exemplo:

  • Não solicite pré ou pós-imagens de documentos, a menos que seu aplicativo as exija. Isso gera campos fullDocument e fullDocumentBeforeChange em mais casos, que normalmente são os maiores objetos em um evento de alteração.

  • Use um estágio $project a fim de incluir apenas os campos necessários para sua aplicação. Isso reduz o tamanho do evento de alteração e evita o tempo adicional para dividir eventos grandes em fragmentos. Assim, mais eventos de alteração podem ser retornados em cada lote.

Você só pode ter um estágio $changeStreamSplitLargeEvent no seu pipeline, e ele deve ser o último. Você só pode usar $changeStreamSplitLargeEvent em um pipeline $changeStream.

$changeStreamSplitLargeEvent sintaxe:

{
$changeStreamSplitLargeEvent: {}
}

$changeStreamSplitLargeEvent divide eventos que excedem 16 MB em fragmentos e retorna os fragmentos sequencialmente usando o cursor de fluxo de alterações.

Os fragmentos são divisão para que o número máximo de campos seja retornado no primeiro fragmento. Isso garante que o contexto do evento seja retornado o mais rápido possível.

Quando o evento de alteração é dividido, somente o tamanho dos campos de nível superior é usado. $changeStreamSplitLargeEvent não processa nem faz a divisão de subdocumentos recursivamente. Por exemplo, se você usar um estágio $project para criar um evento de alteração com um único campo que tem 20 MB de tamanho, o evento não sofrerá uma divisão, e o estágio retornará um erro.

Cada fragmento possui um token de retomada. Um stream retomado usando o token de um fragmento irá:

  • Iniciar um novo fluxo a partir do fragmento subsequente.

  • Comece no próximo evento se continuar a partir do fragmento final da sequência.

Cada fragmento de um evento inclui um documento splitEvent :

splitEvent: {
fragment: <int>,
of: <int>
}

A tabela a seguir descreve os campos.

Campo
Descrição

fragment

Índice de fragmento, a partir de 1.

of

Número total de fragmentos do evento.

O cenário de exemplo nesta seção mostra o uso de $changeStreamSplitLargeEvent com uma nova coleção denominada myCollection.

Crie myCollection e insira um documento com pouco menos 16 MB de dados:

db.myCollection.insertOne(
{ _id: 0, largeField: "a".repeat( 16 * 1024 * 1024 - 1024 ) }
)

largeField contém a letra repetida a.

Ative ChangeStreamPreandPostImages para myCollection, o que permite que um fluxo de alterações recupere um documento como estava antes de uma atualização (pré-imagem) e depois de uma atualização (pós-imagem):

db.runCommand( {
collMod: "myCollection",
changeStreamPreAndPostImages: { enabled: true }
} )

Crie um cursor de fluxo de alterações para monitorar as alterações em myCollection usando db.collection.watch():

myChangeStreamCursor = db.myCollection.watch(
[ { $changeStreamSplitLargeEvent: {} } ],
{ fullDocument: "required", fullDocumentBeforeChange: "required" }
)

Para o evento de fluxo de alterações:

  • fullDocument: "required" inclui a pós-imagem do documento.

  • fullDocumentBeforeChange: "required" inclui a pré-imagem do documento.

Para detalhes, consulte $changeStream.

Atualize o documento em myCollection, que também produz um evento de fluxo de alteração com as imagens pré e pós do documento :

db.myCollection.updateOne(
{ _id: 0 },
{ $set: { largeField: "b".repeat( 16 * 1024 * 1024 - 1024 ) } }
)

largeField agora contém a letra repetida b.

Recupere os fragmentos de myChangeStreamCursor usando o método next() e armazene os fragmentos em objetos chamados firstFragment, secondFragment e thirdFragment:

const firstFragment = myChangeStreamCursor.next()
const secondFragment = myChangeStreamCursor.next()
const thirdFragment = myChangeStreamCursor.next()

Mostrar firstFragment.splitEvent:

firstFragment.splitEvent

Saída com os detalhes do fragmento:

splitEvent: { fragment: 1, of: 3 }

Da mesma forma, secondFragment.splitEvent e thirdFragment.splitEvent retornam:

splitEvent: { fragment: 2, of: 3 }
splitEvent: { fragment: 3, of: 3 }

Para examinar as chaves de objeto para firstFragment:

Object.keys( firstFragment )

Saída:

[
'_id',
'splitEvent',
'wallTime',
'clusterTime',
'operationType',
'documentKey',
'ns',
'fullDocument'
]

Para examinar o tamanho em bytes de firstFragment.fullDocument:

bsonsize( firstFragment.fullDocument )

Saída:

16776223

secondFragment contém a pré-imagem fullDocumentBeforeChange , que tem aproximadamente 16 MB de tamanho. O exemplo a seguir mostra as chaves de objeto para secondFragment:

Object.keys( secondFragment )

Saída:

[ '_id', 'splitEvent', 'fullDocumentBeforeChange' ]

thirdFragment contém o campo updateDescription , que tem aproximadamente 16 MB de tamanho. O exemplo a seguir mostra as chaves de objeto para thirdFragment:

Object.keys( thirdFragment )

Saída:

[ '_id', 'splitEvent', 'updateDescription' ]

Para usar o driver MongoDB .NET/C# para adicionar um estágio $changeStreamSplitLargeEvent a um pipeline de agregação, chame o método ChangeStreamSplitLargeEvent() em um objeto PipelineDefinition.

O exemplo a seguir cria um estágio de pipeline que divide os eventos que excedem 16 MB em fragmentos e os retorna sequencialmente em um cursor do fluxo de alterações. O campo splitEvent em cada fragmento mostra o índice de fragmento e a contagem total.

[BsonIgnoreExtraElements]
public class LargeDocument
{
[BsonId]
public int Id { get; set; }
public string LargeField { get; set; } = null!;
}
var client = new MongoClient("<connection-string>");
var db = client.GetDatabase("change_stream_test");
var collection = db.GetCollection<LargeDocument>("largeEventCollection");
collection.InsertOne(new LargeDocument
{
Id = 0,
LargeField = new string('a', 16 * 1024 * 1024 - 1024)
});
db.RunCommand<BsonDocument>(new BsonDocument
{
{ "collMod", "largeEventCollection" },
{ "changeStreamPreAndPostImages", new BsonDocument("enabled", true) }
});
var pipeline = new EmptyPipelineDefinition<ChangeStreamDocument<LargeDocument>>()
.ChangeStreamSplitLargeEvent();
var watchOptions = new ChangeStreamOptions
{
FullDocument = ChangeStreamFullDocumentOption.Required,
FullDocumentBeforeChange = ChangeStreamFullDocumentBeforeChangeOption.Required
};
using var cursor = collection.Watch(pipeline, watchOptions);
collection.UpdateOne(
Builders<LargeDocument>.Filter.Eq(d => d.Id, 0),
Builders<LargeDocument>.Update.Set(d => d.LargeField, new string('b', 16 * 1024 * 1024 - 1024))
);
while (cursor.MoveNext())
{
foreach (var fragment in cursor.Current)
{
Console.WriteLine(fragment.BackingDocument["splitEvent"].ToJson());
}
}
{ "fragment" : 1, "of" : 3 }
{ "fragment" : 2, "of" : 3 }
{ "fragment" : 3, "of" : 3 }

Você pode usar o método watch() e o método aggregate() para executar uma operação $changeStreamSplitLargeEvent. $changeStreamSplitLargeEvent retorna um ChangeStreamCursor quando você passa o pipeline de agregação para o método watch() em um objeto MongoDB Collection. $changeStreamSplitLargeEvent retorna um AggregationCursor quando você passa o pipeline de agregação para o método aggregate().

Importante

Retomabilidade de $changeStreamSplitLargeEvent

Se você passar um change stream para o método aggregate(), o change stream não poderá ser retomado. Um change stream só é retomado se você passá-lo para o método watch(). Para saber mais sobre a capacidade de retomada, consulte Retomar um fluxo de alterações.

O exemplo a seguir divide eventos que excedem 16 MB em fragmentos e os retorna sequencialmente em um ChangeStreamCursor:

const pipeline = [{ changeStreamSplitLargeEvent: {} }];
const changeStream = collection.watch(pipeline);
return changeStream;

Para mais informações sobre como alterar as notificações de transmissão, consulte Alterar eventos.

Para saber mais sobre os estágios de pipeline relacionados, consulte o guia$changeStream.

Voltar

$changeStream

Nesta página