Overview
在此页面上,您可以了解如何在 MongoDB Kafka Sink 连接器中配置后处理器。后处理器修改连接器从 Kafka 主题读取的 Sink 记录,然后将其存储在 MongoDB 集合中。后处理器可以进行的数据修改的一些示例包括:
- 将文档 - _id字段设置为自定义值
- 包含或排除消息键或值字段 
- 重命名字段 
您可以使用连接器中预构建的后处理器,也可以实现自己的后处理器。
有关后处理器的详情,请参阅以下部分:
后处理器如何修改数据
后处理器会修改从 Kafka 主题读取的数据。此连接器会将消息存储在 SinkDocument 类中,其中包含 Kafka SinkRecord 键与值字段的表示形式。此连接器会按顺序应用配置中指定的所有后处理器,并将结果存储在 MongoDB 集合中。
后处理器执行数据修改任务,例如生成文档_id字段、投影消息键或值字段以及重命名字段。您可以使用Connector中包含的预构建帖子处理器,也可以通过扩展 PostProcessor 类来实现自己的后处理器。
如何指定后处理器
您可以在 post.processor.chain 配置设置中以逗号分隔的列表形式指定一个或多个后处理器。如果您指定多个后处理器,连接器会按顺序应用后处理器,其中每个后处理器都会修改前一个后处理器输出的数据。
为确保连接器写入 MongoDB 的文档包含唯一的 _id 字段,如果您没有包含 DocumentIdAdder 后处理器,它会自动将 DocumentIdAdder 后处理器添加到链的第一个位置。
以下示例设置规定,连接器应首先运行 KafkaMetaAdder 后处理器,然后在输出上运行 AllowListValueProjector 后处理器。
post.processor.chain=com.mongodb.kafka.connect.sink.processor.KafkaMetaAdder,com.mongodb.kafka.connect.sink.processor.AllowListValueProjector 
预建后处理器
下表列出了接收器连接器所包含的所有后处理器的列表。
| 后处理器名称 | 说明 | |
|---|---|---|
| DocumentIdAdder | Full Path: Inserts an  _idfield determined by the configured strategy.The default strategy is  BsonOidStrategy.For information on strategy options and configuration, see the
Configure the Document Id Adder Post Processor
section. | |
| BlockListKeyProjector | Full Path: Removes matching key fields from the sink record. For more information on configuration, see the
Allow List and Block List Examples. | |
| BlockListValueProjector | Full Path: Removes matching value fields from the sink record. For more information on configuration, see the
Allow List and Block List Examples. | |
| AllowListKeyProjector | Full Path: Includes only matching key fields from the sink record. For more information on configuration, see the
Allow List and Block List Examples. | |
| AllowListValueProjector | Full Path: Includes only matching value fields from the sink record. For more information on configuration, see the
Allow List and Block List Examples. | |
| KafkaMetaAdder | Full Path: Adds a field named "topic-partition-offset" and sets the value
to the concatenation of Kafka topic, partition, and offset to the
document. | |
| RenameByMapping | Full Path: Renames fields that are an exact match to a specified field name in
the key or value document. For information on configuration, see the
Renaming by Mapping Example. | |
| RenameByRegex | Full Path: Renames fields that match a regular expression in the key or
value document. For information on configuration, see the
Renaming by Regular Expression Example. | 
配置文档 ID 加法器后处理器
DocumentIdAdder 后处理器使用策略来确定应如何格式化 MongoDB 文档中的 _id 字段。策略定义了您可以根据使用案例进行自定义的预设行为。
如以下示例所示,您可以在 document.id.strategy 设置中为该后处理器指定策略:
document.id.strategy=com.mongodb.kafka.connect.sink.processor.id.strategy.UuidStrategy 
下表列出了配置 DocumentIdAdder 后处理器时可以使用的策略:
| 策略名称 | 说明 | |
|---|---|---|
| BsonOidStrategy | Full Path: Generates a MongoDB BSON ObjectId. Default strategy for the  DocumentIdAdderpost processor. | |
| KafkaMetaDataStrategy | Full Path: Builds a string composed of the concatenation of Kafka topic,
partition, and offset. | |
| FullKeyStrategy | Full Path: Uses the complete key structure of the sink document to generate the
value for the  _idfield.Defaults to a blank document if no key exists. | |
| ProvidedInKeyStrategy | Full Path: Uses the  _idfield specified in the key structure of the sink
document.Throws an exception if the field is missing from the sink document. | |
| ProvidedInValueStrategy | Full Path: Uses the  _idfield specified in the value structure of the
sink document.Throws an exception if the field is missing from the sink document. | |
| PartialKeyStrategy | Full Path: Uses a block list or allow list projection of the sink document key
structure. Defaults to a blank document if no key exists. | |
| PartialValueStrategy | Full Path: Uses a block list or allow list projection of the sink document
value structure. Defaults to a blank document if no value exists. | |
| UuidProvidedInKeyStrategy | Full Path: Converts the  _idkey field to a UUID. The value must be either a
string or binary type and must conform to the
UUID format. | |
| UuidProvidedInValueStrategy | Full Path: Converts the  _idvalue field to a UUID. The value must be either a
string or binary type and must conform to the
UUID format. | |
| UuidStrategy | Full Path: Uses a randomly generated UUID in string format. | 
创建自定义文档 ID 策略
如果内置文档 ID 加法器策略未涵盖您的使用案例,则可按照以下步骤定义自定义文档 ID 策略:
- 创建一个实现接口 IdStrategy 并包含自定义配置逻辑的 Java 类。 
- 将此类编译为 JAR 文件。 
- 将已编译的 JAR 添加到所有Kafka Worker 的类路径/插件路径中。有关插件路径的更多信息,请参阅 Confluence 文档。 
- 将 - document.id.strategy设置更新为所有 Kafka Worker 中自定义类的完整类名。
注意
所选策略可能会对传递语义产生影响
BSON ObjectId 或 UUID 策略只能保证至少传递一次,因为连接器会在重试或再次处理记录时生成新的 ID。如果您能保证构成文档 ID 的字段是唯一的,则其他策略也允许一次性传递。
有关 IdStrategy 接口的示例实现,请参阅包含用此连接器打包的 ID 策略实现的源代码目录。
后处理器示例
本部分展示以下类型的后处理器的配置和输出示例:
允许列表和阻止列表示例
支持列表和阻止列表投影器后处理器决定输出中包含和排除哪些字段。
使用支持列表投影器时,后处理器仅输出您指定字段的数据。
使用阻止列表投影器时,后处理器仅删除您指定字段的数据。
注意
您可以使用“.”(点)符号来引用记录中的嵌套字段。您还可以使用该符号来引用数组中文档的字段。
在后处理器链中添加投影器时,您必须指定投影器类型,以及是否将其应用于接收器文档的键或值部分。
有关投影器配置和输出的示例,请参阅以下部分。
允许列表投影器示例
假设 Kafka 记录值文档类似于以下用户配置文件数据:
{   "name": "Sally Kimball",   "age": 10,   "address": {     "city": "Idaville",     "country": "USA"   },   "hobbies": [     "reading",     "solving crime"   ] } 
您可以配置 AllowList 值投影器,使用以下设置存储值文档的“name”、“address.city”和“hobbies”字段等选定数据:
post.processor.chain=com.mongodb.kafka.connect.sink.processor.AllowListValueProjector value.projection.type=AllowList value.projection.list=name,address.city,hobbies 
后处理器应用投影后,将输出以下记录:
{   "name": "Sally Kimball",   "address": {     "city": "Idaville"   },   "hobbies": [     "reading",     "solving crime"   ] } 
区块列表投影器示例
假设您的 Kafka 记录关键文档类似于以下用户识别数据:
{   "username": "user5983",   "registration": {     "date": "2021-09-13",     "source": "mobile"   },   "authToken": {     "alg": "HS256",     "type": "JWT",     "payload": "zI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiIxMjM0NTY3ODk"   } } 
您可以配置 BlockList 键投影器以省略“authToken”和“registration.source”字段,然后再使用以下设置存储数据:
post.processor.chain=com.mongodb.kafka.connect.sink.processor.BlockListKeyProjector key.projection.type=BlockList key.projection.list=authToken,registration.source 
后处理器应用投影后,将输出以下记录:
{   "username": "user5983",   "registration": {     "date": "2021-09-13",   } } 
投影通配符模式匹配示例
本部分介绍如何配置投影器后处理器以匹配通配符模式,从而匹配字段名称。
| 模式 | 说明 | 
| 
 | 匹配处于当前级别的任意数量的字符。 | 
| 
 | 匹配当前级别和所有嵌套级别中的任何字符。 | 
关于本节中的允许列表和阻止列表通配符模式匹配示例,请参阅以下包含天气测量值的文档:
{   "city": "Springfield",   "temperature": {     "high": 28,     "low": 24,     "units": "C"   },   "wind_speed_10m": {     "average": 3,     "units": "km/h"   },   "wind_speed_80m": {     "average": 8,     "units": "km/h"   },   "soil_conditions": {     "temperature": {       "high": 22,       "low": 17,       "units": "C"     },     "moisture": {       "average": 340,       "units": "mm"     }   } } 
允许列表通配符示例
您可以使用 * 通配符来匹配多个字段名称。以下示例配置将匹配如下字段:
- 名为 "city" 的顶级字段 
- 名为“average”的字段,即任何以名称“wind_speed”开头的顶级字段的子文档。 
post.processor.chain=com.mongodb.kafka.connect.sink.processor.AllowListValueProjector value.projection.type=AllowList value.projection.list=city,wind_speed*.average 
后处理器应用支持列表投影后,将输出以下记录:
{   "city": "Springfield",   "wind_speed_10m": {     "average": 3,   },   "wind_speed_80m": {     "average": 8,   } } 
您可以使用 ** 通配符,匹配从您指定通配符开始的任何级别的对象。以下通配符匹配示例将投影任何包含名为“low”的字段的文档。
post.processor.chain=com.mongodb.kafka.connect.sink.processor.AllowListValueProjector value.projection.type=AllowList value.projection.list=**.low 
应用投影的后处理器输出以下记录:
{   "temperature": {     "high": 28,     "low": 24,     "units": "C"   },   "soil_conditions": {     "temperature": {       "high": 22,       "low": 17,       "units": "C"     }   } } 
阻止列表通配符示例
您可以使用通配符模式来匹配特定文档级别的字段,如以下区块列表配置示例所示:
post.processor.chain=com.mongodb.kafka.connect.sink.processor.BlockListValueProjector value.projection.type=BlockList value.projection.list=*.*.temperature 
{   "city": "Springfield",   "temperature": {     "high": 28,     "low": 24,     "units": "C"   },   "wind_speed_10m": {     "average": 3,     "units": "km/h"   },   "wind_speed_80m": {     "average": 8,     "units": "km/h"   },   "soil_conditions": {     "moisture": {       "average": 340,       "units": "mm"     }   } } 
字段重命名示例
本部分介绍如何配置 RenameByMapping 和 RenameByRegex 字段重命名器后处理器以更新接收器记录中的字段名称。字段重命名设置具体如下:
- 是否更新记录中的键或值文档 
- 要更新的字段名称 
- 新字段名 
您必须在 JSON 数组中指定 RenameByMapping 和 RenameByRegex 设置。您可以使用点符号或模式匹配来指定嵌套字段。
字段重命名后处理器示例使用以下示例 Sink 记录:
关键文档
{   "location": "Provence",   "date_month": "October",   "date_day": 17 } 
值文档
{   "flapjacks": {     "purchased": 598,     "size": "large"   } } 
按映射重命名示例
RenameByMapping 帖子处理器设置指定一个或多个 JSON 对象,这些对象将与字符串匹配的字段分配给一个新名称。每个对象都包含 oldName 元素中要匹配的文本以及 newName 元素中的替换文本,如下表所述。
| 密钥名称 | 说明 | 
|---|---|
| oldName | 指定是否匹配键或值文档中的字段以及要替换的字段名称。该设置使用“.”字符来分隔两个值。 | 
| newName | 为所有字段匹配项指定替换字段名。 | 
以下示例属性与关键文档的“位置”字段相匹配,并将其重命名为“国家/地区”:
field.renamer.mapping=[{"oldName":"key.location", "newName":"country"}] 
此设置指示 RenameByMapping 后处理器将原始密钥文档转换为以下文档:
{   "country": "Provence",   "date_month": "October",   "date_day": 17 } 
您可以通过在 oldName 字段中指定具有附加字段名称的值文档,对值文档执行类似的字段名称分配,如下所示:
field.renamer.mapping=[{"oldName":"value.flapjacks", "newName":"crepes"}] 
此设置指示 RenameByMapping 后处理器将原始值文档转换为以下文档:
{   "crepes": {     "purchased": 598,     "size": "large"   } } 
您还可以使用字符串格式的 JSON 数组在 field.renamer.mapping 属性中指定一个或多个映射,如以下设置所示:
field.renamer.mapping=[{ "oldName":"key.location", "newName":"city" }, { "oldName":"value.crepes", "newName":"flapjacks" }] 
按正则表达式重命名
RenameByRegex 后处理器设置指定其应匹配的字段名和文本模式,以及匹配文本的替换值。您可以在包含下表所述字段的 JSON 对象中指定一个或多个重命名表达式:
| 密钥名称 | 说明 | 
|---|---|
| regexp | 包含匹配字段的正则表达式,用于执行替换。 | 
| 模式 | 包含与要替换的文本匹配的正则表达式。 | 
| 替换 | 包含在  | 
以下示例设置指示后处理器执行以下操作:
- 匹配密钥文档中任何以“date”开头的字段名。在匹配字段集中,用 - -字符替换所有与模式- _匹配的文本。
- 匹配值文档中任何属于 - crepes的子文档的字段名。在匹配字段集中,用- quantity替换所有与模式- purchased匹配的文本。
field.renamer.regexp=[{"regexp":"^key\\.date.*$","pattern":"_","replace":"-"}, {"regexp":"^value\\.crepes\\..*","pattern":"purchased","replace":"quantity"}] 
当连接器将后处理器应用于示例密钥文档和示例值文档时,它会输出以下内容:
关键文档
{   "location": "Provence",   "date-month": "October",   "date-day": 17 } 
值文档
{   "crepes": {     "quantity": 598,     "size": "large"   } } 
警告
重命名后处理器不会覆盖现有字段名称
您在重命名后处理器中设置的目标字段名可能会导致同一文档出现重复的字段名。为避免这种情况,后处理器在文档同一级别复制现有字段名时会跳过重命名。
如何创建自定义帖子处理器
如果内置后处理器不适用于使用案例,则可以使用以下步骤创建自定义后处理器类:
- 创建一个扩展 PostProcessor 抽象类的 Java 类。 
- 覆盖类中的 - process()方法。您可以更新- SinkDocument,即 Sink 记录键和值字段的 BSON 表示,并在方法中访问原始的 Kafka- SinkRecord。
- 将此类编译为 JAR 文件。 
- 将已编译的 JAR 添加到所有Kafka Worker 的类路径/插件路径中。有关插件路径的更多信息,请参阅有关手动安装社区连接器的 Confluence 文档。 
- 将后处理器完整类名添加到后处理器链配置中。 
例如后处理器,您可以浏览内置后处理器类的源代码。