My MongoDB documents have a field that is an array of objects, and some fields inside those objects can be null. For example, field "a" has a date value in the object at index 0 of the array, but field "a" is null in the object at index 1, like this:
arrayObject : [
{
"a" : 2021-01-10T00:00:00.000+00:00
},
{
"a": null
}
]
Now I get the following error from the source connector:
java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.kafka.connect.errors.DataException: Invalid Java object for schema with type STRING: class java.util.Date for field: "a"
	at org.apache.kafka.connect.data.ConnectSchema.validateValue(ConnectSchema.java:242)
	at org.apache.kafka.connect.data.Struct.put(Struct.java:216)
	at org.apache.kafka.connect.data.Struct.put(Struct.java:203)
Is there a configuration option that allows null values here, or a way to ignore this error?
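To make the error message easier to discuss, here is a toy model in Python of the kind of type check the message seems to describe: a field whose schema type is STRING rejects a date value, while a null passes when the field is optional. This is only a sketch, not Kafka Connect or Debezium code; `ACCEPTED_TYPES`, `validate_value`, and `check_all` are hypothetical names, and the sketch does not explain why the schema for "a" ended up as STRING in the first place.

```python
from datetime import datetime, timezone

# Hypothetical mapping from a schema type name to the Python types it accepts.
# It only mimics the idea of a per-type class check; it is NOT Kafka Connect code.
ACCEPTED_TYPES = {
    "STRING": (str,),
    "TIMESTAMP": (datetime,),
}


def validate_value(field, schema_type, value, optional=True):
    """Raise TypeError if value does not fit schema_type (toy check)."""
    if value is None:
        if optional:
            return  # null is fine for an optional field
        raise TypeError(f"null value for required field: {field!r}")
    if not isinstance(value, ACCEPTED_TYPES[schema_type]):
        raise TypeError(
            f"Invalid object for schema with type {schema_type}: "
            f"{type(value).__name__} for field: {field!r}"
        )


# Same shape as the sample document: a date at index 0, null at index 1.
array_object = [
    {"a": datetime(2021, 1, 10, tzinfo=timezone.utc)},
    {"a": None},
]


def check_all(schema_type):
    """Validate field "a" of every element and collect (index, message) pairs."""
    errors = []
    for index, element in enumerate(array_object):
        try:
            validate_value("a", schema_type, element["a"])
        except TypeError as exc:
            errors.append((index, str(exc)))
    return errors


print(check_all("TIMESTAMP"))
print(check_all("STRING"))
```

In this toy model the null element is never the one that fails. The date element fails as soon as the field is treated as a string, which matches the wording of the exception.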
Here is my Kafka source connector configuration:
{
"connector.class": "io.debezium.connector.mongodb.MongoDbConnector",
"auto.create.topics.enable": "false",
"topic.creation.enable": "true",
"topic.creation.default.partitions": "3",
"topic.prefix": "test-prefix",
"topic.creation.default.replication.factor": "3",
"topic.creation.default.compression.type": "gzip",
"topic.creation.default.file.delete.delay.ms": "432000000",
"topic.creation.default.cleanup.policy": "delete",
"topic.creation.default.retention.ms": "432000000",
"tombstones.on.delete": "false",
"mongodb.connection.string" : "xxxxxxxxxx",
"mongodb.name": "xxxxxxxxxx",
"mongodb.user" : "xxxxxxxxxx",
"mongodb.password" : "xxxxxxxxxx",
"mongodb.authSource": "xxxxxxxxxx",
"mongodb.connection.mode": "replica_set",
"database.include.list" : "xxxxxxxxxx",
"name": "test-connector",
"collection.include.list": "xxxxxxxxxx",
"schema.history.internal.kafka.bootstrap.servers": "kafka-0.kafka-headless.kafka-connector:9092",
"schema.history.internal.kafka.topic": "test-connector-schema",
"transforms": "ReplaceField, unwrap, RenameField, convertTS, RenameTSMS",
"transforms.ReplaceField.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
"transforms.ReplaceField.exclude": "source",
"transforms.unwrap.type": "io.debezium.connector.mongodb.transforms.ExtractNewDocumentState",
"transforms.unwrap.collection.expand.json.payload": "true",
"transforms.unwrap.add.fields": "op,ts_ms",
"transforms.unwrap.add.fields.prefix": "",
"transforms.RenameField.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
"transforms.RenameField.renames": "_id:id",
"transforms.convertTS.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
"transforms.convertTS.field": "ts_ms",
"transforms.convertTS.format": "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'",
"transforms.convertTS.target.type": "string",
"transforms.RenameTSMS.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
"transforms.RenameTSMS.renames": "ts_ms:@timestamp",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.nullable": "true",
"value.converter.schemas.enable": "false",
"capture.mode": "change_streams_update_full_with_pre_image",
"snapshot.mode": "never",
"capture.scope": "deployment",
"tasks.max": "1"
}
Thanks a lot for any solutions.
Sorry for my English. If anything is unclear, please ask and I'll provide more information.