Skip to content

Commit 15afae8

Browse files
authored
Fix changefeed getting stuck when resume overwrites the checkpoint with a forward checkpointTs (#12058)
close #12055
1 parent a085125 commit 15afae8

File tree

4 files changed

+74
-2
lines changed

4 files changed

+74
-2
lines changed

cdc/owner/changefeed.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -524,8 +524,10 @@ LOOP2:
524524
}
525525
}
526526

527-
if c.resolvedTs == 0 {
527+
// Invariant: resolvedTs must always be >= checkpointTs.
528+
if c.resolvedTs == 0 || c.resolvedTs < checkpointTs {
528529
c.resolvedTs = checkpointTs
530+
log.Info("Initialize changefeed resolvedTs!", zap.Uint64("resolvedTs", c.resolvedTs), zap.Uint64("checkpointTs", checkpointTs))
529531
}
530532

531533
failpoint.Inject("NewChangefeedNoRetryError", func() {
@@ -794,6 +796,7 @@ func (c *changefeed) releaseResources(ctx cdcContext.Context) {
794796
c.barriers = nil
795797
c.initialized.Store(false)
796798
c.isReleased = true
799+
c.resolvedTs = 0
797800

798801
// when closing a changefeed, we must clean the warningCh.
799802
// otherwise, the old warning errors will be handled when the reused changefeed instance is ticked again
Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
enable-sync-point = true
2+
sync-point-interval = "30s"
Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
#!/bin/bash
2+
# This script tests that when syncpoint is enabled and the changefeed is paused,
3+
# then resumed with a forward checkpoint, the changefeed still syncs correctly.
4+
5+
set -eux
6+
7+
CUR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
8+
source $CUR/../_utils/test_prepare
9+
WORK_DIR=$OUT_DIR/$TEST_NAME
10+
CDC_BINARY=cdc.test
11+
SINK_TYPE=$1
12+
13+
function check_ts_forward() {
14+
changefeedid=$1
15+
rts1=$(cdc cli changefeed query --changefeed-id=${changefeedid} 2>&1 | jq '.resolved_ts')
16+
checkpoint1=$(cdc cli changefeed query --changefeed-id=${changefeedid} 2>&1 | jq '.checkpoint_tso')
17+
sleep 1
18+
rts2=$(cdc cli changefeed query --changefeed-id=${changefeedid} 2>&1 | jq '.resolved_ts')
19+
checkpoint2=$(cdc cli changefeed query --changefeed-id=${changefeedid} 2>&1 | jq '.checkpoint_tso')
20+
if [[ "$rts1" != "null" ]] && [[ "$rts1" != "0" ]]; then
21+
if [[ "$rts1" -ne "$rts2" ]] || [[ "$checkpoint1" -ne "$checkpoint2" ]]; then
22+
echo "changefeed is working normally rts: ${rts1}->${rts2} checkpoint: ${checkpoint1}->${checkpoint2}"
23+
return
24+
fi
25+
fi
26+
exit 1
27+
}
28+
29+
function run() {
30+
# No need to test kafka and storage sink.
31+
if [ "$SINK_TYPE" != "mysql" ]; then
32+
return
33+
fi
34+
35+
rm -rf $WORK_DIR && mkdir -p $WORK_DIR
36+
37+
start_tidb_cluster --workdir $WORK_DIR
38+
39+
cd $WORK_DIR
40+
41+
run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY
42+
43+
SINK_URI="mysql://root@127.0.0.1:3306/"
44+
run_cdc_cli changefeed create --sink-uri="$SINK_URI" --config=$CUR/conf/changefeed.toml --changefeed-id="test4"
45+
46+
check_ts_forward "test4"
47+
48+
run_cdc_cli changefeed pause --changefeed-id="test4"
49+
50+
sleep 15
51+
52+
checkpoint1=$(cdc cli changefeed query --changefeed-id="test4" 2>&1 | jq '.checkpoint_tso')
53+
# add a large number to avoid problems caused by jq losing precision when it processes large integers
54+
checkpoint1=$((checkpoint1 + 1000000))
55+
56+
# resume with a forward checkpointTs
57+
run_cdc_cli changefeed resume --changefeed-id="test4" --no-confirm --overwrite-checkpoint-ts=$checkpoint1
58+
59+
check_ts_forward "test4"
60+
61+
cleanup_process $CDC_BINARY
62+
}
63+
64+
trap stop_tidb_cluster EXIT
65+
run $*
66+
check_logs $WORK_DIR
67+
echo "[$(date)] <<<<<< run test case $TEST_NAME success! >>>>>>"

tests/integration_tests/run_group.sh

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ groups=(
4141
["G07"]='kv_client_stream_reconnect cdc split_region'
4242
["G08"]='processor_err_chan changefeed_reconstruct multi_capture'
4343
["G09"]='gc_safepoint changefeed_pause_resume cli savepoint synced_status'
44-
["G10"]='default_value simple cdc_server_tips event_filter'
44+
["G10"]='default_value simple cdc_server_tips event_filter overwrite_resume_with_syncpoint'
4545
["G11"]='resolve_lock move_table autorandom generate_column'
4646
["G12"]='many_pk_or_uk capture_session_done_during_task ddl_attributes'
4747
["G13"]='tiflash region_merge common_1'

0 commit comments

Comments
 (0)