对于AI助手:文档索引位于 https://www.mongodb.com/zh-cn/docs/llms.txt — 通过将 .md 附加到任何URL路径,可以获得所有页面的降价版本。
Make the MongoDB docs better! We value your opinion. Share your feedback for a chance to win $100.
MongoDB Branding Shape
Click here >
Docs 菜单

接收连接器后处理程序

在此页面上,您可以了解如何在 MongoDB Kafka Sink 连接器中配置后处理器。后处理器修改连接器从 Kafka 主题读取的 Sink 记录,然后将其存储在 MongoDB 集合中。后处理器可以进行的数据修改的一些示例包括:

  • 将文档 _id 字段设置为自定义值

  • 包含或排除消息键或值字段

  • 重命名字段

您可以使用连接器中预构建的后处理器,也可以实现自己的后处理器。

有关后处理器的详情,请参阅以下部分:

后处理器会修改从 Kafka 主题读取的数据。此连接器会将消息存储在 SinkDocument 类中,其中包含 Kafka SinkRecord 键与值字段的表示形式。此连接器会按顺序应用配置中指定的所有后处理器,并将结果存储在 MongoDB 集合中。

后处理器执行数据修改任务,如生成文档 _id 字段、投影消息键或值字段以及重命名字段。您可以使用连接器中预构建的后处理器,也可以通过扩展 PostProcessor类来实现自己的后处理器。

重要

后处理器和变更数据捕获 (CDC) 处理程序

不能对 CDC 处理程序事件数据应用后处理器。如果同时指定这两者,连接器会记录警告。

您可以在 post.processor.chain 配置设置中以逗号分隔的列表形式指定一个或多个后处理器。如果您指定多个后处理器,连接器会按顺序应用后处理器,其中每个后处理器都会修改前一个后处理器输出的数据。

为确保连接器写入 MongoDB 的文档包含唯一的 _id 字段,如果您没有包含 DocumentIdAdder 后处理器,它会自动将 DocumentIdAdder 后处理器添加到链的第一个位置。

以下示例设置规定,连接器应首先运行 KafkaMetaAdder 后处理器,然后在输出上运行 AllowListValueProjector 后处理器。

post.processor.chain=com.mongodb.kafka.connect.sink.processor.KafkaMetaAdder,com.mongodb.kafka.connect.sink.processor.AllowListValueProjector

下表列出了接收器连接器所包含的所有后处理器的列表。

后处理器名称
说明

DocumentIdAdder

完整路径:

com.mongodb.kafka.connect.sink.processor.DocumentIdAdder

插入由配置的策略确定的_id 字段。默认策略为
BsonOidStrategy

有关策略选项和配置的信息,请参阅配置文档 Id Adder 后处理器的部分。

BlockListKeyProjector

完整路径:

com.mongodb.kafka.connect.sink.processor.BlockListKeyProjector


Sink记录中删除匹配的键字段。有关配置的更多信息,请参阅允许列表和阻止列表示例。

BlockListValueProjector

完整路径:

com.mongodb.kafka.connect.sink.processor.BlockListValueProjector


Sink记录中删除匹配的值字段。有关配置的更多信息,请参阅允许列表和阻止列表示例。

AllowListKeyProjector

完整路径:

com.mongodb.kafka.connect.sink.processor.AllowListKeyProjector

仅包括
Sink记录中的匹配关键字段。有关配置的更多信息,请参阅允许列表和阻止列表示例。

AllowListValueProjector

完整路径:

com.mongodb.kafka.connect.sink.processor.AllowListValueProjector``

仅包括
Sink记录中的匹配值字段。有关配置的更多信息,请参阅允许列表和阻止列表示例。

KafkaMetaAdder

完整路径:

com.mongodb.kafka.connect.sink.processor.KafkaMetaAdder

添加名为“topic-partition-offset”的字段,并将其值设置为 Kafka 主题、分区和偏移的串联。

RenameByMapping

完整路径:

com.mongodb.kafka.connect.sink.processor.field.renaming.RenameByMapping


重命名与键或值文档中指定字段名称完全匹配的字段。有关配置信息,请参阅“按映射重命名”示例。

RenameByRegex

完整路径:

com.mongodb.kafka.connect.sink.processor.field.renaming.RenameByRegex


重命名与键或值文档中的正则表达式匹配的字段。有关配置信息,请参阅按正则表达式重命名示例。

NullFieldValueRemover

完整路径:

com.mongodb.kafka.connect.sink.processor.NullFieldValueRemover``

从 Sink记录中删除所有包含 null 值的文档字段。

DocumentIdAdder 后处理器使用策略来确定应如何格式化 MongoDB 文档中的 _id 字段。策略定义了您可以根据使用案例进行自定义的预设行为。

如以下示例所示,您可以在 document.id.strategy 设置中为该后处理器指定策略:

document.id.strategy=com.mongodb.kafka.connect.sink.processor.id.strategy.UuidStrategy

下表列出了配置 DocumentIdAdder 后处理器时可以使用的策略:

策略名称
说明

BsonOidStrategy

完整路径:

com.mongodb.kafka.connect.sink.processor.id.strategy.BsonOidStrategy

生成MongoDB BSON ObjectId。
DocumentIdAdder帖子处理器的默认策略。

KafkaMetaDataStrategy

完整路径:

com.mongodb.kafka.connect.sink.processor.id.strategy.KafkaMetaDataStrategy

生成由 Kafka 主题、分区和偏移量连接而成的字符串。

FullKeyStrategy

完整路径:

com.mongodb.kafka.connect.sink.processor.id.strategy.FullKeyStrategy

使用接收器文档的完整键结构生成_id
字段的值。如果不存在密钥,则默认为空白文档。

ProvidedInKeyStrategy

完整路径:

com.mongodb.kafka.connect.sink.processor.id.strategy.ProvidedInKeyStrategy

使用_id Sink文档的键结构中指定的 字段。如果接收器文档中缺少该字段,则会引发异常。

ProvidedInValueStrategy

完整路径:

com.mongodb.kafka.connect.sink.processor.id.strategy.ProvidedInValueStrategy

_id使用接收器文档的值结构中指定的 字段。如果接收器文档中缺少该字段,则会引发异常。

PartialKeyStrategy

完整路径:

com.mongodb.kafka.connect.sink.processor.id.strategy.PartialKeyStrategy

使用
Sink文档密钥结构的区块列表或允许列表投影。如果不存在密钥,则默认为空白文档。

PartialValueStrategy

完整路径:

com.mongodb.kafka.connect.sink.processor.id.strategy.PartialValueStrategy


使用接收器文档值结构的区块列表或允许列表投影。如果不存在任何值,则默认为空白文档。

UuidProvidedInKeyStrategy

完整路径:

com.mongodb.kafka.connect.sink.processor.id.strategy.UuidInKeyStrategy

_id 密钥字段转换为 UUID。该值必须是字符串或二进制类型,并且必须符合 UUID 格式

UuidProvidedInValueStrategy

完整路径:

com.mongodb.kafka.connect.sink.processor.id.strategy.UuidInValueStrategy

_id 值字段转换为 UUID。该值必须是字符串或二进制类型,并且必须符合 UUID 格式

UuidStrategy

完整路径:

com.mongodb.kafka.connect.sink.processor.id.strategy.UuidStrategy``

使用以字符串格式随机生成的 UUID。

如果内置文档 ID 加法器策略未涵盖您的使用案例,则可按照以下步骤定义自定义文档 ID 策略:

  1. 创建一个实现接口 IdStrategy 并包含自定义配置逻辑的 Java 类。

  2. 将此类编译为 JAR 文件。

  3. 将已编译的 JAR 添加到所有Kafka Worker 的类路径/插件路径中。有关插件路径的更多信息,请参阅 Confluence 文档。

  4. document.id.strategy 设置更新为所有 Kafka Worker 中自定义类的完整类名。

注意

所选策略可能会对传递语义产生影响

BSON ObjectId 或 UUID 策略只能保证至少传递一次,因为连接器会在重试或再次处理记录时生成新的 ID。如果您能保证构成文档 ID 的字段是唯一的,则其他策略也允许一次性传递。

有关 IdStrategy 接口的示例实现,请参阅包含用此连接器打包的 ID 策略实现的源代码目录。

本部分展示以下类型的后处理器的配置和输出示例:

支持列表阻止列表投影器后处理器决定输出中包含和排除哪些字段。

使用支持列表投影器时,后处理器仅输出您指定字段的数据。

使用阻止列表投影器时,后处理器仅删除您指定字段的数据。

注意

您可以使用“.”(点)符号来引用记录中的嵌套字段。您还可以使用该符号来引用数组中文档的字段。

在后处理器链中添加投影器时,您必须指定投影器类型,以及是否将其应用于接收器文档的键或值部分。

有关投影器配置和输出的示例,请参阅以下部分。

假设 Kafka 记录值文档类似于以下用户配置文件数据:

{
"name": "Sally Kimball",
"age": 10,
"address": {
"city": "Idaville",
"country": "USA"
},
"hobbies": [
"reading",
"solving crime"
]
}

您可以配置 AllowList 值投影器,使用以下设置存储值文档的“name”、“address.city”和“hobbies”字段等选定数据:

post.processor.chain=com.mongodb.kafka.connect.sink.processor.AllowListValueProjector
value.projection.type=AllowList
value.projection.list=name,address.city,hobbies

后处理器应用投影后,将输出以下记录:

{
"name": "Sally Kimball",
"address": {
"city": "Idaville"
},
"hobbies": [
"reading",
"solving crime"
]
}

假设您的 Kafka 记录关键文档类似于以下用户识别数据:

{
"username": "user5983",
"registration": {
"date": "2021-09-13",
"source": "mobile"
},
"authToken": {
"alg": "HS256",
"type": "JWT",
"payload": "zI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiIxMjM0NTY3ODk"
}
}

您可以配置 BlockList 键投影器以省略“authToken”和“registration.source”字段,然后再使用以下设置存储数据:

post.processor.chain=com.mongodb.kafka.connect.sink.processor.BlockListKeyProjector
key.projection.type=BlockList
key.projection.list=authToken,registration.source

后处理器应用投影后,将输出以下记录:

{
"username": "user5983",
"registration": {
"date": "2021-09-13",
}
}

本部分介绍如何配置投影器后处理器以匹配通配符模式,从而匹配字段名称。

模式

说明

*

匹配处于当前级别的任意数量的字符。

**

匹配当前级别和所有嵌套级别中的任何字符。

关于本节中的允许列表和阻止列表通配符模式匹配示例,请参阅以下包含天气测量值的文档:

{
"city": "Springfield",
"temperature": {
"high": 28,
"low": 24,
"units": "C"
},
"wind_speed_10m": {
"average": 3,
"units": "km/h"
},
"wind_speed_80m": {
"average": 8,
"units": "km/h"
},
"soil_conditions": {
"temperature": {
"high": 22,
"low": 17,
"units": "C"
},
"moisture": {
"average": 340,
"units": "mm"
}
}
}

您可以使用 * 通配符来匹配多个字段名称。以下示例配置将匹配如下字段:

  • 名为 "city" 的顶级字段

  • 名为“average”的字段,即任何以名称“wind_speed”开头的顶级字段的子文档。

post.processor.chain=com.mongodb.kafka.connect.sink.processor.AllowListValueProjector
value.projection.type=AllowList
value.projection.list=city,wind_speed*.average

后处理器应用支持列表投影后,将输出以下记录:

{
"city": "Springfield",
"wind_speed_10m": {
"average": 3,
},
"wind_speed_80m": {
"average": 8,
}
}

您可以使用 ** 通配符,匹配从您指定通配符开始的任何级别的对象。以下通配符匹配示例将投影任何包含名为“low”的字段的文档。

post.processor.chain=com.mongodb.kafka.connect.sink.processor.AllowListValueProjector
value.projection.type=AllowList
value.projection.list=**.low

应用投影的后处理器输出以下记录:

{
"temperature": {
"high": 28,
"low": 24,
"units": "C"
},
"soil_conditions": {
"temperature": {
"high": 22,
"low": 17,
"units": "C"
}
}
}

您可以使用通配符模式来匹配特定文档级别的字段,如以下区块列表配置示例所示:

post.processor.chain=com.mongodb.kafka.connect.sink.processor.BlockListValueProjector
value.projection.type=BlockList
value.projection.list=*.*.temperature
{
"city": "Springfield",
"temperature": {
"high": 28,
"low": 24,
"units": "C"
},
"wind_speed_10m": {
"average": 3,
"units": "km/h"
},
"wind_speed_80m": {
"average": 8,
"units": "km/h"
},
"soil_conditions": {
"moisture": {
"average": 340,
"units": "mm"
}
}
}

本部分介绍如何配置 RenameByMappingRenameByRegex 字段重命名器后处理器以更新接收器记录中的字段名称。字段重命名设置具体如下:

  • 是否更新记录中的键或值文档

  • 要更新的字段名称

  • 新字段名

您必须在 JSON 数组中指定 RenameByMappingRenameByRegex 设置。您可以使用点符号或模式匹配来指定嵌套字段。

字段重命名后处理器示例使用以下示例 Sink 记录:

关键文档

{
"location": "Provence",
"date_month": "October",
"date_day": 17
}

值文档

{
"flapjacks": {
"purchased": 598,
"size": "large"
}
}

RenameByMapping 帖子处理器设置指定一个或多个 JSON 对象,这些对象将与字符串匹配的字段分配给一个新名称。每个对象都包含 oldName 元素中要匹配的文本以及 newName 元素中的替换文本,如下表所述。

密钥名称
说明

oldName

指定是否匹配键或值文档中的字段以及要替换的字段名称。该设置使用“.”字符来分隔两个值。

newName

为所有字段匹配项指定替换字段名。

以下示例属性与关键文档的“位置”字段相匹配,并将其重命名为“国家/地区”:

field.renamer.mapping=[{"oldName":"key.location", "newName":"country"}]

此设置指示 RenameByMapping 后处理器将原始密钥文档转换为以下文档:

{
"country": "Provence",
"date_month": "October",
"date_day": 17
}

您可以通过在 oldName 字段中指定具有附加字段名称的值文档,对值文档执行类似的字段名称分配,如下所示:

field.renamer.mapping=[{"oldName":"value.flapjacks", "newName":"crepes"}]

此设置指示 RenameByMapping 后处理器将原始值文档转换为以下文档:

{
"crepes": {
"purchased": 598,
"size": "large"
}
}

您还可以使用字符串格式的 JSON 数组在 field.renamer.mapping 属性中指定一个或多个映射,如以下设置所示:

field.renamer.mapping=[{ "oldName":"key.location", "newName":"city" }, { "oldName":"value.crepes", "newName":"flapjacks" }]

RenameByRegex 后处理器设置指定其应匹配的字段名和文本模式,以及匹配文本的替换值。您可以在包含下表所述字段的 JSON 对象中指定一个或多个重命名表达式:

密钥名称
说明

regexp

包含匹配字段的正则表达式,用于执行替换。

模式

包含与要替换的文本匹配的正则表达式。

替换

包含在 pattern 字段所定义正则表达式的所有匹配项的替换文本。

以下示例设置指示后处理器执行以下操作:

  • 匹配密钥文档中任何以“date”开头的字段名。在匹配字段集中,用 - 字符替换所有与模式 _ 匹配的文本。

  • 匹配值文档中任何属于 crepes 的子文档的字段名。在匹配字段集中,用 quantity 替换所有与模式 purchased 匹配的文本。

field.renamer.regexp=[{"regexp":"^key\\.date.*$","pattern":"_","replace":"-"}, {"regexp":"^value\\.crepes\\..*","pattern":"purchased","replace":"quantity"}]

当连接器将后处理器应用于示例密钥文档示例值文档时,它会输出以下内容:

关键文档

{
"location": "Provence",
"date-month": "October",
"date-day": 17
}

值文档

{
"crepes": {
"quantity": 598,
"size": "large"
}
}

警告

重命名后处理器不会覆盖现有字段名称

您在重命名后处理器中设置的目标字段名可能会导致同一文档出现重复的字段名。为避免这种情况,后处理器在文档同一级别复制现有字段名时会跳过重命名。

如果内置后处理器不适用于使用案例,则可以使用以下步骤创建自定义后处理器类:

  1. 创建一个扩展 PostProcessor 抽象类的 Java 类。

  2. 覆盖类中的 process() 方法。您可以更新 SinkDocument,即 Sink 记录键和值字段的 BSON 表示,并在方法中访问原始的 Kafka SinkRecord

  3. 将此类编译为 JAR 文件。

  4. 将已编译的 JAR 添加到所有Kafka Worker 的类路径/插件路径中。有关插件路径的更多信息,请参阅有关手动安装社区连接器的 Confluence 文档。

  5. 将后处理器完整类名添加到后处理器链配置中。

例如后处理器,您可以浏览内置后处理器类的源代码。