
Commit c627e37

ruanhang1993 and zhongqishang authored and committed

[mysql] skip closing reader when the reader received the binlog split (apache#2261)

1 parent 874b1f9 commit c627e37

File tree

5 files changed: +128 -6 lines changed


flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/assigners/MySqlHybridSplitAssigner.java

Lines changed: 4 additions & 0 deletions
@@ -189,6 +189,10 @@ public void close() {
        snapshotSplitAssigner.close();
    }

+    public boolean noMoreSnapshotSplits() {
+        return snapshotSplitAssigner.noMoreSnapshotSplits();
+    }
+
    // --------------------------------------------------------------------------------------------

    private MySqlBinlogSplit createBinlogSplit() {

flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/enumerator/MySqlSourceEnumerator.java

Lines changed: 33 additions & 1 deletion
@@ -24,10 +24,12 @@

import org.apache.flink.shaded.guava30.com.google.common.collect.Lists;

+import com.ververica.cdc.connectors.mysql.source.assigners.MySqlBinlogSplitAssigner;
import com.ververica.cdc.connectors.mysql.source.assigners.MySqlHybridSplitAssigner;
import com.ververica.cdc.connectors.mysql.source.assigners.MySqlSplitAssigner;
import com.ververica.cdc.connectors.mysql.source.assigners.state.PendingSplitsState;
import com.ververica.cdc.connectors.mysql.source.config.MySqlSourceConfig;
+import com.ververica.cdc.connectors.mysql.source.events.BinlogSplitAssignedEvent;
import com.ververica.cdc.connectors.mysql.source.events.BinlogSplitMetaEvent;
import com.ververica.cdc.connectors.mysql.source.events.BinlogSplitMetaRequestEvent;
import com.ververica.cdc.connectors.mysql.source.events.BinlogSplitUpdateAckEvent;
@@ -39,6 +41,7 @@
import com.ververica.cdc.connectors.mysql.source.events.LatestFinishedSplitsNumberRequestEvent;
import com.ververica.cdc.connectors.mysql.source.offset.BinlogOffset;
import com.ververica.cdc.connectors.mysql.source.split.FinishedSnapshotSplitInfo;
+import com.ververica.cdc.connectors.mysql.source.split.MySqlBinlogSplit;
import com.ververica.cdc.connectors.mysql.source.split.MySqlSplit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -72,6 +75,8 @@ public class MySqlSourceEnumerator implements SplitEnumerator<MySqlSplit, Pendin
    private final TreeSet<Integer> readersAwaitingSplit;
    private List<List<FinishedSnapshotSplitInfo>> binlogSplitMeta;

+    @Nullable private Integer binlogSplitTaskId;
+
    public MySqlSourceEnumerator(
            SplitEnumeratorContext<MySqlSplit> context,
            MySqlSourceConfig sourceConfig,
@@ -107,6 +112,12 @@ public void handleSplitRequest(int subtaskId, @Nullable String requesterHostname
    @Override
    public void addSplitsBack(List<MySqlSplit> splits, int subtaskId) {
        LOG.debug("The enumerator adds splits back: {}", splits);
+        Optional<MySqlSplit> binlogSplit =
+                splits.stream().filter(MySqlSplit::isBinlogSplit).findAny();
+        if (binlogSplit.isPresent()) {
+            LOG.info("The enumerator adds add binlog split back: {}", binlogSplit);
+            this.binlogSplitTaskId = null;
+        }
        splitAssigner.addSplits(splits);
    }

@@ -153,6 +164,11 @@ public void handleSourceEvent(int subtaskId, SourceEvent sourceEvent) {
                    "The enumerator receives request from subtask {} for the latest finished splits number after added newly tables. ",
                    subtaskId);
            handleLatestFinishedSplitNumberRequest(subtaskId);
+        } else if (sourceEvent instanceof BinlogSplitAssignedEvent) {
+            LOG.info(
+                    "The enumerator receives notice from subtask {} for the binlog split assignment. ",
+                    subtaskId);
+            binlogSplitTaskId = subtaskId;
        }
    }

@@ -188,7 +204,10 @@ private void assignSplits() {
                continue;
            }

-            if (splitAssigner.isStreamSplitAssigned() && sourceConfig.isCloseIdleReaders()) {
+            if (splitAssigner.isStreamSplitAssigned()
+                    && sourceConfig.isCloseIdleReaders()
+                    && noMoreSnapshotSplits()
+                    && (binlogSplitTaskId != null && !binlogSplitTaskId.equals(nextAwaiting))) {
                // close idle readers when snapshot phase finished.
                context.signalNoMoreSplits(nextAwaiting);
                awaitingReader.remove();
@@ -200,6 +219,9 @@ private void assignSplits() {
            if (split.isPresent()) {
                final MySqlSplit mySqlSplit = split.get();
                context.assignSplit(mySqlSplit, nextAwaiting);
+                if (mySqlSplit instanceof MySqlBinlogSplit) {
+                    this.binlogSplitTaskId = nextAwaiting;
+                }
                awaitingReader.remove();
                LOG.info("The enumerator assigns split {} to subtask {}", mySqlSplit, nextAwaiting);
            } else {
@@ -210,6 +232,16 @@ private void assignSplits() {
            }
        }
    }

+    private boolean noMoreSnapshotSplits() {
+        if (splitAssigner instanceof MySqlHybridSplitAssigner) {
+            return ((MySqlHybridSplitAssigner) splitAssigner).noMoreSnapshotSplits();
+        } else if (splitAssigner instanceof MySqlBinlogSplitAssigner) {
+            return true;
+        }
+        throw new IllegalStateException(
+                "Unexpected subtype of MySqlSplitAssigner class when invoking noMoreSnapshotSplits.");
+    }
+
    private int[] getRegisteredReader() {
        return this.context.registeredReaders().keySet().stream()
                .mapToInt(Integer::intValue)
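
The heart of the change is the extra guard in assignSplits(): with scan.incremental.close-idle-reader.enabled set, the enumerator now signals "no more splits" to a waiting reader only once all snapshot splits have been handed out and that reader is not the one holding the binlog split. Below is a minimal, self-contained sketch of that decision; the class, method, and parameter names are illustrative stand-ins for the enumerator's fields, not the connector's API.

// Standalone illustration (not the connector's API) of the new close-idle-reader guard.
public class CloseIdleReaderGuard {

    /** True if the enumerator may signal "no more splits" to the requesting reader. */
    static boolean mayCloseIdleReader(
            boolean streamSplitAssigned,
            boolean closeIdleReadersEnabled,
            boolean noMoreSnapshotSplits,
            Integer binlogSplitTaskId, // null until some reader reports holding the binlog split
            int requestingSubtask) {
        return streamSplitAssigned
                && closeIdleReadersEnabled
                && noMoreSnapshotSplits
                // never close the reader that currently owns the binlog split
                && (binlogSplitTaskId != null && !binlogSplitTaskId.equals(requestingSubtask));
    }

    public static void main(String[] args) {
        // subtask 0 owns the binlog split; subtask 1 is idle and may be closed
        System.out.println(mayCloseIdleReader(true, true, true, 0, 1)); // true
        // the binlog-split owner itself is never closed
        System.out.println(mayCloseIdleReader(true, true, true, 0, 0)); // false
        // before any reader reports the binlog split, nobody is closed
        System.out.println(mayCloseIdleReader(true, true, true, null, 1)); // false
    }
}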
flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/events/BinlogSplitAssignedEvent.java

Lines changed: 34 additions & 0 deletions
@@ -0,0 +1,34 @@
+/*
+ * Copyright 2023 Ververica Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.ververica.cdc.connectors.mysql.source.events;
+
+import org.apache.flink.api.connector.source.SourceEvent;
+
+import com.ververica.cdc.connectors.mysql.source.enumerator.MySqlSourceEnumerator;
+import com.ververica.cdc.connectors.mysql.source.reader.MySqlSourceReader;
+import com.ververica.cdc.connectors.mysql.source.split.MySqlBinlogSplit;
+
+/**
+ * The {@link SourceEvent} that {@link MySqlSourceReader} sends to {@link MySqlSourceEnumerator} to
+ * notify the {@link MySqlBinlogSplit} assigned to itself.
+ */
+public class BinlogSplitAssignedEvent implements SourceEvent {
+
+    private static final long serialVersionUID = 1L;
+
+    public BinlogSplitAssignedEvent() {}
+}

flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/reader/MySqlSourceReader.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828

2929
import com.ververica.cdc.connectors.mysql.debezium.DebeziumUtils;
3030
import com.ververica.cdc.connectors.mysql.source.config.MySqlSourceConfig;
31+
import com.ververica.cdc.connectors.mysql.source.events.BinlogSplitAssignedEvent;
3132
import com.ververica.cdc.connectors.mysql.source.events.BinlogSplitMetaEvent;
3233
import com.ververica.cdc.connectors.mysql.source.events.BinlogSplitMetaRequestEvent;
3334
import com.ververica.cdc.connectors.mysql.source.events.BinlogSplitUpdateAckEvent;
@@ -270,6 +271,9 @@ private void addSplits(List<MySqlSplit> splits, boolean checkTableChangeForBinlo
270271
binlogSplit, sourceConfig, checkNewlyAddedTableSchema);
271272
unfinishedSplits.add(mySqlBinlogSplit);
272273
}
274+
LOG.info(
275+
"Source reader {} received the binlog split : {}.", subtaskId, binlogSplit);
276+
context.sendSourceEventToCoordinator(new BinlogSplitAssignedEvent());
273277
}
274278
}
275279
// notify split enumerator again about the finished unacked snapshot splits
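
The reader-side notification uses Flink's standard source-event handshake: the reader posts an event through its SourceReaderContext, and the enumerator receives it in handleSourceEvent together with the sender's subtask id. A minimal sketch of that pattern is below; SplitAssignedEvent and the helper methods are simplified placeholders rather than the connector's real classes.

import org.apache.flink.api.connector.source.SourceEvent;
import org.apache.flink.api.connector.source.SourceReaderContext;

// Minimal sketch of the reader -> enumerator notification path used by this change.
public final class EventHandshakeSketch {

    /** Marker event, analogous to BinlogSplitAssignedEvent in the diff above. */
    public static class SplitAssignedEvent implements SourceEvent {
        private static final long serialVersionUID = 1L;
    }

    /** Reader side: called once the binlog split has been accepted by this reader. */
    static void notifyCoordinator(SourceReaderContext readerContext) {
        // The coordinator (enumerator) will receive this in handleSourceEvent(...)
        readerContext.sendSourceEventToCoordinator(new SplitAssignedEvent());
    }

    /** Enumerator side: remember which subtask currently holds the binlog split. */
    static Integer onSourceEvent(int subtaskId, SourceEvent event, Integer currentOwner) {
        if (event instanceof SplitAssignedEvent) {
            return subtaskId; // new owner of the binlog split
        }
        return currentOwner;
    }
}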

flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/source/NewlyAddedTableITCase.java

Lines changed: 53 additions & 5 deletions
@@ -172,6 +172,22 @@ public void testNewlyAddedTableForExistsPipelineTwiceWithAheadBinlog() throws Ex
                "address_shanghai");
    }

+    @Test
+    public void testNewlyAddedTableForExistsPipelineTwiceWithAheadBinlogAndAutoCloseReader()
+            throws Exception {
+        Map<String, String> otherOptions = new HashMap<>();
+        otherOptions.put("scan.incremental.close-idle-reader.enabled", "true");
+        testNewlyAddedTableOneByOne(
+                DEFAULT_PARALLELISM,
+                otherOptions,
+                FailoverType.NONE,
+                FailoverPhase.NEVER,
+                true,
+                "address_hangzhou",
+                "address_beijing",
+                "address_shanghai");
+    }
+
    @Test
    public void testNewlyAddedTableForExistsPipelineThrice() throws Exception {
        testNewlyAddedTableOneByOne(
@@ -588,7 +604,8 @@ private void testRemoveTablesOneByOne(
                getStreamExecutionEnvironment(finishedSavePointPath, parallelism);
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

-        String createTableStatement = getCreateTableStatement(captureAddressTables);
+        String createTableStatement =
+                getCreateTableStatement(new HashMap<>(), captureAddressTables);
        tEnv.executeSql(createTableStatement);
        tEnv.executeSql(
                "CREATE TABLE sink ("
@@ -630,7 +647,8 @@ private void testRemoveTablesOneByOne(
                getStreamExecutionEnvironment(finishedSavePointPath, parallelism);
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

-        String createTableStatement = getCreateTableStatement(captureTablesThisRound);
+        String createTableStatement =
+                getCreateTableStatement(new HashMap<>(), captureTablesThisRound);
        tEnv.executeSql(createTableStatement);
        tEnv.executeSql(
                "CREATE TABLE sink ("
@@ -703,6 +721,23 @@ private void testNewlyAddedTableOneByOne(
            boolean makeBinlogBeforeCapture,
            String... captureAddressTables)
            throws Exception {
+        testNewlyAddedTableOneByOne(
+                parallelism,
+                new HashMap<>(),
+                failoverType,
+                failoverPhase,
+                makeBinlogBeforeCapture,
+                captureAddressTables);
+    }
+
+    private void testNewlyAddedTableOneByOne(
+            int parallelism,
+            Map<String, String> sourceOptions,
+            FailoverType failoverType,
+            FailoverPhase failoverPhase,
+            boolean makeBinlogBeforeCapture,
+            String... captureAddressTables)
+            throws Exception {

        // step 1: create mysql tables with initial data
        initialAddressTables(getConnection(), captureAddressTables);
@@ -727,7 +762,8 @@ private void testNewlyAddedTableOneByOne(
                getStreamExecutionEnvironment(finishedSavePointPath, parallelism);
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

-        String createTableStatement = getCreateTableStatement(captureTablesThisRound);
+        String createTableStatement =
+                getCreateTableStatement(sourceOptions, captureTablesThisRound);
        tEnv.executeSql(createTableStatement);
        tEnv.executeSql(
                "CREATE TABLE sink ("
@@ -836,7 +872,8 @@ private void testNewlyAddedTableOneByOne(
            }
        }

-    private String getCreateTableStatement(String... captureTableNames) {
+    private String getCreateTableStatement(
+            Map<String, String> otherOptions, String... captureTableNames) {
        return format(
                "CREATE TABLE address ("
                        + " table_name STRING METADATA VIRTUAL,"
@@ -858,14 +895,25 @@ private String getCreateTableStatement(String... captureTableNames) {
                        + " 'server-time-zone' = 'UTC',"
                        + " 'server-id' = '%s',"
                        + " 'scan.newly-added-table.enabled' = 'true'"
+                        + " %s"
                        + ")",
                MYSQL_CONTAINER.getHost(),
                MYSQL_CONTAINER.getDatabasePort(),
                customDatabase.getUsername(),
                customDatabase.getPassword(),
                customDatabase.getDatabaseName(),
                getTableNameRegex(captureTableNames),
-                getServerId());
+                getServerId(),
+                otherOptions.isEmpty()
+                        ? ""
+                        : ","
+                                + otherOptions.entrySet().stream()
+                                        .map(
+                                                e ->
+                                                        String.format(
+                                                                "'%s'='%s'",
+                                                                e.getKey(), e.getValue()))
+                                        .collect(Collectors.joining(",")));
    }

    private StreamExecutionEnvironment getStreamExecutionEnvironment(
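
The test helper above threads an extra options map into the generated DDL, so enabling scan.incremental.close-idle-reader.enabled only adds one more connector option to the WITH clause. A small standalone sketch of how that suffix is built and what the option list ends up looking like; the DDL string here is abbreviated and the connection values are placeholders.

import java.util.HashMap;
import java.util.Map;
import java.util.stream.Collectors;

// Sketch of the options-suffix logic added to getCreateTableStatement(...).
public class DdlOptionsSketch {
    public static void main(String[] args) {
        Map<String, String> otherOptions = new HashMap<>();
        otherOptions.put("scan.incremental.close-idle-reader.enabled", "true");

        // Empty map -> no extra options; otherwise prepend a comma and join the pairs.
        String suffix =
                otherOptions.isEmpty()
                        ? ""
                        : ","
                                + otherOptions.entrySet().stream()
                                        .map(e -> String.format("'%s'='%s'", e.getKey(), e.getValue()))
                                        .collect(Collectors.joining(","));

        // Abbreviated DDL with placeholder connection options.
        String ddl =
                "CREATE TABLE address ( ... ) WITH ("
                        + " 'connector' = 'mysql-cdc',"
                        + " 'scan.newly-added-table.enabled' = 'true'"
                        + suffix
                        + ")";
        System.out.println(ddl);
        // ... 'scan.newly-added-table.enabled' = 'true','scan.incremental.close-idle-reader.enabled'='true')
    }
}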
