MongoDB Atlas sink connector setup for Strimzi Kafka

Hi,

I am new to the Kafka space and I have set up the Strimzi cluster operator, Kafka bootstrap server, entity operator, and Kafka Connect in Kubernetes following the guidelines below:

Deploying and Upgrading (0.40.0)

How do I set up the MongoDB sink connector for a Strimzi Kafka Connect cluster?

I have the official MongoDB connector plugin. Can I use this plugin to connect to MongoDB Atlas?

Most forums explain this for Confluent Kafka, but not for Strimzi.

Below is my Kafka Connect config:

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnect
metadata:
  name: my-mongo-connect
  annotations:
    strimzi.io/use-connector-resources: "true"
spec:
  image: STRIMZI KAFKA CONNECT IMAGE WITH MONGODB PLUGIN
  version: 3.2.1
  replicas: 1
  bootstrapServers: my-cluster-kafka-bootstrap:9092
  logging:
    type: inline
    loggers:
      connect.root.logger.level: "INFO"
  config:
    group.id: my-cluster
    offset.storage.topic: mongo-connect-cluster-offsets
    config.storage.topic: mongo-connect-cluster-configs
    status.storage.topic: mongo-connect-cluster-status
    key.converter: org.apache.kafka.connect.json.JsonConverter
    value.converter: org.apache.kafka.connect.json.JsonConverter
    key.converter.schemas.enable: true
    value.converter.schemas.enable: true
    config.storage.replication.factor: -1
    offset.storage.replication.factor: -1
    status.storage.replication.factor: -1

Below is my sink connector config:

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata:
  name: mongodb-sink-connector
  labels:
    strimzi.io/cluster: my-cluster
spec:
  class: com.mongodb.kafka.connect.MongoSinkConnector
  tasksMax: 2
  config:
    topics: my-topic
    connection.uri: "MONGO ATLAS CONNECTION STRING"
    database: my_database
    collection: my_collection
    post.processor.chain: com.mongodb.kafka.connect.sink.processor.DocumentIdAdder,com.mongodb.kafka.connect.sink.processor.KafkaMetaAdder
    key.converter: org.apache.kafka.connect.json.JsonConverter
    key.converter.schemas.enable: false
    value.converter: org.apache.kafka.connect.json.JsonConverter
    value.converter.schemas.enable: false

But the above setup is not working, even though my Kafka server is up and running and the producer-consumer example works.

Is the official MongoDB plugin (Maven Central Repository Search) appropriate for this, or should I use the Debezium MongoDB connector?

If anyone can shed some light on a step-by-step guide in this regard, that would be of great help.

Thanks in advance.

The MongoDB connector just talks to Kafka Connect, so it doesn't matter much where Kafka itself is running. K8s is fine; you should be good to go.

By "not working", is there an error message? Are both the source and sink failing? Is the MongoDB connector stopped, or is it running? Check the Kafka Connect logs.
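A quick way to get at those logs is a sketch like the following; it assumes the KafkaConnect resource name (`my-mongo-connect`) and namespace (`kafka`) from the question, and relies on Strimzi labelling the Connect pods with `strimzi.io/cluster`:

```shell
# Tail the Kafka Connect worker logs; Strimzi labels the Connect pods
# with strimzi.io/cluster=<KafkaConnect resource name>
kubectl logs -n kafka -l strimzi.io/cluster=my-mongo-connect -f
```

Connector start-up failures (missing plugin, bad connection URI, and so on) normally show up here even when the worker itself stays healthy.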

There is no error message in the Kafka Connect logs. How do I verify that the MongoDB connector is up and running?

kubectl get kafkaconnectors -n kafka shows:

NAME                     CLUSTER      CONNECTOR CLASS                                MAX TASKS   READY
mongodb-sink-connector   my-cluster   com.mongodb.kafka.connect.MongoSinkConnector   2

The READY column is empty. How do I make sure it is running?
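Strimzi records the reason a KafkaConnector is not ready in the resource's status conditions, so describing the resource is a good first step (a sketch, assuming the resource name and namespace from the question):

```shell
# The Conditions section of the output explains why READY is unset,
# e.g. when the operator cannot find a matching Connect cluster for
# the strimzi.io/cluster label
kubectl describe kafkaconnector mongodb-sink-connector -n kafka
```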

Here is a script I use to enumerate them; you may have to tweak the hostnames for your environment and network situation.

echo -e "\nKafka topics:\n"
curl --silent "http://localhost:8082/topics" | jq

echo -e "\nThe status of the connectors:\n"
curl -s "http://localhost:8083/connectors?expand=info&expand=status" | \
    jq '. | to_entries[] | [ .value.info.type, .key, .value.status.connector.state, .value.status.tasks[].state, .value.info.config."connector.class" ] | join(":|:")' | \
    column -s : -t | sed 's/\"//g' | sort

echo -e "\nCurrently configured connectors\n"
curl --silent -X GET http://localhost:8083/connectors | jq

echo -e "\n\nVersion of MongoDB Connector for Apache Kafka installed:\n"
curl --silent http://localhost:8083/connector-plugins | jq -c '.[] | select( .class == "com.mongodb.kafka.connect.MongoSourceConnector" or .class == "com.mongodb.kafka.connect.MongoSinkConnector" )'
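If the Connect REST API is not reachable on localhost, a port-forward against the Strimzi-created service lets the same checks run from outside the cluster. This is a sketch assuming the names from the question; Strimzi exposes the REST API as a service named `<connect-cluster>-connect-api` on port 8083:

```shell
# Forward the Strimzi Connect REST API service to localhost
kubectl port-forward -n kafka service/my-mongo-connect-connect-api 8083:8083 &

# Ask Connect itself for the connector and task states
curl -s http://localhost:8083/connectors/mongodb-sink-connector/status | jq
```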