Hi everyone,
I am trying to set up a CDC replication pipeline with Debezium and the MongoDB Kafka Sink Connector, but I'm having problems with update operations.
On the one hand, I have a MongoDB source database configured as a single-node replica set. Connected to the source DB, the Debezium MongoDB source connector streams all CDC events to a Kafka topic.
On the other hand, I have a MongoDB instance acting as the sink database. The sink database is fed by the MongoDB Sink Connector configured with the Debezium MongoDB CDC handler.
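For reference, I exercise the pipeline from the host roughly like this (authentication flags are omitted and the sample values are only illustrative; they match the event shown further below):

# Insert a test document into the source collection (sample values; inserts replicate to the sink fine):
docker exec -it metrics-src mongo metrics --quiet \
  --eval 'db.customers.insertOne({_id: NumberLong(1001), first_name: "Sally", last_name: "Thomas", email: "sally.thomas@acme.com"})'

# Update the same document (sample values; this update is what the sink task fails on):
docker exec -it metrics-src mongo metrics --quiet \
  --eval 'db.customers.updateOne({_id: NumberLong(1001)}, {$set: {first_name: "Sallyddf"}})'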
The source data is replicated correctly into the sink for insert and delete operations only. When I update a document in the source collection, the sink connector raises the exception shown further below for the following CDC event:
Debezium CDC Update Event:
{
  "schema": {
    "type": "struct",
    "fields": [
      {
        "type": "string",
        "optional": true,
        "name": "io.debezium.data.Json",
        "version": 1,
        "field": "before"
      },
      {
        "type": "string",
        "optional": true,
        "name": "io.debezium.data.Json",
        "version": 1,
        "field": "after"
      },
      {
        "type": "string",
        "optional": true,
        "name": "io.debezium.data.Json",
        "version": 1,
        "field": "patch"
      },
      {
        "type": "string",
        "optional": true,
        "name": "io.debezium.data.Json",
        "version": 1,
        "field": "filter"
      },
      {
        "type": "struct",
        "fields": [
          {
            "type": "array",
            "items": {
              "type": "string",
              "optional": false
            },
            "optional": true,
            "field": "removedFields"
          },
          {
            "type": "string",
            "optional": true,
            "name": "io.debezium.data.Json",
            "version": 1,
            "field": "updatedFields"
          },
          {
            "type": "array",
            "items": {
              "type": "struct",
              "fields": [
                {
                  "type": "string",
                  "optional": false,
                  "field": "field"
                },
                {
                  "type": "int32",
                  "optional": false,
                  "field": "size"
                }
              ],
              "optional": false,
              "name": "io.debezium.connector.mongodb.changestream.truncatedarray",
              "version": 1
            },
            "optional": true,
            "field": "truncatedArrays"
          }
        ],
        "optional": true,
        "name": "io.debezium.connector.mongodb.changestream.updatedescription",
        "version": 1,
        "field": "updateDescription"
      },
      {
        "type": "struct",
        "fields": [
          {
            "type": "string",
            "optional": false,
            "field": "version"
          },
          {
            "type": "string",
            "optional": false,
            "field": "connector"
          },
          {
            "type": "string",
            "optional": false,
            "field": "name"
          },
          {
            "type": "int64",
            "optional": false,
            "field": "ts_ms"
          },
          {
            "type": "string",
            "optional": true,
            "name": "io.debezium.data.Enum",
            "version": 1,
            "parameters": {
              "allowed": "true,last,false,incremental"
            },
            "default": "false",
            "field": "snapshot"
          },
          {
            "type": "string",
            "optional": false,
            "field": "db"
          },
          {
            "type": "string",
            "optional": true,
            "field": "sequence"
          },
          {
            "type": "string",
            "optional": false,
            "field": "rs"
          },
          {
            "type": "string",
            "optional": false,
            "field": "collection"
          },
          {
            "type": "int32",
            "optional": false,
            "field": "ord"
          },
          {
            "type": "string",
            "optional": true,
            "field": "lsid"
          },
          {
            "type": "int64",
            "optional": true,
            "field": "txnNumber"
          }
        ],
        "optional": false,
        "name": "io.debezium.connector.mongo.Source",
        "field": "source"
      },
      {
        "type": "string",
        "optional": true,
        "field": "op"
      },
      {
        "type": "int64",
        "optional": true,
        "field": "ts_ms"
      },
      {
        "type": "struct",
        "fields": [
          {
            "type": "string",
            "optional": false,
            "field": "id"
          },
          {
            "type": "int64",
            "optional": false,
            "field": "total_order"
          },
          {
            "type": "int64",
            "optional": false,
            "field": "data_collection_order"
          }
        ],
        "optional": true,
        "name": "event.block",
        "version": 1,
        "field": "transaction"
      }
    ],
    "optional": false,
    "name": "src.metrics.customers.Envelope"
  },
  "payload": {
    "before": null,
    "after": "{\"_id\": {\"$numberLong\": \"1001\"},\"first_name\": \"Sallyddf\",\"last_name\": \"Thomas\",\"email\": \"sally.thomas@acme.com\"}",
    "patch": null,
    "filter": null,
    "updateDescription": {
      "removedFields": null,
      "updatedFields": "{\"first_name\": \"Sallyddf\"}",
      "truncatedArrays": null
    },
    "source": {
      "version": "2.0.0.Final",
      "connector": "mongodb",
      "name": "src",
      "ts_ms": 1669244642000,
      "snapshot": "false",
      "db": "metrics",
      "sequence": null,
      "rs": "rs0",
      "collection": "customers",
      "ord": 2,
      "lsid": null,
      "txnNumber": null
    },
    "op": "u",
    "ts_ms": 1669244642381,
    "transaction": null
  }
}
Sink Connector Exception:
ERROR Unable to process record SinkRecord{kafkaOffset=4, timestampType=CreateTime} ConnectRecord{topic='src.metrics.customers', kafkaPartition=0, key={id=1001}, keySchema=null, value=Struct{after={"_id": {"$numberLong": "1001"},"first_name": "Sallyddf","last_name": "Thomas","email": "sally.thomas@acme.com"},updateDescription=Struct{updatedFields={"first_name": "Sallyddf"}},source=Struct{version=2.0.0.Final,connector=mongodb,name=src,ts_ms=1669244642000,snapshot=false,db=metrics,rs=rs0,collection=customers,ord=2},op=u,ts_ms=1669244642381}, valueSchema=Schema{src.metrics.customers.Envelope:STRUCT}, timestamp=1669244642856, headers=ConnectHeaders(headers=)} (com.mongodb.kafka.connect.sink.MongoProcessedSinkRecordData)
metrics-sink-connect | org.apache.kafka.connect.errors.DataException: Value expected to be of type STRING is of unexpected type NULL
metrics-sink-connect | at com.mongodb.kafka.connect.sink.cdc.debezium.mongodb.MongoDbUpdate.perform(MongoDbUpdate.java:69)
metrics-sink-connect | at com.mongodb.kafka.connect.sink.cdc.debezium.mongodb.MongoDbHandler.handle(MongoDbHandler.java:82)
metrics-sink-connect | at com.mongodb.kafka.connect.sink.MongoProcessedSinkRecordData.lambda$buildWriteModelCDC$3(MongoProcessedSinkRecordData.java:99)
metrics-sink-connect | at java.base/java.util.Optional.flatMap(Optional.java:294)
metrics-sink-connect | at com.mongodb.kafka.connect.sink.MongoProcessedSinkRecordData.lambda$buildWriteModelCDC$4(MongoProcessedSinkRecordData.java:99)
metrics-sink-connect | at com.mongodb.kafka.connect.sink.MongoProcessedSinkRecordData.tryProcess(MongoProcessedSinkRecordData.java:105)
metrics-sink-connect | at com.mongodb.kafka.connect.sink.MongoProcessedSinkRecordData.buildWriteModelCDC(MongoProcessedSinkRecordData.java:98)
metrics-sink-connect | at com.mongodb.kafka.connect.sink.MongoProcessedSinkRecordData.createWriteModel(MongoProcessedSinkRecordData.java:81)
metrics-sink-connect | at com.mongodb.kafka.connect.sink.MongoProcessedSinkRecordData.<init>(MongoProcessedSinkRecordData.java:51)
metrics-sink-connect | at com.mongodb.kafka.connect.sink.MongoSinkRecordProcessor.orderedGroupByTopicAndNamespace(MongoSinkRecordProcessor.java:45)
metrics-sink-connect | at com.mongodb.kafka.connect.sink.StartedMongoSinkTask.put(StartedMongoSinkTask.java:101)
metrics-sink-connect | at com.mongodb.kafka.connect.sink.MongoSinkTask.put(MongoSinkTask.java:90)
metrics-sink-connect | at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:581)
metrics-sink-connect | at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:333)
metrics-sink-connect | at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:234)
metrics-sink-connect | at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:203)
metrics-sink-connect | at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:188)
metrics-sink-connect | at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:243)
metrics-sink-connect | at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
metrics-sink-connect | at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
metrics-sink-connect | at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
metrics-sink-connect | at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
metrics-sink-connect | at java.base/java.lang.Thread.run(Thread.java:829)
metrics-sink-connect | Caused by: org.bson.BsonInvalidOperationException: Value expected to be of type STRING is of unexpected type NULL
metrics-sink-connect | at org.bson.BsonValue.throwIfInvalidType(BsonValue.java:419)
metrics-sink-connect | at org.bson.BsonValue.asString(BsonValue.java:69)
metrics-sink-connect | at org.bson.BsonDocument.getString(BsonDocument.java:252)
metrics-sink-connect | at com.mongodb.kafka.connect.sink.cdc.debezium.mongodb.MongoDbUpdate.handleOplogEvent(MongoDbUpdate.java:80)
metrics-sink-connect | at com.mongodb.kafka.connect.sink.cdc.debezium.mongodb.MongoDbUpdate.perform(MongoDbUpdate.java:61)
metrics-sink-connect | ... 22 more
metrics-sink-connect | [2022-11-23 23:04:02,876] ERROR WorkerSinkTask{id=metrics-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted. Error: Value expected to be of type STRING is of unexpected type NULL (org.apache.kafka.connect.runtime.WorkerSinkTask)
metrics-sink-connect | org.apache.kafka.connect.errors.DataException: Value expected to be of type STRING is of unexpected type NULL
metrics-sink-connect | at com.mongodb.kafka.connect.sink.cdc.debezium.mongodb.MongoDbUpdate.perform(MongoDbUpdate.java:69)
metrics-sink-connect | at com.mongodb.kafka.connect.sink.cdc.debezium.mongodb.MongoDbHandler.handle(MongoDbHandler.java:82)
metrics-sink-connect | at com.mongodb.kafka.connect.sink.MongoProcessedSinkRecordData.lambda$buildWriteModelCDC$3(MongoProcessedSinkRecordData.java:99)
metrics-sink-connect | at java.base/java.util.Optional.flatMap(Optional.java:294)
metrics-sink-connect | at com.mongodb.kafka.connect.sink.MongoProcessedSinkRecordData.lambda$buildWriteModelCDC$4(MongoProcessedSinkRecordData.java:99)
metrics-sink-connect | at com.mongodb.kafka.connect.sink.MongoProcessedSinkRecordData.tryProcess(MongoProcessedSinkRecordData.java:105)
metrics-sink-connect | at com.mongodb.kafka.connect.sink.MongoProcessedSinkRecordData.buildWriteModelCDC(MongoProcessedSinkRecordData.java:98)
metrics-sink-connect | at com.mongodb.kafka.connect.sink.MongoProcessedSinkRecordData.createWriteModel(MongoProcessedSinkRecordData.java:81)
metrics-sink-connect | at com.mongodb.kafka.connect.sink.MongoProcessedSinkRecordData.<init>(MongoProcessedSinkRecordData.java:51)
metrics-sink-connect | at com.mongodb.kafka.connect.sink.MongoSinkRecordProcessor.orderedGroupByTopicAndNamespace(MongoSinkRecordProcessor.java:45)
metrics-sink-connect | at com.mongodb.kafka.connect.sink.StartedMongoSinkTask.put(StartedMongoSinkTask.java:101)
metrics-sink-connect | at com.mongodb.kafka.connect.sink.MongoSinkTask.put(MongoSinkTask.java:90)
metrics-sink-connect | at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:581)
metrics-sink-connect | at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:333)
metrics-sink-connect | at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:234)
metrics-sink-connect | at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:203)
metrics-sink-connect | at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:188)
metrics-sink-connect | at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:243)
metrics-sink-connect | at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
metrics-sink-connect | at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
metrics-sink-connect | at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
metrics-sink-connect | at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
metrics-sink-connect | at java.base/java.lang.Thread.run(Thread.java:829)
metrics-sink-connect | Caused by: org.bson.BsonInvalidOperationException: Value expected to be of type STRING is of unexpected type NULL
metrics-sink-connect | at org.bson.BsonValue.throwIfInvalidType(BsonValue.java:419)
metrics-sink-connect | at org.bson.BsonValue.asString(BsonValue.java:69)
metrics-sink-connect | at org.bson.BsonDocument.getString(BsonDocument.java:252)
metrics-sink-connect | at com.mongodb.kafka.connect.sink.cdc.debezium.mongodb.MongoDbUpdate.handleOplogEvent(MongoDbUpdate.java:80)
metrics-sink-connect | at com.mongodb.kafka.connect.sink.cdc.debezium.mongodb.MongoDbUpdate.perform(MongoDbUpdate.java:61)
metrics-sink-connect | ... 22 more
metrics-sink-connect | [2022-11-23 23:04:02,878] ERROR WorkerSinkTask{id=metrics-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask)
metrics-sink-connect | org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
metrics-sink-connect | at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:611)
metrics-sink-connect | at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:333)
metrics-sink-connect | at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:234)
metrics-sink-connect | at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:203)
metrics-sink-connect | at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:188)
metrics-sink-connect | at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:243)
metrics-sink-connect | at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
metrics-sink-connect | at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
metrics-sink-connect | at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
metrics-sink-connect | at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
metrics-sink-connect | at java.base/java.lang.Thread.run(Thread.java:829)
metrics-sink-connect | Caused by: org.apache.kafka.connect.errors.DataException: Value expected to be of type STRING is of unexpected type NULL
metrics-sink-connect | at com.mongodb.kafka.connect.sink.cdc.debezium.mongodb.MongoDbUpdate.perform(MongoDbUpdate.java:69)
metrics-sink-connect | at com.mongodb.kafka.connect.sink.cdc.debezium.mongodb.MongoDbHandler.handle(MongoDbHandler.java:82)
metrics-sink-connect | at com.mongodb.kafka.connect.sink.MongoProcessedSinkRecordData.lambda$buildWriteModelCDC$3(MongoProcessedSinkRecordData.java:99)
metrics-sink-connect | at java.base/java.util.Optional.flatMap(Optional.java:294)
metrics-sink-connect | at com.mongodb.kafka.connect.sink.MongoProcessedSinkRecordData.lambda$buildWriteModelCDC$4(MongoProcessedSinkRecordData.java:99)
metrics-sink-connect | at com.mongodb.kafka.connect.sink.MongoProcessedSinkRecordData.tryProcess(MongoProcessedSinkRecordData.java:105)
metrics-sink-connect | at com.mongodb.kafka.connect.sink.MongoProcessedSinkRecordData.buildWriteModelCDC(MongoProcessedSinkRecordData.java:98)
metrics-sink-connect | at com.mongodb.kafka.connect.sink.MongoProcessedSinkRecordData.createWriteModel(MongoProcessedSinkRecordData.java:81)
metrics-sink-connect | at com.mongodb.kafka.connect.sink.MongoProcessedSinkRecordData.<init>(MongoProcessedSinkRecordData.java:51)
metrics-sink-connect | at com.mongodb.kafka.connect.sink.MongoSinkRecordProcessor.orderedGroupByTopicAndNamespace(MongoSinkRecordProcessor.java:45)
metrics-sink-connect | at com.mongodb.kafka.connect.sink.StartedMongoSinkTask.put(StartedMongoSinkTask.java:101)
metrics-sink-connect | at com.mongodb.kafka.connect.sink.MongoSinkTask.put(MongoSinkTask.java:90)
metrics-sink-connect | at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:581)
metrics-sink-connect | ... 10 more
metrics-sink-connect | Caused by: org.bson.BsonInvalidOperationException: Value expected to be of type STRING is of unexpected type NULL
metrics-sink-connect | at org.bson.BsonValue.throwIfInvalidType(BsonValue.java:419)
metrics-sink-connect | at org.bson.BsonValue.asString(BsonValue.java:69)
metrics-sink-connect | at org.bson.BsonDocument.getString(BsonDocument.java:252)
metrics-sink-connect | at com.mongodb.kafka.connect.sink.cdc.debezium.mongodb.MongoDbUpdate.handleOplogEvent(MongoDbUpdate.java:80)
metrics-sink-connect | at com.mongodb.kafka.connect.sink.cdc.debezium.mongodb.MongoDbUpdate.perform(MongoDbUpdate.java:61)
metrics-sink-connect | ... 22 more
I followed all the examples and documentation for Debezium and the MongoDB Sink Connector, but I still have no clue why this is happening.
Please find my Dockerfiles and configurations below:
Debezium Sink Connector Dockerfile
FROM quay.io/debezium/connect:2.0
ENV KAFKA_CONNECT_MONGODB_DIR=$KAFKA_CONNECT_PLUGINS_DIR/kafka-connect-mongodb
USER root
RUN microdnf -y install git maven java-11-openjdk-devel && microdnf clean all
USER kafka
# Deploy MongoDB Sink Connector
RUN mkdir -p $KAFKA_CONNECT_MONGODB_DIR && cd $KAFKA_CONNECT_MONGODB_DIR && \
git clone https://github.com/hpgrahsl/kafka-connect-mongodb.git && \
cd kafka-connect-mongodb && \
git fetch --tags && \
git checkout tags/v1.2.0 && \
mvn clean package -DskipTests=true -DskipITs=true && \
mv target/kafka-connect-mongodb/kafka-connect-mongodb-1.2.0-jar-with-dependencies.jar $KAFKA_CONNECT_MONGODB_DIR && \
cd .. && rm -rf $KAFKA_CONNECT_MONGODB_DIR/kafka-connect-mongodb
MongoDB Kafka Sink Connector Dockerfile
FROM confluentinc/cp-kafka-connect:7.2.2
RUN confluent-hub install --no-prompt mongodb/kafka-connect-mongodb:1.8.0
ENV CONNECT_PLUGIN_PATH="/usr/share/java,/usr/share/confluent-hub-components"
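Once this worker is up, the plugin installation can be checked through the Kafka Connect REST API (the worker is exposed on port 8084 in the compose file below):

# Confirm the MongoDB sink connector plugin was picked up by the sink Connect worker:
curl -s http://localhost:8084/connector-plugins | grep -i MongoSinkConnector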
Debezium Source Connector Configuration
{
  "name": "metrics",
  "config": {
    "connector.class": "io.debezium.connector.mongodb.MongoDbConnector",
    "mongodb.name": "metrics-src",
    "mongodb.user": "admin",
    "mongodb.password": "admin",
    "mongodb.authsource": "admin",
    "mongodb.hosts": "rs0/metrics-src:27017",
    "topic.prefix": "src",
    "database.include.list": "metrics"
  }
}
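I register this configuration against the Debezium Connect worker (port 8083) with the standard Kafka Connect REST API; the file name below is just a placeholder for the JSON above:

# Create the Debezium MongoDB source connector (debezium-source.json is a placeholder for the config above):
curl -s -X POST -H "Content-Type: application/json" \
  --data @debezium-source.json \
  http://localhost:8083/connectors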
MongoDB Sink Connector Configuration with CDC Handler
{
  "name": "metrics",
  "config": {
    "connector.class": "com.mongodb.kafka.connect.MongoSinkConnector",
    "change.data.capture.handler": "com.mongodb.kafka.connect.sink.cdc.debezium.mongodb.MongoDbHandler",
    "connection.uri": "mongodb://metrics-sink:27017/metrics",
    "database": "metrics",
    "collection": "metrics",
    "topics": "src.metrics.customers"
  }
}
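The sink configuration is registered the same way against the second Connect worker (port 8084); after an update on the source, the task status endpoint then reports the failure shown above:

# Create the MongoDB sink connector (mongodb-sink.json is a placeholder for the config above):
curl -s -X POST -H "Content-Type: application/json" \
  --data @mongodb-sink.json \
  http://localhost:8084/connectors

# Inspect the sink task after an update event on the source:
curl -s http://localhost:8084/connectors/metrics/status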
Docker Compose File
version: '3.4'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.0.1
    container_name: zookeeper
    restart: always
    networks:
      - sync-network
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
      ZOO_4LW_COMMANDS_WHITELIST: "*"
      KAFKA_OPTS: "-Dzookeeper.4lw.commands.whitelist=ruok"
    healthcheck:
      test: nc -z localhost 2181 || exit -1
      interval: 10s
      timeout: 5s
      retries: 3
      start_period: 10s
    extra_hosts:
      - "moby:127.0.0.1"
  broker:
    image: confluentinc/cp-kafka:7.0.1
    container_name: broker
    restart: always
    networks:
      - sync-network
    ports:
      - "9092:9092"
      - "39092:39092"
    depends_on:
      zookeeper:
        condition: service_healthy
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      KAFKA_LISTENERS: DOCKER_LISTENER://broker:9092,HOST_LISTENER://broker:19092,EXTERNAL_LISTENER://0.0.0.0:39092
      KAFKA_ADVERTISED_LISTENERS: DOCKER_LISTENER://broker:9092,HOST_LISTENER://localhost:19092,EXTERNAL_LISTENER://150.230.85.73:39092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: DOCKER_LISTENER:PLAINTEXT,HOST_LISTENER:PLAINTEXT,EXTERNAL_LISTENER:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: DOCKER_LISTENER
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
    extra_hosts:
      - "moby:127.0.0.1"
    healthcheck:
      test: echo "ruok" | timeout 2 nc -w 2 zookeeper 2181 | grep imok
      interval: 10s
      timeout: 5s
      retries: 3
  kafdrop:
    image: obsidiandynamics/kafdrop:latest
    container_name: kafdrop
    # network_mode: host
    ports:
      - 9000:9000
    networks:
      - sync-network
    depends_on:
      broker:
        condition: service_healthy
    environment:
      KAFKA_BROKERCONNECT: broker:9092
  metrics-src:
    image: mongo:5.0.5
    hostname: metrics-src
    restart: always
    container_name: metrics-src
    ports:
      - 27040:27017
    networks:
      - sync-network
    environment:
      MONGO_INITDB_DATABASE: metrics
    volumes:
      - ./scripts:/scripts
    healthcheck:
      test: test $$(echo "rs.initiate().ok || rs.status().ok" | mongo -u admin -p admin --quiet) -eq 1
      interval: 10s
      start_period: 30s
    command: --replSet rs0 --bind_ip_all
  metrics-sink:
    image: mongo:5.0.5
    hostname: metrics-sink
    restart: always
    container_name: metrics-sink
    ports:
      - 27020:27017
    networks:
      - sync-network
    environment:
      MONGO_INITDB_DATABASE: metrics
    volumes:
      - ./scripts:/scripts
    healthcheck:
      test: test $$(echo "rs.initiate().ok || rs.status().ok" | mongo -u admin -p admin --quiet) -eq 1
      interval: 10s
      start_period: 30s
    command: --replSet rs0 --bind_ip_all
  metrics-src-connect:
    image: quay.io/debezium/connect:2.0
    container_name: metrics-connect
    ports:
      - 8083:8083
    links:
      - broker
      - metrics-src
    networks:
      - sync-network
    volumes:
      - kafka-src-config:/kafka/config
    environment:
      - BOOTSTRAP_SERVERS=broker:9092
      - REST_HOST_NAME=0.0.0.0
      - GROUP_ID=1
      - CONFIG_STORAGE_TOPIC=metrics_src_connect_configs
      - OFFSET_STORAGE_TOPIC=metrics_src_connect_offsets
      - STATUS_STORAGE_TOPIC=metrics_src_connect_status
      - CONNECT_KEY_CONVERTER_SCHEMAS_ENABLE=false
      - CONNECT_KEY_CONVERTER=org.apache.kafka.connect.json.JsonConverter
      - CONNECT_VALUE_CONVERTER_SCHEMAS_ENABLE=true
      - CONNECT_VALUE_CONVERTER=org.apache.kafka.connect.json.JsonConverter
  # container with mongo kafka plugins
  metrics-sink-connect:
    image: confluentinc/cp-kafka-connect-base:7.2.2
    build:
      context: ./mongodb-kafka-connect
    ports:
      - "8084:8083"
    hostname: metrics-sink-connect
    container_name: metrics-sink-connect
    depends_on:
      - zookeeper
      - broker
    networks:
      - sync-network
    volumes:
      - kafka-sink-config:/kafka/config
    environment:
      KAFKA_JMX_PORT: 35000
      KAFKA_JMX_HOSTNAME: localhost
      CONNECT_BOOTSTRAP_SERVERS: "broker:9092"
      CONNECT_REST_ADVERTISED_HOST_NAME: metrics-sink-connect
      CONNECT_REST_PORT: 8083
      CONNECT_GROUP_ID: connect-cluster-group
      CONNECT_CONFIG_STORAGE_TOPIC: metrics_sink_connect_configs
      CONNECT_OFFSET_STORAGE_TOPIC: metrics_sink_connect_offsets
      CONNECT_STATUS_STORAGE_TOPIC: metrics_sink_connect_status
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_METADATA_MAX_AGE_MS: 180000
      CONNECT_CONNECTIONS_MAX_IDLE_MS: 180000
      CONNECT_OFFSET_FLUSH_INTERVAL_MS: 10000
      CONNECT_ZOOKEEPER_CONNECT: "zookeeper:2181"
      CONNECT_PLUGIN_PATH: "/usr/share/java,/usr/share/confluent-hub-components"
      CONNECT_AUTO_CREATE_TOPICS_ENABLE: "true"
      CONNECT_KEY_CONVERTER_SCHEMAS_ENABLE: "false"
      CONNECT_KEY_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      CONNECT_VALUE_CONVERTER_SCHEMAS_ENABLE: "true"
      CONNECT_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
networks:
  sync-network:
    driver: bridge
volumes:
  kafka-sink-config:
    driver: local
    driver_opts:
      type: none
      o: bind
      device: ./kafka/sink-config
  kafka-src-config:
    driver: local
    driver_opts:
      type: none
      o: bind
      device: ./kafka/src-config
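For completeness, this is roughly how I bring the stack up and inspect the raw change events on the topic (Kafdrop on port 9000 shows the same data):

# Build the custom Connect image and start the whole stack:
docker compose up -d --build

# Tail the CDC topic to inspect the raw Debezium events:
docker exec -it broker kafka-console-consumer \
  --bootstrap-server broker:9092 \
  --topic src.metrics.customers \
  --from-beginning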
Could someone help me figure out what I might be missing in the configuration?
Best regards,
Paulo
However, I'll try to highlight a few things that caught my eye and point you to some potential workarounds: