Skip to content

Commit 90e6cd0

Browse files
loserwang1024 authored and lvyanquan committed
[cdc-connector][mongodb] Avoid mongodb source to read data after high_watermark in backfill phase (#2893)
(cherry picked from commit 6b6d9fc)
1 parent 1675788 commit 90e6cd0

File tree

3 files changed

+116
-37
lines changed

3 files changed

+116
-37
lines changed

flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/main/java/com/ververica/cdc/connectors/mongodb/source/reader/fetch/MongoDBStreamFetchTask.java

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,6 @@
4848
import org.slf4j.LoggerFactory;
4949

5050
import java.time.Instant;
51-
import java.util.Map;
5251
import java.util.Optional;
5352

5453
import static com.ververica.cdc.connectors.mongodb.internal.MongoDBEnvelope.CLUSTER_TIME_FIELD;
@@ -170,15 +169,19 @@ public void execute(Context context) throws Exception {
170169
LOG.info("Ignored {} record: {}", operationType, changeStreamDocument);
171170
}
172171
}
173-
174-
if (changeRecord != null) {
172+
if (changeRecord != null && !isBoundedRead()) {
175173
queue.enqueue(new DataChangeEvent(changeRecord));
176174
}
177175

178176
if (isBoundedRead()) {
179177
ChangeStreamOffset currentOffset;
180178
if (changeRecord != null) {
181179
currentOffset = new ChangeStreamOffset(getResumeToken(changeRecord));
180+
// Only emit change records at or before the high watermark (the split's ending offset); later records are not enqueued.
181+
if (currentOffset.isAtOrBefore(streamSplit.getEndingOffset())) {
182+
queue.enqueue(new DataChangeEvent(changeRecord));
183+
}
184+
182185
} else {
183186
// Heartbeat is not turned on or there is no update event
184187
currentOffset = new ChangeStreamOffset(getCurrentClusterTime(mongoClient));
@@ -227,8 +230,7 @@ public void close() {}
227230
private MongoChangeStreamCursor<BsonDocument> openChangeStreamCursor(
228231
ChangeStreamDescriptor changeStreamDescriptor) {
229232
ChangeStreamOffset offset =
230-
new ChangeStreamOffset(
231-
(Map<String, String>) streamSplit.getStartingOffset().getOffset());
233+
new ChangeStreamOffset(streamSplit.getStartingOffset().getOffset());
232234
ChangeStreamIterable<Document> changeStreamIterable =
233235
getChangeStreamIterable(sourceConfig, changeStreamDescriptor);
234236

flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/test/java/com/ververica/cdc/connectors/mongodb/source/MongoDBFullChangelogITCase.java

Lines changed: 55 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,7 @@ public class MongoDBFullChangelogITCase extends MongoDBSourceTestBase {
7373

7474
private static final int USE_POST_LOWWATERMARK_HOOK = 1;
7575
private static final int USE_PRE_HIGHWATERMARK_HOOK = 2;
76+
private static final int USE_POST_HIGHWATERMARK_HOOK = 3;
7677

7778
@Rule public final Timeout timeoutPerTest = Timeout.seconds(300);
7879

@@ -224,8 +225,7 @@ public void testEnableBackfillWithDMLPreHighWaterMark() throws Exception {
224225
return;
225226
}
226227

227-
List<String> records =
228-
testBackfillWhenWritingEvents(false, 21, USE_PRE_HIGHWATERMARK_HOOK, true);
228+
List<String> records = testBackfillWhenWritingEvents(false, 21, USE_PRE_HIGHWATERMARK_HOOK);
229229

230230
List<String> expectedRecords =
231231
Arrays.asList(
@@ -261,8 +261,7 @@ public void testEnableBackfillWithDMLPostLowWaterMark() throws Exception {
261261
return;
262262
}
263263

264-
List<String> records =
265-
testBackfillWhenWritingEvents(false, 21, USE_POST_LOWWATERMARK_HOOK, true);
264+
List<String> records = testBackfillWhenWritingEvents(false, 21, USE_POST_LOWWATERMARK_HOOK);
266265

267266
List<String> expectedRecords =
268267
Arrays.asList(
@@ -292,14 +291,50 @@ public void testEnableBackfillWithDMLPostLowWaterMark() throws Exception {
292291
assertEqualsInAnyOrder(expectedRecords, records);
293292
}
294293

294+
@Test
295+
public void testEnableBackfillWithDMLPostHighWaterMark() throws Exception {
296+
if (!parallelismSnapshot) {
297+
return;
298+
}
299+
List<String> records =
300+
testBackfillWhenWritingEvents(false, 25, USE_POST_HIGHWATERMARK_HOOK);
301+
List<String> expectedRecords =
302+
Arrays.asList(
303+
"+I[101, user_1, Shanghai, 123567891234]",
304+
"+I[102, user_2, Shanghai, 123567891234]",
305+
"+I[103, user_3, Shanghai, 123567891234]",
306+
"+I[109, user_4, Shanghai, 123567891234]",
307+
"+I[110, user_5, Shanghai, 123567891234]",
308+
"+I[111, user_6, Shanghai, 123567891234]",
309+
"+I[118, user_7, Shanghai, 123567891234]",
310+
"+I[121, user_8, Shanghai, 123567891234]",
311+
"+I[123, user_9, Shanghai, 123567891234]",
312+
"+I[1009, user_10, Shanghai, 123567891234]",
313+
"+I[1010, user_11, Shanghai, 123567891234]",
314+
"+I[1011, user_12, Shanghai, 123567891234]",
315+
"+I[1012, user_13, Shanghai, 123567891234]",
316+
"+I[1013, user_14, Shanghai, 123567891234]",
317+
"+I[1014, user_15, Shanghai, 123567891234]",
318+
"+I[1015, user_16, Shanghai, 123567891234]",
319+
"+I[1016, user_17, Shanghai, 123567891234]",
320+
"+I[1017, user_18, Shanghai, 123567891234]",
321+
"+I[1018, user_19, Shanghai, 123567891234]",
322+
"+I[1019, user_20, Shanghai, 123567891234]",
323+
"+I[2000, user_21, Shanghai, 123567891234]",
324+
"+I[15213, user_15213, Shanghai, 123567891234]",
325+
"-U[2000, user_21, Shanghai, 123567891234]",
326+
"+U[2000, user_21, Pittsburgh, 123567891234]",
327+
"-D[1019, user_20, Shanghai, 123567891234]");
328+
assertEqualsInAnyOrder(expectedRecords, records);
329+
}
330+
295331
@Test
296332
public void testSkipBackfillWithDMLPreHighWaterMark() throws Exception {
297333
if (!parallelismSnapshot) {
298334
return;
299335
}
300336

301-
List<String> records =
302-
testBackfillWhenWritingEvents(true, 25, USE_PRE_HIGHWATERMARK_HOOK, true);
337+
List<String> records = testBackfillWhenWritingEvents(true, 25, USE_PRE_HIGHWATERMARK_HOOK);
303338

304339
List<String> expectedRecords =
305340
Arrays.asList(
@@ -339,8 +374,7 @@ public void testSkipBackfillWithDMLPostLowWaterMark() throws Exception {
339374
return;
340375
}
341376

342-
List<String> records =
343-
testBackfillWhenWritingEvents(true, 25, USE_POST_LOWWATERMARK_HOOK, true);
377+
List<String> records = testBackfillWhenWritingEvents(true, 25, USE_POST_LOWWATERMARK_HOOK);
344378

345379
List<String> expectedRecords =
346380
Arrays.asList(
@@ -376,8 +410,7 @@ public void testSkipBackfillWithDMLPostLowWaterMark() throws Exception {
376410
}
377411

378412
private List<String> testBackfillWhenWritingEvents(
379-
boolean skipBackFill, int fetchSize, int hookType, boolean enableFullDocPrePostImage)
380-
throws Exception {
413+
boolean skipBackFill, int fetchSize, int hookType) throws Exception {
381414

382415
String customerDatabase =
383416
"customer_" + Integer.toUnsignedString(new Random().nextInt(), 36);
@@ -415,11 +448,11 @@ private List<String> testBackfillWhenWritingEvents(
415448
.username(FLINK_USER)
416449
.password(FLINK_USER_PASSWORD)
417450
.startupOptions(StartupOptions.initial())
418-
.scanFullChangelog(enableFullDocPrePostImage)
451+
.scanFullChangelog(true)
419452
.collectionList(
420453
getCollectionNameRegex(
421454
customerDatabase, new String[] {"customers"}))
422-
.deserializer(customerTable.getDeserializer(enableFullDocPrePostImage))
455+
.deserializer(customerTable.getDeserializer(true))
423456
.skipSnapshotBackfill(skipBackFill)
424457
.build();
425458

@@ -443,10 +476,16 @@ private List<String> testBackfillWhenWritingEvents(
443476
mongoCollection.deleteOne(Filters.eq("cid", 1019L));
444477
};
445478

446-
if (hookType == USE_POST_LOWWATERMARK_HOOK) {
447-
hooks.setPostLowWatermarkAction(snapshotPhaseHook);
448-
} else if (hookType == USE_PRE_HIGHWATERMARK_HOOK) {
449-
hooks.setPreHighWatermarkAction(snapshotPhaseHook);
479+
switch (hookType) {
480+
case USE_POST_LOWWATERMARK_HOOK:
481+
hooks.setPostLowWatermarkAction(snapshotPhaseHook);
482+
break;
483+
case USE_PRE_HIGHWATERMARK_HOOK:
484+
hooks.setPreHighWatermarkAction(snapshotPhaseHook);
485+
break;
486+
case USE_POST_HIGHWATERMARK_HOOK:
487+
hooks.setPostHighWatermarkAction(snapshotPhaseHook);
488+
break;
450489
}
451490
source.setSnapshotHooks(hooks);
452491

flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/test/java/com/ververica/cdc/connectors/mongodb/source/MongoDBParallelSourceITCase.java

Lines changed: 54 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,7 @@
6767
public class MongoDBParallelSourceITCase extends MongoDBSourceTestBase {
6868
private static final int USE_POST_LOWWATERMARK_HOOK = 1;
6969
private static final int USE_PRE_HIGHWATERMARK_HOOK = 2;
70+
private static final int USE_POST_HIGHWATERMARK_HOOK = 3;
7071

7172
@Rule public final Timeout timeoutPerTest = Timeout.seconds(300);
7273

@@ -150,8 +151,7 @@ public void testReadSingleTableWithSingleParallelismAndSkipBackfill() throws Exc
150151
@Test
151152
public void testEnableBackfillWithDMLPreHighWaterMark() throws Exception {
152153

153-
List<String> records =
154-
testBackfillWhenWritingEvents(false, 21, USE_PRE_HIGHWATERMARK_HOOK, false);
154+
List<String> records = testBackfillWhenWritingEvents(false, 21, USE_PRE_HIGHWATERMARK_HOOK);
155155

156156
List<String> expectedRecords =
157157
Arrays.asList(
@@ -184,8 +184,7 @@ public void testEnableBackfillWithDMLPreHighWaterMark() throws Exception {
184184
@Test
185185
public void testEnableBackfillWithDMLPostLowWaterMark() throws Exception {
186186

187-
List<String> records =
188-
testBackfillWhenWritingEvents(false, 21, USE_POST_LOWWATERMARK_HOOK, false);
187+
List<String> records = testBackfillWhenWritingEvents(false, 21, USE_POST_LOWWATERMARK_HOOK);
189188

190189
List<String> expectedRecords =
191190
Arrays.asList(
@@ -216,10 +215,45 @@ public void testEnableBackfillWithDMLPostLowWaterMark() throws Exception {
216215
}
217216

218217
@Test
219-
public void testSkipBackfillWithDMLPreHighWaterMark() throws Exception {
218+
public void testEnableBackfillWithDMLPostHighWaterMark() throws Exception {
220219

221220
List<String> records =
222-
testBackfillWhenWritingEvents(true, 24, USE_PRE_HIGHWATERMARK_HOOK, false);
221+
testBackfillWhenWritingEvents(false, 24, USE_POST_HIGHWATERMARK_HOOK);
222+
List<String> expectedRecords =
223+
Arrays.asList(
224+
"+I[101, user_1, Shanghai, 123567891234]",
225+
"+I[102, user_2, Shanghai, 123567891234]",
226+
"+I[103, user_3, Shanghai, 123567891234]",
227+
"+I[109, user_4, Shanghai, 123567891234]",
228+
"+I[110, user_5, Shanghai, 123567891234]",
229+
"+I[111, user_6, Shanghai, 123567891234]",
230+
"+I[118, user_7, Shanghai, 123567891234]",
231+
"+I[121, user_8, Shanghai, 123567891234]",
232+
"+I[123, user_9, Shanghai, 123567891234]",
233+
"+I[1009, user_10, Shanghai, 123567891234]",
234+
"+I[1010, user_11, Shanghai, 123567891234]",
235+
"+I[1011, user_12, Shanghai, 123567891234]",
236+
"+I[1012, user_13, Shanghai, 123567891234]",
237+
"+I[1013, user_14, Shanghai, 123567891234]",
238+
"+I[1014, user_15, Shanghai, 123567891234]",
239+
"+I[1015, user_16, Shanghai, 123567891234]",
240+
"+I[1016, user_17, Shanghai, 123567891234]",
241+
"+I[1017, user_18, Shanghai, 123567891234]",
242+
"+I[1018, user_19, Shanghai, 123567891234]",
243+
"+I[1019, user_20, Shanghai, 123567891234]",
244+
"+I[2000, user_21, Shanghai, 123567891234]",
245+
"+I[15213, user_15213, Shanghai, 123567891234]",
246+
"+U[2000, user_21, Pittsburgh, 123567891234]",
247+
// delete message only contains _id, sql job contain value because of
248+
// changelog normalization
249+
"-D[0, null, null, null]");
250+
assertEqualsInAnyOrder(expectedRecords, records);
251+
}
252+
253+
@Test
254+
public void testSkipBackfillWithDMLPreHighWaterMark() throws Exception {
255+
256+
List<String> records = testBackfillWhenWritingEvents(true, 24, USE_PRE_HIGHWATERMARK_HOOK);
223257

224258
List<String> expectedRecords =
225259
Arrays.asList(
@@ -257,8 +291,7 @@ public void testSkipBackfillWithDMLPreHighWaterMark() throws Exception {
257291
@Test
258292
public void testSkipBackfillWithDMLPostLowWaterMark() throws Exception {
259293

260-
List<String> records =
261-
testBackfillWhenWritingEvents(true, 24, USE_POST_LOWWATERMARK_HOOK, false);
294+
List<String> records = testBackfillWhenWritingEvents(true, 24, USE_POST_LOWWATERMARK_HOOK);
262295

263296
List<String> expectedRecords =
264297
Arrays.asList(
@@ -295,8 +328,7 @@ public void testSkipBackfillWithDMLPostLowWaterMark() throws Exception {
295328
}
296329

297330
private List<String> testBackfillWhenWritingEvents(
298-
boolean skipBackFill, int fetchSize, int hookType, boolean enableFullDocPrePostImage)
299-
throws Exception {
331+
boolean skipBackFill, int fetchSize, int hookType) throws Exception {
300332
String customerDatabase = CONTAINER.executeCommandFileInSeparateDatabase("customer");
301333
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
302334
env.enableCheckpointing(1000);
@@ -319,11 +351,11 @@ private List<String> testBackfillWhenWritingEvents(
319351
.username(FLINK_USER)
320352
.password(FLINK_USER_PASSWORD)
321353
.startupOptions(StartupOptions.initial())
322-
.scanFullChangelog(enableFullDocPrePostImage)
354+
.scanFullChangelog(false)
323355
.collectionList(
324356
getCollectionNameRegex(
325357
customerDatabase, new String[] {"customers"}))
326-
.deserializer(customerTable.getDeserializer(enableFullDocPrePostImage))
358+
.deserializer(customerTable.getDeserializer(false))
327359
.skipSnapshotBackfill(skipBackFill)
328360
.build();
329361

@@ -347,10 +379,16 @@ private List<String> testBackfillWhenWritingEvents(
347379
mongoCollection.deleteOne(Filters.eq("cid", 1019L));
348380
};
349381

350-
if (hookType == USE_POST_LOWWATERMARK_HOOK) {
351-
hooks.setPostLowWatermarkAction(snapshotPhaseHook);
352-
} else if (hookType == USE_PRE_HIGHWATERMARK_HOOK) {
353-
hooks.setPreHighWatermarkAction(snapshotPhaseHook);
382+
switch (hookType) {
383+
case USE_POST_LOWWATERMARK_HOOK:
384+
hooks.setPostLowWatermarkAction(snapshotPhaseHook);
385+
break;
386+
case USE_PRE_HIGHWATERMARK_HOOK:
387+
hooks.setPreHighWatermarkAction(snapshotPhaseHook);
388+
break;
389+
case USE_POST_HIGHWATERMARK_HOOK:
390+
hooks.setPostHighWatermarkAction(snapshotPhaseHook);
391+
break;
354392
}
355393
source.setSnapshotHooks(hooks);
356394

0 commit comments

Comments
 (0)