diff --git a/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/debezium/reader/SnapshotSplitReader.java b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/debezium/reader/SnapshotSplitReader.java index a1ea735d699..3f91a6043d0 100644 --- a/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/debezium/reader/SnapshotSplitReader.java +++ b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/debezium/reader/SnapshotSplitReader.java @@ -49,8 +49,8 @@ import javax.annotation.Nullable; import java.util.ArrayList; -import java.util.HashMap; import java.util.Iterator; +import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.concurrent.ExecutorService; @@ -247,7 +247,7 @@ public Iterator pollSplitRecords() throws InterruptedException { boolean reachBinlogEnd = false; SourceRecord lowWatermark = null; SourceRecord highWatermark = null; - Map snapshotRecords = new HashMap<>(); + Map snapshotRecords = new LinkedHashMap<>(); while (!reachBinlogEnd) { checkReadException(); List batch = queue.poll(); diff --git a/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/reader/MySqlSourceReader.java b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/reader/MySqlSourceReader.java index 43c34f0aa60..a2a2ba1504a 100644 --- a/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/reader/MySqlSourceReader.java +++ b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/reader/MySqlSourceReader.java @@ -144,6 +144,7 @@ public List snapshotState(long checkpointId) { @Override protected void onSplitFinished(Map finishedSplitIds) { + boolean requestNextSplit = true; for (MySqlSplitState mySqlSplitState : finishedSplitIds.values()) { MySqlSplit mySqlSplit = mySqlSplitState.toMySqlSplit(); if (mySqlSplit.isBinlogSplit()) { @@ -154,12 +155,17 @@ protected void onSplitFinished(Map finishedSplitIds) { mySqlSourceReaderContext.resetStopBinlogSplitReader(); suspendedBinlogSplit = toSuspendedBinlogSplit(mySqlSplit.asBinlogSplit()); context.sendSourceEventToCoordinator(new SuspendBinlogReaderAckEvent()); + // do not request next split when the reader is suspended, the suspended reader will + // automatically request the next split after it has been wakeup + requestNextSplit = false; } else { finishedUnackedSplits.put(mySqlSplit.splitId(), mySqlSplit.asSnapshotSplit()); } } reportFinishedSnapshotSplitsIfNeed(); - context.sendSplitRequest(); + if (requestNextSplit) { + context.sendSplitRequest(); + } } @Override