Juan Soto

Data Movement from Oracle to MongoDB Made Easy with Apache Kafka

Change Data Capture (CDC) features have existed for many years in the database world. CDC makes it possible to listen for changes to a database, such as inserts, updates and deletes, and to send these events to other database systems for scenarios such as ETL, replication and database migration. By leveraging Apache Kafka, the Confluent Oracle CDC Connector and the MongoDB Connector for Apache Kafka, you can easily stream database changes from Oracle to MongoDB. In this post we will pass data from Oracle to MongoDB, providing a step-by-step configuration that you can easily reuse, tweak and explore.

At a high level, we will configure this pipeline in a self-contained Docker Compose environment that consists of the following:

- Oracle Database
- MongoDB
- Apache Kafka
- Confluent KSQL

These containers all run within a local bridged network, so you can work with them from your local Mac or PC. Check out the GitHub repository to download the complete example.

Preparing the Oracle Docker image

If you have an existing Oracle database, remove the “database” section from the docker-compose file. If you do not already have an Oracle database, you can pull the Oracle Database Enterprise Edition image from Docker Hub. You will need to accept the Oracle terms and conditions, log in to your Docker account with docker login, and then download the image locally:

    docker pull store/oracle/database-enterprise:12.2.0.1-slim

Launching the docker environment

The docker-compose file will launch the following:

- Apache Kafka, including Zookeeper, REST API, Schema Registry and KSQL
- Apache Kafka Connect
- MongoDB Connector for Apache Kafka
- Confluent Oracle CDC Connector
- Oracle Database Enterprise
To launch the environment, make sure your Oracle environment is ready, then git clone the repo and build and start the containers:

    docker-compose up -d --build

Once the compose file finishes, you will need to configure your Oracle environment for use by the Confluent Oracle CDC Connector.

Step 1: Connect to your Oracle instance

If you are running Oracle within the docker environment, you can use docker exec as follows:

    docker exec -it oracle bash -c "source /home/oracle/.bashrc; sqlplus /nolog"

Then, at the SQL*Plus prompt:

    connect / as sysdba

Step 2: Configure Oracle for CDC Connector

First, check whether the database is in archive log mode:

    select log_mode from v$database;

If the mode is not “ARCHIVELOG”, perform the following:

    SHUTDOWN IMMEDIATE;
    STARTUP MOUNT;
    ALTER DATABASE ARCHIVELOG;
    ALTER DATABASE OPEN;

Verify the archive mode:

    select log_mode from v$database;

The LOG_MODE should now be “ARCHIVELOG”.

Next, enable supplemental logging for all columns:

    ALTER SESSION SET CONTAINER=cdb$root;
    ALTER DATABASE ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;

The following should be run on the Oracle CDB:

    CREATE ROLE C##CDC_PRIVS;
    GRANT CREATE SESSION, EXECUTE_CATALOG_ROLE, SELECT ANY TRANSACTION, SELECT ANY DICTIONARY TO C##CDC_PRIVS;
    GRANT SELECT ON SYSTEM.LOGMNR_COL$ TO C##CDC_PRIVS;
    GRANT SELECT ON SYSTEM.LOGMNR_OBJ$ TO C##CDC_PRIVS;
    GRANT SELECT ON SYSTEM.LOGMNR_USER$ TO C##CDC_PRIVS;
    GRANT SELECT ON SYSTEM.LOGMNR_UID$ TO C##CDC_PRIVS;
    CREATE USER C##myuser IDENTIFIED BY password CONTAINER=ALL;
    GRANT C##CDC_PRIVS TO C##myuser CONTAINER=ALL;
    ALTER USER C##myuser QUOTA UNLIMITED ON sysaux;
    ALTER USER C##myuser SET CONTAINER_DATA = (CDB$ROOT, ORCLPDB1) CONTAINER=CURRENT;
    ALTER SESSION SET CONTAINER=CDB$ROOT;
    GRANT CREATE SESSION, ALTER SESSION, SET CONTAINER, LOGMINING, EXECUTE_CATALOG_ROLE TO C##myuser CONTAINER=ALL;
    GRANT SELECT ON GV_$DATABASE TO C##myuser CONTAINER=ALL;
    GRANT SELECT ON V_$LOGMNR_CONTENTS TO C##myuser CONTAINER=ALL;
    GRANT SELECT ON GV_$ARCHIVED_LOG TO C##myuser CONTAINER=ALL;
    GRANT CONNECT TO C##myuser CONTAINER=ALL;
    GRANT CREATE TABLE TO C##myuser CONTAINER=ALL;
    GRANT CREATE SEQUENCE TO C##myuser CONTAINER=ALL;
    GRANT CREATE TRIGGER TO C##myuser CONTAINER=ALL;
    ALTER SESSION SET CONTAINER=cdb$root;
    ALTER DATABASE ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;
    GRANT FLASHBACK ANY TABLE TO C##myuser;
    GRANT FLASHBACK ANY TABLE TO C##myuser container=all;

Next, create a table and insert some rows:

    CREATE TABLE C##MYUSER.emp (
      i INTEGER GENERATED BY DEFAULT AS IDENTITY,
      name VARCHAR2(100),
      lastname VARCHAR2(100),
      PRIMARY KEY (i)
    ) tablespace sysaux;

    insert into C##MYUSER.emp (name, lastname) values ('Bob', 'Perez');
    insert into C##MYUSER.emp (name, lastname) values ('Jane', 'Revuelta');
    insert into C##MYUSER.emp (name, lastname) values ('Mary', 'Kristmas');
    insert into C##MYUSER.emp (name, lastname) values ('Alice', 'Cambio');
    commit;

Step 3: Create Kafka Topic

Open a new terminal/shell and connect to your Kafka server as follows:

    docker exec -it broker /bin/bash

Once connected, create the Kafka topic:

    kafka-topics --create --topic SimpleOracleCDC-ORCLCDB-redo-log \
      --bootstrap-server broker:9092 --replication-factor 1 \
      --partitions 1 --config cleanup.policy=delete \
      --config retention.ms=120960000

Step 4: Configure the Oracle CDC Connector

The oracle-cdc-source.json file in the repository contains the configuration of the Confluent Oracle CDC Connector. To configure the connector, execute:

    curl -X POST -H "Content-Type: application/json" -d @oracle-cdc-source.json http://localhost:8083/connectors

Step 5: Set up KSQL data flows within Kafka

As Oracle CRUD events arrive in the Kafka topic, we will use KSQL to stream these events into new topics for consumption by the MongoDB Connector for Apache Kafka.
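The routing performed by the KSQL statements in this step can be pictured with a few lines of plain Python. This is only a sketch under stated assumptions, not how KSQL works internally: the event dictionaries and the route_event helper are hypothetical, and the "I" insert marker is assumed. Only the field names (I, NAME, LASTNAME, OP_TYPE), the 'D' and 'U' operation types and the WRITEOP/DELETEOP names come from this post.

```python
from decimal import Decimal


def route_event(event):
    """Map one CDC event to (target stream name, projected record).

    Mirrors the two KSQL filters for events that carry an OP_TYPE:
    'D' goes to DELETEOP, every other operation type goes to WRITEOP.
    """
    record = {
        "_id": int(event["I"]),  # same idea as CAST(I AS BIGINT) AS "_id"
        "NAME": event["NAME"],
        "LASTNAME": event["LASTNAME"],
        "OP_TYPE": event["OP_TYPE"],
    }
    target = "DELETEOP" if event["OP_TYPE"] == "D" else "WRITEOP"
    return target, record


# Hypothetical events; "I" as the insert marker is an assumption,
# "U" and "D" are the values that appear in this post.
events = [
    {"I": Decimal("11"), "NAME": "Robert", "LASTNAME": "Walters", "OP_TYPE": "I"},
    {"I": Decimal("11"), "NAME": "Rob", "LASTNAME": "Walters", "OP_TYPE": "U"},
    {"I": Decimal("11"), "NAME": "Rob", "LASTNAME": "Walters", "OP_TYPE": "D"},
]

streams = {"WRITEOP": [], "DELETEOP": []}
for event in events:
    target, record = route_event(event)
    streams[target].append(record)

print(len(streams["WRITEOP"]), len(streams["DELETEOP"]))  # prints: 2 1
```

Splitting by operation type matters because each resulting topic is later consumed by its own sink connector, one that upserts and one that deletes.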
Open a shell in the KSQL server container and start the KSQL CLI:

    docker exec -it ksql-server /bin/bash
    ksql http://127.0.0.1:8088

Enter the following commands:

    CREATE STREAM CDCORACLE (I DECIMAL(20,0), NAME varchar, LASTNAME varchar, op_type VARCHAR)
      WITH (kafka_topic='ORCLCDB-EMP', PARTITIONS=1, REPLICAS=1, value_format='AVRO');

    CREATE STREAM WRITEOP AS
      SELECT CAST(I AS BIGINT) as "_id", NAME, LASTNAME, OP_TYPE
      from CDCORACLE WHERE OP_TYPE!='D' EMIT CHANGES;

    CREATE STREAM DELETEOP AS
      SELECT CAST(I AS BIGINT) as "_id", NAME, LASTNAME, OP_TYPE
      from CDCORACLE WHERE OP_TYPE='D' EMIT CHANGES;

To verify that the streams were created:

    SHOW STREAMS;

This command will show the following:

     Stream Name | Kafka Topic | Format
    ------------------------------------
     CDCORACLE   | ORCLCDB-EMP | AVRO
     DELETEOP    | DELETEOP    | AVRO
     WRITEOP     | WRITEOP     | AVRO
    ------------------------------------

Step 6: Configure MongoDB Sink

The following is the configuration for the MongoDB Connector for Apache Kafka:

    {
      "name": "Oracle",
      "config": {
        "connector.class": "com.mongodb.kafka.connect.MongoSinkConnector",
        "topics": "WRITEOP",
        "connection.uri": "mongodb://mongo1",
        "writemodel.strategy": "com.mongodb.kafka.connect.sink.writemodel.strategy.UpdateOneBusinessKeyTimestampStrategy",
        "database": "kafka",
        "collection": "oracle",
        "document.id.strategy": "com.mongodb.kafka.connect.sink.processor.id.strategy.PartialValueStrategy",
        "document.id.strategy.overwrite.existing": "true",
        "document.id.strategy.partial.value.projection.type": "allowlist",
        "document.id.strategy.partial.value.projection.list": "_id",
        "errors.log.include.messages": true,
        "errors.deadletterqueue.context.headers.enable": true,
        "value.converter": "io.confluent.connect.avro.AvroConverter",
        "value.converter.schema.registry.url": "http://schema-registry:8081",
        "key.converter": "org.apache.kafka.connect.storage.StringConverter"
      }
    }

In this example, the sink process consumes records from the WRITEOP topic and saves the data to MongoDB.
The write model strategy, UpdateOneBusinessKeyTimestampStrategy, performs an upsert operation using the filter defined by the PartialValueStrategy property, which in this example is the "_id" field. For your convenience, this configuration is provided in the mongodb-sink.json file in the repository. To configure the sink, execute:

    curl -X POST -H "Content-Type: application/json" -d @mongodb-sink.json http://localhost:8083/connectors

Delete events are written to the DELETEOP topic and are applied to MongoDB with the following sink configuration:

    {
      "name": "Oracle-Delete",
      "config": {
        "connector.class": "com.mongodb.kafka.connect.MongoSinkConnector",
        "topics": "DELETEOP",
        "connection.uri": "mongodb://mongo1",
        "writemodel.strategy": "com.mongodb.kafka.connect.sink.writemodel.strategy.DeleteOneBusinessKeyStrategy",
        "database": "kafka",
        "collection": "oracle",
        "document.id.strategy": "com.mongodb.kafka.connect.sink.processor.id.strategy.PartialValueStrategy",
        "document.id.strategy.overwrite.existing": "true",
        "document.id.strategy.partial.value.projection.type": "allowlist",
        "document.id.strategy.partial.value.projection.list": "_id",
        "errors.log.include.messages": true,
        "errors.deadletterqueue.context.headers.enable": true,
        "value.converter": "io.confluent.connect.avro.AvroConverter",
        "value.converter.schema.registry.url": "http://schema-registry:8081"
      }
    }

To configure the delete sink, execute:

    curl -X POST -H "Content-Type: application/json" -d @mongodb-sink-delete.json http://localhost:8083/connectors

This sink process uses the DeleteOneBusinessKeyStrategy write model strategy. In this configuration, the sink reads from the DELETEOP topic and deletes documents in MongoDB based on the filter defined by the PartialValueStrategy property. In this example that filter is the "_id" field.
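The combined effect of the two sinks on the target collection can be pictured with an in-memory stand-in. The sketch below is an illustration under stated assumptions rather than the connector's actual code: a Python dict keyed by "_id" plays the role of the "oracle" collection, apply_write and apply_delete are hypothetical helpers, the "I" insert marker is assumed, and the timestamp fields maintained by the real write strategy are left out.

```python
def apply_write(collection, record):
    """Upsert keyed on _id: create the document if missing, then overwrite its fields."""
    document = collection.setdefault(record["_id"], {})
    document.update(record)


def apply_delete(collection, record):
    """Remove the document whose _id matches the record; a missing one is ignored."""
    collection.pop(record["_id"], None)


collection = {}  # stand-in for the "oracle" collection, keyed by _id

# Records as they might arrive on WRITEOP (hypothetical sample values).
apply_write(collection, {"_id": 11, "NAME": "Robert", "LASTNAME": "Walters", "OP_TYPE": "I"})
apply_write(collection, {"_id": 11, "NAME": "Rob", "LASTNAME": "Walters", "OP_TYPE": "U"})
after_update = dict(collection[11])  # snapshot of the document after the update

# Record as it might arrive on DELETEOP.
apply_delete(collection, {"_id": 11, "NAME": "Rob", "LASTNAME": "Walters", "OP_TYPE": "D"})

print(after_update["NAME"], len(collection))  # prints: Rob 0
```

Because both operations match on "_id" alone, an update never creates a second document for the same Oracle row, and a delete removes the matching document regardless of its other field values.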
Step 7: Write data to Oracle

Now that your environment is set up and configured, return to the Oracle database and insert the following data:

    insert into C##MYUSER.emp (name, lastname) values ('Juan', 'Soto');
    insert into C##MYUSER.emp (name, lastname) values ('Robert', 'Walters');
    insert into C##MYUSER.emp (name, lastname) values ('Ruben', 'Trigo');
    commit;

Next, check that the data has arrived in MongoDB by opening the MongoDB shell:

    docker exec -it mongo1 /bin/mongo

The inserted data will now be available in MongoDB.

If we update the data in Oracle, e.g.:

    UPDATE C##MYUSER.emp SET name='Rob' WHERE name='Robert';
    COMMIT;

The document will be updated in MongoDB as:

    {
      "_id" : NumberLong(11),
      "LASTNAME" : "Walters",
      "NAME" : "Rob",
      "OP_TYPE" : "U",
      "_insertedTS" : ISODate("2021-07-27T10:25:08.867Z"),
      "_modifiedTS" : ISODate("2021-07-27T10:25:08.867Z")
    }

If we delete the data in Oracle, e.g.:

    DELETE FROM C##MYUSER.emp WHERE name='Rob';
    COMMIT;

The documents with name='Rob' will no longer be in MongoDB. Note that it may take a few seconds for changes to propagate from Oracle to MongoDB.

Many possibilities

In this post we performed a basic setup for moving data from Oracle to MongoDB via Apache Kafka, the Confluent Oracle CDC Connector and the MongoDB Connector for Apache Kafka. While this example is fairly simple, you can add more complex transformations using KSQL and integrate other data sources within your Kafka environment, building a production-ready ETL or streaming environment with best-of-breed solutions.

Resources

- How to Get Started with MongoDB Atlas and Confluent Cloud
- Announcing the MongoDB Atlas Sink and Source Connectors in Confluent Cloud
- Making your Life Easier with MongoDB and Kafka
- Streaming Time-Series Data Using Apache Kafka and MongoDB

August 17, 2021