Can we capture CDC at attribute level instead of whole collection

Hi All,

I have a requirement , where i need to push change to kafka topic using " Kafka Source Connector" from a mongo collection . Only if there is a change in specific attribute only …

Collection example :

{
   _id: 123
   name: xyz
   class: 2
}

I want to push document in kafka topic only in case of any update in “name” . If any update happens in “class” i don’t want to push message.

I tried reading below link but as per this we can only capture change at collection level .

You can use the pipeline configuration parameter to specify an aggregation pipeline that should $match the condition you are trying to achieve. In your case you want to match where operationType is Update and the field ‘class’ exists within updateDecription.UpdateFields as follows:

[{
    $match: {
      $and: [
        { "updateDescription.updatedFields.class": { $exists: true } },
        { operationType: "update" }
      ]
    }]

Here is an example connector configuration: (note I had to escape the quotes in the pipeline so curl would accept it)

curl -X POST -H "Content-Type: application/json" --data '
{"name": "mongo-source-tutorial-update-value-changed",
"config": {
"connector.class":"com.mongodb.kafka.connect.MongoSourceConnector",
"connection.uri":"mongodb://mongo1:27017,mongo2:27017,mongo3:27017",
"pipeline":"[{\"$match\": { \"$and\": [{\"updateDescription.updatedFields.class\": { \"$exists\" : \"true\"}},{\"operationType\":\"update\"}] } }]","database":"UpdateExample","collection":"Source"}}' http://localhost:8083/connectors -w "\n" 
1 Like