Docs Menu
Docs Home
/ /

$changeStreamSplitLargeEvent(集計ステージ)

$changeStreamSplitLargeEvent

バージョン 7.0 の新機能:(6.0.9 でも利用可能

変更ストリームに16 MB を超える大きなイベントがある場合、 BSONObjectTooLarge例外が返されます。 MongoDB 6.0.9以降、 イベントを小さなフラグメントに分割するには、 $changeStreamSplitLargeEventステージを使用します。

$changeStreamSplitLargeEventは、厳密に必要な場合にのみ使用してください。 たとえば、アプリケーションで完全なドキュメントの変更前または変更後のイメージが必要で、 16 MB を超える大規模なイベントが生成される場合は、 $changeStreamSplitLargeEventを使用します。

$changeStreamSplitLargeEventを使用する前に、まず 変更イベント のサイズを小さくしてみてください。 例:

  • アプリケーションで必要な場合を除き、ドキュメントの変更前または変更後のイメージをリクエストしないでください。 これにより、通常、変更イベントにおける最大のオブジェクトであるfullDocumentフィールドとfullDocumentBeforeChangeフィールドが生成されることが増えます。

  • アプリケーションに必要なフィールドのみを含めるには、 $projectステージを使用します。 これにより、変更イベントのサイズが縮小され、大規模なイベントをフラグメントに分割するための追加時間が回避されます。 これにより、各バッチで返される変更イベントが増えます。

パイプラインには$changeStreamSplitLargeEventステージが 1 つだけあり、最後のステージである必要があります。 $changeStreamSplitLargeEvent$changeStreamパイプラインでのみ使用できます。

$changeStreamSplitLargeEvent 構文:

{
$changeStreamSplitLargeEvent: {}
}

$changeStreamSplitLargeEvent 16 MB を超えるイベントをフラグメントに分割し、変更ストリーム カーソルを使用してフラグメントを順番に返します。

フラグメントは、最初のフラグメントで最大数のフィールドが返されるように分割されます。 これにより、イベント コンテキストができるだけ早く返されるようになります。

変更イベントが分割される場合、最上位のフィールドのサイズのみが使用されます。 $changeStreamSplitLargeEventはサブドキュメントを再帰的に処理または分割しません。 たとえば、 $projectステージを使用して、サイズが20 MB の 1 つのフィールドを持つ変更イベントを作成すると、イベントは分割されず、 ステージはエラーを返します。

各フラグメントには再開トークンがあります。 フラグメントのトークンを使用して再開されたストリームは、次のいずれかになります。

  • 後続のフラグメントから新しいストリームを開始します。

  • シーケンスの最後のフラグメントから再開する場合は、次のイベントから開始します。

イベントの各フラグメントにはsplitEventドキュメントが含まれます。

splitEvent: {
fragment: <int>,
of: <int>
}

以下の表で、フィールドを説明します。

フィールド
説明

fragment

フラグメント インデックスは、 1から提供されます。

of

イベントのフラグメントの合計数。

このセクションのサンプルシナリオでは、 myCollectionという名前の新しいコレクションを持つ$changeStreamSplitLargeEventの使用を示します。

myCollectionを作成し、 16 MB 未満のデータを含むドキュメントを 1 つ挿入します。

db.myCollection.insertOne(
{ _id: 0, largeField: "a".repeat( 16 * 1024 * 1024 - 1024 ) }
)

largeField 反復文字aが含まれています。

myCollectionに対してchangeStreamPreAndPostImagesを有効にします。これにより、変更ストリームは更新前(イメージ前)と更新後(イメージ後)のドキュメントを取得できます。

db.runCommand( {
collMod: "myCollection",
changeStreamPreAndPostImages: { enabled: true }
} )

db.collection.watch()を使用して、 myCollectionへの変更を監視するための変更ストリーム カーソルを作成します。

myChangeStreamCursor = db.myCollection.watch(
[ { $changeStreamSplitLargeEvent: {} } ],
{ fullDocument: "required", fullDocumentBeforeChange: "required" }
)

変更ストリーム イベント の場合、

  • fullDocument: "required" には、ドキュメントの変更後のイメージが含まれます。

  • fullDocumentBeforeChange: "required" には、ドキュメントの変更前のイメージが含まれます。

詳細については、$changeStream を参照してください。

myCollectionでドキュメントを更新します。これにより、ドキュメントの変更前と変更後のイメージを含む 変更ストリーム イベント も生成されます。

db.myCollection.updateOne(
{ _id: 0 },
{ $set: { largeField: "b".repeat( 16 * 1024 * 1024 - 1024 ) } }
)

largeField に繰り返しの文字bが含まれるようになりました。

next()メソッドを使用してmyChangeStreamCursorからフラグメントを取得し、そのフラグメントをfirstFragmentsecondFragmentthirdFragmentという名前のオブジェクトに保存します。

const firstFragment = myChangeStreamCursor.next()
const secondFragment = myChangeStreamCursor.next()
const thirdFragment = myChangeStreamCursor.next()

firstFragment.splitEvent表示:

firstFragment.splitEvent

フラグメントの詳細を含む出力は次のようになります。

splitEvent: { fragment: 1, of: 3 }

同様に、 secondFragment.splitEventthirdFragment.splitEventも次を返します。

splitEvent: { fragment: 2, of: 3 }
splitEvent: { fragment: 3, of: 3 }

firstFragmentのオブジェクトキーを調べるには、次の手順に従います。

Object.keys( firstFragment )

出力:

[
'_id',
'splitEvent',
'wallTime',
'clusterTime',
'operationType',
'documentKey',
'ns',
'fullDocument'
]

firstFragment.fullDocumentのサイズをバイト単位で調べるには次のようにします。

bsonsize( firstFragment.fullDocument )

出力:

16776223

secondFragment には、サイズが約 16 MB のfullDocumentBeforeChange変更前のイメージが含まれています。 次の例は、 secondFragmentのオブジェクトキーを示しています。

Object.keys( secondFragment )

出力:

[ '_id', 'splitEvent', 'fullDocumentBeforeChange' ]

thirdFragment には、サイズが約 16 MB のupdateDescriptionフィールドが含まれています。 次の例は、 thirdFragmentのオブジェクトキーを示しています。

Object.keys( thirdFragment )

出力:

[ '_id', 'splitEvent', 'updateDescription' ]

MongoDB .NET/C#ドライバーを使用して集計パイプラインに $changeStreamSplitLargeEvent ステージを追加するには、PipelineDefinition オブジェクトで ChangeStreamSplitLargeEvent() メソッドを呼び出します。

次の例では、16 MB を超えるイベントをフラグメントに分割し、変更ストリームカーソルで順番に返すパイプラインステージを作成します。各フラグメントの splitEventフィールドには、フラグメントインデックスと合計数が表示されます。

[BsonIgnoreExtraElements]
public class LargeDocument
{
[BsonId]
public int Id { get; set; }
public string LargeField { get; set; } = null!;
}
var client = new MongoClient("<connection-string>");
var db = client.GetDatabase("change_stream_test");
var collection = db.GetCollection<LargeDocument>("largeEventCollection");
collection.InsertOne(new LargeDocument
{
Id = 0,
LargeField = new string('a', 16 * 1024 * 1024 - 1024)
});
db.RunCommand<BsonDocument>(new BsonDocument
{
{ "collMod", "largeEventCollection" },
{ "changeStreamPreAndPostImages", new BsonDocument("enabled", true) }
});
var pipeline = new EmptyPipelineDefinition<ChangeStreamDocument<LargeDocument>>()
.ChangeStreamSplitLargeEvent();
var watchOptions = new ChangeStreamOptions
{
FullDocument = ChangeStreamFullDocumentOption.Required,
FullDocumentBeforeChange = ChangeStreamFullDocumentBeforeChangeOption.Required
};
using var cursor = collection.Watch(pipeline, watchOptions);
collection.UpdateOne(
Builders<LargeDocument>.Filter.Eq(d => d.Id, 0),
Builders<LargeDocument>.Update.Set(d => d.LargeField, new string('b', 16 * 1024 * 1024 - 1024))
);
while (cursor.MoveNext())
{
foreach (var fragment in cursor.Current)
{
Console.WriteLine(fragment.BackingDocument["splitEvent"].ToJson());
}
}
{ "fragment" : 1, "of" : 3 }
{ "fragment" : 2, "of" : 3 }
{ "fragment" : 3, "of" : 3 }

watch() メソッドと aggregate() メソッドの両方を使用して、 $changeStreamSplitLargeEvent操作を実行できます。MongoDB Collectionオブジェクトの watch() メソッドに集計パイプラインを渡すと、$changeStreamSplitLargeEventChangeStreamCursor を返します。集計パイプラインを aggregate() メソッドに渡すと、$changeStreamSplitLargeEvent から が返されます。AggregationCursor

重要

$changeStreamSplitLargeEvent の再開可能性

変更ストリームを aggregate() メソッドに渡すと、変更ストリームは再開できません。変更ストリームは、 watch() メソッドに渡された場合にのみ再開します。再開可能性の詳細については、変更ストリームの再開を参照してください。

次の例では、16 MB を超えるイベントをフラグメントに分割し、ChangeStreamCursor で順番に返します。

const pipeline = [{ changeStreamSplitLargeEvent: {} }];
const changeStream = collection.watch(pipeline);
return changeStream;

変更ストリーム通知の詳細については、「変更イベント 」を参照してください。

関連するパイプラインステージの詳細については、$changeStreamガイドを参照してください。

戻る

$changeStream

項目一覧