Doris


Doris sink connector

Support Doris Version

  • Exactly-once & CDC are supported for Doris version >= 1.1.x

  • The Array data type is supported for Doris version >= 1.2.x

  • The Map data type will be supported in Doris version 2.x

Key Features

Description

Used to send data to Doris. Both streaming and batch mode are supported. Internally, the Doris sink connector caches data and imports it in batches via Stream Load.
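
Because data is cached in memory and flushed to Doris via Stream Load, the amount of data held before each import is governed by the buffer and batch options described in the table below. The fragment that follows is a minimal sketch showing only those options with their documented defaults; the values are illustrative, not tuning advice:

sink {
  Doris {
    # ... connection options omitted ...
    sink.buffer-size = 262144   # bytes cached per buffer for Stream Load (default 256 * 1024)
    sink.buffer-count = 3       # number of buffers cached for Stream Load (default 3)
    doris.batch.size = 1024     # rows per HTTP request; cached data is also flushed on checkpoint (default 1024)
  }
}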

Sink Options

| Name | Type | Required | Default | Description |
| ---- | ---- | -------- | ------- | ----------- |
| fenodes | String | Yes | - | Doris cluster FE nodes address, in the format "fe_ip:fe_http_port, ..." |
| query-port | int | No | 9030 | Doris FE nodes query_port |
| username | String | Yes | - | Doris user username |
| password | String | Yes | - | Doris user password |
| database | String | Yes | - | The database name of the Doris table; use ${database_name} to represent the upstream database name |
| table | String | Yes | - | The table name of the Doris table; use ${table_name} to represent the upstream table name |
| table.identifier | String | Yes | - | The name of the Doris table; deprecated after version 2.3.5, please use database and table instead |
| sink.label-prefix | String | Yes | - | The label prefix used by Stream Load imports. In the 2pc scenario, global uniqueness is required to ensure the EOS semantics of Nexus |
| sink.enable-2pc | bool | No | false | Whether to enable two-phase commit (2pc); the default is false |
| sink.enable-delete | bool | No | - | Whether to enable deletion. This option requires the Doris table to have the batch delete feature enabled (enabled by default since version 0.15) and only supports the Unique model |
| sink.check-interval | int | No | 10000 | The interval at which exceptions are checked while loading |
| sink.max-retries | int | No | 3 | The maximum number of retries if writing records to the database fails |
| sink.buffer-size | int | No | 256 * 1024 | The buffer size used to cache data for Stream Load |
| sink.buffer-count | int | No | 3 | The number of buffers used to cache data for Stream Load |
| doris.batch.size | int | No | 1024 | The batch size of each HTTP request written to Doris; when the row count reaches this size or a checkpoint is executed, the cached data is written to the server |
| needs_unsupported_type_casting | boolean | No | false | Whether to enable casting of unsupported types, such as Decimal64 to Double |
| schema_save_mode | Enum | No | CREATE_SCHEMA_WHEN_NOT_EXIST | The schema save mode; please refer to schema_save_mode below |
| data_save_mode | Enum | No | APPEND_DATA | The data save mode; please refer to data_save_mode below |
| save_mode_create_template | string | No | see below | The table creation template; see save_mode_create_template below |
| custom_sql | String | No | - | When data_save_mode is set to CUSTOM_PROCESSING, fill in an executable SQL statement here; it is executed before the synchronization task starts |
| doris.config | map | Yes | - | Used to support operations such as insert, delete, and update when SQL is generated automatically, and to set the supported import formats |
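
Putting the required options together, a minimal Doris sink block looks like the sketch below. The FE address, credentials, and database/table names are placeholders for illustration only:

sink {
  Doris {
    fenodes = "doris-fe-host:8030"      # placeholder FE address, format "fe_ip:fe_http_port, ..."
    username = root
    password = ""
    database = "test"                   # or "${database_name}" to reuse the upstream database name
    table = "orders"                    # placeholder; "${table_name}" reuses the upstream table name
    sink.label-prefix = "nexus-doris"   # must be globally unique when sink.enable-2pc = "true"
    doris.config {
      format = "json"
      read_json_by_line = "true"
    }
  }
}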

schema_save_mode[Enum]

Before the synchronization task starts, choose how an existing table structure on the target side is handled:

  • RECREATE_SCHEMA: the table is created when it does not exist; if it already exists, it is dropped and recreated

  • CREATE_SCHEMA_WHEN_NOT_EXIST: the table is created when it does not exist; skipped when it already exists

  • ERROR_WHEN_SCHEMA_NOT_EXIST: an error is reported when the table does not exist

data_save_mode[Enum]

Before the synchronization task starts, choose how existing data on the target side is handled:

  • DROP_DATA: preserve the database structure and delete the data

  • APPEND_DATA: preserve the database structure and keep the existing data

  • CUSTOM_PROCESSING: user-defined processing

  • ERROR_WHEN_DATA_EXISTS: an error is reported when data already exists
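
As an illustration of how these modes combine, the sketch below keeps table creation at its default and runs a user-supplied statement before data is written. The table name and the SQL statement are hypothetical:

sink {
  Doris {
    # ... connection options omitted ...
    schema_save_mode = "CREATE_SCHEMA_WHEN_NOT_EXIST"   # create the table only if it does not exist
    data_save_mode = "CUSTOM_PROCESSING"                 # run custom_sql before the synchronization task
    custom_sql = "TRUNCATE TABLE test.e2e_table_sink"    # hypothetical pre-processing statement
  }
}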

save_mode_create_template

We use templates to automatically create Doris tables. The table creation statement is generated from the upstream data types and schema, and the default template can be modified as needed.

Default template:

CREATE TABLE IF NOT EXISTS `${database}`.`${table}` (
    ${rowtype_primary_key},
    ${rowtype_fields}
) ENGINE = OLAP
UNIQUE KEY (${rowtype_primary_key})
DISTRIBUTED BY HASH (${rowtype_primary_key})
PROPERTIES (
    "replication_allocation" = "tag.location.default: 1",
    "in_memory" = "false",
    "storage_format" = "V2",
    "disable_auto_compaction" = "false"
)

If a custom field is added to the template, for example an id field:

CREATE TABLE IF NOT EXISTS `${database}`.`${table}` (
    id,
    ${rowtype_fields}
) ENGINE = OLAP
UNIQUE KEY (${rowtype_primary_key})
DISTRIBUTED BY HASH (${rowtype_primary_key})
PROPERTIES (
    "replication_num" = "1"
);

The connector automatically obtains the corresponding types from the upstream schema to fill in the remaining fields, and removes the id field from rowtype_fields. This mechanism can be used to customize field types and attributes.

You can use the following placeholders (a configuration sketch using them follows this list):

  • database: Used to get the database from the upstream schema

  • table_name: Used to get the table name from the upstream schema

  • rowtype_fields: Used to get all fields from the upstream schema; they are automatically mapped to Doris field definitions

  • rowtype_primary_key: Used to get the primary key from the upstream schema (may be a list)

  • rowtype_unique_key: Used to get the unique key from the upstream schema (may be a list)

  • rowtype_duplicate_key: Used to get the duplicate key from the upstream schema (only for the Doris source, may be a list)
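
The template is supplied through the save_mode_create_template option of the sink. The sketch below only shows how the placeholders above slot into the configuration; it mirrors the default template with a reduced PROPERTIES section and is not a recommended table layout:

sink {
  Doris {
    # ... connection options omitted ...
    schema_save_mode = "CREATE_SCHEMA_WHEN_NOT_EXIST"
    save_mode_create_template = """
      CREATE TABLE IF NOT EXISTS `${database}`.`${table}` (
        ${rowtype_primary_key},
        ${rowtype_fields}
      ) ENGINE = OLAP
      UNIQUE KEY (${rowtype_primary_key})
      DISTRIBUTED BY HASH (${rowtype_primary_key})
      PROPERTIES (
        "replication_allocation" = "tag.location.default: 1"
      )
    """
  }
}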

Data Type Mapping

| Doris Data Type | Nexus Data Type |
| --------------- | --------------- |
| BOOLEAN | BOOLEAN |
| TINYINT | TINYINT |
| SMALLINT | SMALLINT, TINYINT |
| INT | INT, SMALLINT, TINYINT |
| BIGINT | BIGINT, INT, SMALLINT, TINYINT |
| LARGEINT | BIGINT, INT, SMALLINT, TINYINT |
| FLOAT | FLOAT |
| DOUBLE | DOUBLE, FLOAT |
| DECIMAL | DECIMAL, DOUBLE, FLOAT |
| DATE | DATE |
| DATETIME | TIMESTAMP |
| CHAR | STRING |
| VARCHAR | STRING |
| STRING | STRING |
| ARRAY | ARRAY |
| MAP | MAP |
| JSON | STRING |
| HLL | Not supported yet |
| BITMAP | Not supported yet |
| QUANTILE_STATE | Not supported yet |
| STRUCT | Not supported yet |
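
Where an upstream type has no direct counterpart in the table above, the needs_unsupported_type_casting option can be switched on so that such values are cast to a supported type (for example Decimal64 to Double). A minimal sketch, with connection options omitted:

sink {
  Doris {
    # ... connection options omitted ...
    needs_unsupported_type_casting = "true"   # cast otherwise unsupported types, e.g. Decimal64 to Double
  }
}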

Supported import data formats

The supported formats include CSV and JSON.

Task Example

Simple:

The following example describes writing multiple data types to Doris; users need to create the corresponding table downstream.

env {
  parallelism = 1
  job.mode = "BATCH"
  checkpoint.interval = 10000
}

source {
  FakeSource {
    row.num = 10
    map.size = 10
    array.size = 10
    bytes.length = 10
    string.length = 10
    schema = {
      fields {
        c_map = "map<string, array<int>>"
        c_array = "array<int>"
        c_string = string
        c_boolean = boolean
        c_tinyint = tinyint
        c_smallint = smallint
        c_int = int
        c_bigint = bigint
        c_float = float
        c_double = double
        c_decimal = "decimal(16, 1)"
        c_null = "null"
        c_bytes = bytes
        c_date = date
        c_timestamp = timestamp
      }
    }
  }
}

sink {
  Doris {
    fenodes = "doris_cdc_e2e:8030"
    username = root
    password = ""
    database = "test"
    table = "e2e_table_sink"
    sink.label-prefix = "test-cdc"
    sink.enable-2pc = "true"
    sink.enable-delete = "true"
    doris.config {
      format = "json"
      read_json_by_line = "true"
    }
  }
}

CDC (Change Data Capture) Event:

This example defines a Nexus synchronization task that automatically generates data through FakeSource and sends it to the Doris sink. FakeSource simulates CDC data against a schema that includes a score field (int type). A corresponding table named test.e2e_table_sink needs to be created in Doris for it.

env {
  parallelism = 1
  job.mode = "BATCH"
  checkpoint.interval = 10000
}

source {
  FakeSource {
    schema = {
      fields {
        pk_id = bigint
        name = string
        score = int
        sex = boolean
        number = tinyint
        height = float
        sight = double
        create_time = date
        update_time = timestamp
      }
    }
    rows = [
      {
        kind = INSERT
        fields = [1, "A", 100, true, 1, 170.0, 4.3, "2020-02-02", "2020-02-02T02:02:02"]
      },
      {
        kind = INSERT
        fields = [2, "B", 100, true, 1, 170.0, 4.3, "2020-02-02", "2020-02-02T02:02:02"]
      },
      {
        kind = INSERT
        fields = [3, "C", 100, true, 1, 170.0, 4.3, "2020-02-02", "2020-02-02T02:02:02"]
      },
      {
        kind = UPDATE_BEFORE
        fields = [1, "A", 100, true, 1, 170.0, 4.3, "2020-02-02", "2020-02-02T02:02:02"]
      },
      {
        kind = UPDATE_AFTER
        fields = [1, "A_1", 100, true, 1, 170.0, 4.3, "2020-02-02", "2020-02-02T02:02:02"]
      },
      {
        kind = DELETE
        fields = [2, "B", 100, true, 1, 170.0, 4.3, "2020-02-02", "2020-02-02T02:02:02"]
      }
    ]
  }
}

sink {
  Doris {
    fenodes = "doris_cdc_e2e:8030"
    username = root
    password = ""
    database = "test"
    table = "e2e_table_sink"
    sink.label-prefix = "test-cdc"
    sink.enable-2pc = "true"
    sink.enable-delete = "true"
    doris.config {
      format = "json"
      read_json_by_line = "true"
    }
  }
}

Use JSON format to import data

sink {
    Doris {
        fenodes = "e2e_dorisdb:8030"
        username = root
        password = ""
        database = "test"
        table = "e2e_table_sink"
        sink.enable-2pc = "true"
        sink.label-prefix = "test_json"
        doris.config = {
            format="json"
            read_json_by_line="true"
        }
    }
}

Use CSV format to import data

sink {
    Doris {
        fenodes = "e2e_dorisdb:8030"
        username = root
        password = ""
        database = "test"
        table = "e2e_table_sink"
        sink.enable-2pc = "true"
        sink.label-prefix = "test_csv"
        doris.config = {
          format = "csv"
          column_separator = ","
        }
    }
}

Multiple table

example1

env {
  parallelism = 1
  job.mode = "STREAMING"
  checkpoint.interval = 5000
}

source {
  Mysql-CDC {
    base-url = "jdbc:mysql://127.0.0.1:3306/nexus"
    username = "root"
    password = "******"
    
    table-names = ["nexus.role","nexus.user","galileo.Bucket"]
  }
}

transform {
}

sink {
  Doris {
    fenodes = "doris_cdc_e2e:8030"
    username = root
    password = ""
    database = "${database_name}_test"
    table = "${table_name}_test"
    sink.label-prefix = "test-cdc"
    sink.enable-2pc = "true"
    sink.enable-delete = "true"
    doris.config {
      format = "json"
      read_json_by_line = "true"
    }
  }
}

example2

env {
  parallelism = 1
  job.mode = "BATCH"
}

source {
  Jdbc {
    driver = oracle.jdbc.driver.OracleDriver
    url = "jdbc:oracle:thin:@localhost:1521/XE"
    user = testUser
    password = testPassword

    table_list = [
      {
        table_path = "TESTSCHEMA.TABLE_1"
      },
      {
        table_path = "TESTSCHEMA.TABLE_2"
      }
    ]
  }
}

transform {
}

sink {
  Doris {
    fenodes = "doris_cdc_e2e:8030"
    username = root
    password = ""
    database = "${schema_name}_test"
    table = "${table_name}_test"
    sink.label-prefix = "test-cdc"
    sink.enable-2pc = "true"
    sink.enable-delete = "true"
    doris.config {
      format = "json"
      read_json_by_line = "true"
    }
  }
}
