I’m trying to apply different configurations to a MongoSourceConnector so that data lands on a Kafka topic in JSON format, but with some transformations applied (ReplaceField, time conversion). It seems the only way to get messages as JSON is with output.format.value = json
and value.converter = org.apache.kafka.connect.storage.StringConverter,
but then I’m not able to apply transformations (SMTs); they fail with the error: ‘transformation supports only Struct objects’.
Is there a way to avoid using a schema, keep JSON as the output format, and still use SMTs?
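For reference, this is roughly the configuration I’m using (connection details and the transform are illustrative placeholders):

```properties
# Hypothetical connector config; connection.uri/database/collection are placeholders
connector.class=com.mongodb.kafka.connect.MongoSourceConnector
connection.uri=mongodb://localhost:27017
database=mydb
collection=mycoll

# Emit raw JSON strings -- but then the record value is a String, not a Struct,
# so Struct-based SMTs like ReplaceField fail
output.format.value=json
value.converter=org.apache.kafka.connect.storage.StringConverter

transforms=dropNs
transforms.dropNs.type=org.apache.kafka.connect.transforms.ReplaceField$Value
transforms.dropNs.exclude=ns
```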
Kafka SMTs require a schema to be defined on the source, so you’ll need to define one.
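As a sketch, one way to do that with this connector is to switch to schema output and let the connector infer the value schema; you can then keep plain JSON on the topic by using the JsonConverter with the schema envelope disabled (the transform shown is illustrative):

```properties
# Produce schema'd records so Struct-based SMTs can run
output.format.value=schema
output.schema.infer.value=true

# Serialize to JSON without embedding the schema/payload envelope
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=false

transforms=dropNs
transforms.dropNs.type=org.apache.kafka.connect.transforms.ReplaceField$Value
transforms.dropNs.exclude=ns
```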
Thanks for your prompt support! Do you mean that I need to specify output.format.value: schema
to use SMTs, or am I missing something? Is it necessary that every message on the topic include schema and payload subobjects?
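For context, the schema/payload envelope in question is what the JsonConverter produces when value.converter.schemas.enable=true; each message then looks roughly like this (illustrative fields):

```json
{
  "schema": {
    "type": "struct",
    "fields": [ { "field": "operationType", "type": "string", "optional": true } ],
    "optional": false
  },
  "payload": { "operationType": "insert" }
}
```

With value.converter.schemas.enable=false, only the payload part is written to the topic.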
My goal is to have messages in JSON format with the standard change stream fields (_id, operationType, fullDocument, …), but processed with SMTs: can you help me understand whether this is possible?
If replacing a field is all you need SMTs for, why not just use the pipeline? https://www.mongodb.com/docs/kafka-connector/current/source-connector/usage-examples/custom-pipeline/#customize-a-pipeline-to-filter-change-events.
Then you don’t need to define a schema or use SMTs at all, and it gives the best performance (SMTs can be slow).
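For example, you can set the connector’s pipeline property to an aggregation pipeline that reshapes the change events before they ever reach Kafka (the excluded fields here are illustrative):

```properties
# Drop fields from change events at the source; no schema or SMT needed
pipeline=[ { "$project": { "ns": 0, "clusterTime": 0 } } ]
```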
Sorry for coming back after a while: for field replacement the pipeline is enough, but what if I need something more? For example, it could be helpful to move some values into headers; what is the suggested approach?
You can write a custom SMT
and have it parse the JSON itself if you don’t want to provide a schema.
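A minimal sketch of such a custom SMT (class name and header key are hypothetical; it needs the Kafka Connect API and Jackson on the classpath, so this is a structural sketch rather than a drop-in class). It parses the record’s JSON string value and copies one field into a record header:

```java
package com.example.smt;

import java.util.Map;

import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.transforms.Transformation;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

// Hypothetical SMT: copies "operationType" from a JSON string value into a header,
// so it works with output.format.value=json / StringConverter (no Struct needed).
public class OperationTypeToHeader<R extends ConnectRecord<R>> implements Transformation<R> {

    private static final ObjectMapper MAPPER = new ObjectMapper();

    @Override
    public R apply(R record) {
        if (record.value() instanceof String) {
            try {
                JsonNode node = MAPPER.readTree((String) record.value());
                JsonNode op = node.get("operationType");
                if (op != null) {
                    // Headers on a ConnectRecord are mutable; add the extracted value
                    record.headers().addString("operationType", op.asText());
                }
            } catch (Exception e) {
                // Not valid JSON: pass the record through unchanged
            }
        }
        return record;
    }

    @Override
    public ConfigDef config() {
        return new ConfigDef();
    }

    @Override
    public void configure(Map<String, ?> configs) {
    }

    @Override
    public void close() {
    }
}
```

You would package this as a jar, drop it on the Connect plugin path, and reference it with transforms.opHeader.type=com.example.smt.OperationTypeToHeader.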