Hi,
it seems that the Atlas connector deviates from the official MongoDB sink connector and that certain properties are missing, such as:
key.converter & value.converter
The Confluent docs only specify e.g. "input.data.format": "STRING".
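For reference, a self-managed deployment of the official MongoDB sink connector would set the converters explicitly, roughly like this (the connection URI is a placeholder, and I've guessed at JSON values without schemas):

{
  "name": "mongodb-sink",
  "config": {
    "connector.class": "com.mongodb.kafka.connect.MongoSinkConnector",
    "connection.uri": "mongodb+srv://<user>:<password>@<cluster>/",
    "database": "test-support",
    "collection": "some",
    "topics": "test,test2",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "false",
    "change.data.capture.handler": "com.mongodb.kafka.connect.sink.cdc.mongodb.ChangeStreamHandler"
  }
}

With the Atlas connector there seems to be no equivalent knob, only "input.data.format".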
I'm currently quite unsure what the value in Kafka should look like so that this error:
"connector": {
"state": "FAILED",
"worker_id": "name",
"trace": "Required field 'operationType' missing in the record. Please ensure the data in the topic is in the format expected by the MongoDB Atlas Sink connector.\n"
}
can be resolved.
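My assumption is that the MongoDbChangeStreamHandler expects each record value to be a full MongoDB change stream event document, i.e. something roughly like this (all values made up):

{
  "_id": {"_data": "826C5F..."},
  "operationType": "insert",
  "clusterTime": {"$timestamp": {"t": 1657000000, "i": 1}},
  "ns": {"db": "test-support", "coll": "some"},
  "documentKey": {"_id": {"$oid": "62cd5e..."}},
  "fullDocument": {"_id": {"$oid": "62cd5e..."}, "field": "value"}
}

Is that the expected shape, or does the Atlas sink want something else?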
The sink connector config:
"config": {
"cdc.handler": "MongoDbChangeStreamHandler",
"cloud.environment": "prod",
"cloud.provider": "aws",
"collection": "some",
"connection.host": "some",
"connection.password": "some",
"connection.user": "some",
"connector.class": "MongoDbAtlasSink",
"database": "test-support",
"input.data.format": "STRING",
"kafka.api.key": "SUGN6WZTQFC6L3KU",
"kafka.api.secret": "",
"kafka.auth.mode": "KAFKA_API_KEY",
"kafka.endpoint": "",
"kafka.region": "",
"name": "",
"tasks.max": "1",
"topics": "test,test2",
"write.strategy": "DefaultWriteModelStrategy"
}
Would you be able to give me some guidance on what the Kafka key and value must look like for this to work?
I'm currently using the official MongoDB source connector, but it seems the setup has to differ slightly from the official sink connector when using the Atlas connector; a sketch of how I think the source side should be configured is below.
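For context, a minimal source connector config that (as far as I understand) keeps the full change stream envelope, including operationType, would look roughly like this (connection URI is a placeholder):

{
  "name": "mongodb-source",
  "config": {
    "connector.class": "com.mongodb.kafka.connect.MongoSourceConnector",
    "connection.uri": "mongodb+srv://<user>:<password>@<cluster>/",
    "database": "test-support",
    "publish.full.document.only": "false",
    "output.format.key": "json",
    "output.format.value": "json"
  }
}

My assumption is that setting "publish.full.document.only" to "true" would strip the event down to just fullDocument, which could explain the missing-operationType error; please correct me if that's wrong.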
Also: is the source code available somewhere? That might have helped me.
Thank you in advance
Robin