Skip to content

Commit 8c75db1

Browse files
authored
[cdc-pipeline-connector][mysql] Ensure the inference of MEDIUMINT_UNSIGNED type matches INT type in table schema (#2811)
1 parent a10e07f commit 8c75db1

File tree

2 files changed

+374
-2
lines changed

2 files changed

+374
-2
lines changed

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/com/ververica/cdc/connectors/mysql/utils/MySqlTypeUtils.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -143,14 +143,14 @@ private static DataType convertFromColumn(Column column) {
143143
case INT:
144144
case INTEGER:
145145
case MEDIUMINT:
146+
case MEDIUMINT_UNSIGNED:
147+
case MEDIUMINT_UNSIGNED_ZEROFILL:
146148
case YEAR:
147149
return DataTypes.INT();
148150
case INT_UNSIGNED:
149151
case INT_UNSIGNED_ZEROFILL:
150152
case INTEGER_UNSIGNED:
151153
case INTEGER_UNSIGNED_ZEROFILL:
152-
case MEDIUMINT_UNSIGNED:
153-
case MEDIUMINT_UNSIGNED_ZEROFILL:
154154
case BIGINT:
155155
return DataTypes.BIGINT();
156156
case BIGINT_UNSIGNED:
Lines changed: 372 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,372 @@
1+
/*
2+
* Copyright 2023 Ververica Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.ververica.cdc.connectors.mysql.source;
18+
19+
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
20+
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
21+
22+
import com.ververica.cdc.common.event.TableId;
23+
import com.ververica.cdc.common.schema.Schema;
24+
import com.ververica.cdc.common.types.DataType;
25+
import com.ververica.cdc.common.types.DataTypes;
26+
import com.ververica.cdc.common.types.RowType;
27+
import com.ververica.cdc.connectors.mysql.source.config.MySqlSourceConfig;
28+
import com.ververica.cdc.connectors.mysql.source.config.MySqlSourceConfigFactory;
29+
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
30+
import com.ververica.cdc.connectors.mysql.testutils.MySqlContainer;
31+
import com.ververica.cdc.connectors.mysql.testutils.MySqlVersion;
32+
import com.ververica.cdc.connectors.mysql.testutils.UniqueDatabase;
33+
import org.junit.AfterClass;
34+
import org.junit.Before;
35+
import org.junit.BeforeClass;
36+
import org.junit.Test;
37+
import org.testcontainers.lifecycle.Startables;
38+
39+
import java.time.ZoneId;
40+
import java.util.Arrays;
41+
import java.util.List;
42+
import java.util.stream.Collectors;
43+
import java.util.stream.Stream;
44+
45+
import static com.ververica.cdc.connectors.mysql.testutils.MySqSourceTestUtils.TEST_PASSWORD;
46+
import static com.ververica.cdc.connectors.mysql.testutils.MySqSourceTestUtils.TEST_USER;
47+
import static org.assertj.core.api.Assertions.assertThat;
48+
import static org.assertj.core.api.Assertions.assertThatThrownBy;
49+
50+
/** IT cases for {@link MySqlMetadataAccessor}. */
51+
public class MySqlMetadataAccessorITCase extends MySqlSourceTestBase {
52+
53+
private static final MySqlContainer MYSQL8_CONTAINER =
54+
createMySqlContainer(MySqlVersion.V8_0, "docker/server-gtids/expire-seconds/my.cnf");
55+
56+
private final UniqueDatabase fullTypesMySql57Database =
57+
new UniqueDatabase(MYSQL_CONTAINER, "column_type_test", TEST_USER, TEST_PASSWORD);
58+
59+
private final UniqueDatabase fullTypesMySql8Database =
60+
new UniqueDatabase(
61+
MYSQL8_CONTAINER, "column_type_test_mysql8", TEST_USER, TEST_PASSWORD);
62+
63+
private final StreamExecutionEnvironment env =
64+
StreamExecutionEnvironment.getExecutionEnvironment();
65+
66+
@BeforeClass
67+
public static void beforeClass() {
68+
LOG.info("Starting MySql8 containers...");
69+
Startables.deepStart(Stream.of(MYSQL8_CONTAINER)).join();
70+
LOG.info("Container MySql8 is started.");
71+
}
72+
73+
@AfterClass
74+
public static void afterClass() {
75+
LOG.info("Stopping MySql8 containers...");
76+
MYSQL8_CONTAINER.stop();
77+
LOG.info("Container MySql8 is stopped.");
78+
}
79+
80+
@Before
81+
public void before() {
82+
env.setParallelism(DEFAULT_PARALLELISM);
83+
env.enableCheckpointing(200);
84+
env.setRestartStrategy(RestartStrategies.noRestart());
85+
}
86+
87+
@Test
88+
public void testMysql57AccessDatabaseAndTable() {
89+
testAccessDatabaseAndTable(fullTypesMySql57Database);
90+
}
91+
92+
@Test
93+
public void testMysql8AccessDatabaseAndTable() {
94+
testAccessDatabaseAndTable(fullTypesMySql8Database);
95+
}
96+
97+
@Test
98+
public void testMysql57AccessCommonTypesSchema() {
99+
testAccessCommonTypesSchema(fullTypesMySql57Database);
100+
}
101+
102+
@Test
103+
public void testMysql8AccessCommonTypesSchema() {
104+
testAccessCommonTypesSchema(fullTypesMySql8Database);
105+
}
106+
107+
@Test
108+
public void testMysql57AccessTimeTypesSchema() {
109+
fullTypesMySql57Database.createAndInitialize();
110+
111+
String[] tables = new String[] {"time_types"};
112+
MySqlMetadataAccessor metadataAccessor =
113+
getMetadataAccessor(tables, fullTypesMySql57Database);
114+
115+
Schema actualSchema =
116+
metadataAccessor.getTableSchema(
117+
TableId.tableId(fullTypesMySql57Database.getDatabaseName(), "time_types"));
118+
Schema expectedSchema =
119+
Schema.newBuilder()
120+
.primaryKey("id")
121+
.fromRowDataType(
122+
RowType.of(
123+
new DataType[] {
124+
DataTypes.DECIMAL(20, 0).notNull(),
125+
DataTypes.INT(),
126+
DataTypes.DATE(),
127+
DataTypes.TIME(0),
128+
DataTypes.TIME(3),
129+
DataTypes.TIME(6),
130+
DataTypes.TIMESTAMP(0),
131+
DataTypes.TIMESTAMP(3),
132+
DataTypes.TIMESTAMP(6),
133+
DataTypes.TIMESTAMP_LTZ(0)
134+
},
135+
new String[] {
136+
"id",
137+
"year_c",
138+
"date_c",
139+
"time_c",
140+
"time_3_c",
141+
"time_6_c",
142+
"datetime_c",
143+
"datetime3_c",
144+
"datetime6_c",
145+
"timestamp_c"
146+
}))
147+
.build();
148+
assertThat(actualSchema).isEqualTo(expectedSchema);
149+
}
150+
151+
@Test
152+
public void testMysql8AccessTimeTypesSchema() {
153+
fullTypesMySql8Database.createAndInitialize();
154+
155+
String[] tables = new String[] {"time_types"};
156+
MySqlMetadataAccessor metadataAccessor =
157+
getMetadataAccessor(tables, fullTypesMySql8Database);
158+
159+
Schema actualSchema =
160+
metadataAccessor.getTableSchema(
161+
TableId.tableId(fullTypesMySql8Database.getDatabaseName(), "time_types"));
162+
Schema expectedSchema =
163+
Schema.newBuilder()
164+
.primaryKey("id")
165+
.fromRowDataType(
166+
RowType.of(
167+
new DataType[] {
168+
DataTypes.DECIMAL(20, 0).notNull(),
169+
DataTypes.INT(),
170+
DataTypes.DATE(),
171+
DataTypes.TIME(0),
172+
DataTypes.TIME(3),
173+
DataTypes.TIME(6),
174+
DataTypes.TIMESTAMP(0),
175+
DataTypes.TIMESTAMP(3),
176+
DataTypes.TIMESTAMP(6),
177+
DataTypes.TIMESTAMP_LTZ(0),
178+
DataTypes.TIMESTAMP_LTZ(3),
179+
DataTypes.TIMESTAMP_LTZ(6)
180+
},
181+
new String[] {
182+
"id",
183+
"year_c",
184+
"date_c",
185+
"time_c",
186+
"time_3_c",
187+
"time_6_c",
188+
"datetime_c",
189+
"datetime3_c",
190+
"datetime6_c",
191+
"timestamp_c",
192+
"timestamp3_c",
193+
"timestamp6_c"
194+
}))
195+
.build();
196+
assertThat(actualSchema).isEqualTo(expectedSchema);
197+
}
198+
199+
private void testAccessDatabaseAndTable(UniqueDatabase database) {
200+
database.createAndInitialize();
201+
202+
String[] tables = new String[] {"common_types", "time_types"};
203+
MySqlMetadataAccessor metadataAccessor = getMetadataAccessor(tables, database);
204+
205+
assertThatThrownBy(metadataAccessor::listNamespaces)
206+
.isInstanceOf(UnsupportedOperationException.class);
207+
208+
List<String> schemas = metadataAccessor.listSchemas(null);
209+
assertThat(schemas).contains(database.getDatabaseName());
210+
211+
List<TableId> actualTables = metadataAccessor.listTables(null, database.getDatabaseName());
212+
List<TableId> expectedTables =
213+
Arrays.stream(tables)
214+
.map(table -> TableId.tableId(database.getDatabaseName(), table))
215+
.collect(Collectors.toList());
216+
assertThat(actualTables).containsExactlyInAnyOrderElementsOf(expectedTables);
217+
}
218+
219+
private void testAccessCommonTypesSchema(UniqueDatabase database) {
220+
database.createAndInitialize();
221+
222+
String[] tables = new String[] {"common_types"};
223+
MySqlMetadataAccessor metadataAccessor = getMetadataAccessor(tables, database);
224+
225+
Schema actualSchema =
226+
metadataAccessor.getTableSchema(
227+
TableId.tableId(database.getDatabaseName(), "common_types"));
228+
Schema expectedSchema =
229+
Schema.newBuilder()
230+
.primaryKey("id")
231+
.fromRowDataType(
232+
RowType.of(
233+
new DataType[] {
234+
DataTypes.DECIMAL(20, 0).notNull(),
235+
DataTypes.TINYINT(),
236+
DataTypes.SMALLINT(),
237+
DataTypes.SMALLINT(),
238+
DataTypes.SMALLINT(),
239+
DataTypes.INT(),
240+
DataTypes.INT(),
241+
DataTypes.INT(),
242+
DataTypes.INT(),
243+
DataTypes.INT(),
244+
DataTypes.INT(),
245+
DataTypes.BIGINT(),
246+
DataTypes.BIGINT(),
247+
DataTypes.INT(),
248+
DataTypes.BIGINT(),
249+
DataTypes.DECIMAL(20, 0),
250+
DataTypes.DECIMAL(20, 0),
251+
DataTypes.VARCHAR(255),
252+
DataTypes.CHAR(3),
253+
DataTypes.DOUBLE(),
254+
DataTypes.FLOAT(),
255+
DataTypes.FLOAT(),
256+
DataTypes.FLOAT(),
257+
DataTypes.DOUBLE(),
258+
DataTypes.DOUBLE(),
259+
DataTypes.DOUBLE(),
260+
DataTypes.DECIMAL(8, 4),
261+
DataTypes.DECIMAL(8, 4),
262+
DataTypes.DECIMAL(8, 4),
263+
DataTypes.DECIMAL(6, 0),
264+
// Decimal precision larger than 38 will be treated as
265+
// string.
266+
DataTypes.STRING(),
267+
DataTypes.BOOLEAN(),
268+
DataTypes.BOOLEAN(),
269+
DataTypes.BOOLEAN(),
270+
DataTypes.BINARY(16),
271+
DataTypes.BINARY(8),
272+
DataTypes.STRING(),
273+
DataTypes.BYTES(),
274+
DataTypes.BYTES(),
275+
DataTypes.BYTES(),
276+
DataTypes.BYTES(),
277+
DataTypes.INT(),
278+
DataTypes.STRING(),
279+
DataTypes.STRING(),
280+
DataTypes.STRING(),
281+
DataTypes.STRING(),
282+
DataTypes.STRING(),
283+
DataTypes.STRING(),
284+
DataTypes.STRING(),
285+
DataTypes.STRING(),
286+
DataTypes.STRING(),
287+
DataTypes.STRING()
288+
},
289+
new String[] {
290+
"id",
291+
"tiny_c",
292+
"tiny_un_c",
293+
"tiny_un_z_c",
294+
"small_c",
295+
"small_un_c",
296+
"small_un_z_c",
297+
"medium_c",
298+
"medium_un_c",
299+
"medium_un_z_c",
300+
"int_c",
301+
"int_un_c",
302+
"int_un_z_c",
303+
"int11_c",
304+
"big_c",
305+
"big_un_c",
306+
"big_un_z_c",
307+
"varchar_c",
308+
"char_c",
309+
"real_c",
310+
"float_c",
311+
"float_un_c",
312+
"float_un_z_c",
313+
"double_c",
314+
"double_un_c",
315+
"double_un_z_c",
316+
"decimal_c",
317+
"decimal_un_c",
318+
"decimal_un_z_c",
319+
"numeric_c",
320+
"big_decimal_c",
321+
"bit1_c",
322+
"tiny1_c",
323+
"boolean_c",
324+
"file_uuid",
325+
"bit_c",
326+
"text_c",
327+
"tiny_blob_c",
328+
"blob_c",
329+
"medium_blob_c",
330+
"long_blob_c",
331+
"year_c",
332+
"enum_c",
333+
"json_c",
334+
"point_c",
335+
"geometry_c",
336+
"linestring_c",
337+
"polygon_c",
338+
"multipoint_c",
339+
"multiline_c",
340+
"multipolygon_c",
341+
"geometrycollection_c"
342+
}))
343+
.build();
344+
assertThat(actualSchema).isEqualTo(expectedSchema);
345+
}
346+
347+
private MySqlMetadataAccessor getMetadataAccessor(String[] tables, UniqueDatabase database) {
348+
MySqlSourceConfig sourceConfig = getConfig(tables, database);
349+
return new MySqlMetadataAccessor(sourceConfig);
350+
}
351+
352+
private MySqlSourceConfig getConfig(String[] captureTables, UniqueDatabase database) {
353+
String[] captureTableIds =
354+
Arrays.stream(captureTables)
355+
.map(tableName -> database.getDatabaseName() + "." + tableName)
356+
.toArray(String[]::new);
357+
358+
return new MySqlSourceConfigFactory()
359+
.startupOptions(StartupOptions.latest())
360+
.databaseList(database.getDatabaseName())
361+
.tableList(captureTableIds)
362+
.includeSchemaChanges(false)
363+
.hostname(database.getHost())
364+
.port(database.getDatabasePort())
365+
.splitSize(10)
366+
.fetchSize(2)
367+
.username(database.getUsername())
368+
.password(database.getPassword())
369+
.serverTimeZone(ZoneId.of("UTC").toString())
370+
.createConfig(0);
371+
}
372+
}

0 commit comments

Comments
 (0)