MongoDB Connector for Apache Kafka 1.3 Available Now

Robert Walters

#Kafka

Today marks the day MongoDB ships the most significant release ever of the MongoDB Connector for Apache Kafka. Over the past few months, we’ve been busy taking your feedback and pull requests and building a Kafka connector that deeply integrates within the Kafka ecosystem. This article highlights some of the key features of the new release. All code is available in the Financial Securities demo on GitHub.

JSON String Format Option

How hard can it be to make a string out of a document in MongoDB? It is easy, and all the supported drivers have this capability. The interesting part comes when you need to create a string out of a document that contains BSON types. Recall that BSON data types are a superset of what is defined in JSON, so types such as Decimal128 and ISODate are not defined in JSON. Prior to MongoDB Connector for Apache Kafka version 1.3, the source connector always provided its output as a JSON string in the Canonical Extended JSON format. While this format preserves type fidelity for complex types, it is verbose. For example, a Double would be represented as

"Double": {
"$numberDouble": "42.42"
}

In version 1.3, you can set output.json.formatter to com.mongodb.kafka.connect.source.json.formatter.SimplifiedJson. SimplifiedJson is based on Relaxed Extended JSON and simplifies additional data types: dates become strings, ObjectIds become hex strings, and Binary values become Base64-encoded strings. SimplifiedJson allows common types to be used across systems where type fidelity isn’t as important. In the example above, the Double becomes:

{
"Double": 42.42
}

In some situations, SimplifiedJson can make it easier to process MongoDB data in non-MongoDB systems.
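For reference, the relevant source connector settings look roughly like the following; this is a minimal sketch, assuming the default JSON output format, with all connection settings omitted:

"output.format.value": "json",
"output.json.formatter": "com.mongodb.kafka.connect.source.json.formatter.SimplifiedJson"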

Source Schema Support

Part of what makes Apache Kafka so fast and scalable is that, to Kafka, all messages in Kafka topics are just binary data. Kafka does not parse or optimize the data; it simply offers the ability to publish and subscribe to streams of records, whatever data those records contain.

So where is the order among the chaos of random binary data in a message? This is where serializers come into play. Serialization is the process of converting an object into a stream of bytes for the purpose of network transmission. For example, a producer that wants to put a JSON object into a Kafka topic passes that object through a serializer, which converts it into a byte array. The byte array is written to the Kafka topic, and a consumer reading that message passes it through a deserializer to convert the byte array back into a desired format, such as a JSON object, as shown in the example below.

Kafka Serializer

Apache Kafka and Kafka Connect ship with serializers and converters, such as StringConverter, that you can use out of the box, and you can write your own serializer and deserializer as well.
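In Kafka Connect, this serialization step is configured on the connector through converters. As a point of reference, the demo configuration later in this article wires them up like this (the Schema Registry URL is specific to that demo environment):

"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url": "http://schema-registry:8081"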

Now that producers and consumers can pass data through Kafka, we’re done, right? Well, not exactly. Whenever we move data between producers and consumers, we should establish a contract that describes answers to questions such as what kind of fields the data contains, which fields are optional and which are mandatory, and so forth. If we lacked this definition, it would be inefficient for producers and consumers to pass data, because there would need to be a lot of extra code to perform things such as data validation and integrity checks. This is where schemas come into play. Schemas define the contract that states how the data should be structured.

The MongoDB Connector for Apache Kafka source connector supports defining a schema for the data that comes from MongoDB. The schema is defined via Apache Avro. Why Avro and not JSON Schema? Because Avro is a data serialization system built around schemas, and Avro schema definitions are much more succinct than JSON Schema: what would take a page of typing in JSON Schema takes just a few lines in Avro. This does not mean you have to use the Avro converter when writing to the Kafka topic, however; you are free to use any converter you choose, including JSON.

Assume that you would like to define a schema for a financial stock security that has this document structure:

{
  "_id" : ObjectId("5f466d42779904e2c633f3bf"),
  "company_symbol" : "FVF",
  "company_name" : "FLAT VODKA FOODS",
  "price" : 10.22,
  "tx_time" : "2020-08-26T14:10:10Z"
}

Again, schemas in the MongoDB Connector for Apache Kafka are defined using Avro. Below is the Avro schema for the above document structure:

{
  "name": "MongoExchangeSchema",
  "type": "record",
  "namespace": "com.mongoexchange.avro",
  "fields": [
    { "name": "_id", "type": "string" },
    { "name": "company_symbol", "type": "string" },
    { "name": "company_name", "type": "string" },
    { "name": "price", "type": "float" },
    { "name": "tx_time", "type": "string" }
  ]
}

The parameter for defining this schema is output.schema.value. To tell the connector that you want to output a schema instead of a JSON string (which was the default in previous versions of the connector), you change the output.format.value parameter:

"Output.format.value":"schema"

You can also define a schema for keys by using output.schema.key. See the section Write Data to Specific Partitions later in this article for a deeper discussion of schemas in keys.

By defining a schema as an output to the source connector, you can now leverage the Single Message Transforms (SMTs) API and even the schema registry.
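Put together, the value-schema portion of a source connector configuration posted as JSON might look like the following sketch, with the Avro schema from above escaped into a single string and all other settings omitted:

"output.format.value": "schema",
"output.schema.value": "{\"name\": \"MongoExchangeSchema\", \"type\": \"record\", \"namespace\": \"com.mongoexchange.avro\", \"fields\": [{\"name\": \"_id\", \"type\": \"string\"}, {\"name\": \"company_symbol\", \"type\": \"string\"}, {\"name\": \"company_name\", \"type\": \"string\"}, {\"name\": \"price\", \"type\": \"float\"}, {\"name\": \"tx_time\", \"type\": \"string\"}]}"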

Auto-Generated Schema

There may be a scenario where you do not know the exact schema that is coming from MongoDB. After all, MongoDB by design is schemaless. In this case, you can tell the connector to infer a schema based on the document itself by using the following configuration:

"output.schema.infer.value": true

Keep in mind with this configuration that a schema is inferred for each document output by the change stream, so each document could have its own schema definition. You can see this behavior via the following demo.

First, let’s define the connector as a source on the customers collection in the Stocks database:

curl -X POST -H "Content-Type: application/json" --data '
{
  "name": "mongo-source-auto",
  "config": {
    "tasks.max": "1",
    "connector.class": "com.mongodb.kafka.connect.MongoSourceConnector",
    "output.json.formatter": "com.mongodb.kafka.connect.source.json.formatter.SimplifiedJson",
    "output.format.value": "schema",
    "output.schema.infer.value": true,
    "output.format.key": "json",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "http://schema-registry:8081",
    "publish.full.document.only": true,
    "connection.uri": "mongodb://mongo1:27017,mongo2:27017,mongo3:27017",
    "topic.prefix": "schematest",
    "database": "Stocks",
    "collection": "customers"
  }
}' http://localhost:8083/connectors -w "\n"

Note that the JSON output is formatted as SimplifiedJson and that output.schema.infer.value is set to true, so the connector infers the value schema.

When we issue an insert db.customers.insert({"first_name":"John","last_name":"Doe", "age":51})

we can see the schema for our new data was automatically created for us in the Schema Registry:

curl --silent -X GET http://localhost:8081/subjects/schematest.Stocks.customers-value/versions/1 | jq

{
"subject": "schematest.Stocks.customers-value",
"version": 1,
"id": 5,
"schema": "{\"type\":\"record\",\"name\":\"inferred_name__1135654456\",\"fields\":[{\"name\":\"_id\",\"type\":\"string\"},{\"name\":\"first_name\",\"type\":\"string\"},{\"name\":\"last_name\",\"type\":\"string\"},{\"name\":\"age\",\"type\":\"int\"}],\"connect.name\":\"inferred_name__1135654456\"}"
}

We can insert additional documents, and they will use this schema: db.customers.insert({"first_name":"Mary","last_name":"Jane", "age":52})

Now let’s add another document, this time with a new field: db.customers.insert({"first_name":"Rob","last_name":"Walters", "age":21,"last_logged_in":Date()})

Now if we run the command to view the versions of our schema in the Schema Registry, we will see another version of our schema registered.

curl -X GET http://localhost:8081/subjects/schematest.Stocks.customers-value/versions

[1,2]

By default, inserting a new document that has a different schema, even if it’s just an added field, will create a new schema in the Schema Registry. This behavior is defined by the compatibility setting within the Schema Registry.

You can tell what compatibility level your Schema Registry is set to by querying the /config API as follows:

curl -X GET http://localhost:8081/config

{"compatibilityLevel":"BACKWARD"}

The backward compatibility level means that data written with older schemas will still be readable by consumers using the latest schema. Note that documents with the same fields in a different order are not considered to have a new schema: they effectively use the same schema. For more information regarding this compatibility setting, see the Schema Registry documentation online.
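If that behavior isn't what you want, the compatibility level can be changed through the same /config endpoint. For example, this request (illustrative; adjust to your own schema evolution policy) sets the global level to NONE so that any schema change is accepted:

curl -X PUT -H "Content-Type: application/vnd.schemaregistry.v1+json" \
  --data '{"compatibility": "NONE"}' \
  http://localhost:8081/config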

For a complete demo of source schema support in the connector, check out the Financial Securities source-schema-demo.

Error Handling

Certain error conditions, such as a missing change stream resume token, could cause the connector to fail. In MongoDB Connector for Apache Kafka version 1.3, the source connector has added support for the following error configuration settings:

errors.tolerance — The default option is to fail the connector for any error. Setting this option to “all” will cause the connector to skip over errors as they occur, allowing your connector to keep processing messages.

errors.log.enable — When set to "true", details of failed operations will be written to the Kafka Connect application log.

errors.deadletterqueue.topic.name — When an error has occurred, the message that resulted in the error is copied to this dead letter queue.
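Taken together, an error-tolerant source connector configuration might include a fragment like the following sketch; the dead letter queue topic name here is just an example:

"errors.tolerance": "all",
"errors.log.enable": "true",
"errors.deadletterqueue.topic.name": "mongoerror.deadletterqueue"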

Dealing with Infrequently Updated Sources

In some scenarios, you are sourcing from a collection that may see no updates for a long period of time, long enough that the resume token gets purged off the oplog, causing an error upon the next source-collection update. In MongoDB Connector for Apache Kafka version 1.3, you can configure the connector to periodically store a post-batch resume token in a separate Kafka topic. This topic is called the heartbeat topic; you define it by setting the heartbeat.topic.name parameter and specify the heartbeat interval via heartbeat.interval.ms.
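A minimal sketch of the heartbeat settings, assuming a ten-second interval and an illustrative topic name:

"heartbeat.interval.ms": 10000,
"heartbeat.topic.name": "mongo-source-heartbeats"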

Copy Existing Enhancements

When the connector is first configured, data is processed as it is written to the source collection. But what happens if data is already in the source collection? Prior to version 1.3, you could set the copy.existing parameter on the source, which would copy the existing data in the source collection, convert it into change stream events, and process those events as if they were happening in real time. Effectively, this let you copy the existing data in the source collection first and then continue processing new data as it was written. One challenge with this approach is that if your source collection is very large (e.g., millions of documents), you may not want to copy all of those documents as part of your use case. In MongoDB Connector for Apache Kafka version 1.3, you have two additional options for fine-tuning what data to copy from the source: copy.existing.namespace.regex and copy.existing.pipeline.

To filter which namespaces to include, optionally specify a regex string in copy.existing.namespace.regex. To filter the data within a namespace, optionally specify an aggregation pipeline via the copy.existing.pipeline parameter. For example, these settings will initially copy only the documents from the Sales.Customers collection where the active field is set to true:

copy.existing.namespace.regex : "Sales\.Customers"

copy.existing.pipeline : [{ "$match": { "active": "true" } }]

Write Data to Specific Partitions

In Kafka, topics have one or more partitions. Part of the power of Kafka is that these partitions can be spread across brokers, which is what lets topics scale. Kafka determines which partition to write a message to via a hash of the message key. If no key is present, Kafka writes messages to partitions in a round-robin fashion. Since each partition is its own ordered commit log, data within a partition is ordered; data across the topic as a whole, however, is not. Prior to version 1.3, the MongoDB source connector provided a key with the value of the _id of the document. This made it difficult to craft a key that would force Kafka to write to a specific partition consistently. In version 1.3, you can specify a custom document field to use as the key, enabling you to group related messages together in partitions.

To do this, you need to configure the source connector with the output.schema.key defined as:

{
  "name": "AnimalType",
  "type": "record",
  "namespace": "com.mongoexchange.avro",
  "fields": [
    {
      "name": "fullDocument",
      "type": {
        "name": "fullDocument",
        "type": "record",
        "fields": [
          { "name": "animal_type", "type": "string" }
        ]
      }
    }
  ]
}

In this example, we have documents in MongoDB that have a field called “animal_type” that we will use as the key to our Kafka message.
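For context, the key side of the source connector configuration could then look roughly like this sketch, with the AnimalType schema above escaped into a single string and the remaining settings omitted:

"output.format.key": "schema",
"output.schema.key": "{\"name\": \"AnimalType\", \"type\": \"record\", \"namespace\": \"com.mongoexchange.avro\", \"fields\": [{\"name\": \"fullDocument\", \"type\": {\"name\": \"fullDocument\", \"type\": \"record\", \"fields\": [{\"name\": \"animal_type\", \"type\": \"string\"}]}}]}"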

As we write data into MongoDB

db.pets.insert({"animal_type" : "dog", "name" : "susan" })
db.pets.insert({"animal_type" : "dog", "name" : "roscoe" })
db.pets.insert({"animal_type" : "dog", "name" : "zeus" })
db.pets.insert({"animal_type" : "dog", "name" : "lady" })
db.pets.insert({"animal_type" : "iguana", "name" : "roger" })
db.pets.insert({"animal_type" : "iguana", "name" : "george" })
db.pets.insert({"animal_type" : "elephant", "name" : "JJ" })
db.pets.insert({"animal_type" : "elephant", "name" : "Betty" })

we can see from the topic output that our animal_type keys are always grouped together.

Key :  dog
Value :  {"_id": "5f4a539ef8bcbd07d8023a68", "animal_type": "dog", "name": "susan"}
Partition: 1  Offset: 0

Key :  dog
Value :  {"_id": "5f4a539ef8bcbd07d8023a69", "animal_type": "dog", "name": "roscoe"}
Partition: 1  Offset: 1

Key :  dog
Value :  {"_id": "5f4a539ef8bcbd07d8023a6a", "animal_type": "dog", "name": "zeus"}
Partition: 1  Offset: 2

Key :  dog
Value :  {"_id": "5f4a539ef8bcbd07d8023a6b", "animal_type": "dog", "name": "lady"}
Partition: 1  Offset: 3

Key :  iguana
Value :  {"_id": "5f4a539ef8bcbd07d8023a6c", "animal_type": "iguana", "name": "roger"}
Partition: 1  Offset: 4

Key : iguana
Value :  {"_id": "5f4a539ef8bcbd07d8023a6d", "animal_type": "iguana", "name": "george"}
Partition: 1  Offset: 5

Key :  elephant
Value :  {"_id": "5f4a539ef8bcbd07d8023a6e", "animal_type": "elephant", "name": "JJ"}
Partition: 0  Offset: 0

Key :  elephant
Value :  {"_id": "5f4a539ff8bcbd07d8023a6f", "animal_type": "elephant", "name": "Betty"}
Partition: 0  Offset: 1

When we add another dog to the list, it is written to partition 1, which is where all the other dogs are kept.

Key :  dog
Value :  {"_id": "5f4a53cdf8bcbd07d8023a70", "animal_type": "dog", "name": "Harriet"}
Partition: 1  Offset: 6

The complete code for this example is located in the Kafka 1.3 Demo GitHub repository.

Restarting the Connector Without Resuming

The Kafka Connect service helps source connectors by storing an offset value. This is useful in case there is an issue with the source connector and it has to restart: it can pick up where it left off. The offset value is defined by, and specific to, the source itself; the MongoDB connector stores the change stream resume token in this offset. There are some scenarios where you may want to restart the connector without resuming from where you left off, effectively starting the connector with no resume token. Prior to version 1.3, you could accomplish this by manually removing the old offset, which, depending on your Kafka Connect configuration, could live in the offset storage file or the offset storage topic. In version 1.3, you can now set the offset.partition.name parameter, which defines a new partition name under which offset values are stored. This makes it easier to restart the connector without reconfiguring the Kafka Connect service or deleting and re-creating the MongoDB connector.
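A minimal sketch: give the connector a new offset partition name, and it starts without a previously stored resume token. The name itself is arbitrary and shown here only as an example:

"offset.partition.name": "mongo-source-restarted-2020-09"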

Summary

MongoDB Connector for Apache Kafka version 1.3 is a significant step in the journey of integrating MongoDB data within the Kafka ecosystem. It addresses many pain points experienced by early adopters of the connector, such as the limited choice of message output formats and the need for more robust error handling. The connector now also supports defining schemas on MongoDB data, which allows you to integrate easily with the Kafka ecosystem, including the Confluent Schema Registry. This integration makes it easy to move data into and out of Kafka, all while leveraging the unique capabilities of both platforms.

The MongoDB Connector for Apache Kafka is open source and available at https://github.com/mongodb/mongo-kafka. Members of the community have contributed to its evolution. Add your own pull request!

Have any feature requests? Share your ideas.

For any questions about the MongoDB Connector for Apache Kafka, check out the MongoDB Community website for Connectors and Integrations.