Join us Sept 17 at .local NYC! Use code WEB50 to save 50% on tickets. Learn more >
MongoDB Event
Docs Menu
Docs Home
/ / /
Go Driver
/

Change Streams によるデータの監視

このガイドでは、変更ストリームを使用してデータベースへのリアルタイムの変更を監視する方法を学習できます。 変更ストリームは、アプリケーションがコレクション、データベース、または配置のデータ変更をサブスクライブできる MongoDB Server の機能です。

変更ストリームは新しい変更イベントを出力し、リアルタイムのデータ変更にアクセスできるようにします。 コレクション、データベース、またはクライアント オブジェクトに対して変更ストリームを開くことができます。

このガイドの例では、 coursesコレクション内のドキュメントのモデルとして、次の Course構造体を使用します。

type Course struct {
Title string
Enrollment int32
}

このガイドの例を実行するには、次のスニペットを使用して、これらのドキュメントをdbデータベースのcoursesコレクションにロードします。

coll := client.Database("db").Collection("courses")
docs := []interface{}{
Course{Title: "World Fiction", Enrollment: 35},
Course{Title: "Abstract Algebra", Enrollment: 60},
}
result, err := coll.InsertMany(context.TODO(), docs)

Tip

存在しないデータベースとコレクション

書き込み操作を実行するときに必要なデータベースとコレクションが存在しない場合は、サーバーが暗黙的にそれらを作成します。

各ドキュメントには、各ドキュメントの フィールドとtitle enrollmentフィールドに対応する、コース名と最大登録者数を含む大学コースの説明が含まれています。

注意

各出力例では、切り捨てられた_dataclusterTimeObjectIDの値が表示されます。これらの値はドライバーが独自に生成するためです。

次のオブジェクトで Watch() メソッドを使用して、 MongoDBの変更を監視できます。

  • コレクション : 特定のコレクションの変更を監視

  • データベース:データベース内のすべてのコレクションに対する変更を監視

  • MongoClient : すべてのデータベースの変更を監視

各オブジェクトに対して、 Watch()メソッドは変更ストリームを開き、変更が発生したときに変更イベントドキュメントを発行します。

Watch() メソッドにはコンテキスト パラメータとパイプラインパラメータが必要です。すべての変更を返すには、空の Pipelineオブジェクトを渡します。

オプションとして、Watch() メソッドは複数の集約ステージの配列で構成される集約パイプラインを最初のパラメーターとして取ることができます。集計ステージでは、変更イベントをフィルタリングして変換します。

次の例では、coursesコレクションの変更ストリームを開き、変更ストリームイベントを発生に応じて出力します。

changeStream, err := coll.Watch(context.TODO(), mongo.Pipeline{})
if err != nil {
panic(err)
}
defer changeStream.Close(context.TODO())
// Iterates over the cursor to print the change stream events
for changeStream.Next(context.TODO()) {
fmt.Println(changeStream.Current)
}

別のプログラムまたは shell でcoursesコレクションを変更すると、このコードは変更が発生に応じて出力します。 title値が"Advanced Screenwriting"で、かつenrollment値が20であるドキュメントを挿入すると、次の変更イベントが発生します。

map[_id:map[_data:...] clusterTime: {...}
documentKey:map[_id:ObjectID("...")] fullDocument:map[_id:ObjectID("...")
enrollment:20 title:Advanced Screenwriting] ns: map[coll:courses db:db]
operationType:insert]

完全に実行可能な例を表示するには、このガイドの「 変更ストリームを開く例: 完全なファイル 」セクションを参照してください。

変更ストリーム出力を変更するには、 パイプライン パラメーターを使用します。 このパラメーターを使用すると、特定の変更イベントのみを監視できます。 パイプライン パラメータをドキュメントの配列として形式し、各ドキュメントは集計ステージを表します。

このパラメーターでは、次のパイプライン ステージを使用できます。

  • $addFields

  • $match

  • $project

  • $replaceRoot

  • $replaceWith

  • $redact

  • $set

  • $unset

次の例では、 dbデータベースで変更ストリームを開きますが、新しい削除操作のみを監視します。

db := client.Database("db")
pipeline := bson.D{{"$match", bson.D{{"operationType", "delete"}}}}
changeStream, err := db.Watch(context.TODO(), mongo.Pipeline{pipeline})
if err != nil {
panic(err)
}
defer changeStream.Close(context.TODO())
// Iterates over the cursor to print the delete operation change events
for changeStream.Next(context.TODO()) {
fmt.Println(changeStream.Current)
}

注意

Watch()メソッドはdbデータベースで呼び出されたため、コードはこのデータベース内の任意のコレクションに対する新しい削除操作を出力します。

注意

セットアップ例

この例では、接続 URI を使用してMongoDBのインスタンスに接続します。MongoDBインスタンスへの接続の詳細については、「 MongoClient の作成ガイド 」を参照してください。この例では、Atlasサンプルデータセットに含まれる sample_restaurantsデータベースの restaurantsコレクションも使用します。Atlas を使い始める」ガイドに従って、 MongoDB Atlasの無料階層のデータベースにロードできます。

次の例では、 restaurantsコレクションの変更ストリームを開き、挿入されたドキュメントを出力します。

coll := client.Database("sample_restaurants").Collection("restaurants")
// Creates instructions to watch for insert operations
pipeline := mongo.Pipeline{bson.D{{"$match", bson.D{{"operationType", "insert"}}}}}
// Creates a change stream that receives change events
cs, err := coll.Watch(context.TODO(), pipeline)
if err != nil {
panic(err)
}
defer cs.Close(context.TODO())
fmt.Println("Waiting For Change Events. Insert something in MongoDB!")
// Prints a message each time the change stream receives an event
for cs.Next(context.TODO()) {
var event bson.M
if err := cs.Decode(&event); err != nil {
panic(err)
}
output, err := json.MarshalIndent(event["fullDocument"], "", " ")
if err != nil {
panic(err)
}
fmt.Printf("%s\n", output)
}
if err := cs.Err(); err != nil {
panic(err)
}

完全に実行可能な例を表示します。

完全な例を実行した後、別のシェルでドキュメントを挿入する の完全ファイル例を実行します。挿入操作を実行すると、次のような出力が表示されます。

// results truncated {
"_id": ..., "name": "8282", "cuisine": "Korean"
}

重要

この 使用例の操作を終了したら、ターミナルを閉じてシャットダウンしてください。

Watch()メソッドの動作を変更するには、 optionsパラメーターを使用します。

Watch()メソッドでは次のオプションを指定できます。

  • ResumeAfter

  • StartAfter

  • FullDocument

  • FullDocumentBeforeChange

  • BatchSize

  • MaxAwaitTime

  • Collation

  • StartAtOperationTime

  • Comment

  • ShowExpandedEvents

  • Custom

  • CustomPipeline

これらのオプションの詳細については、 db を参照してください。コレクション.watch()サーバー マニュアルのエントリがない場合

コレクションに対して CRUD 操作を実行すると、デフォルトでは、対応する変更イベント ドキュメントには、操作によって変更されたフィールドのデルタのみが含まれます。 Watch()メソッドのoptionsパラメータで 設定を指定すると、デルタに加えて、変更の前と変更後の完全なドキュメントを表示できます。

変更後のドキュメントの完全なバージョンであるドキュメントの 変更後のイメージ を表示するには、options パラメータの FullDocumentフィールドを次のいずれかの値に設定します。

  • UpdateLookup: 変更イベント ドキュメントには、変更されたドキュメント全体のコピーが含まれます。

  • WhenAvailable: 変更イベント ドキュメントには、変更イベント用に変更されたドキュメントの変更後のイメージが利用可能な場合、その変更が含まれます。

  • Required: 出力はWhenAvailableの と同じですが、変更後のイメージが利用できない場合、ドライバーはサーバー側のエラーを発生させます。

FullDocumentBeforeChange変更前のドキュメントの完全なバージョンであるドキュメントの変更前のイメージを表示するには、 パラメータのoptions フィールドを次のいずれかの値に設定します。

  • WhenAvailable: 変更イベント ドキュメントには、変更前のイメージが使用可能な場合、変更されたドキュメントの変更前のイメージが含まれます。

  • Required: 出力はWhenAvailableの と同じですが、変更前のイメージが利用できない場合、ドライバーはサーバー側のエラーを発生させます。

重要

ドキュメントの変更前イメージと変更後イメージにアクセスするには、コレクションに対して changeStreamPreAndPostImages を有効にする必要があります。手順と詳細については、 MongoDB Serverマニュアルの collMod データベースコマンドガイドの「変更ストリーム」セクションを参照してください。

注意

挿入されたドキュメントには変更前のイメージはなく、削除されたドキュメントには変更後のイメージはありません。

次の例では、 coursesコレクションでWatch()メソッドを呼び出します。 optionsパラメータのFullDocumentフィールドに値を指定すると、変更されたフィールドのみではなく、変更されたドキュメント全体のコピーが出力されます。

opts := options.ChangeStream().SetFullDocument(options.UpdateLookup)
changeStream, err := coll.Watch(context.TODO(), mongo.Pipeline{}, opts)
if err != nil {
panic(err)
}
defer changeStream.Close(context.TODO())
for changeStream.Next(context.TODO()) {
fmt.Println(changeStream.Current)
}

"World Fiction"titleを使用してドキュメントのenrollment値を35から30に更新すると、次の変更イベントが発生します。

{"_id": {"_data": "..."},"operationType": "update","clusterTime":
{"$timestamp": {"t":"...","i":"..."}},"fullDocument": {"_id":
{"$oid":"..."},"title": "World Fiction","enrollment": {"$numberInt":"30"}},
"ns": {"db": "db","coll": "courses"},"documentKey": {"_id": {"$oid":"..."}},
"updateDescription": {"updatedFields": {"enrollment": {"$numberInt":"30"}},
"removedFields": [],"truncatedArrays": []}}

FullDocumentオプションを指定しない場合、同じアップデート操作では変更イベント ドキュメントに"fullDocument"値が出力されなくなります。

変更ストリームの詳細については、サーバー マニュアルのChange Streamsを参照してください。

Watch()メソッドの詳細については、次の API ドキュメントを参照してください。

戻る

ログ記録

項目一覧