Atlas Kafka Connect Sink Connector - 'operationType' missing in the record

Hi,

it seems that the atlas connector is deviating from the official mongodb sink connector and that certain properties are missing such as:

key.converter & value.conveter

The confluent docs only specify e.g. “input.data.format”: “STRING”
i’m currently quite unsure what the value in kafka should look like that this error:

 "connector": {
        "state": "FAILED",
        "worker_id": "name",
        "trace": "Required field 'operationType' missing in the record. Please ensure the data in the topic is in the format expected by the MongoDB Atlas Sink connector.\n"
    }

can be resolved.

The sink connector config:

"config": {
        "cdc.handler": "MongoDbChangeStreamHandler",
        "cloud.environment": "prod",
        "cloud.provider": "aws",
        "collection": "some",
        "connection.host": "some",
        "connection.password": "some",
        "connection.user": "some",
        "connector.class": "MongoDbAtlasSink",
        "database": "test-support",
        "input.data.format": "STRING",
        "kafka.api.key": "SUGN6WZTQFC6L3KU",
        "kafka.api.secret": "",
        "kafka.auth.mode": "KAFKA_API_KEY",
        "kafka.endpoint": "",
        "kafka.region": "",
        "name": "",
        "tasks.max": "1",
        "topics": "test,test2",
        "write.strategy": "DefaultWriteModelStrategy"
    }

Would you be able to give me some guidance on how the kafka key + value must look like in order for this to work?

I’m currently using the official mongodb src connector but seemingly we must configure it slightly different from the official sink connector when using the atlas connector.

Also: is the source somehwhere available? That might have helped me :slight_smile:

Thank you in advance

Robin