How to flatten data in fullDocument with the Kafka connector?

Hi all,

I’m trying to get data from MongoDB → Kafka topic → ksqlDB processing.

I expect the MongoDB Atlas source connector to capture insert, update, and delete events and also to copy all existing data to the Kafka topic. My connector is configured as:

"connector.class":"com.mongodb.kafka.connect.MongoSourceConnector",
"key.converter":"org.apache.kafka.connect.storage.StringConverter",
"value.converter":"org.apache.kafka.connect.storage.StringConverter",
"connection.uri":"XXX",
"database":"XXX",
"collection":"XXX",
"pipeline":"[{\"$match\": { \"$or\": [{\"operationType\": \"insert\"},{\"operationType\": \"delete\"},{\"operationType\": \"update\"}]}}]",
"copy.existing":"true",
"key.converter.schemas.enable":"true",
"value.converter.schemas.enable":"true"

The data in the Kafka topic looks like this:

{
  "_id": {
    "_id": "XXX",
    "copyingData": true
  },
  "operationType": "insert",
  "documentKey": {
    "_id": "XXX"
  },
  "fullDocument": { ... }
}

So I need to flatten/unwrap the data under fullDocument but still keep operationType.

How can I do that?

If you only want the fullDocument, just set:

 "publish.full.document.only": true

Ah, I re-read your question. If you still want operationType and fullDocument, I think you can just $project these two fields in the pipeline, or use an SMT to parse the fields. An SMT would require you to use schemas, though.
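
For the $project route, a rough sketch of what the pipeline setting might look like is below. It keeps the $match stage from the question and appends a $project stage that includes only operationType and fullDocument. The change stream _id (the resume token) is not excluded, so it stays in the output, which the connector needs in order to resume. The "change.stream.full.document" line is an assumption on my part, not something from the original config: without "updateLookup", update events normally carry only the changed fields rather than a fullDocument. Delete events have no fullDocument either way. I have not tested this against your setup.

```json
"pipeline":"[{\"$match\": { \"$or\": [{\"operationType\": \"insert\"},{\"operationType\": \"delete\"},{\"operationType\": \"update\"}]}}, {\"$project\": {\"operationType\": 1, \"fullDocument\": 1}}]",
"change.stream.full.document":"updateLookup"
```

Note that this only trims the event down to those two fields (plus _id). The document fields are still nested under fullDocument, so on the ksqlDB side you would still need to reach into that struct, or use an SMT, to get them to the top level.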