# Kafka

> Kafka source connector

### Key Features <a href="#key-features" id="key-features"></a>

* [x] batch
* [x] stream
* [x] exactly-once
* [ ] column projection
* [x] parallelism
* [ ] support user-defined split

### Description <a href="#description" id="description"></a>

Source connector for Apache Kafka.

### Source Options <a href="#source-options" id="source-options"></a>

| Name                                | Type                                                                               | Required | Default              | Description                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                             |
| ----------------------------------- | ---------------------------------------------------------------------------------- | -------- | -------------------- | ----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| topic                               | String                                                                             | Yes      | -                    | Name of the topic(s) to read data from. Multiple topics can be given as a comma-separated list, for example 'topic-1,topic-2'. |
| table\_list                         | Map                                                                                | No       | -                    | Topic list configuration. Only one of `table_list` and `topic` can be configured at the same time. |
| bootstrap.servers                   | String                                                                             | Yes      | -                    | Comma separated list of Kafka brokers.                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                  |
| pattern                             | Boolean                                                                            | No       | false                | If `pattern` is set to `true`, the value of `topic` is treated as a regular expression for the topic names to read from. The consumer subscribes to all topics whose names match the specified regular expression. |
| consumer.group                      | String                                                                             | No       | Nexus-Consumer-Group | Kafka consumer group ID, used to distinguish different consumer groups. |
| commit\_on\_checkpoint              | Boolean                                                                            | No       | true                 | If true, the consumer's offset will be periodically committed in the background. |
| kafka.config                        | Map                                                                                | No       | -                    | In addition to the required parameters above, users can specify optional parameters for the Kafka consumer client, covering [all consumer parameters specified in the official Kafka document](https://kafka.apache.org/documentation.html#consumerconfigs). |
| schema                              | Config                                                                             | No       | -                    | The structure of the data, including field names and field types.                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                       |
| format                              | String                                                                             | No       | json                 | Data format. The default format is json. Other supported formats are text, canal\_json, debezium\_json, ogg\_json and avro. If you use the json or text format, the default field separator is ", "; to customize the delimiter, add the `field_delimiter` option. If you use the canal format, refer to [Canal Format](/data-integration-with-nexus/nexus-elements/connectors/formats/canal-format.md) for details. If you use the debezium format, refer to [Debezium Format](/data-integration-with-nexus/nexus-elements/connectors/formats/debezium-format.md) for details. |
| format\_error\_handle\_way          | String                                                                             | No       | fail                 | How data format errors are handled. Valid values are `fail` (the default) and `skip`. With `fail`, a data format error blocks processing and an exception is thrown. With `skip`, the record with the format error is skipped. |
| field\_delimiter                    | String                                                                             | No       | ,                    | Customize the field delimiter for data format.                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                          |
| start\_mode                         | StartMode\[earliest],\[group\_offsets],\[latest],\[specific\_offsets],\[timestamp] | No       | group\_offsets       | The initial consumption pattern of consumers.                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                           |
| start\_mode.offsets                 | Config                                                                             | No       | -                    | The offsets to start consuming from. Needed when `start_mode` is `specific_offsets`. |
| start\_mode.timestamp               | Long                                                                               | No       | -                    | The time to start consuming from. Needed when `start_mode` is `timestamp`. |
| partition-discovery.interval-millis | Long                                                                               | No       | -1                   | The interval for dynamically discovering topics and partitions.                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                         |
| common-options                      |                                                                                    | No       | -                    | Common source plugin parameters. Refer to [Source Common Options](/data-integration-with-nexus/nexus-elements/connectors/source/source-common-options.md) for details. |

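The `start_mode`, `start_mode.offsets` and `start_mode.timestamp` options in the table work together. The following is a minimal sketch of how they might be combined, not a verified configuration. The topic name `info`, the offset values, and the `<topic>-<partition>` key form used inside `start_mode.offsets` are assumptions made for illustration, as is the reading of `start_mode.timestamp` as epoch milliseconds.

```
source {
  Kafka {
    topic = "info"                        # hypothetical topic name
    bootstrap.servers = "localhost:9092"
    start_mode = "specific_offsets"
    # Assumed key form: <topic>-<partition> = <offset to start from>
    start_mode.offsets = {
      info-0 = 70
      info-1 = 10
    }
    # Alternative (assumption: the value is in epoch milliseconds):
    # start_mode = "timestamp"
    # start_mode.timestamp = 1667179890315
  }
}
```

With the default `group_offsets` mode, neither `start_mode.offsets` nor `start_mode.timestamp` needs to be set.
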
### Task Example <a href="#task-example" id="task-example"></a>

#### Simple <a href="#simple" id="simple"></a>

> This example reads data from the Kafka topics topic\_1, topic\_2 and topic\_3 and prints it to the client.

```
# Defining the runtime environment
env {
  parallelism = 2
  job.mode = "BATCH"
}
source {
  Kafka {
    schema = {
      fields {
        name = "string"
        age = "int"
      }
    }
    format = text
    field_delimiter = "#"
    topic = "topic_1,topic_2,topic_3"
    bootstrap.servers = "localhost:9092"
    kafka.config = {
      client.id = client_1
      max.poll.records = 500
      auto.offset.reset = "earliest"
      enable.auto.commit = "false"
    }
  }  
}
sink {
  Console {}
}
```
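
As a variation on the example above, the sketch below reads JSON records and uses the `format_error_handle_way` option from the Source Options table to skip records that cannot be parsed instead of failing the job. The topic name and schema fields are placeholders reused from the Simple example; treat this as an illustrative fragment under those assumptions rather than a tested job.

```
source {
  Kafka {
    topic = "topic_1"
    bootstrap.servers = "localhost:9092"
    # json is the default format; it is stated here only for clarity
    format = json
    # Skip records with format errors; the default, fail, throws an exception
    format_error_handle_way = skip
    schema = {
      fields {
        name = "string"
        age = "int"
      }
    }
  }
}
```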

#### Regex Topic <a href="#regex-topic" id="regex-topic"></a>

```
source {
    Kafka {
          topic = ".*nexus.*"
          pattern = "true"
          bootstrap.servers = "localhost:9092"
          consumer.group = "nexus_group"
    }
}
```
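
When topics are selected by pattern in a long-running job, new matching topics or partitions may appear after the job starts. The sketch below combines `pattern` with the `partition-discovery.interval-millis` option from the Source Options table. The topic pattern `orders-.*` and the 10000 ms interval are hypothetical values, and the reading that the default of `-1` leaves discovery turned off is an assumption, not something this page states.

```
env {
  parallelism = 2
  job.mode = "STREAMING"
}
source {
  Kafka {
    topic = "orders-.*"                   # hypothetical topic name pattern
    pattern = "true"
    bootstrap.servers = "localhost:9092"
    consumer.group = "nexus_group"
    # Hypothetical interval: look for new topics and partitions every 10 seconds
    partition-discovery.interval-millis = 10000
  }
}
sink {
  Console {}
}
```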

#### AWS MSK SASL/SCRAM <a href="#aws-msk-saslscram" id="aws-msk-saslscram"></a>

Replace the `username` and `password` values in `sasl.jaas.config` below with the credentials configured in AWS MSK.

```
source {
    Kafka {
        topic = "nexus"
        bootstrap.servers = "xx.amazonaws.com.cn:9096,xxx.amazonaws.com.cn:9096,xxxx.amazonaws.com.cn:9096"
        consumer.group = "nexus_group"
        kafka.config = {
            security.protocol=SASL_SSL
            sasl.mechanism=SCRAM-SHA-512
            sasl.jaas.config="org.apache.kafka.common.security.scram.ScramLoginModule required username=\"username\" password=\"password\";"
            #security.protocol=SASL_SSL
            #sasl.mechanism=AWS_MSK_IAM
            #sasl.jaas.config="software.amazon.msk.auth.iam.IAMLoginModule required;"
            #sasl.client.callback.handler.class="software.amazon.msk.auth.iam.IAMClientCallbackHandler"
        }
    }
}
```

#### AWS MSK IAM <a href="#aws-msk-iam" id="aws-msk-iam"></a>

Ensure that the IAM policy includes the `kafka-cluster:Connect` action, for example:

```
"Effect": "Allow",
"Action": [
    "kafka-cluster:Connect",
    "kafka-cluster:AlterCluster",
    "kafka-cluster:DescribeCluster"
],
```

Source Config

```
source {
    Kafka {
        topic = "nexus"
        bootstrap.servers = "xx.amazonaws.com.cn:9098,xxx.amazonaws.com.cn:9098,xxxx.amazonaws.com.cn:9098"
        consumer.group = "nexus_group"
        kafka.config = {
            #security.protocol=SASL_SSL
            #sasl.mechanism=SCRAM-SHA-512
            #sasl.jaas.config="org.apache.kafka.common.security.scram.ScramLoginModule required username=\"username\" password=\"password\";"
            security.protocol=SASL_SSL
            sasl.mechanism=AWS_MSK_IAM
            sasl.jaas.config="software.amazon.msk.auth.iam.IAMLoginModule required;"
            sasl.client.callback.handler.class="software.amazon.msk.auth.iam.IAMClientCallbackHandler"
        }
    }
}
```

#### Kerberos Authentication Example <a href="#kerberos-authentication-example" id="kerberos-authentication-example"></a>

Source Config

```
source {
    Kafka {
        topic = "nexus"
        bootstrap.servers = "127.0.0.1:9092"
        consumer.group = "nexus_group"
        kafka.config = {
            security.protocol=SASL_PLAINTEXT
            sasl.kerberos.service.name=kafka
            sasl.mechanism=GSSAPI
            java.security.krb5.conf="/etc/krb5.conf"
            sasl.jaas.config="com.sun.security.auth.module.Krb5LoginModule required \n        useKeyTab=true \n        storeKey=true  \n        keyTab=\"/path/to/xxx.keytab\" \n        principal=\"user@xxx.com\";"
        }
    }
}
```

#### Multiple Kafka Source <a href="#multiple-kafka-source" id="multiple-kafka-source"></a>

> This example parses Kafka topics in different formats and writes the data to the same PostgreSQL table, performing upsert operations based on the `id` field.

```

env {
  execution.parallelism = 1
  job.mode = "BATCH"
}

source {
  Kafka {
    bootstrap.servers = "kafka_e2e:9092"
    table_list = [
      {
        topic = "^test-ogg-sou.*"
        pattern = "true"
        consumer.group = "ogg_multi_group"
        start_mode = earliest
        schema = {
          fields {
            id = "int"
            name = "string"
            description = "string"
            weight = "string"
          }
        },
        format = ogg_json
      },
      {
        topic = "test-cdc_mds"
        start_mode = earliest
        schema = {
          fields {
            id = "int"
            name = "string"
            description = "string"
            weight = "string"
          }
        },
        format = canal_json
      }
    ]
  }
}

sink {
  Jdbc {
    driver = org.postgresql.Driver
    url = "jdbc:postgresql://postgresql:5432/test?loggerLevel=OFF"
    user = test
    password = test
    generate_sink_sql = true
    database = test
    table = public.sink
    primary_keys = ["id"]
  }
}
```

