AI エージェント向け: ドキュメントインデックスは https://www.mongodb.com/ja-jp/docs/llms.txt で利用できます。すべてのページの markdown バージョンは、いずれかの URL パスに .md を追加することで利用できます。
Docs Menu

Change Streams によるデータの監視

このガイドでは、変更ストリームを使用してデータベースへのリアルタイムの変更を監視する方法を学習できます。 変更ストリームは、アプリケーションがコレクション、データベース、または配置のデータ変更をサブスクライブできる MongoDB Server の機能です。

Tip

Atlas Stream Processing

変更ストリームの代わりに、Atlas Stream Processing を使用してデータのストリームを処理および変換できます。データベースイベントのみを登録する変更ストリームとは異なり、Atlas Stream Processing は複数のデータイベント型を管理し、拡張データプロセシング機能を提供します。この機能の詳細については、 MongoDB AtlasドキュメントのAtlas Stream Processingを参照してください。

このガイドの例では、 Atlas サンプル データセットsample_restaurants.restaurantsコレクションを使用します。 MongoDB Atlas クラスターを無料で作成して、サンプル データセットをロードする方法については、 「 PyMongo を使い始める 」を参照してください。

変更ストリームを開くには、 watch()メソッドを呼び出します。 watch()メソッドを呼び出す インスタンスによって、変更ストリームがリッスンするイベントの範囲が決まります。 次のクラスでwatch()メソッドを呼び出すことができます。

  • MongoClient: MongoDB 配置のすべての変更を監視

  • Database: データベース内のすべてのコレクションの変更を監視するには

  • Collection: コレクションの変更をモニターするには

次の例では、restaurantsコレクションの変更ストリームを開き、変更が発生に応じて出力します。対応するコードを表示するには、Synchronous タブまたは Asynchronousタブを選択します。

database = client["sample_restaurants"]
collection = database["restaurants"]
with collection.watch() as stream:
for change in stream:
print(change)
database = client["sample_restaurants"]
collection = database["restaurants"]
async with await collection.watch() as stream:
async for change in stream:
print(change)

変更の監視を開始するには、アプリケーションを実行します。次に、別のアプリケーションまたはシェルで、restaurantsコレクションを変更します。 次の例では、nameフィールドの値が Blarney Castle であるドキュメントを更新します。対応するコードを表示するには、Synchronous タブまたは Asynchronousタブを選択します。

database = client["sample_restaurants"]
collection = database["restaurants"]
query_filter = { "name": "Blarney Castle" }
update_operation = { '$set' :
{ "cuisine": "Irish" }
}
result = collection.update_one(query_filter, update_operation)
database = client["sample_restaurants"]
collection = database["restaurants"]
query_filter = { "name": "Blarney Castle" }
update_operation = { '$set' :
{ "cuisine": "Irish" }
}
result = await collection.update_one(query_filter, update_operation)

コレクションを更新すると、変更ストリーム アプリケーションは変更が発生に応じて出力します。 出力される変更イベントは、次のようになります。

{'_id': {'_data': '...'}, 'operationType': 'update', 'clusterTime': Timestamp(...), 'wallTime': datetime.datetime(...),
'ns': {'db': 'sample_restaurants', 'coll': 'restaurants'}, 'documentKey': {'_id': ObjectId('...')},
'updateDescription': {'updatedFields': {'cuisine': 'Irish'}, 'removedFields': [], 'truncatedArrays': []}}

pipelineパラメータをwatch()メソッドに渡して、変更ストリーム出力を変更できます。 このパラメーターを使用すると、指定された変更イベントのみを監視できます。 パラメーターを、それぞれが集計ステージを表すオブジェクトのリストとして形式します。

pipelineパラメーターでは、次のステージを指定できます。

  • $addFields

  • $match

  • $project

  • $replaceRoot

  • $replaceWith

  • $redact

  • $set

  • $unset

次の例では、pipeline パラメータを使用して、アップデート操作のみを記録する変更ストリームを開きます。対応するコードを表示するには、Synchronous タブまたは Asynchronousタブを選択します。

change_pipeline = { "$match": { "operationType": "update" }},
with collection.watch(pipeline=change_pipeline) as stream:
for change in stream:
print(change)
change_pipeline = { "$match": { "operationType": "update" }},
async with await collection.watch(pipeline=change_pipeline) as stream:
async for change in stream:
print(change)

変更ストリーム出力の変更の詳細については、MongoDB Server マニュアルの「 変更ストリーム出力 の変更 」セクションを参照してください。

watch()メソッドは、操作を構成するために使用できるオプションを表す任意のパラメーターを受け入れます。 オプションを指定しない場合、ドライバーは操作をカスタマイズしません。

次の表では、 watch()の動作をカスタマイズするために設定できるオプションについて説明します。

プロパティ
説明

pipeline

変更ストリームの出力を変更する 集計パイプライン ステージ のリスト。

full_document

ドキュメントに加えられた変更のみを表示するのではなく、変更後に完全なドキュメントを表示するかどうかを指定します。 このオプションの詳細については、「変更前イメージと変更後イメージを含める」を参照してください。

full_document_before_change

ドキュメントに加えられた変更のみを表示するのではなく、変更前のドキュメント全体を表示するかどうかを指定します。 このオプションの詳細については、「変更前イメージと変更後イメージを含める」を参照してください。

resume_after

watch()再開トークンで指定された操作の後に変更を返すことを再開するように に指示します。各変更ストリームのイベントドキュメントには、 フィールドとして再開トークンが含まれています。変更後に再開する操作を表す変更イベントドキュメントの
_id_idフィールド全体を渡します。
resume_after は およびstart_after start_at_operation_timeと排他関係にあります。

start_after

watch()再開トークンで指定された操作の後に新しい変更ストリームを開始するように に指示します。無効化イベント後に通知を再開できるようにします。各変更ストリームのイベントドキュメントには、 フィールドとして再開トークンが含まれています。変更後に再開する操作を表す変更イベントドキュメントの
_id_idフィールド全体を渡します。
start_afterresume_after およびstart_at_operation_time と排他関係にあります。

start_at_operation_time

watch()指定されたタイムスタンプ後に発生したイベントのみを返すように に指示します。
start_at_operation_time resume_afterは およびstart_after と排他関係にあります。

max_await_time_ms

空のバッチを返す前に、新しいデータ変更が変更ストリーム カーソルに報告されるまでサーバーが待機する最大時間(ミリ秒)。 デフォルトは1000ミリ秒です。

show_expanded_events

MongoDB Server v 6.0以降、 変更ストリームは、 createIndexesイベントやdropIndexesイベントなどのデータ定義言語(DDL)イベントの変更通知をサポートします。 展開されたイベントを変更ストリームに含めるには、変更ストリーム カーソルを作成し、このパラメータをTrueに設定します。

batch_size

MongoDB クラスターからのレスポンスの各バッチで返す変更イベントの最大数。

collation

変更ストリーム カーソルに使用する照合。

session

ClientSessionのインスタンス。

comment

操作に添付するコメント。

重要

配置で MongoDB v 6.0以降が使用されている場合にのみ、コレクションで変更前と変更後のイメージを有効にできます。

デフォルトでは、コレクションに対して操作を実行すると、対応する変更イベントにはその操作によって変更されたフィールドのデルタのみが含まれます。 full_document_before_changefull_document変更前または変更後の完全なドキュメントを表示するには、watch() メソッドで パラメータまたは パラメータを指定します。

変更前のイメージは、変更のドキュメントの完全なバージョンです。 変更ストリーム イベントに変更前のイメージを含めるには、 full_document_before_changeパラメータを次のいずれかの値に設定します。

  • whenAvailable: 変更イベントには、変更前のイメージが利用可能な場合にのみ、 変更イベント 用の変更されたドキュメントの変更前のイメージが含まれます。

  • required: 変更イベントには、変更イベント用に変更されたドキュメントの変更前のイメージが含まれます。 変更前のイメージが利用できない場合、ドライバーはエラーを発生させます。

変更後のイメージとは、変更のドキュメントの完全なバージョンです。 変更ストリーム イベントに変更後のイメージを含めるには、 full_documentパラメータを次のいずれかの値に設定します。

  • updateLookup: 変更イベントには、変更後一定時間の変更されたドキュメント全体のコピーが含まれます。

  • whenAvailable: 変更イベントには、変更後のイメージが利用可能な場合にのみ、 変更イベント 用の変更されたドキュメントの変更後のイメージが含まれます。

  • required: 変更イベントには、変更イベントの変更されたドキュメントの変更後のイメージが含まれます。 変更後のイメージが利用できない場合、ドライバーはエラーを発生させます。

次の例では、コレクションで watch() メソッドを呼び出し、fullDocument パラメータを指定して更新されたドキュメントの変更後のイメージを含めます。対応するコードを表示するには、Synchronous タブまたは Asynchronousタブを選択します。

database = client["sample_restaurants"]
collection = database["restaurants"]
with collection.watch(full_document='updateLookup') as stream:
for change in stream:
print(change)
database = client["sample_restaurants"]
collection = database["restaurants"]
async with await collection.watch(full_document='updateLookup') as stream:
async for change in stream:
print(change)

変更ストリーム アプリケーションが実行されている場合、前述の更新例を使用してrestaurantsコレクション内のドキュメントを更新すると、次のような変更イベントが出力されます。

{'_id': {'_data': '...'}, 'operationType': 'update', 'clusterTime': Timestamp(...), 'wallTime': datetime.datetime(...),
'fullDocument': {'_id': ObjectId('...'), 'address': {...}, 'borough': 'Queens',
'cuisine': 'Irish', 'grades': [...], 'name': 'Blarney Castle', 'restaurant_id': '40366356'},
'ns': {'db': 'sample_restaurants', 'coll': 'restaurants'}, 'documentKey': {'_id': ObjectId('...')},
'updateDescription': {'updatedFields': {'cuisine': 'Irish'}, 'removedFields': [], 'truncatedArrays': []}}

変更前と変更後のイメージの詳細については、Change Streams MongoDB Serverマニュアルの「 とドキュメントの変更 前イメージおよび変更後イメージ 」を参照してください。

Change Streams変更ストリームの詳細については、MongoDB Server マニュアルの 「 ストリーム」 を参照してください。

このガイドで説明したメソッドや型の詳細については、次の API ドキュメントを参照してください。