Skip to content

Commit c94f509

Browse files
committed
fix review issue
1 parent ea94851 commit c94f509

File tree

3 files changed

+18
-2
lines changed

3 files changed

+18
-2
lines changed

flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/debezium/reader/BinlogSplitReader.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,8 @@ public class BinlogSplitReader implements DebeziumReader<SourceRecord, MySqlSpli
8080
private final Set<TableId> pureBinlogPhaseTables;
8181
private Tables.TableFilter capturedTableFilter;
8282

83+
private static final long EXECUTOR_AWAIT_TERMINATION_TIMEOUT_SECONDS = 5;
84+
8385
public BinlogSplitReader(StatefulTaskContext statefulTaskContext, int subTaskId) {
8486
this.statefulTaskContext = statefulTaskContext;
8587
ThreadFactory threadFactory =
@@ -181,7 +183,12 @@ public void close() {
181183
// while loop in MySqlStreamingChangeEventSource's execute method
182184
currentTaskRunning = false;
183185
executor.shutdown();
184-
executor.awaitTermination(5, TimeUnit.SECONDS);
186+
boolean isShutdown =
187+
executor.awaitTermination(
188+
EXECUTOR_AWAIT_TERMINATION_TIMEOUT_SECONDS, TimeUnit.SECONDS);
189+
if (!isShutdown) {
190+
LOG.warn("The thread executor of BinlogSplitReader wasn't shutdown properly.");
191+
}
185192
} catch (Exception e) {
186193
LOG.error("Close binlog reader error", e);
187194
}

flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/debezium/reader/SnapshotSplitReader.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,8 @@ public class SnapshotSplitReader implements DebeziumReader<SourceRecord, MySqlSp
8787
public AtomicBoolean hasNextElement;
8888
public AtomicBoolean reachEnd;
8989

90+
private static final long EXECUTOR_AWAIT_TERMINATION_TIMEOUT_SECONDS = 5;
91+
9092
public SnapshotSplitReader(StatefulTaskContext statefulTaskContext, int subtaskId) {
9193
this.statefulTaskContext = statefulTaskContext;
9294
ThreadFactory threadFactory =
@@ -330,7 +332,12 @@ public void close() {
330332
statefulTaskContext.getBinaryLogClient().disconnect();
331333
}
332334
executor.shutdown();
333-
executor.awaitTermination(5, TimeUnit.SECONDS);
335+
boolean isShutdown =
336+
executor.awaitTermination(
337+
EXECUTOR_AWAIT_TERMINATION_TIMEOUT_SECONDS, TimeUnit.SECONDS);
338+
if (!isShutdown) {
339+
LOG.warn("The thread executor of SnapshotSplitReader wasn't shutdown properly.");
340+
}
334341
} catch (Exception e) {
335342
LOG.error("Close snapshot reader error", e);
336343
}

flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/debezium/reader/BinlogSplitReaderTest.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -443,6 +443,8 @@ private List<String> readBinlogSplits(
443443
}
444444
snapshotSplitReader.close();
445445

446+
assertExecutorIsTerminated(snapshotSplitReader);
447+
446448
// step-2: create binlog split according the finished snapshot splits
447449
List<FinishedSnapshotSplitInfo> finishedSplitsInfo =
448450
getFinishedSplitsInfo(sqlSplits, snapshotRecords);

0 commit comments

Comments
 (0)