Commit 3f8520f

Leavrth authored and zeminzhou committed
br: improve summary and progress visualization of br (pingcap#56612)
close pingcap#56493
1 parent 9f0aaca commit 3f8520f

File tree

10 files changed: +250 -201 lines changed

br/pkg/backup/client.go

Lines changed: 6 additions & 0 deletions
```diff
@@ -7,6 +7,7 @@ import (
 	"context"
 	"encoding/base64"
 	"encoding/json"
+	"fmt"
 	"reflect"
 	"sort"
 	"strings"
@@ -1240,10 +1241,15 @@ func collectRangeFiles(progressRangeTree *rtree.ProgressRangeTree, metaWriter *m
 	var rangeAscendErr error
 	progressRange.Res.Ascend(func(i btree.Item) bool {
 		r := i.(*rtree.Range)
+		cfCount := make(map[string]int)
 		for _, f := range r.Files {
+			cfCount[f.Cf] += 1
 			summary.CollectSuccessUnit(summary.TotalKV, 1, f.TotalKvs)
 			summary.CollectSuccessUnit(summary.TotalBytes, 1, f.TotalBytes)
 		}
+		for cf, count := range cfCount {
+			summary.CollectInt(fmt.Sprintf("%s CF files", cf), count)
+		}
 		// we need keep the files in order after we support multi_ingest sst.
 		// default_sst and write_sst need to be together.
 		if err := metaWriter.Send(r.Files, metautil.AppendDataFile); err != nil {
```
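The new `cfCount` logic is plain map aggregation keyed by column family, collected once per range and reported through the summary. A minimal standalone sketch of the same pattern (the `file` struct below is a hypothetical stand-in for `backuppb.File`, whose `Cf` field the real code reads):

```go
package main

import "fmt"

// file is a hypothetical stand-in for BR's backup file metadata;
// in the real code the column family name comes from backuppb.File.Cf.
type file struct{ cf string }

func main() {
	files := []file{{"default"}, {"write"}, {"write"}}

	// Count files per column family, as collectRangeFiles does per range.
	cfCount := make(map[string]int)
	for _, f := range files {
		cfCount[f.cf]++
	}

	// BR reports each count via summary.CollectInt(fmt.Sprintf("%s CF files", cf), count),
	// so the final backup summary shows lines like "write CF files: 2".
	for cf, count := range cfCount {
		fmt.Printf("%s CF files: %d\n", cf, count)
	}
}
```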

br/pkg/glue/glue.go

Lines changed: 14 additions & 0 deletions
```diff
@@ -82,3 +82,17 @@ type Progress interface {
 	// called.
 	Close()
 }
+
+// WithProgress executes some logic with the progress, and closes it once the execution is done.
+func WithProgress(
+	ctx context.Context,
+	g Glue,
+	cmdName string,
+	total int64,
+	redirectLog bool,
+	cc func(p Progress) error,
+) error {
+	p := g.StartProgress(ctx, cmdName, total, redirectLog)
+	defer p.Close()
+	return cc(p)
+}
```
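`WithProgress` is a small scope-guard helper: it guarantees `Close` runs even when the callback returns an error. A self-contained usage sketch with a toy progress type (the `consoleProgress` type and `withProgress` wrapper below are illustrative stand-ins, not BR's glue implementation):

```go
package main

import (
	"context"
	"fmt"
)

// progress mirrors the subset of glue.Progress used here.
type progress interface {
	Inc()
	Close()
}

type consoleProgress struct{ done, total int64 }

func (p *consoleProgress) Inc()   { p.done++; fmt.Printf("progress: %d/%d\n", p.done, p.total) }
func (p *consoleProgress) Close() { fmt.Println("progress closed") }

// withProgress follows the shape of the helper above: start a progress bar,
// guarantee Close via defer, then hand the bar to the callback.
// (ctx would be forwarded to StartProgress in the real helper.)
func withProgress(ctx context.Context, total int64, cc func(p progress) error) error {
	p := &consoleProgress{total: total}
	defer p.Close() // closed even if cc returns an error
	return cc(p)
}

func main() {
	_ = withProgress(context.Background(), 2, func(p progress) error {
		p.Inc()
		p.Inc()
		return nil
	})
}
```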

br/pkg/restore/restorer.go

Lines changed: 14 additions & 14 deletions
```diff
@@ -226,6 +226,9 @@ type MultiTablesRestorer struct {
 	workerPool       *util.WorkerPool
 	fileImporter     BalancedFileImporter
 	checkpointRunner *checkpoint.CheckpointRunner[checkpoint.RestoreKeyType, checkpoint.RestoreValueType]
+
+	fileCount int
+	start     time.Time
 }

 func NewMultiTablesRestorer(
@@ -254,22 +257,16 @@ func (m *MultiTablesRestorer) WaitUntilFinish() error {
 		log.Error("restore files failed", zap.Error(err))
 		return errors.Trace(err)
 	}
+	elapsed := time.Since(m.start)
+	log.Info("Restore Stage Duration", zap.String("stage", "restore files"), zap.Duration("take", elapsed))
+	summary.CollectDuration("restore files", elapsed)
+	summary.CollectSuccessUnit("files", m.fileCount, elapsed)
 	return nil
 }

-func (m *MultiTablesRestorer) GoRestore(onProgress func(int64), batchFileSets ...BatchBackupFileSet) (err error) {
-	start := time.Now()
-	fileCount := 0
-	defer func() {
-		elapsed := time.Since(start)
-		if err == nil {
-			log.Info("Restore files", zap.Duration("take", elapsed))
-			summary.CollectSuccessUnit("files", fileCount, elapsed)
-		}
-	}()
-
-	log.Debug("start to restore files", zap.Int("files", fileCount))
-
+func (m *MultiTablesRestorer) GoRestore(onProgress func(int64), batchFileSets ...BatchBackupFileSet) error {
+	m.start = time.Now()
+	m.fileCount = 0
 	if span := opentracing.SpanFromContext(m.ectx); span != nil && span.Tracer() != nil {
 		span1 := span.Tracer().StartSpan("Client.RestoreSSTFiles", opentracing.ChildOf(span.Context()))
 		defer span1.Finish()
@@ -285,6 +282,9 @@ func (m *MultiTablesRestorer) GoRestore(onProgress func(int64), batchFileSets ..
 			// breaking here directly is also a reasonable behavior.
 			break
 		}
+		for _, fileSet := range batchFileSet {
+			m.fileCount += len(fileSet.SSTFiles)
+		}
 		filesReplica := batchFileSet
 		m.fileImporter.PauseForBackpressure()
 		cx := logutil.ContextWithField(m.ectx, zap.Int("sn", i))
@@ -293,7 +293,7 @@ func (m *MultiTablesRestorer) GoRestore(onProgress func(int64), batchFileSets ..
 		defer func() {
 			if restoreErr == nil {
 				logutil.CL(cx).Info("import files done", zap.Duration("take", time.Since(fileStart)))
-				onProgress(int64(len(filesReplica)))
+				onProgress(1)
 			}
 		}()
 		if importErr := m.fileImporter.Import(cx, filesReplica...); importErr != nil {
```
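The refactor moves `start` and `fileCount` from `GoRestore` locals into struct fields, so that `WaitUntilFinish`, which runs after the async work settles, can log the stage duration and collect the summary. A minimal sketch of that start/wait split (names are illustrative, not BR's actual types):

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

type restorer struct {
	wg        sync.WaitGroup
	start     time.Time // set when restore is kicked off
	fileCount int       // accumulated per batch, read after Wait
}

// GoRestore starts the work asynchronously and records the counters,
// mirroring how MultiTablesRestorer now fills m.start and m.fileCount.
func (r *restorer) GoRestore(onProgress func(int64), batches ...[]string) {
	r.wg = sync.WaitGroup{}
	r.start = time.Now()
	for _, batch := range batches {
		r.fileCount += len(batch)
		r.wg.Add(1)
		go func() {
			defer r.wg.Done()
			onProgress(1) // one progress unit per batch, as in the diff above
		}()
	}
}

// WaitUntilFinish reports duration and file count once all batches are done.
func (r *restorer) WaitUntilFinish() {
	r.wg.Wait()
	fmt.Printf("restored %d files in %s\n", r.fileCount, time.Since(r.start))
}

func main() {
	r := &restorer{}
	r.GoRestore(func(int64) {}, []string{"a.sst", "b.sst"}, []string{"c.sst"})
	r.WaitUntilFinish()
}
```

Note that `onProgress(1)` now counts one unit per batch rather than per file, which is why the test below passes two file sets and expects a progress of 2.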

br/pkg/restore/restorer_test.go

Lines changed: 3 additions & 2 deletions
```diff
@@ -148,19 +148,20 @@ func TestMultiTablesRestorerRestoreSuccess(t *testing.T) {

 	var progress int64
 	fileSets := createSampleBatchFileSets()
+	fileSets2 := createSampleBatchFileSets()

 	var mu sync.Mutex
 	restorer.GoRestore(func(p int64) {
 		mu.Lock()
 		progress += p
 		mu.Unlock()
-	}, fileSets)
+	}, fileSets, fileSets2)
 	err := restorer.WaitUntilFinish()
 	require.NoError(t, err)

 	// Ensure progress was tracked correctly
 	require.Equal(t, int64(2), progress) // Total files group: 2
-	require.Equal(t, 1, importer.unblockCount)
+	require.Equal(t, 2, importer.unblockCount)
 }

 func TestMultiTablesRestorerRestoreWithImportError(t *testing.T) {
```

br/pkg/restore/snap_client/client.go

Lines changed: 1 addition & 1 deletion
```diff
@@ -371,7 +371,7 @@ func (rc *SnapClient) InitCheckpoint(
 	if err != nil {
 		return checkpointSetWithTableID, nil, errors.Trace(err)
 	}
-
+	// t2 is the latest time the checkpoint checksum was persisted to the external storage.
 	checkpointChecksum, t2, err := checkpoint.LoadCheckpointChecksumForRestore(ctx, execCtx)
 	if err != nil {
 		return checkpointSetWithTableID, nil, errors.Trace(err)
```

br/pkg/restore/snap_client/pipeline_items.go

Lines changed: 121 additions & 5 deletions
```diff
@@ -31,6 +31,7 @@ import (
 	tidbutil "github.com/pingcap/tidb/pkg/util"
 	"github.com/pingcap/tidb/pkg/util/engine"
 	pdhttp "github.com/tikv/pd/client/http"
+	"go.uber.org/multierr"
 	"go.uber.org/zap"
 	"golang.org/x/sync/errgroup"
 )
@@ -59,6 +60,124 @@ func defaultOutputTableChan() chan *CreatedTable {
 	return make(chan *CreatedTable, defaultChannelSize)
 }

+// ExhaustErrors drains all remaining errors in the channel into a slice of errors.
+func ExhaustErrors(ec <-chan error) []error {
+	out := make([]error, 0, len(ec))
+	for {
+		select {
+		case err := <-ec:
+			out = append(out, err)
+		default:
+			// errCh will never be closed (it has multiple senders),
+			// so we just consume the current backlog of this channel, then return.
+			return out
+		}
+	}
+}
+
+type PipelineContext struct {
+	// pipeline item switch
+	Checksum         bool
+	LoadStats        bool
+	WaitTiflashReady bool
+
+	// pipeline item configuration
+	LogProgress         bool
+	ChecksumConcurrency uint
+	StatsConcurrency    uint
+
+	// pipeline item tool client
+	KvClient   kv.Client
+	ExtStorage storage.ExternalStorage
+	Glue       glue.Glue
+}
+
+// RestorePipeline does checksum, loads stats, and waits for TiFlash to be ready.
+func (rc *SnapClient) RestorePipeline(ctx context.Context, plCtx PipelineContext, createdTables []*CreatedTable) (err error) {
+	start := time.Now()
+	defer func() {
+		summary.CollectDuration("restore pipeline", time.Since(start))
+	}()
+	// We make errCh bigger so we won't block when multiple parts fail.
+	errCh := make(chan error, 32)
+	postHandleCh := afterTableRestoredCh(ctx, createdTables)
+	progressLen := int64(0)
+	if plCtx.Checksum {
+		progressLen += int64(len(createdTables))
+	}
+	progressLen += int64(len(createdTables)) // for pipeline item - update stats meta
+	if plCtx.WaitTiflashReady {
+		progressLen += int64(len(createdTables))
+	}
+
+	// Redirect to log if there is no log file to avoid unreadable output.
+	updateCh := plCtx.Glue.StartProgress(ctx, "Restore Pipeline", progressLen, !plCtx.LogProgress)
+	defer updateCh.Close()
+	// pipeline checksum
+	if plCtx.Checksum {
+		postHandleCh = rc.GoValidateChecksum(ctx, postHandleCh, plCtx.KvClient, errCh, updateCh, plCtx.ChecksumConcurrency)
+	}
+
+	// pipeline update meta and load stats
+	postHandleCh = rc.GoUpdateMetaAndLoadStats(ctx, plCtx.ExtStorage, postHandleCh, errCh, updateCh, plCtx.StatsConcurrency, plCtx.LoadStats)
+
+	// pipeline wait TiFlash synced
+	if plCtx.WaitTiflashReady {
+		postHandleCh = rc.GoWaitTiFlashReady(ctx, postHandleCh, updateCh, errCh)
+	}
+
+	finish := dropToBlackhole(ctx, postHandleCh, errCh)
+
+	select {
+	case err = <-errCh:
+		err = multierr.Append(err, multierr.Combine(ExhaustErrors(errCh)...))
+	case <-finish:
+	}
+
+	return errors.Trace(err)
+}
+
+func afterTableRestoredCh(ctx context.Context, createdTables []*CreatedTable) <-chan *CreatedTable {
+	outCh := make(chan *CreatedTable)
+
+	go func() {
+		defer close(outCh)
+
+		for _, createdTable := range createdTables {
+			select {
+			case <-ctx.Done():
+				return
+			default:
+				outCh <- createdTable
+			}
+		}
+	}()
+	return outCh
+}
+
+// dropToBlackhole drops all incoming tables into a black hole,
+// i.e. doesn't execute checksum, just advances the progress anyhow.
+func dropToBlackhole(ctx context.Context, inCh <-chan *CreatedTable, errCh chan<- error) <-chan struct{} {
+	outCh := make(chan struct{}, 1)
+	go func() {
+		defer func() {
+			close(outCh)
+		}()
+		for {
+			select {
+			case <-ctx.Done():
+				errCh <- ctx.Err()
+				return
+			case _, ok := <-inCh:
+				if !ok {
+					return
+				}
+			}
+		}
+	}()
+	return outCh
+}
+
 func concurrentHandleTablesCh(
 	ctx context.Context,
 	inCh <-chan *CreatedTable,
@@ -114,11 +233,6 @@ func (rc *SnapClient) GoValidateChecksum(
 	outCh := defaultOutputTableChan()
 	workers := tidbutil.NewWorkerPool(defaultChecksumConcurrency, "RestoreChecksum")
 	go concurrentHandleTablesCh(ctx, inCh, outCh, errCh, workers, func(c context.Context, tbl *CreatedTable) error {
-		start := time.Now()
-		defer func() {
-			elapsed := time.Since(start)
-			summary.CollectSuccessUnit("table checksum", 1, elapsed)
-		}()
 		err := rc.execAndValidateChecksum(c, tbl, kvClient, concurrency)
 		if err != nil {
 			return errors.Trace(err)
@@ -136,6 +250,7 @@ func (rc *SnapClient) GoUpdateMetaAndLoadStats(
 	s storage.ExternalStorage,
 	inCh <-chan *CreatedTable,
 	errCh chan<- error,
+	updateCh glue.Progress,
 	statsConcurrency uint,
 	loadStats bool,
 ) chan *CreatedTable {
@@ -186,6 +301,7 @@ func (rc *SnapClient) GoUpdateMetaAndLoadStats(
 				log.Error("update stats meta failed", zap.Any("table", tbl.Table), zap.Error(statsErr))
 			}
 		}
+		updateCh.Inc()
 		return nil
 	}, func() {
 		log.Info("all stats updated")
```