Error Handling and Resuming from Interruption Properties
Overview
Use the following configuration settings to specify how the MongoDB Kafka source connector behaves when it encounters errors and to specify settings related to resuming interrupted reads.
Settings
Name | Description |
---|---|
mongo.errors.tolerance | Type: string Description: Whether to continue processing messages when the connector encounters an error. Set this to "none" if you want the connector to stop processing messages and report the issue if it encounters an error. Set this to "all" if you want the connector to continue processing messages and ignore any errors it encounters. Important: This property overrides the errors.tolerance Connect Framework property. Default: "none" Accepted Values: "none" or "all" |
mongo.errors.log.enable | Type: boolean Description: Whether the connector should report errors in the log file. Set this to true to log all errors the connector encounters. Set this to false to log only the errors that the connector does not tolerate. You can specify which errors the connector should tolerate using the errors.tolerance or mongo.errors.tolerance setting. Important: This property overrides the errors.log.enable Connect Framework property. Default: false Accepted Values: true or false |
mongo.errors.deadletterqueue.topic.name | Type: string Description: The name of the topic to use as the dead letter queue. If you specify a value, the connector writes invalid messages to the dead letter queue topic as extended JSON strings. If you leave this setting blank, the connector does not write invalid messages to any topic. Important: You must set [...] Default: "" Accepted Values: A valid Kafka topic name |
offset.partition.name | Type: string Description: The custom offset partition name to use. You can use this option to instruct the connector to start a new change stream when an existing offset contains an invalid resume token. If you leave this setting blank, the connector uses the default partition name based on the connection details. To view a strategy for naming offset partitions, see the Reset Stored Offsets guide. Default: "" Accepted Values: A string. To learn more about naming a partition, see SourceRecord in the Apache Kafka API documentation. |
heartbeat.interval.ms | Type: long Description: The number of milliseconds the connector waits between sending heartbeat messages. The connector sends heartbeat messages when source records are not published in the specified interval. Heartbeat messages contain a postBatchResumeToken data field. The value of this field contains the MongoDB server oplog entry that the connector last read from the change stream. This mechanism improves resumability of the connector for low volume namespaces. See the Invalid Resume Token page for more information on this feature. Set this to 0 to disable heartbeat messages. Default: 0 Accepted Values: An integer |
heartbeat.topic.name | Type: string Description: The name of the topic on which the connector should publish heartbeat messages. You must provide a positive value in the heartbeat.interval.ms setting to enable this feature. Default: __mongodb_heartbeats Accepted Values: A valid Kafka topic name |
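
As an illustration of how the settings in the preceding table fit together, the following is a minimal sketch of a source connector configuration that tolerates errors, logs them, routes invalid messages to a dead letter queue, and enables heartbeats. The connector name, connection URI, database, collection, dead letter queue topic, and partition name are hypothetical placeholders, not values from this page; adjust them for your deployment.

```properties
# Hypothetical connection details, for illustration only
name=mongo-source-example
connector.class=com.mongodb.kafka.connect.MongoSourceConnector
connection.uri=mongodb://localhost:27017
database=exampleDb
collection=exampleColl

# Continue processing when an error occurs, and log every error
mongo.errors.tolerance=all
mongo.errors.log.enable=true

# Write invalid messages to this topic (hypothetical topic name)
mongo.errors.deadletterqueue.topic.name=example.deadletterqueue

# Send a heartbeat when no source record is published for 10 seconds
heartbeat.interval.ms=10000
heartbeat.topic.name=__mongodb_heartbeats

# Optional: uncomment to use a custom offset partition name (hypothetical value),
# for example to start a new change stream after an invalid resume token
# offset.partition.name=example-partition-v2
```

Because heartbeat.interval.ms defaults to 0, heartbeats stay disabled unless you set a positive interval as shown.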
Heartbeats with Single Message Transforms
If you enable heartbeats and specify Single Message Transforms (SMTs) in your Kafka Connect deployment, you must exclude your heartbeat messages from your SMTs. SMTs are a feature of Kafka Connect that enables you to specify transformations on the messages that pass through your source connector without having to deploy a stream processing application.
To exclude heartbeat messages from your SMTs, you must create and apply a predicate to your SMTs. Predicates are a feature of SMTs that enables you to check if a message matches a conditional statement before applying a transformation.
The following configuration defines the IsHeartbeat predicate, which matches heartbeat messages sent to the default heartbeat topic:
predicates=IsHeartbeat
predicates.IsHeartbeat.type=org.apache.kafka.connect.transforms.predicates.TopicNameMatches
predicates.IsHeartbeat.pattern=__mongodb_heartbeats
The following configuration uses the preceding predicate to exclude heartbeat messages from an ExtractField transformation:
transforms=Extract
transforms.Extract.type=org.apache.kafka.connect.transforms.ExtractField$Key
transforms.Extract.field=<the field to extract from your Apache Kafka key>
transforms.Extract.predicate=IsHeartbeat
transforms.Extract.negate=true
# apply the default key schema as the extract transformation requires a struct object
output.format.key=schema
If you do not exclude your heartbeat messages from the preceding transformation, your connector raises the following error once it processes a heartbeat message:
ERROR WorkerSourceTask{id=mongo-source-0} Task threw an uncaught and unrecoverable exception. Task is being killed ...
...
Only Struct objects supported for [field extraction], found: java.lang.String
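
The predicate shown earlier matches only the default heartbeat topic name. As a minimal sketch, assuming you override heartbeat.topic.name with a custom topic (the name my_heartbeats is hypothetical), the predicate pattern would need to match that topic instead so that heartbeat messages are still excluded from the transformation:

```properties
# Hypothetical custom heartbeat topic; a positive interval enables heartbeats
heartbeat.interval.ms=10000
heartbeat.topic.name=my_heartbeats

# TopicNameMatches treats the pattern as a Java regular expression
# that is matched against the topic name of each record
predicates=IsHeartbeat
predicates.IsHeartbeat.type=org.apache.kafka.connect.transforms.predicates.TopicNameMatches
predicates.IsHeartbeat.pattern=my_heartbeats

# negate=true applies the transformation only to records
# that do NOT match the predicate, so heartbeats are skipped
transforms=Extract
transforms.Extract.type=org.apache.kafka.connect.transforms.ExtractField$Key
transforms.Extract.field=<the field to extract from your Apache Kafka key>
transforms.Extract.predicate=IsHeartbeat
transforms.Extract.negate=true
output.format.key=schema
```

Keeping the heartbeat topic name and the predicate pattern in sync is the point of this sketch: if they diverge, heartbeat messages reach the transformation again.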
To learn more about SMTs, see How to Use Single Message Transforms in Kafka Connect from Confluent.
To learn more about predicates, see Filter (Apache Kafka) from Confluent.
To learn more about the ExtractField transformation, see ExtractField from Confluent.
To learn more about the default key schema, see the Default Schemas page.