6 changes: 6 additions & 0 deletions br/pkg/backup/client.go
@@ -7,6 +7,7 @@ import (
"context"
"encoding/base64"
"encoding/json"
"fmt"
"reflect"
"sort"
"strings"
@@ -1240,10 +1241,15 @@ func collectRangeFiles(progressRangeTree *rtree.ProgressRangeTree, metaWriter *m
var rangeAscendErr error
progressRange.Res.Ascend(func(i btree.Item) bool {
r := i.(*rtree.Range)
cfCount := make(map[string]int)
for _, f := range r.Files {
cfCount[f.Cf] += 1
summary.CollectSuccessUnit(summary.TotalKV, 1, f.TotalKvs)
summary.CollectSuccessUnit(summary.TotalBytes, 1, f.TotalBytes)
}
for cf, count := range cfCount {
summary.CollectInt(fmt.Sprintf("%s CF files", cf), count)
}
// we need to keep the files in order after we support multi_ingest sst.
// default_sst and write_sst need to be together.
if err := metaWriter.Send(r.Files, metautil.AppendDataFile); err != nil {
14 changes: 14 additions & 0 deletions br/pkg/glue/glue.go
@@ -82,3 +82,17 @@ type Progress interface {
// called.
Close()
}

// WithProgress executes some logic with a progress tracker, and closes it once the execution is done.
func WithProgress(
ctx context.Context,
g Glue,
cmdName string,
total int64,
redirectLog bool,
cc func(p Progress) error,
) error {
p := g.StartProgress(ctx, cmdName, total, redirectLog)
defer p.Close()
return cc(p)
}
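
A minimal usage sketch of the new helper (illustrative only: the stage name, total, and doUnit helper are hypothetical, and it assumes Progress exposes Inc()):

err := glue.WithProgress(ctx, g, "Example Stage", int64(total), false, func(p glue.Progress) error {
	for i := 0; i < total; i++ {
		if err := doUnit(i); err != nil { // doUnit is a placeholder for one unit of work
			return err
		}
		p.Inc() // advance the tracked progress by one unit
	}
	return nil
})

The deferred Close means the progress is released even when cc returns an error, so callers don't have to manage its lifetime.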
28 changes: 14 additions & 14 deletions br/pkg/restore/restorer.go
@@ -226,6 +226,9 @@ type MultiTablesRestorer struct {
workerPool *util.WorkerPool
fileImporter BalancedFileImporter
checkpointRunner *checkpoint.CheckpointRunner[checkpoint.RestoreKeyType, checkpoint.RestoreValueType]

fileCount int
Contributor:

need lock here or make it atomic?

Contributor Author:

The variable fileCount is modified in GoRestore and read in WaitUntilFinish (note that fileCount is not used in the worker pool or any other goroutine). In practice, the two functions always appear together, like this:

err := restorer.GoRestore()
if err != nil {
  return err
}
err = restorer.WaitUntilFinish()
if err != nil {
  return err
}

start time.Time
}

func NewMultiTablesRestorer(
@@ -254,22 +257,16 @@ func (m *MultiTablesRestorer) WaitUntilFinish() error {
log.Error("restore files failed", zap.Error(err))
return errors.Trace(err)
}
elapsed := time.Since(m.start)
log.Info("Restore Stage Duration", zap.String("stage", "restore files"), zap.Duration("take", elapsed))
summary.CollectDuration("restore files", elapsed)
summary.CollectSuccessUnit("files", m.fileCount, elapsed)
return nil
}

func (m *MultiTablesRestorer) GoRestore(onProgress func(int64), batchFileSets ...BatchBackupFileSet) (err error) {
start := time.Now()
fileCount := 0
defer func() {
elapsed := time.Since(start)
if err == nil {
log.Info("Restore files", zap.Duration("take", elapsed))
summary.CollectSuccessUnit("files", fileCount, elapsed)
}
}()

log.Debug("start to restore files", zap.Int("files", fileCount))

func (m *MultiTablesRestorer) GoRestore(onProgress func(int64), batchFileSets ...BatchBackupFileSet) error {
m.start = time.Now()
m.fileCount = 0
if span := opentracing.SpanFromContext(m.ectx); span != nil && span.Tracer() != nil {
span1 := span.Tracer().StartSpan("Client.RestoreSSTFiles", opentracing.ChildOf(span.Context()))
defer span1.Finish()
@@ -285,6 +282,9 @@ func (m *MultiTablesRestorer) GoRestore(onProgress func(int64), batchFileSets ..
// breaking here directly is also a reasonable behavior.
break
}
for _, fileSet := range batchFileSet {
m.fileCount += len(fileSet.SSTFiles)
}
filesReplica := batchFileSet
m.fileImporter.PauseForBackpressure()
cx := logutil.ContextWithField(m.ectx, zap.Int("sn", i))
@@ -293,7 +293,7 @@ func (m *MultiTablesRestorer) GoRestore(onProgress func(int64), batchFileSets ..
defer func() {
if restoreErr == nil {
logutil.CL(cx).Info("import files done", zap.Duration("take", time.Since(fileStart)))
onProgress(int64(len(filesReplica)))
onProgress(1)
}
}()
if importErr := m.fileImporter.Import(cx, filesReplica...); importErr != nil {
5 changes: 3 additions & 2 deletions br/pkg/restore/restorer_test.go
@@ -148,19 +148,20 @@ func TestMultiTablesRestorerRestoreSuccess(t *testing.T) {

var progress int64
fileSets := createSampleBatchFileSets()
fileSets2 := createSampleBatchFileSets()

var mu sync.Mutex
restorer.GoRestore(func(p int64) {
mu.Lock()
progress += p
mu.Unlock()
}, fileSets)
}, fileSets, fileSets2)
err := restorer.WaitUntilFinish()
require.NoError(t, err)

// Ensure progress was tracked correctly
require.Equal(t, int64(2), progress) // Total file-set batches: 2, one progress unit each
require.Equal(t, 1, importer.unblockCount)
require.Equal(t, 2, importer.unblockCount)
}

func TestMultiTablesRestorerRestoreWithImportError(t *testing.T) {
2 changes: 1 addition & 1 deletion br/pkg/restore/snap_client/client.go
@@ -371,7 +371,7 @@ func (rc *SnapClient) InitCheckpoint(
if err != nil {
return checkpointSetWithTableID, nil, errors.Trace(err)
}

// t2 is the latest time the checkpoint checksum was persisted to the external storage.
checkpointChecksum, t2, err := checkpoint.LoadCheckpointChecksumForRestore(ctx, execCtx)
if err != nil {
return checkpointSetWithTableID, nil, errors.Trace(err)
126 changes: 121 additions & 5 deletions br/pkg/restore/snap_client/pipeline_items.go
@@ -31,6 +31,7 @@ import (
tidbutil "github.com/pingcap/tidb/pkg/util"
"github.com/pingcap/tidb/pkg/util/engine"
pdhttp "github.com/tikv/pd/client/http"
"go.uber.org/multierr"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
)
@@ -59,6 +60,124 @@ func defaultOutputTableChan() chan *CreatedTable {
return make(chan *CreatedTable, defaultChannelSize)
}

// ExhaustErrors drains all remaining errors in the channel into a slice of errors.
func ExhaustErrors(ec <-chan error) []error {
out := make([]error, 0, len(ec))
for {
select {
case err := <-ec:
out = append(out, err)
default:
// errCh will never be closed (it has multiple senders),
// so we just consume the current backlog of the channel, then return.
return out
}
}
}
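
A quick illustrative sketch of the non-blocking drain (made-up errors; this mirrors the drain-and-combine use in RestorePipeline below):

errCh := make(chan error, 8)
errCh <- errors.New("part A failed")
errCh <- errors.New("part B failed")
// Only the already-buffered errors are collected; the call never blocks.
err := multierr.Combine(ExhaustErrors(errCh)...)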

type PipelineContext struct {
// pipeline item switch
Checksum bool
LoadStats bool
WaitTiflashReady bool

// pipeline item configuration
LogProgress bool
ChecksumConcurrency uint
StatsConcurrency uint

// pipeline item tool client
KvClient kv.Client
ExtStorage storage.ExternalStorage
Glue glue.Glue
}

// RestorePipeline runs checksum, loads stats, and waits for TiFlash to be ready.
func (rc *SnapClient) RestorePipeline(ctx context.Context, plCtx PipelineContext, createdTables []*CreatedTable) (err error) {
start := time.Now()
defer func() {
summary.CollectDuration("restore pipeline", time.Since(start))
}()
// Use a buffered errCh so we won't block when multiple parts fail.
errCh := make(chan error, 32)
postHandleCh := afterTableRestoredCh(ctx, createdTables)
progressLen := int64(0)
if plCtx.Checksum {
progressLen += int64(len(createdTables))
}
progressLen += int64(len(createdTables)) // for pipeline item - update stats meta
if plCtx.WaitTiflashReady {
progressLen += int64(len(createdTables))
}

// Redirect to the log if there is no log file, to avoid unreadable output.
updateCh := plCtx.Glue.StartProgress(ctx, "Restore Pipeline", progressLen, !plCtx.LogProgress)
defer updateCh.Close()
// pipeline checksum
if plCtx.Checksum {
postHandleCh = rc.GoValidateChecksum(ctx, postHandleCh, plCtx.KvClient, errCh, updateCh, plCtx.ChecksumConcurrency)
}

// pipeline update meta and load stats
postHandleCh = rc.GoUpdateMetaAndLoadStats(ctx, plCtx.ExtStorage, postHandleCh, errCh, updateCh, plCtx.StatsConcurrency, plCtx.LoadStats)

// pipeline wait Tiflash synced
if plCtx.WaitTiflashReady {
postHandleCh = rc.GoWaitTiFlashReady(ctx, postHandleCh, updateCh, errCh)
}

finish := dropToBlackhole(ctx, postHandleCh, errCh)

select {
case err = <-errCh:
err = multierr.Append(err, multierr.Combine(ExhaustErrors(errCh)...))
case <-finish:
}

return errors.Trace(err)
}
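
As a concrete accounting check: with 10 created tables and both Checksum and WaitTiflashReady enabled, progressLen comes to 30, since each enabled stage contributes one progress unit per table and the update-stats-meta stage is always counted.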

func afterTableRestoredCh(ctx context.Context, createdTables []*CreatedTable) <-chan *CreatedTable {
outCh := make(chan *CreatedTable)

go func() {
defer close(outCh)

for _, createdTable := range createdTables {
select {
case <-ctx.Done():
return
default:
outCh <- createdTable
}
}
}()
return outCh
}

// dropToBlackhole drops all incoming tables into a black hole,
// i.e. it does nothing with them and simply drains the channel until it closes.
func dropToBlackhole(ctx context.Context, inCh <-chan *CreatedTable, errCh chan<- error) <-chan struct{} {
outCh := make(chan struct{}, 1)
go func() {
defer func() {
close(outCh)
}()
for {
select {
case <-ctx.Done():
errCh <- ctx.Err()
return
case _, ok := <-inCh:
if !ok {
return
}
}
}
}()
return outCh
}

func concurrentHandleTablesCh(
ctx context.Context,
inCh <-chan *CreatedTable,
@@ -114,11 +233,6 @@ func (rc *SnapClient) GoValidateChecksum(
outCh := defaultOutputTableChan()
workers := tidbutil.NewWorkerPool(defaultChecksumConcurrency, "RestoreChecksum")
go concurrentHandleTablesCh(ctx, inCh, outCh, errCh, workers, func(c context.Context, tbl *CreatedTable) error {
start := time.Now()
defer func() {
elapsed := time.Since(start)
summary.CollectSuccessUnit("table checksum", 1, elapsed)
}()
err := rc.execAndValidateChecksum(c, tbl, kvClient, concurrency)
if err != nil {
return errors.Trace(err)
Expand All @@ -136,6 +250,7 @@ func (rc *SnapClient) GoUpdateMetaAndLoadStats(
s storage.ExternalStorage,
inCh <-chan *CreatedTable,
errCh chan<- error,
updateCh glue.Progress,
statsConcurrency uint,
loadStats bool,
) chan *CreatedTable {
@@ -186,6 +301,7 @@
log.Error("update stats meta failed", zap.Any("table", tbl.Table), zap.Error(statsErr))
}
}
updateCh.Inc()
return nil
}, func() {
log.Info("all stats updated")