Hi
We are already using kafka source connector (1.9.1).
Sometimes i need to copy all the data from scratch. I#M using “startup.mode: copy_existing” and this works so far.
Sometimes i want copy all the data but only some selected ones. So i tried the filter option described here
Copy Existing Data (I used the same example)
My connector looks like this:
connection.uri:
topic.prefix: mongo
database:
collection: kafkatest
topic.suffix: dev
output.json.formatter: com.mongodb.kafka.connect.source.json.formatter.SimplifiedJson
key.converter: org.apache.kafka.connect.storage.StringConverter
key.converter.schemas.enable: false
value.converter: org.apache.kafka.connect.storage.StringConverter
value.converter.schemas.enable: false
publish.full.document.only: true
offset.partition.name: kafkatestdev.12
startup.mode: copy_existing
startup.mode.copy.existing.pipeline: [{ “$match”: { “country”: “Mexico” } }]
errors.tolerance: none
pipeline:
[{“$match”:{“operationType”:{“$in”:[“insert”, “update”]}}},{“$project”: {“_id”:1,“fullDocument”:1,“ns”:1,“documentKey”:1}}]
Without the startup.pipeline i get all data but with the startup.pipeline i dont get any existing entries.
Does anyone has an idea whats wrong?
Thank you