Paimon

Paimon sink connector

Description

Sink connector for Apache Paimon. It supports CDC mode and automatic table creation.

Key features

Options

| name                        | type   | required | default value                | description |
|-----------------------------|--------|----------|------------------------------|-------------|
| warehouse                   | String | Yes      | -                            | Paimon warehouse path |
| catalog_type                | String | No       | filesystem                   | Catalog type of Paimon; supports filesystem and hive |
| catalog_uri                 | String | No       | -                            | Catalog URI of Paimon, only needed when catalog_type is hive |
| database                    | String | Yes      | -                            | The database you want to access |
| table                       | String | Yes      | -                            | The table you want to access |
| hdfs_site_path              | String | No       | -                            | The path of hdfs-site.xml |
| schema_save_mode            | Enum   | No       | CREATE_SCHEMA_WHEN_NOT_EXIST | The schema save mode |
| data_save_mode              | Enum   | No       | APPEND_DATA                  | The data save mode |
| paimon.table.primary-keys   | String | No       | -                            | Comma-separated list of columns that form the primary key of created tables (note: partition fields must be included in the primary key fields) |
| paimon.table.partition-keys | String | No       | -                            | Comma-separated list of partition fields to use when creating tables |
| paimon.table.write-props    | Map    | No       | -                            | Properties passed through to Paimon table initialization; see the Paimon table configuration documentation for the available options |
| paimon.hadoop.conf          | Map    | No       | -                            | Properties in Hadoop conf |
| paimon.hadoop.conf-path     | String | No       | -                            | The path from which to load the 'core-site.xml', 'hdfs-site.xml' and 'hive-site.xml' files |
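
The two save-mode options take effect when the job starts: schema_save_mode decides what to do about the table definition, while data_save_mode decides what to do about rows already in the table. A minimal sketch combining both (the warehouse, database, and table names are placeholders; the enum values are the ones used elsewhere on this page):

sink {
  Paimon {
    warehouse = "file:///tmp/paimon"
    database = "default"
    table = "st_test"
    # drop and recreate the table schema on startup
    schema_save_mode = "RECREATE_SCHEMA"
    # keep existing data and append new rows (the default)
    data_save_mode = "APPEND_DATA"
  }
}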

Examples

Single table

env {
  parallelism = 1
  job.mode = "STREAMING"
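  # in streaming mode, the Paimon sink commits data at each checkpoint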
  checkpoint.interval = 5000
}

source {
  Mysql-CDC {
    base-url = "jdbc:mysql://127.0.0.1:3306/nexus"
    username = "root"
    password = "******"
    table-names = ["nexus.role"]
  }
}

transform {
}

sink {
  Paimon {
    catalog_name="nexus_test"
    warehouse="file:///tmp/nexus/paimon/hadoop-sink/"
    database="nexus"
    table="role"
  }
}

Single table (with Hadoop HA and Kerberos configuration)

env {
  parallelism = 1
  job.mode = "STREAMING"
  checkpoint.interval = 5000
}

source {
  Mysql-CDC {
    base-url = "jdbc:mysql://127.0.0.1:3306/nexus"
    username = "root"
    password = "******"
    table-names = ["nexus.role"]
  }
}

transform {
}

sink {
  Paimon {
    catalog_name="nexus_test"
    warehouse="hdfs:///tmp/nexus/paimon/hadoop-sink/"
    database="nexus"
    table="role"
    paimon.hadoop.conf = {
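      # HDFS high-availability settings, mirroring the cluster's hdfs-site.xml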
      fs.defaultFS = "hdfs://nameservice1"
      dfs.nameservices = "nameservice1"
      dfs.ha.namenodes.nameservice1 = "nn1,nn2"
      dfs.namenode.rpc-address.nameservice1.nn1 = "hadoop03:8020"
      dfs.namenode.rpc-address.nameservice1.nn2 = "hadoop04:8020"
      dfs.client.failover.proxy.provider.nameservice1 = "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider"
      dfs.client.use.datanode.hostname = "true"
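      # Kerberos principal and keytab used to authenticate against the secured HDFS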
      security.kerberos.login.principal = "your-kerberos-principal"
      security.kerberos.login.keytab = "your-kerberos-keytab-path"
    }
  }
}

Single table (Hive catalog)

env {
  parallelism = 1
  job.mode = "BATCH"
}

source {
  FakeSource {
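    # emits a fixed set of rows, including update and delete events, to exercise the primary-key table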
    schema = {
      fields {
        pk_id = bigint
        name = string
        score = int
      }
      primaryKey {
        name = "pk_id"
        columnNames = [pk_id]
      }
    }
    rows = [
      {
        kind = INSERT
        fields = [1, "A", 100]
      },
      {
        kind = INSERT
        fields = [2, "B", 100]
      },
      {
        kind = INSERT
        fields = [3, "C", 100]
      },
      {
        kind = INSERT
        fields = [3, "C", 100]
      },
      {
        kind = INSERT
        fields = [3, "C", 100]
      },
      {
        kind = INSERT
        fields = [3, "C", 100]
      },
      {
        kind = UPDATE_BEFORE
        fields = [1, "A", 100]
      },
      {
        kind = UPDATE_AFTER
        fields = [1, "A_1", 100]
      },
      {
        kind = DELETE
        fields = [2, "B", 100]
      }
    ]
  }
}

sink {
  Paimon {
    schema_save_mode = "RECREATE_SCHEMA"
    catalog_name="nexus_test"
    catalog_type="hive"
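    # catalog_uri points to the Hive Metastore thrift endpoint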
    catalog_uri="thrift://hadoop04:9083"
    warehouse="hdfs:///tmp/nexus"
    database="nexus_test"
    table="st_test3"
    paimon.hadoop.conf = {
      fs.defaultFS = "hdfs://nameservice1"
      dfs.nameservices = "nameservice1"
      dfs.ha.namenodes.nameservice1 = "nn1,nn2"
      dfs.namenode.rpc-address.nameservice1.nn1 = "hadoop03:8020"
      dfs.namenode.rpc-address.nameservice1.nn2 = "hadoop04:8020"
      dfs.client.failover.proxy.provider.nameservice1 = "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider"
      dfs.client.use.datanode.hostname = "true"
    }
  }
}

Single table with Paimon write properties

env {
  parallelism = 1
  job.mode = "STREAMING"
  checkpoint.interval = 5000
}

source {
  Mysql-CDC {
    base-url = "jdbc:mysql://127.0.0.1:3306/nexus"
    username = "root"
    password = "******"
    table-names = ["nexus.role"]
  }
}

sink {
  Paimon {
    catalog_name="nexus_test"
    warehouse="file:///tmp/nexus/paimon/hadoop-sink/"
    database="nexus"
    table="role"
    paimon.table.write-props = {
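        # forwarded verbatim to the created Paimon table: a fixed bucket count and parquet data files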
        bucket = 2
        file.format = "parquet"
    }
    paimon.table.partition-keys = "dt"
    paimon.table.primary-keys = "pk_id,dt"
  }
}
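
Note that the partition field dt also appears in paimon.table.primary-keys, which satisfies the requirement above that partition fields be part of the primary key. Any other Paimon table option can be forwarded the same way through paimon.table.write-props.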

Multiple tables

Example 1

env {
  parallelism = 1
  job.mode = "STREAMING"
  checkpoint.interval = 5000
}

source {
  Mysql-CDC {
    base-url = "jdbc:mysql://127.0.0.1:3306/nexus"
    username = "root"
    password = "******"
    table-names = ["nexus.role","nexus.user","galileo.Bucket"]
  }
}

transform {
}

sink {
  Paimon {
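    # ${database_name} and ${table_name} are replaced at runtime with each upstream table's database and table name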
    catalog_name="nexus_test"
    warehouse="file:///tmp/nexus/paimon/hadoop-sink/"
    database="${database_name}_test"
    table="${table_name}_test"
  }
}

Example 2

env {
  parallelism = 1
  job.mode = "BATCH"
}

source {
  Jdbc {
    driver = oracle.jdbc.driver.OracleDriver
    url = "jdbc:oracle:thin:@localhost:1521/XE"
    user = testUser
    password = testPassword

    table_list = [
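      # each entry below is read as a separate table and routed to its own Paimon table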
      {
        table_path = "TESTSCHEMA.TABLE_1"
      },
      {
        table_path = "TESTSCHEMA.TABLE_2"
      }
    ]
  }
}

transform {
}

sink {
  Paimon {
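    # ${schema_name} and ${table_name} are replaced at runtime with each upstream table's schema and table name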
    catalog_name="nexus_test"
    warehouse="file:///tmp/nexus/paimon/hadoop-sink/"
    database="${schema_name}_test"
    table="${table_name}_test"
  }
}
