変更ストリームを使用すると、 oplog を手動で追跡する以前の複雑さやリスクなしで、アプリケーションがリアルタイムデータ変更にアクセスできます。アプリケーションは変更ストリームを使用して、単一のコレクション、データベース、または配置全体のすべてのデータ変更にサブスクライブし、それらに即時に対応できます。変更ストリームは集計フレームワークを使用するため、アプリケーションは特定の変更をフィルタリングすることも、通知を任意に変換することもできます。
注意
変更ストリームはデータベースイベントに制限されます。Atlas Stream Processing には、複数のデータイベントタイプの管理や、Atlas データベースと同じクエリAPIを使用した複雑なデータのStream Processingなどの拡張機能があります。詳しくは、 Atlas Stream Processing を参照してください。
MongoDB 5.1 以降では、変更ストリームが最適化され、リソースの使用効率が上がり、一部の集計パイプライン ステージの実行が高速化されています。
可用性
変更ストリームは、レプリカセットとシャーディングされたクラスターで使用できます。
ストレージエンジン
レプリカセットとシャーディングされたクラスターでは、WiredTiger ストレージエンジンを使用する必要があります。変更ストリームは、保管時の暗号化機能を使用する配置でも使用できます。
レプリカセット プロトコル バージョン
レプリカセットとシャーディングされたクラスターは、レプリカセット プロトコル バージョン 1(
pv1)を使用する必要があります。読み取り保証 (read concern) "過半数" 有効化
変更ストリームは、
"majority"の読み取り保証 (read concern) のサポートに関係なく 使用できます 。つまり、変更ストリームを使用するには、読み取り保証(read concern)majorityのサポートを有効にする(デフォルト)か無効にするかを選択できます。
注意
時系列コレクションは、ドキュメントレベルで変更を追跡する代わりに最適化されたストレージ形式を使用するため、変更ストリームをサポートしていません。時系列コレクションを Atlas Stream Processing のソースとして使用することはできません。
Tip
詳細については時系列コレクションの制限を参照してください。
Stable API でのサポート
変更ストリームはStable API V 1に含まれています。 ただし、 showExpandedEventsオプションは Stable API V 1に含まれていません。
接続
変更ストリームの接続は、+srv 接続オプションを使用して DNS シードリスト、または接続文字列でサーバーを個別にリストする方法のいずれかを使用できます。
変更ストリームへの接続を失うか、接続が切断された場合、ドライバーは、読み込み設定 (read preference) が一致するクラスター内の別のノードを介して変更ストリームへの接続を再確立しようとします。ドライバーが正しく読み込み設定 (read preference) されているノードを見つけられない場合、例外が発生します。
詳細については、「接続文字列 URI 形式」を参照してください。
コレクション、データベース、または配置の監視
変更ストリームを以下に対して開くことができます。
ターゲット | 説明 |
|---|---|
コレクション | 単一のコレクション( このページの例では、MongoDB ドライバーを使用して、単一のコレクションの変更ストリーム カーソルを開いて操作します。 |
データベース | 単一データベース( MongoDB ドライバー メソッドについては、「 ドライバーのドキュメント 」を参照してください。 |
配置 | 配置(レプリカセットまたはシャーディングされたクラスター)の変更ストリーム カーソルを開き、すべてのデータベース( MongoDB ドライバー メソッドについては、「 ドライバーのドキュメント 」を参照してください。 |
変更ストリームのパフォーマンスに関する考慮事項
データベースに対して開かれたアクティブな変更ストリームの量が接続プールのサイズを超えると、通知のレイテンシが発生する可能性があります。各変更ストリームは、次のイベントを待機している間、getMore 操作によって接続を開いたまま保持します。レイテンシを回避するには、プールサイズがオープン中の変更ストリーム数より大きいことを確認してください。詳細については、maxPoolSize の設定を参照してください。
シャーディングされたクラスターの考慮事項
変更ストリームをシャーディングされたクラスター上で開くとき
mongosは、各シャードに個別の変更ストリームを作成します。この動作は、変更ストリームが特定のシャードキーの範囲を対象にしているかどうかに関わらず発生します。mongosは変更ストリームの結果を受け取ると、その結果をソートおよびフィルタリングします。必要に応じて、mongosはfullDocumentルックアップも実行します。
最高のパフォーマンスを得るには、変更ストリームでの $lookup クエリの使用を制限します。
変更ストリームを開く
変更ストリームを開くには
レプリカセットの場合は、任意のデータ保持ノードから変更ストリームを開きます。
シャーディングされたクラスターの場合は、
mongosから変更ストリームを開きます。
次の例では、コレクションの変更ストリームを開き、カーソルを反復処理して変更ストリーム ドキュメントを検索します。[1]
➤ 右上の言語の選択のドロップダウンメニューを使用して、このページの例の言語を設定します。
以下のC の例ではMongoDBレプリカセットに接続し、 inventoryコレクションを含むデータベースにアクセスしている ことを前提としています。
mongoc_collection_t *collection; bson_t *pipeline = bson_new (); bson_t opts = BSON_INITIALIZER; mongoc_change_stream_t *stream; const bson_t *change; const bson_t *resume_token; bson_error_t error; collection = mongoc_database_get_collection (db, "inventory"); stream = mongoc_collection_watch (collection, pipeline, NULL /* opts */); mongoc_change_stream_next (stream, &change); if (mongoc_change_stream_error_document (stream, &error, NULL)) { MONGOC_ERROR ("%s\n", error.message); } mongoc_change_stream_destroy (stream);
以下のC# の例では、 MongoDBレプリカセットに接続し、 inventoryコレクションを含むデータベース にアクセスしている ことを前提としています。
var cursor = inventory.Watch(); while (cursor.MoveNext() && cursor.Current.Count() == 0) { } // keep calling MoveNext until we've read the first batch var next = cursor.Current.First(); cursor.Dispose();
以下の Go の例では、MongoDBレプリカセットに接続し、inventory コレクションを含むデータベースにアクセスしていることを前提としています。
cs, err := coll.Watch(ctx, mongo.Pipeline{}) assert.NoError(t, err) defer cs.Close(ctx) ok := cs.Next(ctx) next := cs.Current
以下の Java の例では、MongoDB レプリカセットに接続し、inventory コレクションを含むデータベースにアクセスしていることを前提としています。
MongoCursor<ChangeStreamDocument<Document>> cursor = inventory.watch().iterator(); ChangeStreamDocument<Document> next = cursor.next();
以下の Kotlin の例では、 MongoDB レプリカセットに接続し、 かつinventoryコレクションを含むデータベースにアクセスできる ことを前提としています。 これらのタスクの完了の詳細については、「 Kotlin コルーチン ドライバーのデータベースとコレクション 」のガイドを参照してください。
val job = launch { val changeStream = collection.watch() changeStream.collect { println("Received a change event: $it") } }
以下の例では MongoDB レプリカセットに接続し、アクセスしているデータベースに inventory コレクションが含まれていることを前提としています。
cursor = db.inventory.watch() document = await cursor.next()
以下の Node.js の例では、MongoDB レプリカセットに接続し、アクセスしているデータベースに inventory コレクションが含まれていることを前提としています。
次の例では、ストリームを使用して変更イベントを処理しています。
const collection = db.collection('inventory'); const changeStream = collection.watch(); changeStream .on('change', next => { // process next document }) .once('error', () => { // handle error });
代わりに、イテレーターを使用して変更イベントを処理することもできます。
const collection = db.collection('inventory'); const changeStream = collection.watch(); const next = await changeStream.next();
changeStream により EventEmitter が拡張されます。
以下の例では、MongoDB レプリカセットに接続し、 inventory コレクションを含むデータベースにアクセスしていることを前提としています。
$changeStream = $db->inventory->watch(); $changeStream->rewind(); $firstChange = $changeStream->current(); $changeStream->next(); $secondChange = $changeStream->current();
以下の Python の例では、 MongoDB レプリカセットに接続し、 かつinventoryコレクションを含むデータベースにアクセスしていることを前提としています。
cursor = db.inventory.watch() next(cursor)
以下の例では、MongoDB レプリカセットに接続し、 inventory コレクションを含むデータベースにアクセスしていることを前提としています。
cursor = inventory.watch.to_enum cursor.next
以下のSwift (Async)例ではMongoDBレプリカセットに接続し、アクセスしているデータベースに inventoryコレクションが含まれていることを前提としています。
let inventory = db.collection("inventory") // Option 1: retrieve next document via next() let next = inventory.watch().flatMap { cursor in cursor.next() } // Option 2: register a callback to execute for each document let result = inventory.watch().flatMap { cursor in cursor.forEach { event in // process event print(event) } }
以下の Swift(Sync)例では MongoDB レプリカセットに接続し、アクセスしているデータベースに inventory コレクションが含まれていることを前提としています。
let inventory = db.collection("inventory") let changeStream = try inventory.watch() let next = changeStream.next()
変更ストリーム カーソルを反復処理して変更イベントを取得します。変更ストリームのドキュメント形式については、「変更イベント」を参照してください。
変更ストリーム カーソルは、次のいずれかが発生するまで開いたままになります。
カーソルが明示的に閉じている。
無効化イベントが発生している(コレクションの削除や名前の変更など)。
MongoDBデプロイへの接続が閉じているか、タイムアウトになっている。詳細については、「動作」を参照してください。
配置がシャーディングされたクラスターの場合、シャードを削除すると、開いている変更ストリームのカーソルが閉じることがあります。閉じた変更ストリームのカーソルは完全に再開できない場合があります。
注意
閉じられていないカーソルのライフサイクルは言語に依存します。
| [1] | startAtOperationTime を指定すると、特定の時点でカーソルを開くことができます。過去の開始点を指定する場合、oplog の時間範囲内である必要があります。 |
変更ストリーム出力の修正
➤ 右上の言語の選択のドロップダウンメニューを使用して、このページの例の言語を設定します。
変更ストリームを構成する際、次のパイプライン ステージのうち 1 つ以上の配列を指定することで変更ストリーム出力を制御できます。
pipeline = BCON_NEW ("pipeline", "[", "{", "$match", "{", "fullDocument.username", BCON_UTF8 ("alice"), "}", "}", "{", "$addFields", "{", "newField", BCON_UTF8 ("this is an added field!"), "}", "}", "]"); stream = mongoc_collection_watch (collection, pipeline, &opts); mongoc_change_stream_next (stream, &change); if (mongoc_change_stream_error_document (stream, &error, NULL)) { MONGOC_ERROR ("%s\n", error.message); } mongoc_change_stream_destroy (stream);
変更ストリームを構成する際、次のパイプライン ステージのうち 1 つ以上の配列を指定することで変更ストリーム出力を制御できます。
var pipeline = new EmptyPipelineDefinition<ChangeStreamDocument<BsonDocument>>() .Match(change => change.FullDocument["username"] == "alice" || change.OperationType == ChangeStreamOperationType.Delete) .AppendStage<ChangeStreamDocument<BsonDocument>, ChangeStreamDocument<BsonDocument>, BsonDocument>( "{ $addFields : { newField : 'this is an added field!' } }"); var collection = database.GetCollection<BsonDocument>("inventory"); using (var cursor = collection.Watch(pipeline)) { while (cursor.MoveNext() && cursor.Current.Count() == 0) { } // keep calling MoveNext until we've read the first batch var next = cursor.Current.First(); }
変更ストリームを構成する際、次のパイプライン ステージのうち 1 つ以上の配列を指定することで変更ストリーム出力を制御できます。
pipeline := mongo.Pipeline{bson.D{{ "$match", bson.D{{ "$or", bson.A{ bson.D{{"fullDocument.username", "alice"}}, bson.D{{"operationType", "delete"}}, }, }}, }}} cs, err := coll.Watch(ctx, pipeline) assert.NoError(t, err) defer cs.Close(ctx) ok := cs.Next(ctx) next := cs.Current
変更ストリームを構成する際、次のパイプライン ステージのうち 1 つ以上の配列を指定することで変更ストリーム出力を制御できます。
MongoClient mongoClient = MongoClients.create("mongodb://<username>:<password>@<host>:<port>"); // Select the MongoDB database and collection to open the change stream against MongoDatabase db = mongoClient.getDatabase("myTargetDatabase"); MongoCollection<Document> collection = db.getCollection("myTargetCollection"); // Create $match pipeline stage. List<Bson> pipeline = singletonList(Aggregates.match(Filters.or( Document.parse("{'fullDocument.username': 'alice'}"), Filters.in("operationType", asList("delete"))))); // Create the change stream cursor, passing the pipeline to the // collection.watch() method MongoCursor<Document> cursor = collection.watch(pipeline).iterator();
pipelineリストには、次の条件の 1 つまたは両方を満たす操作をフィルタリングする単一の$matchステージが含まれます。
username値はaliceoperationType値はdelete
pipelineをwatch()メソッドに渡すと、変更ストリームは指定されたpipelineを介して通知を渡し、通知を返すように指示します。
変更ストリームを構成する際、次のパイプライン ステージのうち 1 つ以上の配列を指定することで変更ストリーム出力を制御できます。
val pipeline = listOf( Aggregates.match( or( eq("fullDocument.username", "alice"), `in`("operationType", listOf("delete")) ) )) val job = launch { val changeStream = collection.watch(pipeline) changeStream.collect { println("Received a change event: $it") } }
pipelineリストには、次の条件の 1 つまたは両方を満たす操作をフィルタリングする単一の$matchステージが含まれます。
username値はaliceoperationType値はdelete
pipelineをwatch()メソッドに渡すと、変更ストリームは指定されたpipelineを介して通知を渡し、通知を返すように指示します。
変更ストリームを構成する際、次のパイプライン ステージのうち 1 つ以上の配列を指定することで変更ストリーム出力を制御できます。
pipeline = [ {"$match": {"fullDocument.username": "alice"}}, {"$addFields": {"newField": "this is an added field!"}}, ] cursor = db.inventory.watch(pipeline=pipeline) document = await cursor.next()
変更ストリームを構成する際、次のパイプライン ステージのうち 1 つ以上の配列を指定することで変更ストリーム出力を制御できます。
次の例では、ストリームを使用して変更イベントを処理しています。
const pipeline = [ { $match: { 'fullDocument.username': 'alice' } }, { $addFields: { newField: 'this is an added field!' } } ]; const collection = db.collection('inventory'); const changeStream = collection.watch(pipeline); changeStream .on('change', next => { // process next document }) .once('error', error => { // handle error });
代わりに、イテレーターを使用して変更イベントを処理することもできます。
const changeStreamIterator = collection.watch(pipeline); const next = await changeStreamIterator.next();
変更ストリームを構成する際、次のパイプライン ステージのうち 1 つ以上の配列を指定することで変更ストリーム出力を制御できます。
$pipeline = [ ['$match' => ['fullDocument.username' => 'alice']], ['$addFields' => ['newField' => 'this is an added field!']], ]; $changeStream = $db->inventory->watch($pipeline); $changeStream->rewind(); $firstChange = $changeStream->current(); $changeStream->next(); $secondChange = $changeStream->current();
変更ストリームを構成する際、次のパイプライン ステージのうち 1 つ以上の配列を指定することで変更ストリーム出力を制御できます。
pipeline = [ {"$match": {"fullDocument.username": "alice"}}, {"$addFields": {"newField": "this is an added field!"}}, ] cursor = db.inventory.watch(pipeline=pipeline) next(cursor)
変更ストリームを構成する際、次のパイプライン ステージのうち 1 つ以上の配列を指定することで変更ストリーム出力を制御できます。
変更ストリームを構成する際、次のパイプライン ステージのうち 1 つ以上の配列を指定することで変更ストリーム出力を制御できます。
let pipeline: [BSONDocument] = [ ["$match": ["fullDocument.username": "alice"]], ["$addFields": ["newField": "this is an added field!"]] ] let inventory = db.collection("inventory") // Option 1: use next() to iterate let next = inventory.watch(pipeline, withEventType: BSONDocument.self).flatMap { changeStream in changeStream.next() } // Option 2: register a callback to execute for each document let result = inventory.watch(pipeline, withEventType: BSONDocument.self).flatMap { changeStream in changeStream.forEach { event in // process event print(event) } }
変更ストリームを構成する際、次のパイプライン ステージのうち 1 つ以上の配列を指定することで変更ストリーム出力を制御できます。
let pipeline: [BSONDocument] = [ ["$match": ["fullDocument.username": "alice"]], ["$addFields": ["newField": "this is an added field!"]] ] let inventory = db.collection("inventory") let changeStream = try inventory.watch(pipeline, withEventType: BSONDocument.self) let next = changeStream.next()
Tip
変更ストリーム イベント ドキュメントの _id フィールドは、再開トークンとして機能します。変更ストリーム イベントの _id フィールドを変更または削除するために、パイプラインを使用しないでください。
MongoDB 4.2 以降、変更ストリームの集計パイプライン でイベントの _id フィールドが変更される場合、変更ストリームで例外がスローされるようになります。
変更ストリームの応答ドキュメントの形式の詳細については、「変更イベント」を参照してください。
更新操作のための完全なドキュメントの検索
デフォルトでは、変更ストリームは更新操作中にフィールドのデルタのみを返します。ただし、更新されたドキュメントの過半数がコミットした最新のバージョンを返すように変更ストリームを構成できます。
updateLookup操作は、コレクションから、シャードキーとドキュメント識別子で識別されるドキュメントを読み取ります。コレクションは名前によって識別され、変更ストリームが処理される時点で存在するコレクションデータを使用します。次のシナリオを検討してみましょう。
コレクションの名前が変更された場合、ドキュメントは返されません。
コレクションの名前が変更され、古い名前で新しいコレクションが作成された場合、新しいコレクションに対してルックアップ操作が実行されます 。一致するドキュメントが見つかった場合は、それが返されます。
警告
迅速な削除やトラフィックの急増が発生している状況では、$match フィルターで fullDocument: "updateLookup" を構成すると、「再開トークンが見つかりません」エラーが発生する可能性があります。これは、ドキュメントの削除によって fullDocumentフィールドがnull 値を返す場合に発生します。これは、一致するドキュメントがないためであり、変更ストリームが再開トークンを見つけられなくなります。
代わりに、fullDocumentBeforeChange:
"whenAvailable" と fullDocument: "whenAvailable" の変更前と変更後のイメージを使用してください。「Change Streams with イメージ前ドキュメントとイメージ後ドキュメント」セクションを参照してください。
➤ 右上の言語の選択のドロップダウンメニューを使用して、このページの例の言語を設定します。
過半数以上にコミットされた最新バージョンの更新されたドキュメントを返すには、"fullDocument" オプションに "updateLookup" 値を指定してmongoc_collection_watch メソッドを使用します。
以下の例では、すべての更新操作通知に、更新操作の影響を受けるドキュメントの現在のバージョンを表す fullDocument フィールドが含まれています。
BSON_APPEND_UTF8 (&opts, "fullDocument", "updateLookup"); stream = mongoc_collection_watch (collection, pipeline, &opts); mongoc_change_stream_next (stream, &change); if (mongoc_change_stream_error_document (stream, &error, NULL)) { MONGOC_ERROR ("%s\n", error.message); } mongoc_change_stream_destroy (stream);
過半数以上にコミットされた最新バージョンの更新されたドキュメントを返すには、"FullDocument = ChangeStreamFullDocumentOption.UpdateLookup" を db.collection.watch() メソッドへ渡します。
以下の例では、すべての更新操作通知に、更新操作の影響を受けるドキュメントの現在のバージョンを表す FullDocument フィールドが含まれています。
var options = new ChangeStreamOptions { FullDocument = ChangeStreamFullDocumentOption.UpdateLookup }; var cursor = inventory.Watch(options); while (cursor.MoveNext() && cursor.Current.Count() == 0) { } // keep calling MoveNext until we've read the first batch var next = cursor.Current.First(); cursor.Dispose();
更新されたドキュメントの最新の過半数コミット済みバージョンを返すには、SetFullDocument(options.UpdateLookup) 変更ストリームオプションを使用します。
cs, err := coll.Watch(ctx, mongo.Pipeline{}, options.ChangeStream().SetFullDocument(options.UpdateLookup)) assert.NoError(t, err) defer cs.Close(ctx) ok := cs.Next(ctx) next := cs.Current
過半数以上にコミットされた最新バージョンの更新されたドキュメントを返すには、 FullDocument.UPDATE_LOOKUPをdb.collection.watch.fullDocument()メソッドに渡します。
以下の例では、すべての更新操作通知に、更新操作の影響を受けるドキュメントの現在のバージョンを表す FullDocument フィールドが含まれています。
cursor = inventory.watch().fullDocument(FullDocument.UPDATE_LOOKUP).iterator(); next = cursor.next();
過半数以上にコミットされた最新バージョンの更新されたドキュメントを返すには、 ChangeStreamFlow.FullDocument() に渡します。FullDocument.UPDATE_LOOKUPメソッドを使用します。
以下の例では、すべての更新操作通知に、更新操作の影響を受けるドキュメントの現在のバージョンを表す FullDocument フィールドが含まれています。
val job = launch { val changeStream = collection.watch() .fullDocument(FullDocument.UPDATE_LOOKUP) changeStream.collect { println(it) } }
過半数以上にコミットされた最新バージョンの更新されたドキュメントを返すには、full_document='updateLookup' を db.collection.watch() メソッドへ渡します。
以下の例では、すべての更新操作通知に、更新操作の影響を受けるドキュメントの現在のバージョンを表す `full_document フィールドが含まれています。
cursor = db.inventory.watch(full_document="updateLookup") document = await cursor.next()
過半数以上にコミットされた最新バージョンの更新されたドキュメントを返すには、{ fullDocument: 'updateLookup' } を db.collection.watch() メソッドへ渡します。
以下の例では、すべての更新操作通知に、更新操作の影響を受けるドキュメントの現在のバージョンを表す fullDocument フィールドが含まれています。
次の例では、ストリームを使用して変更イベントを処理しています。
const collection = db.collection('inventory'); const changeStream = collection.watch([], { fullDocument: 'updateLookup' }); changeStream .on('change', next => { // process next document }) .once('error', error => { // handle error });
代わりに、イテレーターを使用して変更イベントを処理することもできます。
const changeStreamIterator = collection.watch([], { fullDocument: 'updateLookup' }); const next = await changeStreamIterator.next();
過半数以上にコミットされた最新バージョンの更新されたドキュメントを返すには、"fullDocument' => \MongoDB\Operation\ChangeStreamCommand::FULL_DOCUMENT_UPDATE_LOOKUP" を db.watch() メソッドへ渡します。
以下の例では、すべての更新操作通知に、更新操作の影響を受けるドキュメントの現在のバージョンを表す fullDocument フィールドが含まれています。
$changeStream = $db->inventory->watch([], ['fullDocument' => \MongoDB\Operation\Watch::FULL_DOCUMENT_UPDATE_LOOKUP]); $changeStream->rewind(); $firstChange = $changeStream->current(); $changeStream->next(); $secondChange = $changeStream->current();
過半数以上にコミットされた最新バージョンの更新されたドキュメントを返すには、full_document='updateLookup' を db.collection.watch() メソッドへ渡します。
以下の例では、すべての更新操作通知に、更新操作の影響を受けるドキュメントの現在のバージョンを表す full_document フィールドが含まれています。
cursor = db.inventory.watch(full_document="updateLookup") next(cursor)
過半数以上にコミットされた最新バージョンの更新されたドキュメントを返すには、full_document: 'updateLookup' を db.watch() メソッドへ渡します。
以下の例では、すべての更新操作通知に、更新操作の影響を受けるドキュメントの現在のバージョンを表す full_document フィールドが含まれています。
cursor = inventory.watch([], full_document: 'updateLookup').to_enum cursor.next
過半数以上にコミットされた最新バージョンの更新されたドキュメントを返すには、 options:
ChangeStreamOptions(fullDocument: .updateLookup)をwatch()メソッドに渡します。
let inventory = db.collection("inventory") // Option 1: use next() to iterate let next = inventory.watch(options: ChangeStreamOptions(fullDocument: .updateLookup)) .flatMap { changeStream in changeStream.next() } // Option 2: register a callback to execute for each document let result = inventory.watch(options: ChangeStreamOptions(fullDocument: .updateLookup)) .flatMap { changeStream in changeStream.forEach { event in // process event print(event) } }
過半数以上にコミットされた最新バージョンの更新されたドキュメントを返すには、 options:
ChangeStreamOptions(fullDocument: .updateLookup)をwatch()メソッドに渡します。
let inventory = db.collection("inventory") let changeStream = try inventory.watch(options: ChangeStreamOptions(fullDocument: .updateLookup)) let next = changeStream.next()
注意
過半数がコミットした操作で、更新操作後、lookup 前に、更新されたドキュメントを変更した操作が 1 つ以上ある場合、返される完全なドキュメント全体は、更新操作時のドキュメントとは大幅に異なる可能性があります。
ただし、変更ストリーム ドキュメントに含まれるデルタには、その変更ストリーム イベントに適用された監視対象コレクションの変更が常に正しく記述されています。
次のいずれかに当てはまる場合、更新イベントの fullDocument フィールドが欠落していることがあります。
ドキュメントが削除された場合、または更新と検索の間にコレクションが削除された場合。
更新によって、そのコレクションのシャードキーで、少なくとも1つのフィールドの値が変更された場合。
変更ストリームの応答ドキュメントの形式の詳細については、「変更イベント」を参照してください。
変更ストリームの再開
変更ストリームは、カーソルを開くときに、resumeAfter または startAfter のいずれかに再開トークンを指定することによって再開できます。
警告
再開トークンを使用して変更ストリームを再開する場合は、最初にトークンを生成したときと同じパイプラインとオプションを使用します。別の変更ストリームパイプラインまたは別のオプションを使用すると、予期しない動作が発生したり、データの整合性に悪影響を影響たり、変更ストリームが再開できなくなったりする可能性があります。
resumeAfter Change Streams の
カーソルを開くときに再開トークンを resumeAfter に渡すことで、特定のイベントの後に変更ストリームを再開できます。
再開トークンの詳細については、「再開トークン」を参照してください。
重要
oplog には、トークンまたはタイムスタンプ(タイムスタンプが過去のものの場合)に関連付けられた操作を見つけるのに十分な履歴が必要です。
無効化イベント(コレクションの削除や名前の変更など)によってストリームが閉じられた後は、
resumeAfterを使用して変更ストリームを再開することはできません。代わりに、startAfter を使用して、無効化イベント後に新しい変更ストリームを開始できます。
以下の例では、ストリームが破棄された後に再作成されるように、ストリーム オプションにresumeAfter オプションが追加されています。変更ストリームに _id を渡すと、指定された操作の後に通知の再開が試行されます。
stream = mongoc_collection_watch (collection, pipeline, NULL); if (mongoc_change_stream_next (stream, &change)) { resume_token = mongoc_change_stream_get_resume_token (stream); BSON_APPEND_DOCUMENT (&opts, "resumeAfter", resume_token); mongoc_change_stream_destroy (stream); stream = mongoc_collection_watch (collection, pipeline, &opts); mongoc_change_stream_next (stream, &change); mongoc_change_stream_destroy (stream); } else { if (mongoc_change_stream_error_document (stream, &error, NULL)) { MONGOC_ERROR ("%s\n", error.message); } mongoc_change_stream_destroy (stream); }
以下の例では、 resumeTokenが最後の変更ストリーム ドキュメントから検索され、オプションとしてWatch()メソッドに渡されています。 resumeTokenをWatch()メソッドに渡すと、変更ストリームは、再開トークンで指定された操作の後に通知の再開を試行するように指示します。
var resumeToken = previousCursor.GetResumeToken(); var options = new ChangeStreamOptions { ResumeAfter = resumeToken }; var cursor = inventory.Watch(options); cursor.MoveNext(); var next = cursor.Current.First(); cursor.Dispose();
ChangeStreamOptions.SetResumeAfter を使用して、変更ストリームの再開トークンを指定できます。resumeAfter オプションが設定されている場合、変更ストリームは再開トークンで指定された操作の後に通知を再開します。SetResumeAfter は、再開トークンに解決する必要がある値を指定します。たとえば、以下の例では resumeToken です。
resumeToken := original.ResumeToken() cs, err := coll.Watch(ctx, mongo.Pipeline{}, options.ChangeStream().SetResumeAfter(resumeToken)) assert.NoError(t, err) defer cs.Close(ctx) ok = cs.Next(ctx) result := cs.Current
resumeAfter()メソッドを使用すると、再開トークンで指定された操作の後に通知を再開できます。 resumeAfter()メソッドは、再開トークンに解決する必要がある値を指定します。たとえば、 以下の例ではresumeTokenです。
BsonDocument resumeToken = next.getResumeToken(); cursor = inventory.watch().resumeAfter(resumeToken).iterator(); next = cursor.next();
ChangeStreamFlow.resumeAfter()メソッドを使用して、再開トークンで指定された操作の後に通知を再開します。resumeAfter() メソッドは、以下の例の resumeToken 変数など、再開トークンに解決する必要がある値を受け取ります。
val resumeToken = BsonDocument() val job = launch { val changeStream = collection.watch() .resumeAfter(resumeToken) changeStream.collect { println(it) } }
resume_after 修飾子を使用すると、再開トークンで指定された操作の後に通知を再開できます。resume_after 修飾子は、再開トークンに解決する必要がある値を指定します。たとえば、以下の例では resume_token です。
resume_token = cursor.resume_token cursor = db.inventory.watch(resume_after=resume_token) document = await cursor.next()
resumeAfter オプションを使用すると、再開トークンで指定された操作の後に通知を再開できます。resumeAfter オプションは、再開トークンに解決する必要がある値を指定します。たとえば、以下の例では resumeToken です。
const collection = db.collection('inventory'); const changeStream = collection.watch(); let newChangeStream; changeStream .once('change', next => { const resumeToken = changeStream.resumeToken; changeStream.close(); newChangeStream = collection.watch([], { resumeAfter: resumeToken }); newChangeStream .on('change', next => { processChange(next); }) .once('error', error => { // handle error }); }) .once('error', error => { // handle error });
resumeAfter オプションを使用すると、再開トークンで指定された操作の後に通知を再開できます。resumeAfter オプションは、再開トークンに解決する必要がある値を指定します。たとえば、以下の例では $resumeToken です。
$resumeToken = $changeStream->getResumeToken(); if ($resumeToken === null) { throw new \Exception('Resume token was not found'); } $changeStream = $db->inventory->watch([], ['resumeAfter' => $resumeToken]); $changeStream->rewind(); $firstChange = $changeStream->current();
resume_after 修飾子を使用すると、再開トークンで指定された操作の後に通知を再開できます。resume_after 修飾子は、再開トークンに解決する必要がある値を指定します。たとえば、以下の例では resume_token です。
resume_token = cursor.resume_token cursor = db.inventory.watch(resume_after=resume_token) next(cursor)
resume_after 修飾子を使用すると、再開トークンで指定された操作の後に通知を再開できます。resume_after 修飾子は、再開トークンに解決する必要がある値を指定します。たとえば、以下の例では resume_token です。
change_stream = inventory.watch cursor = change_stream.to_enum next_change = cursor.next resume_token = change_stream.resume_token new_cursor = inventory.watch([], resume_after: resume_token).to_enum new_cursor.next
resumeAfter オプションを使用すると、再開トークンで指定された操作の後に通知を再開できます。resumeAfter オプションは、再開トークンに解決する必要がある値を指定します。たとえば、以下の例では resumeToken です。
let inventory = db.collection("inventory") inventory.watch(options: ChangeStreamOptions(fullDocument: .updateLookup)) .flatMap { changeStream in changeStream.next().map { _ in changeStream.resumeToken }.always { _ in _ = changeStream.kill() } }.flatMap { resumeToken in inventory.watch(options: ChangeStreamOptions(resumeAfter: resumeToken)).flatMap { newStream in newStream.forEach { event in // process event print(event) } } }
resumeAfter オプションを使用すると、再開トークンで指定された操作の後に通知を再開できます。resumeAfter オプションは、再開トークンに解決する必要がある値を指定します。たとえば、以下の例では resumeToken です。
let inventory = db.collection("inventory") let changeStream = try inventory.watch(options: ChangeStreamOptions(fullDocument: .updateLookup)) let next = changeStream.next() let resumeToken = changeStream.resumeToken let resumedChangeStream = try inventory.watch(options: ChangeStreamOptions(resumeAfter: resumeToken)) let nextAfterResume = resumedChangeStream.next()
startAfter Change Streams の
カーソルを開くときに再開トークンを startAfter に渡すことで、特定のイベントの後に新しい変更ストリームを開始できます。resumeAfter とは異なり、startAfter は新しい変更ストリームを作成することで無効化イベント後に通知を再開できます。
再開トークンの詳細については、「再開トークン」を参照してください。
重要
oplog には、トークンまたはタイムスタンプ(タイムスタンプが過去のものの場合)に関連付けられた操作を見つけるのに十分な履歴が必要です。
再開トークン
再開トークンには次の 2 種類があります。
イベント トークン: 特定の変更イベントを識別します。変更ストリームカーソルは、変更イベントが発生するたびにイベントトークンを生成します。
高値トークン: 関連する 変更イベントのない点を表します。サーバーは定期的に高値トークンを生成し、変更イベントが発生していない場合でもクラスター時間が経過したことを示します。
Tip
サーバーは高値の再開トークンのタイムスタンプを定期的に増やします。書き込み (write) 頻度の低いアイドル シャードでは、ユースケースによっては、この高度が十分な頻度で発生しないことがあります。ハイライト タイムスタンプをより頻繁に進めるには、appendOplogNote コマンドを使用してアイドル シャード上のoplogに noop エントリを書込みます。
再開トークンは、複数のソースにあります。
ソース | 説明 |
|---|---|
各変更イベントの通知には、 | |
このフィールドは、 | |
|
MongoDB 4.2 以降、変更ストリームの集計パイプライン でイベントの _id フィールドが変更される場合、変更ストリームで例外がスローされるようになります。
Tip
MongoDB に備わる 「スニペット」は、16 進数でエンコードされた再開トークンを解読する mongosh の拡張機能です。
mongosh: から 再開トークン スニペットをインストールして実行できます。
snippet install resumetoken decodeResumeToken('<RESUME TOKEN>')
システムに npm がインストールされている場合は、コマンドラインから再開トークンを実行することもできます(mongoshを使用せずに)。
npx mongodb-resumetoken-decoder <RESUME TOKEN>
下記についての詳細は、参照先をご覧ください。
変更イベントからの再開トークン
変更イベントの通知には、_id フィールドに再開トークンが含まれています。
{ "_id": { "_data": "82635019A0000000012B042C0100296E5A1004AB1154ACACD849A48C61756D70D3B21F463C6F7065726174696F6E54797065003C696E736572740046646F63756D656E744B65790046645F69640064635019A078BE67426D7CF4D2000004" }, "operationType": "insert", "clusterTime": Timestamp({ "t": 1666193824, "i": 1 }), "collectionUUID": new UUID("ab1154ac-acd8-49a4-8c61-756d70d3b21f"), "wallTime": ISODate("2022-10-19T15:37:04.604Z"), "fullDocument": { "_id": ObjectId("635019a078be67426d7cf4d2"'), "name": "Giovanni Verga" }, "ns": { "db": "test", "coll": "names" }, "documentKey": { "_id": ObjectId("635019a078be67426d7cf4d2") } }
からの再開トークン aggregate
aggregateコマンドを使用する場合、$changeStream 集計ステージには cursor.postBatchResumeToken フィールドに再開トークンが含まれています。
{ "cursor": { "firstBatch": [], "postBatchResumeToken": { "_data": "8263515EAC000000022B0429296E1404" }, "id": Long("4309380460777152828"), "ns": "test.names" }, "ok": 1, "$clusterTime": { "clusterTime": Timestamp({ "t": 1666277036, "i": 1 }), "signature": { "hash": Binary(Buffer.from("0000000000000000000000000000000000000000", "hex"), 0), "keyId": Long("0") } }, "operationTime": Timestamp({ "t": 1666277036, "i": 1 }) }
からの再開トークン getMore
getMore コマンドにも、cursor.postBatchResumeToken フィールドに再開トークンが含まれています。
{ "cursor": { "nextBatch": [], "postBatchResumeToken": { "_data": "8263515979000000022B0429296E1404" }, "id": Long("7049907285270685005"), "ns": "test.names" }, "ok": 1, "$clusterTime": { "clusterTime": Timestamp( { "t": 1666275705, "i": 1 } ), "signature": { "hash": Binary(Buffer.from("0000000000000000000000000000000000000000", "hex"), 0), "keyId": Long("0") } }, "operationTime": Timestamp({ "t": 1666275705, "i": 1 }) }
ユースケース
変更ストリームは、データの変更が確実であることが確認された後に下流のシステムに通知することで、依存しているビジネスシステムを持つアーキテクチャにメリットをもたらすことができます。たとえば、変更ストリームは、データ抽出・変換・ロード(ETL)サービス、クロスプラットフォーム同期、コラボレーション機能、通知サービスを実装する際、開発者の時間を節約するのに役立ちます。
アクセス制御
特定のコレクションに対して変更ストリームを開くには、対応するコレクションに対して
changeStreamアクションとfindアクションを許可する権限がアプリケーションに必要です。{ resource: { db: <dbname>, collection: <collection> }, actions: [ "find", "changeStream" ] } 単一のデータベースに対して変更ストリームを開くには、データベース内のすべての非
systemコレクションに対してchangeStreamアクションとfindアクションを許可する権限がアプリケーションに必要です。{ resource: { db: <dbname>, collection: "" }, actions: [ "find", "changeStream" ] } 配置の全体に対して変更ストリームを開くには、配置内のすべてのデータベースのすべての非
systemコレクションに対してchangeStreamアクションとfindアクションを許可する権限がアプリケーションに必要です。{ resource: { db: "", collection: "" }, actions: [ "find", "changeStream" ] }
イベント通知
変更ストリームは、レプリカセット内のデータを保持しているノードの過半数に反映されたデータ変更についてのみ通知します。変更ストリームは、障害発生時にも永続性が保証された、過半数コミット済みの変更に対してのみ通知がトリガーされることを保証します。
たとえば、レプリカセットが3つのノードで構成されており、プライマリに対して変更ストリーム カーソルが開かれているとします。クライアントが挿入操作を実行した場合、変更ストリームは、挿入がデータを保持しているノードの過半数に保存された後でのみ、データ変更をアプリケーションに通知します。
操作がトランザクションに関連付けられている場合、変更イベント ドキュメントには txnNumber と lsid が含まれます。
照合
明示的な照合が指定されていない限り、変更ストリームはsimple バイナリ比較を使用します。
Change Streams と孤立したドキュメント
MongoDB5.3 以降では、 範囲移行 中に、 孤立したドキュメント の更新に対して 変更ストリーム イベントは生成されません。
変更ストリームにおけるドキュメントの変更前と変更後のイメージ
MongoDB 6.0 以降では、変更ストリーム イベントを使用して、変更前と変更後のドキュメントのバージョン(変更前とイメージと変更後のイメージ)を出力できます。
変更前のイメージとは、置換、更新、または削除される前のドキュメントです。挿入されたドキュメントには、変更前のイメージはありません。
変更後のイメージとは、挿入、置換、または更新された後のドキュメントです。削除されたドキュメントには、変更後のイメージはありません。
db.createCollection()、create、またはcollModを使用し、コレクションに対してchangeStreamPreAndPostImagesを有効にします。例、collModコマンドを使用する場合は次のようになります。db.runCommand( { collMod: <collection>, changeStreamPreAndPostImages: { enabled: true } } )
変更ストリーム イベントにおいて、次の条件に当てはまる場合、変更前と変更後のイメージは使用できません。
ドキュメントの更新または削除操作時に、コレクションにおいて有効になっていない場合。
expireAfterSecondsで設定した、変更前と変更後のイメージ保持時間が経過した後に削除された場合。次の例では、クラスター全体で
expireAfterSecondsを100秒に設定します。use admin db.runCommand( { setClusterParameter: { changeStreamOptions: { preAndPostImages: { expireAfterSeconds: 100 } } } } ) 注意
setClusterParameterコマンドはMongoDB Atlasクラスターではサポートされていません。すべてのコマンドに対する Atlas のサポートの詳細については、「 Atlas でサポートされていないコマンド 」を参照してください。次の例では、
expireAfterSecondsを含む現在のchangeStreamOptions設定を返します。db.adminCommand( { getClusterParameter: "changeStreamOptions" } ) expireAfterSecondsをoffに設定すると、デフォルトの保持ポリシーが適用されます。対応する変更ストリーム イベントがoplog から削除されるまで、変更前と変更後のイメージは保持されます。変更ストリーム イベントが oplog から削除されると、
expireAfterSecondsの変更前と変更後のイメージの保持時間にかかわらず、対応する変更前と変更後のイメージも削除されます。
その他の考慮事項
変更前と変更後のイメージを有効にすると、ストレージ容量が消費され、処理時間が増えます。変更前と変更後のイメージは、必要な場合のみ有効にしてください。
変更ストリーム イベントのサイズを 16 メビバイト未満に制限します。イベントのサイズを制限するには、次の方法があります。
ドキュメントのサイズを 8 MB に制限します。
updateDescriptionのような他の変更ストリーム イベントのフィールドがそれほど大きくない場合、変更ストリーム出力で変更前と変更後のイメージを同時にリクエストできます。updateDescriptionのような他の変更ストリーム イベントのフィールドが大きくない場合、最大 16 メビバイトのドキュメントの変更ストリーム出力では、変更後のイメージのみをリクエストします。次の場合、16 メビバイトまでのドキュメントの変更ストリーム出力で、変更前のイメージのみをリクエストします。
ドキュメントのアップデートがドキュメントの構造または内容のごく一部にしか影響しない場合、そして
replace変更イベントが発生しない場合。replaceイベントには、常に変更後のイメージが含まれます。
変更前イメージをリクエストするには、
db.collection.watch()で、fullDocumentBeforeChangeをrequiredまたはwhenAvailableに設定します。変更後イメージをリクエストするには、同じ方法でfullDocumentを設定します。変更前のイメージは
config.system.preimagesコレクションに書き込まれます。config.system.preimagesコレクションが大きくなる場合があります。コレクションのサイズを制限するには、前述のとおり、変更前のイメージにexpireAfterSeconds時間を設定します。config.system.preimagesのサイズを監視するには、シャーディングされたクラスターか、レプリカセットのmongodノードに接続します。次に、以下のコマンドを実行します。use config db.system.preimages.totalSize() db.system.preimages.stats() 変更前のイメージはバックグラウンド プロセスによって非同期で削除されます。
重要
下位互換性のない機能
MongoDB 6.0 以降では、変更ストリームにドキュメントの変更前のイメージと変更後のイメージを使用している場合、以前の MongoDB バージョンにダウングレードする前に、collMod コマンドを使用して各コレクションの changeStreamPreAndPostImages を無効にする必要があります。
Tip
変更ストリーム イベントと出力については、「変更イベント」を参照してください。
コレクションの変更を監視するには、
db.collection.watch()を参照してください。変更ストリーム出力の完全な例については、「Change Streams とドキュメントの変更前イメージおよび変更後イメージ」を参照してください。
変更ストリーム出力の完全な例については、「Change Streams とドキュメントの変更前イメージおよび変更後イメージ」を参照してください。
クラスターの initialSyncMethod パラメータが fileCopyBased の場合、変更ストリームリスナーには影響しません。
initialSyncMethod が logical で、新しく同期されたノードで変更ストリームが開かれ、 論理的な最初の同期の完了より前の点でのイベントが読み取られる場合、変更前と変更後のイメージが欠落している可能性があります。