Overview
次の構成設定を使用して、MongoDB Kafka ソース コネクタがエラーを発生したときにどのように動作するかを指定し、中断された読み取りの再開に関連する設定を指定します。
設定
名前 | 説明 |
|---|---|
mongo.errors.tolerance | タイプ: string |
mongo.errors.log.enable | タイプ:ブール値値 説明:コネクタがログファイルのエラーを報告するかどうか。コネクタが発生したすべてのエラーをログには、これを |
mongo.errors.dealterqueue.token.name | タイプ: string 説明:デッドレターキュー 重要: |
distinct. partition.name | タイプ: string |
heartbeat.Interval.ms | タイプ: long |
heartbeat.token.name | タイプ: string |
単一メッセージ変換によるハートビート
Kafka Connect 配置でハートビートを有効にして単一メッセージ変換(SMT)を指定する場合は、SMT からハートビート メッセージを除外する必要があります。 SNT は Kafka Connect の機能であり、ストリーム処理アプリケーションを配置しなくても、ソース コネクタを通過するメッセージの変換を指定できます。
SMT からハートビート メッセージを除外するには、述語を作成して SNT に適用する必要があります。 述語は、変換を適用する前にメッセージが条件ステートメントと一致するかどうかを確認できる S付与 の機能です。
次の構成では、デフォルトのハートビート トピックに送信されたハートビート メッセージに一致するIsHeartbeat述語を定義します。
predicates=IsHeartbeat predicates.IsHeartbeat.type=org.apache.kafka.connect.transforms.predicates.TopicNameMatches predicates.IsHeartbeat.pattern=__mongodb_heartbeats
次の構成では、前述の述語を使用して、 ExtractField変換からハートビート メッセージを除外します。
transforms=Extract transforms.Extract.type=org.apache.kafka.connect.transforms.ExtractField$Key transforms.Extract.field=<the field to extract from your Apache Kafka key> transforms.Extract.predicate=IsHeartbeat transforms.Extract.negate=true # apply the default key schema as the extract transformation requires a struct object output.format.key=schema
前述の変換からハートビート メッセージを除外しない場合、コネクターはハートビート メッセージを処理すると次のエラーを発生させます。
ERROR WorkerSourceTask{id=mongo-source-0} Task threw an uncaught and unrecoverable exception. Task is being killed ... ... Only Struct objects supported for [field extraction], found: java.lang.String
SMD の詳細については、Confluent からのKafka Connect で単一メッセージ変換を使用する方法 を参照してください。
述語の詳細については、 Confluent のフィルター(Apache Kafka)を参照してください。
ExtractField 変換の詳細については、Confluent からの ExtractField を参照してください。
デフォルトのキースキーマの詳細については、「デフォルトのスキーマ」ページを参照してください。