Hi All,
I have a requirement where I need to push changes from a Mongo collection to a Kafka topic using the Kafka Source Connector, but only when a specific attribute changes.
Example document in the collection:
{
  _id: 123,
  name: "xyz",
  class: 2
}
I want to push the document to the Kafka topic only when “name” is updated. If “class” is updated, I don’t want to push a message.
I tried reading the link below, but according to it we can only capture changes at the collection level.
You can use the pipeline configuration parameter to specify an aggregation pipeline with a $match stage that expresses the condition you are trying to achieve. In your case you want to match events where operationType is "update" and the field of interest exists within updateDescription.updatedFields. The example below checks ‘class’; for your requirement, use ‘name’ in its place:
[{
  $match: {
    $and: [
      { "updateDescription.updatedFields.class": { $exists: true } },
      { operationType: "update" }
    ]
  }
}]
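To make it clearer which change events this $match lets through, here is a small Python sketch that mimics the filter on a few hand-written events. The helper passes_filter and the sample events are hypothetical and only for illustration; the real filtering is done by MongoDB when it evaluates the pipeline against the change stream, and real events carry more fields than shown here.

```python
# Hypothetical helper: mimics, in plain Python, what the $match stage selects.
# This is NOT how the server evaluates the pipeline, just an illustration.
def passes_filter(event: dict, field: str = "class") -> bool:
    """Return True for update events whose updatedFields contain `field`."""
    if event.get("operationType") != "update":
        return False
    updated = event.get("updateDescription", {}).get("updatedFields", {})
    return field in updated


# Hand-written, abbreviated sample change events (assumed shapes).
events = [
    # update that touches "class"
    {"operationType": "update",
     "updateDescription": {"updatedFields": {"class": 3}, "removedFields": []}},
    # update that touches only "name"
    {"operationType": "update",
     "updateDescription": {"updatedFields": {"name": "abc"}, "removedFields": []}},
    # insert: there is no updateDescription at all
    {"operationType": "insert",
     "fullDocument": {"_id": 123, "name": "xyz", "class": 2}},
]

for e in events:
    print(e["operationType"], passes_filter(e))
```

Calling passes_filter(e, "name") would instead select the second event, which is the behaviour the original question asks for.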
Here is an example connector configuration (note that I had to escape the quotes in the pipeline, since it is passed as a string inside the JSON request body, so curl would accept it):
curl -X POST -H "Content-Type: application/json" --data '
{"name": "mongo-source-tutorial-update-value-changed",
"config": {
"connector.class":"com.mongodb.kafka.connect.MongoSourceConnector",
"connection.uri":"mongodb://mongo1:27017,mongo2:27017,mongo3:27017",
"pipeline":"[{\"$match\": { \"$and\": [{\"updateDescription.updatedFields.class\": { \"$exists\": true }},{\"operationType\":\"update\"}] } }]",
"database":"UpdateExample",
"collection":"Source"}}' http://localhost:8083/connectors -w "\n"
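If escaping the quotes by hand gets error-prone, one option is to let a script build the request body. The following is a minimal Python sketch, assuming the same connector name, URI, database and collection as in the curl example; the FIELD variable is my own addition so the field can be swapped easily. It serialises the pipeline with json.dumps first, so the quotes inside it are escaped automatically when the outer config is serialised.

```python
import json

FIELD = "class"  # assumption: replace with the field you care about, e.g. "name"

# The aggregation pipeline as a normal Python structure.
pipeline = [{"$match": {"$and": [
    {f"updateDescription.updatedFields.{FIELD}": {"$exists": True}},
    {"operationType": "update"},
]}}]

connector = {
    "name": "mongo-source-tutorial-update-value-changed",
    "config": {
        "connector.class": "com.mongodb.kafka.connect.MongoSourceConnector",
        "connection.uri": "mongodb://mongo1:27017,mongo2:27017,mongo3:27017",
        # The connector expects the pipeline as a string, so serialise it here;
        # the outer json.dumps below then escapes the inner quotes.
        "pipeline": json.dumps(pipeline),
        "database": "UpdateExample",
        "collection": "Source",
    },
}

body = json.dumps(connector, indent=2)
print(body)
```

The printed JSON can be saved to a file (say connector.json, a name chosen here for illustration) and posted with curl -X POST -H "Content-Type: application/json" --data @connector.json http://localhost:8083/connectors.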