Skip to content

Commit 4866875

Browse files
committed
[sqlserver] Fix old change data that will be captured when the latest mode starts
1 parent 8cef4af commit 4866875

File tree

4 files changed

+113
-4
lines changed

4 files changed

+113
-4
lines changed

flink-connector-sqlserver-cdc/src/main/java/com/ververica/cdc/connectors/sqlserver/source/offset/LsnFactory.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,9 @@
2727
public class LsnFactory extends OffsetFactory {
2828
@Override
2929
public Offset newOffset(Map<String, String> offset) {
30-
return new LsnOffset(Lsn.valueOf(offset.get(SourceInfo.CHANGE_LSN_KEY)));
30+
Lsn changeLsn = Lsn.valueOf(offset.get(SourceInfo.CHANGE_LSN_KEY));
31+
Lsn commitLsn = Lsn.valueOf(offset.get(SourceInfo.COMMIT_LSN_KEY));
32+
return new LsnOffset(changeLsn, commitLsn, null);
3133
}
3234

3335
@Override

flink-connector-sqlserver-cdc/src/main/java/com/ververica/cdc/connectors/sqlserver/source/reader/fetch/SqlServerSourceFetchTaskContext.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -163,6 +163,7 @@ private SqlServerOffsetContext loadStartingOffsetState(
163163
: sourceSplitBase.asStreamSplit().getStartingOffset();
164164

165165
SqlServerOffsetContext sqlServerOffsetContext = loader.load(offset.getOffset());
166+
sqlServerOffsetContext.preSnapshotCompletion();
166167
return sqlServerOffsetContext;
167168
}
168169

flink-connector-sqlserver-cdc/src/main/java/com/ververica/cdc/connectors/sqlserver/source/utils/SqlServerUtils.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -192,14 +192,16 @@ public static LsnOffset getLsnPosition(Map<String, ?> offset) {
192192
offsetStrMap.put(
193193
entry.getKey(), entry.getValue() == null ? null : entry.getValue().toString());
194194
}
195-
return new LsnOffset(Lsn.valueOf(offsetStrMap.get(SourceInfo.CHANGE_LSN_KEY)));
195+
Lsn changeLsn = Lsn.valueOf(offsetStrMap.get(SourceInfo.CHANGE_LSN_KEY));
196+
Lsn commitLsn = Lsn.valueOf(offsetStrMap.get(SourceInfo.COMMIT_LSN_KEY));
197+
return new LsnOffset(changeLsn, commitLsn, null);
196198
}
197199

198200
/** Fetch current largest log sequence number (LSN) of the database. */
199201
public static LsnOffset currentLsn(SqlServerConnection connection) {
200202
try {
201203
Lsn maxLsn = connection.getMaxLsn();
202-
return new LsnOffset(maxLsn);
204+
return new LsnOffset(maxLsn, maxLsn, null);
203205
} catch (SQLException e) {
204206
throw new FlinkRuntimeException(e.getMessage(), e);
205207
}

flink-connector-sqlserver-cdc/src/test/java/com/ververica/cdc/connectors/sqlserver/table/SqlServerConnectorITCase.java

Lines changed: 105 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,8 @@
2727
import org.junit.Before;
2828
import org.junit.ClassRule;
2929
import org.junit.Test;
30+
import org.junit.runner.RunWith;
31+
import org.junit.runners.Parameterized;
3032

3133
import java.sql.Connection;
3234
import java.sql.SQLException;
@@ -36,12 +38,14 @@
3638
import java.util.List;
3739
import java.util.concurrent.ExecutionException;
3840

41+
import static org.apache.flink.api.common.JobStatus.RUNNING;
3942
import static org.hamcrest.Matchers.containsInAnyOrder;
4043
import static org.junit.Assert.assertEquals;
4144
import static org.junit.Assert.assertThat;
4245
import static org.testcontainers.containers.MSSQLServerContainer.MS_SQL_SERVER_PORT;
4346

4447
/** Integration tests for SqlServer Table source. */
48+
@RunWith(Parameterized.class)
4549
public class SqlServerConnectorITCase extends SqlServerTestBase {
4650

4751
private final StreamExecutionEnvironment env =
@@ -52,10 +56,28 @@ public class SqlServerConnectorITCase extends SqlServerTestBase {
5256

5357
@ClassRule public static LegacyRowResource usesLegacyRows = LegacyRowResource.INSTANCE;
5458

59+
// enable the parallelismSnapshot (i.e: The new source OracleParallelSource)
60+
private final boolean parallelismSnapshot;
61+
62+
public SqlServerConnectorITCase(boolean parallelismSnapshot) {
63+
this.parallelismSnapshot = parallelismSnapshot;
64+
}
65+
66+
@Parameterized.Parameters(name = "parallelismSnapshot: {0}")
67+
public static Object[] parameters() {
68+
return new Object[][] {new Object[] {false}, new Object[] {true}};
69+
}
70+
5571
@Before
5672
public void before() {
5773
TestValuesTableFactory.clearAllData();
58-
env.setParallelism(1);
74+
75+
if (parallelismSnapshot) {
76+
env.setParallelism(4);
77+
env.enableCheckpointing(200);
78+
} else {
79+
env.setParallelism(1);
80+
}
5981
}
6082

6183
@Test
@@ -75,13 +97,15 @@ public void testConsumingAllEvents()
7597
+ " 'port' = '%s',"
7698
+ " 'username' = '%s',"
7799
+ " 'password' = '%s',"
100+
+ " 'scan.incremental.snapshot.enabled' = '%s',"
78101
+ " 'database-name' = '%s',"
79102
+ " 'table-name' = '%s'"
80103
+ ")",
81104
MSSQL_SERVER_CONTAINER.getHost(),
82105
MSSQL_SERVER_CONTAINER.getMappedPort(MS_SQL_SERVER_PORT),
83106
MSSQL_SERVER_CONTAINER.getUsername(),
84107
MSSQL_SERVER_CONTAINER.getPassword(),
108+
parallelismSnapshot,
85109
"inventory",
86110
"dbo.products");
87111
String sinkDDL =
@@ -160,6 +184,82 @@ public void testConsumingAllEvents()
160184
result.getJobClient().get().cancel().get();
161185
}
162186

187+
@Test
188+
public void testStartupFromLatestOffset() throws Exception {
189+
initializeSqlServerTable("inventory");
190+
191+
Connection connection = getJdbcConnection();
192+
Statement statement = connection.createStatement();
193+
194+
// The following two change records will be discarded in the 'latest-offset' mode
195+
statement.execute(
196+
"INSERT INTO inventory.dbo.products (name,description,weight) VALUES ('jacket','water resistent white wind breaker',0.2);"); // 110
197+
statement.execute(
198+
"INSERT INTO inventory.dbo.products (name,description,weight) VALUES ('scooter','Big 2-wheel scooter ',5.18);");
199+
Thread.sleep(5000L);
200+
201+
String sourceDDL =
202+
String.format(
203+
"CREATE TABLE debezium_source ("
204+
+ " id INT NOT NULL,"
205+
+ " name STRING,"
206+
+ " description STRING,"
207+
+ " weight DECIMAL(10,3)"
208+
+ ") WITH ("
209+
+ " 'connector' = 'sqlserver-cdc',"
210+
+ " 'hostname' = '%s',"
211+
+ " 'port' = '%s',"
212+
+ " 'username' = '%s',"
213+
+ " 'password' = '%s',"
214+
+ " 'scan.incremental.snapshot.enabled' = '%s',"
215+
+ " 'database-name' = '%s',"
216+
+ " 'table-name' = '%s',"
217+
+ " 'scan.startup.mode' = 'latest-offset'"
218+
+ ")",
219+
MSSQL_SERVER_CONTAINER.getHost(),
220+
MSSQL_SERVER_CONTAINER.getMappedPort(MS_SQL_SERVER_PORT),
221+
MSSQL_SERVER_CONTAINER.getUsername(),
222+
MSSQL_SERVER_CONTAINER.getPassword(),
223+
parallelismSnapshot,
224+
"inventory",
225+
"dbo.products");
226+
String sinkDDL =
227+
"CREATE TABLE sink "
228+
+ " WITH ("
229+
+ " 'connector' = 'values',"
230+
+ " 'sink-insert-only' = 'false'"
231+
+ ") LIKE debezium_source (EXCLUDING OPTIONS)";
232+
tEnv.executeSql(sourceDDL);
233+
tEnv.executeSql(sinkDDL);
234+
235+
// async submit job
236+
TableResult result = tEnv.executeSql("INSERT INTO sink SELECT * FROM debezium_source");
237+
238+
// wait for the source startup, we don't have a better way to wait it, use sleep for now
239+
do {
240+
Thread.sleep(5000L);
241+
} while (result.getJobClient().get().getJobStatus().get() != RUNNING);
242+
Thread.sleep(30000L);
243+
244+
statement.execute(
245+
"INSERT INTO inventory.dbo.products (name,description,weight) VALUES ('hammer','18oz carpenters hammer',1.2);");
246+
statement.execute(
247+
"INSERT INTO inventory.dbo.products (name,description,weight) VALUES ('scooter','Big 3-wheel scooter',5.20);");
248+
249+
waitForSinkSize("sink", 2);
250+
251+
String[] expected =
252+
new String[] {
253+
"112,hammer,18oz carpenters hammer,1.200",
254+
"113,scooter,Big 3-wheel scooter,5.200"
255+
};
256+
257+
List<String> actual = TestValuesTableFactory.getResults("sink");
258+
assertThat(actual, containsInAnyOrder(expected));
259+
260+
result.getJobClient().get().cancel().get();
261+
}
262+
163263
@Test
164264
public void testAllTypes() throws Throwable {
165265
initializeSqlServerTable("column_type_test");
@@ -199,13 +299,15 @@ public void testAllTypes() throws Throwable {
199299
+ " 'port' = '%s',"
200300
+ " 'username' = '%s',"
201301
+ " 'password' = '%s',"
302+
+ " 'scan.incremental.snapshot.enabled' = '%s',"
202303
+ " 'database-name' = '%s',"
203304
+ " 'table-name' = '%s'"
204305
+ ")",
205306
MSSQL_SERVER_CONTAINER.getHost(),
206307
MSSQL_SERVER_CONTAINER.getMappedPort(MS_SQL_SERVER_PORT),
207308
MSSQL_SERVER_CONTAINER.getUsername(),
208309
MSSQL_SERVER_CONTAINER.getPassword(),
310+
parallelismSnapshot,
209311
"column_type_test",
210312
"dbo.full_types");
211313
String sinkDDL =
@@ -288,13 +390,15 @@ public void testMetadataColumns() throws Throwable {
288390
+ " 'port' = '%s',"
289391
+ " 'username' = '%s',"
290392
+ " 'password' = '%s',"
393+
+ " 'scan.incremental.snapshot.enabled' = '%s',"
291394
+ " 'database-name' = '%s',"
292395
+ " 'table-name' = '%s'"
293396
+ ")",
294397
MSSQL_SERVER_CONTAINER.getHost(),
295398
MSSQL_SERVER_CONTAINER.getMappedPort(MS_SQL_SERVER_PORT),
296399
MSSQL_SERVER_CONTAINER.getUsername(),
297400
MSSQL_SERVER_CONTAINER.getPassword(),
401+
parallelismSnapshot,
298402
"inventory",
299403
"dbo.products");
300404

0 commit comments

Comments
 (0)