Overview
このガイドでは、変更ストリームを使用してデータベースへのリアルタイムの変更を監視する方法を学習できます。 変更ストリームは、アプリケーションがコレクション、データベース、または配置のデータ変更をサブスクライブできる MongoDB Server の機能です。
変更ストリームは新しい変更イベントを出力し、リアルタイムのデータ変更にアクセスできるようにします。 コレクション、データベース、またはクライアント オブジェクトに対して変更ストリームを開くことができます。
サンプル データ
このガイドの例では、 courses
コレクション内のドキュメントのモデルとして、次の Course
構造体を使用します。
type Course struct { Title string Enrollment int32 }
このガイドの例を実行するには、次のスニペットを使用して、これらのドキュメントをdb
データベースのcourses
コレクションにロードします。
coll := client.Database("db").Collection("courses") docs := []interface{}{ Course{Title: "World Fiction", Enrollment: 35}, Course{Title: "Abstract Algebra", Enrollment: 60}, } result, err := coll.InsertMany(context.TODO(), docs)
Tip
存在しないデータベースとコレクション
書き込み操作を実行するときに必要なデータベースとコレクションが存在しない場合は、サーバーが暗黙的にそれらを作成します。
各ドキュメントには、各ドキュメントの フィールドとtitle
enrollment
フィールドに対応する、コース名と最大登録者数を含む大学コースの説明が含まれています。
注意
各出力例では、切り捨てられた_data
、 clusterTime
、 ObjectID
の値が表示されます。これらの値はドライバーが独自に生成するためです。
変更ストリームを開く
次のオブジェクトで Watch()
メソッドを使用して、 MongoDBの変更を監視できます。
コレクション : 特定のコレクションの変更を監視
データベース:データベース内のすべてのコレクションに対する変更を監視
MongoClient : すべてのデータベースの変更を監視
各オブジェクトに対して、 Watch()
メソッドは変更ストリームを開き、変更が発生したときに変更イベントドキュメントを発行します。
Watch()
メソッドにはコンテキスト パラメータとパイプラインパラメータが必要です。すべての変更を返すには、空の Pipeline
オブジェクトを渡します。
オプションとして、Watch()
メソッドは複数の集約ステージの配列で構成される集約パイプラインを最初のパラメーターとして取ることができます。集計ステージでは、変更イベントをフィルタリングして変換します。
例
次の例では、courses
コレクションの変更ストリームを開き、変更ストリームイベントを発生に応じて出力します。
changeStream, err := coll.Watch(context.TODO(), mongo.Pipeline{}) if err != nil { panic(err) } defer changeStream.Close(context.TODO()) // Iterates over the cursor to print the change stream events for changeStream.Next(context.TODO()) { fmt.Println(changeStream.Current) }
別のプログラムまたは shell でcourses
コレクションを変更すると、このコードは変更が発生に応じて出力します。 title
値が"Advanced Screenwriting"
で、かつenrollment
値が20
であるドキュメントを挿入すると、次の変更イベントが発生します。
map[_id:map[_data:...] clusterTime: {...} documentKey:map[_id:ObjectID("...")] fullDocument:map[_id:ObjectID("...") enrollment:20 title:Advanced Screenwriting] ns: map[coll:courses db:db] operationType:insert]
完全に実行可能な例を表示するには、このガイドの「 変更ストリームを開く例: 完全なファイル 」セクションを参照してください。
フィルター変更イベント
変更ストリーム出力を変更するには、 パイプライン パラメーターを使用します。 このパラメーターを使用すると、特定の変更イベントのみを監視できます。 パイプライン パラメータをドキュメントの配列として形式し、各ドキュメントは集計ステージを表します。
このパラメーターでは、次のパイプライン ステージを使用できます。
$addFields
$match
$project
$replaceRoot
$replaceWith
$redact
$set
$unset
次の例では、 db
データベースで変更ストリームを開きますが、新しい削除操作のみを監視します。
db := client.Database("db") pipeline := bson.D{{"$match", bson.D{{"operationType", "delete"}}}} changeStream, err := db.Watch(context.TODO(), mongo.Pipeline{pipeline}) if err != nil { panic(err) } defer changeStream.Close(context.TODO()) // Iterates over the cursor to print the delete operation change events for changeStream.Next(context.TODO()) { fmt.Println(changeStream.Current) }
注意
Watch()
メソッドはdb
データベースで呼び出されたため、コードはこのデータベース内の任意のコレクションに対する新しい削除操作を出力します。
変更ストリームを開く例:完全なファイル
注意
セットアップ例
この例では、接続 URI を使用してMongoDBのインスタンスに接続します。MongoDBインスタンスへの接続の詳細については、「 MongoClient の作成ガイド 」を参照してください。この例では、Atlasサンプルデータセットに含まれる sample_restaurants
データベースの restaurants
コレクションも使用します。「Atlas を使い始める」ガイドに従って、 MongoDB Atlasの無料階層のデータベースにロードできます。
次の例では、 restaurants
コレクションの変更ストリームを開き、挿入されたドキュメントを出力します。
coll := client.Database("sample_restaurants").Collection("restaurants") // Creates instructions to watch for insert operations pipeline := mongo.Pipeline{bson.D{{"$match", bson.D{{"operationType", "insert"}}}}} // Creates a change stream that receives change events cs, err := coll.Watch(context.TODO(), pipeline) if err != nil { panic(err) } defer cs.Close(context.TODO()) fmt.Println("Waiting For Change Events. Insert something in MongoDB!") // Prints a message each time the change stream receives an event for cs.Next(context.TODO()) { var event bson.M if err := cs.Decode(&event); err != nil { panic(err) } output, err := json.MarshalIndent(event["fullDocument"], "", " ") if err != nil { panic(err) } fmt.Printf("%s\n", output) } if err := cs.Err(); err != nil { panic(err) }
期待される結果
完全な例を実行した後、別のシェルでドキュメントを挿入する の完全ファイル例を実行します。挿入操作を実行すると、次のような出力が表示されます。
// results truncated { "_id": ..., "name": "8282", "cuisine": "Korean" }
重要
この 使用例の操作を終了したら、ターミナルを閉じてシャットダウンしてください。
変更ストリーム オプションの構成
Watch()
メソッドの動作を変更するには、 options
パラメーターを使用します。
Watch()
メソッドでは次のオプションを指定できます。
ResumeAfter
StartAfter
FullDocument
FullDocumentBeforeChange
BatchSize
MaxAwaitTime
Collation
StartAtOperationTime
Comment
ShowExpandedEvents
Custom
CustomPipeline
これらのオプションの詳細については、 db を参照してください。コレクション.watch()サーバー マニュアルのエントリがない場合
変更前と変更後のイメージ
コレクションに対して CRUD 操作を実行すると、デフォルトでは、対応する変更イベント ドキュメントには、操作によって変更されたフィールドのデルタのみが含まれます。 Watch()
メソッドのoptions
パラメータで 設定を指定すると、デルタに加えて、変更の前と変更後の完全なドキュメントを表示できます。
変更後のドキュメントの完全なバージョンであるドキュメントの 変更後のイメージ を表示するには、options
パラメータの FullDocument
フィールドを次のいずれかの値に設定します。
UpdateLookup
: 変更イベント ドキュメントには、変更されたドキュメント全体のコピーが含まれます。WhenAvailable
: 変更イベント ドキュメントには、変更イベント用に変更されたドキュメントの変更後のイメージが利用可能な場合、その変更が含まれます。Required
: 出力はWhenAvailable
の と同じですが、変更後のイメージが利用できない場合、ドライバーはサーバー側のエラーを発生させます。
FullDocumentBeforeChange
変更前のドキュメントの完全なバージョンであるドキュメントの変更前のイメージを表示するには、 パラメータのoptions
フィールドを次のいずれかの値に設定します。
WhenAvailable
: 変更イベント ドキュメントには、変更前のイメージが使用可能な場合、変更されたドキュメントの変更前のイメージが含まれます。Required
: 出力はWhenAvailable
の と同じですが、変更前のイメージが利用できない場合、ドライバーはサーバー側のエラーを発生させます。
重要
ドキュメントの変更前イメージと変更後イメージにアクセスするには、コレクションに対して changeStreamPreAndPostImages
を有効にする必要があります。手順と詳細については、 MongoDB Serverマニュアルの collMod データベースコマンドガイドの「変更ストリーム」セクションを参照してください。
注意
挿入されたドキュメントには変更前のイメージはなく、削除されたドキュメントには変更後のイメージはありません。
例
次の例では、 courses
コレクションでWatch()
メソッドを呼び出します。 options
パラメータのFullDocument
フィールドに値を指定すると、変更されたフィールドのみではなく、変更されたドキュメント全体のコピーが出力されます。
opts := options.ChangeStream().SetFullDocument(options.UpdateLookup) changeStream, err := coll.Watch(context.TODO(), mongo.Pipeline{}, opts) if err != nil { panic(err) } defer changeStream.Close(context.TODO()) for changeStream.Next(context.TODO()) { fmt.Println(changeStream.Current) }
"World
Fiction"
のtitle
を使用してドキュメントのenrollment
値を35
から30
に更新すると、次の変更イベントが発生します。
{"_id": {"_data": "..."},"operationType": "update","clusterTime": {"$timestamp": {"t":"...","i":"..."}},"fullDocument": {"_id": {"$oid":"..."},"title": "World Fiction","enrollment": {"$numberInt":"30"}}, "ns": {"db": "db","coll": "courses"},"documentKey": {"_id": {"$oid":"..."}}, "updateDescription": {"updatedFields": {"enrollment": {"$numberInt":"30"}}, "removedFields": [],"truncatedArrays": []}}
FullDocument
オプションを指定しない場合、同じアップデート操作では変更イベント ドキュメントに"fullDocument"
値が出力されなくなります。
詳細情報
変更ストリームの詳細については、サーバー マニュアルのChange Streamsを参照してください。
API ドキュメント
Watch()
メソッドの詳細については、次の API ドキュメントを参照してください。