Skip to content

Commit 3dfbecb

Browse files
xiangtaoleonardBang
authored andcommitted
[mysql] Support MySQL 5.6 (apache#396)
Co-authored-by: Leonard Xu <[email protected]>
1 parent 05c64eb commit 3dfbecb

File tree

5 files changed

+339
-12
lines changed

5 files changed

+339
-12
lines changed

flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/MySqlValidator.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -104,13 +104,13 @@ private void checkVersion(JdbcConnection connection) throws SQLException {
104104
} else if (versionNumbers[0] < 5) {
105105
isSatisfied = false;
106106
} else {
107-
isSatisfied = versionNumbers[1] >= 7;
107+
isSatisfied = versionNumbers[1] >= 6;
108108
}
109109
if (!isSatisfied) {
110110
throw new ValidationException(
111111
String.format(
112112
"Currently Flink MySql CDC connector only supports MySql "
113-
+ "whose version is larger or equal to 5.7, but actual is %s.%s.",
113+
+ "whose version is larger or equal to 5.6, but actual is %s.%s.",
114114
versionNumbers[0], versionNumbers[1]));
115115
}
116116
}

flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/MySqlValidatorTest.java

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -23,11 +23,13 @@
2323

2424
import com.ververica.cdc.connectors.mysql.source.MySqlSource;
2525
import com.ververica.cdc.connectors.mysql.testutils.MySqlContainer;
26+
import com.ververica.cdc.connectors.mysql.testutils.MySqlVersion;
2627
import com.ververica.cdc.connectors.mysql.testutils.UniqueDatabase;
2728
import com.ververica.cdc.debezium.DebeziumSourceFunction;
2829
import org.apache.kafka.connect.source.SourceRecord;
2930
import org.junit.AfterClass;
3031
import org.junit.BeforeClass;
32+
import org.junit.Ignore;
3133
import org.junit.Test;
3234
import org.junit.rules.TemporaryFolder;
3335
import org.junit.runner.RunWith;
@@ -51,6 +53,8 @@
5153

5254
import static com.ververica.cdc.connectors.mysql.MySqlTestUtils.basicSourceBuilder;
5355
import static com.ververica.cdc.connectors.mysql.MySqlTestUtils.setupSource;
56+
import static com.ververica.cdc.connectors.mysql.testutils.MySqlVersion.V5_5;
57+
import static com.ververica.cdc.connectors.mysql.testutils.MySqlVersion.V5_7;
5458
import static org.junit.Assert.assertEquals;
5559
import static org.junit.Assert.assertTrue;
5660
import static org.junit.Assert.fail;
@@ -90,12 +94,13 @@ public static void tearDown() {
9094
tempFolder.delete();
9195
}
9296

97+
@Ignore("The jdbc driver used in this module cannot connect to MySQL 5.5")
9398
@Test
9499
public void testValidateVersion() {
95-
String version = "5.6";
100+
MySqlVersion version = V5_5;
96101
String message =
97102
String.format(
98-
"Currently Flink MySql CDC connector only supports MySql whose version is larger or equal to 5.7, but actual is %s.",
103+
"Currently Flink MySql CDC connector only supports MySql whose version is larger or equal to 5.6, but actual is %s.",
99104
version);
100105
doValidate(version, "docker/server/my.cnf", message);
101106
}
@@ -109,7 +114,7 @@ public void testValidateBinlogFormat() {
109114
+ "connector to work properly. Change the MySQL configuration to use a binlog_format=ROW "
110115
+ "and restart the connector.",
111116
mode);
112-
doValidate("5.7", buildMySqlConfigFile("[mysqld]\nbinlog_format = " + mode), message);
117+
doValidate(V5_7, buildMySqlConfigFile("[mysqld]\nbinlog_format = " + mode), message);
113118
}
114119

115120
@Test
@@ -122,13 +127,14 @@ public void testValidateBinlogRowImage() {
122127
+ "binlog_row_image=FULL and restart the connector.",
123128
mode);
124129
doValidate(
125-
"5.7",
130+
V5_7,
126131
buildMySqlConfigFile("[mysqld]\nbinlog_format = ROW\nbinlog_row_image = " + mode),
127132
message);
128133
}
129134

130-
private void doValidate(String tag, String configPath, String exceptionMessage) {
131-
MySqlContainer container = new MySqlContainer(tag).withConfigurationOverride(configPath);
135+
private void doValidate(MySqlVersion version, String configPath, String exceptionMessage) {
136+
MySqlContainer container =
137+
new MySqlContainer(version).withConfigurationOverride(configPath);
132138

133139
LOG.info("Starting containers...");
134140
Startables.deepStart(Stream.of(container)).join();
Lines changed: 279 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,279 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package com.ververica.cdc.connectors.mysql.table;
20+
21+
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
22+
import org.apache.flink.table.api.EnvironmentSettings;
23+
import org.apache.flink.table.api.TableResult;
24+
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
25+
import org.apache.flink.types.Row;
26+
import org.apache.flink.util.CloseableIterator;
27+
28+
import com.ververica.cdc.connectors.mysql.MySqlValidatorTest;
29+
import com.ververica.cdc.connectors.mysql.testutils.MySqlContainer;
30+
import com.ververica.cdc.connectors.mysql.testutils.MySqlVersion;
31+
import com.ververica.cdc.connectors.mysql.testutils.UniqueDatabase;
32+
import org.junit.Before;
33+
import org.junit.Test;
34+
import org.junit.rules.TemporaryFolder;
35+
import org.slf4j.Logger;
36+
import org.slf4j.LoggerFactory;
37+
import org.testcontainers.containers.output.Slf4jLogConsumer;
38+
import org.testcontainers.lifecycle.Startables;
39+
40+
import java.io.File;
41+
import java.nio.charset.StandardCharsets;
42+
import java.nio.file.Files;
43+
import java.nio.file.Path;
44+
import java.nio.file.Paths;
45+
import java.nio.file.StandardOpenOption;
46+
import java.sql.Connection;
47+
import java.sql.Statement;
48+
import java.util.ArrayList;
49+
import java.util.Arrays;
50+
import java.util.Collections;
51+
import java.util.Iterator;
52+
import java.util.List;
53+
import java.util.Objects;
54+
import java.util.Random;
55+
import java.util.UUID;
56+
import java.util.stream.Stream;
57+
58+
import static com.ververica.cdc.connectors.mysql.source.MySqlSourceTestBase.assertEqualsInAnyOrder;
59+
import static com.ververica.cdc.connectors.mysql.source.MySqlSourceTestBase.assertEqualsInOrder;
60+
61+
/** Integration tests to check mysql-cdc works well with different MySQL server version. */
62+
public class MySqlCompatibilityITCase {
63+
64+
private static final Logger LOG = LoggerFactory.getLogger(MySqlCompatibilityITCase.class);
65+
66+
private static TemporaryFolder tempFolder;
67+
private static File resourceFolder;
68+
69+
private final StreamExecutionEnvironment env =
70+
StreamExecutionEnvironment.getExecutionEnvironment();
71+
private final StreamTableEnvironment tEnv =
72+
StreamTableEnvironment.create(
73+
env,
74+
EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build());
75+
76+
@Before
77+
public void setup() throws Exception {
78+
resourceFolder =
79+
Paths.get(
80+
Objects.requireNonNull(
81+
MySqlValidatorTest.class
82+
.getClassLoader()
83+
.getResource("."))
84+
.toURI())
85+
.toFile();
86+
tempFolder = new TemporaryFolder(resourceFolder);
87+
tempFolder.create();
88+
env.setParallelism(4);
89+
env.enableCheckpointing(200);
90+
}
91+
92+
@Test
93+
public void testMySqlV56() throws Exception {
94+
testDifferentMySqlVersion(MySqlVersion.V5_6, false);
95+
}
96+
97+
@Test
98+
public void testMySqlV56WithGtidModeOn() throws Exception {
99+
testDifferentMySqlVersion(MySqlVersion.V5_6, true);
100+
}
101+
102+
@Test
103+
public void testMySqlV57() throws Exception {
104+
testDifferentMySqlVersion(MySqlVersion.V5_7, false);
105+
}
106+
107+
@Test
108+
public void testMySqlV57WithGtidModeOn() throws Exception {
109+
testDifferentMySqlVersion(MySqlVersion.V5_7, true);
110+
}
111+
112+
@Test
113+
public void testMySqlV8() throws Exception {
114+
testDifferentMySqlVersion(MySqlVersion.V8_0, false);
115+
}
116+
117+
@Test
118+
public void testMySqlV8WithGtidModeOn() throws Exception {
119+
testDifferentMySqlVersion(MySqlVersion.V8_0, true);
120+
}
121+
122+
private void testDifferentMySqlVersion(MySqlVersion version, boolean enableGtid)
123+
throws Exception {
124+
final MySqlContainer mySqlContainer =
125+
(MySqlContainer)
126+
new MySqlContainer(version)
127+
.withConfigurationOverride(
128+
buildMySqlConfigWithTimezone(version, enableGtid))
129+
.withSetupSQL("docker/setup.sql")
130+
.withDatabaseName("flink-test")
131+
.withUsername("flinkuser")
132+
.withPassword("flinkpw")
133+
.withLogConsumer(new Slf4jLogConsumer(LOG));
134+
135+
LOG.info("Starting containers...");
136+
Startables.deepStart(Stream.of(mySqlContainer)).join();
137+
LOG.info("Containers are started.");
138+
139+
UniqueDatabase testDatabase =
140+
new UniqueDatabase(mySqlContainer, "inventory", "mysqluser", "mysqlpw");
141+
testDatabase.createAndInitialize();
142+
143+
String sourceDDL =
144+
String.format(
145+
"CREATE TABLE products ("
146+
+ " `id` INT NOT NULL,"
147+
+ " name STRING,"
148+
+ " description STRING,"
149+
+ " weight DECIMAL(10,3),"
150+
+ " primary key (`id`) not enforced"
151+
+ ") WITH ("
152+
+ " 'connector' = 'mysql-cdc',"
153+
+ " 'hostname' = '%s',"
154+
+ " 'port' = '%s',"
155+
+ " 'username' = '%s',"
156+
+ " 'password' = '%s',"
157+
+ " 'database-name' = '%s',"
158+
+ " 'table-name' = '%s',"
159+
+ " 'server-id' = '%s'"
160+
+ ")",
161+
mySqlContainer.getHost(),
162+
mySqlContainer.getDatabasePort(),
163+
testDatabase.getUsername(),
164+
testDatabase.getPassword(),
165+
testDatabase.getDatabaseName(),
166+
"products",
167+
getServerId());
168+
tEnv.executeSql(sourceDDL);
169+
170+
// async submit job
171+
TableResult result =
172+
tEnv.executeSql("SELECT `id`, name, description, weight FROM products");
173+
174+
CloseableIterator<Row> iterator = result.collect();
175+
176+
String[] expectedSnapshot =
177+
new String[] {
178+
"+I[101, scooter, Small 2-wheel scooter, 3.140]",
179+
"+I[102, car battery, 12V car battery, 8.100]",
180+
"+I[103, 12-pack drill bits, 12-pack of drill bits with sizes ranging from #40 to #3, 0.800]",
181+
"+I[104, hammer, 12oz carpenter's hammer, 0.750]",
182+
"+I[105, hammer, 14oz carpenter's hammer, 0.875]",
183+
"+I[106, hammer, 16oz carpenter's hammer, 1.000]",
184+
"+I[107, rocks, box of assorted rocks, 5.300]",
185+
"+I[108, jacket, water resistent black wind breaker, 0.100]",
186+
"+I[109, spare tire, 24 inch spare tire, 22.200]"
187+
};
188+
assertEqualsInAnyOrder(
189+
Arrays.asList(expectedSnapshot), fetchRows(iterator, expectedSnapshot.length));
190+
191+
try (Connection connection = testDatabase.getJdbcConnection();
192+
Statement statement = connection.createStatement()) {
193+
statement.execute(
194+
"UPDATE products SET description='18oz carpenter hammer' WHERE id=106;");
195+
statement.execute("UPDATE products SET weight='5.1' WHERE id=107;");
196+
statement.execute(
197+
"INSERT INTO products VALUES (default,'jacket','water resistent white wind breaker',0.2);"); // 110
198+
statement.execute(
199+
"INSERT INTO products VALUES (default,'scooter','Big 2-wheel scooter ',5.18);");
200+
statement.execute(
201+
"UPDATE products SET description='new water resistent white wind breaker', weight='0.5' WHERE id=110;");
202+
statement.execute("UPDATE products SET weight='5.17' WHERE id=111;");
203+
statement.execute("DELETE FROM products WHERE id=111;");
204+
}
205+
206+
String[] expectedBinlog =
207+
new String[] {
208+
"-U[106, hammer, 16oz carpenter's hammer, 1.000]",
209+
"+U[106, hammer, 18oz carpenter hammer, 1.000]",
210+
"-U[107, rocks, box of assorted rocks, 5.300]",
211+
"+U[107, rocks, box of assorted rocks, 5.100]",
212+
"+I[110, jacket, water resistent white wind breaker, 0.200]",
213+
"+I[111, scooter, Big 2-wheel scooter , 5.180]",
214+
"-U[110, jacket, water resistent white wind breaker, 0.200]",
215+
"+U[110, jacket, new water resistent white wind breaker, 0.500]",
216+
"-U[111, scooter, Big 2-wheel scooter , 5.180]",
217+
"+U[111, scooter, Big 2-wheel scooter , 5.170]",
218+
"-D[111, scooter, Big 2-wheel scooter , 5.170]"
219+
};
220+
221+
assertEqualsInOrder(
222+
Arrays.asList(expectedBinlog), fetchRows(iterator, expectedBinlog.length));
223+
result.getJobClient().get().cancel().get();
224+
}
225+
226+
private String getServerId() {
227+
final Random random = new Random();
228+
int serverId = random.nextInt(100) + 5400;
229+
return serverId + "-" + (serverId + env.getParallelism());
230+
}
231+
232+
private static List<String> fetchRows(Iterator<Row> iter, int size) {
233+
List<String> rows = new ArrayList<>(size);
234+
while (size > 0 && iter.hasNext()) {
235+
Row row = iter.next();
236+
rows.add(row.toString());
237+
size--;
238+
}
239+
return rows;
240+
}
241+
242+
private String buildMySqlConfigWithTimezone(MySqlVersion version, boolean enableGtid) {
243+
try {
244+
File folder = tempFolder.newFolder(String.valueOf(UUID.randomUUID()));
245+
Path cnf = Files.createFile(Paths.get(folder.getPath(), "my.cnf"));
246+
StringBuilder mysqlConfBuilder = new StringBuilder();
247+
mysqlConfBuilder.append(
248+
"[mysqld]\n"
249+
+ "binlog_format = row\n"
250+
+ "log_bin = mysql-bin\n"
251+
+ "server-id = 223344\n"
252+
+ "binlog_row_image = FULL\n");
253+
if (!enableGtid) {
254+
mysqlConfBuilder.append("gtid-mode = OFF\n");
255+
} else {
256+
mysqlConfBuilder.append("gtid-mode = ON\n");
257+
mysqlConfBuilder.append("enforce-gtid-consistency = 1\n");
258+
// see
259+
// https://dev.mysql.com/doc/refman/5.7/en/replication-options-gtids.html#sysvar_gtid_mode
260+
if (version == MySqlVersion.V5_6 || version == MySqlVersion.V5_7) {
261+
mysqlConfBuilder.append("log-slave-updates = ON\n");
262+
}
263+
}
264+
265+
if (version == MySqlVersion.V8_0) {
266+
mysqlConfBuilder.append("secure_file_priv=/var/lib/mysql\n");
267+
}
268+
269+
Files.write(
270+
cnf,
271+
Collections.singleton(mysqlConfBuilder.toString()),
272+
StandardCharsets.UTF_8,
273+
StandardOpenOption.APPEND);
274+
return Paths.get(resourceFolder.getAbsolutePath()).relativize(cnf).toString();
275+
} catch (Exception e) {
276+
throw new RuntimeException("Failed to create my.cnf file.", e);
277+
}
278+
}
279+
}

flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/testutils/MySqlContainer.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020

2121
import org.testcontainers.containers.ContainerLaunchException;
2222
import org.testcontainers.containers.JdbcDatabaseContainer;
23+
import org.testcontainers.utility.DockerImageName;
2324

2425
import java.util.HashSet;
2526
import java.util.Set;
@@ -33,7 +34,6 @@
3334
public class MySqlContainer extends JdbcDatabaseContainer {
3435

3536
public static final String IMAGE = "mysql";
36-
public static final String DEFAULT_TAG = "5.7";
3737
public static final Integer MYSQL_PORT = 3306;
3838

3939
private static final String MY_CNF_CONFIG_OVERRIDE_PARAM_NAME = "MY_CNF";
@@ -45,11 +45,11 @@ public class MySqlContainer extends JdbcDatabaseContainer {
4545
private String password = "test";
4646

4747
public MySqlContainer() {
48-
this(DEFAULT_TAG);
48+
this(MySqlVersion.V5_7);
4949
}
5050

51-
public MySqlContainer(String tag) {
52-
super(IMAGE + ":" + tag);
51+
public MySqlContainer(MySqlVersion version) {
52+
super(DockerImageName.parse(IMAGE + ":" + version.getVersion()));
5353
addExposedPort(MYSQL_PORT);
5454
}
5555

0 commit comments

Comments
 (0)