-
Notifications
You must be signed in to change notification settings - Fork 2.1k
[Feature][Format] Improve maxwell_json,canal_json,debezium_json format add ts_ms and table #9701
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
cc @wubx |
...onnector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/KafkaFormatIT.java
Outdated
Show resolved
Hide resolved
seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/SeaTunnelRow.java
Outdated
Show resolved
Hide resolved
@@ -88,9 +98,12 @@ private static SeaTunnelRowType createJsonRowType(SeaTunnelRowType databaseSchem | |||
// but we don't need them | |||
// and we don't need "old" , because can not support UPDATE_BEFORE,UPDATE_AFTER | |||
return new SeaTunnelRowType( | |||
new String[] {"data", "type"}, | |||
new String[] {"data", "type", "tableId", "ts"}, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The cancel format is database and table fields, not tableId. Please refer https://docs.pingcap.com/zh/tidb/stable/ticdc-canal-json/#dml-event
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The cancel format is database and table fields, not tableId. Please refer https://docs.pingcap.com/zh/tidb/stable/ticdc-canal-json/#dml-event
SeaTunnelRow do not have database and table fields, if use tableId to get db fields,may be get error data,remove it,only add ts field?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We should update SeaTunnelRowType to CatalogTable in
Line 49 in 9212a77
public CanalJsonSerializationSchema(SeaTunnelRowType rowType) { |
Then we can get table and database from CatalogTable.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We should update SeaTunnelRowType to CatalogTable in
Line 49 in 9212a77
public CanalJsonSerializationSchema(SeaTunnelRowType rowType) { Then we can get table and database from CatalogTable.
MetadataUtil.getDatabase use tableId to get db,maybe can use it , update SeaTunnelRowType to CatalogTable will change manay file
@@ -78,7 +94,9 @@ public byte[] serialize(SeaTunnelRow row) { | |||
|
|||
private static SeaTunnelRowType createJsonRowType(SeaTunnelRowType databaseSchema) { | |||
return new SeaTunnelRowType( | |||
new String[] {"before", "after", "op"}, | |||
new SeaTunnelDataType[] {databaseSchema, databaseSchema, STRING_TYPE}); | |||
new String[] {"before", "after", "op", "tableId", "ts_ms"}, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The debezium format doesn't contains tableId
. Please refer https://help.aliyun.com/zh/flink/debezium
@@ -87,7 +96,7 @@ private static SeaTunnelRowType createJsonRowType(SeaTunnelRowType databaseSchem | |||
// but we don't need them | |||
// and we don't need "old" , because can not support UPDATE_BEFORE,UPDATE_AFTER | |||
return new SeaTunnelRowType( | |||
new String[] {"data", "type"}, | |||
new SeaTunnelDataType[] {databaseSchema, STRING_TYPE}); | |||
new String[] {"data", "type", "tableId", "ts"}, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
same as canal, maxwell doesn'ts contains tableId too. Please refer https://help.aliyun.com/zh/flink/maxwell-format
@@ -79,7 +88,7 @@ private static SeaTunnelRowType createJsonRowType(SeaTunnelRowType databaseSchem | |||
// but we don't need them | |||
// and we don't need "old" , because can not support UPDATE_BEFORE,UPDATE_AFTER | |||
return new SeaTunnelRowType( | |||
new String[] {"data", "type"}, | |||
new SeaTunnelDataType[] {databaseSchema, STRING_TYPE}); | |||
new String[] {"data", "type", "tableId", "op_ts"}, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ogg json doesn't contains tableId
. Please refer https://help.aliyun.com/zh/datahub/developer-reference/ogg-for-big-data-kafka
"{\"data\":{\"id\":111,\"name\":\"scooter\",\"description\":\"Big 2-wheel scooter \",\"weight\":5.17},\"type\":\"INSERT\"}", | ||
"{\"data\":{\"id\":111,\"name\":\"scooter\",\"description\":\"Big 2-wheel scooter \",\"weight\":5.17},\"type\":\"DELETE\"}"); | ||
|
||
"{\"data\":{\"id\":101,\"name\":\"scooter\",\"description\":\"Small 2-wheel scooter\",\"weight\":3.14},\"type\":\"INSERT\",\"table\":\"..test\",\"op_ts\":1589384406000}", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why table is ..test
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why table is
..test
?
'..test' is because of db and schema is null
} | ||
|
||
if (row.getOptions() != null && row.getOptions().containsKey(EVENT_TIME.getName())) { | ||
reuse.setField(4, row.getOptions().get(EVENT_TIME.getName())); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we standardize the precision of eventtime? I see that the unit is sometimes seconds and sometimes milliseconds. Can we standardize it to milliseconds?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could you add some comment in
seatunnel/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/CommonOptions.java
Line 53 in 6e5737c
EVENT_TIME("EventTime", true), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could you add some comment in
seatunnel/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/CommonOptions.java
Line 53 in 6e5737c
EVENT_TIME("EventTime", true),
i add it , and change metadata.md
...son/src/main/java/org/apache/seatunnel/format/json/canal/CanalJsonDeserializationSchema.java
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks @dyp12 . LGTM if ci passes.
Purpose of this pull request
close #9675
Does this PR introduce any user-facing change?
How was this patch tested?
Check list
New License Guide