Skip to content

Commit 2a21c9a

Browse files
committed
fix the issue that INSERT IGNORE doesn't lock the parent table
Signed-off-by: Yang Keao <[email protected]>
1 parent fa723c3 commit 2a21c9a

File tree

3 files changed

+62
-8
lines changed

3 files changed

+62
-8
lines changed

pkg/executor/foreign_key.go

Lines changed: 21 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -223,10 +223,19 @@ func (fkc *FKCheckExec) doCheck(ctx context.Context) error {
223223
fkc.stats = &FKCheckRuntimeStats{}
224224
defer fkc.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.RegisterStats(fkc.ID(), fkc.stats)
225225
}
226+
start := time.Now()
226227
if len(fkc.toBeCheckedKeys) == 0 && len(fkc.toBeCheckedPrefixKeys) == 0 {
227-
return nil
228+
// It's possible that the `toBeLockedKeys` is not empty. Because the `INSERT IGNORE` will use
229+
// `checkRows` to check each row, which doesn't need this function to check the rows again, but
230+
// it still needs to lock the rows.
231+
err := fkc.doLock(ctx)
232+
if len(fkc.toBeLockedKeys) > 0 && fkc.stats != nil {
233+
fkc.stats.Lock = time.Since(start)
234+
fkc.stats.Total = fkc.stats.Lock
235+
}
236+
237+
return err
228238
}
229-
start := time.Now()
230239
if fkc.stats != nil {
231240
defer func() {
232241
fkc.stats.Keys = len(fkc.toBeCheckedKeys) + len(fkc.toBeCheckedPrefixKeys)
@@ -248,6 +257,15 @@ func (fkc *FKCheckExec) doCheck(ctx context.Context) error {
248257
if fkc.stats != nil {
249258
fkc.stats.Check = time.Since(start)
250259
}
260+
261+
err = fkc.doLock(ctx)
262+
if fkc.stats != nil {
263+
fkc.stats.Lock = time.Since(start) - fkc.stats.Check
264+
}
265+
return err
266+
}
267+
268+
func (fkc *FKCheckExec) doLock(ctx context.Context) error {
251269
if len(fkc.toBeLockedKeys) == 0 {
252270
return nil
253271
}
@@ -263,9 +281,6 @@ func (fkc *FKCheckExec) doCheck(ctx context.Context) error {
263281
// doLockKeys may set TxnCtx.ForUpdate to 1, then if the lock meet write conflict, TiDB can't retry for update.
264282
// So reset TxnCtx.ForUpdate to 0 then can be retry if meet write conflict.
265283
atomic.StoreUint32(&sessVars.TxnCtx.ForUpdate, forUpdate)
266-
if fkc.stats != nil {
267-
fkc.stats.Lock = time.Since(start) - fkc.stats.Check
268-
}
269284
return err
270285
}
271286

@@ -542,7 +557,7 @@ type fkCheckKey struct {
542557
isPrefix bool
543558
}
544559

545-
func (fkc FKCheckExec) checkRows(ctx context.Context, sc *stmtctx.StatementContext, txn kv.Transaction, rows []toBeCheckedRow) error {
560+
func (fkc *FKCheckExec) checkRows(ctx context.Context, sc *stmtctx.StatementContext, txn kv.Transaction, rows []toBeCheckedRow) error {
546561
if fkc.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl != nil {
547562
fkc.stats = &FKCheckRuntimeStats{}
548563
defer fkc.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.RegisterStats(fkc.ID(), fkc.stats)

pkg/executor/test/fktest/foreign_key_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2088,8 +2088,8 @@ func TestExplainAnalyzeDMLWithFKInfo(t *testing.T) {
20882088
{
20892089
sql: "explain analyze insert ignore into t6 values (1,1,10)",
20902090
plan: "Insert_.* root time:.* loops:.* prepare:.* check_insert.* fk_check:.*" +
2091-
"├─Foreign_Key_Check.* 0 root table:t5 total:0s, foreign_keys:1 foreign_key:fk_1, check_exist N/A N/A.*" +
2092-
"├─Foreign_Key_Check.* 0 root table:t5, index:idx2 total:0s, foreign_keys:1 foreign_key:fk_2, check_exist N/A N/A.*" +
2091+
"├─Foreign_Key_Check.* 0 root table:t5 total:.*, lock:.*, foreign_keys:1 foreign_key:fk_1, check_exist N/A N/A.*" +
2092+
"├─Foreign_Key_Check.* 0 root table:t5, index:idx2 total:.*, lock:.*, foreign_keys:1 foreign_key:fk_2, check_exist N/A N/A.*" +
20932093
"└─Foreign_Key_Check.* 0 root table:t5, index:idx3 total:0s, foreign_keys:1 foreign_key:fk_3, check_exist N/A N/A",
20942094
},
20952095
{

tests/realtikvtest/txntest/txn_test.go

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -595,3 +595,42 @@ func TestDMLWithAddForeignKey(t *testing.T) {
595595

596596
require.True(t, errDML != nil || errDDL != nil)
597597
}
598+
599+
func TestLockKeysInInsertIgnore(t *testing.T) {
600+
// FIXME: this test will fail in the unistore. After fixing the issue in unistore, please move
601+
// this test to `foreign_key_test.go`.
602+
603+
store := realtikvtest.CreateMockStoreAndSetup(t)
604+
tk := testkit.NewTestKit(t, store)
605+
tk.MustExec("use test")
606+
tk.MustExec("create table t1 (id int primary key);")
607+
tk.MustExec("create table t2 (id int primary key, foreign key fk (id) references t1(id));")
608+
tk.MustExec("insert into t1 values (1)")
609+
610+
tk.MustExec("BEGIN")
611+
tk.MustExec("INSERT IGNORE INTO t2 VALUES (1)")
612+
613+
var wg sync.WaitGroup
614+
var tk2CommitTime time.Time
615+
tk2StartTime := time.Now()
616+
wg.Add(1)
617+
go func() {
618+
defer wg.Done()
619+
620+
tk2 := testkit.NewTestKit(t, store)
621+
tk2.MustExec("use test")
622+
tk2.MustExec("BEGIN")
623+
require.NotNil(t, tk2.ExecToErr("UPDATE t1 SET id = 2 WHERE id = 1"))
624+
tk2.MustExec("COMMIT")
625+
tk2CommitTime = time.Now()
626+
}()
627+
628+
sleepDuration := 500 * time.Millisecond
629+
time.Sleep(sleepDuration)
630+
tk.MustExec("COMMIT")
631+
wg.Wait()
632+
633+
require.Greater(t, tk2CommitTime.Sub(tk2StartTime), sleepDuration)
634+
tk.MustQuery("SELECT * FROM t1").Check(testkit.Rows("1"))
635+
tk.MustQuery("SELECT * FROM t2").Check(testkit.Rows("1"))
636+
}

0 commit comments

Comments
 (0)