Overview
MongoDB Kafka ソース コネクタで無効な再開トークンから回復する方法を学びます。
スタック トレース
次のスタック トレースは、ソース コネクタに無効な再開トークンがあることを示しています。
... org.apache.kafka.connect.errors.ConnectException: ResumeToken not found. Cannot create a change stream cursor ... Command failed with error 286 (ChangeStreamHistoryLost): 'PlanExecutor error during aggregation :: caused by :: Resume of change stream was not possible, as the resume point may no longer be in the oplog ... 
原因
ソース コネクタの再開トークンの ID が MongoDB 配置の oplog のどのエントリにも対応しない場合、コネクタは MongoDB 変更ストリームの処理を開始する場所を判断する方法がありません。 この問題が発生する可能性のあるシナリオを確認するには、次のタブをクリックします。
このシナリオでは、ソース コネクタを一時停止し、MongoDB 配置の oplog を入力します。
- MongoDB Kafka ソース コネクタを使用して Kafka 配置を開始します。 
- ソース MongoDB 名前空間に変更イベントを生成し、connector はこのイベントに対応する再開トークンを保存します。 
- ソース コネクタを一時停止します。 
- connectorが一時停止している間に MongoDB oplog を埋めて、MongoDB は再開トークンに対応する oplog エントリを削除します。 
- ソース コネクタを再起動しても、再開トークンが MongoDB oplog に存在しないため、処理を再開できません。 
このシナリオでは、ソース コネクタは、アップデートの頻度が低い MongoDB 名前空間 の変更をリッスンし、ハートビート機能が有効になっていない場合します。
- MongoDB Kafka ソース コネクタを使用して Kafka 配置を開始します。 
- ソース MongoDB 名前空間に変更イベントを生成し、connector はこのイベントに対応する再開トークンを保存します。 
- MongoDB 配置が oplog から再開トークンを発行するために使用する変更イベントをローテーションするのにかかる時間は、ソース MongoDB 名前空間は更新されません。 
- ソース MongoDB 名前空間に変更イベントが発生しており、ソース コネクタは MongoDB oplog に再開トークンが存在しないため、処理を再開できません。 
oplog の詳細については、 MongoDB マニュアルを参照してください。
変更ストリームの詳細については、 Change Streamsガイドをご覧ください。
ソリューション
次のいずれかの方法を使用して、無効な再開トークンから回復できます。
エラーを一時的に許容する
コネクタの再開トークンをアップデートする変更ストリーム イベントを生成する間、エラーを許容するようにソース コネクタを構成できます。 この回復戦略は最も単純ですが、無効な再開トークンとは無関係なエラーをコネクターが一時的に無視するリスクがあります。 配置内のエラーを一時的に許容できない場合は、代わりに保存されたオフセットを削除できます。
エラーを一時的に許容するようにソース コネクタを構成するには、次の手順に従います。
- すべてのエラーを許容するには、 - errors.toleranceオプションを に設定します。- errors.tolerance=all 
- ソース コネクタが参照するコレクション内のドキュメントを挿入、アップデート、または削除して、コネクタの再開トークンをアップデートする変更ストリーム イベントを生成します。 
- 変更ストリーム イベントを生成したら、次のように - errors.toleranceオプションを に設定してエラーを許容されなくなります。- errors.tolerance=none 
errors.toleranceオプションの詳細については、「エラー処理と中断からの再開のプロパティ」ページを参照してください。
保存されたオフセットをリセットする
再開トークンを含む Kafka Connect オフセット データをリセットして、コネクタが変更ストリームの処理を再開できるようにすることができます。
オフセット データをリセットするには、 offset.partition.name構成プロパティの値を、 Kafka 配置に存在しないパーティション名に変更します。 offset.partition.nameプロパティは次のように設定できます。
offset.partition.name=<a string> 
Tip
オフセット パーティションの名前付け
オフセット パーティションに名前を付けるには、次のパターンを使用することを検討してください。
offset.partition.name=<source connector name>.<monotonically increasing number> 
このパターンには、次の利点があります。
- コネクタをリセットした回数を記録 
- オフセット パーティションが属するコネクターのドキュメント 
例、ソースコネクタに"source-values"という名前を付け、 offset.partition.nameプロパティを初めて設定する場合は、コネクタを次のように構成します。
offset.partition.name=source-values.1 
次回コネクタのオフセット データをリセットするときは、コネクタを次のように構成します。
offset.partition.name=source-values.2 
offset.partition.name構成プロパティの詳細については、「エラー処理と中断プロパティからの再開」ページを参照してください。
コネクタの名前付けの詳細については、 公式Apache Kafka ドキュメントを参照してください。
防止
更新頻度の低い 名前空間によって発生する無効な再開トークン エラーを防ぐには、ハートビートを有効にします。 ハートビートは、ソース MongoDB 名前空間の内容が変更された場合に、コネクターに定期的に再開トークンを更新させるソース コネクタの機能です。
ハートビートを有効にするには、ソース コネクタ構成で次のオプションを指定します。
heartbeat.interval.ms=<a positive integer> 
ハートビートの詳細については、「エラー処理と中断からの再開」ガイドを参照してください。