Selfuel Docs

Oracle CDC


Last updated 8 months ago

Oracle CDC source connector

Key features

Description

The Oracle CDC connector reads snapshot data and incremental data from an Oracle database. This document describes how to set up the Oracle CDC connector to run SQL queries against Oracle databases.

Notice

The Debezium Oracle connector does not rely on Oracle's continuous mining option. Instead, the connector itself detects log switches and adjusts which logs are mined, a job the continuous mining option used to do automatically. Therefore, you cannot set the log.mining.continuous.mine property in the debezium configuration.

Database Dependency

Enable Oracle LogMiner

To enable Oracle CDC (Change Data Capture) in Nexus using LogMiner, a built-in Oracle tool, follow the steps below.

Enabling LogMiner without CDB (Container Database) mode

  1. Create empty directories on the operating system to store the Oracle archived logs and user tablespaces.

mkdir -p /opt/oracle/oradata/recovery_area
mkdir -p /opt/oracle/oradata/ORCLCDB
chown -R oracle /opt/oracle/*

  2. Log in as an administrator and enable Oracle archived logs.

sqlplus /nolog
connect sys as sysdba;
alter system set db_recovery_file_dest_size = 10G;
alter system set db_recovery_file_dest = '/opt/oracle/oradata/recovery_area' scope=spfile;
shutdown immediate;
startup mount;
alter database archivelog;
alter database open;
ALTER DATABASE ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;
archive log list;

  3. Log in as an administrator, create an account called logminer_user with the password "oracle", and grant it privileges to read tables and logs.

CREATE TABLESPACE logminer_tbs DATAFILE '/opt/oracle/oradata/ORCLCDB/logminer_tbs.dbf' SIZE 25M REUSE AUTOEXTEND ON MAXSIZE UNLIMITED;
CREATE USER logminer_user IDENTIFIED BY oracle DEFAULT TABLESPACE logminer_tbs QUOTA UNLIMITED ON logminer_tbs;

GRANT CREATE SESSION TO logminer_user;
GRANT SELECT ON V_$DATABASE TO logminer_user;
GRANT SELECT ON V_$LOG TO logminer_user;
GRANT SELECT ON V_$LOGFILE TO logminer_user;
GRANT SELECT ON V_$LOGMNR_LOGS TO logminer_user;
GRANT SELECT ON V_$LOGMNR_CONTENTS TO logminer_user;
GRANT SELECT ON V_$ARCHIVED_LOG TO logminer_user;
GRANT SELECT ON V_$ARCHIVE_DEST_STATUS TO logminer_user;
GRANT EXECUTE ON DBMS_LOGMNR TO logminer_user;
GRANT EXECUTE ON DBMS_LOGMNR_D TO logminer_user;
GRANT SELECT ANY TRANSACTION TO logminer_user;
GRANT SELECT ON V_$TRANSACTION TO logminer_user;

Oracle 11g is not supported for this grant (the LOGMINING privilege was introduced in Oracle 12c):

GRANT LOGMINING TO logminer_user;

Grant privileges only to the tables that need to be collected:

GRANT SELECT ANY TABLE TO logminer_user;
GRANT ANALYZE ANY TO logminer_user;
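Once step 3 is complete, the connector can authenticate as logminer_user instead of system. The following is a minimal sketch. The host oracle-host, the SID xe, and the table XE.DEBEZIUM.FULL_TYPES are placeholders borrowed from the Task Example section, so replace them with your own values.

```hocon
source {
  Oracle-CDC {
    result_table_name = "customers"
    # Account created in step 3 (non-CDB mode)
    username = "logminer_user"
    password = "oracle"
    # Placeholder host and SID; credentials are supplied separately above
    base-url = "jdbc:oracle:thin:@oracle-host:1521:xe"
    database-names = ["XE"]
    schema-names = ["DEBEZIUM"]
    # Fully qualified as database.schema.table
    table-names = ["XE.DEBEZIUM.FULL_TYPES"]
  }
}
```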
To enable LogMiner in Oracle with CDB (Container Database) + PDB (Pluggable Database) mode, follow the steps below:

  1. Create empty directories on the operating system to store the Oracle archived logs and user tablespaces.

mkdir -p /opt/oracle/oradata/recovery_area
mkdir -p /opt/oracle/oradata/ORCLCDB
mkdir -p /opt/oracle/oradata/ORCLCDB/ORCLPDB1
chown -R oracle /opt/oracle/*

  2. Log in as an administrator and enable archive logging.

sqlplus /nolog
-- When prompted, enter the SYS password (oracle in this example)
connect sys as sysdba;
alter system set db_recovery_file_dest_size = 10G;
alter system set db_recovery_file_dest = '/opt/oracle/oradata/recovery_area' scope=spfile;
shutdown immediate
startup mount
alter database archivelog;
alter database open;
archive log list;

  3. Enable supplemental logging for the tables to capture (executing in CDB).

-- ALTER TABLE does not accept wildcards such as TEST.*;
-- run the following statement once for each table to capture.
ALTER TABLE TEST.T2 ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;

  4. Create the Debezium account, starting with its tablespace.

Operating in CDB:

sqlplus sys/top_secret@//localhost:1521/ORCLCDB as sysdba
CREATE TABLESPACE logminer_tbs DATAFILE '/opt/oracle/oradata/ORCLCDB/logminer_tbs.dbf'
  SIZE 25M REUSE AUTOEXTEND ON MAXSIZE UNLIMITED;
exit;

Operating in PDB:

sqlplus sys/top_secret@//localhost:1521/ORCLPDB1 as sysdba
CREATE TABLESPACE logminer_tbs DATAFILE '/opt/oracle/oradata/ORCLCDB/ORCLPDB1/logminer_tbs.dbf'
  SIZE 25M REUSE AUTOEXTEND ON MAXSIZE UNLIMITED;
exit;

  5. Create the common user c##dbzuser and grant it the required privileges (operating in CDB).

sqlplus sys/top_secret@//localhost:1521/ORCLCDB as sysdba

CREATE USER c##dbzuser IDENTIFIED BY dbz
DEFAULT TABLESPACE logminer_tbs
QUOTA UNLIMITED ON logminer_tbs
CONTAINER=ALL;

GRANT CREATE SESSION TO c##dbzuser CONTAINER=ALL;
GRANT SET CONTAINER TO c##dbzuser CONTAINER=ALL;
GRANT SELECT ON V_$DATABASE TO c##dbzuser CONTAINER=ALL;
GRANT FLASHBACK ANY TABLE TO c##dbzuser CONTAINER=ALL;
GRANT SELECT ANY TABLE TO c##dbzuser CONTAINER=ALL;
GRANT SELECT_CATALOG_ROLE TO c##dbzuser CONTAINER=ALL;
GRANT EXECUTE_CATALOG_ROLE TO c##dbzuser CONTAINER=ALL;
GRANT SELECT ANY TRANSACTION TO c##dbzuser CONTAINER=ALL;
GRANT LOGMINING TO c##dbzuser CONTAINER=ALL;

GRANT CREATE TABLE TO c##dbzuser CONTAINER=ALL;
GRANT LOCK ANY TABLE TO c##dbzuser CONTAINER=ALL;
GRANT CREATE SEQUENCE TO c##dbzuser CONTAINER=ALL;

GRANT EXECUTE ON DBMS_LOGMNR TO c##dbzuser CONTAINER=ALL;
GRANT EXECUTE ON DBMS_LOGMNR_D TO c##dbzuser CONTAINER=ALL;

GRANT SELECT ON V_$LOG TO c##dbzuser CONTAINER=ALL;
GRANT SELECT ON V_$LOG_HISTORY TO c##dbzuser CONTAINER=ALL;
GRANT SELECT ON V_$LOGMNR_LOGS TO c##dbzuser CONTAINER=ALL;
GRANT SELECT ON V_$LOGMNR_CONTENTS TO c##dbzuser CONTAINER=ALL;
GRANT SELECT ON V_$LOGMNR_PARAMETERS TO c##dbzuser CONTAINER=ALL;
GRANT SELECT ON V_$LOGFILE TO c##dbzuser CONTAINER=ALL;
GRANT SELECT ON V_$ARCHIVED_LOG TO c##dbzuser CONTAINER=ALL;
GRANT SELECT ON V_$ARCHIVE_DEST_STATUS TO c##dbzuser CONTAINER=ALL;
GRANT ANALYZE ANY TO c##dbzuser CONTAINER=ALL;

exit;
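With the common user in place, a source block for CDB + PDB mode might look like the sketch below. Several details are assumptions that this page does not confirm: the service-name connection string, the table name ORCLCDB.TEST.T2 (built from the TEST.T2 table in step 3), and, most importantly, selecting the PDB by passing Debezium's database.pdb.name property through the debezium option. Verify these against your Nexus version before relying on them.

```hocon
source {
  Oracle-CDC {
    result_table_name = "customers"
    # Common user created in step 5 (CDB mode)
    username = "c##dbzuser"
    password = "dbz"
    # Assumed: connect to the CDB root by service name
    base-url = "jdbc:oracle:thin:@//localhost:1521/ORCLCDB"
    database-names = ["ORCLCDB"]
    schema-names = ["TEST"]
    table-names = ["ORCLCDB.TEST.T2"]
    # Assumed pass-through of Debezium's database.pdb.name,
    # naming the PDB that holds the captured tables
    debezium = {
      database.pdb.name = "ORCLPDB1"
    }
  }
}
```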
Data Type Mapping

| Oracle Data type | SeaTunnel Data type |
|---|---|
| INTEGER | INT |
| FLOAT | DECIMAL(38, 18) |
| NUMBER(precision <= 9, scale == 0) | INT |
| NUMBER(9 < precision <= 18, scale == 0) | BIGINT |
| NUMBER(18 < precision, scale == 0) | DECIMAL(38, 0) |
| NUMBER(precision == 0, scale == 0) | DECIMAL(38, 18) |
| NUMBER(scale != 0) | DECIMAL(38, 18) |
| BINARY_DOUBLE | DOUBLE |
| BINARY_FLOAT, REAL | FLOAT |
| CHAR, NCHAR, NVARCHAR2, VARCHAR2, LONG, ROWID, NCLOB, CLOB | STRING |
| DATE | DATE |
| TIMESTAMP, TIMESTAMP WITH LOCAL TIME ZONE | TIMESTAMP |
| BLOB, RAW, LONG RAW, BFILE | BYTES |

Source Options

| Name | Type | Required | Default | Description |
|---|---|---|---|---|
| base-url | String | Yes | - | The URL of the JDBC connection, for example: jdbc:oracle:thin:@datasource01:1523:xe. |
| username | String | Yes | - | Username to use when connecting to the database server. |
| password | String | Yes | - | Password to use when connecting to the database server. |
| database-names | List | No | - | Names of the databases to monitor. |
| schema-names | List | No | - | Names of the schemas to monitor. |
| table-names | List | Yes | - | Names of the tables to monitor. Each table name must be fully qualified with the database and schema names, for example: database_name.schema_name.table_name. |
| table-names-config | List | No | - | Per-table configuration list, for example: [{"table": "db1.schema1.table1","primaryKeys":["key1"]}]. |
| startup.mode | Enum | No | INITIAL | Optional startup mode for the Oracle CDC consumer. Valid values are initial, earliest, latest and specific. initial: synchronize historical data at startup, then synchronize incremental data. earliest: start from the earliest possible offset. latest: start from the latest offset. specific: start from user-supplied specific offsets. |
| startup.specific-offset.file | String | No | - | Start from the specified binlog file name. Required when startup.mode is specific. |
| startup.specific-offset.pos | Long | No | - | Start from the specified binlog file position. Required when startup.mode is specific. |
| stop.mode | Enum | No | NEVER | Optional stop mode for the Oracle CDC consumer. Valid values are never, latest and specific. never: the real-time job never stops the source. latest: stop at the latest offset. specific: stop at a user-supplied specific offset. |
| stop.specific-offset.file | String | No | - | Stop at the specified binlog file name. Required when stop.mode is specific. |
| stop.specific-offset.pos | Long | No | - | Stop at the specified binlog file position. Required when stop.mode is specific. |
| snapshot.split.size | Integer | No | 8096 | The split size (number of rows) of the table snapshot. Captured tables are divided into multiple splits when the table snapshot is read. |
| snapshot.fetch.size | Integer | No | 1024 | The maximum number of rows fetched per poll when reading the table snapshot. |
| server-time-zone | String | No | UTC | The session time zone of the database server. If not set, ZoneId.systemDefault() is used to determine the server time zone. |
| connect.timeout.ms | Duration | No | 30000 | The maximum time, in milliseconds, that the connector waits after trying to connect to the database server before timing out. |
| connect.max-retries | Integer | No | 3 | The maximum number of times the connector retries establishing a connection to the database server. |
| connection.pool.size | Integer | No | 20 | The JDBC connection pool size. |
| chunk-key.even-distribution.factor.upper-bound | Double | No | 100 | The upper bound of the chunk key distribution factor, which determines whether the table data is evenly distributed. The distribution factor is calculated as (MAX(id) - MIN(id) + 1) / row count. If it is less than or equal to this upper bound, the table chunks are optimized for even distribution. Otherwise, the table is considered unevenly distributed, and the sampling-based sharding strategy is used if the estimated shard count exceeds the value of sample-sharding.threshold. The default value is 100.0. |
| chunk-key.even-distribution.factor.lower-bound | Double | No | 0.05 | The lower bound of the chunk key distribution factor, which determines whether the table data is evenly distributed. The distribution factor is calculated as (MAX(id) - MIN(id) + 1) / row count. If it is greater than or equal to this lower bound, the table chunks are optimized for even distribution. Otherwise, the table is considered unevenly distributed, and the sampling-based sharding strategy is used if the estimated shard count exceeds the value of sample-sharding.threshold. The default value is 0.05. |
| sample-sharding.threshold | Integer | No | 1000 | The estimated shard count that triggers the sample sharding strategy. When the distribution factor falls outside the bounds set by chunk-key.even-distribution.factor.upper-bound and chunk-key.even-distribution.factor.lower-bound, and the estimated shard count (approximate row count / chunk size) exceeds this threshold, the sample sharding strategy is used. This helps handle large datasets more efficiently. The default value is 1000 shards. |
| inverse-sampling.rate | Integer | No | 1000 | The inverse of the sampling rate used in the sample sharding strategy. For example, a value of 1000 applies a 1/1000 sampling rate during sampling. This controls the granularity of sampling and therefore the final number of shards, which is especially useful for very large datasets where a lower sampling rate is preferred. The default value is 1000. |
| exactly_once | Boolean | No | false | Enable exactly-once semantics. |
| use_select_count | Boolean | No | false | Use SELECT COUNT(*) to count table rows in the full stage instead of other methods. Use this when counting directly is faster than updating statistics by analyzing the table. |
| skip_analyze | Boolean | No | false | Skip analyzing the table row count in the full stage. Use this when you already schedule table analysis to refresh the table statistics periodically, or when the table data does not change frequently. |
| format | Enum | No | DEFAULT | Optional output format for Oracle CDC. Valid values are DEFAULT and COMPATIBLE_DEBEZIUM_JSON. |
| debezium | Config | No | - | Pass-through of Debezium's properties to the Debezium Embedded Engine, which is used to capture data changes from the Oracle server. |
| common-options | | No | - | Source plugin common parameters. Refer to Source Common Options for details. |
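The chunk-key bounds, sample-sharding.threshold and inverse-sampling.rate combine into a single decision rule. Here is a worked sketch using the default values. It assumes that "chunk size" in the sample-sharding.threshold description refers to snapshot.split.size (default 8096), and the table sizes are hypothetical. With distribution factor $f$ and estimated shard count $n_{\text{est}}$:

$$
f = \frac{\mathrm{MAX}(id) - \mathrm{MIN}(id) + 1}{\text{row count}}, \qquad
n_{\text{est}} = \frac{\text{approximate row count}}{\text{chunk size}}
$$

The table counts as evenly distributed when $0.05 \le f \le 100$. Otherwise, sampling-based sharding is used if $n_{\text{est}} > 1000$. For a hypothetical table with $\mathrm{MIN}(id) = 1$, $\mathrm{MAX}(id) = 5 \times 10^{10}$ and $10^{8}$ rows:

$$
f = \frac{5 \times 10^{10} - 1 + 1}{10^{8}} = 500 > 100, \qquad
n_{\text{est}} = \frac{10^{8}}{8096} \approx 12\,352 > 1000
$$

The table is therefore treated as unevenly distributed and sampled. At inverse-sampling.rate = 1000, about $10^{8} / 1000 = 10^{5}$ rows are sampled.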

Task Example

Simple

Support multi-table reading:

source {
  # This is an example source plugin, only for testing and demonstrating the feature
  Oracle-CDC {
    result_table_name = "customers"
    username = "system"
    password = "oracle"
    database-names = ["XE"]
    schema-names = ["DEBEZIUM"]
    table-names = ["XE.DEBEZIUM.FULL_TYPES"]
    base-url = "jdbc:oracle:thin:system/oracle@oracle-host:1521:xe"
    source.reader.close.timeout = 120000
  }
}
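The Simple example lists only one table. To read several tables in a single source, list each fully qualified name in table-names. In the sketch below, XE.DEBEZIUM.ORDERS is a hypothetical second table in the same schema.

```hocon
source {
  Oracle-CDC {
    result_table_name = "customers"
    username = "system"
    password = "oracle"
    database-names = ["XE"]
    schema-names = ["DEBEZIUM"]
    # Two tables captured by one source; ORDERS is hypothetical
    table-names = ["XE.DEBEZIUM.FULL_TYPES", "XE.DEBEZIUM.ORDERS"]
    base-url = "jdbc:oracle:thin:system/oracle@oracle-host:1521:xe"
    source.reader.close.timeout = 120000
  }
}
```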

Use SELECT COUNT(*) instead of table analysis to count table rows in the full stage:

source {
  # This is an example source plugin, only for testing and demonstrating the feature
  Oracle-CDC {
    result_table_name = "customers"
    use_select_count = true
    username = "system"
    password = "oracle"
    database-names = ["XE"]
    schema-names = ["DEBEZIUM"]
    table-names = ["XE.DEBEZIUM.FULL_TYPES"]
    base-url = "jdbc:oracle:thin:system/oracle@oracle-host:1521:xe"
    source.reader.close.timeout = 120000
  }
}
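Row counting is only one of the full-stage settings. You can also tune the snapshot and connection options from the Source Options table in the same block. The following is a sketch, and its values are illustrative assumptions rather than recommendations.

```hocon
source {
  Oracle-CDC {
    result_table_name = "customers"
    username = "system"
    password = "oracle"
    database-names = ["XE"]
    schema-names = ["DEBEZIUM"]
    table-names = ["XE.DEBEZIUM.FULL_TYPES"]
    base-url = "jdbc:oracle:thin:system/oracle@oracle-host:1521:xe"
    # Illustrative values only
    snapshot.split.size = 50000   # rows per snapshot split (default 8096)
    snapshot.fetch.size = 2048    # rows fetched per poll (default 1024)
    connection.pool.size = 10     # JDBC pool size (default 20)
    connect.timeout.ms = 60000    # connect timeout in ms (default 30000)
    connect.max-retries = 5       # connection retries (default 3)
  }
}
```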

Use SELECT NUM_ROWS FROM all_tables to get the table row count, but skip analyzing the table:

source {
  # This is an example source plugin, only for testing and demonstrating the feature
  Oracle-CDC {
    result_table_name = "customers"
    skip_analyze = true
    username = "system"
    password = "oracle"
    database-names = ["XE"]
    schema-names = ["DEBEZIUM"]
    table-names = ["XE.DEBEZIUM.FULL_TYPES"]
    base-url = "jdbc:oracle:thin:system/oracle@oracle-host:1521:xe"
    source.reader.close.timeout = 120000
  }
}
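None of the examples above set startup.mode, so they use the default (initial): they read historical data first and then incremental changes. The sketch below skips the snapshot and reads only new changes. It uses the documented latest and never values, spelled in lowercase as in the option descriptions.

```hocon
source {
  Oracle-CDC {
    result_table_name = "customers"
    username = "system"
    password = "oracle"
    database-names = ["XE"]
    schema-names = ["DEBEZIUM"]
    table-names = ["XE.DEBEZIUM.FULL_TYPES"]
    base-url = "jdbc:oracle:thin:system/oracle@oracle-host:1521:xe"
    # Skip historical data; start from the latest offset
    startup.mode = "latest"
    # Keep the real-time job running (this is the default)
    stop.mode = "never"
  }
}
```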

Support custom primary key for table

source {
  Oracle-CDC {
    result_table_name = "customers"
    base-url = "jdbc:oracle:thin:system/oracle@oracle-host:1521:xe"
    source.reader.close.timeout = 120000
    username = "system"
    password = "oracle"
    database-names = ["XE"]
    schema-names = ["DEBEZIUM"]
    table-names = ["XE.DEBEZIUM.FULL_TYPES"]
    table-names-config = [
      {
        table = "XE.DEBEZIUM.FULL_TYPES"
        primaryKeys = ["ID"]
      }
    ]
  }
}

Support sending debezium-compatible format to Kafka

This must be used with the Kafka sink connector. See compatible debezium format for details.
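The sketch below sets up debezium-compatible output sent to Kafka and is modeled on the examples above. Several parts are assumptions that this page does not document: the lowercase format value compatible_debezium_json, the Debezium and Kafka Connect properties in the debezium block, and the Kafka sink options bootstrap.servers and format. Verify them against the Kafka sink and compatible debezium format pages. The broker address localhost:9092 and the logical server name oracle1 are hypothetical.

```hocon
source {
  Oracle-CDC {
    result_table_name = "customers"
    base-url = "jdbc:oracle:thin:system/oracle@oracle-host:1521:xe"
    source.reader.close.timeout = 120000
    username = "system"
    password = "oracle"
    database-names = ["XE"]
    schema-names = ["DEBEZIUM"]
    table-names = ["XE.DEBEZIUM.FULL_TYPES"]
    # Emit Debezium-compatible JSON instead of the DEFAULT format
    format = compatible_debezium_json
    debezium = {
      # Assumed pass-through properties: omit schemas from Kafka messages
      key.converter.schemas.enable = false
      value.converter.schemas.enable = false
      # Assumed logical server name (hypothetical value)
      database.server.name = "oracle1"
    }
  }
}

sink {
  Kafka {
    # Assumed Kafka sink options; check the Kafka sink page
    bootstrap.servers = "localhost:9092"
    format = compatible_debezium_json
  }
}
```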