Error Handling and Resuming from Interruption Properties

Overview

Use the following configuration settings to specify how the source connector behaves when it encounters errors and how it resumes interrupted reads.

Settings

Name: offset.partition.name
Type: string
Description: The custom offset partition name to use. You can use this option to instruct the connector to start a new change stream when an existing offset contains an invalid resume token. If you leave this setting blank, the connector uses the default partition name based on the connection details. To view a strategy for naming offset partitions, see the Reset Stored Offsets guide.
Default: ""
Accepted Values: A string. To learn more about naming a partition, see SourceRecord in the Apache Kafka API documentation.

Name: heartbeat.interval.ms
Type: long
Description: The number of milliseconds the connector waits between sending heartbeat messages. The connector sends heartbeat messages when source records are not published in the specified interval.
Heartbeat messages contain a postBatchResumeToken data field. The value of this field contains the MongoDB server oplog entry that the connector last read from the change stream.
This mechanism improves resumability of the connector for low volume namespaces. See the Invalid Resume Token page for more information on this feature.
Set this to 0 to disable heartbeat messages.
Default: 0
Accepted Values: An integer

Name: heartbeat.topic.name
Type: string
Description: The name of the topic on which the connector should publish heartbeat messages. You must provide a positive value in the heartbeat.interval.ms setting to enable this feature.
Default: __mongodb_heartbeats
Accepted Values: A valid Kafka topic name
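
As a rough illustration of how these settings might appear together in a source connector properties file, the following sketch sets a custom offset partition name and enables heartbeats on the default heartbeat topic. The values mongo-source.1 and 10000 are illustrative assumptions, not recommendations from this page.

```properties
# Hypothetical values, for illustration only.

# Custom offset partition name. Per the description above, setting this can
# instruct the connector to start a new change stream when the stored offset
# contains an invalid resume token.
offset.partition.name=mongo-source.1

# Send a heartbeat message when no source record is published within 10 seconds.
# A value of 0 (the default) disables heartbeat messages.
heartbeat.interval.ms=10000

# Topic that receives heartbeat messages (this is the documented default).
heartbeat.topic.name=__mongodb_heartbeats
```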

Heartbeats with Single Message Transforms

If you enable heartbeats and specify Single Message Transforms (SMTs) in your Kafka Connect deployment, you must exclude your heartbeat messages from your SMTs. SMTs are a feature of Kafka Connect that enables you to specify transformations on the messages that pass through your MongoDB Kafka source connector without having to deploy a stream processing application.

To exclude heartbeat messages from your SMTs, you must create a predicate and apply it to your SMTs. Predicates are a feature of SMTs that enables you to check whether a message matches a conditional statement before applying a transformation.

The following configuration defines the IsHeartbeat predicate, which matches heartbeat messages sent to the default heartbeat topic:

predicates=IsHeartbeat
predicates.IsHeartbeat.type=org.apache.kafka.connect.transforms.predicates.TopicNameMatches
predicates.IsHeartbeat.pattern=__mongodb_heartbeats
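
If you set heartbeat.topic.name to a custom value, the predicate pattern presumably needs to match that topic instead of the default one. The following is a minimal sketch of that case; the topic name myapp-heartbeats and the interval value are hypothetical.

```properties
# Publish heartbeats to a custom topic (hypothetical name and interval).
heartbeat.interval.ms=10000
heartbeat.topic.name=myapp-heartbeats

# Predicate that matches the custom heartbeat topic.
# The pattern is a Java regular expression matched against the record's topic name,
# so it should stay in sync with heartbeat.topic.name.
predicates=IsHeartbeat
predicates.IsHeartbeat.type=org.apache.kafka.connect.transforms.predicates.TopicNameMatches
predicates.IsHeartbeat.pattern=myapp-heartbeats
```

The example topic name avoids characters such as "." that have a special meaning in regular expressions; a name containing them would need to be escaped in the pattern.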

The following configuration uses the preceding predicate to exclude heartbeat messages from an ExtractField transformation:

transforms=Extract
transforms.Extract.type=org.apache.kafka.connect.transforms.ExtractField$Key
transforms.Extract.field=<the field to extract from your Apache Kafka key>
transforms.Extract.predicate=IsHeartbeat
transforms.Extract.negate=true
# apply the default key schema as the extract transformation requires a struct object
output.format.key=schema
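
Putting the pieces together, a source connector configuration might look like the following sketch. The connector name, heartbeat interval, and angle-bracket placeholders are assumptions for illustration. The sketch also assumes that _id is the field defined by the connector's default key schema, which you should verify against the Default Schemas page.

```properties
# Hypothetical connector identity and connection details.
name=mongo-source
connector.class=com.mongodb.kafka.connect.MongoSourceConnector
connection.uri=<your MongoDB connection URI>
database=<your database>
collection=<your collection>

# Enable heartbeats on the default heartbeat topic (interval is illustrative).
heartbeat.interval.ms=10000

# Predicate that identifies heartbeat messages by topic name.
predicates=IsHeartbeat
predicates.IsHeartbeat.type=org.apache.kafka.connect.transforms.predicates.TopicNameMatches
predicates.IsHeartbeat.pattern=__mongodb_heartbeats

# Apply the transformation to every message except heartbeats.
transforms=Extract
transforms.Extract.type=org.apache.kafka.connect.transforms.ExtractField$Key
# Assumed field name from the default key schema.
transforms.Extract.field=_id
transforms.Extract.predicate=IsHeartbeat
transforms.Extract.negate=true

# Emit keys with the default key schema so that ExtractField receives a struct.
output.format.key=schema
```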

If you do not exclude your heartbeat messages from the preceding transformation, your connector raises the following error once it processes a heartbeat message:

ERROR WorkerSourceTask{id=mongo-source-0} Task threw an uncaught and unrecoverable exception. Task is being killed ...
...
Only Struct objects supported for [field extraction], found: java.lang.String

To learn more about SMTs, see How to Use Single Message Transforms in Kafka Connect from Confluent.

To learn more about predicates, see Filter (Apache Kafka) from Confluent.

To learn more about the ExtractField transformation, see ExtractField from Confluent.

To learn more about the default key schema, see the Default Schemas page.
