개요
다음 구성 설정을 사용하여 MongoDB Kafka 소스 커넥터에서 오류가 발생할 때 작동하는 방식을 지정하고 중단된 읽기 재개와 관련된 설정을 지정할 수 있습니다.
설정
이름 | 설명 |
|---|---|
mongo.errors.tolerance | 유형: 문자열 설명: connector 오류가 발생했을 |
mongo.errors.log.enable | 유형: 부울 |
mongo.errors.deadletterqueue.topic.name | 유형: 문자열 중요: 이 속성 활성화하려면 |
offset.partition.name | 유형: 문자열 |
heartbeat.interval.ms | 유형: long |
heartbeat.topic.name | 유형: 문자열 |
단일 메시지 변환으로 하트비트
Kafka Connect 배포에서 하트비트를 활성화하고 SMT(단일 메시지 변환) 를 지정하는 경우 SMT에서 하트비트 메시지를 제외해야 합니다. SMT는 Atlas Stream Processing 애플리케이션을 배포할 필요 없이 소스 connector를 통과하는 메시지에 대한 변환을 지정할 수 있는 Kafka의 기능입니다.
SMT에서 하트비트 메시지를 제외하려면 조건 자를 생성하여 SMT에 적용해야 합니다. 조건자는 변환을 적용하기 전에 메시지가 조건문과 일치하는지 확인할 수 있는 SMT의 기능입니다.
다음 구성은 기본 하트비트 주제로 전송된 하트비트 메시지와 일치하는 IsHeartbeat 조건자를 정의합니다.
predicates=IsHeartbeat predicates.IsHeartbeat.type=org.apache.kafka.connect.transforms.predicates.TopicNameMatches predicates.IsHeartbeat.pattern=__mongodb_heartbeats
다음 구성에서는 앞의 조건자를 사용하여 ExtractField 변환에서 하트비트 메시지를 제외합니다.
transforms=Extract transforms.Extract.type=org.apache.kafka.connect.transforms.ExtractField$Key transforms.Extract.field=<the field to extract from your Apache Kafka key> transforms.Extract.predicate=IsHeartbeat transforms.Extract.negate=true # apply the default key schema as the extract transformation requires a struct object output.format.key=schema
이전 변환에서 하트비트 메시지를 제외하지 않으면 connector가 하트비트 메시지를 처리한 후 다음 오류가 발생합니다.
ERROR WorkerSourceTask{id=mongo-source-0} Task threw an uncaught and unrecoverable exception. Task is being killed ... ... Only Struct objects supported for [field extraction], found: java.lang.String
SMT에 대해 자세히 학습하려면 Confluent의 Kafka Connect에서 단일 메시지 변환을 사용하는 방법 을 참조하세요.
조건자에 대해 자세히 학습하려면 Confluent에서 필터링(Apache Kafka) 을 참조하세요.
ExtractField 변환에 대해 자세히 학습하려면 Confluent의 ExtractField 를 참조하세요.
기본 키 스키마에 대해 자세히 알아보려면 기본 스키마 페이지를 참조하세요.