# Maxwell Format

[Maxwell](https://maxwells-daemon.io/) is a CDC (Change Data Capture) tool that streams changes in real time from MySQL into Kafka, Kinesis, and other streaming connectors. Maxwell provides a unified schema for changelog messages and serializes them as JSON.

Nexus can interpret Maxwell JSON messages as INSERT/UPDATE/DELETE messages in the Nexus system. This is useful in many cases, such as:

- Synchronizing incremental data from databases to other systems
- Auditing logs
- Maintaining real-time materialized views on databases
- Temporal joins against the changing history of a database table

Nexus can also encode its INSERT/UPDATE/DELETE messages as Maxwell JSON messages and emit them to storage such as Kafka. However, Nexus currently cannot combine UPDATE\_BEFORE and UPDATE\_AFTER into a single UPDATE message, so it encodes UPDATE\_BEFORE and UPDATE\_AFTER as DELETE and INSERT Maxwell messages, respectively.
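The encoding rule above can be sketched in a few lines of Python. This is only an illustration of the mapping, not the Nexus serializer: the function name `encode_as_maxwell`, the row-kind strings, and the exact set of output fields are assumptions, and the weight values are illustrative.

```python
import json

# Assumed mapping: both halves of an update are emitted as separate messages.
KIND_TO_MAXWELL_TYPE = {
    "INSERT": "insert",
    "UPDATE_AFTER": "insert",   # new row image is written as an insert
    "DELETE": "delete",
    "UPDATE_BEFORE": "delete",  # old row image is written as a delete
}

def encode_as_maxwell(row_kind, row, database, table):
    """Encode one change row as a Maxwell-style JSON string (hypothetical helper)."""
    message = {
        "database": database,
        "table": table,
        "type": KIND_TO_MAXWELL_TYPE[row_kind],
        "data": row,
    }
    return json.dumps(message)

# One logical update becomes two messages: a delete, then an insert.
before = {"id": 111, "weight": 5.18}
after = {"id": 111, "weight": 5.15}
messages = [
    encode_as_maxwell("UPDATE_BEFORE", before, "test", "product"),
    encode_as_maxwell("UPDATE_AFTER", after, "test", "product"),
]
for m in messages:
    print(m)
```

A downstream consumer of these messages therefore sees no "update" type and has to treat a delete followed by an insert on the same key as a modification.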

## Format Options

| Option | Default | Required | Description |
| --------------------------------- | ------- | -------- | ----------- |
| format | (none) | yes | The format to use. Set this to 'maxwell\_json'. |
| maxwell\_json.ignore-parse-errors | false | no | Skip fields and rows with parse errors instead of failing. Fields are set to null when an error occurs. |
| maxwell\_json.database.include | (none) | no | An optional regular expression that restricts reading to changelog rows of specific databases by matching the "database" meta field in the Maxwell record. The pattern string is compatible with Java's Pattern. |
| maxwell\_json.table.include | (none) | no | An optional regular expression that restricts reading to changelog rows of specific tables by matching the "table" meta field in the Maxwell record. The pattern string is compatible with Java's Pattern. |
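To make the two include options concrete, here is a minimal Python sketch of the filtering they describe. It assumes the pattern must match the whole field value, which is not stated above, and it uses Python's `re` module, whose syntax overlaps with Java's Pattern for simple expressions but is not identical. The function name `should_read` is hypothetical.

```python
import re

def should_read(record, database_include=None, table_include=None):
    """Return True if a Maxwell record passes the optional include filters."""
    # A filter that is not configured (None) accepts every record.
    if database_include is not None:
        if not re.fullmatch(database_include, record.get("database") or ""):
            return False
    if table_include is not None:
        if not re.fullmatch(table_include, record.get("table") or ""):
            return False
    return True

record = {"database": "test", "table": "product", "type": "insert"}
print(should_read(record, database_include="te.*", table_include="product"))
print(should_read(record, table_include="orders"))
```

Under the whole-value assumption, a pattern such as `prod` would not select the `product` table, while `prod.*` would.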

## How to Use the Maxwell Format

### Kafka Usage Example <a href="#kafka-uses-example" id="kafka-uses-example"></a>

Maxwell provides a unified format for changelog messages. Here is a simple example of an insert operation captured from a MySQL product table:

```
{
    "database":"test",
    "table":"product",
    "type":"insert",
    "ts":1596684904,
    "xid":7201,
    "commit":true,
    "data":{
        "id":111,
        "name":"scooter",
        "description":"Big 2-wheel scooter ",
        "weight":5.18
    },
    "primary_key_columns":[
        "id"
    ]
}
```

Note: refer to the Maxwell documentation for the meaning of each field.
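As a rough illustration of how such a message can be interpreted as change rows, the following Python sketch maps the Maxwell `type` field to row kinds. It is not the Nexus deserializer: the function name `interpret`, the row-kind strings, and the choice to skip a whole row on a parse error are assumptions. It relies on Maxwell's convention that an update message carries the previous values of the changed columns in an `old` field.

```python
import json

def interpret(raw, ignore_parse_errors=False):
    """Turn one Maxwell JSON message into a list of (row_kind, row) pairs."""
    try:
        msg = json.loads(raw)
        kind = msg["type"]
        data = msg["data"]
    except (ValueError, KeyError, TypeError):
        if ignore_parse_errors:
            return []  # skip the malformed message instead of failing
        raise
    if kind == "insert":
        return [("INSERT", data)]
    if kind == "delete":
        return [("DELETE", data)]
    if kind == "update":
        # Rebuild the old row image by overlaying the old values on the new row.
        before = {**data, **(msg.get("old") or {})}
        return [("UPDATE_BEFORE", before), ("UPDATE_AFTER", data)]
    return []  # other message types are ignored in this sketch

raw = '{"database":"test","table":"product","type":"insert","data":{"id":111,"weight":5.18}}'
print(interpret(raw))
```

With `ignore_parse_errors=True`, a message that is not valid JSON or lacks the `type` or `data` field yields no rows rather than raising an exception.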

The MySQL product table has 4 columns (id, name, description and weight). The above JSON message is an insert change event on the product table that adds the row with id = 111 and a weight of 5.18. Assuming the messages have been synchronized to the Kafka topic products\_binlog, we can use the following Nexus configuration to consume this topic and interpret the change events.

```
env {
    execution.parallelism = 1
    job.mode = "BATCH"
}

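source {
  Kafka {
    bootstrap.servers = "kafkaCluster:9092"
    topic = "products_binlog"
    result_table_name = "kafka_name"
    # Read the topic from the earliest available offset
    start_mode = earliest
    # Columns carried in the "data" field of each Maxwell record
    schema = {
      fields {
        id = "int"
        name = "string"
        description = "string"
        weight = "string"
      }
    }
    # Interpret each Kafka message as a Maxwell JSON change event
    format = maxwell_json
  }
}

transform {
}

sink {
  Kafka {
    bootstrap.servers = "localhost:9092"
    topic = "consume-binlog"
    # Encode change events as Maxwell JSON messages
    format = maxwell_json
  }
}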
```

