Overview
次の構成設定を使用して、MongoDB Kafka ソース コネクタがエラーを発生したときにどのように動作するかを指定し、中断された読み取りの再開に関連する設定を指定します。
設定
| 名前 | 説明 | 
|---|---|
| mongo.errors.tolerance | Type: string Description: Whether to continue processing messages when the connector encounters
an error. Set this to  "none"if you want the connector to stop
processing messages and report the issue if it encounters an
error.Set this to  "all"if you want the connector to continue
processing messages and ignore any errors it encounters.IMPORTANT: This property overrides the
errors.tolerance
Connect Framework property. Default:  "none"Accepted Values:  "none"or"all" | 
| mongo.errors.log.enable | Type: boolean Description: Whether the connector should report errors in the log file. Set this to  trueto log all errors the connector encounters.Set this to  falseto log errors that are not tolerated by the
connector. You can specify which errors the connector should
tolerate using theerrors.toleranceormongo.errors.tolerancesetting.IMPORTANT: This property overrides the
errors.log.enable
Connect Framework property. Default:  falseAccepted Values:  trueorfalse | 
| mongo.errors.deadletterqueue.topic.name | Type: string Description: The name of topic to use as the dead letter queue. If you specify a value, the connector writes invalid messages to the
dead letter queue topic as extended JSON strings. If you leave this setting blank, the connector does not write
invalid messages to any topic. IMPORTANT: You must set  errors.toleranceormongo.errors.tolerancesetting to"all"to enable this property.Default:  ""Accepted Values: A valid Kafka topic name | 
| offset.partition.name | Type: string Description: The custom offset partition name to use. You can use this option
to instruct the connector to start a new change stream when an
existing offset contains an invalid resume token. If you leave this setting blank, the connector uses the default partition name
based on the connection details. To view a strategy for naming
offset partitions, see Reset Stored Offsets. Default:  ""Accepted Values: A string. To learn more about naming a partition,
see
SourceRecord
in the Apache Kafka API documentation. | 
| heartbeat.interval.ms | Type: long Description: The number of milliseconds the connector waits between sending
heartbeat messages. The connector sends heartbeat messages when
source records are not published in the specified interval. This mechanism improves
resumability of the connector for low volume namespaces. Heartbeat messages contain a  postBatchResumeTokendata field.
The value of this field contains the MongoDB server oplog entry that
the connector last read from the change stream.Set this to  0to disable heartbeat messages.To learn more, see Prevention
in the Invalid Resume Token
page. Default:  0Accepted Values: An integer | 
| heartbeat.topic.name | Type: string Description: The name of the topic on which the connector should publish
heartbeat messages. You must provide a positive value in the
 heartbeat.interval.mssetting to enable this feature.Default:  __mongodb_heartbeatsAccepted Values: A valid Kafka topic name | 
単一メッセージ変換によるハートビート
Kafka Connect 配置でハートビートを有効にして単一メッセージ変換(SMT)を指定する場合は、SMT からハートビート メッセージを除外する必要があります。 SNT は Kafka Connect の機能であり、ストリーム処理アプリケーションを配置しなくても、ソース コネクタを通過するメッセージの変換を指定できます。
SMT からハートビート メッセージを除外するには、述語を作成して SNT に適用する必要があります。 述語は、変換を適用する前にメッセージが条件ステートメントと一致するかどうかを確認できる S付与 の機能です。
次の構成では、デフォルトのハートビート トピックに送信されたハートビート メッセージに一致するIsHeartbeat述語を定義します。
predicates=IsHeartbeat predicates.IsHeartbeat.type=org.apache.kafka.connect.transforms.predicates.TopicNameMatches predicates.IsHeartbeat.pattern=__mongodb_heartbeats 
次の構成では、前述の述語を使用して、 ExtractField変換からハートビート メッセージを除外します。
transforms=Extract transforms.Extract.type=org.apache.kafka.connect.transforms.ExtractField$Key transforms.Extract.field=<the field to extract from your Apache Kafka key> transforms.Extract.predicate=IsHeartbeat transforms.Extract.negate=true # apply the default key schema as the extract transformation requires a struct object output.format.key=schema 
前述の変換からハートビート メッセージを除外しない場合、コネクターはハートビート メッセージを処理すると次のエラーを発生させます。
ERROR WorkerSourceTask{id=mongo-source-0} Task threw an uncaught and unrecoverable exception. Task is being killed ... ... Only Struct objects supported for [field extraction], found: java.lang.String 
SMD の詳細については、Confluent からのKafka Connect で単一メッセージ変換を使用する方法 を参照してください。
述語の詳細については、 Confluent のフィルター(Apache Kafka)を参照してください。
ExtractField 変換の詳細については、Confluent からの ExtractField を参照してください。
デフォルトのキースキーマの詳細については、「デフォルトのスキーマ」ページを参照してください。