Selfuel Docs
  • Welcome to Selfuel Platform
    • Features
    • Capabilities
    • Target Audience
    • $150 Free Trial
  • Registration and Login
  • Platform UI
  • Stream Processing with Cortex
    • Cortex Quickstart Guide
    • Cortex Elements
      • Streams
      • Attributes
      • Mappings
        • 🚧Source Mapping Types
        • 🚧Sink Mapping Types
      • Node and Application Healthchecks
      • Nodes
        • Node Preview
        • Node Connectivites
        • Node Units
      • Expression Builder
        • 🚧Built-in Functions
      • Windows
        • Cron Window
        • Delay Window
        • Unique Event Window
        • First Event Window
        • Sliding Event Count Window
        • Tumbling Event Count Window
        • Session Window
        • Tumbling Event Sort Window
        • Sliding Time Window
        • Tumbling Time Window
        • Sliding Time and Event Count Window
      • Store and Cache
        • RDBMS
        • MongoDB
        • Redis
        • Elasticsearch
    • Applications
      • Applications Page
      • Creating Applications using Canvas
      • Connector Nodes Cluster
        • Source Nodes
          • CDC Source
          • Email Source
          • HTTP Source
          • HTTP Call Response Source
          • HTTP Service Source
          • Kafka Source
          • RabbitMQ Source
          • gRPC Source
          • JMS Source
          • Kafka Multi DC Source
          • JMS Source
          • AWS S3 Source
          • Google Pub-sub Source
          • AWS SQS Source
          • MQTT Source
          • Google Cloud Storage Source
          • HTTP SSE Source
          • WebSubHub Source
        • Sink Nodes
          • Email Sink
          • HTTP Sink
          • HTTP Service Response Sink
          • HTTP Call Sink
          • Kafka Sink
          • RabbitMQ Sink
          • gRPC Sink
          • JMS Sink
          • Kafka Multi DC Sink
          • AWS S3 Sink
          • Google Pub-sub Sink
          • AWS SQS Sink
          • MQTT Sink
          • Google Cloud Storage Sink
          • HTTP SSE Sink
          • WebSubHub Sink
      • Processing Nodes Cluster
        • Query
        • Join
        • Pattern
        • Sequence
        • Processor
        • 🚧On-demand Query
      • Buffer Nodes Cluster
        • Stream
        • Table
        • Window
        • Aggregation
        • Trigger
    • Run Applications
      • Run Applications Using Runners
      • Update Running Applications
      • Application Versioning
  • Data Integration with Nexus
    • Nexus Quickstart Guide
    • Nexus Elements
      • Concept
        • Config
        • Schema Feature
        • Speed Control
      • Connectors
        • Source
          • Source Connector Features
          • Source Common Options
          • AmazonDynamoDB
          • AmazonSqs
          • Cassandra
          • Clickhouse
          • CosFile
          • DB2
          • Doris
          • Easysearch
          • Elasticsearch
          • FakeSource
          • FtpFile
          • Github
          • Gitlab
          • GoogleSheets
          • Greenplum
          • Hbase
          • HdfsFile
          • Hive
          • HiveJdbc
          • Http
          • Apache Iceberg
          • InfluxDB
          • IoTDB
          • JDBC
          • Jira
          • Kingbase
          • Klaviyo
          • Kudu
          • Lemlist
          • Maxcompute
          • Milvus
          • MongoDB CDC
          • MongoDB
          • My Hours
          • MySQL CDC
          • MySQL
          • Neo4j
          • Notion
          • ObsFile
          • OceanBase
          • OneSignal
          • OpenMldb
          • Oracle CDC
          • Oracle
          • OssFile
          • OssJindoFile
          • Paimon
          • Persistiq
          • Phoenix
          • PostgreSQL CDC
          • PostgreSQL
          • Apache Pulsar
          • Rabbitmq
          • Redis
          • Redshift
          • RocketMQ
          • S3File
          • SftpFile
          • Sls
          • Snowflake
          • Socket
          • SQL Server CDC
          • SQL Server
          • StarRocks
          • TDengine
          • Vertica
          • Web3j
          • Kafka
        • Sink
          • Sink Connector Features
          • Sink Common Options
          • Activemq
          • AmazonDynamoDB
          • AmazonSqs
          • Assert
          • Cassandra
          • Clickhouse
          • ClickhouseFile
          • CosFile
          • DB2
          • DataHub
          • DingTalk
          • Doris
          • Druid
          • INFINI Easysearch
          • Elasticsearch
          • Email
          • Enterprise WeChat
          • Feishu
          • FtpFile
          • GoogleFirestore
          • Greenplum
          • Hbase
          • HdfsFile
          • Hive
          • Http
          • Hudi
          • Apache Iceberg
          • InfluxDB
          • IoTDB
          • JDBC
          • Kafka
          • Kingbase
          • Kudu
          • Maxcompute
          • Milvus
          • MongoDB
          • MySQL
          • Neo4j
          • ObsFile
          • OceanBase
          • Oracle
          • OssFile
          • OssJindoFile
          • Paimon
          • Phoenix
          • PostgreSql
          • Pulsar
          • Rabbitmq
          • Redis
          • Redshift
          • RocketMQ
          • S3Redshift
          • S3File
          • SelectDB Cloud
          • Sentry
          • SftpFile
          • Slack
          • Snowflake
          • Socket
          • SQL Server
          • StarRocks
          • TDengine
          • Tablestore
          • Vertica
        • Formats
          • Avro format
          • Canal Format
          • CDC Compatible Debezium-json
          • Debezium Format
          • Kafka source compatible kafka-connect-json
          • MaxWell Format
          • Ogg Format
        • Error Quick Reference Manual
      • Transform
        • Transform Common Options
        • Copy
        • FieldMapper
        • FilterRowKind
        • Filter
        • JsonPath
        • LLM
        • Replace
        • Split
        • SQL Functions
        • SQL
    • Integrations
      • Integrations Page
      • Creating Integrations Using Json
    • Run Integrations
      • Run Integrations Using Runners
      • Integration Versioning
  • Batch Processing/Storage with Maxim
    • Maxim Quickstart Guide
    • Maxim Elements
    • Queries
    • Run Queries
  • Orchestration with Routines
    • Routines Quickstart Guide
    • Routines Elements
    • Routines
    • Run Routines
  • Runners
    • Runners Page
    • Create a Runner to Run Applications
  • Security
    • Vaults
      • Vaults Page
      • Create Vaults
        • Runner-level Vaults
        • Application-level Vaults
      • Edit and Delete Vaults
      • 🚧Utilizing Vaults in Applications and Runners
    • Certificates
      • Certificates Page
      • 🚧Utilizing Certificates in Applications
      • 🟨Setting Up Security Settings
  • Monitoring Performance
    • Dashboard
    • Application Details
    • Runner Details
  • Logging
    • Log Types
  • Cost Management
    • SaaS
      • Pay-as-you-go
        • Hard Budget Cap
        • Soft Budget Cap
      • Subscriptions
    • On-prem
  • Organization Settings
    • General
    • Access Controls
      • User Roles and Privileges
    • Current Costs
    • Billing Addresses
    • Payment Accounts
    • Subscriptions
    • Pricing
    • Invoicing
  • User Settings
  • Troubleshooting
  • FAQs
Powered by GitBook
On this page
  • Support SQL Server Version​
  • Description​
  • Key Features​
  • Supported DataSource Info​
  • Data Type Mapping​
  • Sink Options​
  • tips​
  • Task Example​
  1. Data Integration with Nexus
  2. Nexus Elements
  3. Connectors
  4. Sink

SQL Server

PreviousSocketNextStarRocks

Last updated 8 months ago

JDBC SQL Server Sink Connector

Support SQL Server Version

  • server:2008 (Or later version for information only)

Description

Write data through jdbc. Support Batch mode and Streaming mode, support concurrent writing, support exactly-once semantics (using XA transaction guarantee).

Key Features

Use Xa transactions to ensure exactly-once. So only support exactly-once for the database which is support Xa transactions. You can set is_exactly_once=true to enable it.

Supported DataSource Info

Datasource
Supported Versions
Driver
Url

SQL Server

support version >= 2008

com.microsoft.sqlserver.jdbc.SQLServerDriver

jdbc:sqlserver://localhost:1433

Data Type Mapping

SQLserver Data Type
Nexus Data Type

BIT

BOOLEAN

TINYINT SMALLINT

SHORT

INTEGER

INT

BIGINT

LONG

DECIMAL NUMERIC MONEY SMALLMONEY

DECIMAL((Get the designated column's specified column size)+1, (Gets the designated column's number of digits to right of the decimal point.)))

REAL

FLOAT

FLOAT

DOUBLE

CHAR NCHAR VARCHAR NTEXT NVARCHAR TEXT

STRING

DATE

LOCAL_DATE

TIME

LOCAL_TIME

DATETIME DATETIME2 SMALLDATETIME DATETIMEOFFSET

LOCAL_DATE_TIME

TIMESTAMP BINARY VARBINARY IMAGE UNKNOWN

Not supported yet

Name
Type
Required
Default
Description

url

String

Yes

-

The URL of the JDBC connection. Refer to a case: jdbc:sqlserver://localhost:1433;databaseName=mydatabase

driver

String

Yes

-

The jdbc class name used to connect to the remote data source, if you use sqlServer the value is com.microsoft.sqlserver.jdbc.SQLServerDriver.

user

String

No

-

Connection instance user name

password

String

No

-

Connection instance password

query

String

No

-

Use this sql write upstream input datas to database. e.g INSERT ...,query have the higher priority

database

String

No

-

Use this database and table-name auto-generate sql and receive upstream input datas write to database. This option is mutually exclusive with query and has a higher priority.

table

String

No

-

Use database and this table-name auto-generate sql and receive upstream input datas write to database. This option is mutually exclusive with query and has a higher priority.

primary_keys

Array

No

-

This option is used to support operations such as insert, delete, and update when automatically generate sql.

support_upsert_by_query_primary_key_exist

Boolean

No

false

Choose to use INSERT sql, UPDATE sql to process update events(INSERT, UPDATE_AFTER) based on query primary key exists. This configuration is only used when database unsupport upsert syntax. Note: that this method has low performance

connection_check_timeout_sec

Int

No

30

The time in seconds to wait for the database operation used to validate the connection to complete.

max_retries

Int

No

0

The number of retries to submit failed (executeBatch)

batch_size

Int

No

1000

For batch writing, when the number of buffered records reaches the number of batch_size or the time reaches checkpoint.interval , the data will be flushed into the database

is_exactly_once

Boolean

No

false

Whether to enable exactly-once semantics, which will use Xa transactions. If on, you need to set xa_data_source_class_name.

generate_sink_sql

Boolean

No

false

Generate sql statements based on the database table you want to write to

xa_data_source_class_name

String

No

-

The xa data source class name of the database Driver, for example, SqlServer is com.microsoft.sqlserver.jdbc.SQLServerXADataSource, and please refer to appendix for other data sources

max_commit_attempts

Int

No

3

The number of retries for transaction commit failures

transaction_timeout_sec

Int

No

-1

The timeout after the transaction is opened, the default is -1 (never timeout). Note that setting the timeout may affect exactly-once semantics

auto_commit

Boolean

No

true

Automatic transaction commit is enabled by default

common-options

no

-

enable_upsert

Boolean

No

true

Enable upsert by primary_keys exist, If the task has no key duplicate data, setting this parameter to false can speed up data import

If partition_column is not set, it will run in single concurrency, and if partition_column is set, it will be executed in parallel according to the concurrency of tasks.

This is one that reads Sqlserver data and inserts it directly into another table

env {
  # You can set engine configuration here
  parallelism = 10
}

source {
  # This is a example source plugin **only for test and demonstrate the feature source plugin**
  Jdbc {
    driver = com.microsoft.sqlserver.jdbc.SQLServerDriver
    url = "jdbc:sqlserver://localhost:1433;databaseName=column_type_test"
    user = SA
    password = "Y.sa123456"
    query = "select * from column_type_test.dbo.full_types_jdbc"
    # Parallel sharding reads fields
    partition_column = "id"
    # Number of fragments
    partition_num = 10

  }
  # If you would like to get more information about how to configure Nexus and see full list of source plugins,
  # please go to source page
}

transform {

  # If you would like to get more information about how to configure Nexus and see full list of transform plugins,
  # please go to transform page
}

sink {
  Jdbc {
    driver = com.microsoft.sqlserver.jdbc.SQLServerDriver
    url = "jdbc:sqlserver://localhost:1433;databaseName=column_type_test"
    user = SA
    password = "Y.sa123456"
    query = "insert into full_types_jdbc_sink( id, val_char, val_varchar, val_text, val_nchar, val_nvarchar, val_ntext, val_decimal, val_numeric, val_float, val_real, val_smallmoney, val_money, val_bit, val_tinyint, val_smallint, val_int, val_bigint, val_date, val_time, val_datetime2, val_datetime, val_smalldatetime ) values( ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ? )"

  }  # If you would like to get more information about how to configure Nexus and see full list of sink plugins,
  # please go to sink page
}

CDC change data is also supported by us In this case, you need config database, table and primary_keys.

Jdbc {
  source_table_name = "customers"
  driver = com.microsoft.sqlserver.jdbc.SQLServerDriver
  url = "jdbc:sqlserver://localhost:1433;databaseName=column_type_test"
  user = SA
  password = "Y.sa123456"
  generate_sink_sql = true
  database = "column_type_test"
  table = "dbo.full_types_sink"
  batch_size = 100
  primary_keys = ["id"]
}

Transactional writes may be slower but more accurate to the data

  Jdbc {
    driver = com.microsoft.sqlserver.jdbc.SQLServerDriver
    url = "jdbc:sqlserver://localhost:1433;databaseName=column_type_test"
    user = SA
    password = "Y.sa123456"
    query = "insert into full_types_jdbc_sink( id, val_char, val_varchar, val_text, val_nchar, val_nvarchar, val_ntext, val_decimal, val_numeric, val_float, val_real, val_smallmoney, val_money, val_bit, val_tinyint, val_smallint, val_int, val_bigint, val_date, val_time, val_datetime2, val_datetime, val_smalldatetime ) values( ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ? )"
    is_exactly_once = "true"

    xa_data_source_class_name = "com.microsoft.sqlserver.jdbc.SQLServerXADataSource"

  }  # If you would like to get more information about how to configure Nexus and see full list of sink plugins,
  # please go to sink page

Sink Options

Sink plugin common parameters, please refer to for details

tips

Task Example

simple:

CDC(Change data capture) event

Exactly Once Sink

​
​
​
​
​
​
​
​
​
​
​
Sink Common Options
Options