Apache Iceberg

Apache Iceberg source connector

Supported Iceberg Version

  • 1.4.2

Key features

  • batch
  • stream
  • column projection

Description

Source connector for Apache Iceberg. It supports both batch and stream modes.

Supported DataSource Info

| Datasource | Dependent |
|------------|-----------|
| Iceberg    | hive-exec |
| Iceberg    | libfb303  |

Data Type Mapping

| Iceberg Data Type | Nexus Data Type |
|-------------------|-----------------|
| BOOLEAN           | BOOLEAN         |
| INTEGER           | INT             |
| LONG              | BIGINT          |
| FLOAT             | FLOAT           |
| DOUBLE            | DOUBLE          |
| DATE              | DATE            |
| TIME              | TIME            |
| TIMESTAMP         | TIMESTAMP       |
| STRING            | STRING          |
| FIXED BINARY      | BYTES           |
| DECIMAL           | DECIMAL         |
| STRUCT            | ROW             |
| LIST              | ARRAY           |
| MAP               | MAP             |
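
When declaring a projection via the schema option, the Nexus-side types from this table are what you write for each Iceberg column. A minimal sketch, assuming a table whose columns use the LONG, DECIMAL, LIST and MAP types (the column names below are hypothetical):

schema {
  fields {
    id     = "bigint"             # Iceberg LONG
    amount = "decimal(19,9)"      # Iceberg DECIMAL(19,9)
    tags   = "array<int>"         # Iceberg LIST<INTEGER>
    attrs  = "map<string, int>"   # Iceberg MAP<STRING, INTEGER>
  }
}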

Source Options

| Name                     | Type    | Required | Default              | Description |
|--------------------------|---------|----------|----------------------|-------------|
| catalog_name             | string  | yes      | -                    | User-specified catalog name. |
| namespace                | string  | yes      | -                    | The Iceberg database name in the backend catalog. |
| table                    | string  | yes      | -                    | The Iceberg table name in the backend catalog. |
| iceberg.catalog.config   | map     | yes      | -                    | Properties used to initialize the Iceberg catalog; the available keys are listed in "https://github.com/apache/iceberg/blob/main/core/src/main/java/org/apache/iceberg/CatalogProperties.java" |
| hadoop.config            | map     | no       | -                    | Properties passed through to the Hadoop configuration. |
| iceberg.hadoop-conf-path | string  | no       | -                    | The path from which to load the 'core-site.xml', 'hdfs-site.xml' and 'hive-site.xml' files. |
| schema                   | config  | no       | -                    | Use projection to select the data columns and their order. |
| case_sensitive           | boolean | no       | false                | If data columns were selected via the schema config, controls whether the match against the schema is case sensitive. |
| start_snapshot_timestamp | long    | no       | -                    | Instructs this scan to look for changes starting from the most recent snapshot for the table as of the given timestamp, in milliseconds since the Unix epoch. |
| start_snapshot_id        | long    | no       | -                    | Instructs this scan to look for changes starting from a particular snapshot (exclusive); see the snapshot-range sketch after this table. |
| end_snapshot_id          | long    | no       | -                    | Instructs this scan to look for changes up to a particular snapshot (inclusive). |
| use_snapshot_id          | long    | no       | -                    | Instructs this scan to use the given snapshot ID. |
| use_snapshot_timestamp   | long    | no       | -                    | Instructs this scan to use the most recent snapshot as of the given timestamp, in milliseconds since the Unix epoch. |
| stream_scan_strategy     | enum    | no       | FROM_LATEST_SNAPSHOT | Starting strategy for stream mode execution; defaults to FROM_LATEST_SNAPSHOT when no value is specified (a streaming sketch appears under Task Example below). The optional values are: TABLE_SCAN_THEN_INCREMENTAL: do a regular table scan, then switch to incremental mode; FROM_LATEST_SNAPSHOT: start incremental mode from the latest snapshot, inclusive; FROM_EARLIEST_SNAPSHOT: start incremental mode from the earliest snapshot, inclusive; FROM_SNAPSHOT_ID: start incremental mode from the snapshot with a specific id, inclusive; FROM_SNAPSHOT_TIMESTAMP: start incremental mode from the snapshot with a specific timestamp, inclusive. |
| common-options           |         | no       | -                    | Source plugin common parameters; please refer to Source Common Options for details. |
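
For example, the snapshot options above can be combined to read only the changes between two known snapshots in a single batch run. A minimal sketch, assuming a Hadoop catalog; the warehouse path and snapshot ids are placeholders:

source {
  Iceberg {
    catalog_name = "nexus"
    iceberg.catalog.config={
      type = "hadoop"
      warehouse = "file:///tmp/nexus/iceberg/hadoop/"
    }
    namespace = "database1"
    table = "source"
    # Read only the rows committed after snapshot 100 (exclusive)
    # up to and including snapshot 200 (inclusive).
    start_snapshot_id = 100
    end_snapshot_id = 200
  }
}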

Task Example

Simple:

env {
  parallelism = 2
  job.mode = "BATCH"
}

source {
  Iceberg {
    schema {
      fields {
        f2 = "boolean"
        f1 = "bigint"
        f3 = "int"
        f4 = "bigint"
        f5 = "float"
        f6 = "double"
        f7 = "date"
        f9 = "timestamp"
        f10 = "timestamp"
        f11 = "string"
        f12 = "bytes"
        f13 = "bytes"
        f14 = "decimal(19,9)"
        f15 = "array<int>"
        f16 = "map<string, int>"
      }
    }
    catalog_name = "nexus"
    iceberg.catalog.config={
      type = "hadoop"
      warehouse = "file:///tmp/nexus/iceberg/hadoop/"
    }
    namespace = "database1"
    table = "source"
    result_table_name = "iceberg"
  }
}

transform {
}

sink {
  Console {
    source_table_name = "iceberg"
  }
}

Hadoop S3 Catalog:

source {
  Iceberg {
    catalog_name = "nexus"
    iceberg.catalog.config={
      "type"="hadoop"
      "warehouse"="s3a://your_bucket/spark/warehouse/"
    }
    hadoop.config={
      "fs.s3a.aws.credentials.provider" = "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider"
      "fs.s3a.endpoint" = "s3.cn-north-1.amazonaws.com.cn"
      "fs.s3a.access.key" = "xxxxxxxxxxxxxxxxx"
      "fs.s3a.secret.key" = "xxxxxxxxxxxxxxxxx"
      "fs.defaultFS" = "s3a://your_bucket"
    }
    namespace = "your_iceberg_database"
    table = "your_iceberg_table"
    result_table_name = "iceberg_test"
  }
}

Hive Catalog:

source {
  Iceberg {
    catalog_name = "nexus"
    iceberg.catalog.config={
      type = "hive"
      uri = "thrift://localhost:9083"
      warehouse = "hdfs://your_cluster/tmp/nexus/iceberg/"
    }
    namespace = "your_iceberg_database"
    table = "your_iceberg_table"
  }
}

Column Projection:

source {
  Iceberg {
    catalog_name = "nexus"
    iceberg.catalog.config={
      type = "hadoop"
      warehouse = "hdfs://your_cluster/tmp/nexus/iceberg/"
    }
    namespace = "your_iceberg_database"
    table = "your_iceberg_table"

    schema {
      fields {
        f2 = "boolean"
        f1 = "bigint"
        f3 = "int"
        f4 = "bigint"
      }
    }
  }
}
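
Streaming Incremental Read:

A sketch of a streaming job that starts incremental reading from a specific snapshot, using the stream_scan_strategy option described above. The catalog, paths, database/table names and the snapshot id are placeholders:

env {
  parallelism = 2
  job.mode = "STREAMING"
}

source {
  Iceberg {
    catalog_name = "nexus"
    iceberg.catalog.config={
      type = "hadoop"
      warehouse = "hdfs://your_cluster/tmp/nexus/iceberg/"
    }
    namespace = "your_iceberg_database"
    table = "your_iceberg_table"

    # Start incremental mode from this snapshot (inclusive),
    # then keep picking up newly committed snapshots.
    stream_scan_strategy = "FROM_SNAPSHOT_ID"
    start_snapshot_id = 123456789
  }
}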
