Skip to content

Commit 65f00a9

Browse files
committed
[sqlserver] Fix backfill stream task hang
1 parent d8470dc commit 65f00a9

File tree

5 files changed

+49
-29
lines changed

5 files changed

+49
-29
lines changed

flink-connector-sqlserver-cdc/src/main/java/com/ververica/cdc/connectors/sqlserver/source/offset/LsnOffset.java

Lines changed: 15 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,10 @@ public class LsnOffset extends Offset {
3131
public static final LsnOffset NO_STOPPING_OFFSET =
3232
new LsnOffset(Lsn.valueOf(new byte[] {Byte.MAX_VALUE}));
3333

34+
LsnOffset(Map<String, String> offset) {
35+
this.offset = offset;
36+
}
37+
3438
public LsnOffset(Lsn scn, Lsn commitScn, Long eventSerialNo) {
3539
Map<String, String> offsetMap = new HashMap<>();
3640

@@ -47,12 +51,17 @@ public LsnOffset(Lsn scn, Lsn commitScn, Long eventSerialNo) {
4751
this.offset = offsetMap;
4852
}
4953

50-
public LsnOffset(Lsn lsn) {
51-
this(lsn, null, null);
54+
public LsnOffset(Lsn changeLsn) {
55+
this(changeLsn, null, null);
5256
}
5357

54-
public Lsn getLcn() {
55-
return Lsn.valueOf(offset.get(SourceInfo.COMMIT_LSN_KEY));
58+
public static LsnOffset of(Map<String, ?> offsetMap) {
59+
Map<String, String> offsetStrMap = new HashMap<>();
60+
for (Map.Entry<String, ?> entry : offsetMap.entrySet()) {
61+
offsetStrMap.put(
62+
entry.getKey(), entry.getValue() == null ? null : entry.getValue().toString());
63+
}
64+
return new LsnOffset(offsetStrMap);
5665
}
5766

5867
@Override
@@ -69,8 +78,8 @@ public int compareTo(Offset offset) {
6978
return -1;
7079
}
7180

72-
Lsn lsn = this.getLcn();
73-
Lsn targetLsn = that.getLcn();
81+
Lsn lsn = Lsn.valueOf(this.offset.get(SourceInfo.COMMIT_LSN_KEY));
82+
Lsn targetLsn = Lsn.valueOf(that.offset.get(SourceInfo.COMMIT_LSN_KEY));
7483
if (targetLsn.isAvailable()) {
7584
if (lsn.isAvailable()) {
7685
return lsn.compareTo(targetLsn);

flink-connector-sqlserver-cdc/src/main/java/com/ververica/cdc/connectors/sqlserver/source/reader/fetch/SqlServerScanFetchTask.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@
2323
import com.ververica.cdc.connectors.base.source.meta.wartermark.WatermarkKind;
2424
import com.ververica.cdc.connectors.base.source.reader.external.FetchTask;
2525
import com.ververica.cdc.connectors.sqlserver.source.offset.LsnOffset;
26-
import com.ververica.cdc.connectors.sqlserver.source.reader.fetch.SqlServerStreamFetchTask.LsnSplitReadTask;
26+
import com.ververica.cdc.connectors.sqlserver.source.reader.fetch.SqlServerStreamFetchTask.StreamSplitReadTask;
2727
import io.debezium.DebeziumException;
2828
import io.debezium.config.Configuration;
2929
import io.debezium.connector.sqlserver.SqlServerConnection;
@@ -118,7 +118,7 @@ public void execute(Context context) throws Exception {
118118
final SqlServerOffsetContext streamOffsetContext =
119119
loader.load(backfillBinlogSplit.getStartingOffset().getOffset());
120120

121-
final LsnSplitReadTask backfillBinlogReadTask =
121+
final StreamSplitReadTask backfillBinlogReadTask =
122122
createBackFillLsnSplitReadTask(backfillBinlogSplit, sourceFetchContext);
123123
backfillBinlogReadTask.execute(
124124
new SnapshotBinlogSplitChangeEventSourceContext(),
@@ -154,7 +154,7 @@ private void dispatchLsnEndEvent(
154154
WatermarkKind.END);
155155
}
156156

157-
private LsnSplitReadTask createBackFillLsnSplitReadTask(
157+
private StreamSplitReadTask createBackFillLsnSplitReadTask(
158158
StreamSplit backfillBinlogSplit, SqlServerSourceFetchTaskContext context) {
159159
// we should only capture events for the current table,
160160
// otherwise, we may can't find corresponding schema
@@ -173,7 +173,7 @@ private LsnSplitReadTask createBackFillLsnSplitReadTask(
173173
.with(Heartbeat.HEARTBEAT_INTERVAL, 0)
174174
.build();
175175
// task to read binlog and backfill for current split
176-
return new LsnSplitReadTask(
176+
return new StreamSplitReadTask(
177177
new SqlServerConnectorConfig(dezConf),
178178
context.getConnection(),
179179
context.getMetaDataConnection(),

flink-connector-sqlserver-cdc/src/main/java/com/ververica/cdc/connectors/sqlserver/source/reader/fetch/SqlServerSourceFetchTaskContext.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -77,8 +77,8 @@ public class SqlServerSourceFetchTaskContext extends JdbcSourceFetchTaskContext
7777
* A separate connection for retrieving details of the schema changes; without it, adaptive
7878
* buffering will not work.
7979
*
80-
* @link
81-
* https://docs.microsoft.com/en-us/sql/connect/jdbc/using-adaptive-buffering?view=sql-server-2017#guidelines-for-using-adaptive-buffering
80+
* <p>For more details, please refer to <a
81+
* href="https://docs.microsoft.com/en-us/sql/connect/jdbc/using-adaptive-buffering?view=sql-server-2017#guidelines-for-using-adaptive-buffering">guidelines-for-using-adaptive-buffering</a>
8282
*/
8383
private final SqlServerConnection metaDataConnection;
8484

flink-connector-sqlserver-cdc/src/main/java/com/ververica/cdc/connectors/sqlserver/source/reader/fetch/SqlServerStreamFetchTask.java

Lines changed: 19 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -17,14 +17,14 @@
1717
package com.ververica.cdc.connectors.sqlserver.source.reader.fetch;
1818

1919
import com.ververica.cdc.connectors.base.relational.JdbcSourceEventDispatcher;
20-
import com.ververica.cdc.connectors.base.source.meta.offset.Offset;
2120
import com.ververica.cdc.connectors.base.source.meta.split.SourceSplitBase;
2221
import com.ververica.cdc.connectors.base.source.meta.split.StreamSplit;
2322
import com.ververica.cdc.connectors.base.source.meta.wartermark.WatermarkKind;
2423
import com.ververica.cdc.connectors.base.source.reader.external.FetchTask;
2524
import com.ververica.cdc.connectors.sqlserver.source.offset.LsnOffset;
2625
import io.debezium.DebeziumException;
2726
import io.debezium.connector.sqlserver.Lsn;
27+
import io.debezium.connector.sqlserver.SourceInfo;
2828
import io.debezium.connector.sqlserver.SqlServerConnection;
2929
import io.debezium.connector.sqlserver.SqlServerConnectorConfig;
3030
import io.debezium.connector.sqlserver.SqlServerDatabaseSchema;
@@ -44,7 +44,7 @@ public class SqlServerStreamFetchTask implements FetchTask<SourceSplitBase> {
4444

4545
private final StreamSplit split;
4646
private volatile boolean taskRunning = false;
47-
private LsnSplitReadTask redoLogSplitReadTask;
47+
private StreamSplitReadTask redoLogSplitReadTask;
4848

4949
public SqlServerStreamFetchTask(StreamSplit split) {
5050
this.split = split;
@@ -57,7 +57,7 @@ public void execute(Context context) throws Exception {
5757
sourceFetchContext.getOffsetContext().preSnapshotCompletion();
5858
taskRunning = true;
5959
redoLogSplitReadTask =
60-
new LsnSplitReadTask(
60+
new StreamSplitReadTask(
6161
sourceFetchContext.getDbzConnectorConfig(),
6262
sourceFetchContext.getConnection(),
6363
sourceFetchContext.getMetaDataConnection(),
@@ -92,15 +92,15 @@ public void close() {
9292
* A wrapped task to read all binlog for table and also supports read bounded (from lowWatermark
9393
* to highWatermark) binlog.
9494
*/
95-
public static class LsnSplitReadTask extends SqlServerStreamingChangeEventSource {
95+
public static class StreamSplitReadTask extends SqlServerStreamingChangeEventSource {
9696

97-
private static final Logger LOG = LoggerFactory.getLogger(LsnSplitReadTask.class);
97+
private static final Logger LOG = LoggerFactory.getLogger(StreamSplitReadTask.class);
9898
private final StreamSplit lsnSplit;
9999
private final JdbcSourceEventDispatcher<SqlServerPartition> dispatcher;
100100
private final ErrorHandler errorHandler;
101101
private ChangeEventSourceContext context;
102102

103-
public LsnSplitReadTask(
103+
public StreamSplitReadTask(
104104
SqlServerConnectorConfig connectorConfig,
105105
SqlServerConnection connection,
106106
SqlServerConnection metadataConnection,
@@ -122,17 +122,26 @@ public LsnSplitReadTask(
122122
}
123123

124124
@Override
125-
public void afterHandleLsn(SqlServerPartition partition, Lsn toLsn) {
125+
public void afterHandleLsn(
126+
SqlServerPartition partition, SqlServerOffsetContext sqlServerOffsetContext) {
126127
// check do we need to stop for fetch binlog for snapshot split.
127128
if (isBoundedRead()) {
128-
Offset endingOffset = lsnSplit.getEndingOffset();
129-
if (toLsn.compareTo(((LsnOffset) endingOffset).getLcn()) >= 0) {
129+
130+
if (sqlServerOffsetContext
131+
.getChangePosition()
132+
.getCommitLsn()
133+
.compareTo(
134+
Lsn.valueOf(
135+
lsnSplit.getEndingOffset()
136+
.getOffset()
137+
.get(SourceInfo.COMMIT_LSN_KEY)))
138+
>= 0) {
130139
// send binlog end event
131140
try {
132141
dispatcher.dispatchWatermarkEvent(
133142
partition.getSourcePartition(),
134143
lsnSplit,
135-
endingOffset,
144+
LsnOffset.of(sqlServerOffsetContext.getOffset()),
136145
WatermarkKind.END);
137146
} catch (InterruptedException e) {
138147
LOG.error("Send signal event error.", e);

flink-connector-sqlserver-cdc/src/main/java/io/debezium/connector/sqlserver/SqlServerStreamingChangeEventSource.java

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -38,11 +38,11 @@
3838

3939
/**
4040
* Copied from Debezium project(1.9.7.final) to add method {@link
41-
* SqlServerStreamingChangeEventSource#afterHandleLsn(SqlServerPartition, Lsn)}. Also implemented
42-
* {@link SqlServerStreamingChangeEventSource#execute( ChangeEventSourceContext, SqlServerPartition,
43-
* SqlServerOffsetContext)}. A {@link StreamingChangeEventSource} based on SQL Server change data
44-
* capture functionality. A main loop polls database DDL change and change data tables and turns
45-
* them into change events.
41+
* SqlServerStreamingChangeEventSource#afterHandleLsn(SqlServerPartition, SqlServerOffsetContext)}.
42+
* Also implemented {@link SqlServerStreamingChangeEventSource#execute( ChangeEventSourceContext,
43+
* SqlServerPartition, SqlServerOffsetContext)}. A {@link StreamingChangeEventSource} based on SQL
44+
* Server change data capture functionality. A main loop polls database DDL change and change data
45+
* tables and turns them into change events.
4646
*
4747
* <p>The connector uses CDC functionality of SQL Server that is implemented as as a process that
4848
* monitors source table and write changes from the table into the change table.
@@ -444,12 +444,13 @@ public boolean executeIteration(
444444
tableWithSmallestLsn.next();
445445
}
446446
// call after handle to judge whether to complete the stream
447-
afterHandleLsn(partition, toLsn);
448447
});
449448
streamingExecutionContext.setLastProcessedPosition(
450449
TxLogPosition.valueOf(toLsn));
451450
// Terminate the transaction otherwise CDC could not be disabled for tables
452451
dataConnection.rollback();
452+
// Determine whether to continue streaming in sqlserver cdc snapshot phase
453+
afterHandleLsn(partition, offsetContext);
453454
} catch (SQLException e) {
454455
tablesSlot.set(
455456
processErrorFromChangeTableQuery(databaseName, e, tablesSlot.get()));
@@ -626,7 +627,8 @@ private Lsn getToLsn(
626627
}
627628

628629
/** expose control to the user to stop the connector. */
629-
protected void afterHandleLsn(SqlServerPartition partition, Lsn toLsn) {
630+
protected void afterHandleLsn(
631+
SqlServerPartition partition, SqlServerOffsetContext sqlServerOffsetContext) {
630632
// do nothing
631633
}
632634
}

0 commit comments

Comments
 (0)