Skip to content

Commit 228d37f

Browse files
committed
fix the issue that INSERT IGNORE doesn't lock the parent table
Signed-off-by: Yang Keao <[email protected]>
1 parent fa723c3 commit 228d37f

File tree

2 files changed

+59
-6
lines changed

2 files changed

+59
-6
lines changed

pkg/executor/foreign_key.go

Lines changed: 20 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -223,10 +223,18 @@ func (fkc *FKCheckExec) doCheck(ctx context.Context) error {
223223
fkc.stats = &FKCheckRuntimeStats{}
224224
defer fkc.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.RegisterStats(fkc.ID(), fkc.stats)
225225
}
226+
start := time.Now()
226227
if len(fkc.toBeCheckedKeys) == 0 && len(fkc.toBeCheckedPrefixKeys) == 0 {
227-
return nil
228+
// It's possible that the `toBeLockedKeys` is not empty. Because the `INSERT IGNORE` will use
229+
// `checkRows` to check each row, which doesn't need this function to check the rows again, but
230+
// it still needs to lock the rows.
231+
err := fkc.doLock(ctx)
232+
if fkc.stats != nil {
233+
fkc.stats.Lock = time.Since(start)
234+
}
235+
236+
return err
228237
}
229-
start := time.Now()
230238
if fkc.stats != nil {
231239
defer func() {
232240
fkc.stats.Keys = len(fkc.toBeCheckedKeys) + len(fkc.toBeCheckedPrefixKeys)
@@ -248,6 +256,15 @@ func (fkc *FKCheckExec) doCheck(ctx context.Context) error {
248256
if fkc.stats != nil {
249257
fkc.stats.Check = time.Since(start)
250258
}
259+
260+
err = fkc.doLock(ctx)
261+
if fkc.stats != nil {
262+
fkc.stats.Lock = time.Since(start) - fkc.stats.Check
263+
}
264+
return err
265+
}
266+
267+
func (fkc *FKCheckExec) doLock(ctx context.Context) error {
251268
if len(fkc.toBeLockedKeys) == 0 {
252269
return nil
253270
}
@@ -263,9 +280,6 @@ func (fkc *FKCheckExec) doCheck(ctx context.Context) error {
263280
// doLockKeys may set TxnCtx.ForUpdate to 1, then if the lock meet write conflict, TiDB can't retry for update.
264281
// So reset TxnCtx.ForUpdate to 0 then can be retry if meet write conflict.
265282
atomic.StoreUint32(&sessVars.TxnCtx.ForUpdate, forUpdate)
266-
if fkc.stats != nil {
267-
fkc.stats.Lock = time.Since(start) - fkc.stats.Check
268-
}
269283
return err
270284
}
271285

@@ -542,7 +556,7 @@ type fkCheckKey struct {
542556
isPrefix bool
543557
}
544558

545-
func (fkc FKCheckExec) checkRows(ctx context.Context, sc *stmtctx.StatementContext, txn kv.Transaction, rows []toBeCheckedRow) error {
559+
func (fkc *FKCheckExec) checkRows(ctx context.Context, sc *stmtctx.StatementContext, txn kv.Transaction, rows []toBeCheckedRow) error {
546560
if fkc.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl != nil {
547561
fkc.stats = &FKCheckRuntimeStats{}
548562
defer fkc.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.RegisterStats(fkc.ID(), fkc.stats)

tests/realtikvtest/txntest/txn_test.go

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -595,3 +595,42 @@ func TestDMLWithAddForeignKey(t *testing.T) {
595595

596596
require.True(t, errDML != nil || errDDL != nil)
597597
}
598+
599+
func TestLockKeysInInsertIgnore(t *testing.T) {
600+
// FIXME: this test will fail in the unistore. After fixing the issue in unistore, please move
601+
// this test to `foreign_key_test.go`.
602+
603+
store := realtikvtest.CreateMockStoreAndSetup(t)
604+
tk := testkit.NewTestKit(t, store)
605+
tk.MustExec("use test")
606+
tk.MustExec("create table t1 (id int primary key);")
607+
tk.MustExec("create table t2 (id int primary key, foreign key fk (id) references t1(id));")
608+
tk.MustExec("insert into t1 values (1)")
609+
610+
tk.MustExec("BEGIN")
611+
tk.MustExec("INSERT IGNORE INTO t2 VALUES (1)")
612+
613+
var wg sync.WaitGroup
614+
var tk2CommitTime time.Time
615+
tk2StartTime := time.Now()
616+
wg.Add(1)
617+
go func() {
618+
defer wg.Done()
619+
620+
tk2 := testkit.NewTestKit(t, store)
621+
tk2.MustExec("use test")
622+
tk2.MustExec("BEGIN")
623+
require.NotNil(t, tk2.ExecToErr("UPDATE t1 SET id = 2 WHERE id = 1"))
624+
tk2.MustExec("COMMIT")
625+
tk2CommitTime = time.Now()
626+
}()
627+
628+
sleepDuration := 500 * time.Millisecond
629+
time.Sleep(sleepDuration)
630+
tk.MustExec("COMMIT")
631+
wg.Wait()
632+
633+
require.Greater(t, tk2CommitTime.Sub(tk2StartTime), sleepDuration)
634+
tk.MustQuery("SELECT * FROM t1").Check(testkit.Rows("1"))
635+
tk.MustQuery("SELECT * FROM t2").Check(testkit.Rows("1"))
636+
}

0 commit comments

Comments
 (0)