Overview
このガイドでは、変更ストリームを使用してデータベースへのリアルタイムの変更を監視する方法を学習できます。 変更ストリームは、アプリケーションがコレクション、データベース、または配置のデータ変更をサブスクライブできる MongoDB Server の機能です。
変更ストリームは新しい変更イベントを出力し、リアルタイムのデータ変更にアクセスできるようにします。 コレクション、データベース、またはクライアント オブジェクトに対して変更ストリームを開くことができます。
サンプル データ
このガイドの例では、 coursesコレクション内のドキュメントのモデルとして、次の Course構造体を使用します。
type Course struct { Title string Enrollment int32 }
このガイドの例を実行するには、次のスニペットを使用して、これらのドキュメントをdbデータベースのcoursesコレクションにロードします。
coll := client.Database("db").Collection("courses") docs := []interface{}{ Course{Title: "World Fiction", Enrollment: 35}, Course{Title: "Abstract Algebra", Enrollment: 60}, } result, err := coll.InsertMany(context.TODO(), docs)
Tip
存在しないデータベースとコレクション
書き込み操作を実行するときに必要なデータベースとコレクションが存在しない場合は、サーバーが暗黙的にそれらを作成します。
各ドキュメントには、各ドキュメントの フィールドとtitle enrollmentフィールドに対応する、コース名と最大登録者数を含む大学コースの説明が含まれています。
注意
各出力例では、切り捨てられた_data 、 clusterTime 、 ObjectIDの値が表示されます。これらの値はドライバーが独自に生成するためです。
変更ストリームを開く
次のオブジェクトで Watch() メソッドを使用して、 MongoDBの変更を監視できます。
コレクション : 特定のコレクションの変更を監視
データベース:データベース内のすべてのコレクションに対する変更を監視
MongoClient : すべてのデータベースの変更を監視
各オブジェクトに対して、 Watch()メソッドは変更ストリームを開き、変更が発生したときに変更イベントドキュメントを発行します。
Watch() メソッドにはコンテキスト パラメータとパイプラインパラメータが必要です。すべての変更を返すには、空の Pipelineオブジェクトを渡します。
オプションとして、Watch() メソッドは複数の集約ステージの配列で構成される集約パイプラインを最初のパラメーターとして取ることができます。集計ステージでは、変更イベントをフィルタリングして変換します。
例
次の例では、coursesコレクションの変更ストリームを開き、変更ストリームイベントを発生に応じて出力します。
changeStream, err := coll.Watch(context.TODO(), mongo.Pipeline{}) if err != nil { panic(err) } defer changeStream.Close(context.TODO()) // Iterates over the cursor to print the change stream events for changeStream.Next(context.TODO()) { fmt.Println(changeStream.Current) }
別のプログラムまたは shell でcoursesコレクションを変更すると、このコードは変更が発生に応じて出力します。 title値が"Advanced Screenwriting"で、かつenrollment値が20であるドキュメントを挿入すると、次の変更イベントが発生します。
map[_id:map[_data:...] clusterTime: {...} documentKey:map[_id:ObjectID("...")] fullDocument:map[_id:ObjectID("...") enrollment:20 title:Advanced Screenwriting] ns: map[coll:courses db:db] operationType:insert]
完全に実行可能な例を表示するには、このガイドの「 変更ストリームを開く例: 完全なファイル 」セクションを参照してください。
フィルター変更イベント
変更ストリーム出力を変更するには、 パイプライン パラメーターを使用します。 このパラメーターを使用すると、特定の変更イベントのみを監視できます。 パイプライン パラメータをドキュメントの配列として形式し、各ドキュメントは集計ステージを表します。
このパラメーターでは、次のパイプライン ステージを使用できます。
$addFields$match$project$replaceRoot$replaceWith$redact$set$unset
次の例では、 dbデータベースで変更ストリームを開きますが、新しい削除操作のみを監視します。
db := client.Database("db") pipeline := bson.D{{"$match", bson.D{{"operationType", "delete"}}}} changeStream, err := db.Watch(context.TODO(), mongo.Pipeline{pipeline}) if err != nil { panic(err) } defer changeStream.Close(context.TODO()) // Iterates over the cursor to print the delete operation change events for changeStream.Next(context.TODO()) { fmt.Println(changeStream.Current) }
注意
Watch()メソッドはdbデータベースで呼び出されたため、コードはこのデータベース内の任意のコレクションに対する新しい削除操作を出力します。
変更ストリームを開く例:完全なファイル
注意
セットアップ例
この例では、接続 URI を使用してMongoDBのインスタンスに接続します。MongoDBインスタンスへの接続の詳細については、「 MongoClient の作成ガイド 」を参照してください。この例では、Atlasサンプルデータセットに含まれる sample_restaurantsデータベースの restaurantsコレクションも使用します。「Atlas を使い始める」ガイドに従って、 MongoDB Atlasの無料階層のデータベースにロードできます。
次の例では、 restaurantsコレクションの変更ストリームを開き、挿入されたドキュメントを出力します。
coll := client.Database("sample_restaurants").Collection("restaurants") // Creates instructions to watch for insert operations pipeline := mongo.Pipeline{bson.D{{"$match", bson.D{{"operationType", "insert"}}}}} // Creates a change stream that receives change events cs, err := coll.Watch(context.TODO(), pipeline) if err != nil { panic(err) } defer cs.Close(context.TODO()) fmt.Println("Waiting For Change Events. Insert something in MongoDB!") // Prints a message each time the change stream receives an event for cs.Next(context.TODO()) { var event bson.M if err := cs.Decode(&event); err != nil { panic(err) } output, err := json.MarshalIndent(event["fullDocument"], "", " ") if err != nil { panic(err) } fmt.Printf("%s\n", output) } if err := cs.Err(); err != nil { panic(err) }
期待される結果
完全な例を実行した後、別のシェルで ドキュメントを挿入する の完全ファイル例を実行します。挿入操作を実行すると、次のような出力が表示されます。
// results truncated { "_id": ..., "name": "8282", "cuisine": "Korean" }
重要
この 使用例の操作を終了したら、ターミナルを閉じてシャットダウンしてください。
変更ストリーム オプションの構成
Watch()メソッドの動作を変更するには、 optionsパラメーターを使用します。
Watch()メソッドでは次のオプションを指定できます。
ResumeAfterStartAfterFullDocumentFullDocumentBeforeChangeBatchSizeMaxAwaitTimeCollationStartAtOperationTimeCommentShowExpandedEventsCustomCustomPipeline
これらのオプションの詳細については、 db を参照してください。コレクション.watch()サーバー マニュアルのエントリがない場合
変更前と変更後のイメージ
コレクションに対して CRUD 操作を実行すると、デフォルトでは、対応する変更イベント ドキュメントには、操作によって変更されたフィールドのデルタのみが含まれます。 Watch()メソッドのoptionsパラメータで 設定を指定すると、デルタに加えて、変更の前と変更後の完全なドキュメントを表示できます。
変更後のドキュメントの完全なバージョンであるドキュメントの 変更後のイメージ を表示するには、options パラメータの FullDocumentフィールドを次のいずれかの値に設定します。
UpdateLookup: 変更イベント ドキュメントには、変更されたドキュメント全体のコピーが含まれます。WhenAvailable: 変更イベント ドキュメントには、変更イベント用に変更されたドキュメントの変更後のイメージが利用可能な場合、その変更が含まれます。Required: 出力はWhenAvailableの と同じですが、変更後のイメージが利用できない場合、ドライバーはサーバー側のエラーを発生させます。
FullDocumentBeforeChange変更前のドキュメントの完全なバージョンであるドキュメントの変更前のイメージを表示するには、 パラメータのoptions フィールドを次のいずれかの値に設定します。
WhenAvailable: 変更イベント ドキュメントには、変更前のイメージが使用可能な場合、変更されたドキュメントの変更前のイメージが含まれます。Required: 出力はWhenAvailableの と同じですが、変更前のイメージが利用できない場合、ドライバーはサーバー側のエラーを発生させます。
重要
ドキュメントの変更前イメージと変更後イメージにアクセスするには、コレクションに対して changeStreamPreAndPostImages を有効にする必要があります。手順と詳細については、 MongoDB Serverマニュアルの collMod データベースコマンドガイドの「変更ストリーム」セクションを参照してください。
注意
挿入されたドキュメントには変更前のイメージはなく、削除されたドキュメントには変更後のイメージはありません。
例
次の例では、 coursesコレクションでWatch()メソッドを呼び出します。 optionsパラメータのFullDocumentフィールドに値を指定すると、変更されたフィールドのみではなく、変更されたドキュメント全体のコピーが出力されます。
opts := options.ChangeStream().SetFullDocument(options.UpdateLookup) changeStream, err := coll.Watch(context.TODO(), mongo.Pipeline{}, opts) if err != nil { panic(err) } defer changeStream.Close(context.TODO()) for changeStream.Next(context.TODO()) { fmt.Println(changeStream.Current) }
"World
Fiction"のtitleを使用してドキュメントのenrollment値を35から30に更新すると、次の変更イベントが発生します。
{"_id": {"_data": "..."},"operationType": "update","clusterTime": {"$timestamp": {"t":"...","i":"..."}},"fullDocument": {"_id": {"$oid":"..."},"title": "World Fiction","enrollment": {"$numberInt":"30"}}, "ns": {"db": "db","coll": "courses"},"documentKey": {"_id": {"$oid":"..."}}, "updateDescription": {"updatedFields": {"enrollment": {"$numberInt":"30"}}, "removedFields": [],"truncatedArrays": []}}
FullDocumentオプションを指定しない場合、同じアップデート操作では変更イベント ドキュメントに"fullDocument"値が出力されなくなります。
詳細情報
変更ストリームの詳細については、サーバー マニュアルのChange Streamsを参照してください。
API ドキュメント
Watch()メソッドの詳細については、次の API ドキュメントを参照してください。