Skip to content

Commit fb8444b

Browse files
authored
[Feature][Format] Improve maxwell_json,canal_json,debezium_json format add ts_ms and table (#9701)
1 parent 7f756b1 commit fb8444b

File tree

19 files changed

+414
-217
lines changed

19 files changed

+414
-217
lines changed

docs/en/transform-v2/metadata.md

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -7,14 +7,14 @@ Metadata transform plugin for adding metadata fields to data
77

88
## Available Metadata
99

10-
| Key | DataType | Description |
11-
|:---------:|:--------:|:---------------------------------------------------------------------------------------------------|
12-
| Database | string | Name of the table that contain the row. |
13-
| Table | string | Name of the table that contain the row. |
14-
| RowKind | string | The type of operation |
15-
| EventTime | Long | The time at which the connector processed the event. |
16-
| Delay | Long | The difference between data extraction time and database change time |
17-
| Partition | string | Contains the partition field of the corresponding number table of the row, multiple using `,` join |
10+
| Key | DataType | Description |
11+
|:---------:|:--------:|:---------------------------------------------------------------------------------------------------------|
12+
| Database | string | Name of the table that contain the row. |
13+
| Table | string | Name of the table that contain the row. |
14+
| RowKind | string | The type of operation |
15+
| EventTime | Long | The time at which the connector processed the event.And the data should be milliseconds |
16+
| Delay | Long | The difference between data extraction time and database change time.And the data should be milliseconds |
17+
| Partition | string | Contains the partition field of the corresponding number table of the row, multiple using `,` join |
1818

1919
### note
2020
`Delay` `EventTime` only worked on cdc series connectors for now , except TiDB-CDC

docs/zh/transform-v2/metadata.md

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -7,14 +7,14 @@
77

88
## 支持的元数据
99

10-
| Key | DataType | Description |
11-
|:---------:|:--------:|:-----------------------:|
12-
| Database | string | 包含该行的数据库名 |
13-
| Table | string | 包含该行的数表名 |
14-
| RowKind | string | 行类型 |
15-
| EventTime | Long | |
16-
| Delay | Long | 数据抽取时间与数据库变更时间的差 |
17-
| Partition | string | 包含该行对应数表的分区字段,多个使用`,`连接 |
10+
| Key | DataType | Description |
11+
|:---------:|:--------:|:-----------------------------:|
12+
| Database | string | 包含该行的数据库名 |
13+
| Table | string | 包含该行的数表名 |
14+
| RowKind | string | 行类型 |
15+
| EventTime | Long | 该行的对应的数据时间,统一格式是到毫秒的时间戳 |
16+
| Delay | Long | 数据抽取时间与数据库变更时间的差,统一格式是到毫秒的时间戳 |
17+
| Partition | string | 包含该行对应数表的分区字段,多个使用`,`连接 |
1818

1919
### 注意事项
2020
`Delay` `EventTime`目前只适用于cdc系列连接器,TiDB-CDC除外

seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/CommonOptions.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,9 +49,13 @@ public enum CommonOptions {
4949
ROW_KIND("RowKind", true),
5050
/**
5151
* The key of {@link SeaTunnelRow#getOptions()} to store the EVENT_TIME value of the row value.
52+
* And the data should be milliseconds.
5253
*/
5354
EVENT_TIME("EventTime", true),
54-
/** The key of {@link SeaTunnelRow#getOptions()} to store the DELAY value of the row value. */
55+
/**
56+
* The key of {@link SeaTunnelRow#getOptions()} to store the DELAY value of the row value. And
57+
* the data should be milliseconds.
58+
*/
5559
DELAY("Delay", true);
5660

5761
private final String name;

seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/SeaTunnelRow.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,7 @@ public SeaTunnelRow copy() {
9595
SeaTunnelRow newRow = new SeaTunnelRow(newFields);
9696
newRow.setRowKind(this.getRowKind());
9797
newRow.setTableId(this.getTableId());
98+
newRow.setOptions(this.getOptions());
9899
return newRow;
99100
}
100101

@@ -106,6 +107,7 @@ public SeaTunnelRow copy(int[] indexMapping) {
106107
SeaTunnelRow newRow = new SeaTunnelRow(newFields);
107108
newRow.setRowKind(this.getRowKind());
108109
newRow.setTableId(this.getTableId());
110+
newRow.setOptions(this.getOptions());
109111
return newRow;
110112
}
111113

seatunnel-connectors-v2/connector-file/connector-file-local/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/local/LocalFileTest.java

Lines changed: 51 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,8 @@
4848
import java.util.List;
4949
import java.util.Map;
5050

51+
import static org.apache.seatunnel.api.table.type.CommonOptions.EVENT_TIME;
52+
5153
@DisabledOnOs(
5254
value = OS.WINDOWS,
5355
disabledReason =
@@ -346,24 +348,33 @@ void testCanalJsonSink() throws IOException {
346348
Collections.emptyList(),
347349
"comment");
348350

351+
Map<String, Object> rowOptions = new HashMap<>();
352+
rowOptions.put(EVENT_TIME.getName(), 1L);
353+
349354
SeaTunnelRow row1 = new SeaTunnelRow(new Object[] {1L, "A", 100});
350355
row1.setRowKind(RowKind.INSERT);
351356
row1.setTableId(TablePath.DEFAULT.getFullName());
357+
row1.setOptions(rowOptions);
352358
SeaTunnelRow row2 = new SeaTunnelRow(new Object[] {2L, "B", 100});
353359
row2.setRowKind(RowKind.INSERT);
354360
row2.setTableId(TablePath.DEFAULT.getFullName());
361+
row2.setOptions(rowOptions);
355362
SeaTunnelRow row3 = new SeaTunnelRow(new Object[] {3L, "C", 100});
356363
row3.setRowKind(RowKind.INSERT);
357364
row3.setTableId(TablePath.DEFAULT.getFullName());
365+
row3.setOptions(rowOptions);
358366
SeaTunnelRow row1UpdateBefore = new SeaTunnelRow(new Object[] {1L, "A", 100});
359367
row1UpdateBefore.setTableId(TablePath.DEFAULT.getFullName());
360368
row1UpdateBefore.setRowKind(RowKind.UPDATE_BEFORE);
369+
row1UpdateBefore.setOptions(rowOptions);
361370
SeaTunnelRow row1UpdateAfter = new SeaTunnelRow(new Object[] {1L, "A_1", 100});
362371
row1UpdateAfter.setTableId(TablePath.DEFAULT.getFullName());
363372
row1UpdateAfter.setRowKind(RowKind.UPDATE_AFTER);
373+
row1UpdateAfter.setOptions(rowOptions);
364374
SeaTunnelRow row2Delete = new SeaTunnelRow(new Object[] {2L, "B", 100});
365375
row2Delete.setTableId(TablePath.DEFAULT.getFullName());
366376
row2Delete.setRowKind(RowKind.DELETE);
377+
row2Delete.setOptions(rowOptions);
367378

368379
SinkFlowTestUtils.runBatchWithCheckpointDisabled(
369380
catalogTable,
@@ -379,22 +390,22 @@ void testCanalJsonSink() throws IOException {
379390
String dataStr = FileUtils.readFileToStr(path);
380391
Assertions.assertTrue(
381392
dataStr.contains(
382-
"{\"data\":[{\"a\":1,\"b\":\"A\",\"c\":100}],\"type\":\"INSERT\"}"));
393+
"{\"data\":[{\"a\":1,\"b\":\"A\",\"c\":100}],\"type\":\"INSERT\",\"database\":\"default\",\"table\":\"default\",\"ts\":1}"));
383394
Assertions.assertTrue(
384395
dataStr.contains(
385-
"{\"data\":[{\"a\":2,\"b\":\"B\",\"c\":100}],\"type\":\"INSERT\"}"));
396+
"{\"data\":[{\"a\":2,\"b\":\"B\",\"c\":100}],\"type\":\"INSERT\",\"database\":\"default\",\"table\":\"default\",\"ts\":1}"));
386397
Assertions.assertTrue(
387398
dataStr.contains(
388-
"{\"data\":[{\"a\":3,\"b\":\"C\",\"c\":100}],\"type\":\"INSERT\"}"));
399+
"{\"data\":[{\"a\":3,\"b\":\"C\",\"c\":100}],\"type\":\"INSERT\",\"database\":\"default\",\"table\":\"default\",\"ts\":1}"));
389400
Assertions.assertTrue(
390401
dataStr.contains(
391-
"{\"data\":[{\"a\":1,\"b\":\"A\",\"c\":100}],\"type\":\"DELETE\"}"));
402+
"{\"data\":[{\"a\":1,\"b\":\"A\",\"c\":100}],\"type\":\"DELETE\",\"database\":\"default\",\"table\":\"default\",\"ts\":1}"));
392403
Assertions.assertTrue(
393404
dataStr.contains(
394-
"{\"data\":[{\"a\":1,\"b\":\"A_1\",\"c\":100}],\"type\":\"INSERT\"}"));
405+
"{\"data\":[{\"a\":1,\"b\":\"A_1\",\"c\":100}],\"type\":\"INSERT\",\"database\":\"default\",\"table\":\"default\",\"ts\":1}"));
395406
Assertions.assertTrue(
396407
dataStr.contains(
397-
"{\"data\":[{\"a\":2,\"b\":\"B\",\"c\":100}],\"type\":\"DELETE\"}"));
408+
"{\"data\":[{\"a\":2,\"b\":\"B\",\"c\":100}],\"type\":\"DELETE\",\"database\":\"default\",\"table\":\"default\",\"ts\":1}"));
398409
}
399410

400411
@Test
@@ -431,24 +442,33 @@ void testDebeziumJsonSink() throws IOException {
431442
Collections.emptyList(),
432443
"comment");
433444

445+
Map<String, Object> rowOptions = new HashMap<>();
446+
rowOptions.put(EVENT_TIME.getName(), 1L);
447+
434448
SeaTunnelRow row1 = new SeaTunnelRow(new Object[] {1L, "A", 100});
435449
row1.setRowKind(RowKind.INSERT);
436450
row1.setTableId(TablePath.DEFAULT.getFullName());
451+
row1.setOptions(rowOptions);
437452
SeaTunnelRow row2 = new SeaTunnelRow(new Object[] {2L, "B", 100});
438453
row2.setRowKind(RowKind.INSERT);
439454
row2.setTableId(TablePath.DEFAULT.getFullName());
455+
row2.setOptions(rowOptions);
440456
SeaTunnelRow row3 = new SeaTunnelRow(new Object[] {3L, "C", 100});
441457
row3.setRowKind(RowKind.INSERT);
442458
row3.setTableId(TablePath.DEFAULT.getFullName());
459+
row3.setOptions(rowOptions);
443460
SeaTunnelRow row1UpdateBefore = new SeaTunnelRow(new Object[] {1L, "A", 100});
444461
row1UpdateBefore.setTableId(TablePath.DEFAULT.getFullName());
445462
row1UpdateBefore.setRowKind(RowKind.UPDATE_BEFORE);
463+
row1UpdateBefore.setOptions(rowOptions);
446464
SeaTunnelRow row1UpdateAfter = new SeaTunnelRow(new Object[] {1L, "A_1", 100});
447465
row1UpdateAfter.setTableId(TablePath.DEFAULT.getFullName());
448466
row1UpdateAfter.setRowKind(RowKind.UPDATE_AFTER);
467+
row1UpdateAfter.setOptions(rowOptions);
449468
SeaTunnelRow row2Delete = new SeaTunnelRow(new Object[] {2L, "B", 100});
450469
row2Delete.setTableId(TablePath.DEFAULT.getFullName());
451470
row2Delete.setRowKind(RowKind.DELETE);
471+
row2Delete.setOptions(rowOptions);
452472

453473
SinkFlowTestUtils.runBatchWithCheckpointDisabled(
454474
catalogTable,
@@ -464,22 +484,22 @@ void testDebeziumJsonSink() throws IOException {
464484
String dataStr = FileUtils.readFileToStr(path);
465485
Assertions.assertTrue(
466486
dataStr.contains(
467-
"{\"before\":null,\"after\":{\"a\":1,\"b\":\"A\",\"c\":100},\"op\":\"c\"}"));
487+
"{\"before\":null,\"after\":{\"a\":1,\"b\":\"A\",\"c\":100},\"op\":\"c\",\"source\":{\"schema\":\"default\",\"database\":\"default\",\"table\":\"default\"},\"ts_ms\":1}"));
468488
Assertions.assertTrue(
469489
dataStr.contains(
470-
"{\"before\":null,\"after\":{\"a\":2,\"b\":\"B\",\"c\":100},\"op\":\"c\"}"));
490+
"{\"before\":null,\"after\":{\"a\":2,\"b\":\"B\",\"c\":100},\"op\":\"c\",\"source\":{\"schema\":\"default\",\"database\":\"default\",\"table\":\"default\"},\"ts_ms\":1}"));
471491
Assertions.assertTrue(
472492
dataStr.contains(
473-
"{\"before\":null,\"after\":{\"a\":3,\"b\":\"C\",\"c\":100},\"op\":\"c\"}"));
493+
"{\"before\":null,\"after\":{\"a\":3,\"b\":\"C\",\"c\":100},\"op\":\"c\",\"source\":{\"schema\":\"default\",\"database\":\"default\",\"table\":\"default\"},\"ts_ms\":1}"));
474494
Assertions.assertTrue(
475495
dataStr.contains(
476-
"{\"before\":{\"a\":1,\"b\":\"A\",\"c\":100},\"after\":null,\"op\":\"d\"}"));
496+
"{\"before\":{\"a\":1,\"b\":\"A\",\"c\":100},\"after\":null,\"op\":\"d\",\"source\":{\"schema\":\"default\",\"database\":\"default\",\"table\":\"default\"},\"ts_ms\":1}"));
477497
Assertions.assertTrue(
478498
dataStr.contains(
479-
"{\"before\":null,\"after\":{\"a\":1,\"b\":\"A_1\",\"c\":100},\"op\":\"c\"}"));
499+
"{\"before\":null,\"after\":{\"a\":1,\"b\":\"A_1\",\"c\":100},\"op\":\"c\",\"source\":{\"schema\":\"default\",\"database\":\"default\",\"table\":\"default\"},\"ts_ms\":1}"));
480500
Assertions.assertTrue(
481501
dataStr.contains(
482-
"{\"before\":{\"a\":2,\"b\":\"B\",\"c\":100},\"after\":null,\"op\":\"d\"}"));
502+
"{\"before\":{\"a\":2,\"b\":\"B\",\"c\":100},\"after\":null,\"op\":\"d\",\"source\":{\"schema\":\"default\",\"database\":\"default\",\"table\":\"default\"},\"ts_ms\":1}"));
483503
}
484504

485505
@Test
@@ -515,25 +535,33 @@ void testMaxWellJsonSink() throws IOException {
515535
Collections.emptyMap(),
516536
Collections.emptyList(),
517537
"comment");
538+
Map<String, Object> rowOptions = new HashMap<>();
539+
rowOptions.put(EVENT_TIME.getName(), 1L);
518540

519541
SeaTunnelRow row1 = new SeaTunnelRow(new Object[] {1L, "A", 100});
520542
row1.setRowKind(RowKind.INSERT);
521543
row1.setTableId(TablePath.DEFAULT.getFullName());
544+
row1.setOptions(rowOptions);
522545
SeaTunnelRow row2 = new SeaTunnelRow(new Object[] {2L, "B", 100});
523546
row2.setRowKind(RowKind.INSERT);
524547
row2.setTableId(TablePath.DEFAULT.getFullName());
548+
row2.setOptions(rowOptions);
525549
SeaTunnelRow row3 = new SeaTunnelRow(new Object[] {3L, "C", 100});
526550
row3.setRowKind(RowKind.INSERT);
527551
row3.setTableId(TablePath.DEFAULT.getFullName());
552+
row3.setOptions(rowOptions);
528553
SeaTunnelRow row1UpdateBefore = new SeaTunnelRow(new Object[] {1L, "A", 100});
529554
row1UpdateBefore.setTableId(TablePath.DEFAULT.getFullName());
530555
row1UpdateBefore.setRowKind(RowKind.UPDATE_BEFORE);
556+
row1UpdateBefore.setOptions(rowOptions);
531557
SeaTunnelRow row1UpdateAfter = new SeaTunnelRow(new Object[] {1L, "A_1", 100});
532558
row1UpdateAfter.setTableId(TablePath.DEFAULT.getFullName());
533559
row1UpdateAfter.setRowKind(RowKind.UPDATE_AFTER);
560+
row1UpdateAfter.setOptions(rowOptions);
534561
SeaTunnelRow row2Delete = new SeaTunnelRow(new Object[] {2L, "B", 100});
535562
row2Delete.setTableId(TablePath.DEFAULT.getFullName());
536563
row2Delete.setRowKind(RowKind.DELETE);
564+
row2Delete.setOptions(rowOptions);
537565

538566
SinkFlowTestUtils.runBatchWithCheckpointDisabled(
539567
catalogTable,
@@ -548,17 +576,22 @@ void testMaxWellJsonSink() throws IOException {
548576
Path path = Paths.get("/tmp/seatunnel/LocalFileTest/maxwell_json_file.maxwell_json");
549577
String dataStr = FileUtils.readFileToStr(path);
550578
Assertions.assertTrue(
551-
dataStr.contains("{\"data\":{\"a\":1,\"b\":\"A\",\"c\":100},\"type\":\"INSERT\"}"));
579+
dataStr.contains(
580+
"{\"data\":{\"a\":1,\"b\":\"A\",\"c\":100},\"type\":\"INSERT\",\"database\":\"default\",\"table\":\"default\",\"ts\":1}"));
552581
Assertions.assertTrue(
553-
dataStr.contains("{\"data\":{\"a\":2,\"b\":\"B\",\"c\":100},\"type\":\"INSERT\"}"));
582+
dataStr.contains(
583+
"{\"data\":{\"a\":2,\"b\":\"B\",\"c\":100},\"type\":\"INSERT\",\"database\":\"default\",\"table\":\"default\",\"ts\":1}"));
554584
Assertions.assertTrue(
555-
dataStr.contains("{\"data\":{\"a\":3,\"b\":\"C\",\"c\":100},\"type\":\"INSERT\"}"));
585+
dataStr.contains(
586+
"{\"data\":{\"a\":3,\"b\":\"C\",\"c\":100},\"type\":\"INSERT\",\"database\":\"default\",\"table\":\"default\",\"ts\":1}"));
556587
Assertions.assertTrue(
557-
dataStr.contains("{\"data\":{\"a\":1,\"b\":\"A\",\"c\":100},\"type\":\"DELETE\"}"));
588+
dataStr.contains(
589+
"{\"data\":{\"a\":1,\"b\":\"A\",\"c\":100},\"type\":\"DELETE\",\"database\":\"default\",\"table\":\"default\",\"ts\":1}"));
558590
Assertions.assertTrue(
559591
dataStr.contains(
560-
"{\"data\":{\"a\":1,\"b\":\"A_1\",\"c\":100},\"type\":\"INSERT\"}"));
592+
"{\"data\":{\"a\":1,\"b\":\"A_1\",\"c\":100},\"type\":\"INSERT\",\"database\":\"default\",\"table\":\"default\",\"ts\":1}"));
561593
Assertions.assertTrue(
562-
dataStr.contains("{\"data\":{\"a\":2,\"b\":\"B\",\"c\":100},\"type\":\"DELETE\"}"));
594+
dataStr.contains(
595+
"{\"data\":{\"a\":2,\"b\":\"B\",\"c\":100},\"type\":\"DELETE\",\"database\":\"default\",\"table\":\"default\",\"ts\":1}"));
563596
}
564597
}

0 commit comments

Comments
 (0)