このチュートリアルに従って、MongoDB Kafka Sink Connector を構成して Apache Kafka トピックからデータを読み取り、MongoDB コレクションに書込む方法を学びます。
MongoDB Kafka Sink Connector を使い始める
チュートリアル設定を完了する
Kafka コネクタチュートリアル セットの手順を完了して、Confluent Kafka Connect とMongoDB環境を起動します。
Sink Connector の設定
次のコマンドを使用して、チュートリアル Docker コンテナに対話型の shell セッションを作成します。
docker exec -it mongo1 /bin/bash
次のコマンドを使用して、 simplesink.jsonというソース構成ファイルを作成します。
nano simplesink.json
以下の構成情報を ファイルに貼り付け、変更を保存します。
{ "name": "mongo-tutorial-sink", "config": { "connector.class": "com.mongodb.kafka.connect.MongoSinkConnector", "topics": "Tutorial2.pets", "connection.uri": "mongodb://mongo1", "key.converter": "org.apache.kafka.connect.storage.StringConverter", "value.converter": "org.apache.kafka.connect.json.JsonConverter", "value.converter.schemas.enable": false, "database": "Tutorial2", "collection": "pets" } }
注意
構成プロパティで強調表示された行は、 Kafka からのデータを変換する方法をコネクタに指示する変換コマンド を指定します。
作成した構成ファイルを使用して Sink Connector を起動するには、shell で次のコマンドを実行します。
cx simplesink.json
注意
cxコマンドは、チュートリアル開発環境に含まれるカスタム スクリプトです。 このスクリプトは、 Kafka Connect REST API に対する次の同等のリクエストを実行して、新しいコネクターを作成します。
curl -X POST -H "Content-Type: application/json" -d @simplesink.json http://connect:8083/connectors -w "\n"
コネクタのステータスを確認するには、shell で次のコマンドを実行します。
status
Sink Connector が正常に起動すると、次の出力が表示されます。
Kafka topics: ... The status of the connectors: sink | mongo-tutorial-sink | RUNNING | RUNNING | com.mongodb.kafka.connect.MongoSinkConnector Currently configured connectors [ "mongo-tutorial-sink" ] ...
Kafka トピックへのデータの書込み (write)
同じ shell で、 Kafka トピックにデータを書込む Python スクリプトを作成します。
nano kafkawrite.py
次のコードを ファイルに貼り付け、変更を保存します。
from kafka import KafkaProducer import json from json import dumps p = KafkaProducer(bootstrap_servers = ['broker:29092'], value_serializer = lambda x:dumps(x).encode('utf-8')) data = {'name': 'roscoe'} p.send('Tutorial2.pets', value = data) p.flush()
Python スクリプトを実行します。
python3 kafkawrite.py
MongoDB コレクションのデータの表示
同じ shell で、次のコマンドを実行して、MongoDB shell であるmongoshを使用して MongoDB に接続します。
mongosh "mongodb://mongo1"
正常に接続すると、次の MongoDB shell プロンプトが表示されます。
rs0 [direct: primary] test>
プロンプトで次のコマンドを入力して、 Tutorial2.pets MongoDB 名前空間内のすべてのドキュメントを取得します。
use Tutorial2 db.pets.find()
次のドキュメントが結果として返されます。
{ _id: ObjectId("62659..."), name: 'roscoe' }
コマンドexitを入力して MongoDB shell を終了します。
(任意)Docker コンテナを停止します
このチュートリアルを完了したら、Docker アセットを停止または削除して、コンピューター上のリソースを解放します。 Docker コンテナとイメージの両方を削除することも、 コンテナのみを削除することも選択できます。 コンテナとイメージを削除した場合、サイズが約 2.4 GB の MongoDB Kafka Connector 開発環境を再起動するには、それらを再度ダウンロードする必要があります。 コンテナのみを削除すると、イメージを再利用し、サンプル データ パイプライン内の大きなファイルのほとんどをダウンロードしないでください。
Tip
その他のチュートリアル
MongoDB Kafka Connector の追加チュートリアルを完了する予定の場合は、コンテナのみを削除することを検討してください。 MongoDB Kafka Connector の追加チュートリアルを完了する予定がない場合は、コンテナとイメージの削除を検討してください。
実行する削除タスクに対応するタブを選択します。
次の shell コマンドを実行して、開発環境の Docker コンテナとイメージを削除します。
docker-compose -p mongo-kafka down --rmi all
次の shell コマンドを実行して、Docker コンテナを削除しますが、開発環境用のイメージは保持します。
docker-compose -p mongo-kafka down
コンテナを再起動するには、チュートリアル設定 でコンテナを起動するのに必要な手順と同じ手順に従います。
概要
このチュートリアルでは、 Kafka トピックから MongoDB クラスターのコレクションにデータを保存するように Sink Connector を構成しました。
詳細
このチュートリアルで述べられた概念の詳細については、次のリソースをお読みください。