Hello there, everyone!
I have a MongoDB instance with several databases, a Kafka cluster with plenty of data, and a KafkaConnector.
Here is an example of the data on the Kafka cluster:
{
  "id2": "456",
  "id3": "7u8i9o",
  "obj": {
    "int": {
      "$numberLong": "217"
    }
  }
}
The issue I’m facing is that I want my MongoDB documents to have ‘id3’ as their ‘_id’.
So far, the closest I have come is to put id3 in the Kafka records’ keys and use the KafkaConnector configuration below:
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata:
  name: konnektor-name
  labels:
    strimzi.io/cluster: kafka-connect
spec:
  class: com.mongodb.kafka.connect.MongoSinkConnector
  tasksMax: 1
  config:
    connector.class: "com.mongodb.kafka.connect.MongoSinkConnector"
    connection.uri: mongodb://127.0.0.1:27017/
    database: database_name
    collection: collection_name
    key.converter: "org.apache.kafka.connect.storage.StringConverter"
    value.converter: "org.apache.kafka.connect.json.JsonConverter"
    value.converter.schemas.enable: false
    topics: "topic_name"
    document.id.strategy: "com.mongodb.kafka.connect.sink.processor.id.strategy.PartialKeyStrategy"
    document.id.strategy.overwrite.existing: "true"
    document.id.strategy.partial.key.projection.type: "AllowList"
    document.id.strategy.partial.key.projection.list: "key"
When I uploaded the records to the Kafka cluster, I used each record’s id3 as the key, in a format like this: { "key": "7u8i9o" } (sample value of id3).
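To make the key format concrete, here is a minimal Python sketch of how each record's key and value could be built before producing to Kafka. The helper name build_record is hypothetical and the actual producer call is left out; only the JSON shapes come from the post. As an assumption about the connector's behaviour, PartialKeyStrategy with this key would give an ‘_id’ that is the sub-document { "key": "7u8i9o" } rather than the plain string, which is why it only resembles the goal.

```python
import json


def build_record(value: dict) -> tuple[str, str]:
    """Return (key, value) as JSON strings for one Kafka record.

    The key wraps the record's id3 in a one-field document,
    matching the { "key": ... } format described above.
    """
    key = json.dumps({"key": value["id3"]})
    return key, json.dumps(value)


# Sample document taken from the example data.
sample = {
    "id2": "456",
    "id3": "7u8i9o",
    "obj": {"int": {"$numberLong": "217"}},
}

record_key, record_value = build_record(sample)
print(record_key)  # the string that would be sent as the Kafka record key
```

The value is passed through unchanged, so the schema shared with the Elasticsearch connector is not touched; only the record key carries the extra wrapping.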
Is there a way to do this, besides writing a custom strategy in Java, which is too time-consuming at the moment?
I’ve also tried aggregation pipelines, and of course you can project the field that way, but we are aiming for a fully automated process.
I can’t modify the schema much, because the same schema is used by Elasticsearch. When I added an ‘_id’ field to the schema, it worked great on the MongoDB side, but it broke the Elasticsearch connector.
Also, when I tried to use the ProvidedKeyFullStrategy (?) and gave a flat value in the Kafka key, the connector failed with a fatal error because it couldn’t convert a literal to a BSON document.
Any help is much appreciated!