对于AI助手:文档索引位于 https://www.mongodb.com/zh-cn/docs/llms.txt — 通过将 .md 附加到任何URL路径,可以获得所有页面的降价版本。
Make the MongoDB docs better! We value your opinion. Share your feedback for a chance to win $100.
MongoDB Branding Shape
Click here >
Docs 菜单

错误处理和从中断属性恢复

使用以下配置设置指定 MongoDB Kafka Source 连接器在遇到错误时的行为方式,并指定与恢复中断读取相关的设置。

名称
说明

mongo.errors.tolerance

类型:字符串


描述:Connector出现错误时是否继续处理消息。如果您希望Connector在遇到错误时停止处理消息并报告问题,请将此项设置为

"none"

"all"。如果您希望Connector继续处理消息并忽略遇到的任何错误,请将此项设置为 。重要提示:该属性会覆盖

Connect Framework

的errors.tolerance属性。默认值:"none"
接受的值:"none""all"

mongo.errors.log.enable

类型:布尔值


描述:Connector是否应在日志文件中报告错误。将其设置为 以日志Connector遇到的所有错误。将其设置为

true

false以日志Connector不允许的错误。您可以使用errors.tolerancemongo.errors.tolerance

设置指定Connector应允许哪些错误。重要提示:此属性会覆盖错误。日志。启用Connect Framework属性。默认值:

false
接受的值:truefalse

mongo.errors.deadletterqueue.topic.name

类型:字符串


描述:用作死信队列(DLQ)的主题名称。如果指定一个值,Connector会将无效消息作为扩展JSON字符串写入死信队列(DLQ)主题。如果将此设置留空,Connector不会将无效消息写入任何主题。



重要提示:您必须设立 errors.tolerancemongo.errors.tolerance 设置设为"all" 才能启用此属性。默认值:

""
接受的值:有效的Kafka主题名称

offset.partition.name

类型:字符串


描述:要使用的自定义偏移分区名称。当现有偏移包含无效的恢复令牌时,您可以使用此选项指示Connector启动新的变更流。如果将此设置留空,Connector将根据连接详细信息使用默认分区名称。要查看偏移分区的命名策略,请参阅重置已存储的偏移。默认值:





""
接受的值:字符串。要学习;了解有关命名分区的更多信息,请参阅Apache Kafka API文档中的 SourceRecord。

heartbeat.interval.ms

类型:long


描述:Connector在发送心跳消息之间等待的毫秒数。当源记录未在指定时间间隔内发布时,Connector会发送心跳消息。此机制提高了Connector针对低容量命名空间的可恢复性。心跳消息包含

postBatchResumeToken数据字段。该字段的值包含Connector上次从变更流中读取的MongoDB服务器oplog条目。将其设置为

0以禁用心跳消息。要学习;了解更多信息,请参阅“无效恢复令牌”页面中的“预防”。默认值:



0
接受的值:整数

heartbeat.topic.name

类型:字符串


描述:Connector应在其上发布心跳消息的主题的名称。您必须在heartbeat.interval.ms 设置中提供正值才能启用此功能。默认值:

__mongodb_heartbeats
接受的值:有效的Kafka主题名称

如果您在 Kafka Connect 部署中启用心跳并指定单一消息转换 (SMT) ,则必须从 SMT 中排除心跳消息。 SMT 是 Kafka 的一项功能,可让您对通过源 connector 传递的消息指定转换,而无需部署 Atlas Stream Processing 应用程序。

要从 SMT 中排除心跳消息,您必须创建谓词并将其应用于 SMT。 谓词是 SMT 的一项功能,可让您在应用转换之前检查消息是否与条件语句匹配。

以下配置定义了IsHeartbeat谓词,用于匹配发送到默认心跳主题的心跳消息:

predicates=IsHeartbeat
predicates.IsHeartbeat.type=org.apache.kafka.connect.transforms.predicates.TopicNameMatches
predicates.IsHeartbeat.pattern=__mongodb_heartbeats

以下配置使用前面的谓词从ExtractField转换中排除心跳消息:

transforms=Extract
transforms.Extract.type=org.apache.kafka.connect.transforms.ExtractField$Key
transforms.Extract.field=<the field to extract from your Apache Kafka key>
transforms.Extract.predicate=IsHeartbeat
transforms.Extract.negate=true
# apply the default key schema as the extract transformation requires a struct object
output.format.key=schema

如果不从前面的转换中排除心跳消息,则connector在处理心跳消息后会引发以下错误:

ERROR WorkerSourceTask{id=mongo-source-0} Task threw an uncaught and unrecoverable exception. Task is being killed ...
...
Only Struct objects supported for [field extraction], found: java.lang.String

要学习;了解有关 SMT 的更多信息,请参阅如何通过 Confluence 在Kafka Connect 中使用单个消息转换

要学习;了解有关谓词的更多信息,请参阅 Confluent 中的筛选器 (Apache Kafka)

要学习;了解有关 ExtractField 转换的更多信息,请参阅 Confluence 中的 ExtractField

要了解有关默认密钥模式的更多信息,请参阅默认模式页面。