Skip to content

Commit 6dba62e

Browse files
leonardBangruanhang1993
authored andcommitted
[debezium] Fix DebeizumSourceFunction can not do savepoint after close (#2259)
1 parent 8221f51 commit 6dba62e

File tree

3 files changed

+168
-3
lines changed

3 files changed

+168
-3
lines changed

flink-connector-debezium/src/main/java/com/ververica/cdc/debezium/DebeziumSourceFunction.java

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,7 @@
7171
import java.util.concurrent.ThreadFactory;
7272
import java.util.concurrent.TimeUnit;
7373

74+
import static com.ververica.cdc.debezium.internal.Handover.ClosedException.isGentlyClosedException;
7475
import static com.ververica.cdc.debezium.utils.DatabaseHistoryUtil.registerHistory;
7576
import static com.ververica.cdc.debezium.utils.DatabaseHistoryUtil.retrieveHistory;
7677

@@ -304,9 +305,11 @@ private void restoreHistoryRecordsState() throws Exception {
304305
public void snapshotState(FunctionSnapshotContext functionSnapshotContext) throws Exception {
305306
if (handover.hasError()) {
306307
LOG.debug("snapshotState() called on closed source");
307-
throw new FlinkRuntimeException(
308-
"Call snapshotState() on closed source, checkpoint failed.",
309-
handover.getError());
308+
if (!isGentlyClosedException(handover.getError())) {
309+
throw new FlinkRuntimeException(
310+
"Call snapshotState() on failed source, checkpoint failed.",
311+
handover.getError());
312+
}
310313
} else {
311314
snapshotOffsetState(functionSnapshotContext.getCheckpointId());
312315
snapshotHistoryRecordsState();
@@ -591,4 +594,9 @@ private Class<?> determineDatabase() {
591594
public String getEngineInstanceName() {
592595
return engineInstanceName;
593596
}
597+
598+
@VisibleForTesting
599+
public Handover getHandover() {
600+
return handover;
601+
}
594602
}

flink-connector-debezium/src/main/java/com/ververica/cdc/debezium/internal/Handover.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -191,6 +191,8 @@ public void close() {
191191

192192
if (error == null) {
193193
error = new ClosedException();
194+
} else if (!(error instanceof ClosedException)) {
195+
error = new ClosedException("Close handover with error.", error);
194196
}
195197
lock.notifyAll();
196198
}
@@ -205,5 +207,20 @@ public void close() {
205207
public static final class ClosedException extends Exception {
206208

207209
private static final long serialVersionUID = 1L;
210+
211+
private static final String GENTLY_CLOSED_MESSAGE = "Close handover gently.";
212+
213+
public ClosedException() {
214+
super(GENTLY_CLOSED_MESSAGE);
215+
}
216+
217+
public ClosedException(String message, Throwable cause) {
218+
super(message, cause);
219+
}
220+
221+
public static boolean isGentlyClosedException(Throwable cause) {
222+
return cause instanceof ClosedException
223+
&& GENTLY_CLOSED_MESSAGE.equals(((ClosedException) cause).getMessage());
224+
}
208225
}
209226
}

flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/LegacyMySqlSourceTest.java

Lines changed: 140 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import org.apache.flink.api.java.tuple.Tuple2;
2020
import org.apache.flink.core.testutils.CheckedThread;
2121
import org.apache.flink.runtime.state.StateSnapshotContextSynchronousImpl;
22+
import org.apache.flink.util.FlinkRuntimeException;
2223

2324
import com.fasterxml.jackson.core.JsonParseException;
2425
import com.jayway.jsonpath.JsonPath;
@@ -29,6 +30,8 @@
2930
import com.ververica.cdc.connectors.utils.TestSourceContext;
3031
import com.ververica.cdc.debezium.DebeziumSourceFunction;
3132
import com.ververica.cdc.debezium.history.FlinkJsonTableChangeSerializer;
33+
import com.ververica.cdc.debezium.internal.Handover;
34+
import io.debezium.DebeziumException;
3235
import io.debezium.document.Document;
3336
import io.debezium.document.DocumentWriter;
3437
import io.debezium.relational.Column;
@@ -915,6 +918,143 @@ public void go() throws Exception {
915918
}
916919
}
917920

921+
@Test
922+
public void testSnapshotOnClosedSource() throws Exception {
923+
final TestingListState<byte[]> offsetState = new TestingListState<>();
924+
final TestingListState<String> historyState = new TestingListState<>();
925+
926+
{
927+
try (Connection connection = database.getJdbcConnection();
928+
Statement statement = connection.createStatement()) {
929+
// Step-1: start the source from empty state
930+
final DebeziumSourceFunction<SourceRecord> source = createMySqlBinlogSource();
931+
final TestSourceContext<SourceRecord> sourceContext = new TestSourceContext<>();
932+
// setup source with empty state
933+
setupSource(source, false, offsetState, historyState, true, 0, 1);
934+
935+
final CheckedThread runThread =
936+
new CheckedThread() {
937+
@Override
938+
public void go() throws Exception {
939+
source.run(sourceContext);
940+
}
941+
};
942+
runThread.start();
943+
944+
// wait until the source finishes the database snapshot
945+
List<SourceRecord> records = drain(sourceContext, 9);
946+
assertEquals(9, records.size());
947+
948+
// state is still empty
949+
assertEquals(0, offsetState.list.size());
950+
assertEquals(0, historyState.list.size());
951+
952+
statement.execute(
953+
"INSERT INTO `products` VALUES (110,'robot','Toy robot',1.304)"); // 110
954+
955+
int received = drain(sourceContext, 1).size();
956+
assertEquals(1, received);
957+
958+
// Step-2: trigger a checkpoint
959+
synchronized (sourceContext.getCheckpointLock()) {
960+
// trigger checkpoint-1
961+
source.snapshotState(new StateSnapshotContextSynchronousImpl(101, 101));
962+
}
963+
964+
assertTrue(historyState.list.size() > 0);
965+
assertTrue(offsetState.list.size() > 0);
966+
967+
// Step-3: mock the engine stop with savepoint, trigger a
968+
// checkpoint on closed source
969+
final Handover handover = source.getHandover();
970+
handover.close();
971+
synchronized (sourceContext.getCheckpointLock()) {
972+
// trigger checkpoint-2
973+
source.snapshotState(new StateSnapshotContextSynchronousImpl(102, 102));
974+
}
975+
976+
assertTrue(historyState.list.size() > 0);
977+
assertTrue(offsetState.list.size() > 0);
978+
979+
source.close();
980+
runThread.sync();
981+
}
982+
}
983+
}
984+
985+
@Test
986+
public void testSnapshotOnFailedSource() throws Exception {
987+
final TestingListState<byte[]> offsetState = new TestingListState<>();
988+
final TestingListState<String> historyState = new TestingListState<>();
989+
990+
{
991+
try (Connection connection = database.getJdbcConnection();
992+
Statement statement = connection.createStatement()) {
993+
// Step-1: start the source from empty state
994+
final DebeziumSourceFunction<SourceRecord> source = createMySqlBinlogSource();
995+
final TestSourceContext<SourceRecord> sourceContext = new TestSourceContext<>();
996+
// setup source with empty state
997+
setupSource(source, false, offsetState, historyState, true, 0, 1);
998+
999+
final CheckedThread runThread =
1000+
new CheckedThread() {
1001+
@Override
1002+
public void go() throws Exception {
1003+
source.run(sourceContext);
1004+
}
1005+
};
1006+
runThread.start();
1007+
1008+
// wait until the source finishes the database snapshot
1009+
List<SourceRecord> records = drain(sourceContext, 9);
1010+
assertEquals(9, records.size());
1011+
1012+
// state is still empty
1013+
assertEquals(0, offsetState.list.size());
1014+
assertEquals(0, historyState.list.size());
1015+
1016+
statement.execute(
1017+
"INSERT INTO `products` VALUES (110,'robot','Toy robot',1.304)"); // 110
1018+
1019+
int received = drain(sourceContext, 1).size();
1020+
assertEquals(1, received);
1021+
1022+
// Step-2: trigger a checkpoint
1023+
synchronized (sourceContext.getCheckpointLock()) {
1024+
// trigger checkpoint-1
1025+
source.snapshotState(new StateSnapshotContextSynchronousImpl(101, 101));
1026+
}
1027+
1028+
assertTrue(historyState.list.size() > 0);
1029+
assertTrue(offsetState.list.size() > 0);
1030+
1031+
// Step-3: mock the engine stop due to underlying debezium exception, trigger a
1032+
// checkpoint on failed source
1033+
final Handover handover = source.getHandover();
1034+
handover.reportError(new DebeziumException("Mocked debezium exception"));
1035+
handover.close();
1036+
try {
1037+
synchronized (sourceContext.getCheckpointLock()) {
1038+
// trigger checkpoint-2
1039+
source.snapshotState(new StateSnapshotContextSynchronousImpl(102, 102));
1040+
}
1041+
fail("Should fail.");
1042+
} catch (Exception e) {
1043+
assertTrue(e instanceof FlinkRuntimeException);
1044+
assertTrue(
1045+
e.getMessage()
1046+
.contains(
1047+
"Call snapshotState() on failed source, checkpoint failed."));
1048+
assertTrue(e.getCause() instanceof Handover.ClosedException);
1049+
assertTrue(e.getCause().getMessage().contains("Close handover with error."));
1050+
} finally {
1051+
source.close();
1052+
runThread.sync();
1053+
}
1054+
}
1055+
}
1056+
}
1057+
9181058
// ------------------------------------------------------------------------------------------
9191059
// Public Utilities
9201060
// ------------------------------------------------------------------------------------------

0 commit comments

Comments
 (0)