Commit d8470dc

[sqlserver] Fix backfill stream task hang
1 parent 54df7e9 commit d8470dc

File tree: 4 files changed, +25 -20 lines

flink-connector-sqlserver-cdc/src/main/java/com/ververica/cdc/connectors/sqlserver/source/dialect/SqlServerChunkSplitter.java

Lines changed: 1 addition & 1 deletion
@@ -274,7 +274,7 @@ private List<ChunkRange> splitUnevenlySizedChunks(
         while (chunkEnd != null && ObjectUtils.compare(chunkEnd, max) <= 0) {
             // we start from [null, min + chunk_size) and avoid [null, min)
             splits.add(ChunkRange.of(chunkStart, chunkEnd));
-            // may sleep a while to avoid DDOS on SqlServer server
+            // may sleep awhile to avoid DDOS on SqlServer server
             maySleep(count++, tableId);
             chunkStart = chunkEnd;
             chunkEnd = nextChunkEnd(jdbc, chunkEnd, tableId, splitColumnName, max, chunkSize);
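The hunk above only rewords a comment, but the surrounding loop is the core of the uneven chunk split. Below is a deliberately simplified, self-contained sketch of that loop: integer keys stand in for values probed over JDBC, a plain Thread.sleep stands in for the connector's maySleep, and the class name UnevenChunkSplitSketch is illustrative only.

import java.util.ArrayList;
import java.util.List;

public class UnevenChunkSplitSketch {

    public static void main(String[] args) throws InterruptedException {
        int min = 0;
        int max = 95;
        int chunkSize = 10;
        List<String> splits = new ArrayList<>();

        Integer chunkStart = null;            // first chunk is open on the left: [null, min + chunk_size)
        Integer chunkEnd = min + chunkSize;
        int count = 0;
        while (chunkEnd != null && chunkEnd <= max) {
            splits.add("[" + chunkStart + ", " + chunkEnd + ")");
            // pause occasionally so the chunk-probing queries do not overload the server
            if (++count % 10 == 0) {
                Thread.sleep(100L);
            }
            chunkStart = chunkEnd;
            int next = chunkEnd + chunkSize;
            chunkEnd = next > max ? null : next;  // stand-in for nextChunkEnd(...)
        }
        // the last chunk is open on the right: [chunkStart, null)
        splits.add("[" + chunkStart + ", null)");
        splits.forEach(System.out::println);
    }
}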

flink-connector-sqlserver-cdc/src/main/java/com/ververica/cdc/connectors/sqlserver/source/reader/fetch/SqlServerScanFetchTask.java

Lines changed: 10 additions & 4 deletions
@@ -159,16 +159,22 @@ private LsnSplitReadTask createBackFillLsnSplitReadTask(
         // we should only capture events for the current table,
         // otherwise, we may can't find corresponding schema
         Configuration dezConf =
-                context.getSourceConfig()
-                        .getDbzConfiguration()
+                context.getDbzConnectorConfig()
+                        .getConfig()
                         .edit()
-                        .with("table.include.list", split.getTableId().toString())
+                        // table.include.list is schema.table format
+                        .with(
+                                "table.include.list",
+                                new TableId(
+                                        null,
+                                        split.getTableId().schema(),
+                                        split.getTableId().table()))
                         // Disable heartbeat event in snapshot split fetcher
                         .with(Heartbeat.HEARTBEAT_INTERVAL, 0)
                         .build();
         // task to read binlog and backfill for current split
         return new LsnSplitReadTask(
-                context.getDbzConnectorConfig(),
+                new SqlServerConnectorConfig(dezConf),
                 context.getConnection(),
                 context.getMetaDataConnection(),
                 context.getDispatcher(),
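The include-list change matters because, as the new comment in the hunk notes, table.include.list expects entries in schema.table form, while the split's fully qualified id also carries the database name. A minimal sketch of the difference, assuming the TableId used above is io.debezium.relational.TableId; the names testdb, dbo and orders are made up for illustration.

import io.debezium.relational.TableId;

public class IncludeListFormatSketch {

    public static void main(String[] args) {
        // The split carries a fully qualified id: catalog (database), schema and table.
        TableId splitTableId = new TableId("testdb", "dbo", "orders");

        // toString() keeps the catalog, so the old filter value was "testdb.dbo.orders".
        System.out.println(splitTableId);

        // Dropping the catalog yields "dbo.orders", the schema.table form the
        // backfill task now passes to table.include.list.
        TableId includeListId = new TableId(null, splitTableId.schema(), splitTableId.table());
        System.out.println(includeListId);
    }
}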

flink-connector-sqlserver-cdc/src/main/java/com/ververica/cdc/connectors/sqlserver/source/reader/fetch/SqlServerStreamFetchTask.java

Lines changed: 6 additions & 7 deletions
@@ -17,12 +17,14 @@
 package com.ververica.cdc.connectors.sqlserver.source.reader.fetch;
 
 import com.ververica.cdc.connectors.base.relational.JdbcSourceEventDispatcher;
+import com.ververica.cdc.connectors.base.source.meta.offset.Offset;
 import com.ververica.cdc.connectors.base.source.meta.split.SourceSplitBase;
 import com.ververica.cdc.connectors.base.source.meta.split.StreamSplit;
 import com.ververica.cdc.connectors.base.source.meta.wartermark.WatermarkKind;
 import com.ververica.cdc.connectors.base.source.reader.external.FetchTask;
 import com.ververica.cdc.connectors.sqlserver.source.offset.LsnOffset;
 import io.debezium.DebeziumException;
+import io.debezium.connector.sqlserver.Lsn;
 import io.debezium.connector.sqlserver.SqlServerConnection;
 import io.debezium.connector.sqlserver.SqlServerConnectorConfig;
 import io.debezium.connector.sqlserver.SqlServerDatabaseSchema;
@@ -36,7 +38,6 @@
 import org.slf4j.LoggerFactory;
 
 import static com.ververica.cdc.connectors.sqlserver.source.offset.LsnOffset.NO_STOPPING_OFFSET;
-import static com.ververica.cdc.connectors.sqlserver.source.utils.SqlServerUtils.getLsnPosition;
 
 /** The task to work for fetching data of SqlServer table stream split . */
 public class SqlServerStreamFetchTask implements FetchTask<SourceSplitBase> {
@@ -121,19 +122,17 @@ public LsnSplitReadTask(
         }
 
         @Override
-        public void afterHandleLsn(
-                SqlServerPartition partition, SqlServerOffsetContext offsetContext) {
+        public void afterHandleLsn(SqlServerPartition partition, Lsn toLsn) {
             // check do we need to stop for fetch binlog for snapshot split.
             if (isBoundedRead()) {
-                final LsnOffset currentRedoLogOffset = getLsnPosition(offsetContext.getOffset());
-                // reach the high watermark, the binlog fetcher should be finished
-                if (currentRedoLogOffset.isAtOrAfter(lsnSplit.getEndingOffset())) {
+                Offset endingOffset = lsnSplit.getEndingOffset();
+                if (toLsn.compareTo(((LsnOffset) endingOffset).getLcn()) >= 0) {
                     // send binlog end event
                     try {
                         dispatcher.dispatchWatermarkEvent(
                                 partition.getSourcePartition(),
                                 lsnSplit,
-                                currentRedoLogOffset,
+                                endingOffset,
                                 WatermarkKind.END);
                     } catch (InterruptedException e) {
                         LOG.error("Send signal event error.", e);
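The rewritten check compares the Lsn the streaming loop just processed up to against the split's ending offset, instead of deriving a position from the offset context via getLsnPosition. A minimal sketch of that comparison, using io.debezium.connector.sqlserver.Lsn with made-up LSN values; Lsn.valueOf and compareTo are the only Debezium calls assumed here.

import io.debezium.connector.sqlserver.Lsn;

public class LsnWatermarkCheckSketch {

    public static void main(String[] args) {
        // High watermark captured for the snapshot split (made-up value).
        Lsn endingLsn = Lsn.valueOf("00000027:00000ac0:0002");

        // LSN the current iteration streamed up to (made-up value past the watermark).
        Lsn toLsn = Lsn.valueOf("00000027:00000b10:0001");

        // Mirrors the bounded-read stop condition above: at or after the ending
        // offset, the backfill task can emit the END watermark and finish.
        boolean reachedHighWatermark = toLsn.compareTo(endingLsn) >= 0;
        System.out.println(reachedHighWatermark); // true
    }
}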

flink-connector-sqlserver-cdc/src/main/java/io/debezium/connector/sqlserver/SqlServerStreamingChangeEventSource.java

Lines changed: 8 additions & 8 deletions
@@ -38,11 +38,11 @@
 
 /**
  * Copied from Debezium project(1.9.7.final) to add method {@link
- * SqlServerStreamingChangeEventSource#afterHandleLsn(SqlServerPartition, SqlServerOffsetContext)}.
- * Also implemented {@link SqlServerStreamingChangeEventSource#execute( ChangeEventSourceContext,
- * SqlServerPartition, SqlServerOffsetContext)}. A {@link StreamingChangeEventSource} based on SQL
- * Server change data capture functionality. A main loop polls database DDL change and change data
- * tables and turns them into change events.
+ * SqlServerStreamingChangeEventSource#afterHandleLsn(SqlServerPartition, Lsn)}. Also implemented
+ * {@link SqlServerStreamingChangeEventSource#execute( ChangeEventSourceContext, SqlServerPartition,
+ * SqlServerOffsetContext)}. A {@link StreamingChangeEventSource} based on SQL Server change data
+ * capture functionality. A main loop polls database DDL change and change data tables and turns
+ * them into change events.
  *
  * <p>The connector uses CDC functionality of SQL Server that is implemented as as a process that
  * monitors source table and write changes from the table into the change table.
@@ -198,7 +198,6 @@ public boolean executeIteration(
 
             if (context.isRunning()) {
                 commitTransaction();
-                afterHandleLsn(partition, offsetContext);
                 final Lsn toLsn =
                         getToLsn(
                                 dataConnection,
@@ -444,6 +443,8 @@ public boolean executeIteration(
                                             clock));
                                     tableWithSmallestLsn.next();
                                 }
+                                // call after handle to judge whether to complete the stream
+                                afterHandleLsn(partition, toLsn);
                             });
                     streamingExecutionContext.setLastProcessedPosition(
                             TxLogPosition.valueOf(toLsn));
@@ -625,8 +626,7 @@ private Lsn getToLsn(
     }
 
    /** expose control to the user to stop the connector. */
-    protected void afterHandleLsn(
-            SqlServerPartition partition, SqlServerOffsetContext offsetContext) {
+    protected void afterHandleLsn(SqlServerPartition partition, Lsn toLsn) {
         // do nothing
     }
 }
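Judging from the commit title and the relocated call, the fix in this file is mostly about ordering: the hook used to run before getToLsn, i.e. before the iteration's change tables were processed, so a bounded reader checked a stale position and the backfill stream task could hang; it now runs afterwards with the freshly computed toLsn. Below is a deliberately simplified, self-contained sketch of that template-method hook, using plain fixed-width hex strings instead of Debezium's Lsn; all names are illustrative, not the connector's.

public class AfterHandleLsnSketch {

    /** Stand-in for the streaming loop: processes one iteration, then calls the hook. */
    abstract static class StreamingLoop {

        /** Hook for subclasses; the base implementation does nothing. */
        protected void afterHandleLsn(String toLsn) {}

        final void executeIteration(String toLsn) {
            // ... poll the change tables and dispatch events up to toLsn ...
            // Calling the hook here, after the work, lets a bounded reader see the
            // position the iteration actually reached.
            afterHandleLsn(toLsn);
        }
    }

    /** Stand-in for the backfill task, which stops at the split's high watermark. */
    static final class BoundedLoop extends StreamingLoop {
        private final String endingLsn;

        BoundedLoop(String endingLsn) {
            this.endingLsn = endingLsn;
        }

        @Override
        protected void afterHandleLsn(String toLsn) {
            // Fixed-width hex LSN strings compare correctly as plain strings here.
            if (toLsn.compareTo(endingLsn) >= 0) {
                System.out.println("reached high watermark at " + toLsn + ", finishing backfill");
            }
        }
    }

    public static void main(String[] args) {
        BoundedLoop loop = new BoundedLoop("00000027:00000ac0:0002");
        loop.executeIteration("00000027:000009f8:0001"); // before the watermark: keep streaming
        loop.executeIteration("00000027:00000ac0:0002"); // at the watermark: finish
    }
}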
