Overview
了解如何从 MongoDB Kafka 源连接器中的无效恢复令牌中恢复。
堆栈跟踪
以下堆栈追踪显示,源连接器有无效的恢复令牌:
... org.apache.kafka.connect.errors.ConnectException: ResumeToken not found. Cannot create a change stream cursor ... Command failed with error 286 (ChangeStreamHistoryLost): 'PlanExecutor error during aggregation :: caused by :: Resume of change stream was not possible, as the resume point may no longer be in the oplog ... 
原因
当源连接器的恢复令牌 ID 与 MongoDB 部署的 oplog 中的任何条目都不对应时,连接器就无法确定从哪里开始处理 MongoDB 变更流。单击以下标签页,查看您可能遇到此问题的场景:
在此情况下,暂停源连接器,然后填写 MongoDB 部署的 oplog:
- 您可以使用 MongoDB Kafka 源连接器启动 Kafka 部署。 
- 源 MongoDB 命名空间中生成变更事件后,连接器就会存储与该事件对应的恢复令牌。 
- 暂停您的源连接器。 
- 在连接器暂停时填写 MongoDB oplog,这样会让 MongoDB 删除与恢复令牌对应的 oplog 条目。 
- 此时重新启动连接器就无法继续处理,因为 MongoDB oplog 中不存在相应的恢复令牌。 
在这种情况下,您的源connector会侦听不常更新的MongoDB命名空间上的更改,并且不会启用心跳功能:
- 您可以使用 MongoDB Kafka 源连接器启动 Kafka 部署。 
- 源 MongoDB 命名空间中生成变更事件后,连接器就会存储与该事件对应的恢复令牌。 
- 在MongoDB 部署从其oplog中轮换与恢复令牌对应的变更事件时,源MongoDB命名空间不会进行更新。 
- 您在源MongoDB命名空间中生成变更事件,而源connector无法恢复处理,因为MongoDB oplog中不存在其恢复令牌。 
有关 oplog 的更多信息,请参阅MongoDB 手册。
有关变更流的更多信息,请参阅 Change Streams指南。
解决方案
您可以使用以下策略之一从无效的恢复令牌中恢复:
暂时容许的错误
您可以配置源连接器,使其在生成变更流事件以更新连接器恢复令牌时,可容许错误。此恢复策略最为简单,但是存在一个风险,即连接器会暂时忽略与无效恢复令牌无关的错误。如果您不能暂时容许部署中的错误,则可以删除存储的偏移量。
要将源连接器配置为暂时容许错误,请执行以下操作:
- 设置 - errors.tolerance选项以容许所有错误:- errors.tolerance=all 
- 在源连接器引用的集合中插入、更新或删除文档,生成变更流事件以更新连接器的恢复令牌。 
- 生成变更流事件后,将 - errors.tolerance选项设置为不再容忍错误:- errors.tolerance=none 
有关 errors.tolerance 选项的更多信息,请参阅错误处理和从中断属性恢复页面。
重置存储的偏移
您可以重置包含恢复令牌的 Kafka Connect 偏移数据,支持连接器恢复处理变更流。
要重置偏移数据,请将 offset.partition.name 配置属性的值变更为 Kafka 部署中不存在的分区名称。您可以按如下方式设置 offset.partition.name 属性:
offset.partition.name=<a string> 
提示
为偏移分区命名
请考虑使用以下模式命名偏移分区:
offset.partition.name=<source connector name>.<monotonically increasing number> 
此模式具有以下优点:
- 记录重置连接器的次数 
- 偏移分区所属连接器的文档 
示例,如果您将源Connector命名为"source-values" ,并且是首次设置offset.partition.name属性,请按如下方式配置Connector:
offset.partition.name=source-values.1 
下次重置连接器的偏移数据时,请按以下方式配置连接器:
offset.partition.name=source-values.2 
要了解有关 offset.partition.name 配置属性的详情,请参阅错误处理和从中断恢复属性页面。
要学习;了解如何命名Connector,请参阅Apache Kafka 官方文档。
防止
要防止不经常更新的命名空间导致的无效恢复令牌错误,请启用 heartbeats。heartbeats 是源连接器的一项功能,可让连接器定期以及在源 MongoDB 命名空间的内容发生变更时更新其恢复令牌。
在源连接器配置中指定以下选项以启用 heartbeats:
heartbeat.interval.ms=<a positive integer> 
要了解有关心跳的更多信息,请参阅中断属性中的错误处理和恢复指南。