Docs Menu

Docs HomeMongoDB Kafka Connector

Write Model Strategies

On this page

  • Overview
  • How to Specify Write Model Strategies
  • Specify a Business Key
  • Examples
  • Update One Timestamps Strategy
  • Replace One Business Key Strategy
  • Custom Write Model Strategies
  • Sample Write Model Strategy
  • How to Install Your Strategy

This guide shows you how to change the way your sink connector writes data to MongoDB.

You can change how your connector writes data to MongoDB for use cases including the following:

  • Insert documents instead of upserting them

  • Replace documents that match a filter other than the _id field

You can configure how your connector writes data to MongoDB by specifying a write model strategy. A write model strategy is a class that defines how your sink connector should write data using write models. A write model is a MongoDB Java driver interface that defines the structure of a write operation.

To learn how to modify the sink records your connector receives before your connector writes them to MongoDB, read the guide on Sink Connector Post Processors.

To learn more about bulk writes, see the MongoDB Java driver guide on Bulk Writes.

To see a write model strategy implementation, see the source code of the InsertOneDefaultStrategy class.

To specify a write model strategy, use the following setting:

writemodel.strategy=<write model strategy classname>

For a list of the pre-built write model strategies included in the connector, see the guide on write model strategy configurations.

A business key is a value composed of one or more fields in your sink record that identifies it as unique. By default, the sink connector uses the _id field of the sink record to retrieve the business key. To specify a different business key, configure the Document Id Adder post processor to use a custom value.

You can configure the Document Id Adder to set the _id field from the sink record key as shown in the following example properties:

document.id.strategy=com.mongodb.kafka.connect.sink.processor.id.strategy.PartialKeyStrategy
document.id.strategy.partial.key.projection.list=<comma-separated field names>
document.id.strategy.partial.key.projection.type=AllowList

Alternatively, you can configure it to set the _id field from the sink record value as shown in the following example properties:

document.id.strategy=com.mongodb.kafka.connect.sink.processor.id.strategy.PartialValueStrategy
document.id.strategy.partial.value.projection.list=<comma-separated field names>
document.id.strategy.partial.value.projection.type=AllowList

Important

Improve Write Performance

Create a unique index in your target collection that corresponds to the fields of your business key. This improves the performance of write operations from your sink connector. See the guide on unique indexes for more information.

The following write model strategies require a business key:

  • ReplaceOneBusinessKeyStrategy

  • UpdateOneBusinessKeyTimestampStrategy

For more information on the Document Id Adder post processor, see Configure the Document Id Adder Post Processor.

This section shows examples of configuration and output of the following write model strategies:

You can configure the Update One Timestamps strategy to add and update timestamps when writing documents to MongoDB. This strategy performs the following actions:

  • When the connector inserts a new MongoDB document, it sets the _insertedTS and _modifiedTS fields to the current time on the connector's server.

  • When the connector updates an existing MongoDB document, it updates the _modifiedTS field to the current time on the connector's server.

Suppose you want to track the position of a train along a route, and your sink connector receives messages with the following structure:

{
"_id": "MN-1234",
"start": "Beacon",
"destination": "Grand Central"
"position": [ 40, -73 ]
}

Use the ProvidedInValueStrategy to specify that your connector should use the _id value of the message to assign the _id field in your MongoDB document. Specify your id and write model strategy properties as follows:

document.id.strategy=com.mongodb.kafka.connect.sink.processor.id.strategy.ProvidedInValueStrategy
writemodel.strategy=com.mongodb.kafka.connect.sink.writemodel.strategy.UpdateOneTimestampsStrategy

After your sink connector processes the preceding example record, it inserts a document that contains the _insertedTS and _modifiedTS fields as shown in the following document:

{
"_id": "MN-1234",
"_insertedTS": ISODate("2021-09-20T15:08:000Z"),
"_modifiedTS": ISODate("2021-09-20T15:08:000Z"),
"start": "Beacon",
"destination": "Grand Central"
"position": [ 40, -73 ]
}

After one hour, the train reports its new location along its route with a new position as shown in the following record:

{
"_id": "MN-1234",
"start": "Beacon",
"destination": "Grand Central"
"position": [ 42, -75 ]
}

Once your sink connector processes the preceding record, it inserts a document that contains the following data:

{
"_id": "MN-1234",
"_insertedTS": ISODate("2021-09-20T15:08:000Z"),
"_modifiedTS": ISODate("2021-09-20T16:08:000Z"),
"start": "Beacon",
"destination": "Grand Central"
"position": [ 42, -75 ]
}

For more information on the ProvidedInValueStrategy, see the section on how to Configure the Document Id Adder Post Processor.

You can configure the Replace One Business Key strategy to replace documents that match the value of the business key. To define a business key on multiple fields of a record and configure the connector to replace documents that contain matching business keys, perform the following tasks:

  1. Create a unique index in your collection that corresponds to your business key fields.

  2. Specify the PartialValueStrategy id strategy to identify the fields that belong to the business key in the connector configuration.

  3. Specify the ReplaceOneBusinessKeyStrategy write model strategy in the connector configuration.

Suppose you want to track airplane capacity by the flight number and airport location represented by flight_no and airport_code, respectively. An example message contains the following information:

{
"flight_no": "Z342",
"airport_code": "LAX",
"seats": {
"capacity": 180,
"occupied": 152
}
}

To implement the strategy, using flight_no and airport_code as the business key, first create a unique index on these fields in the MongoDB shell:

db.collection.createIndex({ "flight_no": 1, "airport_code": 1}, { unique: true })

Next, specify the PartialValueStrategy strategy and business key fields in the a projection list. Specify the id and write model strategy configuration as follows:

document.id.strategy=com.mongodb.kafka.connect.sink.processor.id.strategy.PartialValueStrategy
document.id.strategy.partial.value.projection.list=flight_no,airport_code
document.id.strategy.partial.value.projection.type=AllowList
writemodel.strategy=com.mongodb.kafka.connect.sink.writemodel.strategy.ReplaceOneBusinessKeyStrategy

The sample data inserted into the collection contains the following:

{
"flight_no": "Z342"
"airport_code": "LAX",
"seats": {
"capacity": 180,
"occupied": 152
}
}

When the connector processes sink data that matches the business key of the existing document, it replaces the document with the new values without changing the business key fields:

{
"flight_no": "Z342"
"airport_code": "LAX",
"status": "canceled"
}

After the connector processes the sink data, it replaces the original sample document in MongoDB with the preceding one.

If none of the write model strategies included with the connector fit your use case, you can create your own.

A write model strategy is a Java class that implements the WriteModelStrategy interface and must override the createWriteModel() method.

See the source code for the WriteModelStrategy interface for the required method signature.

The following custom write model strategy returns a write operation that replaces a MongoDB document that matches the _id field of your sink record with the value of the fullDocument field of your sink record:

/**
* Custom write model strategy
*
* This class reads the 'fullDocument' field from a change stream and
* returns a ReplaceOne operation.
*/
public class CustomWriteModelStrategy implements WriteModelStrategy {
private static String ID = "_id";
@Override
public WriteModel<BsonDocument> createWriteModel(final SinkDocument document) {
BsonDocument changeStreamDocument = document.getValueDoc()
.orElseThrow(() -> new DataException("Missing value document"));
BsonDocument fullDocument = changeStreamDocument.getDocument("fullDocument", new BsonDocument());
if (fullDocument.isEmpty()) {
return null; // Return null to indicate no op.
}
return new ReplaceOneModel<>(Filters.eq(ID, fullDocument.get(ID)), fullDocument);
}
}

For another example of a custom write model strategy, see the UpsertAsPartOfDocumentStrategy example strategy on GitHub.

To configure your sink connector to use a custom write strategy, you must complete the following actions:

  1. Compile the custom write strategy class to a JAR file.

  2. Add the compiled JAR to the classpath/plugin path for your Kafka workers. For more information about plugin paths, see the Confluent documentation.

    Note

    Kafka Connect loads plugins in isolation. When you deploy a custom write strategy, both the connector JAR and the write model strategy JAR must be on the same path. Your paths should resemble the following:

    <plugin.path>/mongo-kafka-connect/mongo-kafka-connect-all.jar
    <plugin.path>/mongo-kafka-connect/custom-write-model-strategy.jar

    To learn more about Kafka Connect plugins, see this guide from Confluent.

  3. Specify your custom class in the writemodel.strategy configuration setting.

To learn how to compile a class to a JAR file, see the JAR deployment guide from the Java SE documentation.

←  FundamentalsSink Connector Post Processors →