Skip to content

Commit b61c748

Browse files
committed
[mysql] Avoid duplicate split requests when add new table.(#1156)
This closes #1149.
1 parent 2a9c473 commit b61c748

File tree

3 files changed

+13
-6
lines changed

3 files changed

+13
-6
lines changed

flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/debezium/reader/SnapshotSplitReader.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -49,8 +49,8 @@
4949
import javax.annotation.Nullable;
5050

5151
import java.util.ArrayList;
52-
import java.util.HashMap;
5352
import java.util.Iterator;
53+
import java.util.LinkedHashMap;
5454
import java.util.List;
5555
import java.util.Map;
5656
import java.util.concurrent.ExecutorService;
@@ -247,7 +247,7 @@ public Iterator<SourceRecords> pollSplitRecords() throws InterruptedException {
247247
boolean reachBinlogEnd = false;
248248
SourceRecord lowWatermark = null;
249249
SourceRecord highWatermark = null;
250-
Map<Struct, SourceRecord> snapshotRecords = new HashMap<>();
250+
Map<Struct, SourceRecord> snapshotRecords = new LinkedHashMap<>();
251251
while (!reachBinlogEnd) {
252252
checkReadException();
253253
List<DataChangeEvent> batch = queue.poll();

flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/reader/MySqlSourceReader.java

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@
6363
import java.util.function.Supplier;
6464
import java.util.stream.Collectors;
6565

66-
import static com.ververica.cdc.connectors.mysql.source.events.WakeupReaderEvent.WakeUpTarget.BINLOG_READER;
66+
import static com.ververica.cdc.connectors.mysql.source.events.WakeupReaderEvent.WakeUpTarget.SNAPSHOT_READER;
6767
import static com.ververica.cdc.connectors.mysql.source.split.MySqlBinlogSplit.toNormalBinlogSplit;
6868
import static com.ververica.cdc.connectors.mysql.source.split.MySqlBinlogSplit.toSuspendedBinlogSplit;
6969
import static com.ververica.cdc.connectors.mysql.source.utils.ChunkUtils.getNextMetaGroupId;
@@ -144,6 +144,7 @@ public List<MySqlSplit> snapshotState(long checkpointId) {
144144

145145
@Override
146146
protected void onSplitFinished(Map<String, MySqlSplitState> finishedSplitIds) {
147+
boolean requestNextSplit = true;
147148
for (MySqlSplitState mySqlSplitState : finishedSplitIds.values()) {
148149
MySqlSplit mySqlSplit = mySqlSplitState.toMySqlSplit();
149150
if (mySqlSplit.isBinlogSplit()) {
@@ -154,12 +155,17 @@ protected void onSplitFinished(Map<String, MySqlSplitState> finishedSplitIds) {
154155
mySqlSourceReaderContext.resetStopBinlogSplitReader();
155156
suspendedBinlogSplit = toSuspendedBinlogSplit(mySqlSplit.asBinlogSplit());
156157
context.sendSourceEventToCoordinator(new SuspendBinlogReaderAckEvent());
158+
// do not request next split when the reader is suspended, the suspended reader will
159+
// automatically request the next split after it has been wakeup
160+
requestNextSplit = false;
157161
} else {
158162
finishedUnackedSplits.put(mySqlSplit.splitId(), mySqlSplit.asSnapshotSplit());
159163
}
160164
}
161165
reportFinishedSnapshotSplitsIfNeed();
162-
context.sendSplitRequest();
166+
if (requestNextSplit) {
167+
context.sendSplitRequest();
168+
}
163169
}
164170

165171
@Override
@@ -246,7 +252,9 @@ public void handleSourceEvents(SourceEvent sourceEvent) {
246252
mySqlSourceReaderContext.setStopBinlogSplitReader();
247253
} else if (sourceEvent instanceof WakeupReaderEvent) {
248254
WakeupReaderEvent wakeupReaderEvent = (WakeupReaderEvent) sourceEvent;
249-
if (wakeupReaderEvent.getTarget() == BINLOG_READER) {
255+
if (wakeupReaderEvent.getTarget() == SNAPSHOT_READER) {
256+
context.sendSplitRequest();
257+
} else {
250258
if (suspendedBinlogSplit != null) {
251259
context.sendSourceEventToCoordinator(
252260
new LatestFinishedSplitsSizeRequestEvent());

flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/utils/RecordUtils.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,6 @@
4141
import java.util.Arrays;
4242
import java.util.Collection;
4343
import java.util.HashMap;
44-
import java.util.LinkedHashMap;
4544
import java.util.List;
4645
import java.util.Map;
4746
import java.util.Optional;

0 commit comments

Comments
 (0)