Make the MongoDB docs better! We value your opinion. Share your feedback for a chance to win $100.
Click here >
Docs Menu
Docs Home
/ /

$changeStreamSplitLargeEvent (etapa de agregación)

$changeStreamSplitLargeEvent

Novedades en MongoDB 7.0 (y 6.0.9).

Si un flujo de cambios tiene eventos grandes que superan los 16 MB, se devuelve una excepción BSONObjectTooLarge. A partir de MongoDB 7.0 (y 6.0.9), puede usar una etapa $changeStreamSplitLargeEvent para dividir los eventos en fragmentos más pequeños.

Solo debes usar $changeStreamSplitLargeEvent cuando sea estrictamente necesario. Por ejemplo, si tu aplicación requiere imágenes completas de documentos previas o posteriores, y genera eventos grandes que superan 16 MB, utiliza $changeStreamSplitLargeEvent.

Antes de decidir usar $changeStreamSplitLargeEvent, primero debes intentar reducir el tamaño del evento de cambio. Por ejemplo:

  • No solicites imágenes previas o posteriores de documentos a menos que tu aplicación las requiera. Esto genera los campos fullDocument y fullDocumentBeforeChange en más casos, que suelen ser los objetos más grandes en un evento de cambio.

  • Usa una etapa $project para incluir solo los campos necesarios para tu aplicación. Esto reduce el tamaño del evento de cambio y evita el tiempo adicional de dividir grandes eventos en fragmentos. Esto permite que se devuelvan más eventos de cambio en cada lote.

Solo puede tener una etapa $changeStreamSplitLargeEvent en su pipeline y debe ser la última etapa. Solo puede usar $changeStreamSplitLargeEvent en una pipeline de $changeStream.

$changeStreamSplitLargeEvent sintaxis:

{
$changeStreamSplitLargeEvent: {}
}

$changeStreamSplitLargeEvent Divide el evento que supera los 16 MB en fragmentos y los devuelve secuencialmente usando el cursor de flujo de cambios.

Los fragmentos se dividen para que la mayor cantidad de campos se devuelvan en el primer fragmento. Esto garantiza que el contexto del evento se devuelva lo más rápido posible.

Cuando el evento de cambio se divide, solo se utiliza el tamaño de los campos de nivel superior. $changeStreamSplitLargeEvent no procesa ni divide recursivamente los subdocumentos. Por ejemplo, si usas una etapa $project para crear un evento de cambio con un solo campo de 20 MB de tamaño, el evento no se divide y la etapa retornará un error.

Cada fragmento tiene un token de reanudación. Un flujo que se reanuda usando el token de un fragmento realizará una de las siguientes acciones:

  • Comenzar un nuevo flujo desde el fragmento subsiguiente.

  • Comienza en el siguiente evento si estás reanudando a partir del fragmento final en la secuencia.

Cada fragmento de un evento incluye un splitEvent documento:

splitEvent: {
fragment: <int>,
of: <int>
}

La siguiente tabla describe los campos.

Campo
Descripción

fragment

Índice de fragmentos, comenzando en 1.

of

Número total de fragmentos para el evento.

El escenario de ejemplo en esta sección muestra el uso de $changeStreamSplitLargeEvent con una nueva colección llamada myCollection.

Cree myCollection e inserte un documento con poco menos de 16 MB de datos:

db.myCollection.insertOne(
{ _id: 0, largeField: "a".repeat( 16 * 1024 * 1024 - 1024 ) }
)

largeField contiene la letra repetida a.

Habilita changeStreamPreAndPostImages para myCollection, lo que permite que un flujo de cambios recupere un documento tal como era antes de una actualización (imagen previa) y después de una actualización (imagen posterior):

db.runCommand( {
collMod: "myCollection",
changeStreamPreAndPostImages: { enabled: true }
} )

Crea un cursor de flujo de cambios para supervisar los cambios en myCollection utilizando db.collection.watch():

myChangeStreamCursor = db.myCollection.watch(
[ { $changeStreamSplitLargeEvent: {} } ],
{ fullDocument: "required", fullDocumentBeforeChange: "required" }
)

Para el evento de flujo de cambios:

  • fullDocument: "required" incluye la imagen posterior al documento.

  • fullDocumentBeforeChange: "required" incluye la preimagen del documento.

Para obtener más detalles, consulte $changeStream.

Actualiza el documento en myCollection, lo que también produce un evento de flujo de cambios con las imágenes previas y posteriores del documento:

db.myCollection.updateOne(
{ _id: 0 },
{ $set: { largeField: "b".repeat( 16 * 1024 * 1024 - 1024 ) } }
)

largeField ahora contiene la letra repetida b.

Recupera los fragmentos de myChangeStreamCursor utilizando el método next() y almacena los fragmentos en objetos llamados firstFragment, secondFragment y thirdFragment:

const firstFragment = myChangeStreamCursor.next()
const secondFragment = myChangeStreamCursor.next()
const thirdFragment = myChangeStreamCursor.next()

Mostrar firstFragment.splitEvent:

firstFragment.splitEvent

Salida con los detalles del fragmento:

splitEvent: { fragment: 1, of: 3 }

De manera similar, secondFragment.splitEvent y thirdFragment.splitEvent devuelven:

splitEvent: { fragment: 2, of: 3 }
splitEvent: { fragment: 3, of: 3 }

Para examinar las claves de objeto para firstFragment:

Object.keys( firstFragment )

Salida:

[
'_id',
'splitEvent',
'wallTime',
'clusterTime',
'operationType',
'documentKey',
'ns',
'fullDocument'
]

Para examinar el tamaño en bytes de firstFragment.fullDocument:

bsonsize( firstFragment.fullDocument )

Salida:

16776223

secondFragment contiene la preimagen fullDocumentBeforeChange, que tiene un tamaño aproximado de 16 MB. El siguiente ejemplo muestra las claves de objeto para secondFragment:

Object.keys( secondFragment )

Salida:

[ '_id', 'splitEvent', 'fullDocumentBeforeChange' ]

thirdFragment contiene el campo updateDescription, que tiene un tamaño aproximado de 16 MB. El siguiente ejemplo muestra las claves de objeto para thirdFragment:

Object.keys( thirdFragment )

Salida:

[ '_id', 'splitEvent', 'updateDescription' ]

Para usar el controlador MongoDB .NET/C# para agregar una etapa $changeStreamSplitLargeEvent a una canalización de agregación, llame al ChangeStreamSplitLargeEvent() método en un objeto PipelineDefinition.

El siguiente ejemplo crea una etapa de canalización que divide los eventos que superan los 16 MB en fragmentos y los devuelve secuencialmente en un cursor de flujo de cambios. El campo splitEvent en cada fragmento muestra el índice del fragmento y el recuento total.

[BsonIgnoreExtraElements]
public class LargeDocument
{
[BsonId]
public int Id { get; set; }
public string LargeField { get; set; } = null!;
}
var client = new MongoClient("<connection-string>");
var db = client.GetDatabase("change_stream_test");
var collection = db.GetCollection<LargeDocument>("largeEventCollection");
collection.InsertOne(new LargeDocument
{
Id = 0,
LargeField = new string('a', 16 * 1024 * 1024 - 1024)
});
db.RunCommand<BsonDocument>(new BsonDocument
{
{ "collMod", "largeEventCollection" },
{ "changeStreamPreAndPostImages", new BsonDocument("enabled", true) }
});
var pipeline = new EmptyPipelineDefinition<ChangeStreamDocument<LargeDocument>>()
.ChangeStreamSplitLargeEvent();
var watchOptions = new ChangeStreamOptions
{
FullDocument = ChangeStreamFullDocumentOption.Required,
FullDocumentBeforeChange = ChangeStreamFullDocumentBeforeChangeOption.Required
};
using var cursor = collection.Watch(pipeline, watchOptions);
collection.UpdateOne(
Builders<LargeDocument>.Filter.Eq(d => d.Id, 0),
Builders<LargeDocument>.Update.Set(d => d.LargeField, new string('b', 16 * 1024 * 1024 - 1024))
);
while (cursor.MoveNext())
{
foreach (var fragment in cursor.Current)
{
Console.WriteLine(fragment.BackingDocument["splitEvent"].ToJson());
}
}
{ "fragment" : 1, "of" : 3 }
{ "fragment" : 2, "of" : 3 }
{ "fragment" : 3, "of" : 3 }

Puedes utilizar tanto el método watch() como el método aggregate() para ejecutar una operación $changeStreamSplitLargeEvent. $changeStreamSplitLargeEvent devuelve un ChangeStreamCursor cuando se pasa el pipeline de agregación al método watch() en un objeto de MongoDB Collection. $changeStreamSplitLargeEvent devuelve un AggregationCursor cuando pasas el pipeline de agregación al método aggregate().

Importante

$changeStreamSplitLargeEvent Resumibility

Si pasas un flujo de cambios al método aggregate(), el flujo de cambios no puede reanudarse. Un flujo de cambios solo se reanuda si lo pasas al método watch(). Para obtener más información sobre la reanudabilidad, consulta Reanudar una Change Stream (flujo de cambios).

El siguiente ejemplo divide los eventos que superan los 16 MB en fragmentos y los devuelve secuencialmente en un ChangeStreamCursor:

const pipeline = [{ changeStreamSplitLargeEvent: {} }];
const changeStream = collection.watch(pipeline);
return changeStream;

Para obtener más información sobre las notificaciones de flujos de cambios, consulta Eventos de cambio.

Para aprender más sobre las etapas relacionadas del pipeline, consulta la guía $changeStream.

Volver

$changeStream

En esta página