AI エージェント向け: ドキュメントインデックスは https://www.mongodb.com/ja-jp/docs/llms.txt で利用できます。すべてのページの markdown バージョンは、いずれかの URL パスに .md を追加することで利用できます。
Docs Menu

既存のデータのコピー

これらの使用例は、 MongoDB Kafkaソースコネクタを使用して、 MongoDBからApache Kafkaトピックにデータをコピーする方法を示しています。

次の例は、単一のコレクションまたは複数のコレクションから既存のデータをコピーするようにソースコネクタを構成する方法を示しています。

MongoDB コレクションを Apache Kafka にコピーし、いくつかのデータをフィルタリングするとします。

要件とソリューションは次のとおりです。

要件
解決法

MongoDB 配置内のshoppingデータベースのcustomersコレクションを Apache Kafka トピックにコピーします。

このガイドの「データのコピー 」セクションを参照してください。

countryフィールドに "Mexco" の値があるドキュメントのみをコピーします。

このガイドの「フィルター データ」セクションを参照してください。

customers コレクションには次のドキュメントが含まれます。

{
"_id": 1,
"country": "Mexico",
"purchases": 2,
"last_viewed": { "$date": "2021-10-31T20:30:00.245Z" }
}
{
"_id": 2,
"country": "Iceland",
"purchases": 8,
"last_viewed": { "$date": "2015-07-20T10:00:00.135Z" }
}

ソース コネクタで次の構成オプションを指定して、 shoppingデータベースのcustomersコレクションの内容をコピーします。

database=shopping
collection=customers
startup.mode=copy_existing

ソース コネクタは、コレクションへの各ドキュメントの挿入を説明する変更イベント ドキュメントを作成してコレクションをコピーします。

注意

データコピーは重複したイベントを生成する可能性があります

ソースコネクタが既存のデータを変換するときに、いずれかのシステムがデータベース内のデータを変更した場合、 MongoDB は最新の変更を反映するために重複した変更ストリームイベントを生成することがあります。データコピーが依存する変更ストリームイベントが冪等であるため、コピーされたデータは結果整合性があり、"少なくとも 1 回" の配信保証に準拠します。

変更イベント ドキュメントの詳細については、 Change Streamsのガイドを参照してください。

startup.modeオプションの詳細については、「起動プロパティ 」を参照してください。

ソース コネクタ構成のstartup.mode.copy.existing.pipelineオプションで集計パイプラインを指定することで、データをフィルタリングできます。 次の構成では、 countryフィールドに「Mexco」が含まれるすべてのドキュメントに一致する集計パイプラインを指定します。

startup.mode.copy.existing.pipeline=[{ "$match": { "country": "Mexico" } }]

startup.mode.copy.existing.pipelineオプションの詳細については、「起動プロパティ 」を参照してください。

集計パイプラインの詳細については、次のリソースを参照してください。

customersコレクションをコピーするための最終的なソース コネクタ構成は次のようになります。

connector.class=com.mongodb.kafka.connect.MongoSourceConnector
connection.uri=<your production MongoDB connection uri>
database=shopping
collection=customers
startup.mode=copy_existing
startup.mode.copy.existing.pipeline=[{ "$match": { "country": "Mexico" } }]

コネクタがデータをコピーすると、 shopping.customers Apache Kafka トピックには、前述のサンプル コレクションに対応する次の変更イベント ドキュメントが表示されます。

{
"_id": { "_id": 1, "copyingData": true },
"operationType": "insert",
"documentKey": { "_id": 1 },
"fullDocument": {
"_id": 1,
"country": "Mexico",
"purchases": 2,
"last_viewed": { "$date": "2021-10-31T20:30:00.245Z" }
},
"ns": { "db": "shopping", "coll": "customers" }
}

注意

トピック内のデータをコレクションに書き込む

変更データキャプチャ ハンドラー を使用して、Apache Kafka トピック内の変更イベントドキュメントを MongoDB 書込み (write) 操作に変換します。 詳細については、「変更データ キャプチャ ハンドラーのガイド」を参照してください。

次のドキュメントを含む products という名前の shoppingデータベース内の別のコレクションからデータをコピーするとします。

{
"_id": 1,
"item_name": "lipstick",
"department": "cosmetics",
"quantity": 45
}

次のコードに示すように、startup.mode.copy.existing.namespace.regex 構成設定を使用して、customers コレクションと products コレクションの両方からコピーできます。

connector.class=com.mongodb.kafka.connect.MongoSourceConnector
connection.uri=<your production MongoDB connection uri>
database=shopping
startup.mode=copy_existing
startup.mode.copy.existing.namespace.regex=^shopping\.(customers|products)$

前述のセクション で説明したshopping.customers Apache Kafkaトピック内の変更イベントドキュメントに加えて、shopping.products トピックには次のドキュメントを参照できます。

{
"_id": { "_id": 1, "copyingData": true },
"operationType": "insert",
"documentKey": { "_id": 1 },
"fullDocument": {
"_id": 1,
"item_name": "lipstick",
"department": "cosmetics",
"quantity": 45
},
"ns": { "db": "shopping", "coll": "products" }
}

Tip

startup.mode.copy.existing.namespace.regex 設定の詳細については、 スタートアップ プロパティガイドの 設定 表を参照してください。