Overview
Utiliza los siguientes ajustes de configuración para especificar cómo debe comportarse el conector de origen MongoDB Kafka cuando encuentre errores y para especificar los ajustes relacionados con la reanudación de lecturas interrumpidas.
Configuraciones
Nombre | Descripción |
|---|---|
mongo.errors.tolerance | Type: string Description: Whether to continue processing messages when the connector encounters
an error. Set this to "none" if you want the connector to stop
processing messages and report the issue if it encounters an
error.Set this to "all" if you want the connector to continue
processing messages and ignore any errors it encounters.IMPORTANT: This property overrides the
errors.tolerance
Connect Framework property. Default: "none"Accepted Values: "none" or "all" |
mongo.errors.log.enable | Type: boolean Description: Whether the connector should report errors in the log file. Set this to true to log all errors the connector encounters.Set this to false to log errors that are not tolerated by the
connector. You can specify which errors the connector should
tolerate using the errors.tolerance or mongo.errors.tolerance
setting.IMPORTANT: This property overrides the
errors.log.enable
Connect Framework property. Default: falseAccepted Values: true or false |
mongo.errors.deadletterqueue.topic.name | Type: string Description: The name of topic to use as the dead letter queue. If you specify a value, the connector writes invalid messages to the
dead letter queue topic as extended JSON strings. If you leave this setting blank, the connector does not write
invalid messages to any topic. IMPORTANT: You must set errors.tolerance or mongo.errors.tolerance
setting to "all" to enable this property.Default: ""Accepted Values: A valid Kafka topic name |
offset.partition.name | Type: string Description: The custom offset partition name to use. You can use this option
to instruct the connector to start a new change stream when an
existing offset contains an invalid resume token. If you leave this setting blank, the connector uses the default partition name
based on the connection details. To view a strategy for naming
offset partitions, see Reset Stored Offsets. Default: ""Accepted Values: A string. To learn more about naming a partition,
see
SourceRecord
in the Apache Kafka API documentation. |
heartbeat.interval.ms | Type: long Description: The number of milliseconds the connector waits between sending
heartbeat messages. The connector sends heartbeat messages when
source records are not published in the specified interval. This mechanism improves
resumability of the connector for low volume namespaces. Heartbeat messages contain a postBatchResumeToken data field.
The value of this field contains the MongoDB server oplog entry that
the connector last read from the change stream.Set this to 0 to disable heartbeat messages.To learn more, see Prevention
in the Invalid Resume Token
page. Default: 0Accepted Values: An integer |
heartbeat.topic.name | Type: string Description: The name of the topic on which the connector should publish
heartbeat messages. You must provide a positive value in the
heartbeat.interval.ms setting to enable this feature.Default: __mongodb_heartbeatsAccepted Values: A valid Kafka topic name |
Latidos cardíacos con transformaciones de mensaje único
Si habilita los latidos y especifica las Transformaciones de mensaje único (SMT) en su implementación de Kafka Connect, debe excluir sus mensajes de latido de sus SMT. Las SMT son una funcionalidad de Kafka Connect que le permite especificar transformaciones en los mensajes que pasan a través de su conector de origen sin necesidad de implementar una aplicación de Stream Processing.
Para excluir los mensajes de latidos de tus SMT, debes crear y aplicar un predicado a tus SMT. Los predicados son una funcionalidad de los SMT que te permite comprobar si un mensaje coincide con una instrucción condicional antes de aplicar una transformación.
La siguiente configuración define el predicado IsHeartbeat que coincide con los mensajes de latido enviados al tema de latido predeterminado:
predicates=IsHeartbeat predicates.IsHeartbeat.type=org.apache.kafka.connect.transforms.predicates.TopicNameMatches predicates.IsHeartbeat.pattern=__mongodb_heartbeats
La siguiente configuración utiliza el predicado anterior para excluir los mensajes de latido de una transformación ExtractField:
transforms=Extract transforms.Extract.type=org.apache.kafka.connect.transforms.ExtractField$Key transforms.Extract.field=<the field to extract from your Apache Kafka key> transforms.Extract.predicate=IsHeartbeat transforms.Extract.negate=true # apply the default key schema as the extract transformation requires a struct object output.format.key=schema
Si no excluyes tus mensajes de latido de la transformación anterior, tu conector generará el siguiente error una vez que procese un mensaje de latido:
ERROR WorkerSourceTask{id=mongo-source-0} Task threw an uncaught and unrecoverable exception. Task is being killed ... ... Only Struct objects supported for [field extraction], found: java.lang.String
Para aprender más sobre los SMT, consulta Cómo usar las transformaciones de un solo mensaje (SMT) en Kafka Connect de Confluent.
Para obtener más información sobre los predicados, consulte Filtro (Apache Kafka) de Confluent.
Para obtener más información sobre la transformación ExtractField, consulta ExtractField de Confluent.
Para obtener más información sobre el esquema de claves por defecto, consulte la página Esquemas por defecto.