AIエージェントの場合: ドキュメントインデックスはhttps://www.mongodb.com/ja-jp/docs/llms.txt で利用可能です。任意のURLパスに .md を追加することで、すべてのページのマークダウン バージョンが利用できます。
Make the MongoDB docs better! We value your opinion. Share your feedback for a chance to win $100.
MongoDB Branding Shape
Click here >
Docs Menu

エラー処理と中断からの再開のプロパティ

次の構成設定を使用して、MongoDB Kafka ソース コネクタがエラーを発生したときにどのように動作するかを指定し、中断された読み取りの再開に関連する設定を指定します。

名前
説明

mongo.errors.tolerance

タイプ: string


説明:コネクタがエラーを発生した場合にメッセージの処理を続行するかどうか。コネクタでメッセージの処理を停止し、エラーが発生した場合に問題を報告する場合は、これを に設定します。コネクタでメッセージの処理を続行し、発生したエラーを無視する場合は、これを に設定します。重要:

"none"

"all"

このプロパティは、 errors.tolerance Connect Frameworkプロパティを上書きします。デフォルト:

"none"
許容値:"none" または"all"

mongo.errors.log.enable

タイプ:ブール値値 説明:コネクタがログファイルのエラーを報告するかどうか。コネクタが発生したすべてのエラーをログには、これを




true

falseに設定します。 コネクタが許容しないエラーをログに記録するには、これを に設定します。errors.tolerance mongo.errors.toleranceまたは

設定を使用して、コネクタが許容するエラーを指定できます。重要: このプロパティはエラーを上書きします。 ログ.enable Connect

Frameworkプロパティ.デフォルト:false
許容値:true またはfalse

mongo.errors.dealterqueue.token.name

タイプ: string 説明:デッドレターキュー




(DLQ)として使用するトピックの名前。値を指定すると、コネクタは無効なメッセージを拡張JSON string としてデッドレターキュー

(DLQ)トピックに書込みます。この設定を空白のままにすると、コネクタはトピックに無効なメッセージを書込みません。

重要: errors.tolerancemongo.errors.tolerance"all"このプロパティを有効にするには、 または の設定を に設定する必要があります。デフォルト:

""
許容値: 有効なKafkaトピック名

distinct. partition.name

タイプ: string

説明:
使用するカスタム オフセット パーティション名。このオプションを使用すると、既存のオフセットに無効な再開トークンが含まれている場合に、コネクタに新しい変更ストリームを開始するように指示できます。この設定を空白のままにすると、

コネクタ は接続の詳細に基づいてデフォルトのパーティション名を使用します。オフセット

パーティションの名前付け戦略については、「 保存されたオフセットをリセットする 」を参照してください。デフォルト:

許容値:""
string。パーティションの名前の詳細については、 Apache Kafka APIドキュメントの「 SourceRecord 」を参照してください。

heartbeat.Interval.ms

タイプ: long

説明:コネクタがハートビート


postBatchResumeTokenメッセージの送信間で待機するミリ秒数。 コネクタは、ソース レコードが指定された間隔で公開されない場合に、ハートビート メッセージを送信します。このメカニズムにより、低ボリューム名空間でのコネクタの再開可能性が向上します。ハートビート メッセージには データフィールドが含まれます。このフィールドの値には、コネクタが変更ストリームから最後に読み込んだMongoDBサーバーのoplog のエントリが含まれます。ハートビート メッセージを無効にするには、これを に設定します。詳細については、「

0

無効な再開トークン 」ページの「 無効な再開トークンの防止

」を参照してください。デフォルト:0
許容値: 整数

heartbeat.token.name

タイプ: string

説明:コネクタがハートビート
メッセージを公開するトピックの名前。この機能を有効にするには、 設定に正の値を指定する必要があります。デフォルト:heartbeat.interval.ms

__mongodb_heartbeats
許容値: 有効なKafkaトピック名

Kafka Connect 配置でハートビートを有効にして単一メッセージ変換(SMT)を指定する場合は、SMT からハートビート メッセージを除外する必要があります。 SNT は Kafka Connect の機能であり、ストリーム処理アプリケーションを配置しなくても、ソース コネクタを通過するメッセージの変換を指定できます。

SMT からハートビート メッセージを除外するには、述語を作成して SNT に適用する必要があります。 述語は、変換を適用する前にメッセージが条件ステートメントと一致するかどうかを確認できる S付与 の機能です。

次の構成では、デフォルトのハートビート トピックに送信されたハートビート メッセージに一致するIsHeartbeat述語を定義します。

predicates=IsHeartbeat
predicates.IsHeartbeat.type=org.apache.kafka.connect.transforms.predicates.TopicNameMatches
predicates.IsHeartbeat.pattern=__mongodb_heartbeats

次の構成では、前述の述語を使用して、 ExtractField変換からハートビート メッセージを除外します。

transforms=Extract
transforms.Extract.type=org.apache.kafka.connect.transforms.ExtractField$Key
transforms.Extract.field=<the field to extract from your Apache Kafka key>
transforms.Extract.predicate=IsHeartbeat
transforms.Extract.negate=true
# apply the default key schema as the extract transformation requires a struct object
output.format.key=schema

前述の変換からハートビート メッセージを除外しない場合、コネクターはハートビート メッセージを処理すると次のエラーを発生させます。

ERROR WorkerSourceTask{id=mongo-source-0} Task threw an uncaught and unrecoverable exception. Task is being killed ...
...
Only Struct objects supported for [field extraction], found: java.lang.String

SMD の詳細については、Confluent からのKafka Connect で単一メッセージ変換を使用する方法 を参照してください。

述語の詳細については、 Confluent のフィルター(Apache Kafka)を参照してください。

ExtractField 変換の詳細については、Confluent からの ExtractField を参照してください。

デフォルトのキースキーマの詳細については、「デフォルトのスキーマ」ページを参照してください。