Skip to content

Commit 7b505a3

Browse files
author
Leonard Xu
committed
[mysql] Optimize the checkpoint be optional under single parallelism
1 parent d40765e commit 7b505a3

File tree

8 files changed

+131
-14
lines changed

8 files changed

+131
-14
lines changed

flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/MySqlParallelSource.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -130,10 +130,11 @@ private Configuration getReaderConfig(SourceReaderContext readerContext) {
130130
public SplitEnumerator<MySqlSplit, PendingSplitsState> createEnumerator(
131131
SplitEnumeratorContext<MySqlSplit> enumContext) {
132132
MySqlValidator validator = new MySqlValidator(config);
133+
final int currentParallelism = enumContext.currentParallelism();
133134

134135
final MySqlSplitAssigner splitAssigner =
135136
startupMode.equals("initial")
136-
? new MySqlHybridSplitAssigner(config)
137+
? new MySqlHybridSplitAssigner(config, currentParallelism)
137138
: new MySqlBinlogSplitAssigner(config);
138139

139140
return new MySqlSourceEnumerator(enumContext, splitAssigner, validator);
@@ -144,9 +145,11 @@ public SplitEnumerator<MySqlSplit, PendingSplitsState> restoreEnumerator(
144145
SplitEnumeratorContext<MySqlSplit> enumContext, PendingSplitsState checkpoint) {
145146
MySqlValidator validator = new MySqlValidator(config);
146147
final MySqlSplitAssigner splitAssigner;
148+
final int currentParallelism = enumContext.currentParallelism();
147149
if (checkpoint instanceof HybridPendingSplitsState) {
148150
splitAssigner =
149-
new MySqlHybridSplitAssigner(config, (HybridPendingSplitsState) checkpoint);
151+
new MySqlHybridSplitAssigner(
152+
config, currentParallelism, (HybridPendingSplitsState) checkpoint);
150153
} else if (checkpoint instanceof BinlogPendingSplitsState) {
151154
splitAssigner =
152155
new MySqlBinlogSplitAssigner(config, (BinlogPendingSplitsState) checkpoint);

flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/assigners/MySqlHybridSplitAssigner.java

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -50,15 +50,17 @@ public class MySqlHybridSplitAssigner implements MySqlSplitAssigner {
5050

5151
private final MySqlSnapshotSplitAssigner snapshotSplitAssigner;
5252

53-
public MySqlHybridSplitAssigner(Configuration configuration) {
54-
this(new MySqlSnapshotSplitAssigner(configuration), false);
53+
public MySqlHybridSplitAssigner(Configuration configuration, int currentParallelism) {
54+
this(new MySqlSnapshotSplitAssigner(configuration, currentParallelism), false);
5555
}
5656

5757
public MySqlHybridSplitAssigner(
58-
Configuration configuration, HybridPendingSplitsState checkpoint) {
58+
Configuration configuration,
59+
int currentParallelism,
60+
HybridPendingSplitsState checkpoint) {
5961
this(
6062
new MySqlSnapshotSplitAssigner(
61-
configuration, checkpoint.getSnapshotPendingSplits()),
63+
configuration, currentParallelism, checkpoint.getSnapshotPendingSplits()),
6264
checkpoint.isBinlogSplitAssigned());
6365
}
6466

flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/assigners/MySqlSnapshotSplitAssigner.java

Lines changed: 21 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,7 @@ public class MySqlSnapshotSplitAssigner implements MySqlSplitAssigner {
6969
private boolean assignerFinished;
7070

7171
private final Configuration configuration;
72+
private final int currentParallelism;
7273
private final LinkedList<TableId> remainingTables;
7374
private final RelationalTableFilters tableFilters;
7475
private final int chunkSize;
@@ -78,9 +79,10 @@ public class MySqlSnapshotSplitAssigner implements MySqlSplitAssigner {
7879

7980
@Nullable private Long checkpointIdToFinish;
8081

81-
public MySqlSnapshotSplitAssigner(Configuration configuration) {
82+
public MySqlSnapshotSplitAssigner(Configuration configuration, int currentParallelism) {
8283
this(
8384
configuration,
85+
currentParallelism,
8486
new ArrayList<>(),
8587
new ArrayList<>(),
8688
new HashMap<>(),
@@ -89,9 +91,12 @@ public MySqlSnapshotSplitAssigner(Configuration configuration) {
8991
}
9092

9193
public MySqlSnapshotSplitAssigner(
92-
Configuration configuration, SnapshotPendingSplitsState checkpoint) {
94+
Configuration configuration,
95+
int currentParallelism,
96+
SnapshotPendingSplitsState checkpoint) {
9397
this(
9498
configuration,
99+
currentParallelism,
95100
checkpoint.getAlreadyProcessedTables(),
96101
checkpoint.getRemainingSplits(),
97102
checkpoint.getAssignedSplits(),
@@ -101,12 +106,14 @@ public MySqlSnapshotSplitAssigner(
101106

102107
private MySqlSnapshotSplitAssigner(
103108
Configuration configuration,
109+
int currentParallelism,
104110
List<TableId> alreadyProcessedTables,
105111
List<MySqlSnapshotSplit> remainingSplits,
106112
Map<String, MySqlSnapshotSplit> assignedSplits,
107113
Map<String, BinlogOffset> splitFinishedOffsets,
108114
boolean assignerFinished) {
109115
this.configuration = configuration;
116+
this.currentParallelism = currentParallelism;
110117
this.alreadyProcessedTables = alreadyProcessedTables;
111118
this.remainingSplits = remainingSplits;
112119
this.assignedSplits = assignedSplits;
@@ -166,8 +173,18 @@ public boolean waitingForFinishedSplits() {
166173
public void onFinishedSplits(Map<String, BinlogOffset> splitFinishedOffsets) {
167174
this.splitFinishedOffsets.putAll(splitFinishedOffsets);
168175
if (allSplitsFinished()) {
169-
LOG.info(
170-
"Snapshot split assigner received all splits finished, waiting for a complete checkpoint to mark the assigner finished.");
176+
// Skip wait one more checkpoint if the parallelism is 1 which means we donot need to
177+
// worry the global data order
178+
// of snapshot splits and binlog split.
179+
if (currentParallelism == 1) {
180+
assignerFinished = true;
181+
LOG.info(
182+
"Snapshot split assigner received all splits finished and the job parallelism is 1, snapshot split assigner is turn into finished status.");
183+
184+
} else {
185+
LOG.info(
186+
"Snapshot split assigner received all splits finished, waiting for a complete checkpoint to mark the assigner finished.");
187+
}
171188
}
172189
}
173190

flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/debezium/reader/BinlogSplitReaderTest.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,7 @@
7575
/** Tests for {@link BinlogSplitReader}. */
7676
public class BinlogSplitReaderTest extends MySqlTestBase {
7777

78+
private static final int currentParallelism = 4;
7879
private final UniqueDatabase customerDatabase =
7980
new UniqueDatabase(MYSQL_CONTAINER, "customer", "mysqluser", "mysqlpw");
8081

@@ -586,7 +587,8 @@ private List<String> formatResult(List<SourceRecord> records, DataType dataType)
586587
}
587588

588589
private List<MySqlSnapshotSplit> getMySqlSplits(Configuration configuration, RowType pkType) {
589-
final MySqlSnapshotSplitAssigner assigner = new MySqlSnapshotSplitAssigner(configuration);
590+
final MySqlSnapshotSplitAssigner assigner =
591+
new MySqlSnapshotSplitAssigner(configuration, currentParallelism);
590592
assigner.open();
591593
List<MySqlSnapshotSplit> mySqlSplits = new ArrayList<>();
592594
while (true) {

flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/debezium/reader/SnapshotSplitReaderTest.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@
6060
/** Tests for {@link SnapshotSplitReader}. */
6161
public class SnapshotSplitReaderTest extends MySqlTestBase {
6262

63+
private static final int currentParallelism = 4;
6364
private static final UniqueDatabase customerDatabase =
6465
new UniqueDatabase(MYSQL_CONTAINER, "customer", "mysqluser", "mysqlpw");
6566

@@ -277,7 +278,8 @@ private List<String> formatResult(List<SourceRecord> records, DataType dataType)
277278
}
278279

279280
private List<MySqlSplit> getMySqlSplits(Configuration configuration, RowType pkType) {
280-
final MySqlSnapshotSplitAssigner assigner = new MySqlSnapshotSplitAssigner(configuration);
281+
final MySqlSnapshotSplitAssigner assigner =
282+
new MySqlSnapshotSplitAssigner(configuration, currentParallelism);
281283
assigner.open();
282284
List<MySqlSplit> mySqlSplitList = new ArrayList<>();
283285
while (true) {

flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/source/assigners/MySqlHybridSplitAssignerTest.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@
5555
/** Tests for {@link MySqlHybridSplitAssigner}. */
5656
public class MySqlHybridSplitAssignerTest extends MySqlTestBase {
5757

58+
private static final int currentParallelism = 4;
5859
private static final UniqueDatabase customerDatabase =
5960
new UniqueDatabase(MYSQL_CONTAINER, "customer", "mysqluser", "mysqlpw");
6061

@@ -114,7 +115,7 @@ public void testAssignMySqlBinlogSplitAfterAllSnapshotSplitsFinished() {
114115
HybridPendingSplitsState checkpoint =
115116
new HybridPendingSplitsState(snapshotPendingSplitsState, false);
116117
final MySqlHybridSplitAssigner assigner =
117-
new MySqlHybridSplitAssigner(configuration, checkpoint);
118+
new MySqlHybridSplitAssigner(configuration, currentParallelism, checkpoint);
118119

119120
// step 2. Get the MySqlBinlogSplit after all snapshot splits finished
120121
Optional<MySqlSplit> binlogSplit = assigner.getNext();

flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/source/assigners/MySqlSnapshotSplitAssignerTest.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@
4848
/** Tests for {@link MySqlSnapshotSplitAssigner}. */
4949
public class MySqlSnapshotSplitAssignerTest extends MySqlTestBase {
5050

51+
private static final int currentParallelism = 4;
5152
private static final UniqueDatabase customerDatabase =
5253
new UniqueDatabase(MYSQL_CONTAINER, "customer", "mysqluser", "mysqlpw");
5354

@@ -121,7 +122,8 @@ private List<String> getTestAssignSnapshotSplits(int splitSize, String[] capture
121122
.collect(Collectors.toList());
122123
configuration.setString("table.whitelist", String.join(",", captureTableIds));
123124

124-
final MySqlSnapshotSplitAssigner assigner = new MySqlSnapshotSplitAssigner(configuration);
125+
final MySqlSnapshotSplitAssigner assigner =
126+
new MySqlSnapshotSplitAssigner(configuration, currentParallelism);
125127

126128
assigner.open();
127129
List<MySqlSplit> sqlSplits = new ArrayList<>();

flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/table/MySqlConnectorITCase.java

Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -206,6 +206,94 @@ public void testConsumingAllEvents() throws Exception {
206206
result.getJobClient().get().cancel().get();
207207
}
208208

209+
@Test
210+
public void testCheckpointIsOptionalUnderSingleParallelism() throws Exception {
211+
if (incrementalSnapshot) {
212+
env.setParallelism(1);
213+
// check the checkpoint is optional when parallelism is 1
214+
env.getCheckpointConfig().disableCheckpointing();
215+
} else {
216+
return;
217+
}
218+
inventoryDatabase.createAndInitialize();
219+
String sourceDDL =
220+
String.format(
221+
"CREATE TABLE debezium_source ("
222+
+ " `id` INT NOT NULL,"
223+
+ " name STRING,"
224+
+ " description STRING,"
225+
+ " weight DECIMAL(10,3),"
226+
+ " primary key (`id`) not enforced"
227+
+ ") WITH ("
228+
+ " 'connector' = 'mysql-cdc',"
229+
+ " 'hostname' = '%s',"
230+
+ " 'port' = '%s',"
231+
+ " 'username' = '%s',"
232+
+ " 'password' = '%s',"
233+
+ " 'database-name' = '%s',"
234+
+ " 'table-name' = '%s',"
235+
+ " 'debezium.internal.implementation' = '%s',"
236+
+ " 'scan.incremental.snapshot.enabled' = '%s',"
237+
+ " 'server-id' = '%s',"
238+
+ " 'scan.incremental.snapshot.chunk.size' = '%s'"
239+
+ ")",
240+
MYSQL_CONTAINER.getHost(),
241+
MYSQL_CONTAINER.getDatabasePort(),
242+
inventoryDatabase.getUsername(),
243+
inventoryDatabase.getPassword(),
244+
inventoryDatabase.getDatabaseName(),
245+
"products",
246+
getDezImplementation(),
247+
incrementalSnapshot,
248+
getServerId(),
249+
getSplitSize());
250+
tEnv.executeSql(sourceDDL);
251+
252+
// async submit job
253+
TableResult result = tEnv.executeSql("SELECT * FROM debezium_source");
254+
CloseableIterator<Row> iterator = result.collect();
255+
String[] expectedSnapshot =
256+
new String[] {
257+
"+I[101, scooter, Small 2-wheel scooter, 3.140]",
258+
"+I[102, car battery, 12V car battery, 8.100]",
259+
"+I[103, 12-pack drill bits, 12-pack of drill bits with sizes ranging from #40 to #3, 0.800]",
260+
"+I[104, hammer, 12oz carpenter's hammer, 0.750]",
261+
"+I[105, hammer, 14oz carpenter's hammer, 0.875]",
262+
"+I[106, hammer, 16oz carpenter's hammer, 1.000]",
263+
"+I[107, rocks, box of assorted rocks, 5.300]",
264+
"+I[108, jacket, water resistent black wind breaker, 0.100]",
265+
"+I[109, spare tire, 24 inch spare tire, 22.200]"
266+
};
267+
assertThat(
268+
fetchRows(iterator, expectedSnapshot.length), containsInAnyOrder(expectedSnapshot));
269+
270+
try (Connection connection = inventoryDatabase.getJdbcConnection();
271+
Statement statement = connection.createStatement()) {
272+
273+
statement.execute(
274+
"INSERT INTO products VALUES (default,'jacket','water resistent white wind breaker',0.2);"); // 110
275+
statement.execute(
276+
"INSERT INTO products VALUES (default,'scooter','Big 2-wheel scooter ',5.18);");
277+
statement.execute(
278+
"UPDATE products SET description='new water resistent white wind breaker', weight='0.5' WHERE id=110;");
279+
statement.execute("UPDATE products SET weight='5.17' WHERE id=111;");
280+
statement.execute("DELETE FROM products WHERE id=111;");
281+
}
282+
283+
String[] expectedBinlog =
284+
new String[] {
285+
"+I[110, jacket, water resistent white wind breaker, 0.200]",
286+
"+I[111, scooter, Big 2-wheel scooter , 5.180]",
287+
"-U[110, jacket, water resistent white wind breaker, 0.200]",
288+
"+U[110, jacket, new water resistent white wind breaker, 0.500]",
289+
"-U[111, scooter, Big 2-wheel scooter , 5.180]",
290+
"+U[111, scooter, Big 2-wheel scooter , 5.170]",
291+
"-D[111, scooter, Big 2-wheel scooter , 5.170]"
292+
};
293+
assertThat(fetchRows(iterator, expectedBinlog.length), containsInAnyOrder(expectedBinlog));
294+
result.getJobClient().get().cancel().get();
295+
}
296+
209297
@Test
210298
public void testAllTypes() throws Throwable {
211299
fullTypesDatabase.createAndInitialize();

0 commit comments

Comments
 (0)