
Commit dde1dcd

session: add indexes for mysql.analyze_jobs (#58134)
close #57996
1 parent 8a80a41 commit dde1dcd

File tree

6 files changed, +128 -8 lines changed

br/pkg/restore/snap_client/systable_restore_test.go

Lines changed: 1 addition & 1 deletion
@@ -116,5 +116,5 @@ func TestCheckSysTableCompatibility(t *testing.T) {
 //
 // The above variables are in the file br/pkg/restore/systable_restore.go
 func TestMonitorTheSystemTableIncremental(t *testing.T) {
-	require.Equal(t, int64(239), session.CurrentBootstrapVersion)
+	require.Equal(t, int64(240), session.CurrentBootstrapVersion)
 }

pkg/session/bootstrap.go

Lines changed: 27 additions & 2 deletions
@@ -458,7 +458,9 @@ const (
 		instance VARCHAR(512) NOT NULL comment 'address of the TiDB instance executing the analyze job',
 		process_id BIGINT(64) UNSIGNED comment 'ID of the process executing the analyze job',
 		PRIMARY KEY (id),
-		KEY (update_time)
+		KEY (update_time),
+		INDEX idx_schema_table_state (table_schema, table_name, state),
+		INDEX idx_schema_table_partition_state (table_schema, table_name, partition_name, state)
 	);`
 	// CreateAdvisoryLocks stores the advisory locks (get_lock, release_lock).
 	CreateAdvisoryLocks = `CREATE TABLE IF NOT EXISTS mysql.advisory_locks (
@@ -1203,11 +1205,15 @@ const (
 	// version 239
 	// add modify_params to tidb_global_task and tidb_global_task_history.
 	version239 = 239
+
+	// version 240
+	// Add indexes to mysql.analyze_jobs to speed up the query.
+	version240 = 240
 )
 
 // currentBootstrapVersion is defined as a variable, so we can modify its value for testing.
 // please make sure this is the largest version
-var currentBootstrapVersion int64 = version239
+var currentBootstrapVersion int64 = version240
 
 // DDL owner key's expired time is ManagerSessionTTL seconds, we should wait the time and give more time to have a chance to finish it.
 var internalSQLTimeout = owner.ManagerSessionTTL + 15
@@ -1382,6 +1388,7 @@ var (
 		upgradeToVer217,
 		upgradeToVer218,
 		upgradeToVer239,
+		upgradeToVer240,
 	}
 )
 
@@ -3287,6 +3294,24 @@ func upgradeToVer239(s sessiontypes.Session, ver int64) {
 	doReentrantDDL(s, "ALTER TABLE mysql.tidb_global_task_history ADD COLUMN modify_params json AFTER `error`;", infoschema.ErrColumnExists)
 }
 
+const (
+	// addAnalyzeJobsSchemaTableStateIndex is a DDL statement that adds an index on (table_schema, table_name, state)
+	// columns to mysql.analyze_jobs table. This index is currently unused since queries filter on partition_name='',
+	// even for non-partitioned tables. It is kept for potential future optimization where queries could use this
+	// simpler index directly for non-partitioned tables.
+	addAnalyzeJobsSchemaTableStateIndex = "ALTER TABLE mysql.analyze_jobs ADD INDEX idx_schema_table_state (table_schema, table_name, state)"
+	// addAnalyzeJobsSchemaTablePartitionStateIndex adds an index on (table_schema, table_name, partition_name, state) to mysql.analyze_jobs
+	addAnalyzeJobsSchemaTablePartitionStateIndex = "ALTER TABLE mysql.analyze_jobs ADD INDEX idx_schema_table_partition_state (table_schema, table_name, partition_name, state)"
+)
+
+func upgradeToVer240(s sessiontypes.Session, ver int64) {
+	if ver >= version240 {
+		return
+	}
+	doReentrantDDL(s, addAnalyzeJobsSchemaTableStateIndex, dbterror.ErrDupKeyName)
+	doReentrantDDL(s, addAnalyzeJobsSchemaTablePartitionStateIndex, dbterror.ErrDupKeyName)
+}
+
 // initGlobalVariableIfNotExists initialize a global variable with specific val if it does not exist.
 func initGlobalVariableIfNotExists(s sessiontypes.Session, name string, val any) {
 	ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnBootstrap)
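For context (not part of the commit): the four-column index added above matches the filters used by the auto-analyze bookkeeping queries against mysql.analyze_jobs. A minimal illustrative SQL sketch of such a lookup, with invented schema/table/partition literals (the real statements live in pkg/statistics/handle/autoanalyze/priorityqueue/interval.go):

-- Hypothetical example: fetch the most recent failed analyze job for one partition.
-- The WHERE clause matches the column prefix of idx_schema_table_partition_state,
-- so TiDB can answer it with an index range scan instead of a full table scan.
SELECT id, start_time, fail_reason
FROM mysql.analyze_jobs
WHERE table_schema = 'example_schema'
  AND table_name = 'example_table'
  AND partition_name = 'p0'
  AND state = 'failed'
ORDER BY start_time DESC
LIMIT 1;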

pkg/session/bootstrap_test.go

Lines changed: 45 additions & 0 deletions
@@ -2614,3 +2614,48 @@ func (mebd *mockEtcdBackend) EtcdAddrs() ([]string, error) {
 func (mebd *mockEtcdBackend) TLSConfig() *tls.Config { return nil }
 
 func (mebd *mockEtcdBackend) StartGCWorker() error { return nil }
+
+func TestTiDBUpgradeToVer240(t *testing.T) {
+	ctx := context.Background()
+	store, dom := CreateStoreAndBootstrap(t)
+	defer func() { require.NoError(t, store.Close()) }()
+
+	ver239 := version239
+	seV239 := CreateSessionAndSetID(t, store)
+	txn, err := store.Begin()
+	require.NoError(t, err)
+	m := meta.NewMutator(txn)
+	err = m.FinishBootstrap(int64(ver239))
+	require.NoError(t, err)
+	revertVersionAndVariables(t, seV239, ver239)
+	err = txn.Commit(ctx)
+	require.NoError(t, err)
+	store.SetOption(StoreBootstrappedKey, nil)
+
+	// Check if the required indexes already exist in mysql.analyze_jobs (they are created by default in new clusters)
+	res := MustExecToRecodeSet(t, seV239, "show create table mysql.analyze_jobs")
+	chk := res.NewChunk(nil)
+	err = res.Next(ctx, chk)
+	require.NoError(t, err)
+	require.Equal(t, 1, chk.NumRows())
+	require.Contains(t, string(chk.GetRow(0).GetBytes(1)), "idx_schema_table_state")
+	require.Contains(t, string(chk.GetRow(0).GetBytes(1)), "idx_schema_table_partition_state")
+
+	// Check that the indexes still exist after upgrading to the new version and that no errors occurred during the upgrade.
+	dom.Close()
+	domCurVer, err := BootstrapSession(store)
+	require.NoError(t, err)
+	defer domCurVer.Close()
+	seCurVer := CreateSessionAndSetID(t, store)
+	ver, err := getBootstrapVersion(seCurVer)
+	require.NoError(t, err)
+	require.Equal(t, currentBootstrapVersion, ver)
+
+	res = MustExecToRecodeSet(t, seCurVer, "show create table mysql.analyze_jobs")
+	chk = res.NewChunk(nil)
+	err = res.Next(ctx, chk)
+	require.NoError(t, err)
+	require.Equal(t, 1, chk.NumRows())
+	require.Contains(t, string(chk.GetRow(0).GetBytes(1)), "idx_schema_table_state")
+	require.Contains(t, string(chk.GetRow(0).GetBytes(1)), "idx_schema_table_partition_state")
+}
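For reference, the check the test performs through MustExecToRecodeSet can also be done by hand on a running cluster; an illustrative snippet (not part of the commit):

-- Both idx_schema_table_state and idx_schema_table_partition_state should appear
-- on a cluster bootstrapped at, or upgraded to, bootstrap version 240.
SHOW CREATE TABLE mysql.analyze_jobs;
SHOW INDEX FROM mysql.analyze_jobs;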

pkg/session/bootstraptest/bootstrap_upgrade_test.go

Lines changed: 2 additions & 3 deletions
@@ -608,7 +608,7 @@ func TestUpgradeVersionForResumeJob(t *testing.T) {
 
 	wg.Wait()
 	// Make sure the second add index operation is successful.
-	sql := fmt.Sprintf("select job_meta from mysql.tidb_ddl_history where job_id >=%d order by job_id", jobID)
+	sql := fmt.Sprintf("select job_meta from mysql.tidb_ddl_history where job_id >=%d and table_name in ('upgrade_tbl', 'upgrade_tbl1') order by job_id", jobID)
 	rows, err := execute(context.Background(), seLatestV, sql)
 	require.NoError(t, err)
 	require.GreaterOrEqual(t, len(rows), 2)
@@ -624,8 +624,7 @@
 			idxFinishTS = runJob.BinlogInfo.FinishedTS
 		} else {
 			// The second add index op.
-			// notice: upgrade `tidb_runaway_queries` table will happened in `upgradeToVer212` function which is before the second add index op.
-			if strings.Contains(runJob.TableName, "upgrade_tbl") || strings.Contains(runJob.TableName, "tidb_runaway_queries") {
+			if strings.Contains(runJob.TableName, "upgrade_tbl") {
 				require.Greater(t, runJob.BinlogInfo.FinishedTS, idxFinishTS)
 			} else {
 				// The upgrade DDL ops. These jobs' finishedTS must less than add index ops.

pkg/statistics/handle/autoanalyze/priorityqueue/dynamic_partitioned_table_analysis_job_test.go

Lines changed: 49 additions & 0 deletions
@@ -16,12 +16,14 @@ package priorityqueue_test
 
 import (
 	"context"
+	"strings"
 	"testing"
 
 	"github.com/pingcap/tidb/pkg/parser/model"
 	"github.com/pingcap/tidb/pkg/session"
 	"github.com/pingcap/tidb/pkg/sessionctx"
 	"github.com/pingcap/tidb/pkg/statistics/handle/autoanalyze/priorityqueue"
+	"github.com/pingcap/tidb/pkg/statistics/handle/util"
 	"github.com/pingcap/tidb/pkg/testkit"
 	"github.com/stretchr/testify/require"
 )
@@ -175,3 +177,50 @@ func TestValidateAndPrepareForDynamicPartitionedTable(t *testing.T) {
 	require.False(t, valid)
 	require.Equal(t, "last failed analysis duration is less than 2 times the average analysis duration", failReason)
 }
+
+func TestPerformanceOfValidateAndPrepare(t *testing.T) {
+	store, dom := testkit.CreateMockStoreAndDomain(t)
+	tk := testkit.NewTestKit(t, store)
+	tk.MustExec(session.CreateAnalyzeJobs)
+	tk.MustExec("create database example_schema")
+	tk.MustExec("use example_schema")
+	tk.MustExec("create table example_table (a int, b int, index idx(a)) partition by range (a) (partition p0 values less than (2), partition p1 values less than (4))")
+	tableInfo, err := dom.InfoSchema().TableByName(context.Background(), model.NewCIStr("example_schema"), model.NewCIStr("example_table"))
+	require.NoError(t, err)
+	job := &priorityqueue.DynamicPartitionedTableAnalysisJob{
+		SchemaName:    "example_schema",
+		GlobalTableID: tableInfo.Meta().ID,
+		PartitionIDs: map[int64]struct{}{
+			113: {},
+			114: {},
+		},
+		Weight: 2,
+	}
+	initJobs(tk)
+	insertMultipleFinishedJobs(tk, "example_table", "p0")
+	se := tk.Session()
+	sctx := se.(sessionctx.Context)
+	valid, failReason := job.ValidateAndPrepare(sctx)
+	require.True(t, valid)
+	require.Equal(t, "", failReason)
+
+	// Insert some failed jobs.
+	// Just failed.
+	now := tk.MustQuery("select now()").Rows()[0][0].(string)
+	insertFailedJobWithStartTime(tk, job.SchemaName, job.GlobalTableName, "p0", now)
+
+	// Execute LastFailedDurationQueryForPartition directly to check the query plan.
+	tableSchema := job.SchemaName
+	tableName := job.GlobalTableName
+	partitionNames := []string{"p0", "p1"}
+
+	rows, _, err := util.ExecRows(sctx, "explain format='brief' "+priorityqueue.LastFailedDurationQueryForPartition, tableSchema, tableName, partitionNames)
+	require.NoError(t, err)
+	planRows := make([]string, 0, len(rows))
+	for _, row := range rows {
+		planRows = append(planRows, row.GetString(0))
+	}
+	plan := strings.Join(planRows, "\n")
+	require.Contains(t, plan, "IndexJoin")
+	require.Contains(t, plan, "IndexRangeScan")
+}
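The assertions above inspect the plan of the exported LastFailedDurationQueryForPartition constant. As a rough, hypothetical illustration of the same effect with invented literals (the real query is more involved, which is why the test also expects an IndexJoin):

EXPLAIN FORMAT='brief'
SELECT MIN(TIMESTAMPDIFF(SECOND, start_time, CURRENT_TIMESTAMP)) AS min_duration
FROM mysql.analyze_jobs
WHERE table_schema = 'example_schema'
  AND table_name = 'example_table'
  AND partition_name IN ('p0', 'p1')
  AND state = 'failed';
-- With idx_schema_table_partition_state in place, the plan should show an IndexRangeScan
-- on that index rather than a TableFullScan of mysql.analyze_jobs.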

pkg/statistics/handle/autoanalyze/priorityqueue/interval.go

Lines changed: 4 additions & 2 deletions
@@ -59,9 +59,11 @@ const lastFailedDurationQueryForTable = `
 	LIMIT 1;
 `
 
+// LastFailedDurationQueryForPartition is used to get the duration of the last failed analysis for each specified partition.
+// Exported for testing.
 // For multiple partitions, we only need to return the duration of the most recent failed analysis.
 // We pick the minimum duration of all failed analyses because we want to be conservative.
-const lastFailedDurationQueryForPartition = `
+const LastFailedDurationQueryForPartition = `
 SELECT
 	MIN(TIMESTAMPDIFF(SECOND, aj.start_time, CURRENT_TIMESTAMP)) AS min_duration
 FROM (
@@ -128,7 +130,7 @@ func GetLastFailedAnalysisDuration(
 		query = lastFailedDurationQueryForTable
 		params = append(params, schema, tableName)
 	} else {
-		query = lastFailedDurationQueryForPartition
+		query = LastFailedDurationQueryForPartition
 		params = append(params, schema, tableName, partitionNames)
 	}
 
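Tying this back to the comment added in bootstrap.go: these queries filter on partition_name = '' even for non-partitioned tables, so the four-column index is the one that gets used and idx_schema_table_state currently stays idle. A simplified sketch of that shape, with invented literals:

SELECT MIN(TIMESTAMPDIFF(SECOND, start_time, CURRENT_TIMESTAMP)) AS min_duration
FROM mysql.analyze_jobs
WHERE table_schema = 'example_schema'
  AND table_name = 'example_table'
  AND partition_name = ''   -- present even when the table has no partitions
  AND state = 'failed';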
