Skip to content

Commit 742642a

Browse files
lyzx2001 authored and ti-chi-bot committed
This is an automated cherry-pick of pingcap#45486
Signed-off-by: ti-chi-bot <[email protected]>
1 parent 325a8b8 commit 742642a

File tree

5 files changed

+278
-7
lines changed

5 files changed

+278
-7
lines changed

br/pkg/lightning/restore/BUILD.bazel

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,12 @@ go_library(
8282
"@com_github_tikv_pd_client//:client",
8383
"@io_etcd_go_etcd_client_v3//:client",
8484
"@org_golang_google_grpc//:grpc",
85+
"@org_golang_google_grpc//codes",
8586
"@org_golang_google_grpc//keepalive",
87+
"@org_golang_google_grpc//status",
8691
"@org_golang_x_exp//maps",
8792
"@org_golang_x_exp//slices",
8893
"@org_golang_x_sync//errgroup",

br/pkg/lightning/restore/table_restore.go

Lines changed: 20 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,8 @@ import (
4444
"go.uber.org/multierr"
4545
"go.uber.org/zap"
4646
"golang.org/x/exp/slices"
47+
"google.golang.org/grpc/codes"
48+
"google.golang.org/grpc/status"
4749
)
4850

4951
type TableRestore struct {
@@ -843,15 +845,26 @@ func (tr *TableRestore) postProcess(
843845

844846
var remoteChecksum *RemoteChecksum
845847
remoteChecksum, err = DoChecksum(ctx, tr.tableInfo)
848+
failpoint.Inject("checksum-error", func() {
849+
tr.logger.Info("failpoint checksum-error injected.")
850+
remoteChecksum = nil
851+
err = status.Error(codes.Unknown, "Checksum meets error.")
852+
})
846853
if err != nil {
847-
return false, err
854+
if rc.cfg.PostRestore.Checksum != config.OpLevelOptional {
855+
return false, err
856+
}
857+
tr.logger.Warn("do checksum failed, will skip this error and go on", log.ShortError(err))
858+
err = nil
848859
}
849-
err = tr.compareChecksum(remoteChecksum, localChecksum)
850-
// with post restore level 'optional', we will skip checksum error
851-
if rc.cfg.PostRestore.Checksum == config.OpLevelOptional {
852-
if err != nil {
853-
tr.logger.Warn("compare checksum failed, will skip this error and go on", log.ShortError(err))
854-
err = nil
860+
if remoteChecksum != nil {
861+
err = tr.compareChecksum(remoteChecksum, localChecksum)
862+
// with post restore level 'optional', we will skip checksum error
863+
if rc.cfg.PostRestore.Checksum == config.OpLevelOptional {
864+
if err != nil {
865+
tr.logger.Warn("compare checksum failed, will skip this error and go on", log.ShortError(err))
866+
err = nil
867+
}
855868
}
856869
}
857870
} else {

br/tests/lightning_routes/config.toml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,3 +8,6 @@ schema-pattern = "routes_a*"
88
table-pattern = "t*"
99
target-schema = "routes_b"
1010
target-table = "u"
11+
12+
[post-restore]
13+
checksum = "optional"

br/tests/lightning_routes/run.sh

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,12 +4,17 @@
44

55
set -eux
66

7+
echo "testing checksum-error..."
8+
# The checksum-error failpoint is injected in br/pkg/lightning/restore/table_restore.go
# on this branch, so the failpoint path must name the restore package.
export GO_FAILPOINTS="github.com/pingcap/tidb/br/pkg/lightning/restore/checksum-error=1*return()"
9+
710
run_sql 'DROP DATABASE IF EXISTS routes_a0;'
811
run_sql 'DROP DATABASE IF EXISTS routes_a1;'
912
run_sql 'DROP DATABASE IF EXISTS routes_b;'
1013

1114
run_lightning
1215

16+
echo "test checksum-error success!"
17+
1318
run_sql 'SELECT count(1), sum(x) FROM routes_b.u;'
1419
check_contains 'count(1): 4'
1520
check_contains 'sum(x): 259'
Lines changed: 245 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,245 @@
1+
// Copyright 2023 PingCAP, Inc.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package importinto
16+
17+
import (
18+
"context"
19+
"strconv"
20+
"time"
21+
22+
"github.com/pingcap/errors"
23+
"github.com/pingcap/failpoint"
24+
"github.com/pingcap/tidb/br/pkg/lightning/backend/local"
25+
"github.com/pingcap/tidb/br/pkg/lightning/common"
26+
"github.com/pingcap/tidb/br/pkg/lightning/config"
27+
verify "github.com/pingcap/tidb/br/pkg/lightning/verification"
28+
"github.com/pingcap/tidb/disttask/framework/proto"
29+
"github.com/pingcap/tidb/disttask/framework/scheduler"
30+
"github.com/pingcap/tidb/disttask/framework/storage"
31+
"github.com/pingcap/tidb/executor/importer"
32+
"github.com/pingcap/tidb/kv"
33+
"github.com/pingcap/tidb/sessionctx"
34+
"github.com/pingcap/tidb/sessionctx/variable"
35+
"github.com/pingcap/tidb/util/logutil"
36+
"github.com/pingcap/tidb/util/mathutil"
37+
"github.com/tikv/client-go/v2/util"
38+
"go.uber.org/zap"
39+
)
40+
41+
// TestSyncChan is an unbuffered channel that the sync* failpoints use to synchronize with tests.
42+
var TestSyncChan = make(chan struct{})
43+
44+
// ImportMinimalTaskExecutor is a minimal task executor for IMPORT INTO.
45+
type ImportMinimalTaskExecutor struct {
46+
mTtask *importStepMinimalTask
47+
}
48+
49+
// Run implements the SubtaskExecutor.Run interface.
50+
func (e *ImportMinimalTaskExecutor) Run(ctx context.Context) error {
51+
logger := logutil.BgLogger().With(zap.String("type", proto.ImportInto), zap.Int64("table-id", e.mTtask.Plan.TableInfo.ID))
52+
logger.Info("run minimal task")
53+
failpoint.Inject("waitBeforeSortChunk", func() {
54+
time.Sleep(3 * time.Second)
55+
})
56+
failpoint.Inject("errorWhenSortChunk", func() {
57+
failpoint.Return(errors.New("occur an error when sort chunk"))
58+
})
59+
failpoint.Inject("syncBeforeSortChunk", func() {
60+
TestSyncChan <- struct{}{}
61+
<-TestSyncChan
62+
})
63+
chunkCheckpoint := toChunkCheckpoint(e.mTtask.Chunk)
64+
sharedVars := e.mTtask.SharedVars
65+
if err := importer.ProcessChunk(ctx, &chunkCheckpoint, sharedVars.TableImporter, sharedVars.DataEngine, sharedVars.IndexEngine, sharedVars.Progress, logger); err != nil {
66+
return err
67+
}
68+
69+
sharedVars.mu.Lock()
70+
defer sharedVars.mu.Unlock()
71+
sharedVars.Checksum.Add(&chunkCheckpoint.Checksum)
72+
return nil
73+
}
74+
75+
type postProcessMinimalTaskExecutor struct {
76+
mTask *postProcessStepMinimalTask
77+
}
78+
79+
func (e *postProcessMinimalTaskExecutor) Run(ctx context.Context) error {
80+
mTask := e.mTask
81+
failpoint.Inject("waitBeforePostProcess", func() {
82+
time.Sleep(5 * time.Second)
83+
})
84+
return postProcess(ctx, mTask.taskMeta, &mTask.meta, mTask.logger)
85+
}
86+
87+
// postProcess does the post-processing for the task.
88+
func postProcess(ctx context.Context, taskMeta *TaskMeta, subtaskMeta *PostProcessStepMeta, logger *zap.Logger) (err error) {
89+
failpoint.Inject("syncBeforePostProcess", func() {
90+
TestSyncChan <- struct{}{}
91+
<-TestSyncChan
92+
})
93+
94+
logger.Info("post process")
95+
96+
// TODO: create table indexes depends on the option.
97+
// create table indexes even if the post process is failed.
98+
// defer func() {
99+
// err2 := createTableIndexes(ctx, globalTaskManager, taskMeta, logger)
100+
// err = multierr.Append(err, err2)
101+
// }()
102+
103+
return verifyChecksum(ctx, taskMeta, subtaskMeta, logger)
104+
}
105+
106+
func verifyChecksum(ctx context.Context, taskMeta *TaskMeta, subtaskMeta *PostProcessStepMeta, logger *zap.Logger) error {
107+
if taskMeta.Plan.Checksum == config.OpLevelOff {
108+
return nil
109+
}
110+
localChecksum := verify.MakeKVChecksum(subtaskMeta.Checksum.Size, subtaskMeta.Checksum.KVs, subtaskMeta.Checksum.Sum)
111+
logger.Info("local checksum", zap.Object("checksum", &localChecksum))
112+
113+
failpoint.Inject("waitCtxDone", func() {
114+
<-ctx.Done()
115+
})
116+
117+
globalTaskManager, err := storage.GetTaskManager()
118+
if err != nil {
119+
return err
120+
}
121+
remoteChecksum, err := checksumTable(ctx, globalTaskManager, taskMeta, logger)
122+
if err != nil {
123+
if taskMeta.Plan.Checksum != config.OpLevelOptional {
124+
return err
125+
}
126+
logger.Warn("checksumTable failed, will skip this error and go on", zap.Error(err))
127+
}
128+
if remoteChecksum != nil {
129+
if !remoteChecksum.IsEqual(&localChecksum) {
130+
err2 := common.ErrChecksumMismatch.GenWithStackByArgs(
131+
remoteChecksum.Checksum, localChecksum.Sum(),
132+
remoteChecksum.TotalKVs, localChecksum.SumKVS(),
133+
remoteChecksum.TotalBytes, localChecksum.SumSize(),
134+
)
135+
if taskMeta.Plan.Checksum == config.OpLevelOptional {
136+
logger.Warn("verify checksum failed, but checksum is optional, will skip it", zap.Error(err2))
137+
err2 = nil
138+
}
139+
return err2
140+
}
141+
logger.Info("checksum pass", zap.Object("local", &localChecksum))
142+
}
143+
return nil
144+
}
145+
146+
func checksumTable(ctx context.Context, executor storage.SessionExecutor, taskMeta *TaskMeta, logger *zap.Logger) (*local.RemoteChecksum, error) {
147+
var (
148+
tableName = common.UniqueTable(taskMeta.Plan.DBName, taskMeta.Plan.TableInfo.Name.L)
149+
sql = "ADMIN CHECKSUM TABLE " + tableName
150+
maxErrorRetryCount = 3
151+
distSQLScanConcurrencyFactor = 1
152+
remoteChecksum *local.RemoteChecksum
153+
txnErr error
154+
)
155+
156+
ctx = util.WithInternalSourceType(ctx, kv.InternalImportInto)
157+
for i := 0; i < maxErrorRetryCount; i++ {
158+
txnErr = executor.WithNewTxn(ctx, func(se sessionctx.Context) error {
159+
// increase backoff weight
160+
if err := setBackoffWeight(se, taskMeta, logger); err != nil {
161+
logger.Warn("set tidb_backoff_weight failed", zap.Error(err))
162+
}
163+
164+
distSQLScanConcurrency := se.GetSessionVars().DistSQLScanConcurrency()
165+
se.GetSessionVars().SetDistSQLScanConcurrency(mathutil.Max(distSQLScanConcurrency/distSQLScanConcurrencyFactor, local.MinDistSQLScanConcurrency))
166+
defer func() {
167+
se.GetSessionVars().SetDistSQLScanConcurrency(distSQLScanConcurrency)
168+
}()
169+
170+
rs, err := storage.ExecSQL(ctx, se, sql)
171+
if err != nil {
172+
return err
173+
}
174+
if len(rs) < 1 {
175+
return errors.New("empty checksum result")
176+
}
177+
178+
failpoint.Inject("errWhenChecksum", func() {
179+
if i == 0 {
180+
failpoint.Return(errors.New("occur an error when checksum, coprocessor task terminated due to exceeding the deadline"))
181+
}
182+
})
183+
184+
// ADMIN CHECKSUM TABLE <schema>.<table> example.
185+
// mysql> admin checksum table test.t;
186+
// +---------+------------+---------------------+-----------+-------------+
187+
// | Db_name | Table_name | Checksum_crc64_xor | Total_kvs | Total_bytes |
188+
// +---------+------------+---------------------+-----------+-------------+
189+
// | test | t | 8520875019404689597 | 7296873 | 357601387 |
190+
// +---------+------------+-------------
191+
remoteChecksum = &local.RemoteChecksum{
192+
Schema: rs[0].GetString(0),
193+
Table: rs[0].GetString(1),
194+
Checksum: rs[0].GetUint64(2),
195+
TotalKVs: rs[0].GetUint64(3),
196+
TotalBytes: rs[0].GetUint64(4),
197+
}
198+
return nil
199+
})
200+
if !common.IsRetryableError(txnErr) {
201+
break
202+
}
203+
distSQLScanConcurrencyFactor *= 2
204+
logger.Warn("retry checksum table", zap.Int("retry count", i+1), zap.Error(txnErr))
205+
}
206+
return remoteChecksum, txnErr
207+
}
208+
209+
// TestChecksumTable is used to test checksum table in unit test.
210+
func TestChecksumTable(ctx context.Context, executor storage.SessionExecutor, taskMeta *TaskMeta, logger *zap.Logger) (*local.RemoteChecksum, error) {
211+
return checksumTable(ctx, executor, taskMeta, logger)
212+
}
213+
214+
func setBackoffWeight(se sessionctx.Context, taskMeta *TaskMeta, logger *zap.Logger) error {
215+
backoffWeight := local.DefaultBackoffWeight
216+
if val, ok := taskMeta.Plan.ImportantSysVars[variable.TiDBBackOffWeight]; ok {
217+
if weight, err := strconv.Atoi(val); err == nil && weight > backoffWeight {
218+
backoffWeight = weight
219+
}
220+
}
221+
logger.Info("set backoff weight", zap.Int("weight", backoffWeight))
222+
return se.GetSessionVars().SetSystemVar(variable.TiDBBackOffWeight, strconv.Itoa(backoffWeight))
223+
}
224+
225+
func init() {
226+
scheduler.RegisterSubtaskExectorConstructor(proto.ImportInto, StepImport,
227+
// The order of the subtask executors is the same as the order of the subtasks.
228+
func(minimalTask proto.MinimalTask, step int64) (scheduler.SubtaskExecutor, error) {
229+
task, ok := minimalTask.(*importStepMinimalTask)
230+
if !ok {
231+
return nil, errors.Errorf("invalid task type %T", minimalTask)
232+
}
233+
return &ImportMinimalTaskExecutor{mTtask: task}, nil
234+
},
235+
)
236+
scheduler.RegisterSubtaskExectorConstructor(proto.ImportInto, StepPostProcess,
237+
func(minimalTask proto.MinimalTask, step int64) (scheduler.SubtaskExecutor, error) {
238+
mTask, ok := minimalTask.(*postProcessStepMinimalTask)
239+
if !ok {
240+
return nil, errors.Errorf("invalid task type %T", minimalTask)
241+
}
242+
return &postProcessMinimalTaskExecutor{mTask: mTask}, nil
243+
},
244+
)
245+
}

0 commit comments

Comments
 (0)