Hi everyone, I am using Kafka Connect to replicate a database from PostgreSQL to MongoDB. On the source connector side, I use the Debezium PostgreSQL connector to capture events from the PostgreSQL database and push messages to 3 topics corresponding to 3 events: update/insert/delete. My question is: how can I configure the MongoDB sink connector to apply the create/update/delete operations to documents based on the value of the field "source.table" in the message record?
This is an example message from Kafka Connect:
{
"before": null,
"after": {
"id": 3815,
"shift_id": 30,
"plan_id": 1317,
"plan_status": 3,
"created_at": 1672667143000,
"updated_at": 1672667143000,
"deleted_at": null,
"created_by": 1,
"updated_by": 1,
"deleted_by": null
},
"source": {
"version": "2.2.1.Final",
"connector": "postgresql",
"name": "source",
"ts_ms": 1698380116989,
"snapshot": "false",
"db": "database_name",
"sequence": "[null,\"1017130048\"]",
"schema": "public",
"table": "shift_plans",
"txId": 1320,
"lsn": 1017130048,
"xmin": null
},
"op": "u",
"ts_ms": 1698380117364,
"transaction": null
}