Skip to content

Commit 24b88f0

Browse files
authored
txn: fix resolver cache usage for async commit (#1629)
Signed-off-by: cfzjywxk <[email protected]>
1 parent 31ecc42 commit 24b88f0

File tree

5 files changed

+182
-53
lines changed

5 files changed

+182
-53
lines changed

integration_tests/async_commit_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -631,7 +631,7 @@ func (s *testAsyncCommitSuite) TestRollbackAsyncCommitEnforcesFallback() {
631631
s.Nil(err)
632632
status, err := resolver.GetTxnStatus(s.bo, lock.TxnID, []byte("a"), currentTS, currentTS, false, false, nil)
633633
s.Nil(err)
634-
if status.IsRolledBack() {
634+
if s.store.GetOracle().IsExpired(lock.TxnID, status.TTL(), &oracle.Option{TxnScope: oracle.GlobalTxnScope}) {
635635
break
636636
}
637637
time.Sleep(time.Millisecond * 30)

integration_tests/lock_test.go

Lines changed: 29 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -613,7 +613,7 @@ func (s *testLockSuite) prepareTxnFallenBackFromAsyncCommit() {
613613
committer, err := txn.NewCommitter(1)
614614
s.Nil(err)
615615
s.Equal(committer.GetMutations().Len(), 2)
616-
committer.SetLockTTL(0)
616+
committer.SetLockTTL(1)
617617
committer.SetUseAsyncCommit()
618618
committer.SetCommitTS(committer.GetStartTS() + (100 << 18)) // 100ms
619619

@@ -642,10 +642,20 @@ func (s *testLockSuite) TestCheckLocksFallenBackFromAsyncCommit() {
642642
err = lr.CheckAllSecondaries(bo, lock, &status)
643643
s.True(lr.IsNonAsyncCommitLock(err))
644644

645-
status, err = lr.GetTxnStatusFromLock(bo, lock, 0, true)
646-
s.Nil(err)
645+
resolveStarted := time.Now()
646+
for {
647+
status, err = lr.GetTxnStatusFromLock(bo, lock, 0, true)
648+
s.Nil(err)
649+
if status.Action() == kvrpcpb.Action_TTLExpireRollback {
650+
break
651+
}
652+
if time.Since(resolveStarted) > 10*time.Second {
653+
s.NoError(errors.Errorf("Resolve fallback async commit locks timeout"))
654+
}
655+
}
647656
s.Equal(status.Action(), kvrpcpb.Action_TTLExpireRollback)
648657
s.Equal(status.TTL(), uint64(0))
658+
s.Equal(status.IsRolledBack(), true)
649659
}
650660

651661
func (s *testLockSuite) TestResolveTxnFallenBackFromAsyncCommit() {
@@ -654,9 +664,18 @@ func (s *testLockSuite) TestResolveTxnFallenBackFromAsyncCommit() {
654664
lock := s.mustGetLock([]byte("fb1"))
655665
s.True(lock.UseAsyncCommit)
656666
bo := tikv.NewBackoffer(context.Background(), getMaxBackoff)
657-
expire, err := s.store.NewLockResolver().ResolveLocks(bo, 0, []*txnkv.Lock{lock})
658-
s.Nil(err)
659-
s.Equal(expire, int64(0))
667+
668+
resolveStarted := time.Now()
669+
for {
670+
expire, err := s.store.NewLockResolver().ResolveLocks(bo, 0, []*txnkv.Lock{lock})
671+
s.Nil(err)
672+
if expire == 0 {
673+
break
674+
}
675+
if time.Since(resolveStarted) > 10*time.Second {
676+
s.NoError(errors.Errorf("Resolve fallback async commit locks timeout"))
677+
}
678+
}
660679

661680
t3, err := s.store.Begin()
662681
s.Nil(err)
@@ -938,7 +957,7 @@ func (s *testLockSuite) TestResolveLocksForRead() {
938957
locks = append(locks, lock)
939958

940959
// can't be pushed but is expired
941-
startTS, _ = s.lockKey([]byte("k5"), []byte("v5"), []byte("k55"), []byte("v55"), 0, false, true)
960+
startTS, _ = s.lockKey([]byte("k5"), []byte("v5"), []byte("k55"), []byte("v55"), 1, false, true)
942961
committedLocks = append(committedLocks, startTS)
943962
lock = s.mustGetLock([]byte("k5"))
944963
locks = append(locks, lock)
@@ -972,6 +991,9 @@ func (s *testLockSuite) TestResolveLocksForRead() {
972991
bo := tikv.NewBackoffer(context.Background(), getMaxBackoff)
973992
lr := s.store.NewLockResolver()
974993
defer lr.Close()
994+
995+
// Sleep for a while to make sure the async commit lock "k5" expires, so that it can be resolved as committed.
996+
time.Sleep(500 * time.Millisecond)
975997
msBeforeExpired, resolved, committed, err := lr.ResolveLocksForRead(bo, readStartTS, locks, false)
976998
s.Nil(err)
977999
s.Greater(msBeforeExpired, int64(0))

txnkv/txnlock/lock_resolver.go

Lines changed: 101 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -113,17 +113,27 @@ func (lr *LockResolver) Close() {
113113

114114
// TxnStatus represents a txn's final status. It should be Lock or Commit or Rollback.
115115
type TxnStatus struct {
116+
// The ttl is set from the `CheckTxnStatus` kv response; it is read-only and must not be changed.
116117
ttl uint64
117118
commitTS uint64
118119
action kvrpcpb.Action
119120
primaryLock *kvrpcpb.LockInfo
120121
}
121122

122123
// IsCommitted returns true if the txn's final status is Commit.
123-
func (s TxnStatus) IsCommitted() bool { return s.ttl == 0 && s.commitTS > 0 }
124+
func (s TxnStatus) IsCommitted() bool { return s.commitTS > 0 }
124125

125126
// IsRolledBack returns true if the txn's final status is rolled back.
126-
func (s TxnStatus) IsRolledBack() bool { return s.ttl == 0 && s.commitTS == 0 }
127+
func (s TxnStatus) IsRolledBack() bool {
128+
return s.ttl == 0 && s.commitTS == 0 && (s.action == kvrpcpb.Action_NoAction ||
129+
s.action == kvrpcpb.Action_LockNotExistRollback ||
130+
s.action == kvrpcpb.Action_TTLExpireRollback)
131+
}
132+
133+
// IsStatusDetermined returns true if the txn's final status is determined.
134+
func (s TxnStatus) IsStatusDetermined() bool {
135+
return s.IsRolledBack() || s.IsCommitted()
136+
}
127137

128138
// CommitTS returns the txn's commitTS. It is valid iff `IsCommitted` is true.
129139
func (s TxnStatus) CommitTS() uint64 { return s.commitTS }
@@ -137,27 +147,37 @@ func (s TxnStatus) Action() kvrpcpb.Action { return s.action }
137147
// StatusCacheable checks whether the transaction status is certain. True will be
138148
// returned if its status is certain:
139149
//
140-
// If transaction is already committed, the result could be cached.
141-
// Otherwise:
142-
// If l.LockType is pessimistic lock type:
143-
// - if its primary lock is pessimistic too, the check txn status result should not be cached.
144-
// - if its primary lock is prewrite lock type, the check txn status could be cached.
145-
// If l.lockType is prewrite lock type:
146-
// - always cache the check txn status result.
150+
// The `CheckTxnStatus` status logic is:
151+
//
152+
// If l.LockType is pessimistic lock type:
153+
// - if its primary lock is pessimistic too, the check txn status result should NOT be cached.
154+
// - if its primary lock is prewrite lock type, the check txn status could be cached.
155+
// If l.lockType is prewrite lock type:
156+
// - always cache the check txn status result.
147157
//
148158
// For prewrite locks, their primary keys should ALWAYS be the correct one and will NOT change.
159+
//
160+
// The mapping from `CheckTxnStatus` kv result to the tidb status:
161+
//
162+
// TxnStatus::RolledBack => resp.set_action(Action::NoAction),
163+
// TxnStatus::TtlExpire => resp.set_action(Action::TtlExpireRollback),
164+
// TxnStatus::LockNotExist => resp.set_action(Action::LockNotExistRollback),
165+
// TxnStatus::Committed { commit_ts } => {
166+
// resp.set_commit_version(commit_ts.into_inner())
167+
// }
168+
//
169+
// So the transaction is regarded as committed if the commit_ts is not 0, and rolled back if the
170+
// `action` equals `Action::NoAction` or `Action::LockNotExistRollback` or `Action::TtlExpireRollback`.
171+
// Refer to the tikv `CheckTxnStatus` handling logic for more information.
149172
func (s TxnStatus) StatusCacheable() bool {
150-
if s.IsCommitted() {
151-
return true
152-
}
153-
if s.ttl == 0 {
154-
if s.action == kvrpcpb.Action_NoAction ||
155-
s.action == kvrpcpb.Action_LockNotExistRollback ||
156-
s.action == kvrpcpb.Action_TTLExpireRollback {
157-
return true
158-
}
159-
}
160-
return false
173+
return s.IsStatusDetermined()
174+
}
175+
176+
// HasSameDeterminedStatus checks whether the current status is equal to another status if the
177+
// transaction status is determined.
178+
func (s TxnStatus) HasSameDeterminedStatus(other TxnStatus) bool {
179+
return (s.IsCommitted() && other.IsCommitted() && s.CommitTS() == other.CommitTS()) ||
180+
(s.IsRolledBack() && other.IsRolledBack())
161181
}
162182

163183
func (s TxnStatus) String() string {
@@ -204,10 +224,23 @@ func NewLock(l *kvrpcpb.LockInfo) *Lock {
204224
}
205225

206226
func (lr *LockResolver) saveResolved(txnID uint64, status TxnStatus) {
227+
if !status.IsStatusDetermined() {
228+
logutil.BgLogger().Error("unexpected undetermined status saved to cache",
229+
zap.Uint64("txnID", txnID), zap.Stringer("status", status), zap.Stack("stack"))
230+
panic("unexpected undetermined status saved to cache")
231+
}
207232
lr.mu.Lock()
208233
defer lr.mu.Unlock()
209234

210-
if _, ok := lr.mu.resolved[txnID]; ok {
235+
if savedStatus, ok := lr.mu.resolved[txnID]; ok {
236+
// The saved determined status should always be equal to the new one.
237+
if !(savedStatus.HasSameDeterminedStatus(status)) {
238+
logutil.BgLogger().Error("unexpected txn status saving to the cache, the existing status is not equal to the new one",
239+
zap.Uint64("txnID", txnID),
240+
zap.String("existing status", savedStatus.String()),
241+
zap.String("new status", status.String()))
242+
panic("unexpected txn status saved to cache with existing different entry")
243+
}
211244
return
212245
}
213246
lr.mu.resolved[txnID] = status
@@ -491,19 +524,28 @@ func (lr *LockResolver) resolveLocks(bo *retry.Backoffer, opts ResolveLocksOptio
491524
} else if err != nil {
492525
return TxnStatus{}, err
493526
}
494-
if status.ttl != 0 {
527+
ttlExpired := (lr.store == nil) || (lr.store.GetOracle().IsExpired(l.TxnID, status.ttl, &oracle.Option{TxnScope: oracle.GlobalTxnScope}))
528+
expiredAsyncCommitLocks := status.primaryLock != nil && status.primaryLock.UseAsyncCommit && !forceSyncCommit && ttlExpired
529+
if status.ttl != 0 && !expiredAsyncCommitLocks {
495530
return status, nil
496531
}
497532

498-
// If the lock is committed or rolled back, resolve lock.
499-
// If the lock is regarded as an expired pessimistic lock, pessimistic rollback it.
533+
// If the lock is non-async-commit type:
534+
// - It is committed or rolled back, resolve lock accordingly.
535+
// - It does not expire, return TTL and backoff wait.
536+
// - It is regarded as an expired pessimistic lock, pessimistic rollback it.
537+
//
538+
// Else if the lock is an async-commit lock:
539+
// - It does not expire, return TTL and backoff wait.
540+
// - Otherwise, try to trigger the `resolveAsyncCommitLock` process to determine the status of
541+
// corresponding async commit transaction.
500542
metrics.LockResolverCountWithExpired.Inc()
501543
cleanRegions, exists := cleanTxns[l.TxnID]
502544
if !exists {
503545
cleanRegions = make(map[locate.RegionVerID]struct{})
504546
cleanTxns[l.TxnID] = cleanRegions
505547
}
506-
if status.primaryLock != nil && status.primaryLock.UseAsyncCommit && !forceSyncCommit {
548+
if expiredAsyncCommitLocks {
507549
// resolveAsyncCommitLock will resolve all locks of the transaction, so we needn't resolve
508550
// it again if it has been resolved once.
509551
if exists {
@@ -821,11 +863,7 @@ func (lr *LockResolver) getTxnStatus(bo *retry.Backoffer, txnID uint64, primary
821863
status.action = cmdResp.Action
822864
status.primaryLock = cmdResp.LockInfo
823865

824-
if status.primaryLock != nil && status.primaryLock.UseAsyncCommit && !forceSyncCommit {
825-
if !lr.store.GetOracle().IsExpired(txnID, cmdResp.LockTtl, &oracle.Option{TxnScope: oracle.GlobalTxnScope}) {
826-
status.ttl = cmdResp.LockTtl
827-
}
828-
} else if cmdResp.LockTtl != 0 {
866+
if cmdResp.LockTtl != 0 {
829867
status.ttl = cmdResp.LockTtl
830868
} else {
831869
if cmdResp.CommitVersion == 0 {
@@ -879,7 +917,7 @@ func (data *asyncResolveData) addKeys(locks []*kvrpcpb.LockInfo, expected int, s
879917

880918
// Check locks to see if any have been committed or rolled back.
881919
if len(locks) < expected {
882-
logutil.BgLogger().Debug("addKeys: lock has been committed or rolled back", zap.Uint64("commit ts", commitTS), zap.Uint64("start ts", startTS))
920+
logutil.BgLogger().Info("addKeys: lock has been committed or rolled back", zap.Uint64("commit ts", commitTS), zap.Uint64("start ts", startTS))
883921
// A lock is missing - the transaction must either have been rolled back or committed.
884922
if !data.missingLock {
885923
// commitTS == 0 => lock has been rolled back.
@@ -970,10 +1008,10 @@ func (lr *LockResolver) checkSecondaries(bo *retry.Backoffer, txnID uint64, curK
9701008
}
9711009

9721010
// resolveAsyncResolveData resolves all locks in an async-commit transaction according to the status.
973-
func (lr *LockResolver) resolveAsyncResolveData(bo *retry.Backoffer, l *Lock, status TxnStatus, data *asyncResolveData) error {
1011+
func (lr *LockResolver) resolveAsyncResolveData(bo *retry.Backoffer, l *Lock, status TxnStatus, keys [][]byte) error {
9741012
util.EvalFailpoint("resolveAsyncResolveData")
9751013

976-
keysByRegion, _, err := lr.store.GetRegionCache().GroupKeysByRegion(bo, data.keys, nil)
1014+
keysByRegion, _, err := lr.store.GetRegionCache().GroupKeysByRegion(bo, keys, nil)
9771015
if err != nil {
9781016
return err
9791017
}
@@ -1012,31 +1050,49 @@ func (lr *LockResolver) resolveAsyncResolveData(bo *retry.Backoffer, l *Lock, st
10121050
func (lr *LockResolver) resolveAsyncCommitLock(bo *retry.Backoffer, l *Lock, status TxnStatus, asyncResolveAll bool) (TxnStatus, error) {
10131051
metrics.LockResolverCountWithResolveAsync.Inc()
10141052

1015-
resolveData, err := lr.checkAllSecondaries(bo, l, &status)
1016-
if err != nil {
1017-
return TxnStatus{}, err
1018-
}
1019-
resolveData.keys = append(resolveData.keys, l.Primary)
1053+
var toResolveKeys [][]byte
1054+
if status.IsStatusDetermined() {
1055+
toResolveKeys = make([][]byte, 0, len(status.primaryLock.Secondaries)+1)
1056+
toResolveKeys = append(toResolveKeys, status.primaryLock.Secondaries...)
1057+
toResolveKeys = append(toResolveKeys, l.Primary)
1058+
} else {
1059+
// Only do checkAllSecondaries if the transaction status is undetermined.
1060+
// The async commit transaction is regarded as committed if `resolveData.commitTS` is not 0,
1061+
// otherwise it is regarded as rolled back. The transaction status should be determined if the
1062+
// `checkAllSecondaries` finishes with no errors.
1063+
resolveData, err := lr.checkAllSecondaries(bo, l, &status)
1064+
if err != nil {
1065+
return TxnStatus{}, err
1066+
}
1067+
resolveData.keys = append(resolveData.keys, l.Primary)
10201068

1021-
status.commitTS = resolveData.commitTs
1022-
if status.StatusCacheable() {
1023-
lr.saveResolved(l.TxnID, status)
1069+
status.commitTS = resolveData.commitTs
1070+
if status.StatusCacheable() {
1071+
lr.saveResolved(l.TxnID, status)
1072+
}
1073+
toResolveKeys = resolveData.keys
10241074
}
10251075

1026-
logutil.BgLogger().Info("resolve async commit", zap.Uint64("startTS", l.TxnID), zap.Uint64("commitTS", status.commitTS))
1076+
if _, err := util.EvalFailpoint("resolveAsyncCommitLockReturn"); err == nil {
1077+
return status, nil
1078+
}
1079+
logutil.BgLogger().Info("resolve async commit locks", zap.Uint64("startTS", l.TxnID), zap.Uint64("commitTS", status.commitTS), zap.Stringer("TxnStatus", status))
10271080
if asyncResolveAll {
10281081
asyncBo := retry.NewBackoffer(lr.asyncResolveCtx, asyncResolveLockMaxBackoff)
10291082
go func() {
1030-
err := lr.resolveAsyncResolveData(asyncBo, l, status, resolveData)
1083+
err := lr.resolveAsyncResolveData(asyncBo, l, status, toResolveKeys)
10311084
if err != nil {
10321085
logutil.BgLogger().Info("failed to resolve async-commit locks asynchronously",
10331086
zap.Uint64("startTS", l.TxnID), zap.Uint64("commitTS", status.CommitTS()), zap.Error(err))
10341087
}
10351088
}()
10361089
} else {
1037-
err = lr.resolveAsyncResolveData(bo, l, status, resolveData)
1090+
err := lr.resolveAsyncResolveData(bo, l, status, toResolveKeys)
1091+
if err != nil {
1092+
return TxnStatus{}, err
1093+
}
10381094
}
1039-
return status, err
1095+
return status, nil
10401096
}
10411097

10421098
// checkAllSecondaries checks the secondary locks of an async commit transaction to find out the final

txnkv/txnlock/lock_resolver_test.go

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
package txnlock
2+
3+
import (
4+
"context"
5+
"testing"
6+
7+
"github.com/pingcap/failpoint"
8+
"github.com/pingcap/kvproto/pkg/kvrpcpb"
9+
"github.com/stretchr/testify/require"
10+
"github.com/tikv/client-go/v2/config/retry"
11+
"github.com/tikv/client-go/v2/util"
12+
)
13+
14+
// TestLockResolverCache is used to cover the issue https://github.com/pingcap/tidb/issues/59494.
15+
func TestLockResolverCache(t *testing.T) {
16+
util.EnableFailpoints()
17+
lockResolver := NewLockResolver(nil)
18+
lock := func(key, primary string, startTS uint64, useAsyncCommit bool, secondaries [][]byte) *kvrpcpb.LockInfo {
19+
return &kvrpcpb.LockInfo{
20+
Key: []byte(key),
21+
PrimaryLock: []byte(primary),
22+
LockVersion: startTS,
23+
UseAsyncCommit: useAsyncCommit,
24+
MinCommitTs: startTS + 1,
25+
Secondaries: secondaries,
26+
}
27+
}
28+
29+
resolvedTxnTS := uint64(1)
30+
k1 := "k1"
31+
k2 := "k2"
32+
resolvedTxnStatus := TxnStatus{
33+
ttl: 0,
34+
commitTS: 10,
35+
primaryLock: lock(k1, k1, resolvedTxnTS, true, [][]byte{[]byte(k2)}),
36+
}
37+
lockResolver.mu.resolved[resolvedTxnTS] = resolvedTxnStatus
38+
toResolveLock := lock(k2, k1, resolvedTxnTS, true, [][]byte{})
39+
backOff := retry.NewBackoffer(context.Background(), asyncResolveLockMaxBackoff)
40+
41+
// Save the async commit transaction resolved result to the resolver cache.
42+
lockResolver.saveResolved(resolvedTxnTS, resolvedTxnStatus)
43+
44+
// With failpoint, the async commit transaction will be resolved and `CheckSecondaries` would not be called.
45+
// Otherwise, the test would panic as the storage is nil.
46+
require.Nil(t, failpoint.Enable("tikvclient/resolveAsyncCommitLockReturn", "return"))
47+
_, err := lockResolver.ResolveLocks(backOff, 5, []*Lock{NewLock(toResolveLock)})
48+
require.NoError(t, err)
49+
require.Nil(t, failpoint.Disable("tikvclient/resolveAsyncCommitLockReturn"))
50+
}

txnkv/txnlock/test_probe.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ type LockProbe struct{}
2727
// NewLockStatus returns a txn state that has been locked.
2828
func (l LockProbe) NewLockStatus(keys [][]byte, useAsyncCommit bool, minCommitTS uint64) TxnStatus {
2929
return TxnStatus{
30+
ttl: 1,
3031
primaryLock: &kvrpcpb.LockInfo{
3132
Secondaries: keys,
3233
UseAsyncCommit: useAsyncCommit,

0 commit comments

Comments
 (0)