MongoDB Connector for Apache Kafka 1.7 Available Now

Robert Walters


Today, MongoDB has released version 1.7 of the MongoDB Connector for Apache Kafka! This article highlights some of the key features of this new release!

MongoDB errors to the Dead Letter Queue

Apache Kafka version 2.6 added support for handling errant records. The MongoDB Kafka Connector for Apache Kafka automatically sends messages that it cannot process to the dead letter queue. This includes messages that fail during conversion but up until this release did not include errors that were generated within MongoDB. For example, consider the scenario where we have a topic, “Sales.OrderStaging” This topic includes messages that contain an ‘order-id’ field. The application needs to insert a new document into MongoDB and use that order-id as the primary key or ‘_id’ of the document. If there happens to be a duplicate order-id entered on the kafka topic, the kafka message should be routed to a dead letter queue topic and the mongodb connector should continue to process other orders.

the following sink configuration highlights the configuration parameters that support this scenario:



"": "AllowList",
"": "order-id"

For example consider a kafka message with an order-id=5 and another message with the same order-id of 5. The sink connector will try to insert that second message with the same _id and there will be a MongoDB Error generated as expected. The kafka topic message which caused the error will be written to the orders.deadletterqueue topic. Once on the dead letter queue, you can inspect the errant records, update them, and resubmit them for processing. Setting the errors.deadletterqueue.context.headers.enable to true will add metadata to the DLQ message. This extra information may help with any automatic processing of errors in the queue. In addition to the DLQ, you can set errors.log.enable and error.log.include.messages configuration to write errors in the kafka connect log. Here is an example error from our scenario above:

com.mongodb.kafka.connect.sink.dlq.WriteException: v=1, code=11000, message=E11000 duplicate key error collection: Sales.Orders index: id dup key: { _id: { order-id: 5 } }, details={}

Bulk write improvements

Today the connector sink process works with a bulk insert in an ordered fashion. For example, consider these 10 documents in a bulk operation:


If document number 5 failed, perhaps due to a duplicate _id error, the MongoDB driver would return this error back to the connector and the rest of the documents would not be written to MongoDB e.g. only [1,2,3,4] is written in MongoDB in the above example. While this might be acceptable for some use cases, for other scenarios with large batch sizes this can make reprocessing messages cumbersome.

In Kafka 1.7, we introduced a new parameter bulk.write.ordered that by default is set to true which is the behavior as it exists today with the Kafka connector. Setting to false and running the above scenario will result in an end state of [1,2,3,4,6,7,8,9,10] written to MongoDB with 5 being written to the topic defined in the dead letter queue. Note that the actual order of the documents may be different since we specified false to bulk.write.ordered.

For more information on error handling including information on the format of the DLQ headers check out the MongoDB Kafka Connector documentation. To setup a dead letter queue, check out the Creating a dead letter queue section within the Confluent Kafka Connect documentation.

Changed retry logic

Currently the Kafka Connector manages retries of the writes to MongoDB using the max.num.retries and retries.defer.timeout configuration properties. This feature was originally intended to address challenges such as network connection issues. Since that time the MongoDB drivers have implemented native capabilities that handle retry logic: The Kafka Connector uses the MongoDB Java driver and has retries enabled by default so there are no changes or extra configuration you need to do to enable retry in the Kafka Connector. Note: If you set retryWrites to false in the connection.uri configuration property, then retries are disabled for the sink connector. If you would like to leverage the drivers native retry capability, simply remove the “retryWrites”' parameter from the connection.uri.

Allow disk use when copying

The copy.existing.allow.disk.use configuration copies existing data from the source. It uses an aggregation pipeline that filters change stream events coming from the MongoDB source. In certain situations this pipeline can use up large amounts of memory. This flag enabled by default allows the copy existing aggregation to use temporary disk storage if required by the query. Note the default is true, but set to false if the process running MongoDB doesn't have the permissions for disk access. For more information see the ‘allowDiskuse’ option in the aggregate() documentation.