# RocketMQ

> RocketMQ source connector

### Supported Apache RocketMQ Version <a href="#support-apache-rocketmq-version" id="support-apache-rocketmq-version"></a>

* 4.9.0 (or a newer version; listed for reference)

### Key Features <a href="#key-features" id="key-features"></a>

* [x] batch
* [x] stream
* [x] exactly-once
* [ ] column projection
* [x] parallelism
* [ ] support user-defined split

### Description <a href="#description" id="description"></a>

Source connector for Apache RocketMQ.

### Source Options <a href="#source-options" id="source-options"></a>

| Name                                | Type    | Required | Default                       | Description                                                                                                                                                                                                                           |
| ----------------------------------- | ------- | -------- | ----------------------------- | ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| topics                              | String  | yes      | -                             | RocketMQ topic name. Separate multiple topics with `,`, for example: `"tpc1,tpc2"`. |
| name.srv.addr                       | String  | yes      | -                             | `RocketMQ` name server cluster address.                                                                                                                                                                                               |
| acl.enabled                         | Boolean | no       | false                         | If true, access control is enabled, and access key and secret key need to be configured.                                                                                                                                              |
| access.key                          | String  | no       |                               | Access key. Must not be empty when `acl.enabled` is true. |
| secret.key                          | String  | no       |                               | Secret key. Must not be empty when `acl.enabled` is true. |
| batch.size                          | int     | no       | 100                           | `RocketMQ` consumer pull batch size                                                                                                                                                                                                   |
| consumer.group                      | String  | no       | Nexus-Consumer-Group          | `RocketMQ consumer group id`, used to distinguish different consumer groups.                                                                                                                                                          |
| commit.on.checkpoint                | Boolean | no       | true                          | If true, the consumer's offset is periodically committed in the background. |
| schema                              |         | no       | -                             | The structure of the data, including field names and field types.                                                                                                                                                                     |
| format                              | String  | no       | json                          | Data format. The default is `json`; `text` is also supported. For `text`, the default field separator is `,`; set `field.delimiter` to use a different one. |
| field.delimiter                     | String  | no       | ,                             | Custom field delimiter for the data format. |
| start.mode                          | String  | no       | CONSUME\_FROM\_GROUP\_OFFSETS | The initial consumption mode of the consumer. One of `CONSUME_FROM_LAST_OFFSET`, `CONSUME_FROM_FIRST_OFFSET`, `CONSUME_FROM_GROUP_OFFSETS`, `CONSUME_FROM_TIMESTAMP`, `CONSUME_FROM_SPECIFIC_OFFSETS`. |
| start.mode.offsets                  |         | no       |                               | The offsets to start from. Required when `start.mode` is `CONSUME_FROM_SPECIFIC_OFFSETS`; see [start.mode.offsets](#startmodeoffsets). |
| start.mode.timestamp                | Long    | no       |                               | The timestamp to start from. Required when `start.mode` is `CONSUME_FROM_TIMESTAMP`. |
| partition.discovery.interval.millis | Long    | no       | -1                            | The interval, in milliseconds, for dynamically discovering topics and partitions. |
| common-options                      | config  | no       | -                             | Source plugin common parameters, please refer to [Source Common Options](/data-integration-with-nexus/nexus-elements/connectors/source/source-common-options.md) for details.                                                         |
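
The ACL options above work together: when `acl.enabled` is true, both `access.key` and `secret.key` must be set. The fragment below is a minimal sketch of such a source block. The address, topic, credentials, and schema fields are placeholder values chosen for illustration, not values from a real deployment.

```
source {
  Rocketmq {
    name.srv.addr = "localhost:9876"
    topics = "test_topic"
    # Enable access control; both keys below must then be non-empty
    acl.enabled = true
    # Placeholder credentials (hypothetical values, replace with your own)
    access.key = "your-access-key"
    secret.key = "your-secret-key"
    format = "json"
    schema = {
      fields {
        id = bigint
        c_string = string
      }
    }
  }
}
```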

#### start.mode.offsets <a href="#startmodeoffsets" id="startmodeoffsets"></a>

The offsets to start from when `start.mode` is `CONSUME_FROM_SPECIFIC_OFFSETS`.

For example:

```
start.mode.offsets = {
  topic1-0 = 70
  topic1-1 = 10
  topic1-2 = 10
}
```
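
To show where this option sits in a job, here is a sketch of a source block that combines it with the matching `start.mode`. It assumes the offset keys follow a `<topic>-<queue id>` pattern, as the example above suggests; the address, consumer group, and schema fields are illustrative placeholders.

```
source {
  Rocketmq {
    name.srv.addr = "localhost:9876"
    topics = "topic1"
    consumer.group = "test_topic_group"
    # start.mode.offsets is only meaningful together with this mode
    start.mode = "CONSUME_FROM_SPECIFIC_OFFSETS"
    # Assumed key pattern: <topic>-<queue id> = <offset to start from>
    start.mode.offsets = {
      topic1-0 = 70
      topic1-1 = 10
      topic1-2 = 10
    }
    format = "json"
    schema = {
      fields {
        id = bigint
        c_string = string
      }
    }
  }
}
```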

### Task Example <a href="#task-example" id="task-example"></a>

#### Simple: <a href="#simple" id="simple"></a>

> The consumer reads RocketMQ data and prints it to the console.

```
env {
  parallelism = 1
  job.mode = "BATCH"
}

source {
  Rocketmq {
    name.srv.addr = "rocketmq-e2e:9876"
    topics = "test_topic_json"
    result_table_name = "rocketmq_table"
    schema = {
      fields {
        id = bigint
        c_map = "map<string, smallint>"
        c_array = "array<tinyint>"
        c_string = string
        c_boolean = boolean
        c_tinyint = tinyint
        c_smallint = smallint
        c_int = int
        c_bigint = bigint
        c_float = float
        c_double = double
        c_decimal = "decimal(2, 1)"
        c_bytes = bytes
        c_date = date
        c_timestamp = timestamp
      }
    }
  }
}

transform {
  # If you would like to get more information about how to configure Nexus and see full list of transform plugins,
  # please go to transform page
}

sink {
  Console {
  }
}
```

#### Specified format consumption Simple: <a href="#specified-format-consumption-simple" id="specified-format-consumption-simple"></a>

> Consume the topic data in JSON format, pulling 400 messages per batch and starting from the first offset.

```
env {
  parallelism = 1
  job.mode = "BATCH"
}

source {
  Rocketmq {
    name.srv.addr = "localhost:9876"
    topics = "test_topic"
    result_table_name = "rocketmq_table"
    start.mode = "CONSUME_FROM_FIRST_OFFSET"
    batch.size = "400"
    consumer.group = "test_topic_group"
    format = "json"
    schema = {
      fields {
        c_map = "map<string, string>"
        c_array = "array<int>"
        c_string = string
        c_boolean = boolean
        c_tinyint = tinyint
        c_smallint = smallint
        c_int = int
        c_bigint = bigint
        c_float = float
        c_double = double
        c_decimal = "decimal(30, 8)"
        c_bytes = bytes
        c_date = date
        c_timestamp = timestamp
      }
    }
  }
}

transform {
  # If you would like to get more information about how to configure Nexus and see full list of transform plugins,
  # please go to transform page
}
sink {
  Console {
  }
}
```
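
The same job can be pointed at delimited text instead of JSON by switching `format` and, if needed, `field.delimiter`. The fragment below is a sketch only: the `|` delimiter, the topic name, and the two schema fields are hypothetical, and it assumes that text fields map to the schema fields in declaration order, which this page does not state.

```
source {
  Rocketmq {
    name.srv.addr = "localhost:9876"
    topics = "test_topic_text"
    start.mode = "CONSUME_FROM_FIRST_OFFSET"
    consumer.group = "test_topic_group"
    # Parse each message as delimited text rather than JSON
    format = "text"
    # Overrides the default "," separator (hypothetical choice)
    field.delimiter = "|"
    schema = {
      fields {
        # Assumed to be matched to message fields by position
        id = bigint
        c_string = string
      }
    }
  }
}
```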

#### Specified timestamp Simple: <a href="#specified-timestamp-simple" id="specified-timestamp-simple"></a>

> Start consuming from a specified timestamp and check for new partitions every 1000 milliseconds.

```
env {
  parallelism = 1
  spark.app.name = "Nexus"
  spark.executor.instances = 2
  spark.executor.cores = 1
  spark.executor.memory = "1g"
  spark.master = local
  job.mode = "BATCH"
}

source {
  Rocketmq {
    name.srv.addr = "localhost:9876"
    topics = "test_topic"
    partition.discovery.interval.millis = "1000"
    start.mode = "CONSUME_FROM_TIMESTAMP"
    start.mode.timestamp = "1694508382000"
    consumer.group = "test_topic_group"
    format = "json"
    schema = {
      fields {
        c_map = "map<string, string>"
        c_array = "array<int>"
        c_string = string
        c_boolean = boolean
        c_tinyint = tinyint
        c_smallint = smallint
        c_int = int
        c_bigint = bigint
        c_float = float
        c_double = double
        c_decimal = "decimal(30, 8)"
        c_bytes = bytes
        c_date = date
        c_timestamp = timestamp
      }
    }
  }
}

transform {
  # If you would like to get more information about how to configure Nexus and see full list of transform plugins,
  # please go to transform page
}

sink {
  Console {
  }
}
```
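
The examples above all run in batch mode, while the feature list also marks stream as supported. The fragment below sketches a streaming variant. Note that `job.mode = "STREAMING"` and `checkpoint.interval` are assumptions taken from the engine's general job configuration and are not documented on this page; the interval value and the schema fields are illustrative.

```
env {
  parallelism = 1
  # Assumed streaming job mode and checkpoint interval in milliseconds
  job.mode = "STREAMING"
  checkpoint.interval = 10000
}

source {
  Rocketmq {
    name.srv.addr = "localhost:9876"
    topics = "test_topic"
    consumer.group = "test_topic_group"
    # Documented default, shown explicitly here
    commit.on.checkpoint = true
    # Look for new topics and partitions every 1000 milliseconds
    partition.discovery.interval.millis = "1000"
    format = "json"
    schema = {
      fields {
        id = bigint
        c_string = string
      }
    }
  }
}

sink {
  Console {
  }
}
```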


