Skip to content

Commit d52ecb6

Browse files
committed
fix the fair locking mechanism in unistore
Signed-off-by: Yang Keao <[email protected]>
1 parent 7599574 commit d52ecb6

File tree

3 files changed

+98
-3
lines changed

3 files changed

+98
-3
lines changed

pkg/executor/test/fktest/foreign_key_test.go

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2494,3 +2494,36 @@ func TestFKBuild(t *testing.T) {
24942494
tk.MustExec("delete from test.t3")
24952495
tk.MustQuery("select * from test.t2").Check(testkit.Rows())
24962496
}
2497+
2498+
func TestLockKeysInDML(t *testing.T) {
2499+
store := testkit.CreateMockStore(t)
2500+
tk := testkit.NewTestKit(t, store)
2501+
tk.MustExec("use test")
2502+
tk.MustExec("create table t1 (id int primary key);")
2503+
tk.MustExec("create table t2 (id int primary key, foreign key fk (id) references t1(id));")
2504+
2505+
tk.MustExec("insert into t1 values (1)")
2506+
tk.MustExec("BEGIN")
2507+
tk.MustExec("INSERT INTO t2 VALUES (1)")
2508+
var wg sync.WaitGroup
2509+
var tk2CommitTime time.Time
2510+
tk2StartTime := time.Now()
2511+
wg.Add(1)
2512+
go func() {
2513+
defer wg.Done()
2514+
tk2 := testkit.NewTestKit(t, store)
2515+
tk2.MustExec("use test")
2516+
tk2.MustExec("BEGIN")
2517+
require.NotNil(t, tk2.ExecToErr("UPDATE t1 SET id = 2 WHERE id = 1"))
2518+
tk2.MustExec("COMMIT")
2519+
tk2CommitTime = time.Now()
2520+
}()
2521+
sleepDuration := 500 * time.Millisecond
2522+
time.Sleep(sleepDuration)
2523+
tk.MustExec("COMMIT")
2524+
wg.Wait()
2525+
tk.MustQuery("SELECT * FROM t2").Check(testkit.Rows("1"))
2526+
require.Greater(t, tk2CommitTime.Sub(tk2StartTime), sleepDuration)
2527+
tk.MustQuery("SELECT * FROM t1").Check(testkit.Rows("1"))
2528+
tk.MustQuery("SELECT * FROM t2").Check(testkit.Rows("1"))
2529+
}

pkg/store/mockstore/unistore/tikv/mvcc.go

Lines changed: 33 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -293,7 +293,8 @@ func (store *MVCCStore) pessimisticLockInner(reqCtx *requestCtx, req *kvrpcpb.Pe
293293
}
294294
if !dup {
295295
for i, m := range mutations {
296-
lock, lockedWithConflictTS, err1 := store.buildPessimisticLock(m, items[i], req)
296+
latestExtraMeta := store.getLatestExtraMetaForKey(reqCtx, m)
297+
lock, lockedWithConflictTS, err1 := store.buildPessimisticLock(m, items[i], latestExtraMeta, req)
297298
lockedWithConflictTSList = append(lockedWithConflictTSList, lockedWithConflictTS)
298299
if err1 != nil {
299300
return nil, err1
@@ -666,15 +667,19 @@ func (store *MVCCStore) handleCheckPessimisticErr(startTS uint64, err error, isF
666667

667668
// buildPessimisticLock builds the lock according to the request and the current state of the key.
668669
// Returns the built lock, and the LockedWithConflictTS (if any, otherwise 0).
669-
func (store *MVCCStore) buildPessimisticLock(m *kvrpcpb.Mutation, item *badger.Item,
670+
func (store *MVCCStore) buildPessimisticLock(m *kvrpcpb.Mutation, item *badger.Item, latestExtraMeta mvcc.DBUserMeta,
670671
req *kvrpcpb.PessimisticLockRequest) (*mvcc.Lock, uint64, error) {
671672
var lockedWithConflictTS uint64 = 0
672673

673674
var writeConflictError error
674675

675676
if item != nil {
676-
userMeta := mvcc.DBUserMeta(item.UserMeta())
677677
if !req.Force {
678+
userMeta := mvcc.DBUserMeta(item.UserMeta())
679+
if latestExtraMeta != nil && latestExtraMeta.CommitTS() > userMeta.CommitTS() {
680+
userMeta = latestExtraMeta
681+
}
682+
678683
if userMeta.CommitTS() > req.ForUpdateTs {
679684
writeConflictError = &kverrors.ErrConflict{
680685
StartTS: req.StartVersion,
@@ -734,6 +739,31 @@ func (store *MVCCStore) buildPessimisticLock(m *kvrpcpb.Mutation, item *badger.I
734739
return lock, lockedWithConflictTS, nil
735740
}
736741

742+
// getLatestExtraMetaForKey returns the userMeta of the extra txn status key with the biggest commit ts.
743+
// It's used to assert whether a key has been written / locked by another transaction in the fair locking mechanism.
744+
// Theoretically, the rollback record should be ignored. But we have no way to check the rollback record in the
745+
// unistore. Returning record with a bigger commit ts may cause extra retry, but it's safe.
746+
func (store *MVCCStore) getLatestExtraMetaForKey(reqCtx *requestCtx, m *kvrpcpb.Mutation) mvcc.DBUserMeta {
747+
it := reqCtx.getDBReader().GetExtraIter()
748+
rbStartKey := mvcc.EncodeExtraTxnStatusKey(m.Key, math.MaxUint64)
749+
rbEndKey := mvcc.EncodeExtraTxnStatusKey(m.Key, 0)
750+
751+
for it.Seek(rbStartKey); it.Valid(); it.Next() {
752+
item := it.Item()
753+
if len(rbEndKey) != 0 && bytes.Compare(item.Key(), rbEndKey) > 0 {
754+
break
755+
}
756+
key := item.Key()
757+
if len(key) == 0 || (key[0] != tableExtraPrefix && key[0] != metaExtraPrefix) {
758+
continue
759+
}
760+
761+
meta := mvcc.DBUserMeta(item.UserMeta())
762+
return meta
763+
}
764+
return nil
765+
}
766+
737767
// Prewrite implements the MVCCStore interface.
738768
func (store *MVCCStore) Prewrite(reqCtx *requestCtx, req *kvrpcpb.PrewriteRequest) error {
739769
mutations := sortPrewrite(req)

tests/realtikvtest/txntest/txn_test.go

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -595,3 +595,35 @@ func TestDMLWithAddForeignKey(t *testing.T) {
595595

596596
require.True(t, errDML != nil || errDDL != nil)
597597
}
598+
599+
func TestLockKeysInDML(t *testing.T) {
600+
store := realtikvtest.CreateMockStoreAndSetup(t)
601+
tk := testkit.NewTestKit(t, store)
602+
tk.MustExec("use test")
603+
tk.MustExec("create table t1 (id int primary key);")
604+
tk.MustExec("create table t2 (id int primary key, foreign key fk (id) references t1(id));")
605+
606+
tk.MustExec("insert into t1 values (1)")
607+
tk.MustExec("BEGIN")
608+
tk.MustExec("INSERT INTO t2 VALUES (1)")
609+
var wg sync.WaitGroup
610+
var tk2CommitTime time.Time
611+
tk2StartTime := time.Now()
612+
wg.Add(1)
613+
go func() {
614+
defer wg.Done()
615+
tk2 := testkit.NewTestKit(t, store)
616+
tk2.MustExec("use test")
617+
tk2.MustExec("BEGIN")
618+
require.NotNil(t, tk2.ExecToErr("UPDATE t1 SET id = 2 WHERE id = 1"))
619+
tk2.MustExec("COMMIT")
620+
tk2CommitTime = time.Now()
621+
}()
622+
sleepDuration := 500 * time.Millisecond
623+
time.Sleep(sleepDuration)
624+
tk.MustExec("COMMIT")
625+
wg.Wait()
626+
require.Greater(t, tk2CommitTime.Sub(tk2StartTime), sleepDuration)
627+
tk.MustQuery("SELECT * FROM t1").Check(testkit.Rows("1"))
628+
tk.MustQuery("SELECT * FROM t2").Check(testkit.Rows("1"))
629+
}

0 commit comments

Comments
 (0)