Overview
このガイドでは、変更ストリームを使用してドキュメントの変更を監視する方法を学習できます。
変更ストリームは新しい変更イベントを出力し、リアルタイムのデータ変更にアクセスできるようにします。 コレクション、データベース、またはクライアント オブジェクトに対して変更ストリームを開くことができます。
サンプル データ
このガイドの例では、 coursesコレクション内のドキュメントのモデルとして、次の Course構造体を使用します。
type Course struct { Title string Enrollment int32 }
このガイドの例を実行するには、次のスニペットを使用して、これらのドキュメントをdbデータベースのcoursesコレクションにロードします。
coll := client.Database("db").Collection("courses") docs := []interface{}{ Course{Title: "World Fiction", Enrollment: 35}, Course{Title: "Abstract Algebra", Enrollment: 60}, } result, err := coll.InsertMany(context.TODO(), docs)
Tip
存在しないデータベースとコレクション
書き込み操作を実行するときに必要なデータベースとコレクションが存在しない場合は、サーバーが暗黙的にそれらを作成します。
各ドキュメントには、各ドキュメントの フィールドとtitle enrollmentフィールドに対応する、コース名と最大登録者数を含む大学コースの説明が含まれています。
注意
各出力例では、切り捨てられた_data 、 clusterTime 、 ObjectIDの値が表示されます。これらの値はドライバーが独自に生成するためです。
変更ストリームを開く
変更ストリームを開くには、 Watch()メソッドを使用します。 Watch()メソッドにはコンテキスト パラメータとパイプライン パラメータが必要です。 すべての変更を返すには、空のPipelineオブジェクトを渡します。
例
次の例では、 coursesコレクションの変更ストリームを開き、すべての変更を出力します。
changeStream, err := coll.Watch(context.TODO(), mongo.Pipeline{}) if err != nil { panic(err) } defer changeStream.Close(context.TODO()) // Iterates over the cursor to print the change stream events for changeStream.Next(context.TODO()) { fmt.Println(changeStream.Current) }
別のプログラムまたは shell でcoursesコレクションを変更すると、このコードは変更が発生に応じて出力します。 title値が"Advanced Screenwriting"で、かつenrollment値が20であるドキュメントを挿入すると、次の変更イベントが発生します。
map[_id:map[_data:...] clusterTime: {...} documentKey:map[_id:ObjectID("...")] fullDocument:map[_id:ObjectID("...") enrollment:20 title:Advanced Screenwriting] ns: map[coll:courses db:db] operationType:insert]
変更ストリーム出力の変更
変更ストリーム出力を変更するには、 パイプライン パラメーターを使用します。 このパラメーターを使用すると、特定の変更イベントのみを監視できます。 パイプライン パラメータをドキュメントの配列として形式し、各ドキュメントは集計ステージを表します。
このパラメーターでは、次のパイプライン ステージを使用できます。
$addFields$match$project$replaceRoot$replaceWith$redact$set$unset
例
次の例では、 dbデータベースで変更ストリームを開きますが、新しい削除操作のみを監視します。
db := client.Database("db") pipeline := bson.D{{"$match", bson.D{{"operationType", "delete"}}}} changeStream, err := db.Watch(context.TODO(), mongo.Pipeline{pipeline}) if err != nil { panic(err) } defer changeStream.Close(context.TODO()) // Iterates over the cursor to print the delete operation change events for changeStream.Next(context.TODO()) { fmt.Println(changeStream.Current) }
注意
Watch()メソッドはdbデータベースで呼び出されたため、コードはこのデータベース内の任意のコレクションに対する新しい削除操作を出力します。
の動作を変更する Watch()
Watch()メソッドの動作を変更するには、 optionsパラメーターを使用します。
Watch()メソッドでは次のオプションを指定できます。
ResumeAfterStartAfterFullDocumentFullDocumentBeforeChangeBatchSizeMaxAwaitTimeCollationStartAtOperationTimeCommentShowExpandedEventsStartAtOperationTimeCustomCustomPipeline
これらのオプションの詳細については、 MongoDB Server マニュアルをご覧ください。
変更前と変更後のイメージ
コレクションに対して CRUD 操作を実行すると、デフォルトでは、対応する変更イベント ドキュメントには、操作によって変更されたフィールドのデルタのみが含まれます。 Watch()メソッドのoptionsパラメータで 設定を指定すると、デルタに加えて、変更の前と変更後の完全なドキュメントを表示できます。
ドキュメントの変更後のイメージ、変更後のドキュメントの完全なバージョンを表示するには、 optionsパラメータのFullDocumentフィールドを次のいずれかの値に設定します。
UpdateLookup: 変更イベント ドキュメントには、変更されたドキュメント全体のコピーが含まれます。WhenAvailable: 変更イベント ドキュメントには、変更イベント用に変更されたドキュメントの変更後のイメージが利用可能な場合、その変更が含まれます。Required: 出力はWhenAvailableの と同じですが、変更後のイメージが利用できない場合、ドライバーはサーバー側のエラーを発生させます。
変更前のドキュメントの変更前のイメージ、つまりドキュメントの完全なバージョンを表示するには、 optionsパラメータのFullDocumentBeforeChangeフィールドを次のいずれかの値に設定します。
WhenAvailable: 変更イベント ドキュメントには、変更前のイメージが使用可能な場合、変更されたドキュメントの変更前のイメージが含まれます。Required: 出力はWhenAvailableの と同じですが、変更前のイメージが利用できない場合、ドライバーはサーバー側のエラーを発生させます。
重要
ドキュメントの変更前と変更後のイメージにアクセスするには、コレクションに対してchangeStreamPreAndPostImagesを有効にする必要があります。 手順と詳細については、 MongoDB Server マニュアルを参照してください。
注意
挿入されたドキュメントには変更前のイメージはなく、削除されたドキュメントには変更後のイメージはありません。
例
次の例では、 coursesコレクションでWatch()メソッドを呼び出します。 optionsパラメータのFullDocumentフィールドに値を指定すると、変更されたフィールドのみではなく、変更されたドキュメント全体のコピーが出力されます。
opts := options.ChangeStream().SetFullDocument(options.UpdateLookup) changeStream, err := coll.Watch(context.TODO(), mongo.Pipeline{}, opts) if err != nil { panic(err) } defer changeStream.Close(context.TODO()) for changeStream.Next(context.TODO()) { fmt.Println(changeStream.Current) }
"World Fiction"のtitleを使用してドキュメントのenrollment値を35から30に更新すると、次の変更イベントが発生します。
{"_id": {"_data": "..."},"operationType": "update","clusterTime": {"$timestamp": {"t":"...","i":"..."}},"fullDocument": {"_id": {"$oid":"..."},"title": "World Fiction","enrollment": {"$numberInt":"30"}}, "ns": {"db": "db","coll": "courses"},"documentKey": {"_id": {"$oid":"..."}}, "updateDescription": {"updatedFields": {"enrollment": {"$numberInt":"30"}}, "removedFields": [],"truncatedArrays": []}}
FullDocumentオプションを指定しない場合、同じアップデート操作では変更イベント ドキュメントに"fullDocument"値が出力されなくなります。
詳細情報
変更ストリームの実行可能な例については、「 変更ストリームを開く例 」を参照してください。
変更ストリームの詳細については、「 Change Streamsストリーム 」を参照してください。
API ドキュメント
Watch()メソッドの詳細については、次の API ドキュメント リンクをご覧ください。