Kudu

Kudu source connector

Support Kudu Version​

  • 1.11.1/1.12.0/1.13.0/1.14.0/1.15.0

Key features​

Description​

Used to read data from Kudu.

The tested kudu version is 1.11.1.

Data Type Mapping​

kudu Data Type
Nexus Data Type

BOOL

BOOLEAN

INT8 INT16 INT32

INT

INT64

BIGINT

DECIMAL

DECIMAL

FLOAT

FLOAT

DOUBLE

DOUBLE

STRING

STRING

UNIXTIME_MICROS

TIMESTAMP

BINARY

BYTES

Source Options​

Name
Type
Required
Default
Description

kudu_masters

String

Yes

-

Kudu master address. Separated by ',',such as '192.168.88.110:7051'.

table_name

String

Yes

-

The name of kudu table.

client_worker_count

Int

No

2 * Runtime.getRuntime().availableProcessors()

Kudu worker count. Default value is twice the current number of cpu cores.

client_default_operation_timeout_ms

Long

No

30000

Kudu normal operation time out.

client_default_admin_operation_timeout_ms

Long

No

30000

Kudu admin operation time out.

enable_kerberos

Bool

No

false

Kerberos principal enable.

kerberos_principal

String

No

-

Kerberos principal. Note that all zeta nodes require have this file.

kerberos_keytab

String

No

-

Kerberos keytab. Note that all zeta nodes require have this file.

kerberos_krb5conf

String

No

-

Kerberos krb5 conf. Note that all zeta nodes require have this file.

scan_token_query_timeout

Long

No

30000

The timeout for connecting scan token. If not set, it will be the same as operationTimeout.

scan_token_batch_size_bytes

Int

No

1024 * 1024

Kudu scan bytes. The maximum number of bytes read at a time, the default is 1MB.

filter

Int

No

1024 * 1024

Kudu scan filter expressions,Not supported yet.

schema

Map

No

1024 * 1024

Nexus Schema.

table_list

Array

No

-

The list of tables to be read. you can use this configuration instead of table_path example: table_list = [{ table_name = "kudu_source_table_1"},{ table_name = "kudu_source_table_2"}]

common-options

No

-

Source plugin common parameters, please refer to Source Common Options for details.

Task Example​

Simple:​

The following example is for a Kudu table named "kudu_source_table", The goal is to print the data from this table on the console and write kudu table "kudu_sink_table"

# Defining the runtime environment
env {
  parallelism = 2
  job.mode = "BATCH"
}

source {
  # This is a example source plugin **only for test and demonstrate the feature source plugin**
  kudu {
    kudu_masters = "kudu-master:7051"
    table_name = "kudu_source_table"
    result_table_name = "kudu"
    enable_kerberos = true
    kerberos_principal = "[email protected]"
    kerberos_keytab = "xx.keytab"
  }
}

transform {
}

sink {
  console {
    source_table_name = "kudu"
  }

  kudu {
    source_table_name = "kudu"
    kudu_masters = "kudu-master:7051"
    table_name = "kudu_sink_table"
    enable_kerberos = true
    kerberos_principal = "[email protected]"
    kerberos_keytab = "xx.keytab"
  }
}

Multiple Table​

env {
  # You can set engine configuration here
  parallelism = 1
  job.mode = "STREAMING"
  checkpoint.interval = 5000
}

source {
  # This is a example source plugin **only for test and demonstrate the feature source plugin**
  kudu{
   kudu_masters = "kudu-master:7051"
   table_list = [
   {
    table_name = "kudu_source_table_1"
   },{
    table_name = "kudu_source_table_2"
   }
   ]
   result_table_name = "kudu"
}
}

transform {
}

sink {
  Assert {
    rules {
      table-names = ["kudu_source_table_1", "kudu_source_table_2"]
    }
  }
}

Last updated