MongoDB sink connector - How can registration be automated?

Hi. I have recently been working with Apache Kafka and Kafka Connect, and I have some questions about Kafka Connect. I installed the MongoDB sink connector via plugin.path (in connect-distributed.properties). Depending on the connector properties, I am able to create, update, and delete records in the database. Each connector is registered with a curl command, and this has to be done before the producer sends a message; the connector then behaves according to the configuration passed in that curl command.

Below are the curl commands I use for each operation.

For create,

curl -X POST -H "Content-Type: application/json" -d '{"name":"test-testing-create",
"config":{"topics":"sunflower",
"connector.class":"com.mongodb.kafka.connect.MongoSinkConnector",
"tasks.max":"1",
"connection.uri":"mongodb://localhost:27017",
"database":"flower",
"collection":"sunflower-collection",
"key.converter":"org.apache.kafka.connect.storage.StringConverter",
"value.converter":"org.apache.kafka.connect.storage.StringConverter",
"key.converter.schemas.enable":"false",
"value.converter.schemas.enable":"false"}}' localhost:8083/connectors

For update,

curl -X POST -H "Content-Type: application/json" -d '{"name":"test-testing-update",
"config":{"topics":"testing",
"connector.class":"com.mongodb.kafka.connect.MongoSinkConnector",
"tasks.max":"1",
"connection.uri":"mongodb://localhost:27017",
"database":"flower",
"collection":"testing",
"key.converter":"org.apache.kafka.connect.storage.StringConverter",
"value.converter":"org.apache.kafka.connect.storage.StringConverter",
"key.converter.schemas.enable":"false",
"value.converter.schemas.enable":"false",
"document.id.strategy.overwrite.existing":true,
"document.id.strategy":"com.mongodb.kafka.connect.sink.processor.id.strategy.PartialValueStrategy",
"document.id.strategy.partial.value.projection.list":"id",
"document.id.strategy.partial.value.projection.type":"AllowList",
"writemodel.strategy":"com.mongodb.kafka.connect.sink.writemodel.strategy.UpdateOneTimestampsStrategy"
}}' localhost:8083/connectors

For delete,

"config":{"topics":"subjects",
"connector.class":"com.mongodb.kafka.connect.MongoSinkConnector",
"tasks.max":"1",
"connection.uri":"mongodb://localhost:27017",
"database":"flower",
"collection":"subjects",
"key.converter":"org.apache.kafka.connect.storage.StringConverter",
"value.converter":"org.apache.kafka.connect.storage.StringConverter",
"key.converter.schemas.enable":"false",
"value.converter.schemas.enable":"false",
"document.id.strategy":"com.mongodb.kafka.connect.sink.processor.id.strategy.PartialValueStrategy",
"document.id.strategy.partial.value.projection.list":"class",
"document.id.strategy.partial.value.projection.type":"AllowList",
"writemodel.strategy":"com.mongodb.kafka.connect.sink.writemodel.strategy.DeleteOneBusinessKeyStrategy"
}}' localhost:8083/connectors
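
For context on the registration step: in distributed mode, Kafka Connect stores connector configurations in its config storage topic, and the REST API also offers PUT /connectors/{name}/config, which creates the connector if it is missing and updates it otherwise. The following is only a rough sketch of a repeatable registration script built on that endpoint. It assumes the worker listens on localhost:8083 as in the commands above; CONNECT_URL, config_url, and register_connector are hypothetical names chosen for illustration, not part of Kafka Connect.

```shell
#!/bin/sh
# Sketch only: repeatable connector registration through the Connect REST API.
# CONNECT_URL and both function names are assumptions made for this example.

CONNECT_URL="${CONNECT_URL:-http://localhost:8083}"

# Print the REST URL that holds the configuration of the named connector.
config_url() {
  printf '%s/connectors/%s/config' "$CONNECT_URL" "$1"
}

# Create the connector if it does not exist, otherwise update its config.
# $1 = connector name
# $2 = JSON object containing only the config fields (no "name"/"config" wrapper)
register_connector() {
  curl -s -X PUT -H "Content-Type: application/json" \
       -d "$2" "$(config_url "$1")"
}

# Example call (commented out so that sourcing this file makes no request):
# register_connector test-testing-create '{"topics":"sunflower",
#   "connector.class":"com.mongodb.kafka.connect.MongoSinkConnector",
#   "connection.uri":"mongodb://localhost:27017",
#   "database":"flower","collection":"sunflower-collection"}'
```

Because PUT is safe to repeat with the same name, a script like this could run once at deployment time instead of before every produced message.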

My questions are:

  • Should I register the connector with the curl command each time before producing a message? If yes, how can I pass a different connector name each time?

  • Is there any way to handle this automatically?