Skip to content

Commit b27d7f9

Browse files
authored
[sqlserver] Add e2e tests for sqlserver cdc connector (#837)
1 parent e70e0f1 commit b27d7f9

File tree

4 files changed

+274
-0
lines changed

4 files changed

+274
-0
lines changed

flink-cdc-e2e-tests/pom.xml

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,13 @@ under the License.
7676
<type>test-jar</type>
7777
<scope>test</scope>
7878
</dependency>
79+
<dependency>
80+
<groupId>com.ververica</groupId>
81+
<artifactId>flink-connector-sqlserver-cdc</artifactId>
82+
<version>${project.version}</version>
83+
<type>test-jar</type>
84+
<scope>test</scope>
85+
</dependency>
7986
<dependency>
8087
<groupId>com.ververica</groupId>
8188
<artifactId>flink-connector-test-util</artifactId>
@@ -102,6 +109,12 @@ under the License.
102109
<version>${testcontainers.version}</version>
103110
<scope>test</scope>
104111
</dependency>
112+
<dependency>
113+
<groupId>org.testcontainers</groupId>
114+
<artifactId>mssqlserver</artifactId>
115+
<version>${testcontainers.version}</version>
116+
<scope>test</scope>
117+
</dependency>
105118
</dependencies>
106119

107120
<build>
@@ -218,6 +231,16 @@ under the License.
218231
<outputDirectory>${project.build.directory}/dependencies
219232
</outputDirectory>
220233
</artifactItem>
234+
235+
<artifactItem>
236+
<groupId>com.ververica</groupId>
237+
<artifactId>flink-sql-connector-sqlserver-cdc</artifactId>
238+
<version>${project.version}</version>
239+
<destFileName>sqlserver-cdc-connector.jar</destFileName>
240+
<type>jar</type>
241+
<outputDirectory>${project.build.directory}/dependencies
242+
</outputDirectory>
243+
</artifactItem>
221244
</artifactItems>
222245
</configuration>
223246
</plugin>
Lines changed: 200 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,200 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package com.ververica.cdc.connectors.tests;
20+
21+
import com.ververica.cdc.connectors.tests.utils.FlinkContainerTestEnvironment;
22+
import com.ververica.cdc.connectors.tests.utils.JdbcProxy;
23+
import com.ververica.cdc.connectors.tests.utils.TestUtils;
24+
import org.junit.Before;
25+
import org.junit.ClassRule;
26+
import org.junit.Test;
27+
import org.slf4j.Logger;
28+
import org.slf4j.LoggerFactory;
29+
import org.testcontainers.containers.MSSQLServerContainer;
30+
import org.testcontainers.containers.output.Slf4jLogConsumer;
31+
32+
import java.net.URL;
33+
import java.nio.file.Files;
34+
import java.nio.file.Path;
35+
import java.nio.file.Paths;
36+
import java.sql.Connection;
37+
import java.sql.DriverManager;
38+
import java.sql.SQLException;
39+
import java.sql.Statement;
40+
import java.time.Duration;
41+
import java.util.Arrays;
42+
import java.util.List;
43+
import java.util.regex.Matcher;
44+
import java.util.regex.Pattern;
45+
import java.util.stream.Collectors;
46+
47+
import static org.junit.Assert.assertNotNull;
48+
49+
/** End-to-end tests for sqlserver-cdc connector uber jar. */
50+
public class SqlServerE2eITCase extends FlinkContainerTestEnvironment {
51+
52+
private static final Logger LOG = LoggerFactory.getLogger(SqlServerE2eITCase.class);
53+
private static final Pattern COMMENT_PATTERN = Pattern.compile("^(.*)--.*$");
54+
private static final String INTER_CONTAINER_SQL_SERVER_ALIAS = "mssqlserver";
55+
private static final Path sqlServerCdcJar =
56+
TestUtils.getResource("sqlserver-cdc-connector.jar");
57+
private static final Path jdbcJar = TestUtils.getResource("jdbc-connector.jar");
58+
private static final Path mysqlDriverJar = TestUtils.getResource("mysql-driver.jar");
59+
60+
@ClassRule
61+
public static final MSSQLServerContainer MSSQL_SERVER_CONTAINER =
62+
new MSSQLServerContainer<>("mcr.microsoft.com/mssql/server:2019-latest")
63+
.withPassword("Password!")
64+
.withEnv("MSSQL_AGENT_ENABLED", "true")
65+
.withEnv("MSSQL_PID", "Standard")
66+
.withNetwork(NETWORK)
67+
.withNetworkAliases(INTER_CONTAINER_SQL_SERVER_ALIAS)
68+
.withLogConsumer(new Slf4jLogConsumer(LOG));
69+
70+
@Before
71+
public void before() {
72+
super.before();
73+
initializeSqlServerTable("sqlserver_inventory");
74+
}
75+
76+
@Test
77+
public void testSqlServerCDC() throws Exception {
78+
List<String> sqlLines =
79+
Arrays.asList(
80+
"CREATE TABLE products_source (",
81+
" `id` INT NOT NULL,",
82+
" name STRING,",
83+
" description STRING,",
84+
" weight DECIMAL(10,3),",
85+
" primary key (`id`) not enforced",
86+
") WITH (",
87+
" 'connector' = 'sqlserver-cdc',",
88+
" 'hostname' = '" + INTER_CONTAINER_SQL_SERVER_ALIAS + "',",
89+
" 'port' = '" + MSSQL_SERVER_CONTAINER.MS_SQL_SERVER_PORT + "',",
90+
" 'username' = '" + MSSQL_SERVER_CONTAINER.getUsername() + "',",
91+
" 'password' = '" + MSSQL_SERVER_CONTAINER.getPassword() + "',",
92+
" 'database-name' = 'inventory',",
93+
" 'schema-name' = 'dbo',",
94+
" 'table-name' = 'products'",
95+
");",
96+
"CREATE TABLE products_sink (",
97+
" `id` INT NOT NULL,",
98+
" name STRING,",
99+
" description STRING,",
100+
" weight DECIMAL(10,3),",
101+
" primary key (`id`) not enforced",
102+
") WITH (",
103+
" 'connector' = 'jdbc',",
104+
String.format(
105+
" 'url' = 'jdbc:mysql://%s:3306/%s',",
106+
INTER_CONTAINER_MYSQL_ALIAS,
107+
mysqlInventoryDatabase.getDatabaseName()),
108+
" 'table-name' = 'products_sink',",
109+
" 'username' = '" + MYSQL_TEST_USER + "',",
110+
" 'password' = '" + MYSQL_TEST_PASSWORD + "'",
111+
");",
112+
"INSERT INTO products_sink",
113+
"SELECT * FROM products_source;");
114+
115+
submitSQLJob(sqlLines, sqlServerCdcJar, jdbcJar, mysqlDriverJar);
116+
waitUntilJobRunning(Duration.ofSeconds(30));
117+
118+
// generate binlogs
119+
try (Connection conn = getSqlServerJdbcConnection();
120+
Statement statement = conn.createStatement()) {
121+
122+
statement.execute(
123+
"UPDATE inventory.dbo.products SET description='18oz carpenter hammer' WHERE id=106;");
124+
statement.execute("UPDATE inventory.dbo.products SET weight='5.1' WHERE id=107;");
125+
statement.execute(
126+
"INSERT INTO inventory.dbo.products (name,description,weight) VALUES ('jacket','water resistent white wind breaker',0.2);"); // 110
127+
statement.execute(
128+
"INSERT INTO inventory.dbo.products (name,description,weight) VALUES ('scooter','Big 2-wheel scooter ',5.18);");
129+
statement.execute(
130+
"UPDATE inventory.dbo.products SET description='new water resistent white wind breaker', weight='0.5' WHERE id=110;");
131+
statement.execute("UPDATE inventory.dbo.products SET weight='5.17' WHERE id=111;");
132+
statement.execute("DELETE FROM inventory.dbo.products WHERE id=111;");
133+
} catch (SQLException e) {
134+
LOG.error("Update table for CDC failed.", e);
135+
throw e;
136+
}
137+
138+
// assert final results
139+
String mysqlUrl =
140+
String.format(
141+
"jdbc:mysql://%s:%s/%s",
142+
MYSQL.getHost(),
143+
MYSQL.getDatabasePort(),
144+
mysqlInventoryDatabase.getDatabaseName());
145+
JdbcProxy proxy =
146+
new JdbcProxy(mysqlUrl, MYSQL_TEST_USER, MYSQL_TEST_PASSWORD, MYSQL_DRIVER_CLASS);
147+
List<String> expectResult =
148+
Arrays.asList(
149+
"101,scooter,Small 2-wheel scooter,3.14",
150+
"102,car battery,12V car battery,8.1",
151+
"103,12-pack drill bits,12-pack of drill bits with sizes ranging from #40 to #3,0.8",
152+
"104,hammer,12oz carpenter's hammer,0.75",
153+
"105,hammer,14oz carpenter's hammer,0.875",
154+
"106,hammer,18oz carpenter hammer,1.0",
155+
"107,rocks,box of assorted rocks,5.1",
156+
"108,jacket,water resistent black wind breaker,0.1",
157+
"109,spare tire,24 inch spare tire,22.2",
158+
"110,jacket,new water resistent white wind breaker,0.5");
159+
proxy.checkResultWithTimeout(
160+
expectResult,
161+
"products_sink",
162+
new String[] {"id", "name", "description", "weight"},
163+
60000L);
164+
}
165+
166+
private void initializeSqlServerTable(String sqlFile) {
167+
final String ddlFile = String.format("ddl/%s.sql", sqlFile);
168+
final URL ddlTestFile = SqlServerE2eITCase.class.getClassLoader().getResource(ddlFile);
169+
assertNotNull("Cannot locate " + ddlFile, ddlTestFile);
170+
try (Connection connection = getSqlServerJdbcConnection();
171+
Statement statement = connection.createStatement()) {
172+
final List<String> statements =
173+
Arrays.stream(
174+
Files.readAllLines(Paths.get(ddlTestFile.toURI())).stream()
175+
.map(String::trim)
176+
.filter(x -> !x.startsWith("--") && !x.isEmpty())
177+
.map(
178+
x -> {
179+
final Matcher m =
180+
COMMENT_PATTERN.matcher(x);
181+
return m.matches() ? m.group(1) : x;
182+
})
183+
.collect(Collectors.joining("\n"))
184+
.split(";"))
185+
.collect(Collectors.toList());
186+
for (String stmt : statements) {
187+
statement.execute(stmt);
188+
}
189+
} catch (Exception e) {
190+
throw new RuntimeException(e);
191+
}
192+
}
193+
194+
private Connection getSqlServerJdbcConnection() throws SQLException {
195+
return DriverManager.getConnection(
196+
MSSQL_SERVER_CONTAINER.getJdbcUrl(),
197+
MSSQL_SERVER_CONTAINER.getUsername(),
198+
MSSQL_SERVER_CONTAINER.getPassword());
199+
}
200+
}
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
mcr.microsoft.com/mssql/server:2019-latest
Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
-- Licensed to the Apache Software Foundation (ASF) under one
2+
-- or more contributor license agreements. See the NOTICE file
3+
-- distributed with this work for additional information
4+
-- regarding copyright ownership. The ASF licenses this file
5+
-- to you under the Apache License, Version 2.0 (the
6+
-- "License"); you may not use this file except in compliance
7+
-- with the License. You may obtain a copy of the License at
8+
-- http://www.apache.org/licenses/LICENSE-2.0
9+
-- Unless required by applicable law or agreed to in writing,
10+
-- software distributed under the License is distributed on an
11+
-- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
12+
-- KIND, either express or implied. See the License for the
13+
-- specific language governing permissions and limitations
14+
-- under the License.
15+
16+
-- ----------------------------------------------------------------------------------------------------------------
17+
-- DATABASE: inventory
18+
-- ----------------------------------------------------------------------------------------------------------------
19+
-- Create the inventory database
20+
CREATE DATABASE inventory;
21+
22+
USE inventory;
23+
EXEC sys.sp_cdc_enable_db;
24+
25+
-- Create and populate our products using a single insert with many rows
26+
CREATE TABLE products (
27+
id INTEGER IDENTITY(101,1) NOT NULL PRIMARY KEY,
28+
name VARCHAR(255) NOT NULL,
29+
description VARCHAR(512),
30+
weight FLOAT
31+
);
32+
INSERT INTO products(name,description,weight)
33+
VALUES ('scooter','Small 2-wheel scooter',3.14);
34+
INSERT INTO products(name,description,weight)
35+
VALUES ('car battery','12V car battery',8.1);
36+
INSERT INTO products(name,description,weight)
37+
VALUES ('12-pack drill bits','12-pack of drill bits with sizes ranging from #40 to #3',0.8);
38+
INSERT INTO products(name,description,weight)
39+
VALUES ('hammer','12oz carpenter''s hammer',0.75);
40+
INSERT INTO products(name,description,weight)
41+
VALUES ('hammer','14oz carpenter''s hammer',0.875);
42+
INSERT INTO products(name,description,weight)
43+
VALUES ('hammer','16oz carpenter''s hammer',1.0);
44+
INSERT INTO products(name,description,weight)
45+
VALUES ('rocks','box of assorted rocks',5.3);
46+
INSERT INTO products(name,description,weight)
47+
VALUES ('jacket','water resistent black wind breaker',0.1);
48+
INSERT INTO products(name,description,weight)
49+
VALUES ('spare tire','24 inch spare tire',22.2);
50+
EXEC sys.sp_cdc_enable_table @source_schema = 'dbo', @source_name = 'products', @role_name = NULL, @supports_net_changes = 0;

0 commit comments

Comments
 (0)