Overview
このガイドでは、変更ストリームを使用してデータベースへのリアルタイムの変更を監視する方法を学習できます。 変更ストリームは、アプリケーションがコレクション、データベース、または配置のデータ変更をサブスクライブできる MongoDB Server の機能です。
Tip
Atlas Stream Processing
変更ストリームの代わりに、Atlas Stream Processing を使用してデータのストリームを処理および変換できます。データベースイベントのみを登録する変更ストリームとは異なり、Atlas Stream Processing は複数のデータイベント型を管理し、拡張データプロセシング機能を提供します。この機能の詳細については、 MongoDB AtlasドキュメントのAtlas Stream Processingを参照してください。
サンプル データ
このガイドの例では、 Atlas サンプル データセットのsample_restaurants.restaurantsコレクションを使用します。 MongoDB Atlas クラスターを無料で作成して、サンプル データセットをロードする方法については、 「 PyMongo を使い始める 」を参照してください。
変更ストリームを開く
変更ストリームを開くには、 watch()メソッドを呼び出します。 watch()メソッドを呼び出す インスタンスによって、変更ストリームがリッスンするイベントの範囲が決まります。 次のクラスでwatch()メソッドを呼び出すことができます。
MongoClient: MongoDB 配置のすべての変更を監視Database: データベース内のすべてのコレクションの変更を監視するにはCollection: コレクションの変更をモニターするには
次の例では、restaurantsコレクションの変更ストリームを開き、変更が発生に応じて出力します。対応するコードを表示するには、Synchronous タブまたは Asynchronousタブを選択します。
database = client["sample_restaurants"] collection = database["restaurants"] with collection.watch() as stream: for change in stream: print(change)
database = client["sample_restaurants"] collection = database["restaurants"] async with await collection.watch() as stream: async for change in stream: print(change)
変更の監視を開始するには、アプリケーションを実行します。次に、別のアプリケーションまたはシェルで、restaurantsコレクションを変更します。 次の例では、nameフィールドの値が Blarney Castle であるドキュメントを更新します。対応するコードを表示するには、Synchronous タブまたは Asynchronousタブを選択します。
database = client["sample_restaurants"] collection = database["restaurants"] query_filter = { "name": "Blarney Castle" } update_operation = { '$set' : { "cuisine": "Irish" } } result = collection.update_one(query_filter, update_operation)
database = client["sample_restaurants"] collection = database["restaurants"] query_filter = { "name": "Blarney Castle" } update_operation = { '$set' : { "cuisine": "Irish" } } result = await collection.update_one(query_filter, update_operation)
コレクションを更新すると、変更ストリーム アプリケーションは変更が発生に応じて出力します。 出力される変更イベントは、次のようになります。
{'_id': {'_data': '...'}, 'operationType': 'update', 'clusterTime': Timestamp(...), 'wallTime': datetime.datetime(...), 'ns': {'db': 'sample_restaurants', 'coll': 'restaurants'}, 'documentKey': {'_id': ObjectId('...')}, 'updateDescription': {'updatedFields': {'cuisine': 'Irish'}, 'removedFields': [], 'truncatedArrays': []}}
変更ストリーム出力の変更
pipelineパラメータをwatch()メソッドに渡して、変更ストリーム出力を変更できます。 このパラメーターを使用すると、指定された変更イベントのみを監視できます。 パラメーターを、それぞれが集計ステージを表すオブジェクトのリストとして形式します。
pipelineパラメーターでは、次のステージを指定できます。
$addFields$match$project$replaceRoot$replaceWith$redact$set$unset
次の例では、pipeline パラメータを使用して、アップデート操作のみを記録する変更ストリームを開きます。対応するコードを表示するには、Synchronous タブまたは Asynchronousタブを選択します。
change_pipeline = { "$match": { "operationType": "update" }}, with collection.watch(pipeline=change_pipeline) as stream: for change in stream: print(change)
change_pipeline = { "$match": { "operationType": "update" }}, async with await collection.watch(pipeline=change_pipeline) as stream: async for change in stream: print(change)
変更ストリーム出力の変更の詳細については、MongoDB Server マニュアルの「 変更ストリーム出力 の変更 」セクションを参照してください。
watch() 動作を変更する
watch()メソッドは、操作を構成するために使用できるオプションを表す任意のパラメーターを受け入れます。 オプションを指定しない場合、ドライバーは操作をカスタマイズしません。
次の表では、 watch()の動作をカスタマイズするために設定できるオプションについて説明します。
プロパティ | 説明 |
|---|---|
| 変更ストリームの出力を変更する 集計パイプライン ステージ のリスト。 |
| ドキュメントに加えられた変更のみを表示するのではなく、変更後に完全なドキュメントを表示するかどうかを指定します。 このオプションの詳細については、「変更前イメージと変更後イメージを含める」を参照してください。 |
| ドキュメントに加えられた変更のみを表示するのではなく、変更前のドキュメント全体を表示するかどうかを指定します。 このオプションの詳細については、「変更前イメージと変更後イメージを含める」を参照してください。 |
|
|
|
|
|
|
| 空のバッチを返す前に、新しいデータ変更が変更ストリーム カーソルに報告されるまでサーバーが待機する最大時間(ミリ秒)。 デフォルトは1000ミリ秒です。 |
| MongoDB Server v 6.0以降、 変更ストリームは、 |
| MongoDB クラスターからのレスポンスの各バッチで返す変更イベントの最大数。 |
| 変更ストリーム カーソルに使用する照合。 |
|
|
| 操作に添付するコメント。 |
変更前と変更後のイメージを含めます
重要
配置で MongoDB v 6.0以降が使用されている場合にのみ、コレクションで変更前と変更後のイメージを有効にできます。
デフォルトでは、コレクションに対して操作を実行すると、対応する変更イベントにはその操作によって変更されたフィールドのデルタのみが含まれます。 full_document_before_changefull_document変更前または変更後の完全なドキュメントを表示するには、watch() メソッドで パラメータまたは パラメータを指定します。
変更前のイメージは、変更前のドキュメントの完全なバージョンです。 変更ストリーム イベントに変更前のイメージを含めるには、 full_document_before_changeパラメータを次のいずれかの値に設定します。
whenAvailable: 変更イベントには、変更前のイメージが利用可能な場合にのみ、 変更イベント 用の変更されたドキュメントの変更前のイメージが含まれます。required: 変更イベントには、変更イベント用に変更されたドキュメントの変更前のイメージが含まれます。 変更前のイメージが利用できない場合、ドライバーはエラーを発生させます。
変更後のイメージとは、変更後のドキュメントの完全なバージョンです。 変更ストリーム イベントに変更後のイメージを含めるには、 full_documentパラメータを次のいずれかの値に設定します。
updateLookup: 変更イベントには、変更後一定時間の変更されたドキュメント全体のコピーが含まれます。whenAvailable: 変更イベントには、変更後のイメージが利用可能な場合にのみ、 変更イベント 用の変更されたドキュメントの変更後のイメージが含まれます。required: 変更イベントには、変更イベントの変更されたドキュメントの変更後のイメージが含まれます。 変更後のイメージが利用できない場合、ドライバーはエラーを発生させます。
次の例では、コレクションで watch() メソッドを呼び出し、fullDocument パラメータを指定して更新されたドキュメントの変更後のイメージを含めます。対応するコードを表示するには、Synchronous タブまたは Asynchronousタブを選択します。
database = client["sample_restaurants"] collection = database["restaurants"] with collection.watch(full_document='updateLookup') as stream: for change in stream: print(change)
database = client["sample_restaurants"] collection = database["restaurants"] async with await collection.watch(full_document='updateLookup') as stream: async for change in stream: print(change)
変更ストリーム アプリケーションが実行されている場合、前述の更新例を使用してrestaurantsコレクション内のドキュメントを更新すると、次のような変更イベントが出力されます。
{'_id': {'_data': '...'}, 'operationType': 'update', 'clusterTime': Timestamp(...), 'wallTime': datetime.datetime(...), 'fullDocument': {'_id': ObjectId('...'), 'address': {...}, 'borough': 'Queens', 'cuisine': 'Irish', 'grades': [...], 'name': 'Blarney Castle', 'restaurant_id': '40366356'}, 'ns': {'db': 'sample_restaurants', 'coll': 'restaurants'}, 'documentKey': {'_id': ObjectId('...')}, 'updateDescription': {'updatedFields': {'cuisine': 'Irish'}, 'removedFields': [], 'truncatedArrays': []}}
変更前と変更後のイメージの詳細については、Change Streams MongoDB Serverマニュアルの「 とドキュメントの変更 前イメージおよび変更後イメージ 」を参照してください。
詳細情報
Change Streams変更ストリームの詳細については、MongoDB Server マニュアルの 「 ストリーム」 を参照してください。
API ドキュメント
このガイドで説明したメソッドや型の詳細については、次の API ドキュメントを参照してください。