
PostgreSQL CDC


PostgreSQL CDC source connector

Key features

Description

The PostgreSQL CDC connector allows reading snapshot data and incremental data from a PostgreSQL database. This document describes how to set up the PostgreSQL CDC connector to capture change data from PostgreSQL databases.

Supported DataSource Info

| Datasource | Supported versions | Driver | Url |
| ---------- | ------------------ | ------ | --- |
| PostgreSQL | Different dependency versions have different driver classes. | org.postgresql.Driver | jdbc:postgresql://localhost:5432/test |
| PostgreSQL | If you want to manipulate the GEOMETRY type in PostgreSQL. | org.postgresql.Driver | jdbc:postgresql://localhost:5432/test |

Using Dependency

Here are the steps to enable CDC (Change Data Capture) in PostgreSQL:

  1. Ensure that wal_level is set to logical: modify the postgresql.conf configuration file by adding "wal_level = logical" and restart the PostgreSQL server for the change to take effect. Alternatively, you can modify the configuration directly with SQL:

```sql
ALTER SYSTEM SET wal_level TO 'logical';
SELECT pg_reload_conf();
```

  2. Change the REPLICA IDENTITY of the specified table to FULL:

```sql
ALTER TABLE your_table_name REPLICA IDENTITY FULL;
```
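To confirm the changes took effect, a quick check from a psql session might look like the sketch below (your_table_name is the same placeholder used above):

```sql
-- Should return 'logical' after the restart or reload
SHOW wal_level;

-- Verify the replica identity of the captured table ('f' means FULL)
SELECT relname, relreplident FROM pg_class WHERE relname = 'your_table_name';
```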
Data Type Mapping

| PostgreSQL Data Type | Nexus Data Type |
| -------------------- | --------------- |
| BOOL | BOOLEAN |
| _BOOL | ARRAY&lt;BOOLEAN&gt; |
| BYTEA | BYTES |
| _BYTEA | ARRAY&lt;TINYINT&gt; |
| INT2, SMALLSERIAL, INT4, SERIAL | INT |
| _INT2, _INT4 | ARRAY&lt;INT&gt; |
| INT8, BIGSERIAL | BIGINT |
| _INT8 | ARRAY&lt;BIGINT&gt; |
| FLOAT4 | FLOAT |
| _FLOAT4 | ARRAY&lt;FLOAT&gt; |
| FLOAT8 | DOUBLE |
| _FLOAT8 | ARRAY&lt;DOUBLE&gt; |
| NUMERIC (column size > 0) | DECIMAL(column size, scale) |
| NUMERIC (column size < 0) | DECIMAL(38, 18) |
| BPCHAR, CHARACTER, VARCHAR, TEXT, GEOMETRY, GEOGRAPHY, JSON, JSONB | STRING |
| _BPCHAR, _CHARACTER, _VARCHAR, _TEXT | ARRAY&lt;STRING&gt; |
| TIMESTAMP | TIMESTAMP |
| TIME | TIME |
| DATE | DATE |
| OTHER DATA TYPES | NOT SUPPORTED YET |
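To illustrate the mapping, consider a hypothetical source table (the table and column names are made up for this example); the comments show the Nexus type each column arrives as:

```sql
-- Hypothetical table illustrating the type mapping above
CREATE TABLE inventory.products (
    id        SERIAL PRIMARY KEY,   -- SERIAL         -> INT
    name      VARCHAR(255),         -- VARCHAR        -> STRING
    price     NUMERIC(10, 2),       -- NUMERIC(10, 2) -> DECIMAL(10, 2)
    tags      TEXT[],               -- _TEXT          -> ARRAY<STRING>
    in_stock  BOOL,                 -- BOOL           -> BOOLEAN
    updated   TIMESTAMP             -- TIMESTAMP      -> TIMESTAMP
);
```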

Source Options

| Name | Type | Required | Default | Description |
| ---- | ---- | -------- | ------- | ----------- |
| base-url | String | Yes | - | The URL of the JDBC connection, for example: jdbc:postgresql://localhost:5432/postgres_cdc?loggerLevel=OFF. |
| username | String | Yes | - | Name of the database user to use when connecting to the database server. |
| password | String | Yes | - | Password to use when connecting to the database server. |
| database-names | List | No | - | Name of the database to monitor. |
| table-names | List | Yes | - | Names of the tables to monitor. A table name needs to include the database name, for example: database_name.table_name. |
| table-names-config | List | No | - | Table config list, for example: [{"table": "db1.schema1.table1","primaryKeys":["key1"]}]. |
| startup.mode | Enum | No | INITIAL | Optional startup mode for the PostgreSQL CDC consumer; valid enumerations are initial, earliest, latest and specific. initial: synchronize historical data at startup, then synchronize incremental data. earliest: start from the earliest offset possible. latest: start from the latest offset. specific: start from user-supplied specific offsets. |
| snapshot.split.size | Integer | No | 8096 | The split size (number of rows) of a table snapshot; captured tables are split into multiple splits when reading the snapshot. |
| snapshot.fetch.size | Integer | No | 1024 | The maximum fetch size per poll when reading a table snapshot. |
| slot.name | String | No | - | The name of the PostgreSQL logical decoding slot created for streaming changes from a particular plug-in for a particular database/schema. The server uses this slot to stream events to the connector being configured. Default is Nexus. |
| decoding.plugin.name | String | No | pgoutput | The name of the PostgreSQL logical decoding plug-in installed on the server. Supported values are decoderbufs, wal2json, wal2json_rds, wal2json_streaming, wal2json_rds_streaming and pgoutput. |
| server-time-zone | String | No | UTC | The session time zone of the database server. If not set, ZoneId.systemDefault() is used to determine the server time zone. |
| connect.timeout.ms | Duration | No | 30000 | The maximum time the connector should wait after trying to connect to the database server before timing out. |
| connect.max-retries | Integer | No | 3 | The maximum number of retries when building the database server connection. |
| connection.pool.size | Integer | No | 20 | The JDBC connection pool size. |
| chunk-key.even-distribution.factor.upper-bound | Double | No | 100 | The upper bound of the chunk key distribution factor. The factor, calculated as (MAX(id) - MIN(id) + 1) / row count, is used to determine whether the table data is evenly distributed. If the factor is less than or equal to this upper bound (and not below the lower bound), the table chunks are optimized for even distribution; otherwise the table is considered unevenly distributed, and the sampling-based sharding strategy is used if the estimated shard count exceeds sample-sharding.threshold. See the worked example after this table. |
| chunk-key.even-distribution.factor.lower-bound | Double | No | 0.05 | The lower bound of the chunk key distribution factor. If the factor is greater than or equal to this lower bound (and not above the upper bound), the table chunks are optimized for even distribution; otherwise the sampling-based sharding strategy applies as described for the upper bound. |
| sample-sharding.threshold | Integer | No | 1000 | The threshold of estimated shard count that triggers the sample sharding strategy. When the distribution factor is outside the bounds specified by chunk-key.even-distribution.factor.upper-bound and chunk-key.even-distribution.factor.lower-bound, and the estimated shard count (approximate row count / chunk size) exceeds this threshold, the sample sharding strategy is used. This can help handle large datasets more efficiently. |
| inverse-sampling.rate | Integer | No | 1000 | The inverse of the sampling rate used in the sample sharding strategy. For example, a value of 1000 means a 1/1000 sampling rate is applied during sampling. This option controls the granularity of the sampling and thus the final number of shards; it is especially useful for very large datasets where a lower sampling rate is preferred. |
| exactly_once | Boolean | No | false | Enable exactly-once semantics. |
| format | Enum | No | DEFAULT | Optional output format for PostgreSQL CDC; valid enumerations are DEFAULT and COMPATIBLE_DEBEZIUM_JSON. |
| debezium | Config | No | - | Properties passed through to the Debezium Embedded Engine, which is used to capture data changes from the PostgreSQL server. See Debezium's properties. |
| common-options | | No | - | Source plugin common parameters; please refer to Source Common Options for details. |
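To make the chunk-key options concrete, here is a worked example using the formula above (the numbers are illustrative): suppose the chunk key id of a captured table ranges from 1 to 800,000 and the table holds 400,000 rows. The distribution factor is (800000 - 1 + 1) / 400000 = 2, which lies inside the default bounds [0.05, 100], so the table is treated as evenly distributed and split into fixed-size chunks. If the ids instead ranged up to 800,000,000 with the same row count, the factor would be 2000, outside the bounds; sampling-based sharding would then only kick in if the estimated shard count (400000 / 8096 ≈ 49 with the default snapshot.split.size) exceeded sample-sharding.threshold, which it does not here.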



Task Example

Simple

Multi-table reading is supported:

```
env {
  # You can set engine configuration here
  execution.parallelism = 1
  job.mode = "STREAMING"
  checkpoint.interval = 5000
  read_limit.bytes_per_second = 7000000
  read_limit.rows_per_second = 400
}

source {
  Postgres-CDC {
    result_table_name = "customers_Postgre_cdc"
    username = "postgres"
    password = "postgres"
    database-names = ["postgres_cdc"]
    schema-names = ["inventory"]
    table-names = ["postgres_cdc.inventory.postgres_cdc_table_1", "postgres_cdc.inventory.postgres_cdc_table_2"]
    base-url = "jdbc:postgresql://postgres_cdc_e2e:5432/postgres_cdc?loggerLevel=OFF"
  }
}

transform {
}

sink {
  jdbc {
    source_table_name = "customers_Postgre_cdc"
    url = "jdbc:postgresql://postgres_cdc_e2e:5432/postgres_cdc?loggerLevel=OFF"
    driver = "org.postgresql.Driver"
    user = "postgres"
    password = "postgres"

    generate_sink_sql = true
    # You need to configure both database and schema
    database = "postgres_cdc"
    schema = "inventory"
    tablePrefix = "sink_"
    primary_keys = ["id"]
  }
}
```
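In the sink block above, generate_sink_sql = true lets the JDBC sink derive its insert, update and delete statements from the incoming change events using the configured database and schema, so no query needs to be written by hand; with tablePrefix = "sink_", changes captured from postgres_cdc_table_1 would presumably land in sink_postgres_cdc_table_1, assuming the sink names its target tables by that prefixing rule.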
Support custom primary key for table

```
source {
  Postgres-CDC {
    result_table_name = "customers_postgres_cdc"
    username = "postgres"
    password = "postgres"
    database-names = ["postgres_cdc"]
    schema-names = ["inventory"]
    table-names = ["postgres_cdc.inventory.full_types_no_primary_key"]
    base-url = "jdbc:postgresql://postgres_cdc_e2e:5432/postgres_cdc?loggerLevel=OFF"
    decoding.plugin.name = "decoderbufs"
    exactly_once = false
    table-names-config = [
      {
        table = "postgres_cdc.inventory.full_types_no_primary_key"
        primaryKeys = ["id"]
      }
    ]
  }
}
```
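The source options from the table above can be combined as needed. As a further illustration, a sketch of a source block tuned for the initial snapshot phase might look like this; the option values are illustrative, not recommendations:

```
source {
  Postgres-CDC {
    result_table_name = "customers_postgres_cdc"
    username = "postgres"
    password = "postgres"
    database-names = ["postgres_cdc"]
    schema-names = ["inventory"]
    table-names = ["postgres_cdc.inventory.postgres_cdc_table_1"]
    base-url = "jdbc:postgresql://postgres_cdc_e2e:5432/postgres_cdc?loggerLevel=OFF"

    # Read historical data first, then switch to incremental changes
    startup.mode = "initial"
    # Smaller splits and fetch batches reduce memory pressure during the snapshot
    snapshot.split.size = 4096
    snapshot.fetch.size = 512
    # Session time zone of the database server
    server-time-zone = "UTC"
  }
}
```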
