Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions br/pkg/lightning/importer/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,8 @@ go_library(
"@com_github_tikv_pd_client//:client",
"@io_etcd_go_etcd_client_v3//:client",
"@org_golang_google_grpc//:grpc",
"@org_golang_google_grpc//codes",
"@org_golang_google_grpc//status",
"@org_golang_x_exp//maps",
"@org_golang_x_exp//slices",
"@org_golang_x_sync//errgroup",
Expand Down
27 changes: 20 additions & 7 deletions br/pkg/lightning/importer/table_import.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ import (
"go.uber.org/multierr"
"go.uber.org/zap"
"golang.org/x/exp/slices"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)

// TableImporter is a helper struct to import a table.
Expand Down Expand Up @@ -1034,15 +1036,26 @@ func (tr *TableImporter) postProcess(

var remoteChecksum *local.RemoteChecksum
remoteChecksum, err = DoChecksum(ctx, tr.tableInfo)
failpoint.Inject("checksum-error", func() {
tr.logger.Info("failpoint checksum-error injected.")
remoteChecksum = nil
err = status.Error(codes.Unknown, "Checksum meets error.")
})
if err != nil {
return false, err
if rc.cfg.PostRestore.Checksum != config.OpLevelOptional {
return false, err
}
tr.logger.Warn("do checksum failed, will skip this error and go on", log.ShortError(err))
err = nil
}
err = tr.compareChecksum(remoteChecksum, localChecksum)
// with post restore level 'optional', we will skip checksum error
if rc.cfg.PostRestore.Checksum == config.OpLevelOptional {
if err != nil {
tr.logger.Warn("compare checksum failed, will skip this error and go on", log.ShortError(err))
err = nil
if remoteChecksum != nil {
err = tr.compareChecksum(remoteChecksum, localChecksum)
// with post restore level 'optional', we will skip checksum error
if rc.cfg.PostRestore.Checksum == config.OpLevelOptional {
if err != nil {
tr.logger.Warn("compare checksum failed, will skip this error and go on", log.ShortError(err))
err = nil
}
}
}
} else {
Expand Down
3 changes: 3 additions & 0 deletions br/tests/lightning_routes/config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,3 +8,6 @@ schema-pattern = "routes_a*"
table-pattern = "t*"
target-schema = "routes_b"
target-table = "u"

[post-restore]
checksum = "optional"
5 changes: 5 additions & 0 deletions br/tests/lightning_routes/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,17 @@

set -eux

echo "testing checksum-error..."
export GO_FAILPOINTS="github.com/pingcap/tidb/br/pkg/lightning/importer/checksum-error=1*return()"

run_sql 'DROP DATABASE IF EXISTS routes_a0;'
run_sql 'DROP DATABASE IF EXISTS routes_a1;'
run_sql 'DROP DATABASE IF EXISTS routes_b;'

run_lightning

echo "test checksum-error success!"

run_sql 'SELECT count(1), sum(x) FROM routes_b.u;'
check_contains 'count(1): 4'
check_contains 'sum(x): 259'
Expand Down
29 changes: 17 additions & 12 deletions disttask/importinto/subtask_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,21 +120,26 @@ func verifyChecksum(ctx context.Context, taskMeta *TaskMeta, subtaskMeta *PostPr
}
remoteChecksum, err := checksumTable(ctx, globalTaskManager, taskMeta, logger)
if err != nil {
return err
if taskMeta.Plan.Checksum != config.OpLevelOptional {
return err
}
logger.Warn("checksumTable failed, will skip this error and go on", zap.Error(err))
}
if !remoteChecksum.IsEqual(&localChecksum) {
err2 := common.ErrChecksumMismatch.GenWithStackByArgs(
remoteChecksum.Checksum, localChecksum.Sum(),
remoteChecksum.TotalKVs, localChecksum.SumKVS(),
remoteChecksum.TotalBytes, localChecksum.SumSize(),
)
if taskMeta.Plan.Checksum == config.OpLevelOptional {
logger.Warn("verify checksum failed, but checksum is optional, will skip it", zap.Error(err2))
err2 = nil
if remoteChecksum != nil {
if !remoteChecksum.IsEqual(&localChecksum) {
err2 := common.ErrChecksumMismatch.GenWithStackByArgs(
remoteChecksum.Checksum, localChecksum.Sum(),
remoteChecksum.TotalKVs, localChecksum.SumKVS(),
remoteChecksum.TotalBytes, localChecksum.SumSize(),
)
if taskMeta.Plan.Checksum == config.OpLevelOptional {
logger.Warn("verify checksum failed, but checksum is optional, will skip it", zap.Error(err2))
err2 = nil
}
return err2
}
return err2
logger.Info("checksum pass", zap.Object("local", &localChecksum))
}
logger.Info("checksum pass", zap.Object("local", &localChecksum))
return nil
}

Expand Down