Overview
このチュートリアルに従って、変更データ キャプチャ(CDC)ハンドラーを使用して MongoDB Kafka Connector でデータを複製する方法を学びます。 CDC ハンドラーは、CDC イベントを MongoDB への書込み (write) 操作に変換するアプリケーションです。 あるデータストアの変更を別のデータストアに複製する必要がある場合は、 CDC ハンドラーを使用します。
このチュートリアルでは、MongoDB Kafka ソース コネクタと シンク コネクタを構成して実行し、CDC を使用して 2 つの MongoDB コレクションに同じドキュメントを含めます。 Source Connector は元のコレクションの変更ストリーム データを Kafka トピックに書込み、Sink Connector は Kafka トピック データをターゲットの MongoDB コレクションに書込みます。
CDC ハンドラーの動作方法について詳しくは、 変更データ キャプチャ ハンドラーのガイドをご覧ください。
CDC ハンドラーによるデータの複製
チュートリアル設定を完了する
Kafka Connector チュートリアル設定の手順を完了して、Confluent Kafka Connect と MongoDB 環境を起動します。
インタラクティブ shell の開始
個別の のDocker コンテナ上で 2 つの対話型 shellWindows を起動します。チュートリアルでは、shell を使用してさまざまなタスクを実行、観察できます。
対話型 shell を起動するには、ターミナルから次のコマンドを実行します。
docker exec -it mongo1 /bin/bash 
このチュートリアル全体では、このインタラクティブ shell をCDCShell1として参照します。
対話型 shell を起動するには、2 つ目のターミナルで次のコマンドを実行します。
docker exec -it mongo1 /bin/bash 
このチュートリアル全体では、このインタラクティブ shell をCDCShell2として参照します。
画面上の 2 つのWindowsを配置して、両方が表示されるようにし、リアルタイム更新を確認します。
CDCShell1を使用して connector を構成し、 Kafka トピックを監視します。 CDCShell2を使用して、MongoDB で書込み (write) 操作を実行します。
Source Connector の構成
CDCShell1で、  CDCTutorial.Source MongoDB 名前空間から読み取り、 CDCTutorial.Source Kafka トピックに書込むようにソース コネクタを構成します。
次のコマンドを使用して、 cdc-source.jsonという構成ファイルを作成します。
nano cdc-source.json 
以下の構成情報を ファイルに貼り付け、変更を保存します。
{   "name": "mongo-cdc-source",   "config": {     "connector.class": "com.mongodb.kafka.connect.MongoSourceConnector",     "connection.uri": "mongodb://mongo1",     "database": "CDCTutorial",     "collection": "Source"   } } 
作成した構成ファイルを使用してソース コネクタを起動するには、 CDCShell1で次のコマンドを実行します。
cx cdc-source.json 
注意
cxコマンドは、チュートリアル開発環境に含まれるカスタム スクリプトです。 このスクリプトは、 Kafka Connect REST API に対する次の同等のリクエストを実行して、新しいコネクターを作成します。
curl -X POST -H "Content-Type: application/json" -d @cdc-source.json http://connect:8083/connectors -w "\n" 
コネクタのステータスを確認するには、shell で次のコマンドを実行します。
status 
ソース コネクタが正常に起動すると、次の出力が表示されます。
Kafka topics: ... The status of the connectors: source  |  mongo-cdc-source  |  RUNNING  |  RUNNING  |  com.mongodb.kafka.connect.MongoSourceConnector Currently configured connectors [ "mongo-cdc-source" ] ... 
Sink Connector の設定
CDCShell1で、 CDCTutorial.Source Kafka トピックからCDCTutorial.Destination MongoDB 名前空間にデータをコピーするように Sink Connector を構成します。
次のコマンドを使用して、 cdc-sink.jsonという構成ファイルを作成します。
nano cdc-sink.json 
以下の構成情報を ファイルに貼り付け、変更を保存します。
{   "name": "mongo-cdc-sink",   "config": {     "connector.class": "com.mongodb.kafka.connect.MongoSinkConnector",     "topics": "CDCTutorial.Source",     "change.data.capture.handler": "com.mongodb.kafka.connect.sink.cdc.mongodb.ChangeStreamHandler",     "connection.uri": "mongodb://mongo1",     "database": "CDCTutorial",     "collection": "Destination"   } } 
作成した構成ファイルを使用して Sink Connector を起動するには、shell で次のコマンドを実行します。
cx cdc-sink.json 
コネクタのステータスを確認するには、shell で次のコマンドを実行します。
status 
Sink Connector が正常に起動すると、次の出力が表示されます。
Kafka topics: ... The status of the connectors: sink    |  mongo-cdc-sink    |  RUNNING  |  RUNNING  |  com.mongodb.kafka.connect.MongoSinkConnector source  |  mongo-cdc-source  |  RUNNING  |  RUNNING  |  com.mongodb.kafka.connect.MongoSourceConnector Currently configured connectors [ "mongo-cdc-sink" "mongo-cdc-source" ] ... 
Kafka トピックの監視
CDCShell1で、受信イベントの Kafka トピックを監視します。 次のコマンドを実行してkafkacatアプリケーションを起動し、トピックに公開されたデータを出力します。
kc CDCTutorial.Source 
注意
kcコマンドは、 Kafka に接続し、指定されたトピックの出力をフォーマットするためのオプションを持つkafkacatアプリケーションを呼び出す、チュートリアル開発環境に含まれるカスタム スクリプトです。
起動すると、次の出力が表示されます。これは、現在読み込むデータがないことを示しています。
% Reached end of topic CDCTutorial.Source [0] at offset 0 
ソースにデータを書き込み、データフローを監視する
CDCShell2で、次のコマンドを実行して MongoDB shell であるmongoshを使用して MongoDB に接続します。
mongosh "mongodb://mongo1" 
正常に接続すると、次の MongoDB shell プロンプトが表示されます。
rs0 [direct: primary] test> 
プロンプトで次のコマンドを入力して、新しいドキュメントをCDCTutorial.Source MongoDB 名前空間に挿入します。
use CDCTutorial db.Source.insertOne({ proclaim: "Hello World!" }); 
MongoDB が挿入コマンドを完了すると、次のテキストのような確認応答が返されます。
{   acknowledged: true,   insertedId: ObjectId("600b38ad...") } 
ソース コネクタは変更を取得し、 Kafka トピックに公開します。 CDCShell1ウィンドウに次のトピック メッセージが表示されます。
{   "schema": { "type": "string", "optional": false },   "payload": {     "_id": { "_data": "8260..." },     "operationType": "insert",     "clusterTime": { "$timestamp": { "t": 1611..., "i": 2 } },     "wallTime": { "$date": "..." },     "fullDocument": {       "_id": { "$oid": "600b38ad..." },       "proclaim": "Hello World!"     },     "ns": { "db": "CDCTutorial", "coll": "Source" },     "documentKey": { "_id": { "$oid": "600b38a..." } }   } } 
Sink Connector は Kafka メッセージを選択し、データを MongoDB にシンクします。 CDCShell2で起動した MongoDB shell で次のコマンドを実行すると、MongoDB のCDCTutorial.Destination名前空間からドキュメントを検索できます。
db.Destination.find() 
次のドキュメントが結果として返されます。
[   {     _id: ObjectId("600b38a..."),     proclaim: 'Hello World'   } ] 
(任意)追加変更の生成
MongoDB shell から次のコマンドを実行して、 CDCTutorial.Source名前空間からドキュメントを削除してみてください。
db.Source.deleteMany({}) 
CDCShell1ウィンドウに次のトピック メッセージが表示されます。
{   "schema": { "type": "string", "optional": false },   "payload": {     "_id": { "_data": "8261...." },     ...     "operationType": "delete",     "clusterTime": { "$timestamp": { "t": 1631108282, "i": 1 } },     "ns": { "db": "CDCTutorial", "coll": "Source" },     "documentKey": { "_id": { "$oid": "6138..." } }   } } 
コレクション内の現在のドキュメント数を取得するには、次のコマンドを実行します。
db.Destination.count() 
これにより、コレクションが空であることを示す次の出力が返されます。
0 
次のコマンドを実行して、MongoDB shell を終了します。
exit 
概要
このチュートリアルでは、ソース コネクタを設定して、MongoDB コレクションに対する変更をキャプチャし、Apache Kafka に送信します。 また、MongoDB CDC ハンドラーで Sink Connector を構成し、Apache Kafka から MongoDB コレクションにデータを移動しました。
詳細
このチュートリアルで述べられた概念の詳細については、次のリソースをお読みください。