このチュートリアルに従って、MongoDB Kafka Connector を使用して既存の MongoDB コレクションを時系列コレクションに変換する方法を学びます。
時系列コレクションは時系列データを効率的に保存します。 時系列データは、時間間隔で取得された測定値、測定を説明するメタデータ、測定時間で構成されています。
connector を使用して MongoDB コレクションから時系列コレクションにデータを変換するには、次のタスクを実行する必要があります。
- コレクション内のすべてのドキュメントに共通の時間フィールドを識別します。 
- 既存のコレクション データを Kafka トピックにコピーするようにソース コネクタを構成します。 
- Kafka トピック データを時系列コレクションにコピーするように Sink Connector を構成します。 
このチュートリアルでは、前述のタスクを実行して、株価データを コレクションから時系列コレクションに移行します。 時系列コレクションはデータをより効率的に保存してインデックスを作成し、 集計演算子 を使用して経時的な株価パフォーマンスを分析する機能を保持します。
時系列コレクションへのコレクションの移行
チュートリアル設定を完了する
Kafka Connector チュートリアル セットの手順を完了して、Confluent Kafka Connect と MongoDB 環境を起動します。
サンプル データの生成
Docker 環境で次のコマンドを実行してスクリプトを起動し、チュートリアルの MongoDB レプリカセットに作成された株式記号とその価格を含むサンプル コレクションを生成します。
docker exec -ti mongo1 /bin/bash -c "cd /stockgenmongo/ && python3 stockgen.py -db Stocks -col PriceData" 
データ ジェネレーターの実行を開始すると、次のような生成されたデータが表示されます。
... 1 _id=528e9... MSP MASSIVE SUBMARINE PARTNERS traded at 31.08 2022-05-25 21:15:15 2 _id=528e9... RWH RESPONSIVE_WHOLESALER HOLDINGS traded at 18.42 2022-05-25 21:15:15 3 _id=528e9... FAV FUZZY ATTACK VENTURES traded at 31.08 2022-05-25 21:15:15 ... 
Source Connector の構成
別のターミナル ウィンドウで、次のコマンドを使用して、チュートリアル セット用にダウンロードしたチュートリアル Docker コンテナにインタラクティブ shell セッションを作成します。
docker exec -it mongo1 /bin/bash 
次のコマンドを使用して、  stock-source.jsonというソース構成ファイルを作成します。
nano stock-source.json 
以下の構成情報を ファイルに貼り付け、変更を保存します。
{   "name": "mongo-source-marketdata",   "config": {     "connector.class": "com.mongodb.kafka.connect.MongoSourceConnector",     "key.converter": "org.apache.kafka.connect.storage.StringConverter",     "value.converter": "org.apache.kafka.connect.json.JsonConverter",     "publish.full.document.only": "true",     "connection.uri": "mongodb://mongo1",     "topic.prefix": "marketdata",     "database": "Stocks",     "collection": "PriceData",     "copy.existing": "true"   } } 
この構成では、コネクターは、 PriceData MongoDB コレクションからmarketdata.Stocks.PriceData Kafka トピックに既存のデータをコピーし、完了すると、そのコレクションに挿入される将来のデータをコピーするように指示します。
作成した構成ファイルを使用してソース コネクタを起動するには、shell で次のコマンドを実行します。
cx stock-source.json 
注意
cxコマンドは、チュートリアル開発環境に含まれるカスタム スクリプトです。 このスクリプトは、 Kafka Connect REST API に対する次の同等のリクエストを実行して、新しいコネクターを作成します。
curl -X POST -H "Content-Type: application/json" -d @stock-source.json http://connect:8083/connectors -w "\n" 
コネクタのステータスを確認するには、shell で次のコマンドを実行します。
status 
ソース コネクタが正常に起動すると、次の出力が表示されます。
Kafka topics: ... The status of the connectors: source  |  mongo-source-marketdata  |  RUNNING  |  RUNNING  |  com.mongodb.kafka.connect.MongoSourceConnector Currently configured connectors [ "mongo-source-marketdata" ] ... 
ソース コネクタが起動したら、次のコマンドを実行して、 Kafka トピックがコレクション データを受信したことを確認します。
kafkacat -b broker:29092 -C -t marketdata.Stocks.PriceData 
出力には、次のようなソース コネクタによって公開されるトピック データが表示されます。
{"schema":{ ... }, "payload": "{ "_id": { "$oid": "628e9..."}, "company_symbol": "MSP", "Company_name": "MASSIVE SUBMARINE PARTNERS", "price": 309.98, "tx_time": { "$date": 16535..." }"} 
 CTRL+Cと入力して、 kafkacatを終了できます。
Sink Connector の設定
Kafka トピックからデータを読み取り、 Stocksという名前のデータベース内のStockDataMigrateという名前の時系列コレクションに書き込むように Sink Connector を構成します。
次のコマンドを使用して、 stock-sink.jsonという Sink 構成ファイルを作成します。
nano stock-sink.json 
以下の構成情報を ファイルに貼り付け、変更を保存します。
{   "name": "mongo-sink-marketdata",   "config": {     "connector.class": "com.mongodb.kafka.connect.MongoSinkConnector",     "topics": "marketdata.Stocks.PriceData",     "connection.uri": "mongodb://mongo1",     "database": "Stocks",     "collection": "StockDataMigrate",     "key.converter": "org.apache.kafka.connect.storage.StringConverter",     "value.converter": "org.apache.kafka.connect.json.JsonConverter",     "timeseries.timefield": "tx_time",     "timeseries.timefield.auto.convert": "true",     "timeseries.timefield.auto.convert.date.format": "yyyy-MM-dd'T'HH:mm:ss'Z'"   } } 
Tip
上記の Sink Connector 構成では、 時間フィールド日付形式変換 が使用されています。 あるいは、 TimestampConverter単一メッセージ変換(SM)を使用して、 tx_timeフィールドをStringからISODateに変換することもできます。 TimestampConverter SNT を使用する場合、 Kafka トピックのデータのスキーマを定義する必要があります。
TimestampConverter SMT の使用方法について詳しくは、TimestampConverter Confluent のドキュメントを参照してください。
更新した構成ファイルを使用して Sink Connector を起動するには、shell で次のコマンドを実行します。
cx stock-sink.json 
Sink Connector がトピック データの処理を完了すると、 StockDataMigrate時系列コレクション内のドキュメントには、 ISODate型の値を持つtx_timeフィールドが含まれます。
時系列コレクション データの検証
Sink Connector がトピック データの処理を完了すると、 StockDataMigrate時系列コレクションには、 PriceDataコレクションのすべてのマーケット データが含まれます。
MongoDB でデータを表示するには、次のコマンドを実行して、 mongoshを使用してレプリカセットに接続します。
mongosh "mongodb://mongo1" 
プロンプトで次のコマンドを入力して、 Stocks.StockDataMigrate MongoDB 名前空間内のすべてのドキュメントを取得します。
use Stocks db.StockDataMigrate.find() 
次のドキュメントのような、 コマンドから返されたドキュメントのリストが表示されます。
{     tx_time: ISODate("2022-05-25T21:16:35.983Z"),     _id: ObjectId("628e9..."),     symbol: 'FAV',     price: 18.43,     company_name: 'FUZZY ATTACK VENTURES' } 
概要
このチュートリアルでは、MongoDB コレクションにデータを定期的に書込む株式ティッカー データ ジェネレーターを作成しました。 Kafka トピックにデータをコピーするようにソース コネクタを構成し、そのデータを新しい MongoDB 時系列コレクションに書込むように Sink Connector を構成しました。
詳細
このチュートリアルで述べられた概念の詳細については、次のリソースをお読みください。