# Apache Iceberg

> Apache Iceberg source connector

## Supported Iceberg Version
1.4.2
## Key Features

- Iceberg catalog
  - hadoop (2.7.1, 2.7.5, 3.1.3)
  - hive (2.3.9, 3.1.2)
## Description

Source connector for Apache Iceberg. It supports both batch and stream mode.
## Supported DataSource Info

| Datasource | Dependent |
|------------|-----------|
| Iceberg    | hive-exec |
| Iceberg    | libfb303  |
## Data Type Mapping

| Iceberg Data Type | SeaTunnel Data Type |
|-------------------|---------------------|
| BOOLEAN           | BOOLEAN             |
| INTEGER           | INT                 |
| LONG              | BIGINT              |
| FLOAT             | FLOAT               |
| DOUBLE            | DOUBLE              |
| DATE              | DATE                |
| TIME              | TIME                |
| TIMESTAMP         | TIMESTAMP           |
| STRING            | STRING              |
| FIXED, BINARY     | BYTES               |
| DECIMAL           | DECIMAL             |
| STRUCT            | ROW                 |
| LIST              | ARRAY               |
| MAP               | MAP                 |
## Source Options

| Name                     | Type    | Required | Default               | Description |
|--------------------------|---------|----------|-----------------------|-------------|
| catalog_name             | string  | yes      | -                     | User-specified catalog name. |
| namespace                | string  | yes      | -                     | The Iceberg database name in the backend catalog. |
| table                    | string  | yes      | -                     | The Iceberg table name in the backend catalog. |
| iceberg.catalog.config   | map     | yes      | -                     | Properties used to initialize the Iceberg catalog; for the available keys, see [CatalogProperties.java](https://github.com/apache/iceberg/blob/main/core/src/main/java/org/apache/iceberg/CatalogProperties.java). |
| hadoop.config            | map     | no       | -                     | Properties passed through to the Hadoop configuration. |
| iceberg.hadoop-conf-path | string  | no       | -                     | The path from which to load the `core-site.xml`, `hdfs-site.xml`, and `hive-site.xml` files. |
| schema                   | config  | no       | -                     | Use projection to select data columns and their order. |
| case_sensitive           | boolean | no       | false                 | If data columns were selected via the `schema` option, controls whether the match against the schema is case sensitive. |
| start_snapshot_timestamp | long    | no       | -                     | Instructs this scan to look for changes starting from the most recent snapshot of the table as of the given timestamp, in milliseconds since the Unix epoch. |
| start_snapshot_id        | long    | no       | -                     | Instructs this scan to look for changes starting from a particular snapshot (exclusive). |
| end_snapshot_id          | long    | no       | -                     | Instructs this scan to look for changes up to a particular snapshot (inclusive). |
| use_snapshot_id          | long    | no       | -                     | Instructs this scan to use the given snapshot ID. |
| use_snapshot_timestamp   | long    | no       | -                     | Instructs this scan to use the most recent snapshot as of the given timestamp, in milliseconds since the Unix epoch. |
| stream_scan_strategy     | enum    | no       | FROM_LATEST_SNAPSHOT  | Starting strategy for stream mode execution; defaults to FROM_LATEST_SNAPSHOT if no value is specified. The optional values are listed below. |
| common-options           |         | no       | -                     | Source plugin common parameters; please refer to Source Common Options for details. |

Optional values for `stream_scan_strategy`:

- `TABLE_SCAN_THEN_INCREMENTAL`: Do a regular table scan, then switch to incremental mode.
- `FROM_LATEST_SNAPSHOT`: Start incremental mode from the latest snapshot, inclusive.
- `FROM_EARLIEST_SNAPSHOT`: Start incremental mode from the earliest snapshot, inclusive.
- `FROM_SNAPSHOT_ID`: Start incremental mode from the snapshot with a specific ID, inclusive.
- `FROM_SNAPSHOT_TIMESTAMP`: Start incremental mode from the snapshot with a specific timestamp, inclusive.
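For stream mode, the options above can be combined as in the following minimal sketch. The catalog, namespace, and table values are placeholders reused from the batch examples below, not a prescribed setup:

```hocon
env {
  parallelism = 2
  job.mode = "STREAMING"
}

source {
  Iceberg {
    catalog_name = "nexus"
    iceberg.catalog.config = {
      type = "hadoop"
      warehouse = "file:///tmp/nexus/iceberg/hadoop/"
    }
    namespace = "database1"
    table = "source"
    # Scan the current table contents first, then follow new snapshots incrementally.
    stream_scan_strategy = "TABLE_SCAN_THEN_INCREMENTAL"
    result_table_name = "iceberg"
  }
}

sink {
  Console {
    source_table_name = "iceberg"
  }
}
```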
## Task Example

### Simple

```hocon
env {
  parallelism = 2
  job.mode = "BATCH"
}

source {
  Iceberg {
    schema {
      fields {
        f2 = "boolean"
        f1 = "bigint"
        f3 = "int"
        f4 = "bigint"
        f5 = "float"
        f6 = "double"
        f7 = "date"
        f9 = "timestamp"
        f10 = "timestamp"
        f11 = "string"
        f12 = "bytes"
        f13 = "bytes"
        f14 = "decimal(19,9)"
        f15 = "array<int>"
        f16 = "map<string, int>"
      }
    }
    catalog_name = "nexus"
    iceberg.catalog.config = {
      type = "hadoop"
      warehouse = "file:///tmp/nexus/iceberg/hadoop/"
    }
    namespace = "database1"
    table = "source"
    result_table_name = "iceberg"
  }
}

transform {
}

sink {
  Console {
    source_table_name = "iceberg"
  }
}
```
### Hadoop S3 Catalog

```hocon
source {
  Iceberg {
    catalog_name = "nexus"
    iceberg.catalog.config = {
      "type" = "hadoop"
      "warehouse" = "s3a://your_bucket/spark/warehouse/"
    }
    hadoop.config = {
      "fs.s3a.aws.credentials.provider" = "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider"
      "fs.s3a.endpoint" = "s3.cn-north-1.amazonaws.com.cn"
      "fs.s3a.access.key" = "xxxxxxxxxxxxxxxxx"
      "fs.s3a.secret.key" = "xxxxxxxxxxxxxxxxx"
      "fs.defaultFS" = "s3a://your_bucket"
    }
    namespace = "your_iceberg_database"
    table = "your_iceberg_table"
    result_table_name = "iceberg_test"
  }
}
```
### Hive Catalog

```hocon
source {
  Iceberg {
    catalog_name = "nexus"
    iceberg.catalog.config = {
      type = "hive"
      uri = "thrift://localhost:9083"
      warehouse = "hdfs://your_cluster/tmp/nexus/iceberg/"
    }
    catalog_type = "hive"
    namespace = "your_iceberg_database"
    table = "your_iceberg_table"
  }
}
```
### Column Projection

```hocon
source {
  Iceberg {
    catalog_name = "nexus"
    iceberg.catalog.config = {
      type = "hadoop"
      warehouse = "hdfs://your_cluster/tmp/nexus/iceberg/"
    }
    namespace = "your_iceberg_database"
    table = "your_iceberg_table"
    schema {
      fields {
        f2 = "boolean"
        f1 = "bigint"
        f3 = "int"
        f4 = "bigint"
      }
    }
  }
}
```
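To read a bounded range of changes in batch mode, the incremental snapshot options can be combined as in this sketch. The snapshot IDs (100 and 200) and all names are placeholders for illustration only:

```hocon
source {
  Iceberg {
    catalog_name = "nexus"
    iceberg.catalog.config = {
      type = "hadoop"
      warehouse = "hdfs://your_cluster/tmp/nexus/iceberg/"
    }
    namespace = "your_iceberg_database"
    table = "your_iceberg_table"
    # Read changes after snapshot 100 (exclusive) up to snapshot 200 (inclusive).
    start_snapshot_id = 100
    end_snapshot_id = 200
  }
}
```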