AI エージェント向け: ドキュメントインデックスは https://www.mongodb.com/ja-jp/docs/llms.txt で利用できます。すべてのページの markdown バージョンは、いずれかの URL パスに .md を追加することで利用できます。
Docs Menu

スキーマを指定する

この使用例では、MongoDB Kafka ソース コネクタを構成して、カスタムスキーマをデータに適用する方法を示します。 スキーマは、Apache Kafka トピック内のデータに関する構造とタイプ情報を指定する定義です。 ソース コネクタによって設定されるトピックのデータが一貫した構造になっているようにする必要がある場合は、スキーマを使用します。

コネクタでスキーマを使用する方法の詳細については、「スキーマの適用」ガイドを参照してください。

アプリケーションが MongoDB コレクション内のカスタマー データを追跡し、このデータを Kafka トピックに公開するとします。 カスタマー データのサブスクライブに、一貫した形式のデータを受信したい。 データに スキーマ を適用することを選択します。

要件とソリューションは次のとおりです。

要件
解決法

MongoDB コレクションからカスタマー データを受信する

特定のデータベースとコレクションからデータの更新を受信するようにMongoDBソースコネクタを構成します。 「 コレクションからデータを受信する
」を参照してください。

カスタマー データ スキーマを提供する

カスタマーデータの構造とデータ型に対応するスキーマを指定します。
カスタム スキーマの作成 」を参照してください。

カスタマー データから Kafka メタデータを省略

フィールドのデータのみを含めます。fullDocument 「 公開レコードからメタデータを省略する
」を参照してください。

上記の要件を満たす完全な構成ファイルについては、「構成の指定 」を参照してください。

MongoDB コレクションからデータを受信するようにソース コネクタを構成するには、データベース名とコレクション名を指定します。 この例では、次のように、 customersデータベース内のpurchasesコレクションから読み取るようにコネクタを構成できます。

database=customers
collection=purchases

コレクションのサンプル カスタマー データ ドキュメントには、次の情報が含まれています。

{
"name": "Zola",
"visits": [
{
"$date": "2021-07-25T17:30:00.000Z"
},
{
"$date": "2021-10-03T14:06:00.000Z"
}
],
"goods_purchased": {
"apples": 1,
"bananas": 10
}
}

サンプル ドキュメントから、スキーマは次のデータ型を使用してフィールドを表示する必要があると判断します。

フィールド名
データ型
説明

name

顧客名

訪問

カスタマーが訪問した日付

product_price

string(想定された型)を整数値にマップ

顧客が購入した商品の名前と数量

以下のサンプル スキーマに示すように、Apache Avro スキーマ形式を使用してデータを記述できます。

{
"type": "record",
"name": "Customer",
"fields": [{
"name": "name",
"type": "string"
},{
"name": "visits",
"type": {
"type": "array",
"items": {
"type": "long",
"logicalType": "timestamp-millis"
}
}
},{
"name": "goods_purchased",
"type": {
"type": "map",
"values": "int"
}
}
]
}

重要

変換子

Atlas のバイナリ エンコーディングで Apache Kafka を介してデータを送信する場合は、Avro 変換を使用する必要があります。 詳細については、変換に関するガイド を参照してください。

connectorは、カスタマー データ ドキュメントと、そのドキュメントを記述するメタデータを Kafka トピックに公開します。 次の設定を使用して、レコードのfullDocumentフィールドに含まれるドキュメント データのみを含めるように connector を設定できます。

publish.full.document.only=true

fullDocumentフィールドの詳細については、 Change Streamsのガイドを参照してください。

カスタム スキーマ コネクターの構成は次のようになります。

connector.class=com.mongodb.kafka.connect.MongoSourceConnector
connection.uri=<your MongoDB connection URI>
database=customers
collection=purchases
publish.full.document.only=true
output.format.value=schema
output.schema.value={\"type\": \"record\", \"name\": \"Customer\", \"fields\": [{\"name\": \"name\", \"type\": \"string\"}, {\"name\": \"visits\", \"type\": {\"type\": \"array\", \"items\": {\"type\": \"long\", \"logicalType\": \"timestamp-millis\"}}}, {\"name\": \"goods_purchased\", \"type\": {\"type\": \"map\", \"values\": \"int\"}}]}
value.converter.schemas.enable=true
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter=org.apache.kafka.connect.storage.StringConverter

注意

埋め込みスキーマ

上記の構成では、 Kafka Connect JSON schema 変換 は カスタム スキーマを メッセージに埋め込みます。 JSON schema 変換の詳細については、変換のガイドを参照してください。

スキーマの指定の詳細については、「スキーマの適用」ガイドを参照してください。