Skip to content

Commit ad07a06

Browse files
committed
update tests
1 parent 5455ca1 commit ad07a06

File tree

1 file changed

+31
-70
lines changed

1 file changed

+31
-70
lines changed

flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/table/MySqlConnectorITCase.java

Lines changed: 31 additions & 70 deletions
Original file line numberDiff line numberDiff line change
@@ -485,16 +485,18 @@ public void testWideTable() throws Exception {
485485

486486
@Test
487487
public void testMetadataColumns() throws Exception {
488-
customerDatabase.createAndInitialize();
488+
userDatabase1.createAndInitialize();
489489
String sourceDDL =
490490
String.format(
491-
"CREATE TABLE customer_source ("
491+
"CREATE TABLE mysql_users ("
492492
+ " db_name STRING METADATA FROM 'database_name' VIRTUAL,"
493493
+ " table_name STRING METADATA VIRTUAL,"
494-
+ " `id` INT NOT NULL,"
494+
+ " `id` DECIMAL(20, 0) NOT NULL,"
495495
+ " name STRING,"
496496
+ " address STRING,"
497497
+ " phone_number STRING,"
498+
+ " email STRING,"
499+
+ " age INT,"
498500
+ " primary key (`id`) not enforced"
499501
+ ") WITH ("
500502
+ " 'connector' = 'mysql-cdc',"
@@ -511,22 +513,25 @@ public void testMetadataColumns() throws Exception {
511513
+ ")",
512514
MYSQL_CONTAINER.getHost(),
513515
MYSQL_CONTAINER.getDatabasePort(),
514-
TEST_USER,
515-
TEST_PASSWORD,
516-
customerDatabase.getDatabaseName(),
517-
"customers.*",
516+
userDatabase1.getUsername(),
517+
userDatabase1.getPassword(),
518+
userDatabase1.getDatabaseName(),
519+
"user_table_.*",
518520
getDezImplementation(),
519521
incrementalSnapshot,
520522
getServerId(),
521523
getSplitSize());
524+
522525
String sinkDDL =
523526
"CREATE TABLE sink ("
524527
+ " database_name STRING,"
525528
+ " table_name STRING,"
526-
+ " `id` INT NOT NULL,"
529+
+ " `id` DECIMAL(20, 0) NOT NULL,"
527530
+ " name STRING,"
528531
+ " address STRING,"
529532
+ " phone_number STRING,"
533+
+ " email STRING,"
534+
+ " age INT,"
530535
+ " primary key (database_name, table_name, id) not enforced"
531536
+ ") WITH ("
532537
+ " 'connector' = 'values',"
@@ -536,80 +541,36 @@ public void testMetadataColumns() throws Exception {
536541
tEnv.executeSql(sinkDDL);
537542

538543
// async submit job
539-
TableResult result = tEnv.executeSql("INSERT INTO sink SELECT * FROM customer_source");
544+
TableResult result = tEnv.executeSql("INSERT INTO sink SELECT * FROM mysql_users");
540545

541546
// wait for snapshot finished and begin binlog
542-
waitForSinkSize("sink", 42);
547+
waitForSinkSize("sink", 2);
543548

544-
try (Connection connection = customerDatabase.getJdbcConnection();
549+
try (Connection connection = userDatabase1.getJdbcConnection();
545550
Statement statement = connection.createStatement()) {
546551

547-
// TODO: we execute an additional DELETE statement to make incremental-snapshot
548-
// mode pass, in that mode, the first statement will be ignored which should be a bug!
549-
statement.execute("DELETE FROM shopping_cart WHERE product_no=101;");
550-
551-
statement.execute("UPDATE customers SET address='Beijing' WHERE id=110;");
552552
statement.execute(
553-
"INSERT INTO customers_1 VALUES (4000,'user_44','Wuhan',123567891234);");
553+
"INSERT INTO user_table_1_2 VALUES (200,'user_200','Wuhan',123567891234);");
554554
statement.execute(
555-
"INSERT INTO customers VALUES (3000,'user_33','Hangzhou',123567891234);"); // 110
556-
statement.execute("UPDATE customers_1 SET phone_number=88888888 WHERE id=4000;");
557-
statement.execute("DELETE FROM customers_1 WHERE id=2000;");
555+
"INSERT INTO user_table_1_1 VALUES (300,'user_300','Hangzhou',123567891234, '[email protected]');");
556+
statement.execute("UPDATE user_table_1_1 SET address='Beijing' WHERE id=300;");
557+
statement.execute("UPDATE user_table_1_2 SET phone_number=88888888 WHERE id=121;");
558+
statement.execute("DELETE FROM user_table_1_1 WHERE id=111;");
558559
}
559560

560561
// waiting for binlog finished (5 more events)
561-
waitForSinkSize("sink", 47);
562+
waitForSinkSize("sink", 7);
562563

563564
List<String> expected =
564565
Stream.of(
565-
"+I[%s, customers, 101, user_1, Shanghai, 123567891234]",
566-
"+I[%s, customers, 102, user_2, Shanghai, 123567891234]",
567-
"+I[%s, customers, 103, user_3, Shanghai, 123567891234]",
568-
"+I[%s, customers, 109, user_4, Shanghai, 123567891234]",
569-
"+I[%s, customers, 110, user_5, Shanghai, 123567891234]",
570-
"+I[%s, customers, 111, user_6, Shanghai, 123567891234]",
571-
"+I[%s, customers, 118, user_7, Shanghai, 123567891234]",
572-
"+I[%s, customers, 121, user_8, Shanghai, 123567891234]",
573-
"+I[%s, customers, 123, user_9, Shanghai, 123567891234]",
574-
"+I[%s, customers, 1009, user_10, Shanghai, 123567891234]",
575-
"+I[%s, customers, 1010, user_11, Shanghai, 123567891234]",
576-
"+I[%s, customers, 1011, user_12, Shanghai, 123567891234]",
577-
"+I[%s, customers, 1012, user_13, Shanghai, 123567891234]",
578-
"+I[%s, customers, 1013, user_14, Shanghai, 123567891234]",
579-
"+I[%s, customers, 1014, user_15, Shanghai, 123567891234]",
580-
"+I[%s, customers, 1015, user_16, Shanghai, 123567891234]",
581-
"+I[%s, customers, 1016, user_17, Shanghai, 123567891234]",
582-
"+I[%s, customers, 1017, user_18, Shanghai, 123567891234]",
583-
"+I[%s, customers, 1018, user_19, Shanghai, 123567891234]",
584-
"+I[%s, customers, 1019, user_20, Shanghai, 123567891234]",
585-
"+I[%s, customers, 2000, user_21, Shanghai, 123567891234]",
586-
"+I[%s, customers_1, 101, user_1, Shanghai, 123567891234]",
587-
"+I[%s, customers_1, 102, user_2, Shanghai, 123567891234]",
588-
"+I[%s, customers_1, 103, user_3, Shanghai, 123567891234]",
589-
"+I[%s, customers_1, 109, user_4, Shanghai, 123567891234]",
590-
"+I[%s, customers_1, 110, user_5, Shanghai, 123567891234]",
591-
"+I[%s, customers_1, 111, user_6, Shanghai, 123567891234]",
592-
"+I[%s, customers_1, 118, user_7, Shanghai, 123567891234]",
593-
"+I[%s, customers_1, 121, user_8, Shanghai, 123567891234]",
594-
"+I[%s, customers_1, 123, user_9, Shanghai, 123567891234]",
595-
"+I[%s, customers_1, 1009, user_10, Shanghai, 123567891234]",
596-
"+I[%s, customers_1, 1010, user_11, Shanghai, 123567891234]",
597-
"+I[%s, customers_1, 1011, user_12, Shanghai, 123567891234]",
598-
"+I[%s, customers_1, 1012, user_13, Shanghai, 123567891234]",
599-
"+I[%s, customers_1, 1013, user_14, Shanghai, 123567891234]",
600-
"+I[%s, customers_1, 1014, user_15, Shanghai, 123567891234]",
601-
"+I[%s, customers_1, 1015, user_16, Shanghai, 123567891234]",
602-
"+I[%s, customers_1, 1016, user_17, Shanghai, 123567891234]",
603-
"+I[%s, customers_1, 1017, user_18, Shanghai, 123567891234]",
604-
"+I[%s, customers_1, 1018, user_19, Shanghai, 123567891234]",
605-
"+I[%s, customers_1, 1019, user_20, Shanghai, 123567891234]",
606-
"+I[%s, customers_1, 2000, user_21, Shanghai, 123567891234]",
607-
"+U[%s, customers, 110, user_5, Beijing, 123567891234]",
608-
"+I[%s, customers_1, 4000, user_44, Wuhan, 123567891234]",
609-
"+I[%s, customers, 3000, user_33, Hangzhou, 123567891234]",
610-
"+U[%s, customers_1, 4000, user_44, Wuhan, 88888888]",
611-
"-D[%s, customers_1, 2000, user_21, Shanghai, 123567891234]")
612-
.map(s -> String.format(s, customerDatabase.getDatabaseName()))
566+
"+I[%s, user_table_1_1, 111, user_111, Shanghai, 123567891234, [email protected], null]",
567+
"+I[%s, user_table_1_2, 121, user_121, Shanghai, 123567891234, null, null]",
568+
"+I[%s, user_table_1_2, 200, user_200, Wuhan, 123567891234, null, null]",
569+
"+I[%s, user_table_1_1, 300, user_300, Hangzhou, 123567891234, [email protected], null]",
570+
"+U[%s, user_table_1_1, 300, user_300, Beijing, 123567891234, [email protected], null]",
571+
"+U[%s, user_table_1_2, 121, user_121, Shanghai, 88888888, null, null]",
572+
"-D[%s, user_table_1_1, 111, user_111, Shanghai, 123567891234, [email protected], null]")
573+
.map(s -> String.format(s, userDatabase1.getDatabaseName()))
613574
.sorted()
614575
.collect(Collectors.toList());
615576

@@ -765,7 +726,7 @@ public void testPrimaryKeyWithSnowflakeAlgorithm() throws Exception {
765726
}
766727

767728
@Test
768-
public void testInconsistentSchema() throws Exception {
729+
public void testShadingTablesWithInconsistentSchema() throws Exception {
769730
userDatabase1.createAndInitialize();
770731
userDatabase2.createAndInitialize();
771732
String sourceDDL =

0 commit comments

Comments
 (0)