Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.flink.util.TestLogger;

import com.ververica.cdc.connectors.mysql.testutils.MySqlContainer;
import com.ververica.cdc.connectors.mysql.testutils.MySqlVersion;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.slf4j.Logger;
Expand All @@ -45,15 +46,7 @@ public abstract class MySqlSourceTestBase extends TestLogger {
private static final Logger LOG = LoggerFactory.getLogger(MySqlSourceTestBase.class);

protected static final int DEFAULT_PARALLELISM = 4;
protected static final MySqlContainer MYSQL_CONTAINER =
(MySqlContainer)
new MySqlContainer()
.withConfigurationOverride("docker/server-gtids/my.cnf")
.withSetupSQL("docker/setup.sql")
.withDatabaseName("flink-test")
.withUsername("flinkuser")
.withPassword("flinkpw")
.withLogConsumer(new Slf4jLogConsumer(LOG));
protected static final MySqlContainer MYSQL_CONTAINER = createMySqlContainer(MySqlVersion.V5_7);

@Rule
public final MiniClusterWithClientResource miniClusterResource =
Expand All @@ -72,6 +65,17 @@ public static void startContainers() {
LOG.info("Containers are started.");
}

protected static MySqlContainer createMySqlContainer(MySqlVersion version) {
return (MySqlContainer)
new MySqlContainer(version)
.withConfigurationOverride("docker/server-gtids/my.cnf")
.withSetupSQL("docker/setup.sql")
.withDatabaseName("flink-test")
.withUsername("flinkuser")
.withPassword("flinkpw")
.withLogConsumer(new Slf4jLogConsumer(LOG));
}

public static void assertEqualsInAnyOrder(List<String> expected, List<String> actual) {
assertTrue(expected != null && actual != null);
assertEqualsInOrder(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,18 @@
import org.apache.flink.shaded.guava18.com.google.common.collect.Lists;

import com.ververica.cdc.connectors.mysql.source.MySqlSourceTestBase;
import com.ververica.cdc.connectors.mysql.testutils.MySqlContainer;
import com.ververica.cdc.connectors.mysql.testutils.MySqlVersion;
import com.ververica.cdc.connectors.mysql.testutils.UniqueDatabase;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.lifecycle.Startables;

import java.sql.Connection;
import java.sql.Statement;
Expand All @@ -56,14 +62,22 @@
@RunWith(Parameterized.class)
public class MySqlConnectorITCase extends MySqlSourceTestBase {

private static final Logger LOG = LoggerFactory.getLogger(MySqlConnectorITCase.class);

private static final String TEST_USER = "mysqluser";
private static final String TEST_PASSWORD = "mysqlpw";

private static final MySqlContainer MYSQL8_CONTAINER =
(MySqlContainer) createMySqlContainer(MySqlVersion.V8_0).withExposedPorts(3307);

private final UniqueDatabase inventoryDatabase =
new UniqueDatabase(MYSQL_CONTAINER, "inventory", TEST_USER, TEST_PASSWORD);

private final UniqueDatabase fullTypesDatabase =
private final UniqueDatabase fullTypesMySql57Database =
new UniqueDatabase(MYSQL_CONTAINER, "column_type_test", TEST_USER, TEST_PASSWORD);
private final UniqueDatabase fullTypesMySql8Database =
new UniqueDatabase(
MYSQL8_CONTAINER, "column_type_test_mysql8", TEST_USER, TEST_PASSWORD);

private final UniqueDatabase customerDatabase =
new UniqueDatabase(MYSQL_CONTAINER, "customer", TEST_USER, TEST_PASSWORD);
Expand Down Expand Up @@ -101,6 +115,13 @@ public static Object[] parameters() {
};
}

@BeforeClass
public static void beforeClass() {
LOG.info("Starting MySql8 containers...");
Startables.deepStart(Stream.of(MYSQL8_CONTAINER)).join();
LOG.info("Container MySql8 is started.");
}

@Before
public void before() {
TestValuesTableFactory.clearAllData();
Expand Down Expand Up @@ -312,8 +333,18 @@ public void testCheckpointIsOptionalUnderSingleParallelism() throws Exception {
}

@Test
public void testAllTypes() throws Throwable {
fullTypesDatabase.createAndInitialize();
public void testMysql57AllDataTypes() throws Throwable {
testAllDataTypes(MYSQL_CONTAINER, fullTypesMySql57Database);
}

@Test
public void testMySql8AllDataTypes() throws Throwable {
testAllDataTypes(MYSQL8_CONTAINER, fullTypesMySql8Database);
}

public void testAllDataTypes(MySqlContainer mySqlContainer, UniqueDatabase database)
throws Throwable {
database.createAndInitialize();
String sourceDDL =
String.format(
"CREATE TABLE full_types (\n"
Expand Down Expand Up @@ -378,11 +409,11 @@ public void testAllTypes() throws Throwable {
+ " 'server-id' = '%s',"
+ " 'scan.incremental.snapshot.chunk.size' = '%s'"
+ ")",
MYSQL_CONTAINER.getHost(),
MYSQL_CONTAINER.getDatabasePort(),
fullTypesDatabase.getUsername(),
fullTypesDatabase.getPassword(),
fullTypesDatabase.getDatabaseName(),
mySqlContainer.getHost(),
mySqlContainer.getDatabasePort(),
database.getUsername(),
database.getPassword(),
database.getDatabaseName(),
"full_types",
getDezImplementation(),
incrementalSnapshot,
Expand Down Expand Up @@ -445,7 +476,7 @@ public void testAllTypes() throws Throwable {
CloseableIterator<Row> iterator = result.collect();
waitForSnapshotStarted(iterator);

try (Connection connection = fullTypesDatabase.getJdbcConnection();
try (Connection connection = database.getJdbcConnection();
Statement statement = connection.createStatement()) {
statement.execute(
"UPDATE full_types SET timestamp_c = '2020-07-17 18:33:22' WHERE id=1;");
Expand Down Expand Up @@ -537,10 +568,10 @@ public void testAllTypes() throws Throwable {
@Test
public void testWideTable() throws Exception {
final int tableColumnCount = 500;
fullTypesDatabase.createAndInitialize();
try (Connection connection = fullTypesDatabase.getJdbcConnection();
fullTypesMySql57Database.createAndInitialize();
try (Connection connection = fullTypesMySql57Database.getJdbcConnection();
Statement statement = connection.createStatement()) {
statement.execute(String.format("USE %s", fullTypesDatabase.getDatabaseName()));
statement.execute(String.format("USE %s", fullTypesMySql57Database.getDatabaseName()));
statement.execute(
"CREATE TABLE wide_table("
+ buildColumnsDDL("col", 0, tableColumnCount, "BIGINT")
Expand Down Expand Up @@ -572,9 +603,9 @@ public void testWideTable() throws Exception {
+ ")",
MYSQL_CONTAINER.getHost(),
MYSQL_CONTAINER.getDatabasePort(),
fullTypesDatabase.getUsername(),
fullTypesDatabase.getPassword(),
fullTypesDatabase.getDatabaseName(),
fullTypesMySql57Database.getUsername(),
fullTypesMySql57Database.getPassword(),
fullTypesMySql57Database.getDatabaseName(),
"wide_table",
getDezImplementation(),
incrementalSnapshot,
Expand All @@ -588,7 +619,7 @@ public void testWideTable() throws Exception {
CloseableIterator<Row> iterator = result.collect();
waitForSnapshotStarted(iterator);

try (Connection connection = fullTypesDatabase.getJdbcConnection();
try (Connection connection = fullTypesMySql57Database.getJdbcConnection();
Statement statement = connection.createStatement()) {

statement.execute("UPDATE wide_table SET col1 = 1024 WHERE col0=0;");
Expand All @@ -613,10 +644,10 @@ public void testBigTableWithHugeSplits() throws Exception {
return;
}
final int tableRowNumber = 10;
fullTypesDatabase.createAndInitialize();
try (Connection connection = fullTypesDatabase.getJdbcConnection();
fullTypesMySql57Database.createAndInitialize();
try (Connection connection = fullTypesMySql57Database.getJdbcConnection();
Statement statement = connection.createStatement()) {
statement.execute(String.format("USE %s", fullTypesDatabase.getDatabaseName()));
statement.execute(String.format("USE %s", fullTypesMySql57Database.getDatabaseName()));
statement.execute(
"CREATE TABLE big_table1(id BIGINT, str VARCHAR(100), PRIMARY KEY (id))");
statement.execute(
Expand Down Expand Up @@ -648,9 +679,9 @@ public void testBigTableWithHugeSplits() throws Exception {
+ ")",
MYSQL_CONTAINER.getHost(),
MYSQL_CONTAINER.getDatabasePort(),
fullTypesDatabase.getUsername(),
fullTypesDatabase.getPassword(),
fullTypesDatabase.getDatabaseName(),
fullTypesMySql57Database.getUsername(),
fullTypesMySql57Database.getPassword(),
fullTypesMySql57Database.getDatabaseName(),
getServerId());
String sinkDDL =
"CREATE TABLE sink ("
Expand All @@ -670,7 +701,7 @@ public void testBigTableWithHugeSplits() throws Exception {
// wait for snapshot finished and begin binlog
waitForSinkSize("sink", tableRowNumber * 2);

try (Connection connection = fullTypesDatabase.getJdbcConnection();
try (Connection connection = fullTypesMySql57Database.getJdbcConnection();
Statement statement = connection.createStatement()) {
statement.execute("UPDATE big_table1 SET str = '1024' WHERE id=0;");
statement.execute("UPDATE big_table1 SET str = '1025' WHERE id=1;");
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
-- Licensed to the Apache Software Foundation (ASF) under one
-- or more contributor license agreements. See the NOTICE file
-- distributed with this work for additional information
-- regarding copyright ownership. The ASF licenses this file
-- to you under the Apache License, Version 2.0 (the
-- "License"); you may not use this file except in compliance
-- with the License. You may obtain a copy of the License at
-- http://www.apache.org/licenses/LICENSE-2.0
-- Unless required by applicable law or agreed to in writing,
-- software distributed under the License is distributed on an
-- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-- KIND, either express or implied. See the License for the
-- specific language governing permissions and limitations
-- under the License.

-- ----------------------------------------------------------------------------------------------------------------
-- DATABASE: column_type_test
-- ----------------------------------------------------------------------------------------------------------------

CREATE TABLE full_types (
id INT AUTO_INCREMENT NOT NULL,
tiny_c TINYINT,
tiny_un_c TINYINT UNSIGNED,
small_c SMALLINT,
small_un_c SMALLINT UNSIGNED,
medium_c MEDIUMINT,
medium_un_c MEDIUMINT UNSIGNED,
int_c INTEGER ,
int_un_c INTEGER UNSIGNED,
int11_c INT(11) ,
big_c BIGINT,
big_un_c BIGINT UNSIGNED,
varchar_c VARCHAR(255),
char_c CHAR(3),
real_c REAL,
float_c FLOAT,
double_c DOUBLE,
decimal_c DECIMAL(8, 4),
numeric_c NUMERIC(6, 0),
big_decimal_c DECIMAL(65, 1),
bit1_c BIT,
tiny1_c TINYINT(1),
boolean_c BOOLEAN,
date_c DATE,
time_c TIME(0),
datetime3_c DATETIME(3),
datetime6_c DATETIME(6),
timestamp_c TIMESTAMP,
file_uuid BINARY(16),
bit_c BIT(64),
text_c TEXT,
tiny_blob_c TINYBLOB,
blob_c BLOB,
medium_blob_c MEDIUMBLOB,
long_blob_c LONGBLOB,
year_c YEAR,
enum_c enum('red', 'white') default 'red',
set_c SET('a', 'b'),
json_c JSON,
point_c POINT,
geometry_c GEOMETRY,
linestring_c LINESTRING,
polygon_c POLYGON,
multipoint_c MULTIPOINT,
multiline_c MULTILINESTRING,
multipolygon_c MULTIPOLYGON,
geometrycollection_c GEOMCOLLECTION,
PRIMARY KEY (id)
) DEFAULT CHARSET=utf8;

INSERT INTO full_types VALUES (
DEFAULT, 127, 255, 32767, 65535, 8388607, 16777215, 2147483647, 4294967295, 2147483647, 9223372036854775807,
18446744073709551615,
'Hello World', 'abc', 123.102, 123.102, 404.4443, 123.4567, 345.6, 34567892.1, 0, 1, true,
'2020-07-17', '18:00:22', '2020-07-17 18:00:22.123', '2020-07-17 18:00:22.123456', '2020-07-17 18:00:22',
unhex(replace('651aed08-390f-4893-b2f1-36923e7b7400','-','')), b'0000010000000100000001000000010000000100000001000000010000000100',
'text',UNHEX(HEX(16)),UNHEX(HEX(16)),UNHEX(HEX(16)),UNHEX(HEX(16)), 2021,
'red', 'a,b,a', '{"key1": "value1"}',
ST_GeomFromText('POINT(1 1)'),
ST_GeomFromText('POLYGON((1 1, 2 1, 2 2, 1 2, 1 1))'),
ST_GeomFromText('LINESTRING(3 0, 3 3, 3 5)'),
ST_GeomFromText('POLYGON((1 1, 2 1, 2 2, 1 2, 1 1))'),
ST_GeomFromText('MULTIPOINT((1 1),(2 2))'),
ST_GeomFromText('MultiLineString((1 1,2 2,3 3),(4 4,5 5))'),
ST_GeomFromText('MULTIPOLYGON(((0 0, 10 0, 10 10, 0 10, 0 0)), ((5 5, 7 5, 7 7, 5 7, 5 5)))'),
ST_GeomFromText('GEOMETRYCOLLECTION(POINT(10 10), POINT(30 30), LINESTRING(15 15, 20 20))')
Copy link
Contributor Author

@luoyuxia luoyuxia Jan 21, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Although GEOMCOLLECTION is synonymous with GEOMETRYCOLLECTION in mysql8.0, but it still needs to use GEOMETRYCOLLECTION when insert such data, otherwise it'll throw invalid gis data exception.

);
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ skip-host-cache
skip-name-resolve
#datadir=/var/lib/mysql
#socket=/var/lib/mysql/mysql.sock
#secure-file-priv=/var/lib/mysql-files
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

/var/lib/mysql-files doesn't exist in mysql8.0, so change it to /var/lib/mysql

secure-file-priv=/var/lib/mysql
user=mysql

# Disabling symbolic-links is recommended to prevent assorted security risks
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ skip-host-cache
skip-name-resolve
#datadir=/var/lib/mysql
#socket=/var/lib/mysql/mysql.sock
#secure-file-priv=/var/lib/mysql-files
secure-file-priv=/var/lib/mysql
user=mysql

# Disabling symbolic-links is recommended to prevent assorted security risks
Expand Down