MongoDB Connector for Apache Kafka 1.5 Available Now

Today, MongoDB released version 1.5 of the MongoDB Connector for Apache Kafka! Along with continued improvements to the overall quality and stability of the Connector, this release adds several new features. This article highlights some of the key ones.

DeleteOne write model strategy

When messages arrive on Kafka topics, the MongoDB Sink Connector reads them and, by default, upserts them into the MongoDB cluster specified in the sink configuration. But what if you don’t always want to upsert them? This is where write model strategies come in: they give you the flexibility to define what the connector does with each document.

Write model strategies are not new to the connector, but this release adds a new one called DeleteOneBusinessKeyStrategy. It is useful when a topic contains records that identify data to be removed from a collection in the MongoDB sink. Consider the following:

You run an online store selling fashionable face masks. As part of your architecture, the website sends orders to a Kafka topic, “web-orders”. Each arriving message kicks off a series of actions, such as sending an email confirmation and inserting the order details into an “Orders” collection in a MongoDB cluster.

A sample Orders document:

{
  _id: ObjectId("6053684f2fe69a6ad3fed028"),
  'customer-id': 123,
  'order-id': 100,
  order: { lineitem: 1, SKU: 'FACE1', quantity: 1 }
}

This process works well. However, when a customer cancels an order, another business process needs to update inventory, send a cancellation email, and remove the order from the MongoDB sink. In this scenario, a cancellation message is sent to a separate Kafka topic, “FaceMaskWeb.OrderCancel”. For messages in this topic we don’t want to upsert anything; instead, we want to read each message and use a field within it to identify the document to delete in the sink. For this example, let’s use the order-id field as the business key and define a sink connector that uses the DeleteOneBusinessKeyStrategy as follows:

{
  "connector.class": "com.mongodb.kafka.connect.MongoSinkConnector",
  "topics": "FaceMaskWeb.OrderCancel",
  "connection.uri": "mongodb://mdb1",
  "database": "FaceMaskWeb",
  "collection": "Orders",
  "writemodel.strategy": "com.mongodb.kafka.connect.sink.writemodel.strategy.DeleteOneBusinessKeyStrategy",
  "document.id.strategy": "com.mongodb.kafka.connect.sink.processor.id.strategy.PartialValueStrategy",
  "document.id.strategy.partial.value.projection.type": "AllowList",
  "document.id.strategy.partial.value.projection.list": "order-id",
  "value.converter": "org.apache.kafka.connect.json.JsonConverter",
  "value.converter.schemas.enable": false,
  "document.id.strategy.overwrite.existing": true
}
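One common way to deploy a configuration like this is to POST it to the Kafka Connect REST API. The Python sketch below only builds that request. It assumes a Connect worker listening on localhost:8083 (the Connect default port) and uses a hypothetical connector name, order-cancel-sink; config values are written as strings here to be safe.

```python
import json
import urllib.request

# Assumption: a Kafka Connect worker's REST API is reachable on localhost:8083.
CONNECT_URL = "http://localhost:8083/connectors"

# Hypothetical connector name chosen for this example.
CONNECTOR_NAME = "order-cancel-sink"

# The sink configuration from the article, with every value as a string.
SINK_CONFIG = {
    "connector.class": "com.mongodb.kafka.connect.MongoSinkConnector",
    "topics": "FaceMaskWeb.OrderCancel",
    "connection.uri": "mongodb://mdb1",
    "database": "FaceMaskWeb",
    "collection": "Orders",
    "writemodel.strategy": "com.mongodb.kafka.connect.sink.writemodel.strategy.DeleteOneBusinessKeyStrategy",
    "document.id.strategy": "com.mongodb.kafka.connect.sink.processor.id.strategy.PartialValueStrategy",
    "document.id.strategy.partial.value.projection.type": "AllowList",
    "document.id.strategy.partial.value.projection.list": "order-id",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "false",
    "document.id.strategy.overwrite.existing": "true",
}


def build_create_request(name: str, config: dict, url: str = CONNECT_URL) -> urllib.request.Request:
    """Build (but do not send) a POST /connectors request that creates a connector."""
    body = json.dumps({"name": name, "config": config}).encode("utf-8")
    return urllib.request.Request(
        url,
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


req = build_create_request(CONNECTOR_NAME, SINK_CONFIG)

# To register the connector against a running worker, send the request:
# with urllib.request.urlopen(req) as resp:
#     print(resp.status, resp.read().decode("utf-8"))
```

Keeping request construction separate from sending makes the payload easy to inspect before anything touches a live cluster. When sent, a successful create returns HTTP 201.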

Now when messages arrive in the “FaceMaskWeb.OrderCancel” topic, the connector uses the “order-id” field to find and delete the matching document in the Orders collection.

For example, using the sample document above, suppose we write this value to the FaceMaskWeb.OrderCancel topic:

{ "order-id": 100 }

The connector would then delete the document in the Orders collection whose order-id is 100.
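To make the mechanism concrete, here is a rough in-memory Python simulation of what happens for the message above: an AllowList projection keeps only order-id, and the projected document is used as the filter for a single delete. This is not the connector’s code. The helper functions, the plain-string _id, and the second order are hypothetical; the real connector issues a deleteOne against MongoDB.

```python
from typing import Any, Dict, List

Document = Dict[str, Any]


def project_allow_list(value: Document, fields: List[str]) -> Document:
    """Keep only the top-level fields named in the allow list (simplified AllowList projection)."""
    return {k: value[k] for k in fields if k in value}


def delete_one(collection: List[Document], key_filter: Document) -> int:
    """Remove the first document matching every key/value in key_filter; return how many were removed."""
    if not key_filter:
        # Sketch choice: refuse an empty business key rather than deleting an arbitrary document.
        raise ValueError("empty business key")
    for i, doc in enumerate(collection):
        if all(doc.get(k) == v for k, v in key_filter.items()):
            del collection[i]  # safe: we return immediately after mutating
            return 1
    return 0


# The article's sample order (with _id as a plain string) plus a hypothetical second order.
orders: List[Document] = [
    {"_id": "6053684f2fe69a6ad3fed028", "customer-id": 123, "order-id": 100,
     "order": {"lineitem": 1, "SKU": "FACE1", "quantity": 1}},
    {"_id": "hypothetical-second-id", "customer-id": 456, "order-id": 101,
     "order": {"lineitem": 1, "SKU": "FACE2", "quantity": 2}},
]

cancel_message = {"order-id": 100}
business_key = project_allow_list(cancel_message, ["order-id"])
deleted = delete_one(orders, business_key)
```

After this runs, business_key is {"order-id": 100}, one document is removed, and only order 101 remains. Because the strategy deletes a single document, the business key should identify orders uniquely.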

For a complete list of write model strategies, check out the MongoDB Kafka Connector Sink documentation.

Qlik Replicate

Qlik Replicate is recognized as an industry leader in data replication and ingestion. With this release of the Connector, you can now replicate and stream heterogeneous data from sources such as Oracle, MySQL, and PostgreSQL to MongoDB via Kafka and the Qlik Replicate CDC handler.

To configure the MongoDB Connector for Apache Kafka to consume Qlik Replicate CDC events, set the change data capture handler configuration parameter (change.data.capture.handler) to “com.mongodb.kafka.connect.sink.cdc.qlik.rdbms.RdbmsHandler”. The handler supports insert, refresh, read, update, and delete events.
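As a sketch, a sink configuration for Qlik Replicate events might look like the following Python dict serialized to JSON. Only the connector class, the change.data.capture.handler key, and the RdbmsHandler class come from the connector; the topic, database, and collection names are hypothetical placeholders, and a real deployment may need further settings such as converters.

```python
import json

# Hypothetical topic, database and collection names; the handler class is the
# Qlik Replicate RDBMS handler shipped with connector 1.5.
qlik_sink_config = {
    "connector.class": "com.mongodb.kafka.connect.MongoSinkConnector",
    "topics": "qlik.inventory",
    "connection.uri": "mongodb://mdb1",
    "database": "Replicated",
    "collection": "inventory",
    "change.data.capture.handler": "com.mongodb.kafka.connect.sink.cdc.qlik.rdbms.RdbmsHandler",
}

# Print the JSON fragment you would place under "config" when creating the connector.
print(json.dumps(qlik_sink_config, indent=2))
```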

Errant Record Reporting

Kafka Connect, the service that manages connectors integrating with a Kafka deployment, can write records to a dead letter queue (DLQ) topic when they cannot be serialized or deserialized. Apache Kafka 2.6 added support for error reporting within sink connectors (KIP-610). This lets a sink connector send individual records to the DLQ when it deems them invalid or problematic: for example, when you project fields in the sink that do not exist in the Kafka message, or when the sink expects a JSON document and the message arrives in a different format. In these cases, the record is written to the DLQ instead of failing the connector.
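The DLQ itself is enabled with Kafka Connect’s standard sink error-handling settings (errors.tolerance, errors.deadletterqueue.topic.name, errors.deadletterqueue.context.headers.enable). The Python sketch below pairs those settings with a hypothetical simulation of per-record reporting: with tolerance set to all, records that fail processing are collected in a DLQ list while the rest of the batch still goes through; with none, the first failure is raised. The DLQ topic name and the processing function are assumptions for illustration, not the connector’s implementation.

```python
import json
from typing import Any, Callable, Dict, List, Tuple

# Standard Kafka Connect error-handling settings for a sink connector.
# The DLQ topic name is a hypothetical example.
DLQ_SETTINGS = {
    "errors.tolerance": "all",
    "errors.deadletterqueue.topic.name": "FaceMaskWeb.OrderCancel.dlq",
    "errors.deadletterqueue.context.headers.enable": "true",
}


def parse_json_value(raw: bytes) -> Dict[str, Any]:
    """Hypothetical per-record processing step: the sink expects a JSON document (object)."""
    value = json.loads(raw)  # raises json.JSONDecodeError (a ValueError) on bad input
    if not isinstance(value, dict):
        raise ValueError("expected a JSON document")
    return value


def process_batch(
    records: List[bytes],
    process: Callable[[bytes], Dict[str, Any]],
    tolerance: str,
) -> Tuple[List[Dict[str, Any]], List[Tuple[bytes, str]]]:
    """Simulate errant record reporting: with tolerance "all", failing records go to a DLQ
    list and processing continues; otherwise the first error is re-raised (task fails)."""
    written: List[Dict[str, Any]] = []
    dlq: List[Tuple[bytes, str]] = []
    for raw in records:
        try:
            written.append(process(raw))
        except ValueError as exc:
            if tolerance != "all":
                raise
            dlq.append((raw, str(exc)))
    return written, dlq


batch = [b'{"order-id": 100}', b"not json", b"[1, 2, 3]"]
written, dlq = process_batch(batch, parse_json_value, DLQ_SETTINGS["errors.tolerance"])
```

Here the valid cancellation is processed and the two malformed records end up in the simulated DLQ along with their error messages, which mirrors how context headers on a real DLQ topic help you diagnose why a record was rejected.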

Various Improvements

As with every release of the connector, we are constantly improving its quality and functionality, and this release is no different. You’ll now see pipeline errors in the Kafka Connect logs, and the sink connector can be configured to write to the dead letter queue!

Next Steps

Download the latest MongoDB Connector for Apache Kafka 1.5 from the Confluent Hub!

Read the MongoDB Connector for Apache Kafka documentation.

Questions or need help with the connector? Ask the Community.

Have a feature request? Provide feedback or file a JIRA ticket.