Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pkg/executor/test/fktest/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ go_test(
"main_test.go",
],
flaky = True,
shard_count = 25,
shard_count = 26,
deps = [
"//pkg/config",
"//pkg/executor",
Expand Down
33 changes: 33 additions & 0 deletions pkg/executor/test/fktest/foreign_key_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2494,3 +2494,36 @@ func TestFKBuild(t *testing.T) {
tk.MustExec("delete from test.t3")
tk.MustQuery("select * from test.t2").Check(testkit.Rows())
}

func TestLockKeysInDML(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
tk.MustExec("create table t1 (id int primary key);")
tk.MustExec("create table t2 (id int primary key, foreign key fk (id) references t1(id));")

tk.MustExec("insert into t1 values (1)")
tk.MustExec("BEGIN")
tk.MustExec("INSERT INTO t2 VALUES (1)")
var wg sync.WaitGroup
var tk2CommitTime time.Time
tk2StartTime := time.Now()
wg.Add(1)
go func() {
defer wg.Done()
tk2 := testkit.NewTestKit(t, store)
tk2.MustExec("use test")
tk2.MustExec("BEGIN")
require.NotNil(t, tk2.ExecToErr("UPDATE t1 SET id = 2 WHERE id = 1"))
tk2.MustExec("COMMIT")
tk2CommitTime = time.Now()
}()
sleepDuration := 500 * time.Millisecond
time.Sleep(sleepDuration)
tk.MustExec("COMMIT")
wg.Wait()
tk.MustQuery("SELECT * FROM t2").Check(testkit.Rows("1"))
require.Greater(t, tk2CommitTime.Sub(tk2StartTime), sleepDuration)
tk.MustQuery("SELECT * FROM t1").Check(testkit.Rows("1"))
tk.MustQuery("SELECT * FROM t2").Check(testkit.Rows("1"))
}
36 changes: 33 additions & 3 deletions pkg/store/mockstore/unistore/tikv/mvcc.go
Original file line number Diff line number Diff line change
Expand Up @@ -293,7 +293,8 @@ func (store *MVCCStore) pessimisticLockInner(reqCtx *requestCtx, req *kvrpcpb.Pe
}
if !dup {
for i, m := range mutations {
lock, lockedWithConflictTS, err1 := store.buildPessimisticLock(m, items[i], req)
latestExtraMeta := store.getLatestExtraMetaForKey(reqCtx, m)
lock, lockedWithConflictTS, err1 := store.buildPessimisticLock(m, items[i], latestExtraMeta, req)
lockedWithConflictTSList = append(lockedWithConflictTSList, lockedWithConflictTS)
if err1 != nil {
return nil, err1
Expand Down Expand Up @@ -666,15 +667,19 @@ func (store *MVCCStore) handleCheckPessimisticErr(startTS uint64, err error, isF

// buildPessimisticLock builds the lock according to the request and the current state of the key.
// Returns the built lock, and the LockedWithConflictTS (if any, otherwise 0).
func (store *MVCCStore) buildPessimisticLock(m *kvrpcpb.Mutation, item *badger.Item,
func (store *MVCCStore) buildPessimisticLock(m *kvrpcpb.Mutation, item *badger.Item, latestExtraMeta mvcc.DBUserMeta,
req *kvrpcpb.PessimisticLockRequest) (*mvcc.Lock, uint64, error) {
var lockedWithConflictTS uint64 = 0

var writeConflictError error

if item != nil {
userMeta := mvcc.DBUserMeta(item.UserMeta())
if !req.Force {
userMeta := mvcc.DBUserMeta(item.UserMeta())
if latestExtraMeta != nil && latestExtraMeta.CommitTS() > userMeta.CommitTS() {
userMeta = latestExtraMeta
}

if userMeta.CommitTS() > req.ForUpdateTs {
writeConflictError = &kverrors.ErrConflict{
StartTS: req.StartVersion,
Expand Down Expand Up @@ -734,6 +739,31 @@ func (store *MVCCStore) buildPessimisticLock(m *kvrpcpb.Mutation, item *badger.I
return lock, lockedWithConflictTS, nil
}

// getLatestExtraMetaForKey returns the userMeta of the extra txn status key with the biggest commit ts.
// It's used to assert whether a key has been written / locked by another transaction in the fair locking mechanism.
// Theoretically, the rollback record should be ignored. But we have no way to check the rollback record in the
// unistore. Returning record with a bigger commit ts may cause extra retry, but it's safe.
func (store *MVCCStore) getLatestExtraMetaForKey(reqCtx *requestCtx, m *kvrpcpb.Mutation) mvcc.DBUserMeta {
it := reqCtx.getDBReader().GetExtraIter()
rbStartKey := mvcc.EncodeExtraTxnStatusKey(m.Key, math.MaxUint64)
rbEndKey := mvcc.EncodeExtraTxnStatusKey(m.Key, 0)

for it.Seek(rbStartKey); it.Valid(); it.Next() {
item := it.Item()
if len(rbEndKey) != 0 && bytes.Compare(item.Key(), rbEndKey) > 0 {
break
}
key := item.Key()
if len(key) == 0 || (key[0] != tableExtraPrefix && key[0] != metaExtraPrefix) {
continue
}

meta := mvcc.DBUserMeta(item.UserMeta())
return meta
}
return nil
}

// Prewrite implements the MVCCStore interface.
func (store *MVCCStore) Prewrite(reqCtx *requestCtx, req *kvrpcpb.PrewriteRequest) error {
mutations := sortPrewrite(req)
Expand Down
32 changes: 32 additions & 0 deletions tests/realtikvtest/txntest/txn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -595,3 +595,35 @@ func TestDMLWithAddForeignKey(t *testing.T) {

require.True(t, errDML != nil || errDDL != nil)
}

func TestLockKeysInDML(t *testing.T) {
store := realtikvtest.CreateMockStoreAndSetup(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
tk.MustExec("create table t1 (id int primary key);")
tk.MustExec("create table t2 (id int primary key, foreign key fk (id) references t1(id));")

tk.MustExec("insert into t1 values (1)")
tk.MustExec("BEGIN")
tk.MustExec("INSERT INTO t2 VALUES (1)")
var wg sync.WaitGroup
var tk2CommitTime time.Time
tk2StartTime := time.Now()
wg.Add(1)
go func() {
defer wg.Done()
tk2 := testkit.NewTestKit(t, store)
tk2.MustExec("use test")
tk2.MustExec("BEGIN")
require.NotNil(t, tk2.ExecToErr("UPDATE t1 SET id = 2 WHERE id = 1"))
tk2.MustExec("COMMIT")
tk2CommitTime = time.Now()
}()
sleepDuration := 500 * time.Millisecond
time.Sleep(sleepDuration)
tk.MustExec("COMMIT")
wg.Wait()
require.Greater(t, tk2CommitTime.Sub(tk2StartTime), sleepDuration)
tk.MustQuery("SELECT * FROM t1").Check(testkit.Rows("1"))
tk.MustQuery("SELECT * FROM t2").Check(testkit.Rows("1"))
}