[Feature][S3-Sink] Improve maxwell_json,canal_json,debezium_json format add ts_ms and [source] #9675

@wubx

Search before asking

  • I had searched in the feature and found no similar feature requirement.

Description

I tested the main use case with the following configuration:


source {
  MySQL-CDC {
   base-url="jdbc:mysql://192.168.1.100:3306/wubx"
   username="wubx"
   password="wubxwubx"
   table-names=["wubx.t01"]
   startup.mode="initial"
  }
}

sink {
  S3File {
    bucket = "s3a://mystage"
    tmp_path = "/tmp/SeaTunnel/${table_name}"
    path="/mysql/${table_name}"
    fs.s3a.endpoint="http://192.168.1.100:9900"
    fs.s3a.aws.credentials.provider="org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider"
    access_key = "minioadmin"
    secret_key = "minioadmin"
    file_format_type="maxwell_json"
    schema_save_mode = "CREATE_SCHEMA_WHEN_NOT_EXIST"
    data_save_mode="APPEND_DATA"
  }
}

./bin/seatunnel.sh --config ./config/v2.mysql.streaming.s3.conf -m local

SQL

update t01 set c1='debezium_json' where id=7;

debezium format

{"before":{"id":7,"c1":"test"},"after":null,"op":"d"}
{"before":null,"after":{"id":7,"c1":"debezium_json"},"op":"c"}

maxwell format
update t01 set c1='debezium_json10' where id=7;

{"data":{"id":7,"c1":"debezium_json"},"type":"DELETE"}
{"data":{"id":7,"c1":"debezium_json10"},"type":"INSERT"}

In practice, these formats need a time field (ts_ms) that records when each change record was generated. With it, a consumer can easily determine the latest state of a row.
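As a rough illustration of why the timestamp matters, the following Python sketch shows how a downstream consumer could pick the newest record per row once each record carries ts_ms. The ts_ms values and the helper name latest_per_key are hypothetical; the output shown above currently contains no such field.

```python
import json

def latest_per_key(lines, key="id"):
    """Keep only the newest change record per primary key, ordered by ts_ms."""
    latest = {}
    for line in lines:
        rec = json.loads(line)
        # Deletes carry the row in "before"; inserts carry it in "after".
        row = rec["after"] if rec["after"] is not None else rec["before"]
        k = row[key]
        if k not in latest or rec["ts_ms"] >= latest[k]["ts_ms"]:
            latest[k] = rec
    return latest

# The two debezium records from above, with an ASSUMED ts_ms field added.
lines = [
    '{"before":{"id":7,"c1":"test"},"after":null,"op":"d","ts_ms":1000}',
    '{"before":null,"after":{"id":7,"c1":"debezium_json"},"op":"c","ts_ms":1001}',
]
result = latest_per_key(lines)
print(result[7]["op"], result[7]["after"])
```

Without ts_ms, the consumer would have to rely on file and line order to decide which of the two records is the final state.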

The source section of the full Debezium JSON is also very useful. If the files contain source, they can be used for batch Change Data Capture (CDC) across multiple tables.

{
    "before": {
        "id": 111,
        "name": "scooter",
        "description": "Big 2-wheel scooter ",
        "weight": 5.18
    },
    "after": {
        "id": 111,
        "name": "scooter",
        "description": "Big 2-wheel scooter ",
        "weight": 5.17
    },
    "source": {
        "version": "1.1.1.Final",
        "connector": "mysql",
        "name": "dbserver1",
        "ts_ms": 1589362330000,
        "snapshot": "false",
        "db": "inventory",
        "table": "products",
        "server_id": 223344,
        "gtid": null,
        "file": "mysql-bin.000003",
        "pos": 2090,
        "row": 0,
        "thread": 2,
        "query": null
    },
    "op": "u",
    "ts_ms": 1589362330904,
    "transaction": null
}
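To sketch how the source section could help with multi-table CDC, the Python snippet below groups records by the db and table fields of source. The helper name group_by_table and the second record (table "orders") are hypothetical and only serve the illustration.

```python
import json
from collections import defaultdict

def group_by_table(lines):
    """Group Debezium-style records by (db, table) read from the "source" section."""
    groups = defaultdict(list)
    for line in lines:
        rec = json.loads(line)
        src = rec["source"]
        groups[(src["db"], src["table"])].append(rec)
    return dict(groups)

# Trimmed-down records; the "orders" row is an assumed example.
lines = [
    '{"after":{"id":111},"source":{"db":"inventory","table":"products"},"op":"u","ts_ms":1589362330904}',
    '{"after":{"id":5},"source":{"db":"inventory","table":"orders"},"op":"c","ts_ms":1589362330905}',
]
groups = group_by_table(lines)
print(sorted(groups))
```

Each group can then be merged into its own target table, even when several tables were written to the same location.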

Databend / Snowflake can consume it like this:

merge into products a
using (
	select
		ifnull(after:id::int, before:id::int) id,
		after:name::varchar name,
		after:description::varchar description,
		after:weight::decimal(10, 2) weight,
		ts_ms,
		op
	from
		products_raw_stream qualify row_number() over(partition by id order by ts_ms desc) = 1) b
on a.id = b.id
when not matched then
insert (id, name, description, weight) values (b.id, b.name, b.description, b.weight)
when matched and b.op = 'u' then
update set
a.name = b.name, a.description = b.description, a.weight = b.weight
when matched and b.op = 'd' then
delete;

maxwell format

  mysql> update `test`.`maxwell` set mycol = 55, daemon = 'Stanislaw Lem';
  maxwell -> kafka: 
  {
    "database": "test",
    "table": "maxwell",
    "type": "update",
    "ts": 1449786310,
    "data": { "id":1, "daemon": "Stanislaw Lem", "mycol": 55 },
    "old": { "mycol": 23, "daemon": "what once was" }
  }

It is recommended to add at least the "ts" field. If the other information is already available, it is recommended to keep it as well.
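One detail to keep in mind when consuming both formats: in the examples above, Maxwell's "ts" is in seconds while Debezium's "ts_ms" is in milliseconds. A minimal Python sketch of normalizing the two, assuming a hypothetical helper named event_time_ms:

```python
def event_time_ms(record):
    """Return the event time in milliseconds for a Debezium- or Maxwell-style record."""
    if "ts_ms" in record:      # Debezium style: already milliseconds
        return record["ts_ms"]
    if "ts" in record:         # Maxwell style: seconds since the epoch
        return record["ts"] * 1000
    raise KeyError("record has neither 'ts_ms' nor 'ts'")

maxwell = {"database": "test", "table": "maxwell", "type": "update", "ts": 1449786310}
debezium = {"op": "u", "ts_ms": 1589362330904}
print(event_time_ms(maxwell))   # 1449786310000
print(event_time_ms(debezium))  # 1589362330904
```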

Usage Scenario

No response

Related issues

No response

Are you willing to submit a PR?

  • Yes I am willing to submit a PR!
