これらの使用例は、 MongoDB Kafkaソースコネクタを使用して、 MongoDBからApache Kafkaトピックにデータをコピーする方法を示しています。
例
次の例は、単一のコレクションまたは複数のコレクションから既存のデータをコピーするようにソースコネクタを構成する方法を示しています。
コレクション データのコピーとフィルタリング
MongoDB コレクションを Apache Kafka にコピーし、いくつかのデータをフィルタリングするとします。
要件とソリューションは次のとおりです。
要件 | 解決法 |
---|---|
MongoDB 配置内の | See the Copy Data section of this guide. |
| See the Filter Data section of this guide. |
customers
コレクションには次のドキュメントが含まれます。
{ "_id": 1, "country": "Mexico", "purchases": 2, "last_viewed": { "$date": "2021-10-31T20:30:00.245Z" } } { "_id": 2, "country": "Iceland", "purchases": 8, "last_viewed": { "$date": "2015-07-20T10:00:00.135Z" } }
データのコピー
ソース コネクタで次の構成オプションを指定して、 shopping
データベースのcustomers
コレクションの内容をコピーします。
database=shopping collection=customers startup.mode=copy_existing
ソース コネクタは、コレクションへの各ドキュメントの挿入を説明する変更イベント ドキュメントを作成してコレクションをコピーします。
注意
データコピーは重複したイベントを生成する可能性があります
ソース コネクタが既存のデータを変換するときに、いずれかのシステムがデータベース内のデータを変更した場合、MongoDB は最新の変更を反映するために重複した変更ストリーム イベントを生成することがあります。 データコピーが依存する変更ストリーム イベントは偶数であるため、コピーされたデータは結果整合性があります。
変更イベント ドキュメントの詳細については、 Change Streamsのガイドを参照してください。
startup.mode
オプションの詳細については、「起動プロパティ 」を参照してください。
フィルター データ
ソース コネクタ構成のstartup.mode.copy.existing.pipeline
オプションで集計パイプラインを指定することで、データをフィルタリングできます。 次の構成では、 country
フィールドに「Mexco」が含まれるすべてのドキュメントに一致する集計パイプラインを指定します。
startup.mode.copy.existing.pipeline=[{ "$match": { "country": "Mexico" } }]
startup.mode.copy.existing.pipeline
オプションの詳細については、「起動プロパティ 」を参照してください。
集計パイプラインの詳細については、次のリソースを参照してください。
MongoDB マニュアルの集計。
構成を指定する
customers
コレクションをコピーするための最終的なソース コネクタ構成は次のようになります。
connector.class=com.mongodb.kafka.connect.MongoSourceConnector connection.uri=<your production MongoDB connection uri> database=shopping collection=customers startup.mode=copy_existing startup.mode.copy.existing.pipeline=[{ "$match": { "country": "Mexico" } }]
コネクタがデータをコピーすると、 shopping.customers
Apache Kafka トピックには、前述のサンプル コレクションに対応する次の変更イベント ドキュメントが表示されます。
{ "_id": { "_id": 1, "copyingData": true }, "operationType": "insert", "documentKey": { "_id": 1 }, "fullDocument": { "_id": 1, "country": "Mexico", "purchases": 2, "last_viewed": { "$date": "2021-10-31T20:30:00.245Z" } }, "ns": { "db": "shopping", "coll": "customers" } }
注意
トピック内のデータをコレクションに書き込む
変更データキャプチャ ハンドラー を使用して、Apache Kafka トピック内の変更イベントドキュメントを MongoDB 書込み (write) 操作に変換します。 詳細については、「変更データ キャプチャ ハンドラーのガイド」を参照してください。
複数のソースからのデータのコピー
次のドキュメントを含む products
という名前の shopping
データベース内の別のコレクションからデータをコピーするとします。
{ "_id": 1, "item_name": "lipstick", "department": "cosmetics", "quantity": 45 }
次のコードに示すように、startup.mode.copy.existing.namespace.regex
構成設定を使用して、customers
コレクションと products
コレクションの両方からコピーできます。
connector.class=com.mongodb.kafka.connect.MongoSourceConnector connection.uri=<your production MongoDB connection uri> database=shopping startup.mode=copy_existing startup.mode.copy.existing.namespace.regex=^shopping\.(customers|products)$
前述のセクション で説明したshopping.customers
Apache Kafkaトピック内の変更イベントドキュメントに加えて、shopping.products
トピックには次のドキュメントを参照できます。
{ "_id": { "_id": 1, "copyingData": true }, "operationType": "insert", "documentKey": { "_id": 1 }, "fullDocument": { "_id": 1, "item_name": "lipstick", "department": "cosmetics", "quantity": 45 }, "ns": { "db": "shopping", "coll": "products" } }
Tip
startup.mode.copy.existing.namespace.regex
設定の詳細については、 スタートアップ プロパティガイドの 設定 表を参照してください。