Skip to content

Commit db9a6b9

Browse files
authored
log backup: set a proper maxVersion to resolve lock (#57178) (#57279)
close #57134
1 parent 286cfe6 commit db9a6b9

File tree

3 files changed

+71
-33
lines changed

3 files changed

+71
-33
lines changed

br/pkg/streamhelper/advancer.go

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@ import (
66
"bytes"
77
"context"
88
"fmt"
9-
"math"
109
"strings"
1110
"sync"
1211
"sync/atomic"
@@ -473,7 +472,7 @@ func (c *CheckpointAdvancer) onTaskEvent(ctx context.Context, e TaskEvent) error
473472
return nil
474473
}
475474

476-
func (c *CheckpointAdvancer) setCheckpoint(ctx context.Context, s spans.Valued) bool {
475+
func (c *CheckpointAdvancer) setCheckpoint(s spans.Valued) bool {
477476
cp := NewCheckpointWithSpan(s)
478477
if cp.TS < c.lastCheckpoint.TS {
479478
log.Warn("failed to update global checkpoint: stale",
@@ -499,7 +498,7 @@ func (c *CheckpointAdvancer) advanceCheckpointBy(ctx context.Context,
499498
return err
500499
}
501500

502-
if c.setCheckpoint(ctx, cp) {
501+
if c.setCheckpoint(cp) {
503502
log.Info("uploading checkpoint for task",
504503
zap.Stringer("checkpoint", oracle.GetTimeFromTS(cp.Value)),
505504
zap.Uint64("checkpoint", cp.Value),
@@ -586,7 +585,7 @@ func (c *CheckpointAdvancer) isCheckpointLagged(ctx context.Context) (bool, erro
586585

587586
func (c *CheckpointAdvancer) importantTick(ctx context.Context) error {
588587
c.checkpointsMu.Lock()
589-
c.setCheckpoint(ctx, c.checkpoints.Min())
588+
c.setCheckpoint(c.checkpoints.Min())
590589
c.checkpointsMu.Unlock()
591590
if err := c.env.UploadV3GlobalCheckpointForTask(ctx, c.task.Name, c.lastCheckpoint.TS); err != nil {
592591
return errors.Annotate(err, "failed to upload global checkpoint")
@@ -686,10 +685,14 @@ func (c *CheckpointAdvancer) asyncResolveLocksForRanges(ctx context.Context, tar
686685
// do not block main tick here
687686
go func() {
688687
failpoint.Inject("AsyncResolveLocks", func() {})
688+
maxTs := uint64(0)
689+
for _, t := range targets {
690+
maxTs = max(maxTs, t.Value)
691+
}
689692
handler := func(ctx context.Context, r tikvstore.KeyRange) (rangetask.TaskStat, error) {
690693
// we will scan all locks and try to resolve them by check txn status.
691694
return tikv.ResolveLocksForRange(
692-
ctx, c.env, math.MaxUint64, r.StartKey, r.EndKey, tikv.NewGcResolveLockMaxBackoffer, tikv.GCScanLockLimit)
695+
ctx, c.env, maxTs+1, r.StartKey, r.EndKey, tikv.NewGcResolveLockMaxBackoffer, tikv.GCScanLockLimit)
693696
}
694697
workerPool := util.NewWorkerPool(uint(config.DefaultMaxConcurrencyAdvance), "advancer resolve locks")
695698
var wg sync.WaitGroup

br/pkg/streamhelper/advancer_test.go

Lines changed: 34 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
package streamhelper_test
44

55
import (
6+
"bytes"
67
"context"
78
"fmt"
89
"strings"
@@ -356,48 +357,60 @@ func TestResolveLock(t *testing.T) {
356357
lockRegion := c.findRegionByKey([]byte("01"))
357358
allLocks := []*txnlock.Lock{
358359
{
359-
Key: []byte{1},
360+
Key: []byte("011"),
360361
// TxnID == minCheckpoint
361362
TxnID: minCheckpoint,
362363
},
363364
{
364-
Key: []byte{2},
365+
Key: []byte("012"),
365366
// TxnID > minCheckpoint
366367
TxnID: minCheckpoint + 1,
367368
},
369+
{
370+
Key: []byte("013"),
371+
// this lock cannot be resolved due to scan version
372+
TxnID: oracle.GoTimeToTS(oracle.GetTimeFromTS(minCheckpoint).Add(2 * time.Minute)),
373+
},
368374
}
369375
c.LockRegion(lockRegion, allLocks)
370376

371377
// ensure resolve locks triggered and collect all locks from scan locks
372378
resolveLockRef := atomic.NewBool(false)
373379
env.resolveLocks = func(locks []*txnlock.Lock, loc *tikv.KeyLocation) (*tikv.KeyLocation, error) {
374380
resolveLockRef.Store(true)
375-
require.ElementsMatch(t, locks, allLocks)
381+
// The third lock has skipped, because it's less than max version.
382+
require.ElementsMatch(t, locks, allLocks[:2])
376383
return loc, nil
377384
}
378385
adv := streamhelper.NewCheckpointAdvancer(env)
379-
// make lastCheckpoint stuck at 123
380-
adv.UpdateLastCheckpoint(streamhelper.NewCheckpointWithSpan(spans.Valued{
381-
Key: kv.KeyRange{
382-
StartKey: kv.Key([]byte("1")),
383-
EndKey: kv.Key([]byte("2")),
384-
},
385-
Value: 123,
386-
}))
387-
adv.NewCheckpoints(
388-
spans.Sorted(spans.NewFullWith([]kv.KeyRange{
389-
{
390-
StartKey: kv.Key([]byte("1")),
391-
EndKey: kv.Key([]byte("2")),
392-
},
393-
}, 0)),
394-
)
395386
adv.StartTaskListener(ctx)
396-
require.Eventually(t, func() bool { return adv.OnTick(ctx) == nil },
397-
time.Second, 50*time.Millisecond)
387+
388+
maxTargetTs := uint64(0)
398389
coll := streamhelper.NewClusterCollector(ctx, env)
390+
coll.SetOnSuccessHook(func(u uint64, kr kv.KeyRange) {
391+
adv.WithCheckpoints(func(s *spans.ValueSortedFull) {
392+
for _, lock := range allLocks {
393+
// if there is any lock key in the range
394+
if bytes.Compare(kr.StartKey, lock.Key) <= 0 && (bytes.Compare(lock.Key, kr.EndKey) < 0 || len(kr.EndKey) == 0) {
395+
// mock lock behavior, do not update checkpoint
396+
s.Merge(spans.Valued{Key: kr, Value: minCheckpoint})
397+
return
398+
}
399+
}
400+
s.Merge(spans.Valued{Key: kr, Value: u})
401+
maxTargetTs = max(maxTargetTs, u)
402+
})
403+
})
399404
err := adv.GetCheckpointInRange(ctx, []byte{}, []byte{}, coll)
400405
require.NoError(t, err)
406+
r, err := coll.Finish(ctx)
407+
require.NoError(t, err)
408+
require.Len(t, r.FailureSubRanges, 0)
409+
require.Equal(t, r.Checkpoint, minCheckpoint, "%d %d", r.Checkpoint, minCheckpoint)
410+
411+
env.maxTs = maxTargetTs + 1
412+
require.Eventually(t, func() bool { return adv.OnTick(ctx) == nil },
413+
time.Second, 50*time.Millisecond)
401414
// now the lock state must be ture. because tick finished and asyncResolveLocks got stuck.
402415
require.True(t, adv.GetInResolvingLock())
403416
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/br/pkg/streamhelper/AsyncResolveLocks"))
@@ -406,10 +419,6 @@ func TestResolveLock(t *testing.T) {
406419
// state must set to false after tick
407420
require.Eventually(t, func() bool { return !adv.GetInResolvingLock() },
408421
8*time.Second, 50*time.Microsecond)
409-
r, err := coll.Finish(ctx)
410-
require.NoError(t, err)
411-
require.Len(t, r.FailureSubRanges, 0)
412-
require.Equal(t, r.Checkpoint, minCheckpoint, "%d %d", r.Checkpoint, minCheckpoint)
413422
}
414423

415424
func TestOwnerDropped(t *testing.T) {

br/pkg/streamhelper/basic_lib_for_test.go

Lines changed: 29 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ import (
1717
"testing"
1818
"time"
1919

20+
"github.com/pingcap/errors"
2021
backup "github.com/pingcap/kvproto/pkg/brpb"
2122
"github.com/pingcap/kvproto/pkg/errorpb"
2223
"github.com/pingcap/kvproto/pkg/kvrpcpb"
@@ -98,6 +99,7 @@ type fakeCluster struct {
9899
idAlloced uint64
99100
stores map[uint64]*fakeStore
100101
regions []*region
102+
maxTs uint64
101103
testCtx *testing.T
102104

103105
onGetClient func(uint64) error
@@ -770,25 +772,39 @@ func (t *testEnv) putTask() {
770772
}
771773

772774
func (t *testEnv) ScanLocksInOneRegion(bo *tikv.Backoffer, key []byte, maxVersion uint64, limit uint32) ([]*txnlock.Lock, *tikv.KeyLocation, error) {
775+
t.mu.Lock()
776+
defer t.mu.Unlock()
777+
if t.maxTs != maxVersion {
778+
return nil, nil, errors.Errorf("unexpect max version in scan lock, expected %d, actual %d", t.maxTs, maxVersion)
779+
}
773780
for _, r := range t.regions {
774781
if len(r.locks) != 0 {
775-
return r.locks, &tikv.KeyLocation{
782+
locks := make([]*txnlock.Lock, 0, len(r.locks))
783+
for _, l := range r.locks {
784+
// skip the lock larger than maxVersion
785+
if l.TxnID < maxVersion {
786+
locks = append(locks, l)
787+
}
788+
}
789+
return locks, &tikv.KeyLocation{
776790
Region: tikv.NewRegionVerID(r.id, 0, 0),
777791
}, nil
778792
}
779793
}
780-
return nil, nil, nil
794+
return nil, &tikv.KeyLocation{}, nil
781795
}
782796

783797
func (t *testEnv) ResolveLocksInOneRegion(bo *tikv.Backoffer, locks []*txnlock.Lock, loc *tikv.KeyLocation) (*tikv.KeyLocation, error) {
798+
t.mu.Lock()
799+
defer t.mu.Unlock()
784800
for _, r := range t.regions {
785801
if loc != nil && loc.Region.GetID() == r.id {
786802
// reset locks
787803
r.locks = nil
788804
return t.resolveLocks(locks, loc)
789805
}
790806
}
791-
return nil, nil
807+
return loc, nil
792808
}
793809

794810
func (t *testEnv) Identifier() string {
@@ -853,6 +869,16 @@ func (p *mockPDClient) GetStore(_ context.Context, storeID uint64) (*metapb.Stor
853869
}, nil
854870
}
855871

872+
func (p *mockPDClient) GetAllStores(ctx context.Context, opts ...pd.GetStoreOption) ([]*metapb.Store, error) {
873+
// only used for GetRegionCache once in resolve lock
874+
return []*metapb.Store{
875+
{
876+
Id: 1,
877+
Address: "127.0.0.1",
878+
},
879+
}, nil
880+
}
881+
856882
func (p *mockPDClient) GetClusterID(ctx context.Context) uint64 {
857883
return 1
858884
}

0 commit comments

Comments
 (0)