Visão geral
Use as seguintes definições de configuração para especificar como o conector de origem do MongoDB Kafka se comporta quando encontra erros e para especificar as configurações relacionadas à retomada de leituras interrompidas.
Configurações
Nome | Descrição |
|---|---|
mongo.errors.tolerance | Tipo: string |
mongo.errors.log.enable | Tipo: booleano |
mongo.errors.deadletterqueue.topic.name | Tipo: string IMPORTANTE: você deve definir |
offset.partition.name | Tipo: string |
heartbeat.interval.ms | Tipo: longo |
heartbeat.topic.name | Tipo: string |
Heartbeats com transformações de mensagem única
Se você habilitar pulsações e especificar Transformações de Mensagem Única (SMTs) em sua implantação do Kafka Connect, deverá excluir suas mensagens de pulsação dos SMTs. Os SMTs são uma funcionalidade do Kafka Connect que permite a você especificar transformações nas mensagens que passam pelo connector de origem sem ter que implantar uma aplicação de Atlas Stream Processing.
Para excluir mensagens de pulsação de seus SMTs, você deve criar e aplicar um predicado aos seus SMTs. Os predicados são um recurso dos SMTs que permite verificar se uma mensagem corresponde a uma declaração condicional antes de aplicar uma transformação.
A seguinte configuração define o predicado IsHeartbeat que corresponde a mensagens de pulsação enviadas para o tópico de pulsação padrão:
predicates=IsHeartbeat predicates.IsHeartbeat.type=org.apache.kafka.connect.transforms.predicates.TopicNameMatches predicates.IsHeartbeat.pattern=__mongodb_heartbeats
A seguinte configuração utiliza o predicado anterior para excluir mensagens de pulsação de uma transformação ExtractField :
transforms=Extract transforms.Extract.type=org.apache.kafka.connect.transforms.ExtractField$Key transforms.Extract.field=<the field to extract from your Apache Kafka key> transforms.Extract.predicate=IsHeartbeat transforms.Extract.negate=true # apply the default key schema as the extract transformation requires a struct object output.format.key=schema
Se você não excluir suas mensagens de pulsação da transformação anterior, o connector gerará o seguinte erro após processar uma mensagem de pulsação:
ERROR WorkerSourceTask{id=mongo-source-0} Task threw an uncaught and unrecoverable exception. Task is being killed ... ... Only Struct objects supported for [field extraction], found: java.lang.String
Para saber mais sobre SMTs, consulte Como usar transformações de mensagem única no Kafka Connect da Confluent.
Para saber mais sobre predicados, consulte Filtro (Apache Kafka) do Confluent.
Para saber mais sobre a transformação ExtractField, consulte ExtrairField do Confluent.
Para saber mais sobre o esquema de chave padrão, consulte a página Esquemas Padrão .