このチュートリアルに従って、MongoDB Kafka Sink Connector を構成して Apache Kafka トピックからデータを読み取り、MongoDB コレクションに書込む方法を学びます。
MongoDB Kafka Sink Connector を使い始める
チュートリアル設定を完了する
Kafka Connector チュートリアル セットの手順を完了して、Confluent Kafka Connect と MongoDB 環境を起動します。
Sink Connector の設定
次のコマンドを使用して、チュートリアル Docker コンテナに対話型の shell セッションを作成します。
docker exec -it mongo1 /bin/bash 
次のコマンドを使用して、  simplesink.jsonというソース構成ファイルを作成します。
nano simplesink.json 
以下の構成情報を ファイルに貼り付け、変更を保存します。
{   "name": "mongo-tutorial-sink",   "config": {     "connector.class": "com.mongodb.kafka.connect.MongoSinkConnector",     "topics": "Tutorial2.pets",     "connection.uri": "mongodb://mongo1",     "key.converter": "org.apache.kafka.connect.storage.StringConverter",     "value.converter": "org.apache.kafka.connect.json.JsonConverter",     "value.converter.schemas.enable": false,     "database": "Tutorial2",     "collection": "pets"   } } 
注意
構成プロパティで強調表示された行は、 Kafka からのデータを変換する方法をコネクタに指示する変換コマンド を指定します。
作成した構成ファイルを使用して Sink Connector を起動するには、shell で次のコマンドを実行します。
cx simplesink.json 
注意
cxコマンドは、チュートリアル開発環境に含まれるカスタム スクリプトです。 このスクリプトは、 Kafka Connect REST API に対する次の同等のリクエストを実行して、新しいコネクターを作成します。
curl -X POST -H "Content-Type: application/json" -d @simplesink.json http://connect:8083/connectors -w "\n" 
コネクタのステータスを確認するには、shell で次のコマンドを実行します。
status 
Sink Connector が正常に起動すると、次の出力が表示されます。
Kafka topics: ... The status of the connectors: sink  |  mongo-tutorial-sink  |  RUNNING  |  RUNNING  |  com.mongodb.kafka.connect.MongoSinkConnector Currently configured connectors [ "mongo-tutorial-sink" ] ... 
Kafka トピックへのデータの書込み (write)
同じ shell で、 Kafka トピックにデータを書込む Python スクリプトを作成します。
nano kafkawrite.py 
次のコードを ファイルに貼り付け、変更を保存します。
from kafka import KafkaProducer import json from json import dumps p = KafkaProducer(bootstrap_servers = ['broker:29092'], value_serializer = lambda x:dumps(x).encode('utf-8')) data = {'name': 'roscoe'} p.send('Tutorial2.pets', value = data) p.flush() 
Python スクリプトを実行します。
python3 kafkawrite.py 
MongoDB コレクションのデータの表示
同じ shell で、次のコマンドを実行して、MongoDB shell であるmongoshを使用して MongoDB に接続します。
mongosh "mongodb://mongo1" 
正常に接続すると、次の MongoDB shell プロンプトが表示されます。
rs0 [direct: primary] test> 
プロンプトで次のコマンドを入力して、 Tutorial2.pets MongoDB 名前空間内のすべてのドキュメントを取得します。
use Tutorial2 db.pets.find() 
次のドキュメントが結果として返されます。
{ _id: ObjectId("62659..."), name: 'roscoe' } 
コマンドexitを入力して MongoDB shell を終了します。
(任意)Docker コンテナを停止します
このチュートリアルを完了したら、Docker アセットを停止または削除して、コンピューター上のリソースを解放します。 Docker コンテナとイメージの両方を削除することも、 コンテナのみを削除することも選択できます。 コンテナとイメージを削除した場合、サイズが約 2.4 GB の MongoDB Kafka Connector 開発環境を再起動するには、それらを再度ダウンロードする必要があります。 コンテナのみを削除すると、イメージを再利用し、サンプル データ パイプライン内の大きなファイルのほとんどをダウンロードしないでください。
Tip
その他のチュートリアル
MongoDB Kafka Connector の追加チュートリアルを完了する予定の場合は、コンテナのみを削除することを検討してください。 MongoDB Kafka Connector の追加チュートリアルを完了する予定がない場合は、コンテナとイメージの削除を検討してください。
実行する削除タスクに対応するタブを選択します。
次の shell コマンドを実行して、開発環境の Docker コンテナとイメージを削除します。
docker-compose -p mongo-kafka down --rmi all 
次の shell コマンドを実行して、Docker コンテナを削除しますが、開発環境用のイメージは保持します。
docker-compose -p mongo-kafka down 
コンテナを再起動するには、チュートリアル設定 でコンテナを起動するのに必要な手順と同じ手順に従います。
概要
このチュートリアルでは、 Kafka トピックから MongoDB クラスターのコレクションにデータを保存するように Sink Connector を構成しました。
詳細
このチュートリアルで述べられた概念の詳細については、次のリソースをお読みください。