对于 AI 代理:可在 https://www.mongodb.com/zh-cn/docs/llms.txt 获取文档索引—通过在任何 URL 路径后添加 .md 可获取所有页面的 Markdown 版本。
Make the MongoDB docs better! We value your opinion. Share your feedback for a chance to win $100.
MongoDB Branding Shape
Click here >
Docs 菜单

指定模式

此用法示例演示了如何配置MongoDB Kafka源Connector以应用自定义模式应用于数据。 模式是指定Apache Kafka主题中数据的结构和类型信息的定义。 当您必须确保源Connector填充的主题数据具有一致的结构时,请使用模式。

要了解有关使用连接器模式的更多信息,请参阅应用模式指南。

假设您的应用程序追踪MongoDB集合中的客户数据,并且您想将此数据发布到Kafka主题。 您希望客户数据的订阅者收到格式一致的数据。 您可以选择对数据应用模式。

要求和解决方案如下:

要求
解决方案

从 MongoDB 集合接收客户数据

配置 MongoDB 源连接器,接收来自特定数据库和集合的数据更新。
请参阅从集合接收数据

提供客户数据模式


指定与客户数据的结构和数据类型相对应的模式。请参阅创建自定义模式。

从客户数据中省略 Kafka 元数据

仅包含 fullDocument 字段中的数据。
请参阅从已发布记录中省略元数据

有关符合上述要求的完整配置文件,请参阅指定配置

要将 Source 连接器配置为从 MongoDB 集合接收数据,请指定数据库和集合名称。对于本示例,您可以将连接器配置为从 customers 数据库中的 purchases 集合读取,如下所示:

database=customers
collection=purchases

集合中的示例客户数据文档包含以下信息:

{
"name": "Zola",
"visits": [
{
"$date": "2021-07-25T17:30:00.000Z"
},
{
"$date": "2021-10-03T14:06:00.000Z"
}
],
"goods_purchased": {
"apples": 1,
"bananas": 10
}
}

在样本文档中,您决定模式应使用以下数据类型显示字段:

字段名称
数据类型
说明

名称

客户姓名

visits

客户访问日期

goods_purchased

字符串(假定类型)到整数值的映射

客户购买的商品名称和每件商品的数量

您可以使用 Apache Avro 模式格式描述您的数据,如以下示例模式所示:

{
"type": "record",
"name": "Customer",
"fields": [{
"name": "name",
"type": "string"
},{
"name": "visits",
"type": {
"type": "array",
"items": {
"type": "long",
"logicalType": "timestamp-millis"
}
}
},{
"name": "goods_purchased",
"type": {
"type": "map",
"values": "int"
}
}
]
}

重要

转换器

如果您想使用 Avro 二进制编码通过 Apache Kafka 发送数据,则必须使用 Avro 转换器。有关更多信息,请参阅转换器指南。

连接器将客户数据文档和描述该文档的元数据发布到 Kafka 主题。您可以使用以下设置,将连接器设置为仅纳入该记录的 fullDocument 字段中包含的文档数据:

publish.full.document.only=true

有关 fullDocument 字段的更多信息,请参阅变更流指南。

自定义模式连接器配置应如下所示:

connector.class=com.mongodb.kafka.connect.MongoSourceConnector
connection.uri=<your MongoDB connection URI>
database=customers
collection=purchases
publish.full.document.only=true
output.format.value=schema
output.schema.value={\"type\": \"record\", \"name\": \"Customer\", \"fields\": [{\"name\": \"name\", \"type\": \"string\"}, {\"name\": \"visits\", \"type\": {\"type\": \"array\", \"items\": {\"type\": \"long\", \"logicalType\": \"timestamp-millis\"}}}, {\"name\": \"goods_purchased\", \"type\": {\"type\": \"map\", \"values\": \"int\"}}]}
value.converter.schemas.enable=true
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter=org.apache.kafka.connect.storage.StringConverter

注意

嵌入式模式

在上述配置中,Kafka Connect JSON 模式转换器将自定义模式嵌入消息中。要了解有关 JSON 模式转换器的更多信息,请参阅转换器指南。

有关指定模式的更多信息,请参阅应用模式指南。