Skip to content

Commit a4e4216

Browse files
authored
Checksum refactor (pingcap#58)
* backup: run backup first and then checksum
  Running backup and checksum concurrently increases the total time taken.
  Signed-off-by: Neil Shen <[email protected]>
* restore: run restore first then checksum
  Signed-off-by: Neil Shen <[email protected]>
* *: unify checksum computation
  Signed-off-by: Neil Shen <[email protected]>
* backup: log total backup checksum time cost
  Signed-off-by: Neil Shen <[email protected]>
* checksum: use low priority
  Signed-off-by: Neil Shen <[email protected]>
* backup: remove duplicated range schema build function
  Signed-off-by: Neil Shen <[email protected]>
1 parent 5d88a75 commit a4e4216

File tree

22 files changed

+1194
-630
lines changed

22 files changed

+1194
-630
lines changed

cmd/raw.go

Lines changed: 63 additions & 21 deletions
Original file line number | Diff line number | Diff line change
@@ -38,11 +38,13 @@ func NewBackupCommand() *cobra.Command {
3838
"ratelimit", "", 0, "The rate limit of the backup task, MB/s per node")
3939
command.PersistentFlags().Uint32P(
4040
"concurrency", "", 4, "The size of thread pool on each node that execute the backup task")
41+
command.PersistentFlags().BoolP("checksum", "", true,
42+
"Run checksum after backup")
4143

42-
command.PersistentFlags().BoolP("checksum", "", false,
44+
command.PersistentFlags().BoolP("fastchecksum", "", false,
4345
"fast checksum backup sst file by calculate all sst file")
4446

45-
_ = command.PersistentFlags().MarkHidden("checksum")
47+
_ = command.PersistentFlags().MarkHidden("fastchecksum")
4648
return command
4749
}
4850

@@ -97,7 +99,17 @@ func newFullBackupCommand() *cobra.Command {
9799
return errors.New("at least one thread required")
98100
}
99101

100-
ranges, err := client.PreBackupAllTableRanges(backupTS)
102+
checksum, err := command.Flags().GetBool("checksum")
103+
if err != nil {
104+
return err
105+
}
106+
fastChecksum, err := command.Flags().GetBool("fastchecksum")
107+
if err != nil {
108+
return err
109+
}
110+
111+
ranges, backupSchemas, err := raw.BuildBackupRangeAndSchema(
112+
client.GetDomain(), backer.GetTiKV(), backupTS, "", "")
101113
if err != nil {
102114
return err
103115
}
@@ -108,38 +120,50 @@ func newFullBackupCommand() *cobra.Command {
108120
return err
109121
}
110122

123+
// Backup
111124
ctx, cancel := context.WithCancel(defaultBacker.Context())
112125
defer cancel()
113126
// Redirect to log if there is no log file to avoid unreadable output.
114127
updateCh := utils.StartProgress(
115128
ctx, "Full Backup", int64(approximateRegions), !HasLogFile())
116-
117129
err = client.BackupRanges(
118130
ranges, u, backupTS, rate, concurrency, updateCh)
119131
if err != nil {
120132
return err
121133
}
134+
// Backup has finished
135+
close(updateCh)
122136

123-
err = client.CompleteMeta()
124-
if err != nil {
125-
return err
137+
// Checksum
138+
backupSchemasConcurrency := raw.DefaultSchemaConcurrency
139+
if backupSchemas.Len() < backupSchemasConcurrency {
140+
backupSchemasConcurrency = backupSchemas.Len()
126141
}
142+
cksctx, ckscancel := context.WithCancel(defaultBacker.Context())
143+
defer ckscancel()
144+
updateCh = utils.StartProgress(
145+
cksctx, "Checksum", int64(backupSchemas.Len()), !HasLogFile())
146+
backupSchemas.SetSkipChecksum(!checksum)
147+
backupSchemas.Start(
148+
cksctx, backer.GetTiKV(), backupTS, uint(backupSchemasConcurrency), updateCh)
127149

128-
checksumSwitch, err := command.Flags().GetBool("checksum")
150+
err = client.CompleteMeta(backupSchemas)
129151
if err != nil {
130152
return err
131153
}
132-
if checksumSwitch {
154+
155+
if fastChecksum {
133156
valid, err := client.FastChecksum()
134157
if err != nil {
135158
return err
136159
}
137-
138160
if !valid {
139161
log.Error("backup FastChecksum not passed!")
140162
}
141-
142163
}
164+
// Checksum has finished
165+
close(updateCh)
166+
143167
return client.SaveBackupMeta(u)
144168
},
145169
}
@@ -210,13 +234,22 @@ func newTableBackupCommand() *cobra.Command {
210234
if concurrency == 0 {
211235
return errors.New("at least one thread required")
212236
}
237+
checksum, err := command.Flags().GetBool("checksum")
238+
if err != nil {
239+
return err
240+
}
241+
fastChecksum, err := command.Flags().GetBool("fastchecksum")
242+
if err != nil {
243+
return err
244+
}
213245

214-
// TODO: include admin check in progress bar.
215-
ranges, err := client.PreBackupTableRanges(db, table, u, backupTS)
246+
ranges, backupSchemas, err := raw.BuildBackupRangeAndSchema(
247+
client.GetDomain(), backer.GetTiKV(), backupTS, db, table)
216248
if err != nil {
217249
return err
218250
}
219-
// the count of regions need to backup
251+
252+
// The number of regions need to backup
220253
approximateRegions := 0
221254
for _, r := range ranges {
222255
var regionCount int
@@ -227,28 +260,35 @@ func newTableBackupCommand() *cobra.Command {
227260
approximateRegions += regionCount
228261
}
229262

263+
// Backup
230264
ctx, cancel := context.WithCancel(defaultBacker.Context())
231265
defer cancel()
232266
// Redirect to log if there is no log file to avoid unreadable output.
233267
updateCh := utils.StartProgress(
234268
ctx, "Table Backup", int64(approximateRegions), !HasLogFile())
235-
236269
err = client.BackupRanges(
237270
ranges, u, backupTS, rate, concurrency, updateCh)
238271
if err != nil {
239272
return err
240273
}
274+
// Backup has finished
275+
close(updateCh)
241276

242-
err = client.CompleteMeta()
243-
if err != nil {
244-
return err
245-
}
277+
// Checksum
278+
cksctx, ckscancel := context.WithCancel(defaultBacker.Context())
279+
defer ckscancel()
280+
updateCh = utils.StartProgress(
281+
cksctx, "Checksum", int64(backupSchemas.Len()), !HasLogFile())
282+
backupSchemas.SetSkipChecksum(!checksum)
283+
backupSchemas.Start(
284+
cksctx, backer.GetTiKV(), backupTS, 1, updateCh)
246285

247-
checksumSwitch, err := command.Flags().GetBool("checksum")
286+
err = client.CompleteMeta(backupSchemas)
248287
if err != nil {
249288
return err
250289
}
251-
if checksumSwitch {
290+
291+
if fastChecksum {
252292
valid, err := client.FastChecksum()
253293
if err != nil {
254294
return err
@@ -257,6 +297,8 @@ func newTableBackupCommand() *cobra.Command {
257297
log.Error("backup FastChecksum not passed!")
258298
}
259299
}
300+
// Checksum has finished
301+
close(updateCh)
260302

261303
return client.SaveBackupMeta(u)
262304
},

cmd/restore.go

Lines changed: 55 additions & 28 deletions
Original file line number | Diff line number | Diff line change
@@ -18,7 +18,7 @@ import (
1818

1919
// NewRestoreCommand returns a restore subcommand
2020
func NewRestoreCommand() *cobra.Command {
21-
bp := &cobra.Command{
21+
command := &cobra.Command{
2222
Use: "restore",
2323
Short: "restore a TiKV cluster from a backup",
2424
PersistentPreRunE: func(c *cobra.Command, args []string) error {
@@ -30,12 +30,24 @@ func NewRestoreCommand() *cobra.Command {
3030
return nil
3131
},
3232
}
33-
bp.AddCommand(
33+
command.AddCommand(
3434
newFullRestoreCommand(),
3535
newDbRestoreCommand(),
3636
newTableRestoreCommand(),
3737
)
38-
return bp
38+
39+
command.PersistentFlags().String("connect", "",
40+
"the address to connect tidb, format: username:password@protocol(address)/")
41+
command.PersistentFlags().Uint("concurrency", 128,
42+
"The size of thread pool that execute the restore task")
43+
command.PersistentFlags().BoolP("checksum", "", true,
44+
"Run checksum after restore")
45+
46+
if err := command.MarkPersistentFlagRequired("connect"); err != nil {
47+
panic(err)
48+
}
49+
50+
return command
3951
}
4052

4153
func newFullRestoreCommand() *cobra.Command {
@@ -120,16 +132,20 @@ func newFullRestoreCommand() *cobra.Command {
120132
if err != nil {
121133
return errors.Trace(err)
122134
}
123-
err = client.ValidateChecksum(tables, newTables)
124-
return errors.Trace(err)
125-
},
126-
}
135+
// Restore has finished.
136+
close(updateCh)
127137

128-
command.Flags().String("connect", "", "the address to connect tidb, format: username:password@protocol(address)/")
129-
command.Flags().Uint("concurrency", 128, "The size of thread pool that execute the restore task")
138+
// Checksum
139+
updateCh = utils.StartProgress(
140+
ctx, "Checksum", int64(len(newTables)), !HasLogFile())
141+
err = client.ValidateChecksum(ctx, tables, newTables, updateCh)
142+
if err != nil {
143+
return err
144+
}
145+
close(updateCh)
130146

131-
if err := command.MarkFlagRequired("connect"); err != nil {
132-
panic(err)
147+
return nil
148+
},
133149
}
134150

135151
return command
@@ -212,19 +228,24 @@ func newDbRestoreCommand() *cobra.Command {
212228
if err != nil {
213229
return errors.Trace(err)
214230
}
215-
err = client.ValidateChecksum(db.Tables, newTables)
216-
return errors.Trace(err)
217-
},
218-
}
219231

220-
command.Flags().String("connect", "", "the address to connect tidb, format: username:password@protocol(address)/")
221-
command.Flags().Uint("concurrency", 128, "The size of thread pool that execute the restore task")
232+
// Restore has finished.
233+
close(updateCh)
222234

223-
command.Flags().String("db", "", "database name")
235+
// Checksum
236+
updateCh = utils.StartProgress(
237+
ctx, "Checksum", int64(len(newTables)), !HasLogFile())
238+
err = client.ValidateChecksum(ctx, db.Tables, newTables, updateCh)
239+
if err != nil {
240+
return err
241+
}
242+
close(updateCh)
224243

225-
if err := command.MarkFlagRequired("connect"); err != nil {
226-
panic(err)
244+
return nil
245+
},
227246
}
247+
command.Flags().String("db", "", "database name")
248+
228249
if err := command.MarkFlagRequired("db"); err != nil {
229250
panic(err)
230251
}
@@ -310,20 +331,26 @@ func newTableRestoreCommand() *cobra.Command {
310331
if err != nil {
311332
return errors.Trace(err)
312333
}
313-
err = client.ValidateChecksum([]*utils.Table{table}, newTables)
314-
return errors.Trace(err)
334+
// Restore has finished.
335+
close(updateCh)
336+
337+
// Checksum
338+
updateCh = utils.StartProgress(
339+
ctx, "Checksum", int64(len(newTables)), !HasLogFile())
340+
err = client.ValidateChecksum(
341+
ctx, []*utils.Table{table}, newTables, updateCh)
342+
if err != nil {
343+
return err
344+
}
345+
close(updateCh)
346+
347+
return nil
315348
},
316349
}
317350

318-
command.Flags().String("connect", "", "the address to connect tidb, format: username:password@protocol(address)/")
319-
command.Flags().Uint("concurrency", 128, "The size of thread pool that execute the restore task")
320-
321351
command.Flags().String("db", "", "database name")
322352
command.Flags().String("table", "", "table name")
323353

324-
if err := command.MarkFlagRequired("connect"); err != nil {
325-
panic(err)
326-
}
327354
if err := command.MarkFlagRequired("db"); err != nil {
328355
panic(err)
329356
}

0 commit comments

Comments
 (0)