Skip to content

Commit 891da87

Browse files
ruanhang1993zhangchaoming.zcm
authored andcommitted
[cdc-base] Optimize pure binlog phase check logic to improve performance (apache#1620)
This closes apache#1620.
1 parent 928293e commit 891da87

File tree

1 file changed

+9
-0
lines changed

1 file changed

+9
-0
lines changed

flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/source/reader/external/JdbcSourceStreamFetcher.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,9 +39,11 @@
3939

4040
import java.util.ArrayList;
4141
import java.util.HashMap;
42+
import java.util.HashSet;
4243
import java.util.Iterator;
4344
import java.util.List;
4445
import java.util.Map;
46+
import java.util.Set;
4547
import java.util.concurrent.ExecutorService;
4648
import java.util.concurrent.Executors;
4749
import java.util.concurrent.ThreadFactory;
@@ -56,6 +58,7 @@ public class JdbcSourceStreamFetcher implements Fetcher<SourceRecords, SourceSpl
5658

5759
private final JdbcSourceFetchTaskContext taskContext;
5860
private final ExecutorService executor;
61+
private final Set<TableId> pureBinlogPhaseTables;
5962

6063
private volatile ChangeEventQueue<DataChangeEvent> queue;
6164
private volatile Throwable readException;
@@ -72,6 +75,7 @@ public JdbcSourceStreamFetcher(JdbcSourceFetchTaskContext taskContext, int subTa
7275
ThreadFactory threadFactory =
7376
new ThreadFactoryBuilder().setNameFormat("debezium-reader-" + subTaskId).build();
7477
this.executor = Executors.newSingleThreadExecutor(threadFactory);
78+
this.pureBinlogPhaseTables = new HashSet<>();
7579
}
7680

7781
@Override
@@ -178,9 +182,13 @@ private boolean shouldEmit(SourceRecord sourceRecord) {
178182
}
179183

180184
private boolean hasEnterPureBinlogPhase(TableId tableId, Offset position) {
185+
if (pureBinlogPhaseTables.contains(tableId)) {
186+
return true;
187+
}
181188
// the existed tables those have finished snapshot reading
182189
if (maxSplitHighWatermarkMap.containsKey(tableId)
183190
&& position.isAtOrAfter(maxSplitHighWatermarkMap.get(tableId))) {
191+
pureBinlogPhaseTables.add(tableId);
184192
return true;
185193
}
186194

@@ -217,5 +225,6 @@ private void configureFilter() {
217225
}
218226
this.finishedSplitsInfo = splitsInfoMap;
219227
this.maxSplitHighWatermarkMap = tableIdBinlogPositionMap;
228+
this.pureBinlogPhaseTables.clear();
220229
}
221230
}

0 commit comments

Comments
 (0)