Overview
On this page, you can learn how to configure post processors in your MongoDB Kafka sink connector. Post processors modify sink records that the connector reads from a Kafka topic before the connector stores them in your MongoDB collection. A few examples of data modifications post processors can make include:
Set the document _id field to a custom value
Include or exclude message key or value fields
Rename fields
You can use the prebuilt post processors included in the connector or implement your own.
See the following sections for more information on post processors:
How Post Processors Modify Data
Post processors modify data read from a Kafka topic. The connector stores the message in a SinkDocument class which contains a representation of the Kafka SinkRecord key and value fields. The connector sequentially applies any post processors specified in the configuration and stores the result in a MongoDB collection.
Post processors perform data modification tasks such as generating the document _id field, projecting message key or value fields, and renaming fields. You can use the prebuilt post processors included in the connector, or you can implement your own by extending the PostProcessor class.
Important
Post Processors and Change Data Capture (CDC) Handlers
You cannot apply a post processor to CDC handler event data. If you specify both, the connector logs a warning.
How to Specify Post Processors
You can specify one or more post processors in the post.processor.chain configuration setting as a comma-separated list. If you specify more than one, the connector applies them in the order listed, and each post processor modifies the data output by the prior one.
To ensure the documents the connector writes to MongoDB contain unique _id fields, it automatically adds the DocumentIdAdder post processor in the first position of the chain if you do not otherwise include it.
The following example setting specifies that the connector should run the KafkaMetaAdder post processor first and then the AllowListValueProjector post processor on the output.
post.processor.chain=com.mongodb.kafka.connect.sink.processor.KafkaMetaAdder,com.mongodb.kafka.connect.sink.processor.AllowListValueProjector
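As a rough model of the chaining behavior described above, the following Java sketch applies a list of processors in order, with each one receiving the output of the previous one. The ChainSketch class, its two stand-in processors, and the use of a plain Map as the document are assumptions made for illustration; they are not the connector's PostProcessor or SinkDocument classes.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.UnaryOperator;

// Hypothetical model of a post processor chain; not the connector's API.
class ChainSketch {

    // Stand-in processor: returns a copy of the document with one extra field.
    static UnaryOperator<Map<String, Object>> addField(String name, Object value) {
        return doc -> {
            Map<String, Object> out = new LinkedHashMap<>(doc);
            out.put(name, value);
            return out;
        };
    }

    // Stand-in processor: returns a copy that keeps only the listed top-level fields.
    static UnaryOperator<Map<String, Object>> keepOnly(List<String> allowed) {
        return doc -> {
            Map<String, Object> out = new LinkedHashMap<>(doc);
            out.keySet().retainAll(allowed);
            return out;
        };
    }

    // Applies the processors in list order; each one sees the previous output.
    static Map<String, Object> applyChain(Map<String, Object> doc,
                                          List<UnaryOperator<Map<String, Object>>> chain) {
        Map<String, Object> current = doc;
        for (UnaryOperator<Map<String, Object>> processor : chain) {
            current = processor.apply(current);
        }
        return current;
    }
}
```

In this model, order matters: a field added by an earlier processor survives a later allow list only if the allow list names it.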
Prebuilt Post Processors
The following table contains a list of all the post processors included in the sink connector.
Post Processor Name | Description |
|---|---|
DocumentIdAdder | Full Path: Inserts an _id field determined by the configured strategy. For information on strategy options and configuration, see the Configure the Document Id Adder Post Processor section. |
BlockListKeyProjector | Full Path: Removes matching key fields from the sink record. |
BlockListValueProjector | Full Path: Removes matching value fields from the sink record. |
AllowListKeyProjector | Full Path: Includes only matching key fields from the sink record. |
AllowListValueProjector | Full Path: Includes only matching value fields from the sink record. |
KafkaMetaAdder | Full Path: Adds a field named "topic-partition-offset" to the document and sets its value to the concatenation of the Kafka topic, partition, and offset. |
RenameByMapping | Full Path: Renames fields that are an exact match to a specified field name in the key or value document. |
RenameByRegex | Full Path: Renames fields that match a regular expression in the key or value document. |
NullFieldValueRemover | Full Path: Removes all document fields that contain null values. |
Configure the Document Id Adder Post Processor
The DocumentIdAdder post processor uses a strategy to determine how it should format the _id field in the MongoDB document. A strategy defines preset behavior that you can customize for your use case.
You can specify a strategy for this post processor in the document.id.strategy setting as shown in the following example:
document.id.strategy=com.mongodb.kafka.connect.sink.processor.id.strategy.UuidStrategy
The following table shows a list of the strategies you can use to configure the DocumentIdAdder post processor:
Strategy Name | Description |
|---|---|
BsonOidStrategy | Full Path: Generates a MongoDB BSON ObjectId. |
KafkaMetaDataStrategy | Full Path: Builds a string composed of the concatenation of Kafka topic, partition, and offset. |
FullKeyStrategy | Full Path: Uses the complete key structure of the sink document to generate the value for the _id field. |
ProvidedInKeyStrategy | Full Path: Uses the |
ProvidedInValueStrategy | Full Path: Uses the |
PartialKeyStrategy | Full Path: Uses a block list or allow list projection of the sink document key structure. |
PartialValueStrategy | Full Path: Uses a block list or allow list projection of the sink document value structure. |
UuidProvidedInKeyStrategy | Full Path: Converts the |
UuidProvidedInValueStrategy | Full Path: Converts the |
UuidStrategy | Full Path: Uses a randomly generated UUID in string format. |
Create a Custom Document Id Strategy
If the built-in document id adder strategies do not cover your use case, you can define a custom document id strategy by following the steps below:
Create a Java class that implements the interface IdStrategy and contains your custom configuration logic.
Compile the class to a JAR file.
Add the compiled JAR to the class path / plugin path for all your Kafka workers. For more information about plugin paths, see the Confluent documentation.
Update the document.id.strategy setting to the full class name of your custom class in all your Kafka workers.
Note
Selected strategy may have implications on delivery semantics
BSON ObjectId or UUID strategies can only guarantee at-least-once delivery since the connector generates new ids on retries or when processing records again. Other strategies permit exactly-once delivery if you can guarantee the fields that form the document id are unique.
For example implementations of the IdStrategy interface, see the source code directory that contains id strategy implementations packaged with the connector.
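To illustrate the note on delivery semantics, the sketch below models a collection as a map keyed by _id and writes the same record more than once, as could happen on a retry. The IdSemanticsSketch class and its methods are hypothetical and do not use the connector's IdStrategy interface; the "#" separator, the topic name, and the payload are arbitrary choices for this example.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

// Hypothetical model of how the id choice interacts with reprocessing.
class IdSemanticsSketch {

    // Random id, in the spirit of a UUID-based strategy: differs on every call.
    static String randomId() {
        return UUID.randomUUID().toString();
    }

    // Deterministic id built from the record coordinates: identical on every replay.
    static String coordinatesId(String topic, int partition, long offset) {
        return topic + "#" + partition + "#" + offset;
    }

    // Writes the same record `attempts` times into a map keyed by _id and
    // returns how many documents the modeled collection ends up holding.
    static int documentsAfterReplay(boolean deterministicId, int attempts) {
        Map<String, String> collection = new HashMap<>();
        for (int i = 0; i < attempts; i++) {
            String id = deterministicId ? coordinatesId("orders", 0, 17L) : randomId();
            collection.put(id, "order-42"); // a write keyed by _id replaces any earlier copy
        }
        return collection.size();
    }
}
```

With a deterministic id, the replayed write lands on the same document, so the collection holds one copy. With a random id, each attempt creates a new document, which is why those strategies can only offer at-least-once delivery.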
Post Processor Examples
This section shows examples of configuration and sample output of the following types of post processors:
Allow List and Block List Examples
The allow list and block list projector post processors determine which fields to include and exclude from the output.
When you use the allow list projector, the post processor only outputs data from the fields that you specify.
When you use the block list projector, the post processor omits only the data from the fields that you specify.
Note
You can use the "." (dot) notation to reference nested fields in the record. You can also use the notation to reference fields of documents in an array.
When you add a projector to your post processor chain, you must specify the projector type and whether to apply it to the key or value portion of the sink document.
See the following sections for example projector configurations and output.
Allow List Projector Example
Suppose your Kafka record value documents resembled the following user profile data:
{ "name": "Sally Kimball", "age": 10, "address": { "city": "Idaville", "country": "USA" }, "hobbies": [ "reading", "solving crime" ] }
You can configure the AllowList value projector to store select data such as the "name", "address.city", and "hobbies" fields from your value documents using the following settings:
post.processor.chain=com.mongodb.kafka.connect.sink.processor.AllowListValueProjector
value.projection.type=AllowList
value.projection.list=name,address.city,hobbies
After the post processor applies the projection, it outputs the following record:
{ "name": "Sally Kimball", "address": { "city": "Idaville" }, "hobbies": [ "reading", "solving crime" ] }
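The effect of an allow list with dot notation can be approximated in a few lines of Java. This is a minimal sketch of the projection semantics shown in the example, not the connector's implementation: the AllowListSketch class is hypothetical, documents are modeled as nested Map instances, and documents inside arrays and wildcards are not handled.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration of allow list projection on nested maps.
class AllowListSketch {

    // Copies only the listed dot-notation paths from doc into a new document.
    static Map<String, Object> project(Map<String, Object> doc, List<String> paths) {
        Map<String, Object> out = new LinkedHashMap<>();
        for (String path : paths) {
            copyPath(doc, out, path.split("\\."), 0);
        }
        return out;
    }

    @SuppressWarnings("unchecked")
    private static void copyPath(Map<String, Object> src, Map<String, Object> dst,
                                 String[] parts, int i) {
        Object value = src.get(parts[i]);
        if (value == null) {
            return; // path is not present in this document
        }
        if (i == parts.length - 1) {
            dst.put(parts[i], value); // last segment: keep the whole value
        } else if (value instanceof Map) {
            // Reuse a subdocument created by an earlier path, or start a new one.
            Object existing = dst.get(parts[i]);
            Map<String, Object> child = existing instanceof Map
                    ? (Map<String, Object>) existing
                    : new LinkedHashMap<String, Object>();
            copyPath((Map<String, Object>) value, child, parts, i + 1);
            if (!child.isEmpty()) {
                dst.put(parts[i], child);
            }
        }
    }
}
```

Calling project with the paths name, address.city, and hobbies on the user profile document above yields the same fields as the projected record in the example.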
Block List Projector Example
Suppose your Kafka record key documents resembled the following user identification data:
{ "username": "user5983", "registration": { "date": "2021-09-13", "source": "mobile" }, "authToken": { "alg": "HS256", "type": "JWT", "payload": "zI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiIxMjM0NTY3ODk" } }
You can configure the BlockList key projector to omit the "authToken" and "registration.source" fields before storing the data with the following settings:
post.processor.chain=com.mongodb.kafka.connect.sink.processor.BlockListKeyProjector
key.projection.type=BlockList
key.projection.list=authToken,registration.source
After the post processor applies the projection, it outputs the following record:
{ "username": "user5983", "registration": { "date": "2021-09-13" } }
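A block list works in the opposite direction: it starts from the full document and removes the listed paths. The following Java sketch shows that idea under the same assumptions as before: BlockListSketch is a hypothetical class, documents are nested Map instances, and arrays of documents and wildcards are out of scope.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration of block list projection on nested maps.
@SuppressWarnings("unchecked")
class BlockListSketch {

    // Returns a copy of doc with every listed dot-notation path removed.
    static Map<String, Object> project(Map<String, Object> doc, List<String> paths) {
        Map<String, Object> out = deepCopy(doc);
        for (String path : paths) {
            removePath(out, path.split("\\."), 0);
        }
        return out;
    }

    // Copies nested maps so that the input document is left untouched.
    private static Map<String, Object> deepCopy(Map<String, Object> src) {
        Map<String, Object> copy = new LinkedHashMap<>();
        for (Map.Entry<String, Object> e : src.entrySet()) {
            Object v = e.getValue();
            copy.put(e.getKey(), v instanceof Map ? deepCopy((Map<String, Object>) v) : v);
        }
        return copy;
    }

    // Walks down the path and removes the field named by the last segment.
    private static void removePath(Map<String, Object> doc, String[] parts, int i) {
        if (i == parts.length - 1) {
            doc.remove(parts[i]);
            return;
        }
        Object child = doc.get(parts[i]);
        if (child instanceof Map) {
            removePath((Map<String, Object>) child, parts, i + 1);
        }
    }
}
```

Applied to the key document above with the paths authToken and registration.source, the sketch leaves only username and registration.date.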
Projection Wildcard Pattern Matching Examples
This section shows how you can configure the projector post processors to match field names by using wildcard patterns.
Pattern | Description |
|---|---|
* | Matches any number of characters in the current level. |
** | Matches any characters in the current level and all nested levels. |
For the allow list and block list wildcard pattern matching examples in this section, refer to the following value document that contains weather measurements:
{ "city": "Springfield", "temperature": { "high": 28, "low": 24, "units": "C" }, "wind_speed_10m": { "average": 3, "units": "km/h" }, "wind_speed_80m": { "average": 8, "units": "km/h" }, "soil_conditions": { "temperature": { "high": 22, "low": 17, "units": "C" }, "moisture": { "average": 340, "units": "mm" } } }
Allow List Wildcard Examples
You can use the * wildcard to match multiple field names. The following example configuration matches the following fields:
The top-level field named "city"
The fields named "average" that are subdocuments of any top-level field that starts with the name "wind_speed".
post.processor.chain=com.mongodb.kafka.connect.sink.processor.AllowListValueProjector
value.projection.type=AllowList
value.projection.list=city,wind_speed*.average
After the post processor applies the allow list projection, it outputs the following record:
{ "city": "Springfield", "wind_speed_10m": { "average": 3 }, "wind_speed_80m": { "average": 8 } }
You can use the ** wildcard which matches objects at any level starting from the one at which you specify the wildcard. The following wildcard matching example projects any document that contains the field named "low".
post.processor.chain=com.mongodb.kafka.connect.sink.processor.AllowListValueProjector
value.projection.type=AllowList
value.projection.list=**.low
The post processor that applies the projection outputs the following record:
{ "temperature": { "high": 28, "low": 24, "units": "C" }, "soil_conditions": { "temperature": { "high": 22, "low": 17, "units": "C" } } }
Block List Wildcard Example
You can use the wildcard patterns to match fields at a specific document level as shown in the following block list configuration example:
post.processor.chain=com.mongodb.kafka.connect.sink.processor.BlockListValueProjector
value.projection.type=BlockList
value.projection.list=*.*.temperature
After the post processor applies the block list projection, it outputs the following record:
{ "city": "Springfield", "temperature": { "high": 28, "low": 24, "units": "C" }, "wind_speed_10m": { "average": 3, "units": "km/h" }, "wind_speed_80m": { "average": 8, "units": "km/h" }, "soil_conditions": { "moisture": { "average": 340, "units": "mm" } } }
Field Renaming Examples
This section shows how you can configure the RenameByMapping and RenameByRegex field renamer post processors to update field names in a sink record. The field renaming settings specify the following:
Whether to update the key or value document in the record
The field names to update
The new field names
You must specify RenameByMapping and RenameByRegex settings in a JSON array. You can specify nested fields by using either dot notation or pattern matching.
The field renamer post processor examples use the following example sink record:
Key Document
{ "location": "Provence", "date_month": "October", "date_day": 17 }
Value Document
{ "flapjacks": { "purchased": 598, "size": "large" } }
Rename by Mapping Example
The RenameByMapping post processor setting specifies one or more JSON objects that assign fields matching a string to a new name. Each object contains the text to match in the oldName element and the replacement text in the newName element as described in the table below.
Key Name | Description |
|---|---|
oldName | Specifies whether to match fields in the key or value document and the field name to replace. The setting uses a "." character to separate the two values. |
newName | Specifies the replacement field name for all matches of the field. |
The following example property matches the "location" field of a key document and renames it to "country":
field.renamer.mapping=[{"oldName":"key.location", "newName":"country"}]
This setting instructs the RenameByMapping post processor to transform the original key document to the following document:
{ "country": "Provence", "date_month": "October", "date_day": 17 }
You can perform a similar field name assignment for value documents by specifying the value document with the appended field name in the oldName field as follows:
field.renamer.mapping=[{"oldName":"value.flapjacks", "newName":"crepes"}]
This setting instructs the RenameByMapping post processor to transform the original value document to the following document:
{ "crepes": { "purchased": 598, "size": "large" } }
You can also specify one or more mappings in the field.renamer.mapping property by using a JSON array in string format as shown in the following setting:
field.renamer.mapping=[{ "oldName":"key.location", "newName":"city" }, { "oldName":"value.crepes", "newName":"flapjacks" }]
Rename by Regular Expression
The RenameByRegex post processor setting specifies the field names and text patterns that it should match, and replacement values for the matched text. You can specify one or more renaming expressions in JSON objects containing the fields described in the following table:
Key Name | Description |
|---|---|
regexp | Contains a regular expression that matches fields to perform the replacement. |
pattern | Contains a regular expression that matches on the text to replace. |
replace | Contains the replacement text for all matches of the regular expression you defined in the pattern field. |
The following example setting instructs the post processor to perform the following:
Match any field names in the key document that start with "date". In the set of matching fields, replace all text that matches the pattern _ with the - character.
Match any field names in the value document that are subdocuments of crepes. In the set of matching fields, replace all text that matches the pattern purchased with quantity.
field.renamer.regexp=[{"regexp":"^key\\.date.*$","pattern":"_","replace":"-"}, {"regexp":"^value\\.crepes\\..*","pattern":"purchased","replace":"quantity"}]
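When the connector applies the post processor to the example key document and to a value document in which the flapjacks field has already been renamed to crepes (as in the preceding RenameByMapping example), it outputs the following: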
Key Document
{ "location": "Provence", "date-month": "October", "date-day": 17 }
Value Document
{ "crepes": { "quantity": 598, "size": "large" } }
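The interplay of the regexp, pattern, and replace fields can be sketched with java.util.regex semantics. The RegexRenameSketch class below is a hypothetical illustration rather than the connector's code. It assumes that the regexp is tested against the field name prefixed with key. or value., it handles top-level fields only, and it leaves a field unchanged when the new name would collide with an existing one.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical illustration of regex-based field renaming on a flat document.
class RegexRenameSketch {

    // prefix is "key" or "value"; regexp selects fields by their prefixed path,
    // pattern selects the text inside the field name, replace is the new text.
    static Map<String, Object> rename(Map<String, Object> doc, String prefix,
                                      String regexp, String pattern, String replace) {
        Map<String, Object> out = new LinkedHashMap<>();
        for (Map.Entry<String, Object> e : doc.entrySet()) {
            String name = e.getKey();
            String path = prefix + "." + name; // for example "key.date_month"
            String newName = path.matches(regexp) ? name.replaceAll(pattern, replace) : name;
            // Keep the original name if the new one already exists at this level.
            if (!newName.equals(name) && (doc.containsKey(newName) || out.containsKey(newName))) {
                newName = name;
            }
            out.put(newName, e.getValue());
        }
        return out;
    }
}
```

Calling rename on the example key document with the prefix key, the regexp ^key\.date.*$, the pattern _, and the replacement - produces the field names location, date-month, and date-day.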
Warning
The renamer post processors do not overwrite existing field names
The target field names you set in your renamer post processors may result in duplicate field names in the same document. To avoid this, the post processor skips renaming when it would duplicate an existing field name at the same level of the document.
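The skip-on-duplicate rule in this warning can be sketched as follows. MappingRenameSketch is a hypothetical class that models exact-match renaming on a flat Map; it is meant only to show the rule, not to reproduce the connector's RenameByMapping class.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical illustration of exact-match renaming with duplicate protection.
class MappingRenameSketch {

    // mapping holds old field name -> new field name for one level of the document.
    static Map<String, Object> rename(Map<String, Object> doc, Map<String, String> mapping) {
        Map<String, Object> out = new LinkedHashMap<>();
        for (Map.Entry<String, Object> e : doc.entrySet()) {
            String name = e.getKey();
            String target = mapping.getOrDefault(name, name);
            // Skip the rename when the target name already exists at this level.
            if (!target.equals(name) && (doc.containsKey(target) || out.containsKey(target))) {
                target = name;
            }
            out.put(target, e.getValue());
        }
        return out;
    }
}
```

In this model, renaming location to country succeeds on the example key document, but leaves the document unchanged if it already contains a country field.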
How to Create a Custom Post Processor
If the built-in post processors do not cover your use case, you can create a custom post processor class using the following steps:
Create a Java class that extends the PostProcessor abstract class.
Override the process() method in your class. You can update the SinkDocument, a BSON representation of the sink record key and value fields, and access the original Kafka SinkRecord in your method.
Compile the class to a JAR file.
Add the compiled JAR to the class path / plugin path for all your Kafka workers. For more information about plugin paths, see the Confluent documentation on Manually Installing Community Connectors.
Add your post processor full class name to the post processor chain configuration.
For example post processors, you can browse the source code for the built-in post processor classes.