AI エージェント向け: ドキュメントインデックスは https://www.mongodb.com/ja-jp/docs/llms.txt で利用できます。すべてのページの markdown バージョンは、いずれかの URL パスに .md を追加することで利用できます。
Docs Menu

変更データ キャプチャ ハンドラーによるデータの複製

このチュートリアルに従って、変更データ キャプチャ(CDC)ハンドラーを使用して MongoDB Kafka Connector でデータを複製する方法を学びます。 CDC ハンドラーは、CDC イベントを MongoDB への書込み (write) 操作に変換するアプリケーションです。 あるデータストアの変更を別のデータストアに複製する必要がある場合は、 CDC ハンドラーを使用します。

このチュートリアルでは、MongoDB Kafka ソース コネクタと シンク コネクタを構成して実行し、CDC を使用して 2 つの MongoDB コレクションに同じドキュメントを含めます。 Source Connector は元のコレクションの変更ストリーム データを Kafka トピックに書込み、Sink Connector は Kafka トピック データをターゲットの MongoDB コレクションに書込みます。

CDC ハンドラーの動作方法について詳しくは、「変更データ キャプチャ ハンドラー」のガイドを参照してください。

1

Kafka コネクタチュートリアル設定 の手順を完了して、Confluent Kafka Connect とMongoDB環境を起動します。

2

個別の のDocker コンテナ上で 2 つの対話型 shellWindows を起動します。チュートリアルでは、shell を使用してさまざまなタスクを実行、観察できます。

対話型 shell を起動するには、ターミナルから次のコマンドを実行します。

docker exec -it mongo1 /bin/bash

このチュートリアル全体では、このインタラクティブ shell をCDCShell1として参照します。

対話型 shell を起動するには、2 つ目のターミナルで次のコマンドを実行します。

docker exec -it mongo1 /bin/bash

このチュートリアル全体では、このインタラクティブ shell をCDCShell2として参照します。

画面上の 2 つのWindowsを配置して、両方が表示されるようにし、リアルタイム更新を確認します。

CDCShell1を使用して connector を構成し、 Kafka トピックを監視します。 CDCShell2を使用して、MongoDB で書込み (write) 操作を実行します。

3

CDCShell1で、 CDCTutorial.Source MongoDB 名前空間から読み取り、 CDCTutorial.Source Kafka トピックに書込むようにソース コネクタを構成します。

次のコマンドを使用して、 cdc-source.jsonという構成ファイルを作成します。

nano cdc-source.json

以下の構成情報を ファイルに貼り付け、変更を保存します。

{
"name": "mongo-cdc-source",
"config": {
"connector.class": "com.mongodb.kafka.connect.MongoSourceConnector",
"connection.uri": "mongodb://mongo1",
"database": "CDCTutorial",
"collection": "Source"
}
}

作成した構成ファイルを使用してソース コネクタを起動するには、 CDCShell1で次のコマンドを実行します。

cx cdc-source.json

注意

cxコマンドは、チュートリアル開発環境に含まれるカスタム スクリプトです。 このスクリプトは、 Kafka Connect REST API に対する次の同等のリクエストを実行して、新しいコネクターを作成します。

curl -X POST -H "Content-Type: application/json" -d @cdc-source.json http://connect:8083/connectors -w "\n"

コネクタのステータスを確認するには、shell で次のコマンドを実行します。

status

ソース コネクタが正常に起動すると、次の出力が表示されます。

Kafka topics:
...
The status of the connectors:
source | mongo-cdc-source | RUNNING | RUNNING | com.mongodb.kafka.connect.MongoSourceConnector
Currently configured connectors
[
"mongo-cdc-source"
]
...
4

CDCShell1で、 CDCTutorial.Source Kafka トピックからCDCTutorial.Destination MongoDB 名前空間にデータをコピーするように Sink Connector を構成します。

次のコマンドを使用して、 cdc-sink.jsonという構成ファイルを作成します。

nano cdc-sink.json

以下の構成情報を ファイルに貼り付け、変更を保存します。

{
"name": "mongo-cdc-sink",
"config": {
"connector.class": "com.mongodb.kafka.connect.MongoSinkConnector",
"topics": "CDCTutorial.Source",
"change.data.capture.handler": "com.mongodb.kafka.connect.sink.cdc.mongodb.ChangeStreamHandler",
"connection.uri": "mongodb://mongo1",
"database": "CDCTutorial",
"collection": "Destination"
}
}

作成した構成ファイルを使用して Sink Connector を起動するには、shell で次のコマンドを実行します。

cx cdc-sink.json

コネクタのステータスを確認するには、shell で次のコマンドを実行します。

status

Sink Connector が正常に起動すると、次の出力が表示されます。

Kafka topics:
...
The status of the connectors:
sink | mongo-cdc-sink | RUNNING | RUNNING | com.mongodb.kafka.connect.MongoSinkConnector
source | mongo-cdc-source | RUNNING | RUNNING | com.mongodb.kafka.connect.MongoSourceConnector
Currently configured connectors
[
"mongo-cdc-sink"
"mongo-cdc-source"
]
...
5

CDCShell1で、受信イベントの Kafka トピックを監視します。 次のコマンドを実行してkafkacatアプリケーションを起動し、トピックに公開されたデータを出力します。

kc CDCTutorial.Source

注意

kcコマンドは、 Kafka に接続し、指定されたトピックの出力をフォーマットするためのオプションを持つkafkacatアプリケーションを呼び出す、チュートリアル開発環境に含まれるカスタム スクリプトです。

起動すると、次の出力が表示されます。これは、現在読み込むデータがないことを示しています。

% Reached end of topic CDCTutorial.Source [0] at offset 0
6

CDCShell2で、次のコマンドを実行して MongoDB shell であるmongoshを使用して MongoDB に接続します。

mongosh "mongodb://mongo1"

正常に接続すると、次の MongoDB shell プロンプトが表示されます。

rs0 [direct: primary] test>

プロンプトで次のコマンドを入力して、新しいドキュメントをCDCTutorial.Source MongoDB 名前空間に挿入します。

use CDCTutorial
db.Source.insertOne({ proclaim: "Hello World!" });

MongoDB が挿入コマンドを完了すると、次のテキストのような確認応答が返されます。

{
acknowledged: true,
insertedId: ObjectId("600b38ad...")
}

ソース コネクタは変更を取得し、 Kafka トピックに公開します。 CDCShell1ウィンドウに次のトピック メッセージが表示されます。

{
"schema": { "type": "string", "optional": false },
"payload": {
"_id": { "_data": "8260..." },
"operationType": "insert",
"clusterTime": { "$timestamp": { "t": 1611..., "i": 2 } },
"wallTime": { "$date": "..." },
"fullDocument": {
"_id": { "$oid": "600b38ad..." },
"proclaim": "Hello World!"
},
"ns": { "db": "CDCTutorial", "coll": "Source" },
"documentKey": { "_id": { "$oid": "600b38a..." } }
}
}

Sink Connector は Kafka メッセージを選択し、データを MongoDB にシンクします。 CDCShell2で起動した MongoDB shell で次のコマンドを実行すると、MongoDB のCDCTutorial.Destination名前空間からドキュメントを検索できます。

db.Destination.find()

次のドキュメントが結果として返されます。

[
{
_id: ObjectId("600b38a..."),
proclaim: 'Hello World'
}
]
7

MongoDB shell から次のコマンドを実行して、 CDCTutorial.Source名前空間からドキュメントを削除してみてください。

db.Source.deleteMany({})

CDCShell1ウィンドウに次のトピック メッセージが表示されます。

{
"schema": { "type": "string", "optional": false },
"payload": {
"_id": { "_data": "8261...." },
...
"operationType": "delete",
"clusterTime": { "$timestamp": { "t": 1631108282, "i": 1 } },
"ns": { "db": "CDCTutorial", "coll": "Source" },
"documentKey": { "_id": { "$oid": "6138..." } }
}
}

コレクション内の現在のドキュメント数を取得するには、次のコマンドを実行します。

db.Destination.count()

これにより、コレクションが空であることを示す次の出力が返されます。

0

次のコマンドを実行して、MongoDB shell を終了します。

exit

このチュートリアルでは、ソース コネクタを設定して、MongoDB コレクションに対する変更をキャプチャし、Apache Kafka に送信します。 また、MongoDB CDC ハンドラーで Sink Connector を構成し、Apache Kafka から MongoDB コレクションにデータを移動しました。

このチュートリアルで述べられた概念の詳細については、次のリソースをお読みください。