-
Notifications
You must be signed in to change notification settings - Fork 2.1k
[Feature][Format] Improve maxwell_json,canal_json,debezium_json format support merge update_before and update_after #9805
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: dev
Are you sure you want to change the base?
Conversation
- [x] canal_json | ||
- [x] debezium_json | ||
- [x] maxwell_json |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Have you checked the behavior of canal_json
and maxwell_json
? I'm not sure about it should merge or not.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Have you checked the behavior of
canal_json
andmaxwell_json
? I'm not sure about it should merge or not.
I don't understand what you mean. merge only add update event data, canal_json
and maxwell_json
support update type
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Never mind, I checked the canal_json and maxwell_json support UPDATE (without UPDATE_BEFORE and UPDATE_AFTER).
this.options = new HashMap<>(); | ||
this.options.put( | ||
CanalJsonFormatOptions.MERGE_UPDATE_EVENT.key(), | ||
textFileSinkConfig.getMergeUpdateEvent().toString()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why introduce options? Let's keep the code simple.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why introduce options? Let's keep the code simple.
if not, the CanalJsonSerializationSchema() will add a param. if we want change other , will add a param,and the CanalJsonSerializationSchema() will have many param. use Map can keep the code simple.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's add it when we really need it. But I think we don't need this for now.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's add it when we really need it. But I think we don't need this for now.
i fixed it
@@ -158,6 +157,7 @@ private void parsePayload(Collector<SeaTunnelRow> out, TablePath tablePath, Json | |||
if (tsNode != null) { | |||
MetadataUtil.setEventTime(after, tsNode.asLong()); | |||
} | |||
out.collect(before); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why moved this?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why moved this?
if parse DATA_AFTER have error, out.collect(before) and out.collect(after) together can make sure before and after data send to target , not only send before
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Got it.
@@ -204,6 +203,7 @@ public void deserializeMessage( | |||
if (tsNode != null) { | |||
MetadataUtil.setEventTime(after, ts); | |||
} | |||
out.collect(before); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ditto
Purpose of this pull request
close #9773
Does this PR introduce any user-facing change?
How was this patch tested?
Check list
New License Guide