Skip to content

Commit 2306250

Browse files
authored
*: add metadata lock when using the plan cache (#51897) (#52956)
close #51407
1 parent 681b43f commit 2306250

File tree

19 files changed

+242
-54
lines changed

19 files changed

+242
-54
lines changed

ddl/metadatalocktest/BUILD.bazel

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ go_test(
77
"mdl_test.go",
88
],
99
flaky = True,
10-
shard_count = 34,
10+
shard_count = 36,
1111
deps = [
1212
"//config",
1313
"//ddl",

ddl/metadatalocktest/mdl_test.go

Lines changed: 95 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -872,6 +872,101 @@ func TestMDLPreparePlanCacheInvalid(t *testing.T) {
872872
tk.MustQuery(`execute stmt_test_1 using @a;`).Check(testkit.Rows("1 <nil>", "2 <nil>", "3 <nil>", "4 <nil>"))
873873
}
874874

875+
func TestMDLPreparePlanCacheExecute(t *testing.T) {
876+
store, dom := testkit.CreateMockStoreAndDomain(t)
877+
878+
sv := server.CreateMockServer(t, store)
879+
880+
sv.SetDomain(dom)
881+
dom.InfoSyncer().SetSessionManager(sv)
882+
defer sv.Close()
883+
884+
conn1 := server.CreateMockConn(t, sv)
885+
tk := testkit.NewTestKitWithSession(t, store, conn1.Context().Session)
886+
conn2 := server.CreateMockConn(t, sv)
887+
tkDDL := testkit.NewTestKitWithSession(t, store, conn2.Context().Session)
888+
tk.MustExec("use test")
889+
tk.MustExec("set global tidb_enable_metadata_lock=1")
890+
tk.MustExec("create table t(a int);")
891+
tk.MustExec("create table t2(a int);")
892+
tk.MustExec("insert into t values(1), (2), (3), (4);")
893+
894+
tk.MustExec(`prepare stmt_test_1 from 'update t set a = ? where a = ?';`)
895+
tk.MustExec(`set @a = 1, @b = 3;`)
896+
tk.MustExec(`execute stmt_test_1 using @a, @b;`)
897+
898+
tk.MustExec("begin")
899+
900+
ch := make(chan struct{})
901+
902+
var wg sync.WaitGroup
903+
wg.Add(1)
904+
go func() {
905+
<-ch
906+
tkDDL.MustExec("alter table test.t add index idx(a);")
907+
wg.Done()
908+
}()
909+
910+
tk.MustQuery("select * from t2")
911+
tk.MustExec(`set @a = 2, @b=4;`)
912+
tk.MustExec(`execute stmt_test_1 using @a, @b;`)
913+
tk.MustQuery("select @@last_plan_from_cache;").Check(testkit.Rows("1"))
914+
// The plan is from cache, the metadata lock should be added to block the DDL.
915+
ch <- struct{}{}
916+
917+
time.Sleep(5 * time.Second)
918+
919+
tk.MustExec("commit")
920+
921+
wg.Wait()
922+
923+
tk.MustExec("admin check table t")
924+
}
925+
926+
func TestMDLPreparePlanCacheExecute2(t *testing.T) {
927+
store, dom := testkit.CreateMockStoreAndDomain(t)
928+
929+
sv := server.CreateMockServer(t, store)
930+
931+
sv.SetDomain(dom)
932+
dom.InfoSyncer().SetSessionManager(sv)
933+
defer sv.Close()
934+
935+
conn1 := server.CreateMockConn(t, sv)
936+
tk := testkit.NewTestKitWithSession(t, store, conn1.Context().Session)
937+
conn2 := server.CreateMockConn(t, sv)
938+
tkDDL := testkit.NewTestKitWithSession(t, store, conn2.Context().Session)
939+
tk.MustExec("use test")
940+
tk.MustExec("set global tidb_enable_metadata_lock=1")
941+
tk.MustExec("create table t(a int);")
942+
tk.MustExec("create table t2(a int);")
943+
tk.MustExec("insert into t values(1), (2), (3), (4);")
944+
945+
tk.MustExec(`prepare stmt_test_1 from 'select * from t where a = ?';`)
946+
tk.MustExec(`set @a = 1;`)
947+
tk.MustExec(`execute stmt_test_1 using @a;`)
948+
949+
tk.MustExec("begin")
950+
tk.MustQuery("select * from t2")
951+
952+
var wg sync.WaitGroup
953+
wg.Add(1)
954+
go func() {
955+
tkDDL.MustExec("alter table test.t add index idx(a);")
956+
wg.Done()
957+
}()
958+
959+
wg.Wait()
960+
961+
tk.MustExec(`set @a = 2;`)
962+
tk.MustExec(`execute stmt_test_1 using @a;`)
963+
// The plan should not be from cache because the schema has changed.
964+
tk.MustQuery("select @@last_plan_from_cache;").Check(testkit.Rows("0"))
965+
tk.MustExec("commit")
966+
967+
tk.MustExec("admin check table t")
968+
}
969+
875970
func TestMDLDisable2Enable(t *testing.T) {
876971
store, dom := testkit.CreateMockStoreAndDomain(t)
877972
sv := server.CreateMockServer(t, store)

domain/plan_replayer_dump.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -526,6 +526,7 @@ func dumpExplain(ctx sessionctx.Context, zw *zip.Writer, execStmts []ast.StmtNod
526526
return nil
527527
}
528528

529+
// extractTableNames extracts table names from the given stmts.
529530
func extractTableNames(ctx context.Context, sctx sessionctx.Context,
530531
ExecStmts []ast.StmtNode, curDB model.CIStr) (map[tableNamePair]struct{}, error) {
531532
tableExtractor := &tableNameExtractor{

executor/executor.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1985,6 +1985,7 @@ func ResetContextOfStmt(ctx sessionctx.Context, s ast.StmtNode) (err error) {
19851985
vars.DiskTracker.ResetMaxConsumed()
19861986
vars.MemTracker.SessionID.Store(vars.ConnectionID)
19871987
vars.StmtCtx.TableStats = make(map[int64]interface{})
1988+
sc.MDLRelatedTableIDs = make(map[int64]int64)
19881989

19891990
isAnalyze := false
19901991
if execStmt, ok := s.(*ast.ExecuteStmt); ok {

executor/prepared.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ import (
2626
"github.com/pingcap/tidb/parser/mysql"
2727
plannercore "github.com/pingcap/tidb/planner/core"
2828
"github.com/pingcap/tidb/sessionctx"
29+
"github.com/pingcap/tidb/sessiontxn"
2930
"github.com/pingcap/tidb/types"
3031
"github.com/pingcap/tidb/util"
3132
"github.com/pingcap/tidb/util/chunk"
@@ -117,7 +118,7 @@ func (e *PrepareExec) Next(ctx context.Context, req *chunk.Chunk) error {
117118
return err
118119
}
119120
}
120-
stmt, p, paramCnt, err := plannercore.GeneratePlanCacheStmtWithAST(ctx, e.ctx, stmt0)
121+
stmt, p, paramCnt, err := plannercore.GeneratePlanCacheStmtWithAST(ctx, e.ctx, stmt0, sessiontxn.GetTxnManager(e.ctx).GetTxnInfoSchema())
121122
if err != nil {
122123
return err
123124
}
@@ -211,7 +212,7 @@ func (e *DeallocateExec) Next(ctx context.Context, req *chunk.Chunk) error {
211212
if e.ctx.GetSessionVars().EnablePreparedPlanCache {
212213
bindSQL, _ := plannercore.GetBindSQL4PlanCache(e.ctx, preparedObj)
213214
cacheKey, err := plannercore.NewPlanCacheKey(vars, preparedObj.StmtText, preparedObj.StmtDB, prepared.SchemaVersion,
214-
0, bindSQL)
215+
0, bindSQL, preparedObj.RelateVersion)
215216
if err != nil {
216217
return err
217218
}

executor/seqtest/seq_executor_test.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -940,8 +940,7 @@ func TestBatchInsertDelete(t *testing.T) {
940940
atomic.StoreUint64(&kv.TxnTotalSizeLimit, originLimit)
941941
}()
942942
// Set the limitation to a small value, make it easier to reach the limitation.
943-
atomic.StoreUint64(&kv.TxnTotalSizeLimit, 5800)
944-
943+
atomic.StoreUint64(&kv.TxnTotalSizeLimit, 7000)
945944
tk := testkit.NewTestKit(t, store)
946945
tk.MustExec("use test")
947946
tk.MustExec("drop table if exists batch_insert")

meta/meta.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -828,6 +828,8 @@ func (m *Meta) UpdateTable(dbID int64, tableInfo *model.TableInfo) error {
828828
return errors.Trace(err)
829829
}
830830

831+
tableInfo.Revision++
832+
831833
data, err := json.Marshal(tableInfo)
832834
if err != nil {
833835
return errors.Trace(err)

parser/model/model.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -567,6 +567,9 @@ type TableInfo struct {
567567
ExchangePartitionInfo *ExchangePartitionInfo `json:"exchange_partition_info"`
568568

569569
TTLInfo *TTLInfo `json:"ttl_info"`
570+
571+
// Revision is per table schema's version, it will be increased when the schema changed.
572+
Revision uint64 `json:"revision"`
570573
}
571574

572575
// SepAutoInc decides whether _rowid and auto_increment id use separate allocator.

planner/core/plan_cache.go

Lines changed: 31 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -76,8 +76,33 @@ func planCachePreprocess(ctx context.Context, sctx sessionctx.Context, isGeneral
7676
vars.PreparedParams = append(vars.PreparedParams, val)
7777
}
7878

79-
// step 3: check schema version
80-
if stmtAst.SchemaVersion != is.SchemaMetaVersion() {
79+
// step 3: add metadata lock and check each table's schema version
80+
schemaNotMatch := false
81+
for i := 0; i < len(stmt.dbName); i++ {
82+
_, ok := is.TableByID(stmt.tbls[i].Meta().ID)
83+
if !ok {
84+
tblByName, err := is.TableByName(stmt.dbName[i], stmt.tbls[i].Meta().Name)
85+
if err != nil {
86+
return ErrSchemaChanged.GenWithStack("Schema change caused error: %s", err.Error())
87+
}
88+
delete(stmt.RelateVersion, stmt.tbls[i].Meta().ID)
89+
stmt.tbls[i] = tblByName
90+
stmt.RelateVersion[tblByName.Meta().ID] = tblByName.Meta().Revision
91+
}
92+
newTbl, err := tryLockMDLAndUpdateSchemaIfNecessary(sctx, stmt.dbName[i], stmt.tbls[i], is)
93+
if err != nil {
94+
schemaNotMatch = true
95+
continue
96+
}
97+
if stmt.tbls[i].Meta().Revision != newTbl.Meta().Revision {
98+
schemaNotMatch = true
99+
}
100+
stmt.tbls[i] = newTbl
101+
stmt.RelateVersion[newTbl.Meta().ID] = newTbl.Meta().Revision
102+
}
103+
104+
// step 4: check schema version
105+
if schemaNotMatch || stmt.PreparedAst.SchemaVersion != is.SchemaMetaVersion() {
81106
// In order to avoid some correctness issues, we have to clear the
82107
// cached plan once the schema version is changed.
83108
// Cached plan in prepared struct does NOT have a "cache key" with
@@ -99,7 +124,7 @@ func planCachePreprocess(ctx context.Context, sctx sessionctx.Context, isGeneral
99124
stmtAst.SchemaVersion = is.SchemaMetaVersion()
100125
}
101126

102-
// step 4: handle expiration
127+
// step 5: handle expiration
103128
// If the lastUpdateTime less than expiredTimeStamp4PC,
104129
// it means other sessions have executed 'admin flush instance plan_cache'.
105130
// So we need to clear the current session's plan cache.
@@ -110,6 +135,7 @@ func planCachePreprocess(ctx context.Context, sctx sessionctx.Context, isGeneral
110135
stmtAst.CachedPlan = nil
111136
vars.LastUpdateTime4PC = expiredTimeStamp4PC
112137
}
138+
113139
return nil
114140
}
115141

@@ -151,7 +177,7 @@ func GetPlanFromSessionPlanCache(ctx context.Context, sctx sessionctx.Context,
151177
latestSchemaVersion = domain.GetDomain(sctx).InfoSchema().SchemaMetaVersion()
152178
}
153179
if cacheKey, err = NewPlanCacheKey(sctx.GetSessionVars(), stmt.StmtText,
154-
stmt.StmtDB, stmtAst.SchemaVersion, latestSchemaVersion, bindSQL); err != nil {
180+
stmt.StmtDB, stmtAst.SchemaVersion, latestSchemaVersion, bindSQL, stmt.RelateVersion); err != nil {
155181
return nil, nil, err
156182
}
157183
}
@@ -293,7 +319,7 @@ func generateNewPlan(ctx context.Context, sctx sessionctx.Context, isGeneralPlan
293319
if _, isolationReadContainTiFlash := sessVars.IsolationReadEngines[kv.TiFlash]; isolationReadContainTiFlash && !IsReadOnly(stmtAst.Stmt, sessVars) {
294320
delete(sessVars.IsolationReadEngines, kv.TiFlash)
295321
if cacheKey, err = NewPlanCacheKey(sessVars, stmt.StmtText, stmt.StmtDB,
296-
stmtAst.SchemaVersion, latestSchemaVersion, bindSQL); err != nil {
322+
stmtAst.SchemaVersion, latestSchemaVersion, bindSQL, stmt.RelateVersion); err != nil {
297323
return nil, nil, err
298324
}
299325
sessVars.IsolationReadEngines[kv.TiFlash] = struct{}{}

0 commit comments

Comments
 (0)