RocketMQ

RocketMQ source connector

Support Apache RocketMQ Version

  • 4.9.0 (Or a newer version, for reference)

Key Features

Description

Source connector for Apache RocketMQ.

Source Options

Name
Type
Required
Default
Description

topics

String

yes

-

RocketMQ topic name. If there are multiple topics, use , to split, for example: "tpc1,tpc2".

name.srv.addr

String

yes

-

RocketMQ name server cluster address.

acl.enabled

Boolean

no

false

If true, access control is enabled, and access key and secret key need to be configured.

access.key

String

no

secret.key

String

no

When ACL_ENABLED is true, secret key cannot be empty.

batch.size

int

no

100

RocketMQ consumer pull batch size

consumer.group

String

no

Nexus-Consumer-Group

RocketMQ consumer group id, used to distinguish different consumer groups.

commit.on.checkpoint

Boolean

no

true

If true the consumer's offset will be periodically committed in the background.

schema

no

-

The structure of the data, including field names and field types.

format

String

no

json

Data format. The default format is json. Optional text format. The default field separator is ",".If you customize the delimiter, add the "field.delimiter" option.

field.delimiter

String

no

,

Customize the field delimiter for data format

start.mode

String

no

CONSUME_FROM_GROUP_OFFSETS

The initial consumption pattern of consumers,there are several types: [CONSUME_FROM_LAST_OFFSET],[CONSUME_FROM_FIRST_OFFSET],[CONSUME_FROM_GROUP_OFFSETS],[CONSUME_FROM_TIMESTAMP],[CONSUME_FROM_SPECIFIC_OFFSETS]

start.mode.offsets

no

start.mode.timestamp

Long

no

The time required for consumption mode to be "CONSUME_FROM_TIMESTAMP".

partition.discovery.interval.millis

long

no

-1

The interval for dynamically discovering topics and partitions.

common-options

config

no

-

Source plugin common parameters, please refer to Source Common Options for details.

start.mode.offsets

The offset required for consumption mode to be "CONSUME_FROM_SPECIFIC_OFFSETS".

for example:

start.mode.offsets = {
  topic1-0 = 70
  topic1-1 = 10
  topic1-2 = 10
}

Task Example

Simple:

Consumer reads Rocketmq data and prints it to the console type

env {
  parallelism = 1
  job.mode = "BATCH"
}

source {
  Rocketmq {
    name.srv.addr = "rocketmq-e2e:9876"
    topics = "test_topic_json"
    result_table_name = "rocketmq_table"
    schema = {
      fields {
        id = bigint
        c_map = "map<string, smallint>"
        c_array = "array<tinyint>"
        c_string = string
        c_boolean = boolean
        c_tinyint = tinyint
        c_smallint = smallint
        c_int = int
        c_bigint = bigint
        c_float = float
        c_double = double
        c_decimal = "decimal(2, 1)"
        c_bytes = bytes
        c_date = date
        c_timestamp = timestamp
      }
    }
  }
}

transform {
  # If you would like to get more information about how to configure Nexus and see full list of transform plugins,
  # please go to transform page
}

sink {
  Console {
  }
}

Specified format consumption Simple:

When I consume the topic data in json format parsing and pulling the number of bars each time is 400, the consumption starts from the original location

env {
  parallelism = 1
  job.mode = "BATCH"
}

source {
  Rocketmq {
    name.srv.addr = "localhost:9876"
    topics = "test_topic"
    result_table_name = "rocketmq_table"
    start.mode = "CONSUME_FROM_FIRST_OFFSET"
    batch.size = "400"
    consumer.group = "test_topic_group"
    format = "json"
    format = json
    schema = {
      fields {
        c_map = "map<string, string>"
        c_array = "array<int>"
        c_string = string
        c_boolean = boolean
        c_tinyint = tinyint
        c_smallint = smallint
        c_int = int
        c_bigint = bigint
        c_float = float
        c_double = double
        c_decimal = "decimal(30, 8)"
        c_bytes = bytes
        c_date = date
        c_timestamp = timestamp
      }
    }
  }
}

transform {
  # If you would like to get more information about how to configure Nexus and see full list of transform plugins,
  # please go to transform page
}
sink {
  Console {
  }
}

Specified timestamp Simple:

This is to specify a time to consume, and I dynamically sense the existence of a new partition every 1000 milliseconds to pull the consumption

env {
  parallelism = 1
  spark.app.name = "Nexus"
  spark.executor.instances = 2
  spark.executor.cores = 1
  spark.executor.memory = "1g"
  spark.master = local
  job.mode = "BATCH"
}

source {
  Rocketmq {
    name.srv.addr = "localhost:9876"
    topics = "test_topic"
    partition.discovery.interval.millis = "1000"
    start.mode.timestamp="1694508382000"
    consumer.group="test_topic_group"
    format="json"
    format = json
    schema = {
      fields {
        c_map = "map<string, string>"
        c_array = "array<int>"
        c_string = string
        c_boolean = boolean
        c_tinyint = tinyint
        c_smallint = smallint
        c_int = int
        c_bigint = bigint
        c_float = float
        c_double = double
        c_decimal = "decimal(30, 8)"
        c_bytes = bytes
        c_date = date
        c_timestamp = timestamp
      }
    }
  }
}

transform {
  # If you would like to get more information about how to configure Nexus and see full list of transform plugins,
  # please go to transform page
}

sink {
  Console {
  }
}

Last updated