How to flat data in fullDocument with Kafka connector?

Hi all,

I’m tried to get data from mongoDB → Kafka topic → ksqlDB processing

My expected data via mongoDB atlas source connector able capture insert,update and delete event and also read all existing to Kafka topic and my connector configure as:

      "pipeline":"[{\"$match\": { \"$or\": [{\"operationType\": \"insert\"},{\"operationType\": \"delete\"},{\"operationType\": \"update\"}]}}]", 

Data in Kafka topic as:

  "_id": {
    "_id": "XXX",
    "copyingData": true
  "operationType": "insert",
  "documentKey": {
    "_id": "XXX"
  "fullDocument": { ... }

So I need to flat/unwrap data under fullDocument but still have operationType.

How to do that?

if you just want the fullDocument just set

 "publish.full.document.only": true

Ah I re-read your question, if you still want operationType and fiullDocument, I think you can just $project these two fields in the pipeline. or use an SMT to parse the fields. An SMT would require you use schemas though.