
Commit 12f27ac

[mysql] Support metadata columns for mysql-cdc connector (#496)

Adds readable metadata columns (table_name, database_name, op_ts) to the mysql-cdc table source, wired through a new MetadataConverter interface and an AppendMetadataCollector in the Debezium deserialization layer.

1 parent 64fc3e3 commit 12f27ac

10 files changed: +561 −80 lines changed
flink-connector-debezium/src/main/java/com/ververica/cdc/debezium/table/AppendMetadataCollector.java

Lines changed: 60 additions & 0 deletions

@@ -0,0 +1,60 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package com.ververica.cdc.debezium.table;

import org.apache.flink.annotation.Internal;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.utils.JoinedRowData;
import org.apache.flink.util.Collector;

import org.apache.kafka.connect.source.SourceRecord;

import java.io.Serializable;

/** Emits a row with physical fields and metadata fields. */
@Internal
public final class AppendMetadataCollector implements Collector<RowData>, Serializable {
    private static final long serialVersionUID = 1L;

    private final MetadataConverter[] metadataConverters;

    public transient SourceRecord inputRecord;
    public transient Collector<RowData> outputCollector;

    public AppendMetadataCollector(MetadataConverter[] metadataConverters) {
        this.metadataConverters = metadataConverters;
    }

    @Override
    public void collect(RowData physicalRow) {
        GenericRowData metaRow = new GenericRowData(metadataConverters.length);
        for (int i = 0; i < metadataConverters.length; i++) {
            Object meta = metadataConverters[i].read(inputRecord);
            metaRow.setField(i, meta);
        }
        RowData outRow = new JoinedRowData(physicalRow.getRowKind(), physicalRow, metaRow);
        outputCollector.collect(outRow);
    }

    @Override
    public void close() {
        // nothing to do
    }
}
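For intuition, here is a minimal standalone sketch (not part of the commit) of what JoinedRowData does for AppendMetadataCollector: it concatenates the physical row and the metadata row while keeping the physical row's RowKind. The class name JoinedRowDemo and the sample values are illustrative.

import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.data.utils.JoinedRowData;

public class JoinedRowDemo {
    public static void main(String[] args) {
        // Physical row as produced by the runtime converter (2 columns).
        GenericRowData physical = new GenericRowData(2);
        physical.setField(0, 1L);
        physical.setField(1, StringData.fromString("flink"));

        // Metadata row built from the MetadataConverter results (1 column).
        GenericRowData meta = new GenericRowData(1);
        meta.setField(0, StringData.fromString("inventory")); // e.g. database_name

        // Joined view: fields 0..1 resolve to `physical`, field 2 to `meta`.
        RowData joined = new JoinedRowData(physical.getRowKind(), physical, meta);
        System.out.println(joined.getArity()); // prints 3
    }
}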
flink-connector-debezium/src/main/java/com/ververica/cdc/debezium/table/MetadataConverter.java

Lines changed: 32 additions & 0 deletions

@@ -0,0 +1,32 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package com.ververica.cdc.debezium.table;

import org.apache.flink.annotation.Internal;

import org.apache.kafka.connect.source.SourceRecord;

import java.io.Serializable;

/** Converts {@link SourceRecord} metadata into Flink internal data structures. */
@FunctionalInterface
@Internal
public interface MetadataConverter extends Serializable {
    Object read(SourceRecord record);
}
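Because MetadataConverter is a @FunctionalInterface that extends Serializable, a converter can also be written as a serializable lambda. A hedged sketch follows (not from this commit; the real converters ship in MySqlReadableMetadata further below):

import com.ververica.cdc.debezium.table.MetadataConverter;
import io.debezium.connector.AbstractSourceInfo;
import io.debezium.data.Envelope;
import org.apache.kafka.connect.data.Struct;

class Converters {
    // Illustrative only: reads the source timestamp (epoch millis) out of the
    // Debezium envelope's `source` struct as a serializable lambda.
    static final MetadataConverter OP_TIMESTAMP =
            record ->
                    ((Struct) record.value())
                            .getStruct(Envelope.FieldName.SOURCE)
                            .getInt64(AbstractSourceInfo.TIMESTAMP_KEY);
}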

flink-connector-debezium/src/main/java/com/ververica/cdc/debezium/table/RowDataDebeziumDeserializeSchema.java

Lines changed: 88 additions & 18 deletions

@@ -30,8 +30,6 @@
 import org.apache.flink.types.RowKind;
 import org.apache.flink.util.Collector;
 
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
-
 import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
 import com.ververica.cdc.debezium.utils.TemporalConversions;
 import io.debezium.data.Envelope;
@@ -55,13 +53,15 @@
 import java.time.LocalDateTime;
 import java.time.ZoneId;
 
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
 /**
  * Deserialization schema from Debezium object to Flink Table/SQL internal data structure {@link
  * RowData}.
  */
 public final class RowDataDebeziumDeserializeSchema
         implements DebeziumDeserializationSchema<RowData> {
-    private static final long serialVersionUID = -4852684966051743776L;
+    private static final long serialVersionUID = 2L;
 
     /** Custom validator to validate the row value. */
     public interface ValueValidator extends Serializable {
@@ -72,26 +72,42 @@ public interface ValueValidator extends Serializable {
     private final TypeInformation<RowData> resultTypeInfo;
 
     /**
-     * Runtime converter that converts {@link JsonNode}s into objects of Flink SQL internal data
-     * structures. *
+     * Runtime converter that converts Kafka {@link SourceRecord}s into {@link RowData} consisting
+     * of physical column values.
      */
-    private final DeserializationRuntimeConverter runtimeConverter;
+    private final DeserializationRuntimeConverter physicalConverter;
+
+    /** Whether the deserializer needs to handle metadata columns. */
+    private final boolean hasMetadata;
+
+    /**
+     * A wrapped output collector which is used to append metadata columns after physical columns.
+     */
+    private final AppendMetadataCollector appendMetadataCollector;
 
     /** Time zone of the database server. */
     private final ZoneId serverTimeZone;
 
     /** Validator to validate the row value. */
     private final ValueValidator validator;
 
-    public RowDataDebeziumDeserializeSchema(
-            RowType rowType,
+    /** Returns a builder to build {@link RowDataDebeziumDeserializeSchema}. */
+    public static Builder newBuilder() {
+        return new Builder();
+    }
+
+    RowDataDebeziumDeserializeSchema(
+            RowType physicalDataType,
+            MetadataConverter[] metadataConverters,
             TypeInformation<RowData> resultTypeInfo,
             ValueValidator validator,
             ZoneId serverTimeZone) {
-        this.runtimeConverter = createConverter(rowType);
-        this.resultTypeInfo = resultTypeInfo;
-        this.validator = validator;
-        this.serverTimeZone = serverTimeZone;
+        this.hasMetadata = checkNotNull(metadataConverters).length > 0;
+        this.appendMetadataCollector = new AppendMetadataCollector(metadataConverters);
+        this.physicalConverter = createConverter(checkNotNull(physicalDataType));
+        this.resultTypeInfo = checkNotNull(resultTypeInfo);
+        this.validator = checkNotNull(validator);
+        this.serverTimeZone = checkNotNull(serverTimeZone);
     }
 
     @Override
@@ -103,42 +119,96 @@ public void deserialize(SourceRecord record, Collector<RowData> out) throws Exception {
             GenericRowData insert = extractAfterRow(value, valueSchema);
             validator.validate(insert, RowKind.INSERT);
             insert.setRowKind(RowKind.INSERT);
-            out.collect(insert);
+            emit(record, insert, out);
         } else if (op == Envelope.Operation.DELETE) {
             GenericRowData delete = extractBeforeRow(value, valueSchema);
             validator.validate(delete, RowKind.DELETE);
             delete.setRowKind(RowKind.DELETE);
-            out.collect(delete);
+            emit(record, delete, out);
         } else {
             GenericRowData before = extractBeforeRow(value, valueSchema);
             validator.validate(before, RowKind.UPDATE_BEFORE);
             before.setRowKind(RowKind.UPDATE_BEFORE);
-            out.collect(before);
+            emit(record, before, out);
 
             GenericRowData after = extractAfterRow(value, valueSchema);
             validator.validate(after, RowKind.UPDATE_AFTER);
             after.setRowKind(RowKind.UPDATE_AFTER);
-            out.collect(after);
+            emit(record, after, out);
         }
     }
 
     private GenericRowData extractAfterRow(Struct value, Schema valueSchema) throws Exception {
         Schema afterSchema = valueSchema.field(Envelope.FieldName.AFTER).schema();
         Struct after = value.getStruct(Envelope.FieldName.AFTER);
-        return (GenericRowData) runtimeConverter.convert(after, afterSchema);
+        return (GenericRowData) physicalConverter.convert(after, afterSchema);
     }
 
     private GenericRowData extractBeforeRow(Struct value, Schema valueSchema) throws Exception {
         Schema beforeSchema = valueSchema.field(Envelope.FieldName.BEFORE).schema();
         Struct before = value.getStruct(Envelope.FieldName.BEFORE);
-        return (GenericRowData) runtimeConverter.convert(before, beforeSchema);
+        return (GenericRowData) physicalConverter.convert(before, beforeSchema);
+    }
+
+    private void emit(SourceRecord inRecord, RowData physicalRow, Collector<RowData> collector) {
+        if (!hasMetadata) {
+            collector.collect(physicalRow);
+            return;
+        }
+
+        appendMetadataCollector.inputRecord = inRecord;
+        appendMetadataCollector.outputCollector = collector;
+        appendMetadataCollector.collect(physicalRow);
     }
 
     @Override
     public TypeInformation<RowData> getProducedType() {
        return resultTypeInfo;
     }
 
+    // -------------------------------------------------------------------------------------
+    // Builder
+    // -------------------------------------------------------------------------------------
+
+    /** Builder of {@link RowDataDebeziumDeserializeSchema}. */
+    public static class Builder {
+        private RowType physicalRowType;
+        private TypeInformation<RowData> resultTypeInfo;
+        private MetadataConverter[] metadataConverters = new MetadataConverter[0];
+        private ValueValidator validator = (rowData, rowKind) -> {};
+        private ZoneId serverTimeZone = ZoneId.of("UTC");
+
+        public Builder setPhysicalRowType(RowType physicalRowType) {
+            this.physicalRowType = physicalRowType;
+            return this;
+        }
+
+        public Builder setMetadataConverters(MetadataConverter[] metadataConverters) {
+            this.metadataConverters = metadataConverters;
+            return this;
+        }
+
+        public Builder setResultTypeInfo(TypeInformation<RowData> resultTypeInfo) {
+            this.resultTypeInfo = resultTypeInfo;
+            return this;
+        }
+
+        public Builder setValueValidator(ValueValidator validator) {
+            this.validator = validator;
+            return this;
+        }
+
+        public Builder setServerTimeZone(ZoneId serverTimeZone) {
+            this.serverTimeZone = serverTimeZone;
+            return this;
+        }
+
+        public RowDataDebeziumDeserializeSchema build() {
+            return new RowDataDebeziumDeserializeSchema(
+                    physicalRowType, metadataConverters, resultTypeInfo, validator, serverTimeZone);
+        }
+    }
+
     // -------------------------------------------------------------------------------------
     // Runtime Converters
     // -------------------------------------------------------------------------------------
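A hedged usage sketch of the builder introduced above (the wrapper class and the sources of its parameters are illustrative; the actual table-source wiring is not part of this excerpt):

import java.time.ZoneId;

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.RowType;

import com.ververica.cdc.debezium.table.MetadataConverter;
import com.ververica.cdc.debezium.table.RowDataDebeziumDeserializeSchema;

class DeserializerWiring {
    // `physicalRowType`, `typeInfo`, and `converters` are assumed to come from
    // the planner / table source; they are placeholders here.
    static RowDataDebeziumDeserializeSchema build(
            RowType physicalRowType,
            TypeInformation<RowData> typeInfo,
            MetadataConverter[] converters) {
        return RowDataDebeziumDeserializeSchema.newBuilder()
                .setPhysicalRowType(physicalRowType)
                .setMetadataConverters(converters) // omit when no metadata columns are requested
                .setResultTypeInfo(typeInfo)
                .setServerTimeZone(ZoneId.of("UTC"))
                .build(); // the validator defaults to a no-op
    }
}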
flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/table/MySqlReadableMetadata.java

Lines changed: 108 additions & 0 deletions

@@ -0,0 +1,108 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package com.ververica.cdc.connectors.mysql.table;

import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.data.TimestampData;
import org.apache.flink.table.types.DataType;

import com.ververica.cdc.debezium.table.MetadataConverter;
import io.debezium.connector.AbstractSourceInfo;
import io.debezium.data.Envelope;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;

/** Defines the supported metadata columns for {@link MySqlTableSource}. */
public enum MySqlReadableMetadata {
    /** Name of the table that contains the row. */
    TABLE_NAME(
            "table_name",
            DataTypes.STRING().notNull(),
            new MetadataConverter() {
                private static final long serialVersionUID = 1L;

                @Override
                public Object read(SourceRecord record) {
                    Struct messageStruct = (Struct) record.value();
                    Struct sourceStruct = messageStruct.getStruct(Envelope.FieldName.SOURCE);
                    return StringData.fromString(
                            sourceStruct.getString(AbstractSourceInfo.TABLE_NAME_KEY));
                }
            }),

    /** Name of the database that contains the row. */
    DATABASE_NAME(
            "database_name",
            DataTypes.STRING().notNull(),
            new MetadataConverter() {
                private static final long serialVersionUID = 1L;

                @Override
                public Object read(SourceRecord record) {
                    Struct messageStruct = (Struct) record.value();
                    Struct sourceStruct = messageStruct.getStruct(Envelope.FieldName.SOURCE);
                    return StringData.fromString(
                            sourceStruct.getString(AbstractSourceInfo.DATABASE_NAME_KEY));
                }
            }),

    /**
     * Time at which the change was made in the database. If the record was read from a snapshot of
     * the table instead of the binlog, the value is always 0.
     */
    OP_TS(
            "op_ts",
            DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(3).notNull(),
            new MetadataConverter() {
                private static final long serialVersionUID = 1L;

                @Override
                public Object read(SourceRecord record) {
                    Struct messageStruct = (Struct) record.value();
                    Struct sourceStruct = messageStruct.getStruct(Envelope.FieldName.SOURCE);
                    return TimestampData.fromEpochMillis(
                            (Long) sourceStruct.get(AbstractSourceInfo.TIMESTAMP_KEY));
                }
            });

    private final String key;

    private final DataType dataType;

    private final MetadataConverter converter;

    MySqlReadableMetadata(String key, DataType dataType, MetadataConverter converter) {
        this.key = key;
        this.dataType = dataType;
        this.converter = converter;
    }

    public String getKey() {
        return key;
    }

    public DataType getDataType() {
        return dataType;
    }

    public MetadataConverter getConverter() {
        return converter;
    }
}
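A hedged sketch of how a table source could map requested metadata keys to converters through this enum. The resolve method, its class, and its caller are illustrative assumptions, not part of this commit:

import java.util.List;
import java.util.stream.Stream;

import com.ververica.cdc.connectors.mysql.table.MySqlReadableMetadata;
import com.ververica.cdc.debezium.table.MetadataConverter;

class MetadataResolution {
    // Turns SQL-side metadata keys (e.g. "database_name") into the converter
    // array that RowDataDebeziumDeserializeSchema.Builder expects.
    static MetadataConverter[] resolve(List<String> requestedKeys) {
        return requestedKeys.stream()
                .map(key ->
                        Stream.of(MySqlReadableMetadata.values())
                                .filter(m -> m.getKey().equals(key))
                                .findFirst()
                                .orElseThrow(
                                        () -> new IllegalArgumentException(
                                                "Unknown metadata key: " + key))
                                .getConverter())
                .toArray(MetadataConverter[]::new);
    }
}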
