Selfuel Docs

SQL Server CDC

Last updated 8 months ago

SQL Server CDC source connector

Supported SQL Server Versions

  • SQL Server 2019 (later versions are listed for information only)

Key Features

Description

The SQL Server CDC connector reads both snapshot data and incremental data from a SQL Server database. This document describes how to set up the SQL Server CDC connector to run SQL queries against SQL Server databases.

Data Type Mapping

| SQL Server Data Type | Nexus Data Type |
|---|---|
| CHAR, VARCHAR, NCHAR, NVARCHAR, TEXT, NTEXT, XML | STRING |
| BINARY, VARBINARY, IMAGE | BYTES |
| INTEGER, INT | INT |
| SMALLINT, TINYINT | SMALLINT |
| BIGINT | BIGINT |
| FLOAT(1~24), REAL | FLOAT |
| DOUBLE, FLOAT(>24) | DOUBLE |
| NUMERIC(p,s), DECIMAL(p,s), MONEY, SMALLMONEY | DECIMAL(p, s) |
| TIMESTAMP | BYTES |
| DATE | DATE |
| TIME(s) | TIME(s) |
| DATETIME(s), DATETIME2(s), DATETIMEOFFSET(s), SMALLDATETIME | TIMESTAMP(s) |
| BOOLEAN, BIT | BOOLEAN |
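The mapping above can be expressed as a small lookup, which may help when checking by hand what type a column is expected to arrive as. This is only an illustrative sketch: the function name `nexus_type` and the table `BASE_TYPES` are hypothetical and not part of the connector, and the handling of a bare `FLOAT` (treated as `FLOAT(53)`, the SQL Server default precision) is an assumption about how the rule would apply.

```python
import re

# Base mapping copied from the data type table (upper-case SQL Server names).
BASE_TYPES = {
    **dict.fromkeys(["CHAR", "VARCHAR", "NCHAR", "NVARCHAR", "TEXT", "NTEXT", "XML"], "STRING"),
    **dict.fromkeys(["BINARY", "VARBINARY", "IMAGE", "TIMESTAMP"], "BYTES"),
    **dict.fromkeys(["INTEGER", "INT"], "INT"),
    **dict.fromkeys(["SMALLINT", "TINYINT"], "SMALLINT"),
    "BIGINT": "BIGINT",
    "REAL": "FLOAT",
    "DOUBLE": "DOUBLE",
    **dict.fromkeys(["NUMERIC", "DECIMAL", "MONEY", "SMALLMONEY"], "DECIMAL"),
    "DATE": "DATE",
    "TIME": "TIME",
    **dict.fromkeys(["DATETIME", "DATETIME2", "DATETIMEOFFSET", "SMALLDATETIME"], "TIMESTAMP"),
    **dict.fromkeys(["BOOLEAN", "BIT"], "BOOLEAN"),
}

# Target types that keep their precision/scale arguments, e.g. DECIMAL(p, s).
PARAMETERIZED = {"DECIMAL", "TIME", "TIMESTAMP"}

_TYPE_RE = re.compile(r"\s*(\w+)\s*(?:\(([^)]*)\))?\s*")


def nexus_type(sql_type):
    """Return the Nexus type name for a SQL Server column type string."""
    match = _TYPE_RE.fullmatch(sql_type)
    if match is None:
        raise ValueError(f"cannot parse type: {sql_type!r}")
    name, args = match.group(1).upper(), match.group(2)

    if name == "FLOAT":
        # Assumption: a bare FLOAT is FLOAT(53), the SQL Server default.
        precision = int(args) if args else 53
        return "FLOAT" if precision <= 24 else "DOUBLE"

    if name not in BASE_TYPES:
        raise ValueError(f"type not listed in the mapping table: {sql_type!r}")

    target = BASE_TYPES[name]
    if args and target in PARAMETERIZED:
        normalized = ", ".join(part.strip() for part in args.split(","))
        return f"{target}({normalized})"
    return target
```

For example, `nexus_type("DECIMAL(10,2)")` yields the string `DECIMAL(10, 2)`, while `nexus_type("timestamp")` yields `BYTES`, reflecting that the SQL Server `TIMESTAMP` type is mapped to bytes rather than to a date-time type.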

Source Options

| Name | Type | Required | Default | Description |
|---|---|---|---|---|
| username | String | Yes | - | Name of the database user to use when connecting to the database server. |
| password | String | Yes | - | Password to use when connecting to the database server. |
| database-names | List | Yes | - | Names of the databases to monitor. |
| table-names | List | Yes | - | Names of the tables to monitor. Each name combines the database, schema, and table name (databaseName.schemaName.tableName). |
| table-names-config | List | No | - | Table config list, for example: [{"table": "db1.schema1.table1","primaryKeys":["key1"]}] |
| base-url | String | Yes | - | The URL must include the database, for example "jdbc:sqlserver://localhost:1433;databaseName=test". |
| startup.mode | Enum | No | INITIAL | Optional startup mode for the SQL Server CDC consumer. Valid enumerations are "initial", "earliest", "latest" and "specific". |
| startup.timestamp | Long | No | - | Start from the specified epoch timestamp (in milliseconds). Required when "startup.mode" is 'timestamp'. |
| startup.specific-offset.file | String | No | - | Start from the specified binlog file name. Required when "startup.mode" is 'specific'. |
| startup.specific-offset.pos | Long | No | - | Start from the specified binlog file position. Required when "startup.mode" is 'specific'. |
| stop.mode | Enum | No | NEVER | Optional stop mode for the SQL Server CDC consumer. Valid enumerations are "never". |
| stop.timestamp | Long | No | - | Stop at the specified epoch timestamp (in milliseconds). Required when "stop.mode" is 'timestamp'. |
| stop.specific-offset.file | String | No | - | Stop at the specified binlog file name. Required when "stop.mode" is 'specific'. |
| stop.specific-offset.pos | Long | No | - | Stop at the specified binlog file position. Required when "stop.mode" is 'specific'. |
| incremental.parallelism | Integer | No | 1 | The number of parallel readers in the incremental phase. |
| snapshot.split.size | Integer | No | 8096 | The split size (number of rows) of a table snapshot. Captured tables are divided into multiple splits when the table snapshot is read. |
| snapshot.fetch.size | Integer | No | 1024 | The maximum fetch size per poll when reading a table snapshot. |
| server-time-zone | String | No | UTC | The session time zone of the database server. |
| connect.timeout | Duration | No | 30s | The maximum time the connector waits after trying to connect to the database server before timing out. |
| connect.max-retries | Integer | No | 3 | The maximum number of times the connector retries building a database server connection. |
| connection.pool.size | Integer | No | 20 | The connection pool size. |
| chunk-key.even-distribution.factor.upper-bound | Double | No | 100 | The upper bound of the chunk key distribution factor. The distribution factor, calculated as (MAX(id) - MIN(id) + 1) / row count, is used to determine whether the table data is evenly distributed. If the factor is less than or equal to this upper bound, the table chunks are optimized for even distribution. If it is greater, the table is considered unevenly distributed, and the sampling-based sharding strategy is used if the estimated shard count exceeds the value specified by sample-sharding.threshold. The default value is 100.0. |
| chunk-key.even-distribution.factor.lower-bound | Double | No | 0.05 | The lower bound of the chunk key distribution factor. The distribution factor, calculated as (MAX(id) - MIN(id) + 1) / row count, is used to determine whether the table data is evenly distributed. If the factor is greater than or equal to this lower bound, the table chunks are optimized for even distribution. If it is less, the table is considered unevenly distributed, and the sampling-based sharding strategy is used if the estimated shard count exceeds the value specified by sample-sharding.threshold. The default value is 0.05. |
| sample-sharding.threshold | Integer | No | 1000 | The threshold of the estimated shard count that triggers the sample sharding strategy. When the distribution factor is outside the bounds specified by chunk-key.even-distribution.factor.upper-bound and chunk-key.even-distribution.factor.lower-bound, and the estimated shard count (calculated as approximate row count / chunk size) exceeds this threshold, the sample sharding strategy is used. This can help handle large datasets more efficiently. The default value is 1000 shards. |
| inverse-sampling.rate | Integer | No | 1000 | The inverse of the sampling rate used in the sample sharding strategy. For example, a value of 1000 means a 1/1000 sampling rate is applied during sampling. This option controls the granularity of the sampling and therefore the final number of shards. It is especially useful for very large datasets where a lower sampling rate is preferred. The default value is 1000. |
| exactly_once | Boolean | No | false | Enable exactly-once semantics. |
| debezium.* | config | No | - | Pass-through Debezium properties for the Debezium Embedded Engine. |
| format | Enum | No | DEFAULT | Optional output format for SQL Server CDC. Valid enumerations are "DEFAULT" and "COMPATIBLE_DEBEZIUM_JSON". |
| common-options | | No | - | Source plugin common parameters. |
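The interaction between the two distribution-factor bounds and sample-sharding.threshold is easier to follow as code. The sketch below only restates the rules given in the option descriptions; it is not the connector's implementation. The function names are hypothetical, "chunk size" is assumed to be the value of snapshot.split.size, and the label returned for an unevenly distributed table below the threshold is a placeholder, since the descriptions do not say what happens in that case.

```python
def distribution_factor(min_key, max_key, row_count):
    """(MAX(id) - MIN(id) + 1) / row count, as given in the option descriptions."""
    if row_count <= 0:
        raise ValueError("row_count must be positive")
    return (max_key - min_key + 1) / row_count


def choose_split_strategy(min_key, max_key, row_count, *,
                          split_size=8096,        # snapshot.split.size (assumed chunk size)
                          lower_bound=0.05,       # chunk-key.even-distribution.factor.lower-bound
                          upper_bound=100.0,      # chunk-key.even-distribution.factor.upper-bound
                          sample_threshold=1000): # sample-sharding.threshold
    """Return a label for the splitting strategy the descriptions imply."""
    factor = distribution_factor(min_key, max_key, row_count)

    # Within both bounds: the table is treated as evenly distributed.
    if lower_bound <= factor <= upper_bound:
        return "even"

    # Outside the bounds: sampling is used only for a large estimated shard count.
    estimated_shards = row_count / split_size
    if estimated_shards > sample_threshold:
        return "sample"

    # Placeholder label: behavior in this case is not described in this document.
    return "uneven-without-sampling"
```

With the defaults, a table whose keys run from 1 to 1,000,000 over 1,000,000 rows has a factor of 1.0 and is treated as evenly distributed, whereas 10,000,000 rows spread over keys 1 to 10^12 has a factor of 100,000 and roughly 1,235 estimated shards, which exceeds the threshold and selects sampling.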

Enable SQL Server CDC

  1. Check whether the CDC Agent (SQL Server Agent) is enabled:

     EXEC xp_servicecontrol N'querystate', N'SQLServerAGENT';

     If the result is "running", the agent is enabled. Otherwise, you need to enable it manually.

  2. Enable the CDC Agent:

     /opt/mssql/bin/mssql-conf setup

  3. The command prompts with the following list:

     1) Evaluation (free, no production use rights, 180-day limit)
     2) Developer (free, no production use rights)
     3) Express (free)
     4) Web (PAID)
     5) Standard (PAID)
     6) Enterprise (PAID)
     7) Enterprise Core (PAID)
     8) I bought a license through a retail sales channel and have a product key to enter.

  4. Enable CDC at the database level. Note that SQL Server tracks a table only after CDC has also been enabled for that table with sys.sp_cdc_enable_table; the last query below shows whether a given table is tracked.

     USE TestDB; -- Replace with the actual database name
     EXEC sys.sp_cdc_enable_db;
     SELECT name, is_tracked_by_cdc FROM sys.tables WHERE name = 'table'; -- Replace 'table' with the name of the table you want to check

Task Example

Simple initial read

This streaming-mode CDC job first reads the existing table data (snapshot) and, after that read succeeds, continues reading changes incrementally. The following configuration is for reference only.

env {
  # You can set engine configuration here
  parallelism = 1
  job.mode = "STREAMING"
  checkpoint.interval = 5000
}

source {
  # Example source plugin, intended only for testing and demonstrating the source plugin feature
  SqlServer-CDC {
    result_table_name = "customers"
    username = "sa"
    password = "Y.sa123456"
    startup.mode="initial"
    database-names = ["column_type_test"]
    table-names = ["column_type_test.dbo.full_types"]
    base-url = "jdbc:sqlserver://localhost:1433;databaseName=column_type_test"
  }
}

transform {
}

sink {
  console {
    source_table_name = "customers"
  }
}

Simple incremental read

This job performs an incremental read: it reads only the changed data and prints it.

env {
  # You can set engine configuration here
  parallelism = 1
  job.mode = "STREAMING"
  checkpoint.interval = 5000
}

source {
  # Example source plugin, intended only for testing and demonstrating the source plugin feature
  SqlServer-CDC {
    # Enable exactly-once reads
    exactly_once = true
    result_table_name = "customers"
    username = "sa"
    password = "Y.sa123456"
    startup.mode="latest"
    database-names = ["column_type_test"]
    table-names = ["column_type_test.dbo.full_types"]
    base-url = "jdbc:sqlserver://localhost:1433;databaseName=column_type_test"
  }
}

transform {
}

sink {
  console {
    source_table_name = "customers"
  }
}

Support custom primary key for table

env {
  parallelism = 1
  job.mode = "STREAMING"
  checkpoint.interval = 5000
}

source {
  SqlServer-CDC {
    base-url = "jdbc:sqlserver://localhost:1433;databaseName=column_type_test"
    username = "sa"
    password = "Y.sa123456"
    database-names = ["column_type_test"]
    
    table-names = ["column_type_test.dbo.simple_types", "column_type_test.dbo.full_types"]
    table-names-config = [
      {
        table = "column_type_test.dbo.full_types"
        primaryKeys = ["id"]
      }
    ]
  }
}

sink {
  console {
  }
}
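Several of the source options depend on one another, and a quick pre-flight check can catch mistakes before a job is submitted. The following is a minimal sketch under stated assumptions, not a platform feature: `validate_source_options` is a hypothetical helper, the options are assumed to be available as a plain dictionary keyed by option name, and the two cross-checks (each table belongs to a listed database, each table-names-config entry refers to a listed table) are assumptions rather than documented rules. The 'timestamp' startup mode is checked because the startup.timestamp description refers to it, even though it is not in the listed enumerations.

```python
REQUIRED = ("username", "password", "database-names", "table-names", "base-url")


def validate_source_options(options):
    """Return a list of human-readable problems; an empty list means none found."""
    problems = []

    # Options marked "Yes" in the Required column.
    for key in REQUIRED:
        if not options.get(key):
            problems.append(f"missing required option: {key}")

    databases = set(options.get("database-names") or [])
    tables = list(options.get("table-names") or [])

    # Table names are written as databaseName.schemaName.tableName.
    for name in tables:
        parts = name.split(".")
        if len(parts) != 3 or not all(parts):
            problems.append(f"table name is not database.schema.table: {name}")
        elif parts[0] not in databases:
            # Assumption: a table should belong to a monitored database.
            problems.append(f"database of {name} is not in database-names")

    for entry in options.get("table-names-config") or []:
        table = entry.get("table")
        if table not in tables:
            # Assumption: config entries should refer to a listed table.
            problems.append(f"table-names-config refers to unlisted table: {table}")
        if not entry.get("primaryKeys"):
            problems.append(f"table-names-config entry has no primaryKeys: {table}")

    # Options documented as required for particular startup modes.
    mode = str(options.get("startup.mode", "initial")).lower()
    if mode == "timestamp" and "startup.timestamp" not in options:
        problems.append("startup.timestamp is required for startup.mode 'timestamp'")
    if mode == "specific":
        for key in ("startup.specific-offset.file", "startup.specific-offset.pos"):
            if key not in options:
                problems.append(f"{key} is required for startup.mode 'specific'")

    return problems
```

Passing the options from the custom primary key example above as a dictionary returns an empty list; changing startup.mode to "specific" without the two offset options adds one problem per missing option.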

Additional notes on the source options:

  • debezium.*: passes Debezium's properties through to the Debezium Embedded Engine, which is used to capture data changes from the SQL Server instance. For the available properties, see Debezium's SqlServer Connector properties.
  • common-options: source plugin common parameters; refer to Source Common Options for details.