[FLINK-36086] Set isSnapshotCompleted only immediately after snapshot #3552
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
If the SQL Server source connector is restarted while handling updates from a transaction with multiple updates, upon restart, it will skip the non-processed changes and proceed from the next transaction.
This is an analog of DBZ-1128 but reproducible only in Flink CDC.
This is a regression introduced in #2176.
Lower-level details
The
isSnapshotCompleted
offset context flag in Debezium tells the source connector to jump one transaction ahead in the beginning of the streaming phase. This is only necessary during the transition from the initial state snapshot to streaming. Without this flag set, the streaming change data source would stream the changes from the transaction that was already included in the snapshot. This is is the issue that #2176 attempted to address.The following line sets this flag unconditionally for the stream split:
flink-cdc/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/main/java/org/apache/flink/cdc/connectors/sqlserver/source/reader/fetch/SqlServerStreamFetchTask.java
Line 59 in c5396fb
It makes Debezium jump one transaction ahead not only after completing the initial state snapshot but always during the start of streaming. This way, it may potentially skip changes from a transaction that wasn't fully captured prior to the restart.
The proposed solution is to set it only during the transition from the initial state snapshot to streaming.
Testing considerations
start(..., Predicate<SourceRecord> isStopRecord)
in the Debezium testing framework). How could I implement a similar case in Flink CDC?