Data Movement from Oracle to MongoDB Made Easy with Apache Kafka

Robert Walters and Juan Soto

#Kafka

Change Data Capture features have existed for many years in the database world. CDC makes it possible to listen to changes to the database like inserting, updating and deleting data and have these events be sent to other database systems in various scenarios like ETL, replications and database migrations. By leveraging the Apache Kafka, the Confluent Oracle CDC Connector and the MongoDB Connector for Apache Kafka, you can easily stream database changes from Oracle to MongoDB. In this post we will pass data from Oracle to MongoDB providing a step by step configuration for you to easily re-use, tweak and explore the functionality.

”

At a high level, we will configure the above references image in a self-contained docker compose environment that consists of the following:

  • Oracle Database
  • MongoDB
  • Apache Kafka
  • Confluent KSQL

These containers will be run all within a local network bridged so you can play around with them from your local Mac or PC. Check out the GitHub repository to download the complete example.

Preparing the Oracle Docker image

If you have an existing Oracle database, remove the section “database” from the docker-compose file. If you do not already have an Oracle database, you can pull the Oracle Database Enterprise Edition from Docker Hub. You will need to accept the Oracle terms and conditions and then login into your docker account via docker login then docker pull store/oracle/database-enterprise:12.2.0.1-slim to download the image locally.

Launching the docker environment

The docker-compose file will launch the following:

  • Apache Kafka including Zookeeper, REST API, Schema Registry, KSQL
  • Apache Kafka Connect
  • MongoDB Connector for Apache Kafka
  • Confluent Oracle CDC Connector
  • Oracle Database Enterprise

The complete sample code is available from a GitHub repository.

To launch the environment, make sure you have your Oracle environment ready and then git clone the repo and build the following:

docker-compose up -d --build

Once the compose file finishes you will need to configure your Oracle environment to be used by the Confluent CDC Connector.

Step 1: Connect to your Oracle instance

If you are running Oracle within the docker environment, you can use docker exec as follows:

docker exec -it oracle bash -c "source /home/oracle/.bashrc; sqlplus /nolog "

connect / as sysdba

Step 2: Configure Oracle for CDC Connector

First, check if the database is in archive log mode.

select log_mode from v$database;

If the mode is not “ARCHIVELOG”, perform the following:

SHUTDOWN IMMEDIATE;
STARTUP MOUNT;
ALTER DATABASE ARCHIVELOG;
ALTER DATABASE OPEN;

Verify the archive mode:

select log_mode from v$database

The LOG_MODE should now be, “ARCHIVELOG”.

Next, enable supplemental logging for all columns

ALTER SESSION SET CONTAINER=cdb$root;
ALTER DATABASE ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;

The following should be run on the Oracle CDB:

CREATE ROLE C##CDC_PRIVS;
GRANT CREATE SESSION,
EXECUTE_CATALOG_ROLE,
SELECT ANY TRANSACTION,
SELECT ANY DICTIONARY TO C##CDC_PRIVS;
GRANT SELECT ON SYSTEM.LOGMNR_COL$ TO C##CDC_PRIVS;
GRANT SELECT ON SYSTEM.LOGMNR_OBJ$ TO C##CDC_PRIVS;
GRANT SELECT ON SYSTEM.LOGMNR_USER$ TO C##CDC_PRIVS;
GRANT SELECT ON SYSTEM.LOGMNR_UID$ TO C##CDC_PRIVS;
 
CREATE USER C##myuser IDENTIFIED BY password CONTAINER=ALL;
GRANT C##CDC_PRIVS TO C##myuser CONTAINER=ALL;
ALTER USER C##myuser QUOTA UNLIMITED ON sysaux;
ALTER USER C##myuser SET CONTAINER_DATA = (CDB$ROOT, ORCLPDB1) CONTAINER=CURRENT;
 
ALTER SESSION SET CONTAINER=CDB$ROOT;
GRANT CREATE SESSION, ALTER SESSION, SET CONTAINER, LOGMINING, EXECUTE_CATALOG_ROLE TO C##myuser CONTAINER=ALL;
GRANT SELECT ON GV_$DATABASE TO C##myuser CONTAINER=ALL;
GRANT SELECT ON V_$LOGMNR_CONTENTS TO C##myuser CONTAINER=ALL;
GRANT SELECT ON GV_$ARCHIVED_LOG TO C##myuser CONTAINER=ALL;
GRANT CONNECT TO C##myuser CONTAINER=ALL;
GRANT CREATE TABLE TO C##myuser CONTAINER=ALL;
GRANT CREATE SEQUENCE TO C##myuser CONTAINER=ALL;
GRANT CREATE TRIGGER TO C##myuser CONTAINER=ALL;
 
ALTER SESSION SET CONTAINER=cdb$root;
ALTER DATABASE ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;
 
GRANT FLASHBACK ANY TABLE TO C##myuser;
GRANT FLASHBACK ANY TABLE TO C##myuser container=all;

Next, create some objects

CREATE TABLE C##MYUSER.emp
(
   i INTEGER GENERATED BY DEFAULT AS IDENTITY,
   name VARCHAR2(100),
   lastname VARCHAR2(100),
   PRIMARY KEY (i)
) tablespace sysaux;
  
insert into C##MYUSER.emp (name, lastname) values ('Bob', 'Perez');
insert into C##MYUSER.emp (name, lastname) values ('Jane','Revuelta');
insert into C##MYUSER.emp (name, lastname) values ('Mary','Kristmas');
insert into C##MYUSER.emp (name, lastname) values ('Alice','Cambio');
commit;

Step 3: Create Kafka Topic

Open a new terminal/shell and connect to your kafka server as follows:

docker exec -it broker /bin/bash

When connected create the kafka topic :

kafka-topics --create --topic SimpleOracleCDC-ORCLCDB-redo-log \
--bootstrap-server broker:9092 --replication-factor 1 \
--partitions 1 --config cleanup.policy=delete \
--config retention.ms=120960000

Step 4: Configure the Oracle CDC Connector

The oracle-cdc-source.json file in the repository contains the configuration of Confluent Oracle CDC connector. To configure simply execute:

curl -X POST -H "Content-Type: application/json" -d @oracle-cdc-source.json  http://localhost:8083/connectors

Step 5: Setup kSQL data flows within Kafka

As Oracle CRUD events arrive in the Kafka topic, we will use KSQL to stream these events into a new topic for consumption by the MongoDB Connector for Apache Kafka.

docker exec -it ksql-server bin/bash

ksql http://127.0.0.1:8088

Enter the following commands:

CREATE STREAM CDCORACLE (I DECIMAL(20,0), NAME varchar, LASTNAME varchar, op_type VARCHAR) WITH ( kafka_topic='ORCLCDB-EMP', PARTITIONS=1, REPLICAS=1, value_format='AVRO');

CREATE STREAM WRITEOP AS
  SELECT CAST(I AS BIGINT) as "_id",  NAME ,  LASTNAME , OP_TYPE  from CDCORACLE WHERE OP_TYPE!='D' EMIT CHANGES;

CREATE STREAM DELETEOP AS
  SELECT CAST(I AS BIGINT) as "_id",  NAME ,  LASTNAME , OP_TYPE  from CDCORACLE WHERE OP_TYPE='D' EMIT CHANGES;

To verify the steams were created:

SHOW STREAMS;

This command will show the following:

Stream Name | Kafka Topic | Format 
------------------------------------
 CDCORACLE   | ORCLCDB-EMP | AVRO   
 DELETEOP    | DELETEOP    | AVRO   
 WRITEOP     | WRITEOP     | AVRO   
 ------------------------------------

Step 6: Configure MongoDB Sink

The following is the configuration for the MongoDB Connector for Apache Kafka:

{
  "name": "Oracle",
  "config": {
    "connector.class": "com.mongodb.kafka.connect.MongoSinkConnector",
    "topics": "WRITEOP",
    "connection.uri": "mongodb://mongo1",
    "writemodel.strategy": "com.mongodb.kafka.connect.sink.writemodel.strategy.UpdateOneBusinessKeyTimestampStrategy",
    "database": "kafka",
    "collection": "oracle",
    "document.id.strategy": "com.mongodb.kafka.connect.sink.processor.id.strategy.PartialValueStrategy",
    "document.id.strategy.overwrite.existing": "true",
    "document.id.strategy.partial.value.projection.type": "allowlist",
    "document.id.strategy.partial.value.projection.list": "_id",
    "errors.log.include.messages": true,
    "errors.deadletterqueue.context.headers.enable": true,
    "value.converter":"io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url":"http://schema-registry:8081",
    "key.converter":"org.apache.kafka.connect.storage.StringConverter"

  }
}

In this example, this sink process consumes records from the WRITEOP topic and saves the data to MongoDB. The write model, UpdateOneBusinessKeyTimestampStrategy, performs an upsert operation using the filter defined on PartialValueStrategy property which in this example is the "_id" field. For your convenience, this configuration script is written in the mongodb-sink.json file in the repository. To configure execute:

curl -X POST -H "Content-Type: application/json" -d @mongodb-sink.json  http://localhost:8083/connectors

Delete events are written in the DELETEOP topic and are sinked to MongoDB with the following sink configuration:

{
  "name": "Oracle-Delete",
  "config": {
    "connector.class": "com.mongodb.kafka.connect.MongoSinkConnector",
    "topics": "DELETEOP",
    "connection.uri": "mongodb://mongo1”,
    "writemodel.strategy": "com.mongodb.kafka.connect.sink.writemodel.strategy.DeleteOneBusinessKeyStrategy",
    "database": "kafka",
    "collection": "oracle",
    "document.id.strategy": "com.mongodb.kafka.connect.sink.processor.id.strategy.PartialValueStrategy",
    "document.id.strategy.overwrite.existing": "true",
    "document.id.strategy.partial.value.projection.type": "allowlist",
    "document.id.strategy.partial.value.projection.list": "_id",
    "errors.log.include.messages": true,
    "errors.deadletterqueue.context.headers.enable": true,
    "value.converter":"io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url":"http://schema-registry:8081"

  }
}

curl -X POST -H "Content-Type: application/json" -d @mongodb-sink-delete.json  http://localhost:8083/connectors

This sink process uses the DeleteOneBusinessKeyStrategy writemdoel strategy. In this configuration, the sink reads from the DELETEOP topic and deletes documents in MongoDB based upon the filter defined on PartialValueStrategy property. In this example that filter is the “_id” field.

Step 7: Write data to Oracle

Now that your environment is setup and configured, return to the Oracle database and insert the following data:

insert into C##MYUSER.emp (name, lastname) values ('Juan','Soto');
insert into C##MYUSER.emp (name, lastname) values ('Robert','Walters');
insert into C##MYUSER.emp (name, lastname) values ('Ruben','Trigo');
commit;

Next, notice the data as it arrived in MongoDB by accessing the MongoDB shell.

docker exec -it mongo1 /bin/mongo

The inserted data will now be available in MongoDB.

”

If we update the data in Oracle e.g.

UPDATE C##MYUSER.emp SET name=’Rob’ WHERE name=’Robert’;
COMMIT;\

The document will be updated in MongoDB as:

{
        "_id" : NumberLong(11),
        "LASTNAME" : "Walters",
        "NAME" : "Rob",
        "OP_TYPE" : "U",
        "_insertedTS" : ISODate("2021-07-27T10:25:08.867Z"),
        "_modifiedTS" : ISODate("2021-07-27T10:25:08.867Z")
}

If we delete the data in Oracle e.g.

DELETE FROM C##MYUSER.emp WHERE name=’Rob’; COMMIT;.

The documents with name=’Rob’ will no longer be in MongoDB.

Note that it may take a few seconds for the propagation from Oracle to MongoDB.

Many possibilities

In this post we performed a basic setup of moving data from Oracle to MongoDB via Apache Kafka and the Confluent Oracle CDC Connector and MongoDB Connector for Apache Kafka. While this example is fairly simple, you can add more complex transformations using KSQL and integrate other data sources within your Kafka environment making a production ready ETL or streaming environment with best of breed solutions.

Resources