Kafka copy_existing in combination with pipeline doesn't work


We are already using the Kafka source connector (1.9.1).
Sometimes I need to copy all the data from scratch. I'm using "startup.mode: copy_existing" and this works so far.
Sometimes I want to copy only a selected subset of the existing data, so I tried the filter option described here:
Copy Existing Data (I used the same example)

My connector looks like this:


topic.prefix: mongo
collection: kafkatest
topic.suffix: dev
output.json.formatter: com.mongodb.kafka.connect.source.json.formatter.SimplifiedJson
key.converter: org.apache.kafka.connect.storage.StringConverter
key.converter.schemas.enable: false
value.converter: org.apache.kafka.connect.storage.StringConverter
value.converter.schemas.enable: false
publish.full.document.only: true
offset.partition.name: kafkatestdev.12
startup.mode: copy_existing
startup.mode.copy.existing.pipeline: [{ "$match": { "country": "Mexico" } }]
errors.tolerance: none
[{"$match":{"operationType":{"$in":["insert", "update"]}}},{"$project": {"_id":1,"fullDocument":1,"ns":1,"documentKey":1}}]

Without the startup.mode.copy.existing.pipeline setting I get all the data, but with it I don't get any of the existing entries.
Does anyone have an idea what's wrong?

Thank you

Fix for anyone who has the same problem:

startup.mode.copy.existing.pipeline: '[{ "$match": { "country": "Mexico" } }]'

I wrapped the whole pipeline value in single quotes.
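
A possible explanation, in case it helps someone. This assumes the connector config is written in YAML (for example in a Kubernetes-style connector resource), which I haven't confirmed for every setup. In YAML, an unquoted value that starts with [ is parsed as a flow sequence (a list containing a map), not as text, so the connector may never see the pipeline as the JSON string it expects. Single quotes keep the whole JSON array as one plain string. A minimal side-by-side sketch, using only the property from above:

```yaml
# Unquoted: a YAML parser reads the value as a list holding one map,
# not as a JSON string.
startup.mode.copy.existing.pipeline: [{ "$match": { "country": "Mexico" } }]
---
# Single-quoted: the value stays a single string containing the JSON array.
# The double quotes inside need no escaping within single quotes.
startup.mode.copy.existing.pipeline: '[{ "$match": { "country": "Mexico" } }]'
```

If the config is submitted as JSON through the Kafka Connect REST API instead, this YAML behaviour doesn't apply, and the pipeline would need to be passed as a JSON string with its inner quotes escaped.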