Overview
使用以下配置设置指定 MongoDB Kafka Source 连接器在遇到错误时的行为方式,并指定与恢复中断读取相关的设置。
设置
名称 | 说明 |
|---|---|
mongo.errors.tolerance | |
mongo.errors.log.enable | |
mongo.errors.deadletterqueue.topic.name | |
offset.partition.name | |
heartbeat.interval.ms | |
heartbeat.topic.name | 类型:字符串 |
使用单个消息转换的心跳
如果您在 Kafka Connect 部署中启用心跳并指定单一消息转换 (SMT) ,则必须从 SMT 中排除心跳消息。 SMT 是 Kafka 的一项功能,可让您对通过源 connector 传递的消息指定转换,而无需部署 Atlas Stream Processing 应用程序。
要从 SMT 中排除心跳消息,您必须创建谓词并将其应用于 SMT。 谓词是 SMT 的一项功能,可让您在应用转换之前检查消息是否与条件语句匹配。
以下配置定义了IsHeartbeat谓词,用于匹配发送到默认心跳主题的心跳消息:
predicates=IsHeartbeat predicates.IsHeartbeat.type=org.apache.kafka.connect.transforms.predicates.TopicNameMatches predicates.IsHeartbeat.pattern=__mongodb_heartbeats
以下配置使用前面的谓词从ExtractField转换中排除心跳消息:
transforms=Extract transforms.Extract.type=org.apache.kafka.connect.transforms.ExtractField$Key transforms.Extract.field=<the field to extract from your Apache Kafka key> transforms.Extract.predicate=IsHeartbeat transforms.Extract.negate=true # apply the default key schema as the extract transformation requires a struct object output.format.key=schema
如果不从前面的转换中排除心跳消息,则connector在处理心跳消息后会引发以下错误:
ERROR WorkerSourceTask{id=mongo-source-0} Task threw an uncaught and unrecoverable exception. Task is being killed ... ... Only Struct objects supported for [field extraction], found: java.lang.String
要学习;了解有关 SMT 的更多信息,请参阅如何通过 Confluence 在Kafka Connect 中使用单个消息转换。
要学习;了解有关谓词的更多信息,请参阅 Confluent 中的筛选器 (Apache Kafka)。
要学习;了解有关 ExtractField 转换的更多信息,请参阅 Confluence 中的 ExtractField。
要了解有关默认密钥模式的更多信息,请参阅默认模式页面。