Skip to content

Commit c96926b

Browse files
authored
[mysql] Release the debezium reader thread resources after reading finished (#1358)
This closes #1350.
1 parent 2de4903 commit c96926b

File tree

4 files changed

+60
-6
lines changed

4 files changed

+60
-6
lines changed

flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/debezium/reader/BinlogSplitReader.java

Lines changed: 23 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616

1717
package com.ververica.cdc.connectors.mysql.debezium.reader;
1818

19+
import org.apache.flink.annotation.VisibleForTesting;
1920
import org.apache.flink.table.types.logical.RowType;
2021
import org.apache.flink.util.FlinkRuntimeException;
2122

@@ -51,6 +52,7 @@
5152
import java.util.concurrent.ExecutorService;
5253
import java.util.concurrent.Executors;
5354
import java.util.concurrent.ThreadFactory;
55+
import java.util.concurrent.TimeUnit;
5456

5557
import static com.ververica.cdc.connectors.mysql.source.utils.RecordUtils.getBinlogPosition;
5658
import static com.ververica.cdc.connectors.mysql.source.utils.RecordUtils.getSplitKey;
@@ -65,7 +67,7 @@ public class BinlogSplitReader implements DebeziumReader<SourceRecord, MySqlSpli
6567

6668
private static final Logger LOG = LoggerFactory.getLogger(BinlogSplitReader.class);
6769
private final StatefulTaskContext statefulTaskContext;
68-
private final ExecutorService executor;
70+
private final ExecutorService executorService;
6971

7072
private volatile ChangeEventQueue<DataChangeEvent> queue;
7173
private volatile boolean currentTaskRunning;
@@ -79,11 +81,13 @@ public class BinlogSplitReader implements DebeziumReader<SourceRecord, MySqlSpli
7981
private final Set<TableId> pureBinlogPhaseTables;
8082
private Tables.TableFilter capturedTableFilter;
8183

84+
private static final long READER_CLOSE_TIMEOUT = 30L;
85+
8286
public BinlogSplitReader(StatefulTaskContext statefulTaskContext, int subTaskId) {
8387
this.statefulTaskContext = statefulTaskContext;
8488
ThreadFactory threadFactory =
8589
new ThreadFactoryBuilder().setNameFormat("debezium-reader-" + subTaskId).build();
86-
this.executor = Executors.newSingleThreadExecutor(threadFactory);
90+
this.executorService = Executors.newSingleThreadExecutor(threadFactory);
8791
this.currentTaskRunning = true;
8892
this.pureBinlogPhaseTables = new HashSet<>();
8993
}
@@ -108,7 +112,7 @@ public void submitSplit(MySqlSplit mySqlSplit) {
108112
statefulTaskContext.getStreamingChangeEventSourceMetrics(),
109113
currentBinlogSplit);
110114

111-
executor.submit(
115+
executorService.submit(
112116
() -> {
113117
try {
114118
binlogSplitReadTask.execute(
@@ -176,6 +180,17 @@ public void close() {
176180
if (statefulTaskContext.getBinaryLogClient() != null) {
177181
statefulTaskContext.getBinaryLogClient().disconnect();
178182
}
183+
// set currentTaskRunning to false to terminate the
184+
// while loop in MySqlStreamingChangeEventSource's execute method
185+
currentTaskRunning = false;
186+
if (executorService != null) {
187+
executorService.shutdown();
188+
if (executorService.awaitTermination(READER_CLOSE_TIMEOUT, TimeUnit.SECONDS)) {
189+
LOG.warn(
190+
"Failed to close the binlog split reader in {} seconds.",
191+
READER_CLOSE_TIMEOUT);
192+
}
193+
}
179194
} catch (Exception e) {
180195
LOG.error("Close binlog reader error", e);
181196
}
@@ -284,4 +299,9 @@ private void configureFilter() {
284299
public void stopBinlogReadTask() {
285300
this.currentTaskRunning = false;
286301
}
302+
303+
@VisibleForTesting
304+
public ExecutorService getExecutorService() {
305+
return executorService;
306+
}
287307
}

flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/debezium/reader/SnapshotSplitReader.java

Lines changed: 20 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616

1717
package com.ververica.cdc.connectors.mysql.debezium.reader;
1818

19+
import org.apache.flink.annotation.VisibleForTesting;
1920
import org.apache.flink.util.FlinkRuntimeException;
2021

2122
import org.apache.flink.shaded.guava30.com.google.common.util.concurrent.ThreadFactoryBuilder;
@@ -54,6 +55,7 @@
5455
import java.util.concurrent.ExecutorService;
5556
import java.util.concurrent.Executors;
5657
import java.util.concurrent.ThreadFactory;
58+
import java.util.concurrent.TimeUnit;
5759
import java.util.concurrent.atomic.AtomicBoolean;
5860

5961
import static com.ververica.cdc.connectors.mysql.source.utils.RecordUtils.formatMessageTimestamp;
@@ -73,7 +75,7 @@ public class SnapshotSplitReader implements DebeziumReader<SourceRecord, MySqlSp
7375

7476
private static final Logger LOG = LoggerFactory.getLogger(SnapshotSplitReader.class);
7577
private final StatefulTaskContext statefulTaskContext;
76-
private final ExecutorService executor;
78+
private final ExecutorService executorService;
7779

7880
private volatile ChangeEventQueue<DataChangeEvent> queue;
7981
private volatile boolean currentTaskRunning;
@@ -86,11 +88,13 @@ public class SnapshotSplitReader implements DebeziumReader<SourceRecord, MySqlSp
8688
public AtomicBoolean hasNextElement;
8789
public AtomicBoolean reachEnd;
8890

91+
private static final long READER_CLOSE_TIMEOUT = 30L;
92+
8993
public SnapshotSplitReader(StatefulTaskContext statefulTaskContext, int subtaskId) {
9094
this.statefulTaskContext = statefulTaskContext;
9195
ThreadFactory threadFactory =
9296
new ThreadFactoryBuilder().setNameFormat("debezium-reader-" + subtaskId).build();
93-
this.executor = Executors.newSingleThreadExecutor(threadFactory);
97+
this.executorService = Executors.newSingleThreadExecutor(threadFactory);
9498
this.currentTaskRunning = false;
9599
this.hasNextElement = new AtomicBoolean(false);
96100
this.reachEnd = new AtomicBoolean(false);
@@ -114,7 +118,7 @@ public void submitSplit(MySqlSplit mySqlSplit) {
114118
statefulTaskContext.getSnapshotReceiver(),
115119
StatefulTaskContext.getClock(),
116120
currentSnapshotSplit);
117-
executor.submit(
121+
executorService.submit(
118122
() -> {
119123
try {
120124
currentTaskRunning = true;
@@ -328,11 +332,24 @@ public void close() {
328332
if (statefulTaskContext.getBinaryLogClient() != null) {
329333
statefulTaskContext.getBinaryLogClient().disconnect();
330334
}
335+
if (executorService != null) {
336+
executorService.shutdown();
337+
if (executorService.awaitTermination(READER_CLOSE_TIMEOUT, TimeUnit.SECONDS)) {
338+
LOG.warn(
339+
"Failed to close the snapshot split reader in {} seconds.",
340+
READER_CLOSE_TIMEOUT);
341+
}
342+
}
331343
} catch (Exception e) {
332344
LOG.error("Close snapshot reader error", e);
333345
}
334346
}
335347

348+
@VisibleForTesting
349+
public ExecutorService getExecutorService() {
350+
return executorService;
351+
}
352+
336353
/**
337354
* {@link ChangeEventSource.ChangeEventSourceContext} implementation that keeps low/high
338355
* watermark for each {@link MySqlSnapshotSplit}.

flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/debezium/reader/BinlogSplitReaderTest.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,8 @@
6565
import static com.ververica.cdc.connectors.mysql.source.utils.RecordUtils.getSnapshotSplitInfo;
6666
import static com.ververica.cdc.connectors.mysql.source.utils.RecordUtils.getStartingOffsetOfBinlogSplit;
6767
import static com.ververica.cdc.connectors.mysql.source.utils.RecordUtils.isHighWatermarkEvent;
68+
import static org.junit.Assert.assertNotNull;
69+
import static org.junit.Assert.assertTrue;
6870

6971
/** Tests for {@link BinlogSplitReader}. */
7072
public class BinlogSplitReaderTest extends MySqlSourceTestBase {
@@ -441,6 +443,10 @@ private List<String> readBinlogSplits(
441443
}
442444
}
443445
}
446+
snapshotSplitReader.close();
447+
448+
assertNotNull(snapshotSplitReader.getExecutorService());
449+
assertTrue(snapshotSplitReader.getExecutorService().isTerminated());
444450

445451
// step-2: create binlog split according the finished snapshot splits
446452
List<FinishedSnapshotSplitInfo> finishedSplitsInfo =
@@ -483,6 +489,11 @@ private List<String> readBinlogSplits(
483489
pollRecordsFromReader(binlogReader, RecordUtils::isDataChangeRecord),
484490
dataType));
485491
}
492+
binlogReader.close();
493+
494+
assertNotNull(snapshotSplitReader.getExecutorService());
495+
assertTrue(snapshotSplitReader.getExecutorService().isTerminated());
496+
486497
return actual;
487498
}
488499

flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/debezium/reader/SnapshotSplitReaderTest.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@
5353
import java.util.function.Supplier;
5454
import java.util.stream.Collectors;
5555

56+
import static org.junit.Assert.assertNotNull;
5657
import static org.junit.Assert.assertTrue;
5758
import static org.junit.Assert.fail;
5859

@@ -411,6 +412,11 @@ private List<String> readTableSnapshotSplits(
411412
if (binaryLogClient != null) {
412413
binaryLogClient.disconnect();
413414
}
415+
snapshotSplitReader.close();
416+
417+
assertNotNull(snapshotSplitReader.getExecutorService());
418+
assertTrue(snapshotSplitReader.getExecutorService().isTerminated());
419+
414420
return formatResult(result, dataType);
415421
}
416422

0 commit comments

Comments
 (0)