Overview
Use the following configuration settings to specify how the MongoDB Kafka source connector behaves when it encounters errors and to specify settings related to resuming interrupted reads.
Settings
Name | Description |
|---|---|
mongo.errors.tolerance | Type: string |
mongo.errors.log.enable | Type: boolean |
mongo.errors.deadletterqueue.topic.name | Type: string IMPORTANT: You must set |
offset.partition.name | Type: string |
heartbeat.interval.ms | Type: long |
heartbeat.topic.name | Type: string |
Heartbeats with Single Message Transforms
If you enable heartbeats and specify Single Message Transforms (SMTs) in your Kafka Connect deployment, you must exclude your heartbeat messages from your SMTs. SMTs are a feature of Kafka Connect that enables you to specify transformations on the messages that pass through your source connector without having to deploy a stream processing application.
To exclude heartbeat messages from your SMTs, you must create and apply a predicate to your SMTs. Predicates are a feature of SMTs that enables you to check if a message matches a conditional statement before applying a transformation.
The following configuration defines the IsHeartbeat predicate which matches heartbeat messages sent to the default heartbeat topic:
predicates=IsHeartbeat predicates.IsHeartbeat.type=org.apache.kafka.connect.transforms.predicates.TopicNameMatches predicates.IsHeartbeat.pattern=__mongodb_heartbeats
The following configuration uses the preceding predicate to exclude heartbeat messages from an ExtractField transformation:
transforms=Extract transforms.Extract.type=org.apache.kafka.connect.transforms.ExtractField$Key transforms.Extract.field=<the field to extract from your Apache Kafka key> transforms.Extract.predicate=IsHeartbeat transforms.Extract.negate=true # apply the default key schema as the extract transformation requires a struct object output.format.key=schema
If you do not exclude your heartbeat messages from the preceding transformation, your connector raises the following error once it processes a heartbeat message:
ERROR WorkerSourceTask{id=mongo-source-0} Task threw an uncaught and unrecoverable exception. Task is being killed ... ... Only Struct objects supported for [field extraction], found: java.lang.String
To learn more about SMTs, see How to Use Single Message Transforms in Kafka Connect from Confluent.
To learn more about predicates, see Filter (Apache Kafka) from Confluent.
To learn more about the ExtractField transformation, see ExtractField from Confluent.
To learn more about the default key schema, see the Default Schemas page.