
Commit f4faa50

br: add integration test for ingest corner case test (#52734) (#52765)
close #52733
1 parent b1e0241 commit f4faa50

10 files changed (+250 -33 lines)


br/pkg/task/BUILD.bazel

Lines changed: 0 additions & 1 deletion
@@ -44,7 +44,6 @@ go_library(
         "//br/pkg/utils",
         "//br/pkg/version",
         "//config",
-        "//ddl",
         "//kv",
         "//parser/model",
         "//parser/mysql",

br/pkg/task/stream.go

Lines changed: 0 additions & 5 deletions
@@ -49,7 +49,6 @@ import (
     "github.com/pingcap/tidb/br/pkg/streamhelper/daemon"
     "github.com/pingcap/tidb/br/pkg/summary"
     "github.com/pingcap/tidb/br/pkg/utils"
-    "github.com/pingcap/tidb/ddl"
     "github.com/pingcap/tidb/kv"
     "github.com/pingcap/tidb/parser/model"
     "github.com/pingcap/tidb/util/mathutil"
@@ -473,10 +472,6 @@ func (s *streamMgr) checkStreamStartEnable(g glue.Glue) error {
         return errors.New("Unable to create task about log-backup. " +
             "please set TiKV config `log-backup.enable` to true and restart TiKVs.")
     }
-    if !ddl.IngestJobsNotExisted(se.GetSessionCtx()) {
-        return errors.Annotate(berrors.ErrUnknown,
-            "Unable to create log backup task. Please wait until the DDL jobs(add index with ingest method) are finished.")
-    }

     return nil
 }
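The deletions above remove the pre-flight guard that made `br log start` fail while an ingest-backed ADD INDEX job was still running (the `ddl.IngestJobsNotExisted` check; its definition is dropped from ddl/index.go at the end of this diff). The new integration test exercises exactly that interleaving. A condensed sketch of the flow, assuming the usual BR test helpers and illustrative paths (the full version is in run.sh below):

# start an ingest add-index in the background, then start a log backup task while it runs
run_sql "ALTER TABLE test.pairs ADD INDEX i1(y, z);" &   # backfill runs through the ingest (lightning) path
sql_pid=$!

# before this change br refused here with "Please wait until the DDL jobs ... are finished";
# now the log backup task is created while the reorg is still in progress
run_br --pd "$PD_ADDR" log start --task-name integration_test -s "local://$TEST_DIR/log"

wait $sql_pid   # the index creation finishes under an active log backup task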
br/tests/br_pitr_failpoint/check/check_ingest_repair.sh

Lines changed: 28 additions & 0 deletions
@@ -0,0 +1,28 @@
#!/bin/bash
#
# Copyright 2024 PingCAP, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

set -eu

# check index schema
## check table test.pairs
run_sql "SHOW INDEX FROM test.pairs WHERE Key_name = 'i1';"
check_contains "Column_name: y"
check_contains "Column_name: z"

# check index data
run_sql "select count(*) AS RESCNT from test.pairs use index(i1) where y = 0 and z = 0;"
check_not_contains "RESCNT: 0"
run_sql "admin check table test.pairs;"
br/tests/br_pitr_failpoint/intersect_data/ingest_repair1.sql

Lines changed: 1 addition & 0 deletions
@@ -0,0 +1 @@
ALTER TABLE test.pairs ADD INDEX i1(y, z);
br/tests/br_pitr_failpoint/prepare_data/ingest_repair.sql

Lines changed: 3 additions & 0 deletions
@@ -0,0 +1,3 @@
CREATE TABLE test.pairs(x int auto_increment primary key, y int DEFAULT RAND(), z int DEFAULT RAND());
INSERT INTO test.pairs (y,z) VALUES (0,0);
INSERT INTO test.pairs VALUES (),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),(),();

br/tests/br_pitr_failpoint/run.sh

Lines changed: 183 additions & 0 deletions
@@ -0,0 +1,183 @@
#!/bin/bash
#
# Copyright 2024 PingCAP, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

set -eu
. run_services
CUR=$(cd `dirname $0`; pwd)

# const value
PREFIX="pitr_backup_failpoint" # NOTICE: don't start with 'br' because `restart services` would remove file/directory br*.
res_file="$TEST_DIR/sql_res.$TEST_NAME.txt"
hint_sig_file_public=$TEST_DIR/hint_sig_file_public
hint_sig_file_history=$TEST_DIR/hint_sig_file_history

# inject some failpoints for TiDB-server
export GO_FAILPOINTS="github.com/pingcap/tidb/pkg/ddl/create-index-stuck-before-public=return(\"$hint_sig_file_public\");\
github.com/pingcap/tidb/pkg/ddl/create-index-stuck-before-ddlhistory=return(\"$hint_sig_file_history\")"

# start a new cluster
echo "restart a services"
restart_services

# prepare the data
echo "prepare the data"
run_sql_file $CUR/prepare_data/ingest_repair.sql

# prepare the intersect data
run_sql_file $CUR/intersect_data/ingest_repair1.sql &
sql_pid=$!

# start the log backup task
echo "start log task"
run_br --pd $PD_ADDR log start --task-name integration_test -s "local://$TEST_DIR/$PREFIX/log"

# wait until the index creation is running
retry_cnt=0
while true; do
    run_sql "ADMIN SHOW DDL JOBS WHERE DB_NAME = 'test' AND TABLE_NAME = 'pairs' AND STATE = 'running' AND SCHEMA_STATE = 'write reorganization' AND JOB_TYPE = 'add index /* ingest */';"
    if grep -Fq "1. row" $res_file; then
        break
    fi

    retry_cnt=$((retry_cnt+1))
    if [ "$retry_cnt" -gt 50 ]; then
        echo 'the wait lag is too large'
        exit 1
    fi

    sleep 1
done

# run snapshot backup 1 -- before the index becomes public
echo "run snapshot backup"
run_br --pd $PD_ADDR backup full -s "local://$TEST_DIR/$PREFIX/full-1"

# advance the progress of index creation, make the index become public
touch $hint_sig_file_public

# wait until the index creation is done
retry_cnt=0
while true; do
    run_sql "ADMIN SHOW DDL JOBS WHERE DB_NAME = 'test' AND TABLE_NAME = 'pairs' AND STATE = 'done' AND SCHEMA_STATE = 'public' AND JOB_TYPE = 'add index /* ingest */';"
    if grep -Fq "1. row" $res_file; then
        break
    fi

    retry_cnt=$((retry_cnt+1))
    if [ "$retry_cnt" -gt 50 ]; then
        echo 'the wait lag is too large'
        exit 1
    fi

    sleep 1
done

# run snapshot backup 2 -- before the ddl history is generated
echo "run snapshot backup"
run_br --pd $PD_ADDR backup full -s "local://$TEST_DIR/$PREFIX/full-2"

# advance the progress of index creation, generate ddl history
touch $hint_sig_file_history

# wait index creation done
wait $sql_pid

# wait until the index creation is done
retry_cnt=0
while true; do
    run_sql "ADMIN SHOW DDL JOBS WHERE DB_NAME = 'test' AND TABLE_NAME = 'pairs' AND STATE = 'synced' AND SCHEMA_STATE = 'public' AND JOB_TYPE = 'add index /* ingest */';"
    if grep -Fq "1. row" $res_file; then
        break
    fi

    retry_cnt=$((retry_cnt+1))
    if [ "$retry_cnt" -gt 50 ]; then
        echo 'the wait lag is too large'
        exit 1
    fi

    sleep 1
done

# clean the failpoints
export GO_FAILPOINTS=""

# check something in the upstream
run_sql "SHOW INDEX FROM test.pairs WHERE Key_name = 'i1';"
check_contains "Column_name: y"
check_contains "Column_name: z"

# wait checkpoint advance
echo "wait checkpoint advance"
sleep 10
current_ts=$(echo $(($(date +%s%3N) << 18)))
echo "current ts: $current_ts"
i=0
while true; do
    # extract the checkpoint ts of the log backup task. If there is some error, the checkpoint ts should be empty
    log_backup_status=$(unset BR_LOG_TO_TERM && run_br --pd $PD_ADDR log status --task-name integration_test --json 2>/dev/null)
    echo "log backup status: $log_backup_status"
    checkpoint_ts=$(echo "$log_backup_status" | head -n 1 | jq 'if .[0].last_errors | length == 0 then .[0].checkpoint else empty end')
    echo "checkpoint ts: $checkpoint_ts"

    # check whether the checkpoint ts is a number
    if [ $checkpoint_ts -gt 0 ] 2>/dev/null; then
        # check whether the checkpoint has advanced
        if [ $checkpoint_ts -gt $current_ts ]; then
            echo "the checkpoint has advanced"
            break
        fi
        # the checkpoint hasn't advanced
        echo "the checkpoint hasn't advanced"
        i=$((i+1))
        if [ "$i" -gt 50 ]; then
            echo 'the checkpoint lag is too large'
            exit 1
        fi
        sleep 10
    else
        # unknown status, maybe somewhere is wrong
        echo "TEST: [$TEST_NAME] failed to wait checkpoint advance!"
        exit 1
    fi
done

# start a new cluster
echo "restart a services"
restart_services

# PITR restore - 1
echo "run pitr 1 -- before the index becomes public"
run_br --pd $PD_ADDR restore point -s "local://$TEST_DIR/$PREFIX/log" --full-backup-storage "local://$TEST_DIR/$PREFIX/full-1" > $res_file 2>&1

# check something in downstream cluster
echo "check br log"
check_contains "restore log success summary"
## check feature compatibility between PITR and accelerate indexing
bash $CUR/check/check_ingest_repair.sh

# Clean the data
run_sql "DROP DATABASE test; CREATE DATABASE test;"

# PITR restore - 2
echo "run pitr 2 -- before the index becomes public"
run_br --pd $PD_ADDR restore point -s "local://$TEST_DIR/$PREFIX/log" --full-backup-storage "local://$TEST_DIR/$PREFIX/full-2" > $res_file 2>&1

# check something in downstream cluster
echo "check br log"
check_contains "restore log success summary"
## check feature compatibility between PITR and accelerate indexing
bash $CUR/check/check_ingest_repair.sh
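The checkpoint comparison in the loop above leans on TiDB's TSO layout: a timestamp is the physical wall-clock time in milliseconds shifted left by 18 bits, with the low 18 bits used as a logical counter, which is why current_ts is built from `date +%s%3N` shifted by 18. A small sketch (the helper name is illustrative) for turning a reported checkpoint back into wall-clock time when debugging a stalled task:

#!/bin/bash
# decompose a TiDB TSO, e.g. the checkpoint printed by `br log status --json`
tso_to_date() {
    local tso=$1
    local physical_ms=$((tso >> 18))          # physical part: unix time in milliseconds
    local logical=$((tso & ((1 << 18) - 1)))  # logical counter within the same millisecond
    # GNU date; prints the wall-clock time the TSO corresponds to
    echo "physical: $(date -d "@$((physical_ms / 1000))" '+%F %T'), logical: $logical"
}

tso_to_date "$checkpoint_ts"   # e.g. the value extracted with jq above

Requiring checkpoint_ts > current_ts therefore means the log backup has flushed everything written before the wait started, including the ingest-backfilled index entries.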

br/tests/config/tikv.toml

Lines changed: 3 additions & 0 deletions
@@ -33,3 +33,6 @@ data-encryption-method = "aes256-ctr"
 [security.encryption.master-key]
 type = "file"
 path = "/tmp/backup_restore_test/master-key-file"
+
+[log-backup]
+max-flush-interval = "50s"

br/tests/run_group.sh

Lines changed: 1 addition & 1 deletion
@@ -23,7 +23,7 @@ declare -A groups
 groups=(
     ["G00"]="br_300_small_tables br_backup_empty br_backup_version br_cache_table br_case_sensitive br_charset_gbk br_check_new_collocation_enable"
     ["G01"]="br_autoid br_crypter2 br_db br_db_online br_db_online_newkv br_db_skip br_debug_meta br_ebs br_foreign_key br_full"
-    ["G02"]="br_full_cluster_restore br_full_ddl br_full_index br_gcs br_history"
+    ["G02"]="br_full_cluster_restore br_full_ddl br_full_index br_gcs br_history br_pitr_failpoint"
     ["G03"]='br_incompatible_tidb_config br_incremental br_incremental_ddl br_incremental_index br_pitr'
     ["G04"]='br_incremental_only_ddl br_incremental_same_table br_insert_after_restore br_key_locked br_log_test br_move_backup br_mv_index br_other br_partition_add_index'
     ["G05"]='br_range br_rawkv br_replica_read br_restore_TDE_enable br_restore_log_task_enable br_s3 br_shuffle_leader br_shuffle_region br_single_table'

ddl/ddl_worker.go

Lines changed: 16 additions & 0 deletions
@@ -18,6 +18,7 @@ import (
     "context"
     "fmt"
     "math/rand"
+    "os"
     "strconv"
     "sync"
     "sync/atomic"
@@ -758,6 +759,21 @@ func (w *worker) HandleDDLJobTable(d *ddlCtx, job *model.Job) (int64, error) {
     if job.IsDone() {
         job.State = model.JobStateSynced
     }
+    // Inject the failpoint to prevent the progress of index creation.
+    failpoint.Inject("create-index-stuck-before-ddlhistory", func(v failpoint.Value) {
+        if sigFile, ok := v.(string); ok && job.Type == model.ActionAddIndex {
+            for {
+                time.Sleep(1 * time.Second)
+                if _, err := os.Stat(sigFile); err != nil {
+                    if os.IsNotExist(err) {
+                        continue
+                    }
+                    failpoint.Return(0, errors.Trace(err))
+                }
+                break
+            }
+        }
+    })
     err = w.HandleJobDone(d, job, t)
     return 0, err
 }

ddl/index.go

Lines changed: 15 additions & 26 deletions
@@ -695,6 +695,21 @@ func (w *worker) onCreateIndex(d *ddlCtx, t *meta.Meta, job *model.Job, isPK boo
             return ver, errors.Trace(err)
         }
     }
+    // Inject the failpoint to prevent the progress of index creation.
+    failpoint.Inject("create-index-stuck-before-public", func(v failpoint.Value) {
+        if sigFile, ok := v.(string); ok {
+            for {
+                time.Sleep(1 * time.Second)
+                if _, err := os.Stat(sigFile); err != nil {
+                    if os.IsNotExist(err) {
+                        continue
+                    }
+                    failpoint.Return(ver, errors.Trace(err))
+                }
+                break
+            }
+        }
+    })
     indexInfo.State = model.StatePublic
     ver, err = updateVersionAndTableInfo(d, t, job, tblInfo, originalState != indexInfo.State)
     if err != nil {
@@ -806,32 +821,6 @@ func cleanupSortPath(currentJobID int64) error {
     return nil
 }

-// IngestJobsNotExisted checks the ddl about `add index` with ingest method not existed.
-func IngestJobsNotExisted(ctx sessionctx.Context) bool {
-    se := sess.NewSession(ctx)
-    template := "select job_meta from mysql.tidb_ddl_job where reorg and (type = %d or type = %d) and processing;"
-    sql := fmt.Sprintf(template, model.ActionAddIndex, model.ActionAddPrimaryKey)
-    rows, err := se.Execute(context.Background(), sql, "check-pitr")
-    if err != nil {
-        logutil.BgLogger().Warn("cannot check ingest job", zap.Error(err))
-        return false
-    }
-    for _, row := range rows {
-        jobBinary := row.GetBytes(0)
-        runJob := model.Job{}
-        err := runJob.Decode(jobBinary)
-        if err != nil {
-            logutil.BgLogger().Warn("cannot check ingest job", zap.Error(err))
-            return false
-        }
-        // Check whether this add index job is using lightning to do the backfill work.
-        if runJob.ReorgMeta.ReorgTp == model.ReorgTypeLitMerge {
-            return false
-        }
-    }
-    return true
-}
-
 func doReorgWorkForCreateIndexMultiSchema(w *worker, d *ddlCtx, t *meta.Meta, job *model.Job,
     tbl table.Table, indexInfo *model.IndexInfo) (done bool, ver int64, err error) {
     if job.MultiSchemaInfo.Revertible {
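Both injections follow the same pattern: when the failpoint is enabled, the DDL worker polls once per second for a signal file and only proceeds once the test creates it, parking the add-index job either just before the index state flips to public (ddl/index.go) or just before the finished job is written to the DDL history (ddl/ddl_worker.go). The failpoints are switched on through the GO_FAILPOINTS environment variable understood by the pingcap/failpoint library; a trimmed recap of the handshake driven by run.sh above (paths as exported by that script):

# each spec is "<package path>/<failpoint name>=return(\"<signal file>\")"; the string payload
# becomes the failpoint.Value handed to the Inject callbacks shown in this diff
export GO_FAILPOINTS="github.com/pingcap/tidb/pkg/ddl/create-index-stuck-before-public=return(\"$TEST_DIR/hint_sig_file_public\");\
github.com/pingcap/tidb/pkg/ddl/create-index-stuck-before-ddlhistory=return(\"$TEST_DIR/hint_sig_file_history\")"
restart_services                          # the failpoint runtime reads GO_FAILPOINTS when TiDB starts

# ... take the full-1 backup while the index is still non-public ...
touch "$TEST_DIR/hint_sig_file_public"    # unblock onCreateIndex: the index becomes public
# ... take the full-2 backup before the DDL history record exists ...
touch "$TEST_DIR/hint_sig_file_history"   # unblock HandleDDLJobTable: the job reaches the DDL history
export GO_FAILPOINTS=""                   # clear before later cluster restarts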
