Skip to content

Commit 19843ce

Browse files
authored
br: restore checksum shouldn't rely on backup checksum (#56712) (#58335)
close #56373
1 parent 7f86cec commit 19843ce

File tree

18 files changed

+152
-81
lines changed

18 files changed

+152
-81
lines changed

br/cmd/br/backup.go

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,10 +20,11 @@ import (
2020

2121
func runBackupCommand(command *cobra.Command, cmdName string) error {
2222
cfg := task.BackupConfig{Config: task.Config{LogProgress: HasLogFile()}}
23-
if err := cfg.ParseFromFlags(command.Flags()); err != nil {
23+
if err := cfg.ParseFromFlags(command.Flags(), false); err != nil {
2424
command.SilenceUsage = false
2525
return errors.Trace(err)
2626
}
27+
overrideDefaultBackupConfigIfNeeded(&cfg, command)
2728

2829
ctx := GetDefaultContext()
2930
if cfg.EnableOpenTracing {
@@ -165,3 +166,10 @@ func newRawBackupCommand() *cobra.Command {
165166
task.DefineRawBackupFlags(command)
166167
return command
167168
}
169+
170+
func overrideDefaultBackupConfigIfNeeded(config *task.BackupConfig, cmd *cobra.Command) {
171+
// override only if flag not set by user
172+
if !cmd.Flags().Changed(task.FlagChecksum) {
173+
config.Checksum = false
174+
}
175+
}

br/cmd/br/cmd.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -79,8 +79,8 @@ func timestampLogFileName() string {
7979
return filepath.Join(os.TempDir(), time.Now().Format("br.log.2006-01-02T15.04.05Z0700"))
8080
}
8181

82-
// AddFlags adds flags to the given cmd.
83-
func AddFlags(cmd *cobra.Command) {
82+
// DefineCommonFlags defines the common flags for all BR cmd operation.
83+
func DefineCommonFlags(cmd *cobra.Command) {
8484
cmd.Version = build.Info()
8585
cmd.Flags().BoolP(flagVersion, flagVersionShort, false, "Display version information about BR")
8686
cmd.SetVersionTemplate("{{printf \"%s\" .Version}}\n")
@@ -97,6 +97,8 @@ func AddFlags(cmd *cobra.Command) {
9797
"Set whether to redact sensitive info in log")
9898
cmd.PersistentFlags().String(FlagStatusAddr, "",
9999
"Set the HTTP listening address for the status report service. Set to empty string to disable")
100+
101+
// Define the BR task common flags, which are shared by cmd and sql (brie).
100102
task.DefineCommonFlags(cmd.PersistentFlags())
101103

102104
cmd.PersistentFlags().StringP(FlagSlowLogFile, "", "",

br/cmd/br/main.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ func main() {
2020
TraverseChildren: true,
2121
SilenceUsage: true,
2222
}
23-
AddFlags(rootCmd)
23+
DefineCommonFlags(rootCmd)
2424
SetDefaultContext(ctx)
2525
rootCmd.AddCommand(
2626
NewDebugCommand(),

br/cmd/br/restore.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ import (
2222

2323
func runRestoreCommand(command *cobra.Command, cmdName string) error {
2424
cfg := task.RestoreConfig{Config: task.Config{LogProgress: HasLogFile()}}
25-
if err := cfg.ParseFromFlags(command.Flags()); err != nil {
25+
if err := cfg.ParseFromFlags(command.Flags(), false); err != nil {
2626
command.SilenceUsage = false
2727
return errors.Trace(err)
2828
}

br/pkg/backup/schema.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -111,7 +111,7 @@ func (ss *Schemas) BackupSchemas(
111111
}
112112

113113
var checksum *checkpoint.ChecksumItem
114-
var exists bool = false
114+
var exists = false
115115
if ss.checkpointChecksum != nil && schema.tableInfo != nil {
116116
checksum, exists = ss.checkpointChecksum[schema.tableInfo.ID]
117117
}
@@ -153,7 +153,7 @@ func (ss *Schemas) BackupSchemas(
153153
zap.Uint64("Crc64Xor", schema.crc64xor),
154154
zap.Uint64("TotalKvs", schema.totalKvs),
155155
zap.Uint64("TotalBytes", schema.totalBytes),
156-
zap.Duration("calculate-take", calculateCost),
156+
zap.Duration("TimeTaken", calculateCost),
157157
zap.Duration("flush-take", flushCost))
158158
}
159159
}

br/pkg/metautil/metafile.go

Lines changed: 27 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -156,11 +156,6 @@ type Table struct {
156156
Stats *handle.JSONTable
157157
}
158158

159-
// NoChecksum checks whether the table has a calculated checksum.
160-
func (tbl *Table) NoChecksum() bool {
161-
return tbl.Crc64Xor == 0 && tbl.TotalKvs == 0 && tbl.TotalBytes == 0
162-
}
163-
164159
// MetaReader wraps a reader to read both old and new version of backupmeta.
165160
type MetaReader struct {
166161
storage storage.ExternalStorage
@@ -225,14 +220,40 @@ func (reader *MetaReader) readDataFiles(ctx context.Context, output func(*backup
225220
}
226221

227222
// ArchiveSize returns the total size of the given archive files.
228-
func (*MetaReader) ArchiveSize(_ context.Context, files []*backuppb.File) uint64 {
223+
func ArchiveSize(files []*backuppb.File) uint64 {
229224
total := uint64(0)
230225
for _, file := range files {
231226
total += file.Size_
232227
}
233228
return total
234229
}
235230

231+
// ChecksumStats contains the checksum summary for a group of files from a table.
232+
type ChecksumStats struct {
233+
Crc64Xor uint64
234+
TotalKvs uint64
235+
TotalBytes uint64
236+
}
237+
238+
// ChecksumExists checks whether it contains the checksum summary
239+
func (stats ChecksumStats) ChecksumExists() bool {
240+
if stats.Crc64Xor == 0 && stats.TotalKvs == 0 && stats.TotalBytes == 0 {
241+
return false
242+
}
243+
return true
244+
}
245+
246+
// CalculateChecksumStatsOnFiles returns the ChecksumStats for the given files
247+
func CalculateChecksumStatsOnFiles(files []*backuppb.File) ChecksumStats {
248+
var stats ChecksumStats
249+
for _, file := range files {
250+
stats.Crc64Xor ^= file.Crc64Xor
251+
stats.TotalKvs += file.TotalKvs
252+
stats.TotalBytes += file.TotalBytes
253+
}
254+
return stats
255+
}
256+
236257
// ReadDDLs reads the ddls from the backupmeta.
237258
// This function is compatible with the old backupmeta.
238259
func (reader *MetaReader) ReadDDLs(ctx context.Context) ([]byte, error) {

br/pkg/restore/client.go

Lines changed: 18 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -361,8 +361,8 @@ func (rc *Client) SetRawKVClient(c *RawKVBatchClient) {
361361
rc.rawKVClient = c
362362
}
363363

364-
// InitBackupMeta loads schemas from BackupMeta to initialize RestoreClient.
365-
func (rc *Client) InitBackupMeta(
364+
// LoadSchemaIfNeededAndInitClient loads schemas from BackupMeta to initialize RestoreClient.
365+
func (rc *Client) LoadSchemaIfNeededAndInitClient(
366366
c context.Context,
367367
backupMeta *backuppb.BackupMeta,
368368
backend *backuppb.StorageBackend,
@@ -1397,7 +1397,7 @@ func (rc *Client) GoValidateChecksum(
13971397
elapsed := time.Since(start)
13981398
summary.CollectSuccessUnit("table checksum", 1, elapsed)
13991399
}()
1400-
err := rc.execChecksum(c, tbl, kvClient, concurrency)
1400+
err := rc.execAndValidateChecksum(c, tbl, kvClient, concurrency)
14011401
if err != nil {
14021402
return errors.Trace(err)
14031403
}
@@ -1409,7 +1409,7 @@ func (rc *Client) GoValidateChecksum(
14091409
return outCh
14101410
}
14111411

1412-
func (rc *Client) execChecksum(
1412+
func (rc *Client) execAndValidateChecksum(
14131413
ctx context.Context,
14141414
tbl *CreatedTable,
14151415
kvClient kv.Client,
@@ -1420,13 +1420,14 @@ func (rc *Client) execChecksum(
14201420
zap.String("table", tbl.OldTable.Info.Name.O),
14211421
)
14221422

1423-
if tbl.OldTable.NoChecksum() {
1423+
expectedChecksumStats := metautil.CalculateChecksumStatsOnFiles(tbl.OldTable.Files)
1424+
if !expectedChecksumStats.ChecksumExists() {
14241425
logger.Warn("table has no checksum, skipping checksum")
14251426
return nil
14261427
}
14271428

14281429
if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil {
1429-
span1 := span.Tracer().StartSpan("Client.execChecksum", opentracing.ChildOf(span.Context()))
1430+
span1 := span.Tracer().StartSpan("Client.execAndValidateChecksum", opentracing.ChildOf(span.Context()))
14301431
defer span1.Finish()
14311432
ctx = opentracing.ContextWithSpan(ctx, span1)
14321433
}
@@ -1449,22 +1450,25 @@ func (rc *Client) execChecksum(
14491450
return errors.Trace(err)
14501451
}
14511452

1452-
table := tbl.OldTable
1453-
if checksumResp.Checksum != table.Crc64Xor ||
1454-
checksumResp.TotalKvs != table.TotalKvs ||
1455-
checksumResp.TotalBytes != table.TotalBytes {
1453+
checksumMatch := checksumResp.Checksum == expectedChecksumStats.Crc64Xor &&
1454+
checksumResp.TotalKvs == expectedChecksumStats.TotalKvs &&
1455+
checksumResp.TotalBytes == expectedChecksumStats.TotalBytes
1456+
failpoint.Inject("full-restore-validate-checksum", func(_ failpoint.Value) {
1457+
checksumMatch = false
1458+
})
1459+
if !checksumMatch {
14561460
logger.Error("failed in validate checksum",
1457-
zap.Uint64("origin tidb crc64", table.Crc64Xor),
1461+
zap.Uint64("origin tidb crc64", expectedChecksumStats.Crc64Xor),
14581462
zap.Uint64("calculated crc64", checksumResp.Checksum),
1459-
zap.Uint64("origin tidb total kvs", table.TotalKvs),
1463+
zap.Uint64("origin tidb total kvs", expectedChecksumStats.TotalKvs),
14601464
zap.Uint64("calculated total kvs", checksumResp.TotalKvs),
1461-
zap.Uint64("origin tidb total bytes", table.TotalBytes),
1465+
zap.Uint64("origin tidb total bytes", expectedChecksumStats.TotalBytes),
14621466
zap.Uint64("calculated total bytes", checksumResp.TotalBytes),
14631467
)
14641468
return errors.Annotate(berrors.ErrRestoreChecksumMismatch, "failed to validate checksum")
14651469
}
14661470

1467-
logger.Info("success in validate checksum")
1471+
logger.Info("success in validating checksum")
14681472
return nil
14691473
}
14701474

br/pkg/task/backup.go

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,6 @@ import (
3737
"github.com/pingcap/tidb/util/mathutil"
3838
"github.com/spf13/pflag"
3939
"github.com/tikv/client-go/v2/oracle"
40-
"go.uber.org/multierr"
4140
"go.uber.org/zap"
4241
)
4342

@@ -141,7 +140,7 @@ func DefineBackupFlags(flags *pflag.FlagSet) {
141140
}
142141

143142
// ParseFromFlags parses the backup-related flags from the flag set.
144-
func (cfg *BackupConfig) ParseFromFlags(flags *pflag.FlagSet) error {
143+
func (cfg *BackupConfig) ParseFromFlags(flags *pflag.FlagSet, skipCommonConfig bool) error {
145144
timeAgo, err := flags.GetDuration(flagBackupTimeago)
146145
if err != nil {
147146
return errors.Trace(err)
@@ -198,9 +197,13 @@ func (cfg *BackupConfig) ParseFromFlags(flags *pflag.FlagSet) error {
198197
}
199198
cfg.CompressionConfig = *compressionCfg
200199

201-
if err = cfg.Config.ParseFromFlags(flags); err != nil {
202-
return errors.Trace(err)
200+
// parse common flags if needed
201+
if !skipCommonConfig {
202+
if err = cfg.Config.ParseFromFlags(flags); err != nil {
203+
return errors.Trace(err)
204+
}
203205
}
206+
204207
cfg.RemoveSchedulers, err = flags.GetBool(flagRemoveSchedulers)
205208
if err != nil {
206209
return errors.Trace(err)
@@ -713,18 +716,15 @@ func ParseTSString(ts string, tzCheck bool) (uint64, error) {
713716
return oracle.GoTimeToTS(t1), nil
714717
}
715718

716-
func DefaultBackupConfig() BackupConfig {
719+
func DefaultBackupConfig(commonConfig Config) BackupConfig {
717720
fs := pflag.NewFlagSet("dummy", pflag.ContinueOnError)
718-
DefineCommonFlags(fs)
719721
DefineBackupFlags(fs)
720722
cfg := BackupConfig{}
721-
err := multierr.Combine(
722-
cfg.ParseFromFlags(fs),
723-
cfg.Config.ParseFromFlags(fs),
724-
)
723+
err := cfg.ParseFromFlags(fs, true)
725724
if err != nil {
726-
log.Panic("infallible operation failed.", zap.Error(err))
725+
log.Panic("failed to parse backup flags to config", zap.Error(err))
727726
}
727+
cfg.Config = commonConfig
728728
return cfg
729729
}
730730

br/pkg/task/backup_test.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,7 @@ func TestCheckpointConfigAdjust(t *testing.T) {
8484
DefineBackupFlags(flags)
8585
// in default
8686
flags.Parse([]string{""})
87-
config.ParseFromFlags(flags)
87+
config.ParseFromFlags(flags, false)
8888
require.True(t, config.UseCheckpoint)
8989
require.False(t, config.UseBackupMetaV2)
9090
require.Equal(t, uint64(0), config.LastBackupTS)
@@ -95,7 +95,7 @@ func TestCheckpointConfigAdjust(t *testing.T) {
9595
DefineBackupFlags(flags)
9696
// use incremental backup feature
9797
flags.Parse([]string{"--lastbackupts", "1"})
98-
config.ParseFromFlags(flags)
98+
config.ParseFromFlags(flags, false)
9999
require.False(t, config.UseCheckpoint)
100100
require.False(t, config.UseBackupMetaV2)
101101
require.Equal(t, uint64(1), config.LastBackupTS)
@@ -106,7 +106,7 @@ func TestCheckpointConfigAdjust(t *testing.T) {
106106
DefineBackupFlags(flags)
107107
// use backupmeta v2 feature
108108
flags.Parse([]string{"--use-backupmeta-v2"})
109-
config.ParseFromFlags(flags)
109+
config.ParseFromFlags(flags, false)
110110
require.False(t, config.UseCheckpoint)
111111
require.True(t, config.UseBackupMetaV2)
112112
require.Equal(t, uint64(0), config.LastBackupTS)
@@ -117,7 +117,7 @@ func TestCheckpointConfigAdjust(t *testing.T) {
117117
DefineBackupFlags(flags)
118118
// use both backupmeta v2 feature and incremental backup feature
119119
flags.Parse([]string{"--use-backupmeta-v2", "--lastbackupts", "1"})
120-
config.ParseFromFlags(flags)
120+
config.ParseFromFlags(flags, false)
121121
require.False(t, config.UseCheckpoint)
122122
require.True(t, config.UseBackupMetaV2)
123123
require.Equal(t, uint64(1), config.LastBackupTS)

br/pkg/task/common.go

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ const (
6464
flagRateLimit = "ratelimit"
6565
flagRateLimitUnit = "ratelimit-unit"
6666
flagConcurrency = "concurrency"
67-
flagChecksum = "checksum"
67+
FlagChecksum = "checksum"
6868
flagFilter = "filter"
6969
flagCaseSensitive = "case-sensitive"
7070
flagRemoveTiFlash = "remove-tiflash"
@@ -270,7 +270,7 @@ func DefineCommonFlags(flags *pflag.FlagSet) {
270270
_ = flags.MarkHidden(flagChecksumConcurrency)
271271

272272
flags.Uint64(flagRateLimit, unlimited, "The rate limit of the task, MB/s per node")
273-
flags.Bool(flagChecksum, true, "Run checksum at end of task")
273+
flags.Bool(FlagChecksum, true, "Run checksum at end of task")
274274
flags.Bool(flagRemoveTiFlash, true,
275275
"Remove TiFlash replicas before backup or restore, for unsupported versions of TiFlash")
276276

@@ -321,7 +321,7 @@ func DefineCommonFlags(flags *pflag.FlagSet) {
321321

322322
// HiddenFlagsForStream temporary hidden flags that stream cmd not support.
323323
func HiddenFlagsForStream(flags *pflag.FlagSet) {
324-
_ = flags.MarkHidden(flagChecksum)
324+
_ = flags.MarkHidden(FlagChecksum)
325325
_ = flags.MarkHidden(flagChecksumConcurrency)
326326
_ = flags.MarkHidden(flagRateLimit)
327327
_ = flags.MarkHidden(flagRateLimitUnit)
@@ -510,7 +510,7 @@ func (cfg *Config) ParseFromFlags(flags *pflag.FlagSet) error {
510510
if cfg.Concurrency, err = flags.GetUint32(flagConcurrency); err != nil {
511511
return errors.Trace(err)
512512
}
513-
if cfg.Checksum, err = flags.GetBool(flagChecksum); err != nil {
513+
if cfg.Checksum, err = flags.GetBool(FlagChecksum); err != nil {
514514
return errors.Trace(err)
515515
}
516516
if cfg.ChecksumConcurrency, err = flags.GetUint(flagChecksumConcurrency); err != nil {
@@ -623,6 +623,11 @@ func (cfg *Config) ParseFromFlags(flags *pflag.FlagSet) error {
623623
return cfg.normalizePDURLs()
624624
}
625625

626+
// OverrideDefaultForBackup overrides the common config defaults for backup tasks.
627+
func (cfg *Config) OverrideDefaultForBackup() {
628+
cfg.Checksum = false
629+
}
630+
626631
// NewMgr creates a new mgr at the given PD address.
627632
func NewMgr(ctx context.Context,
628633
g glue.Glue, pds []string,

0 commit comments

Comments
 (0)