InfluxDB
InfluxDB sink connector
Description
Write data to InfluxDB.
Key features
Options
url
string
yes
-
database
string
yes
measurement
string
yes
username
string
no
-
password
string
no
-
key_time
string
no
processing time
key_tags
array
no
exclude field
& key_time
batch_size
int
no
1024
max_retries
int
no
-
retry_backoff_multiplier_ms
int
no
-
connect_timeout_ms
long
no
15000
common-options
config
no
-
url
the url to connect to influxDB e.g.
http://influxdb-host:8086
database [string]
The name of influxDB
database
measurement [string]
The name of influxDB
measurement
username [string]
influxDB
user username
password [string]
influxDB
user password
key_time [string]
Specify field-name of the influxDB
measurement timestamp in NexusRow. If not specified, use processing-time as timestamp
key_tags [array]
Specify field-name of the influxDB
measurement tags in NexusRow. If not specified, include all fields with influxDB
measurement field
batch_size [int]
For batch writing, when the number of buffers reaches the number of batch_size
or the time reaches checkpoint.interval
, the data will be flushed into the influxDB
max_retries [int]
The number of retries to flush failed
retry_backoff_multiplier_ms [int]
Using as a multiplier for generating the next delay for backoff
max_retry_backoff_ms [int]
The amount of time to wait before attempting to retry a request to influxDB
connect_timeout_ms [long]
the timeout for connecting to InfluxDB, in milliseconds
common options
Sink plugin common parameters, please refer to Sink Common Options for details
Examples
sink {
InfluxDB {
url = "http://influxdb-host:8086"
database = "test"
measurement = "sink"
key_time = "time"
key_tags = ["label"]
batch_size = 1
}
}
Multiple table
example1
env {
parallelism = 1
job.mode = "STREAMING"
checkpoint.interval = 5000
}
source {
Mysql-CDC {
base-url = "jdbc:mysql://127.0.0.1:3306/nexus"
username = "root"
password = "******"
table-names = ["nexus.role","nexus.user","galileo.Bucket"]
}
}
transform {
}
sink {
InfluxDB {
url = "http://influxdb-host:8086"
database = "test"
measurement = "${table_name}_test"
}
}
Last updated