
Commit 6fa02ef

[hotfix][sqlserver] Fix backfill stream task hang (apache#2374)
* [sqlserver] Fix backfill stream task hang
1 parent 8e7378d commit 6fa02ef

6 files changed, +43 -41 lines


flink-connector-sqlserver-cdc/src/main/java/com/ververica/cdc/connectors/sqlserver/source/dialect/SqlServerChunkSplitter.java

Lines changed: 1 addition & 1 deletion
@@ -274,7 +274,7 @@ private List<ChunkRange> splitUnevenlySizedChunks(
         while (chunkEnd != null && ObjectUtils.compare(chunkEnd, max) <= 0) {
             // we start from [null, min + chunk_size) and avoid [null, min)
             splits.add(ChunkRange.of(chunkStart, chunkEnd));
-            // may sleep a while to avoid DDOS on SqlServer server
+            // may sleep awhile to avoid DDOS on SqlServer server
             maySleep(count++, tableId);
             chunkStart = chunkEnd;
             chunkEnd = nextChunkEnd(jdbc, chunkEnd, tableId, splitColumnName, max, chunkSize);
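Note: the hunk above only rewords a comment, but it shows the uneven chunk-splitting pattern. Below is a simplified sketch (assumption, not part of the commit) of how that walk over the key space behaves with plain integer keys; in the connector, nextChunkEnd queries SQL Server for the next boundary and maySleep throttles those probing queries. All names and values are illustrative only.

import java.util.ArrayList;
import java.util.List;

public class ChunkSplitSketch {
    public static void main(String[] args) throws InterruptedException {
        int min = 0, max = 100, chunkSize = 30;
        List<String> splits = new ArrayList<>();
        Integer chunkStart = null;              // null = unbounded low end
        Integer chunkEnd = min + chunkSize;     // start from [null, min + chunk_size)
        int count = 0;
        while (chunkEnd != null && chunkEnd <= max) {
            splits.add("[" + chunkStart + ", " + chunkEnd + ")");
            if (++count % 10 == 0) {
                Thread.sleep(100);              // stand-in for maySleep(count, tableId)
            }
            chunkStart = chunkEnd;
            int next = chunkEnd + chunkSize;    // stand-in for nextChunkEnd(...)
            chunkEnd = next > max ? null : next;
        }
        splits.add("[" + chunkStart + ", null)"); // last chunk is open-ended
        System.out.println(splits);             // [[null, 30), [30, 60), [60, 90), [90, null)]
    }
}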

flink-connector-sqlserver-cdc/src/main/java/com/ververica/cdc/connectors/sqlserver/source/offset/LsnOffset.java

Lines changed: 4 additions & 8 deletions
@@ -47,12 +47,8 @@ public LsnOffset(Lsn scn, Lsn commitScn, Long eventSerialNo) {
         this.offset = offsetMap;
     }
 
-    public LsnOffset(Lsn lsn) {
-        this(lsn, null, null);
-    }
-
-    public Lsn getLcn() {
-        return Lsn.valueOf(offset.get(SourceInfo.COMMIT_LSN_KEY));
+    public LsnOffset(Lsn changeLsn) {
+        this(changeLsn, null, null);
     }
 
     @Override
@@ -69,8 +65,8 @@ public int compareTo(Offset offset) {
             return -1;
         }
 
-        Lsn lsn = this.getLcn();
-        Lsn targetLsn = that.getLcn();
+        Lsn lsn = Lsn.valueOf(this.offset.get(SourceInfo.COMMIT_LSN_KEY));
+        Lsn targetLsn = Lsn.valueOf(that.offset.get(SourceInfo.COMMIT_LSN_KEY));
         if (targetLsn.isAvailable()) {
             if (lsn.isAvailable()) {
                 return lsn.compareTo(targetLsn);
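With the single-argument constructor and getLcn() removed, callers go through the remaining three-argument constructor and compareTo reads the commit LSN directly from the offset map. A minimal sketch of the resulting ordering, assuming the connector and Debezium SQL Server classes are on the classpath; the LSN strings are made up.

import com.ververica.cdc.connectors.sqlserver.source.offset.LsnOffset;
import io.debezium.connector.sqlserver.Lsn;

public class LsnOffsetCompareSketch {
    public static void main(String[] args) {
        // (changeLsn, commitLsn, eventSerialNo) -- only the commit LSN drives compareTo
        LsnOffset low = new LsnOffset(null, Lsn.valueOf("00000027:00000ac0:0002"), null);
        LsnOffset high = new LsnOffset(null, Lsn.valueOf("00000027:00000b20:0003"), null);
        System.out.println(low.compareTo(high) < 0);  // expected: true
        System.out.println(high.isAtOrAfter(low));    // expected: true
    }
}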

flink-connector-sqlserver-cdc/src/main/java/com/ververica/cdc/connectors/sqlserver/source/reader/fetch/SqlServerScanFetchTask.java

Lines changed: 14 additions & 8 deletions
@@ -23,7 +23,7 @@
 import com.ververica.cdc.connectors.base.source.meta.wartermark.WatermarkKind;
 import com.ververica.cdc.connectors.base.source.reader.external.FetchTask;
 import com.ververica.cdc.connectors.sqlserver.source.offset.LsnOffset;
-import com.ververica.cdc.connectors.sqlserver.source.reader.fetch.SqlServerStreamFetchTask.LsnSplitReadTask;
+import com.ververica.cdc.connectors.sqlserver.source.reader.fetch.SqlServerStreamFetchTask.StreamSplitReadTask;
 import io.debezium.DebeziumException;
 import io.debezium.config.Configuration;
 import io.debezium.connector.sqlserver.SqlServerConnection;
@@ -118,7 +118,7 @@ public void execute(Context context) throws Exception {
             final SqlServerOffsetContext streamOffsetContext =
                     loader.load(backfillBinlogSplit.getStartingOffset().getOffset());
 
-            final LsnSplitReadTask backfillBinlogReadTask =
+            final StreamSplitReadTask backfillBinlogReadTask =
                     createBackFillLsnSplitReadTask(backfillBinlogSplit, sourceFetchContext);
             backfillBinlogReadTask.execute(
                     new SnapshotBinlogSplitChangeEventSourceContext(),
@@ -154,21 +154,27 @@ private void dispatchLsnEndEvent(
                 WatermarkKind.END);
     }
 
-    private LsnSplitReadTask createBackFillLsnSplitReadTask(
+    private StreamSplitReadTask createBackFillLsnSplitReadTask(
            StreamSplit backfillBinlogSplit, SqlServerSourceFetchTaskContext context) {
         // we should only capture events for the current table,
         // otherwise, we may can't find corresponding schema
         Configuration dezConf =
-                context.getSourceConfig()
-                        .getDbzConfiguration()
+                context.getDbzConnectorConfig()
+                        .getConfig()
                         .edit()
-                        .with("table.include.list", split.getTableId().toString())
+                        // table.include.list is schema.table format
+                        .with(
+                                "table.include.list",
+                                new TableId(
+                                        null,
+                                        split.getTableId().schema(),
+                                        split.getTableId().table()))
                         // Disable heartbeat event in snapshot split fetcher
                         .with(Heartbeat.HEARTBEAT_INTERVAL, 0)
                         .build();
         // task to read binlog and backfill for current split
-        return new LsnSplitReadTask(
-                context.getDbzConnectorConfig(),
+        return new StreamSplitReadTask(
+                new SqlServerConnectorConfig(dezConf),
                 context.getConnection(),
                 context.getMetaDataConnection(),
                 context.getDispatcher(),
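The backfill read task now builds its table filter from schema and table only, because Debezium matches SQL Server table filters against the schema.table form rather than database.schema.table. A minimal sketch (not part of the commit, names made up) of how dropping the catalog changes the rendered id:

import io.debezium.relational.TableId;

public class TableIncludeListSketch {
    public static void main(String[] args) {
        TableId captured = new TableId("inventory", "dbo", "products");
        // Drop the catalog so the rendered id is "dbo.products" instead of
        // "inventory.dbo.products", matching the schema.table filter format.
        TableId filterId = new TableId(null, captured.schema(), captured.table());
        System.out.println(captured);   // inventory.dbo.products
        System.out.println(filterId);   // dbo.products
    }
}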

flink-connector-sqlserver-cdc/src/main/java/com/ververica/cdc/connectors/sqlserver/source/reader/fetch/SqlServerSourceFetchTaskContext.java

Lines changed: 2 additions & 2 deletions
@@ -77,8 +77,8 @@ public class SqlServerSourceFetchTaskContext extends JdbcSourceFetchTaskContext
      * A separate connection for retrieving details of the schema changes; without it, adaptive
      * buffering will not work.
      *
-     * @link
-     * https://docs.microsoft.com/en-us/sql/connect/jdbc/using-adaptive-buffering?view=sql-server-2017#guidelines-for-using-adaptive-buffering
+     * <p>For more details, please refer to <a
+     * href="https://docs.microsoft.com/en-us/sql/connect/jdbc/using-adaptive-buffering?view=sql-server-2017#guidelines-for-using-adaptive-buffering">guidelines-for-using-adaptive-buffering</a>
      */
     private final SqlServerConnection metaDataConnection;
 

flink-connector-sqlserver-cdc/src/main/java/com/ververica/cdc/connectors/sqlserver/source/reader/fetch/SqlServerStreamFetchTask.java

Lines changed: 14 additions & 14 deletions
@@ -17,12 +17,14 @@
 package com.ververica.cdc.connectors.sqlserver.source.reader.fetch;
 
 import com.ververica.cdc.connectors.base.relational.JdbcSourceEventDispatcher;
+import com.ververica.cdc.connectors.base.source.meta.offset.Offset;
 import com.ververica.cdc.connectors.base.source.meta.split.SourceSplitBase;
 import com.ververica.cdc.connectors.base.source.meta.split.StreamSplit;
 import com.ververica.cdc.connectors.base.source.meta.wartermark.WatermarkKind;
 import com.ververica.cdc.connectors.base.source.reader.external.FetchTask;
 import com.ververica.cdc.connectors.sqlserver.source.offset.LsnOffset;
 import io.debezium.DebeziumException;
+import io.debezium.connector.sqlserver.Lsn;
 import io.debezium.connector.sqlserver.SqlServerConnection;
 import io.debezium.connector.sqlserver.SqlServerConnectorConfig;
 import io.debezium.connector.sqlserver.SqlServerDatabaseSchema;
@@ -36,14 +38,13 @@
 import org.slf4j.LoggerFactory;
 
 import static com.ververica.cdc.connectors.sqlserver.source.offset.LsnOffset.NO_STOPPING_OFFSET;
-import static com.ververica.cdc.connectors.sqlserver.source.utils.SqlServerUtils.getLsnPosition;
 
 /** The task to work for fetching data of SqlServer table stream split . */
 public class SqlServerStreamFetchTask implements FetchTask<SourceSplitBase> {
 
     private final StreamSplit split;
     private volatile boolean taskRunning = false;
-    private LsnSplitReadTask redoLogSplitReadTask;
+    private StreamSplitReadTask redoLogSplitReadTask;
 
     public SqlServerStreamFetchTask(StreamSplit split) {
         this.split = split;
@@ -56,7 +57,7 @@ public void execute(Context context) throws Exception {
         sourceFetchContext.getOffsetContext().preSnapshotCompletion();
         taskRunning = true;
         redoLogSplitReadTask =
-                new LsnSplitReadTask(
+                new StreamSplitReadTask(
                         sourceFetchContext.getDbzConnectorConfig(),
                         sourceFetchContext.getConnection(),
                         sourceFetchContext.getMetaDataConnection(),
@@ -91,15 +92,15 @@ public void close() {
      * A wrapped task to read all binlog for table and also supports read bounded (from lowWatermark
      * to highWatermark) binlog.
      */
-    public static class LsnSplitReadTask extends SqlServerStreamingChangeEventSource {
+    public static class StreamSplitReadTask extends SqlServerStreamingChangeEventSource {
 
-        private static final Logger LOG = LoggerFactory.getLogger(LsnSplitReadTask.class);
+        private static final Logger LOG = LoggerFactory.getLogger(StreamSplitReadTask.class);
         private final StreamSplit lsnSplit;
         private final JdbcSourceEventDispatcher<SqlServerPartition> dispatcher;
         private final ErrorHandler errorHandler;
         private ChangeEventSourceContext context;
 
-        public LsnSplitReadTask(
+        public StreamSplitReadTask(
                 SqlServerConnectorConfig connectorConfig,
                 SqlServerConnection connection,
                 SqlServerConnection metadataConnection,
@@ -121,26 +122,25 @@ public LsnSplitReadTask(
         }
 
         @Override
-        public void afterHandleLsn(
-                SqlServerPartition partition, SqlServerOffsetContext offsetContext) {
+        public void afterHandleLsn(SqlServerPartition partition, Lsn toLsn) {
             // check do we need to stop for fetch binlog for snapshot split.
             if (isBoundedRead()) {
-                final LsnOffset currentRedoLogOffset = getLsnPosition(offsetContext.getOffset());
-                // reach the high watermark, the binlog fetcher should be finished
-                if (currentRedoLogOffset.isAtOrAfter(lsnSplit.getEndingOffset())) {
-                    // send binlog end event
+                LsnOffset currentLsnOffset = new LsnOffset(null, toLsn, null);
+                Offset endingOffset = lsnSplit.getEndingOffset();
+                if (currentLsnOffset.isAtOrAfter(endingOffset)) {
+                    // send streaming end event
                     try {
                         dispatcher.dispatchWatermarkEvent(
                                 partition.getSourcePartition(),
                                 lsnSplit,
-                                currentRedoLogOffset,
+                                currentLsnOffset,
                                 WatermarkKind.END);
                     } catch (InterruptedException e) {
                         LOG.error("Send signal event error.", e);
                         errorHandler.setProducerThrowable(
                                 new DebeziumException("Error processing binlog signal event", e));
                     }
-                    // tell fetcher the binlog task finished
+                    // tell fetcher the streaming task finished
                     ((SqlServerScanFetchTask.SnapshotBinlogSplitChangeEventSourceContext) context)
                             .finished();
                 }
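The overridden hook no longer reads the position from the offset context; it wraps the toLsn handed over by the streaming source into a commit-LSN-only offset and compares it with the split's high watermark, which is presumably what kept the backfill stream task from ever seeing the end offset before. A condensed sketch of that check, assuming isAtOrAfter delegates to the commit-LSN comparison shown in LsnOffset above:

import com.ververica.cdc.connectors.base.source.meta.offset.Offset;
import com.ververica.cdc.connectors.base.source.meta.split.StreamSplit;
import com.ververica.cdc.connectors.sqlserver.source.offset.LsnOffset;
import io.debezium.connector.sqlserver.Lsn;

final class BackfillStopCheckSketch {
    private BackfillStopCheckSketch() {}

    /** Returns true once the bounded backfill read may finish for the given split. */
    static boolean shouldFinish(StreamSplit lsnSplit, Lsn toLsn) {
        LsnOffset currentLsnOffset = new LsnOffset(null, toLsn, null);
        Offset endingOffset = lsnSplit.getEndingOffset();
        return currentLsnOffset.isAtOrAfter(endingOffset);
    }
}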

flink-connector-sqlserver-cdc/src/main/java/io/debezium/connector/sqlserver/SqlServerStreamingChangeEventSource.java

Lines changed: 8 additions & 8 deletions
@@ -38,11 +38,11 @@
 
 /**
  * Copied from Debezium project(1.9.7.final) to add method {@link
- * SqlServerStreamingChangeEventSource#afterHandleLsn(SqlServerPartition, SqlServerOffsetContext)}.
- * Also implemented {@link SqlServerStreamingChangeEventSource#execute( ChangeEventSourceContext,
- * SqlServerPartition, SqlServerOffsetContext)}. A {@link StreamingChangeEventSource} based on SQL
- * Server change data capture functionality. A main loop polls database DDL change and change data
- * tables and turns them into change events.
+ * SqlServerStreamingChangeEventSource#afterHandleLsn(SqlServerPartition, Lsn)}. Also implemented
+ * {@link SqlServerStreamingChangeEventSource#execute( ChangeEventSourceContext, SqlServerPartition,
+ * SqlServerOffsetContext)}. A {@link StreamingChangeEventSource} based on SQL Server change data
+ * capture functionality. A main loop polls database DDL change and change data tables and turns
+ * them into change events.
  *
  * <p>The connector uses CDC functionality of SQL Server that is implemented as as a process that
  * monitors source table and write changes from the table into the change table.
@@ -198,7 +198,6 @@ public boolean executeIteration(
 
         if (context.isRunning()) {
             commitTransaction();
-            afterHandleLsn(partition, offsetContext);
             final Lsn toLsn =
                     getToLsn(
                             dataConnection,
@@ -449,6 +448,8 @@ public boolean executeIteration(
                             TxLogPosition.valueOf(toLsn));
                     // Terminate the transaction otherwise CDC could not be disabled for tables
                     dataConnection.rollback();
+                    // Determine whether to continue streaming in sqlserver cdc snapshot phase
+                    afterHandleLsn(partition, toLsn);
                 } catch (SQLException e) {
                     tablesSlot.set(
                             processErrorFromChangeTableQuery(databaseName, e, tablesSlot.get()));
@@ -625,8 +626,7 @@ private Lsn getToLsn(
     }
 
     /** expose control to the user to stop the connector. */
-    protected void afterHandleLsn(
-            SqlServerPartition partition, SqlServerOffsetContext offsetContext) {
+    protected void afterHandleLsn(SqlServerPartition partition, Lsn toLsn) {
         // do nothing
     }
 }
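The copied streaming source keeps afterHandleLsn as a protected no-op, and the call site moves from before getToLsn to after the iteration's rollback, so an override now sees the LSN the iteration actually processed. A generic, standalone sketch (all names made up) of this hook pattern:

// Base loop: calls a protected no-op hook once per iteration.
class StreamingLoopSketch {
    protected void afterHandleLsn(String partition, String toLsn) {
        // default: do nothing, like the copied Debezium class
    }

    void runOneIteration(String partition, String toLsn) {
        // ... poll change tables and dispatch events ...
        afterHandleLsn(partition, toLsn); // invoked after the iteration's work is done
    }
}

// Bounded read: overrides the hook to stop once the high watermark is reached.
class BoundedReadSketch extends StreamingLoopSketch {
    private final String highWatermark;

    BoundedReadSketch(String highWatermark) {
        this.highWatermark = highWatermark;
    }

    @Override
    protected void afterHandleLsn(String partition, String toLsn) {
        if (toLsn.compareTo(highWatermark) >= 0) {
            System.out.println("bounded read finished at " + toLsn);
        }
    }
}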
