Hi. I have recently been working with Apache Kafka and Kafka Connect, and I have some questions about Kafka Connect. I have installed the MongoDB sink connector via plugin.path (in connect-distributed.properties). Depending on the connector properties, I am able to create, update, and delete records in the database. Before the producer sends any messages, I register the connector with a curl command; the connector then behaves according to the configuration in that request.
Below are the curl commands I use for each operation.
For create,
curl -X POST -H "Content-Type: application/json" -d '{"name":"test-testing-create",
"config":{"topics":"sunflower",
"connector.class":"com.mongodb.kafka.connect.MongoSinkConnector",
"tasks.max":"1",
"connection.uri":"mongodb://localhost:27017",
"database":"flower",
"collection":"sunflower-collection",
"key.converter":"org.apache.kafka.connect.storage.StringConverter",
"value.converter":"org.apache.kafka.connect.storage.StringConverter",
"key.converter.schemas.enable":"false",
"value.converter.schemas.enable":"false"}}' localhost:8083/connectors
For update,
curl -X POST -H "Content-Type: application/json" -d '{"name":"test-testing-update",
"config":{"topics":"testing",
"connector.class":"com.mongodb.kafka.connect.MongoSinkConnector",
"tasks.max":"1",
"connection.uri":"mongodb://localhost:27017",
"database":"flower",
"collection":"testing",
"key.converter":"org.apache.kafka.connect.storage.StringConverter",
"value.converter":"org.apache.kafka.connect.storage.StringConverter",
"key.converter.schemas.enable":"false",
"value.converter.schemas.enable":"false",
"document.id.strategy.overwrite.existing":true,
"document.id.strategy":"com.mongodb.kafka.connect.sink.processor.id.strategy.PartialValueStrategy",
"document.id.strategy.partial.value.projection.list":"id",
"document.id.strategy.partial.value.projection.type":"AllowList",
"writemodel.strategy":"com.mongodb.kafka.connect.sink.writemodel.strategy.UpdateOneTimestampsStrategy"
}}' localhost:8083/connectors
For delete,
"config":{"topics":"subjects",
"connector.class":"com.mongodb.kafka.connect.MongoSinkConnector",
"tasks.max":"1",
"connection.uri":"mongodb://localhost:27017",
"database":"flower",
"collection":"subjects",
"key.converter":"org.apache.kafka.connect.storage.StringConverter",
"value.converter":"org.apache.kafka.connect.storage.StringConverter",
"key.converter.schemas.enable":"false",
"value.converter.schemas.enable":"false",
"document.id.strategy":"com.mongodb.kafka.connect.sink.processor.id.strategy.PartialValueStrategy",
"document.id.strategy.partial.value.projection.list":"class",
"document.id.strategy.partial.value.projection.type":"AllowList",
"writemodel.strategy":"com.mongodb.kafka.connect.sink.writemodel.strategy.DeleteOneBusinessKeyStrategy"
}}' localhost:8083/connectors
My questions are:
- Should I register the connector with the curl command each time before producing a message? If so, how can I pass a different connector name each time?
- Is there any way to handle this automatically?
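To make the second question concrete, here is a rough sketch of what I mean by "automatic": a small Python script that registers a connector by name, so it could run once at startup instead of me typing curl by hand. It assumes the Kafka Connect REST endpoint PUT /connectors/{name}/config, which, as far as I understand, creates the connector if it does not exist and updates its configuration otherwise. The helper names (sink_config, build_request, register) are my own, not part of any library, and I have not verified this against my setup.

```python
import json
import urllib.request

# Connect REST endpoint, taken from the curl commands above.
CONNECT_URL = "http://localhost:8083"

# Settings shared by all the connectors in this post.
BASE_CONFIG = {
    "connector.class": "com.mongodb.kafka.connect.MongoSinkConnector",
    "tasks.max": "1",
    "connection.uri": "mongodb://localhost:27017",
    "database": "flower",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.storage.StringConverter",
    "key.converter.schemas.enable": "false",
    "value.converter.schemas.enable": "false",
}


def sink_config(topic, collection, extra=None):
    """Hypothetical helper: shared settings plus per-connector overrides."""
    config = dict(BASE_CONFIG)
    config["topics"] = topic
    config["collection"] = collection
    config.update(extra or {})
    return config


def build_request(name, config, base_url=CONNECT_URL):
    """Build a PUT request for /connectors/{name}/config.

    Unlike POST /connectors, the body here is the bare config map,
    not a {"name": ..., "config": ...} wrapper.
    """
    return urllib.request.Request(
        url=f"{base_url}/connectors/{name}/config",
        data=json.dumps(config).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="PUT",
    )


def register(name, config, base_url=CONNECT_URL):
    """Send the request to a running Connect worker; returns the HTTP status."""
    with urllib.request.urlopen(build_request(name, config, base_url)) as resp:
        return resp.status
```

For example, register("test-testing-create", sink_config("sunflower", "sunflower-collection")) would correspond to the create command above, and the update connector would pass its document.id.strategy and writemodel.strategy settings through the extra dict. Is something like this the expected approach, or is there a built-in way?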