SQL
SQL transform plugin
Description​
Use SQL to transform given input row.
SQL transform use memory SQL engine, we can via SQL functions and ability of SQL engine to implement the transform task.
Options​
source_table_name
string
yes
-
result_table_name
string
yes
-
query
string
yes
-
source_table_name [string]​
The source table name, the query SQL table name must match this field.
query [string]​
The query SQL, it's a simple SQL supported base function and criteria filter operation. But the complex SQL unsupported yet, include: multi source table/rows JOIN and AGGREGATE operation and the like.
the query expression can be select [table_name.]column_a
to query the column that named column_a
. and the table name is optional.
or select c_row.c_inner_row.column_b
to query the inline struct column that named column_b
within c_row
column and c_inner_row
column. In this query expression, can't have table name.
Example​
The data read from source is a table like this:
1
Joy Ding
20
2
May Ding
21
3
Kin Dom
24
4
Joy Dom
22
We use SQL query to transform the source data like this:
transform {
Sql {
source_table_name = "fake"
result_table_name = "fake1"
query = "select id, concat(name, '_') as name, age+1 as age from fake where id>0"
}
}
Then the data in result table fake1
will update to
1
Joy Ding_
21
2
May Ding_
22
3
Kin Dom_
25
4
Joy Dom_
23
Struct query​
if your upstream data schema is like this:
source {
FakeSource {
result_table_name = "fake"
row.num = 100
string.template = ["innerQuery"]
schema = {
fields {
name = "string"
c_date = "date"
c_row = {
c_inner_row = {
c_inner_int = "int"
c_inner_string = "string"
c_inner_timestamp = "timestamp"
c_map_1 = "map<string, string>"
c_map_2 = "map<string, map<string,string>>"
}
c_string = "string"
}
}
}
}
}
Those query all are valid:
select
name,
c_date,
c_row,
c_row.c_inner_row,
c_row.c_string,
c_row.c_inner_row.c_inner_int,
c_row.c_inner_row.c_inner_string,
c_row.c_inner_row.c_inner_timestamp,
c_row.c_inner_row.c_map_1,
c_row.c_inner_row.c_map_1.some_key
But this query are not valid:
select
c_row.c_inner_row.c_map_2.some_key.inner_map_key
The map must be the latest struct, can't query the nesting map.
Job Config Example​
env {
job.mode = "BATCH"
}
source {
FakeSource {
result_table_name = "fake"
row.num = 100
schema = {
fields {
id = "int"
name = "string"
age = "int"
}
}
}
}
transform {
Sql {
source_table_name = "fake"
result_table_name = "fake1"
query = "select id, concat(name, '_') as name, age+1 as age from fake where id>0"
}
}
sink {
Console {
source_table_name = "fake1"
}
}
Last updated