Mongo Kafka Source Connector Latency

Hello all,

We have two MongoDB source connectors running in separate Docker containers. The first one is the “critical” connector, which uses a change-stream pipeline to copy data from 2 specific collections (see configuration below).
The second connector (“heavy”) is nearly identical, except that it watches those 2 collections plus another 10, so it copies significantly more data.

We split them into two workloads, following scaling recommendations, so that write load on the non-critical collections would not affect the throughput of the critical ones.

However, we are noticing that during periods of average load on our Mongo cluster, the heavy connector consistently performs better than the critical one. The heavy connector has a lag of approximately 5 seconds, while the critical connector has an average lag of approximately 30 seconds.

Note: We calculate lag using a custom monitor that continuously polls the messages published by each connector and calculates the difference between mongoClusterTime (when the CDC event became available) and the Kafka record timestamp (when the connector published to Kafka).
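A minimal sketch of the lag calculation described in the note, assuming the record value carries the change event's cluster time in a field named clusterTime, rendered as an extended-JSON BSON Timestamp ({"$timestamp": {"t": <seconds>, "i": <increment>}}). That field name and shape are assumptions, so adjust them to match what your records actually contain. The Kafka record timestamp is taken as epoch milliseconds. The function names are hypothetical, and the Kafka consumer loop itself is omitted.

```python
import json


def cluster_time_seconds(value_json: str) -> int:
    """Extract the change event's cluster time in seconds since the epoch.

    Assumption: the value contains clusterTime as an extended-JSON BSON
    Timestamp, e.g. {"clusterTime": {"$timestamp": {"t": 1700000000, "i": 3}}}.
    """
    event = json.loads(value_json)
    return int(event["clusterTime"]["$timestamp"]["t"])


def publish_lag_ms(value_json: str, record_timestamp_ms: int) -> int:
    """Lag between the CDC event's cluster time and the Kafka record timestamp.

    The BSON Timestamp only has one-second resolution (t is whole seconds),
    so this value can overstate the true lag by up to about 1000 ms.
    """
    return record_timestamp_ms - cluster_time_seconds(value_json) * 1000


if __name__ == "__main__":
    sample = '{"operationType": "insert", "clusterTime": {"$timestamp": {"t": 1700000000, "i": 3}}}'
    # Record published 30.5 seconds after the event's cluster time.
    print(publish_lag_ms(sample, 1_700_000_030_500))  # 30500
```

Because the cluster time is truncated to whole seconds, differences below roughly one second between the two connectors are not meaningful with this method; the 5 s versus 30 s gap is well above that resolution.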

We went over this guide but could not find any information on why the heavy connector would outperform the critical one.

My understanding is that with linger.ms set to 0, the connector should publish records to Kafka as soon as they are available.

Are there any configurations that we can tweak to get better performance? Thanks in advance.

Connector

confluent-hub install --no-prompt mongodb/kafka-connect-mongodb:1.8.0

Connect configs:

linger.ms = 0
max.poll.interval.ms = 300000

batch.size = 16384
max.poll.records = 500
max.request.size = 20971520
max.in.flight.requests.per.connection = 1
max.block.ms = 9223372036854775807

Connector config:

{
  "connector.class": "com.mongodb.kafka.connect.MongoSourceConnector",
  "producer.max.request.size": "20971520",
  "topic.prefix": "core-${env}",
  "connection.uri": "${MONGO_URI}",
  "database": "${DB}",
  "pipeline": "[{\"$match\":{\"operationType\":{\"$nin\":[\"rename\",\"drop\"]},\"ns.coll\": {\"$in\": [\"collection1\",\"collection2\"]}}}]",
  "change.stream.full.document": "updateLookup",
  "mongo.errors.tolerance": "none",
  "mongo.errors.log.enable": "true",
  "heartbeat.interval.ms": 3000,
  "heartbeat.topic.name": "mongo-source-${env}.heartbeats",
  "output.format.key": "schema",
  "output.format.value": "json",
  "key.converter.schemas.enable": "false",
  "value.converter.schemas.enable": "false",
  "key.converter": "org.apache.kafka.connect.json.JsonConverter",
  "output.schema.key": "{\"type\":\"record\",\"name\":\"DocumentKey\",\"fields\":[{\"name\":\"documentKey._id\",\"type\":\"string\"}]}",
  "value.converter": "org.apache.kafka.connect.storage.StringConverter",
  "output.json.formatter": "com.mongodb.kafka.connect.source.json.formatter.SimplifiedJson",
  "offset.partition.name": "mongo-source-connector-${env}.1"
}
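The "pipeline" value above is a JSON array serialized into a string, so every inner quote must be escaped when it is embedded in the connector config. One way to keep the critical and heavy pipelines consistent is to generate both from a list of collection names. The sketch below is an illustration under assumptions: build_pipeline is a hypothetical helper, and collection3 to collection12 are placeholder names for the heavy connector's 10 extra collections, which the post does not name.

```python
import json


def build_pipeline(collections: list[str]) -> str:
    """Build the change-stream pipeline string for the "pipeline" property.

    Mirrors the $match stage in the config above: drop rename/drop events
    and keep only events whose ns.coll is one of the listed collections.
    """
    stage = {
        "$match": {
            "operationType": {"$nin": ["rename", "drop"]},
            "ns.coll": {"$in": collections},
        }
    }
    # Compact separators give a whitespace-free string equivalent to the config value.
    return json.dumps([stage], separators=(",", ":"))


if __name__ == "__main__":
    critical = build_pipeline(["collection1", "collection2"])
    # Placeholder names for the heavy connector's 10 additional collections.
    heavy = build_pipeline(
        ["collection1", "collection2"] + [f"collection{i}" for i in range(3, 13)]
    )
    print(critical)
    # Serializing again escapes the inner quotes for the connector config JSON.
    print(json.dumps({"pipeline": heavy}))
```

Generating both strings from the same helper means the two connectors can only differ in the collection list, which makes the heavy connector's pipeline a strict superset of the critical one.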