変更ストリームを開く
次のオブジェクトの watch()
メソッドを使用して、MongoDB の変更を監視できます。
各オブジェクトに対して、 watch()
メソッドは変更ストリームを開き、変更が発生したときに変更イベントドキュメントを発行します。
オプションとして、watch()
メソッドは複数の集約ステージの配列で構成される集約パイプラインを最初のパラメーターとして取ることができます。集計ステージでは、変更イベントをフィルタリングして変換します。
次のスニペットでは、$match
ステージで runtime
の値が 15 未満のすべての変更イベントドキュメントが照合され、その他のドキュメントがすべて除外されます。
const pipeline = [ { $match: { runtime: { $lt: 15 } } } ]; const changeStream = myColl.watch(pipeline);
watch()
メソッドは、2 番目のパラメーターとしてoptions
オブジェクトを受け入れます。このオブジェクトで構成できる設定の詳細については、このセクションの最後にあるリンクを参照してください。
watch()
メソッドは ChangeStream のインスタンスを返します。変更ストリームを反復処理したり、イベントをリスニングしたりすることで、変更ストリームからイベントを読み取ることができます。
警告
EventEmitter
モードと Iterator
モードでのChangeStream
の使用は、ドライバーでサポートされていないため、エラーが発生します。これは、ドキュメントを最初に受け取るコンシューマーをドライバー側で保証できない場合の未定義の動作を防ぐためのものです。
変更ストリームからイベントを読み取る方法を示すタブを選択します。
バージョン 4.12 以降、 ChangeStream
オブジェクトは非同期反復可能オブジェクトになります。この変更により、 for-await
ループを使用して、開いている変更ストリームからイベントを取得できるようになります。
for await (const change of changeStream) { console.log("Received change: ", change); }
次のようなChangeStream
オブジェクト上のメソッドを呼び出すことができます。
hasNext()
ストリーム内の残りのドキュメントを確認するnext()
ストリーム内の次のドキュメントをリクエストするclose()
ChangeStream を閉じる
警告
Node.jsドライバー v6.19 以前では、tryNext() メソッドは変更ストリームのresumeToken を自動的に更新しません。更新された resumeToken
が必要な場合は、next()
メソッドを使用するか、ドライバー v6 にアップグレードしてください。20以降に更新します。
on()
メソッドを呼び出すことによって、ChangeStream
オブジェクトに listner 関数をアタッチできます。このメソッドは、Javascript EventEmitter
クラスから継承されます。以下のように、最初のパラメーターとして文字列"change"
を渡し、2 番目のパラメーターとして listner 関数を渡します。
changeStream.on("change", (changeEvent) => { /* your listener function */ });
リスナー関数は、 change
イベントが発生したときにトリガーされます。変更イベント ドキュメントを受信時に処理するよう、リスナー内でロジックを指定できます。
pause()
を呼び出してイベントの発行を停止するか、 resume()
呼び出してイベントの発行を継続することで、変更ストリームを制御できます。
変更イベントの処理を停止するには、ChangeStream
インスタンスで close() メソッドを呼び出します。これにより、変更ストリームが閉じられ、リソースが解放されます。
changeStream.close();
例
反復
次の例ではinsertDB
データベースのhaikus
コレクションの変更ストリームがオープンされ、変更イベントが発生に応じて出力されます。
注意
この例を使用して、MongoDB のインスタンスに接続し、サンプルデータを含むデータベースと交流できます。MongoDB インスタンスへの接続とサンプルデータセットの読み込みの詳細については、 使用例ガイドを参照してください。
1 // Watch for changes in a collection by using a change stream 2 import { MongoClient } from "mongodb"; 3 4 // Replace the uri string with your MongoDB deployment's connection string. 5 const uri = "<connection string uri>"; 6 7 const client = new MongoClient(uri); 8 9 // Declare a variable to hold the change stream 10 let changeStream; 11 12 // Define an asynchronous function to manage the change stream 13 async function run() { 14 try { 15 const database = client.db("insertDB"); 16 const haikus = database.collection("haikus"); 17 18 // Open a Change Stream on the "haikus" collection 19 changeStream = haikus.watch(); 20 21 // Print change events as they occur 22 for await (const change of changeStream) { 23 console.log("Received change:\n", change); 24 } 25 // Close the change stream when done 26 await changeStream.close(); 27 28 } finally { 29 // Close the MongoDB client connection 30 await client.close(); 31 } 32 } 33 run().catch(console.dir);
1 // Watch for changes in a collection by using a change stream 2 import { MongoClient } from "mongodb"; 3 4 // Replace the uri string with your MongoDB deployment's connection string. 5 const uri = "<connection string uri>"; 6 7 const client = new MongoClient(uri); 8 9 // Declare a variable to hold the change stream 10 let changeStream; 11 12 // Define an asynchronous function to manage the change stream 13 async function run() { 14 try { 15 const database = client.db("insertDB"); 16 const haikus = database.collection("haikus"); 17 18 // Open a Change Stream on the "haikus" collection 19 changeStream = haikus.watch(); 20 21 // Print change events as they occur 22 for await (const change of changeStream) { 23 console.log("Received change:\n", change); 24 } 25 // Close the change stream when done 26 await changeStream.close(); 27 28 } finally { 29 // Close the MongoDB client connection 30 await client.close(); 31 } 32 } 33 run().catch(console.dir);
注意
同一のコードスニペット
上記の JavaScript と TypeScript のコード スニペットは同一です。このユースケースに関連するドライバーの TypeScript 固有の機能はありません。
このコードを実行してから、挿入または削除操作の実行など、 haikus
コレクションに変更を加えると、変更イベント ドキュメントがターミナルに出力されます。
たとえば、コレクションにドキュメントを挿入すると、コードによって次の内容が出力されます。
Received change: { _id: { _data: '...' }, operationType: 'insert', clusterTime: new Timestamp({ t: 1675800603, i: 31 }), fullDocument: { _id: new ObjectId("..."), ... }, ns: { db: 'insertDB', coll: 'haikus' }, documentKey: { _id: new ObjectId("...") } }
注意
更新から完全なドキュメントを受け取る
アップデート操作に関する情報を含む変更イベントは、デフォルトではアップデートされたドキュメント全体ではなく、変更されたフィールドのみを返します。次のように、オプションオブジェクトの fullDocument
フィールドを "updateLookup"
に設定することで、ドキュメントの最新バージョンも返すように変更ストリームを設定できます。
const options = { fullDocument: "updateLookup" }; // This could be any pipeline. const pipeline = []; const changeStream = myColl.watch(pipeline, options);
リスナー関数
次の例では、 insertDB
データベースのhaikus
コレクションの変更ストリームを開きます。コレクションで発生する変更イベントを受信して出力するリスナー関数を作成します。
まず、コレクションで変更ストリームを開き、on()
メソッドを使用して、変更ストリーム上でリスナーを定義します。リスナーを設定したら、コレクションに対する変更を実行して変更イベントを生成します。
コレクション上で変更イベントを生成するために、insertOne()
メソッドを使用して新しいドキュメントを追加します。insertOne()
がリスナー関数の登録より先に実行される可能性があるため、simulateAsyncPause
と定義したタイマーを使用し、1 秒間待ってから挿入を実行します。
また、ドキュメント挿入後に simulateAsyncPause
も使用します。これにより、close()
メソッドによって ChangeStream
インスタンスが閉じる前に、リスナー関数が変更イベントを受け取って処理を完了するための十分な時間を確保できます。
注意
タイマーを使用する理由
この例で使用されているタイマーは、あくまでもデモンストレーションのためのものです。これにより、リスナーの登録と、終了前のリスナーによる変更イベントの処理に十分な時間を確保できます。
1 /* Change stream listener */ 2 3 import { MongoClient } from "mongodb"; 4 5 // Replace the uri string with your MongoDB deployment's connection string 6 const uri = "<connection string uri>"; 7 8 const client = new MongoClient(uri); 9 10 const simulateAsyncPause = () => 11 new Promise(resolve => { 12 setTimeout(() => resolve(), 1000); 13 }); 14 15 let changeStream; 16 async function run() { 17 try { 18 const database = client.db("insertDB"); 19 const haikus = database.collection("haikus"); 20 21 // Open a Change Stream on the "haikus" collection 22 changeStream = haikus.watch(); 23 24 // Set up a change stream listener when change events are emitted 25 changeStream.on("change", next => { 26 // Print any change event 27 console.log("received a change to the collection: \t", next); 28 }); 29 30 // Pause before inserting a document 31 await simulateAsyncPause(); 32 33 // Insert a new document into the collection 34 await myColl.insertOne({ 35 title: "Record of a Shriveled Datum", 36 content: "No bytes, no problem. Just insert a document, in MongoDB", 37 }); 38 39 // Pause before closing the change stream 40 await simulateAsyncPause(); 41 42 // Close the change stream and print a message to the console when it is closed 43 await changeStream.close(); 44 console.log("closed the change stream"); 45 } finally { 46 // Close the database connection on completion or error 47 await client.close(); 48 } 49 } 50 run().catch(console.dir);
1 /* Change stream listener */ 2 3 import { MongoClient } from "mongodb"; 4 5 // Replace the uri string with your MongoDB deployment's connection string 6 const uri = "<connection string uri>"; 7 8 const client = new MongoClient(uri); 9 10 const simulateAsyncPause = () => 11 new Promise(resolve => { 12 setTimeout(() => resolve(), 1000); 13 }); 14 15 let changeStream; 16 async function run() { 17 try { 18 const database = client.db("insertDB"); 19 const haikus = database.collection("haikus"); 20 21 // Open a Change Stream on the "haikus" collection 22 changeStream = haikus.watch(); 23 24 // Set up a change stream listener when change events are emitted 25 changeStream.on("change", next => { 26 // Print any change event 27 console.log("received a change to the collection: \t", next); 28 }); 29 30 // Pause before inserting a document 31 await simulateAsyncPause(); 32 33 // Insert a new document into the collection 34 await myColl.insertOne({ 35 title: "Record of a Shriveled Datum", 36 content: "No bytes, no problem. Just insert a document, in MongoDB", 37 }); 38 39 // Pause before closing the change stream 40 await simulateAsyncPause(); 41 42 // Close the change stream and print a message to the console when it is closed 43 await changeStream.close(); 44 console.log("closed the change stream"); 45 } finally { 46 // Close the database connection on completion or error 47 await client.close(); 48 } 49 } 50 run().catch(console.dir);
注意
同一のコードスニペット
上記の JavaScript と TypeScript のコード スニペットは同一です。このユースケースに関連するドライバーの TypeScript 固有の機能はありません。
このページで説明されているクラスとメソッドの詳細については、次のリソースを参照してください。