Extract ObjectID value without $oid in MongoDB Kafka SourceConnector

Hi, I am using `com.mongodb.kafka.connect.MongoSourceConnector` to stream data from MongoDB into Kafka. My partition key is an ObjectId (for example, `company: {"$oid": "value"}`). I need to extract the value from the ObjectId and set it as the Kafka message key.
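To make the goal concrete, here is a small Python sketch of the desired key rewrite: take the canonical Extended JSON form of an ObjectId and keep only its 24-character hex value. The helper `oid_hex` and the sample id are made up for illustration. This models the transformation only and is not a connector setting.

```python
import json
import re

# A valid ObjectId rendered as hex is exactly 24 hexadecimal characters.
_OID_HEX = re.compile(r"[0-9a-fA-F]{24}")


def oid_hex(extended_json: str) -> str:
    """Return the hex value from an Extended JSON ObjectId such as {"$oid": "..."}.

    Hypothetical helper for illustration only.
    """
    doc = json.loads(extended_json)
    value = doc["$oid"]
    if not _OID_HEX.fullmatch(value):
        raise ValueError(f"not an ObjectId hex string: {value!r}")
    return value


# Made-up ObjectId, shaped like the key the connector currently emits.
key = '{"$oid": "5f15aab12435743f9bd126a4"}'
print(oid_hex(key))  # 5f15aab12435743f9bd126a4
```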

This is my current configuration:

```json
{
  "connector.class": "com.mongodb.kafka.connect.MongoSourceConnector",
  "publish.full.document.only": "false",
  "mongo.errors.log.enable": "true",
  "tasks.max": "1",
  "output.format.value": "json",
  "change.stream.full.document": "updateLookup",
  "collection": "job",
  "output.schema.key": "{\"name\":\"JobKeySchema\",\"type\":\"record\",\"namespace\":\"com.rippling.main.avro\",\"fields\":[{\"name\":\"fullDocument\",\"type\":{\"name\":\"fullDocument\",\"type\":\"record\",\"fields\":[{\"name\":\"company\",\"type\":\"string\"}]}}]}",
  "output.format.key": "schema",
  "mongo.errors.tolerance": "all",
  "database": "infra1",
  "topic.prefix": "cdc",
  "output.json.formatter": "com.mongodb.kafka.connect.source.json.formatter.ExtendedJson",
  "name": "cdc-infra1-job",
  "copy.existing": "true",
  "value.converter": "org.apache.kafka.connect.storage.StringConverter",
  "key.converter": "io.confluent.connect.avro.AvroConverter"
}
```
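Because `output.schema.key` is a JSON document embedded as a string value, its inner quotes must be escaped when the whole configuration is itself submitted as JSON (for example, as a request body to the Kafka Connect REST API). One way to avoid hand-escaping is to build the payload in code. This Python sketch uses only the standard library, shows just a subset of the settings above, and uses illustrative variable names.

```python
import json

# Avro key schema from the question, kept as a dict so it never has to be hand-escaped.
key_schema = {
    "name": "JobKeySchema",
    "type": "record",
    "namespace": "com.rippling.main.avro",
    "fields": [
        {
            "name": "fullDocument",
            "type": {
                "name": "fullDocument",
                "type": "record",
                "fields": [{"name": "company", "type": "string"}],
            },
        }
    ],
}

# Subset of the connector settings; the schema is serialized into a string value.
config = {
    "connector.class": "com.mongodb.kafka.connect.MongoSourceConnector",
    "database": "infra1",
    "collection": "job",
    "output.format.key": "schema",
    "output.schema.key": json.dumps(key_schema),
    "key.converter": "io.confluent.connect.avro.AvroConverter",
}

# json.dumps escapes the nested quotes, producing "{\"name\": \"JobKeySchema\", ...}".
body = json.dumps(config, indent=2)
print(body)
```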

The above configuration produces the following message key: `H{"$oid": "value"}`. How can I extract only the value? Setting the type of `company` to `record` does not work: I get an error that the actual type is ObjectId, not record. It looks like the connector's converter is serializing the whole ObjectId as a single string.
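The leading `H` is most likely not part of the value. Assuming the key bytes are being viewed as text, Avro writes a string as a zigzag-encoded varint length followed by its UTF-8 bytes, and records add no framing of their own, so the nested `fullDocument.company` string is written as just length plus text. An ObjectId rendered as `{"$oid": "<24 hex chars>"}` is 36 characters long, and zigzag(36) = 72 = 0x48, which is ASCII `H`. The sketch below reproduces that encoding in plain Python with a made-up id. It leaves out the Confluent wire-format header (a magic byte and a 4-byte schema ID) that `AvroConverter` also prepends.

```python
def zigzag_varint(n: int) -> bytes:
    """Encode an Avro int/long: zigzag-map the sign, then emit 7-bit groups (varint)."""
    z = (n << 1) ^ (n >> 63)  # 0 -> 0, -1 -> 1, 1 -> 2, -2 -> 3, ...
    out = bytearray()
    while True:
        low7 = z & 0x7F
        z >>= 7
        if z:
            out.append(low7 | 0x80)  # high bit set: more bytes follow
        else:
            out.append(low7)
            return bytes(out)


def avro_string(s: str) -> bytes:
    """Avro string = varint length of the UTF-8 bytes, then the bytes themselves."""
    data = s.encode("utf-8")
    return zigzag_varint(len(data)) + data


# Assumed key text: Extended JSON with a space after the colon and a made-up 24-hex id.
key_text = '{"$oid": "5f15aab12435743f9bd126a4"}'
encoded = avro_string(key_text)

print(len(key_text))            # 36
print(encoded.decode("utf-8"))  # H{"$oid": "5f15aab12435743f9bd126a4"}
```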

I have also attempted to use StringConverter instead of AvroConverter and got the same result. Any help is appreciated.

This might work: try the `ExtractField` SMT.

```json
"transforms": "ExtractField",
"transforms.ExtractField.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
"transforms.ExtractField.field": "$oid"
```
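Keep in mind that `ExtractField$Key` replaces the key with one field of a `Struct` (when the key has a schema) or a `Map` (schemaless data). It does not parse a string, and by default it only looks at top-level fields. So this helps only if the key reaches the SMT as a map or struct with a top-level `$oid` entry. The Python sketch below roughly models the schemaless case with dicts; `extract_field_key` is a hypothetical stand-in, not Kafka's Java implementation.

```python
from typing import Any, Optional


def extract_field_key(key: Any, field: str) -> Optional[Any]:
    """Rough model of ExtractField$Key on schemaless keys (hypothetical helper).

    The real SMT requires a Map here; a plain string key is rejected.
    """
    if key is None:
        return None  # null keys pass through unchanged
    if not isinstance(key, dict):
        raise TypeError(f"expected a map-like key, got {type(key).__name__}")
    return key.get(field)  # missing top-level field -> None


# Works: the ObjectId wrapper is the whole key, so "$oid" is a top-level field.
print(extract_field_key({"$oid": "5f15aab12435743f9bd126a4"}, "$oid"))  # 5f15aab12435743f9bd126a4

# Does not reach it: "$oid" is nested under fullDocument.company.
nested = {"fullDocument": {"company": {"$oid": "5f15aab12435743f9bd126a4"}}}
print(extract_field_key(nested, "$oid"))  # None
```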