Skip to content

Commit 4ed7206

Browse files
GOODBOY008leonardBang
authored andcommitted
[cdc-base] Fix parsing error when serializing and deserializing the table name with dot (#2443)
(cherry picked from commit 7801ac9)
1 parent a7906da commit 4ed7206

File tree

2 files changed

+98
-1
lines changed

2 files changed

+98
-1
lines changed

flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/source/meta/split/SourceSplitSerializer.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@ public byte[] serialize(SourceSplitBase split) throws IOException {
7070
boolean useCatalogBeforeSchema =
7171
SerializerUtils.shouldUseCatalogBeforeSchema(snapshotSplit.getTableId());
7272
out.writeBoolean(useCatalogBeforeSchema);
73-
out.writeUTF(snapshotSplit.getTableId().toString());
73+
out.writeUTF(snapshotSplit.getTableId().toDoubleQuotedString());
7474
out.writeUTF(snapshotSplit.splitId());
7575
out.writeUTF(snapshotSplit.getSplitKeyType().asSerializableString());
7676

Lines changed: 97 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,97 @@
1+
/*
2+
* Copyright 2023 Ververica Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.ververica.cdc.connectors.base.source.meta.split;
18+
19+
import org.apache.flink.table.types.logical.BigIntType;
20+
import org.apache.flink.table.types.logical.RowType;
21+
22+
import com.ververica.cdc.connectors.base.source.meta.offset.Offset;
23+
import com.ververica.cdc.connectors.base.source.meta.offset.OffsetFactory;
24+
import io.debezium.relational.TableId;
25+
import org.junit.Test;
26+
27+
import java.io.IOException;
28+
import java.util.Collections;
29+
import java.util.HashMap;
30+
import java.util.Map;
31+
32+
import static org.junit.Assert.assertEquals;
33+
34+
/** Tests for {@link SourceSplitSerializer}. */
35+
public class SourceSplitSerializerTest {
36+
37+
@Test
38+
public void testSnapshotTableIdSerializeAndDeserialize() throws IOException {
39+
SnapshotSplit snapshotSplitBefore =
40+
new SnapshotSplit(
41+
new TableId("cata`log\"", "s\"che`ma", "ta\"ble.1`"),
42+
"test",
43+
new RowType(
44+
Collections.singletonList(
45+
new RowType.RowField("id", new BigIntType()))),
46+
null,
47+
null,
48+
null,
49+
new HashMap<>());
50+
51+
SourceSplitSerializer sourceSplitSerializer =
52+
new SourceSplitSerializer() {
53+
@Override
54+
public OffsetFactory getOffsetFactory() {
55+
return new OffsetFactory() {
56+
@Override
57+
public Offset newOffset(Map<String, String> offset) {
58+
return null;
59+
}
60+
61+
@Override
62+
public Offset newOffset(String filename, Long position) {
63+
return null;
64+
}
65+
66+
@Override
67+
public Offset newOffset(Long position) {
68+
return null;
69+
}
70+
71+
@Override
72+
public Offset createTimestampOffset(long timestampMillis) {
73+
return null;
74+
}
75+
76+
@Override
77+
public Offset createInitialOffset() {
78+
return null;
79+
}
80+
81+
@Override
82+
public Offset createNoStoppingOffset() {
83+
return null;
84+
}
85+
};
86+
}
87+
};
88+
89+
SnapshotSplit snapshotSplitAfter =
90+
(SnapshotSplit)
91+
sourceSplitSerializer.deserialize(
92+
sourceSplitSerializer.getVersion(),
93+
sourceSplitSerializer.serialize(snapshotSplitBefore));
94+
95+
assertEquals(snapshotSplitBefore.getTableId(), snapshotSplitAfter.getTableId());
96+
}
97+
}

0 commit comments

Comments
 (0)