Skip to content

Commit f0db14c

Browse files
authored
session: add indexes for mysql.analyze_jobs (#58134) (#58355)
close #57996
1 parent 194aa51 commit f0db14c

File tree

7 files changed

+130
-10
lines changed

7 files changed

+130
-10
lines changed

br/pkg/restore/snap_client/systable_restore_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -116,5 +116,5 @@ func TestCheckSysTableCompatibility(t *testing.T) {
116116
//
117117
// The above variables are in the file br/pkg/restore/systable_restore.go
118118
func TestMonitorTheSystemTableIncremental(t *testing.T) {
119-
require.Equal(t, int64(218), session.CurrentBootstrapVersion)
119+
require.Equal(t, int64(219), session.CurrentBootstrapVersion)
120120
}

pkg/executor/infoschema_reader_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -668,7 +668,7 @@ func TestIndexUsageTable(t *testing.T) {
668668
testkit.RowsWithSep("|",
669669
"test|idt2|idx_4"))
670670
tk.MustQuery(`select count(*) from information_schema.tidb_index_usage;`).Check(
671-
testkit.RowsWithSep("|", "78"))
671+
testkit.RowsWithSep("|", "80"))
672672

673673
tk.MustQuery(`select TABLE_SCHEMA, TABLE_NAME, INDEX_NAME from information_schema.tidb_index_usage
674674
where TABLE_SCHEMA = 'test1';`).Check(testkit.Rows())

pkg/session/bootstrap.go

Lines changed: 28 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -458,7 +458,9 @@ const (
458458
instance VARCHAR(512) NOT NULL comment 'address of the TiDB instance executing the analyze job',
459459
process_id BIGINT(64) UNSIGNED comment 'ID of the process executing the analyze job',
460460
PRIMARY KEY (id),
461-
KEY (update_time)
461+
KEY (update_time),
462+
INDEX idx_schema_table_state (table_schema, table_name, state),
463+
INDEX idx_schema_table_partition_state (table_schema, table_name, partition_name, state)
462464
);`
463465
// CreateAdvisoryLocks stores the advisory locks (get_lock, release_lock).
464466
CreateAdvisoryLocks = `CREATE TABLE IF NOT EXISTS mysql.advisory_locks (
@@ -1192,16 +1194,20 @@ const (
11921194
// enable fast_create_table on default
11931195
version218 = 218
11941196

1197+
// version 219
1198+
// add modify_params to tidb_global_task and tidb_global_task_history.
1199+
version219 = 219
1200+
11951201
// ...
1196-
// [version219, version238] is the version range reserved for patches of 8.5.x
1202+
// [version220, version238] is the version range reserved for patches of 8.5.x
11971203
// ...
11981204

11991205
// next version should start with 239
12001206
)
12011207

12021208
// currentBootstrapVersion is defined as a variable, so we can modify its value for testing.
12031209
// please make sure this is the largest version
1204-
var currentBootstrapVersion int64 = version218
1210+
var currentBootstrapVersion int64 = version219
12051211

12061212
// DDL owner key's expired time is ManagerSessionTTL seconds, we should wait the time and give more time to have a chance to finish it.
12071213
var internalSQLTimeout = owner.ManagerSessionTTL + 15
@@ -1375,6 +1381,7 @@ var (
13751381
upgradeToVer216,
13761382
upgradeToVer217,
13771383
upgradeToVer218,
1384+
upgradeToVer219,
13781385
}
13791386
)
13801387

@@ -3273,6 +3280,24 @@ func upgradeToVer218(_ sessiontypes.Session, ver int64) {
32733280
// empty, just make lint happy.
32743281
}
32753282

3283+
const (
3284+
// addAnalyzeJobsSchemaTableStateIndex is a DDL statement that adds an index on (table_schema, table_name, state)
3285+
// columns to mysql.analyze_jobs table. This index is currently unused since queries filter on partition_name='',
3286+
// even for non-partitioned tables. It is kept for potential future optimization where queries could use this
3287+
// simpler index directly for non-partitioned tables.
3288+
addAnalyzeJobsSchemaTableStateIndex = "ALTER TABLE mysql.analyze_jobs ADD INDEX idx_schema_table_state (table_schema, table_name, state)"
3289+
// addAnalyzeJobsSchemaTablePartitionStateIndex adds an index on (table_schema, table_name, partition_name, state) to mysql.analyze_jobs
3290+
addAnalyzeJobsSchemaTablePartitionStateIndex = "ALTER TABLE mysql.analyze_jobs ADD INDEX idx_schema_table_partition_state (table_schema, table_name, partition_name, state)"
3291+
)
3292+
3293+
func upgradeToVer219(s sessiontypes.Session, ver int64) {
3294+
if ver >= version219 {
3295+
return
3296+
}
3297+
doReentrantDDL(s, addAnalyzeJobsSchemaTableStateIndex, dbterror.ErrDupKeyName)
3298+
doReentrantDDL(s, addAnalyzeJobsSchemaTablePartitionStateIndex, dbterror.ErrDupKeyName)
3299+
}
3300+
32763301
// initGlobalVariableIfNotExists initialize a global variable with specific val if it does not exist.
32773302
func initGlobalVariableIfNotExists(s sessiontypes.Session, name string, val any) {
32783303
ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnBootstrap)

pkg/session/bootstrap_test.go

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2524,3 +2524,48 @@ func TestIndexJoinMultiPatternByUpgrade650To840(t *testing.T) {
25242524
require.Equal(t, 1, row.Len())
25252525
require.Equal(t, int64(0), row.GetInt64(0))
25262526
}
2527+
2528+
func TestTiDBUpgradeToVer219(t *testing.T) {
2529+
ctx := context.Background()
2530+
store, dom := CreateStoreAndBootstrap(t)
2531+
defer func() { require.NoError(t, store.Close()) }()
2532+
2533+
ver218 := version218
2534+
seV218 := CreateSessionAndSetID(t, store)
2535+
txn, err := store.Begin()
2536+
require.NoError(t, err)
2537+
m := meta.NewMutator(txn)
2538+
err = m.FinishBootstrap(int64(ver218))
2539+
require.NoError(t, err)
2540+
revertVersionAndVariables(t, seV218, ver218)
2541+
err = txn.Commit(ctx)
2542+
require.NoError(t, err)
2543+
unsetStoreBootstrapped(store.UUID())
2544+
2545+
// Check if the required indexes already exist in mysql.analyze_jobs (they are created by default in new clusters)
2546+
res := MustExecToRecodeSet(t, seV218, "show create table mysql.analyze_jobs")
2547+
chk := res.NewChunk(nil)
2548+
err = res.Next(ctx, chk)
2549+
require.NoError(t, err)
2550+
require.Equal(t, 1, chk.NumRows())
2551+
require.Contains(t, string(chk.GetRow(0).GetBytes(1)), "idx_schema_table_state")
2552+
require.Contains(t, string(chk.GetRow(0).GetBytes(1)), "idx_schema_table_partition_state")
2553+
2554+
// Check that the indexes still exist after upgrading to the new version and that no errors occurred during the upgrade.
2555+
dom.Close()
2556+
domCurVer, err := BootstrapSession(store)
2557+
require.NoError(t, err)
2558+
defer domCurVer.Close()
2559+
seCurVer := CreateSessionAndSetID(t, store)
2560+
ver, err := getBootstrapVersion(seCurVer)
2561+
require.NoError(t, err)
2562+
require.Equal(t, currentBootstrapVersion, ver)
2563+
2564+
res = MustExecToRecodeSet(t, seCurVer, "show create table mysql.analyze_jobs")
2565+
chk = res.NewChunk(nil)
2566+
err = res.Next(ctx, chk)
2567+
require.NoError(t, err)
2568+
require.Equal(t, 1, chk.NumRows())
2569+
require.Contains(t, string(chk.GetRow(0).GetBytes(1)), "idx_schema_table_state")
2570+
require.Contains(t, string(chk.GetRow(0).GetBytes(1)), "idx_schema_table_partition_state")
2571+
}

pkg/session/bootstraptest/bootstrap_upgrade_test.go

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -608,7 +608,7 @@ func TestUpgradeVersionForResumeJob(t *testing.T) {
608608

609609
wg.Wait()
610610
// Make sure the second add index operation is successful.
611-
sql := fmt.Sprintf("select job_meta from mysql.tidb_ddl_history where job_id >=%d order by job_id", jobID)
611+
sql := fmt.Sprintf("select job_meta from mysql.tidb_ddl_history where job_id >=%d and table_name in ('upgrade_tbl', 'upgrade_tbl1') order by job_id", jobID)
612612
rows, err := execute(context.Background(), seLatestV, sql)
613613
require.NoError(t, err)
614614
require.GreaterOrEqual(t, len(rows), 2)
@@ -624,8 +624,7 @@ func TestUpgradeVersionForResumeJob(t *testing.T) {
624624
idxFinishTS = runJob.BinlogInfo.FinishedTS
625625
} else {
626626
// The second add index op.
627-
// notice: upgrade `tidb_runaway_queries` table will happened in `upgradeToVer212` function which is before the second add index op.
628-
if strings.Contains(runJob.TableName, "upgrade_tbl") || strings.Contains(runJob.TableName, "tidb_runaway_queries") {
627+
if strings.Contains(runJob.TableName, "upgrade_tbl") {
629628
require.Greater(t, runJob.BinlogInfo.FinishedTS, idxFinishTS)
630629
} else {
631630
// The upgrade DDL ops. These jobs' finishedTS must less than add index ops.

pkg/statistics/handle/autoanalyze/priorityqueue/dynamic_partitioned_table_analysis_job_test.go

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,12 +16,14 @@ package priorityqueue_test
1616

1717
import (
1818
"context"
19+
"strings"
1920
"testing"
2021

2122
"github.com/pingcap/tidb/pkg/parser/model"
2223
"github.com/pingcap/tidb/pkg/session"
2324
"github.com/pingcap/tidb/pkg/sessionctx"
2425
"github.com/pingcap/tidb/pkg/statistics/handle/autoanalyze/priorityqueue"
26+
"github.com/pingcap/tidb/pkg/statistics/handle/util"
2527
"github.com/pingcap/tidb/pkg/testkit"
2628
"github.com/stretchr/testify/require"
2729
)
@@ -175,3 +177,50 @@ func TestValidateAndPrepareForDynamicPartitionedTable(t *testing.T) {
175177
require.False(t, valid)
176178
require.Equal(t, "last failed analysis duration is less than 2 times the average analysis duration", failReason)
177179
}
180+
181+
func TestPerformanceOfValidateAndPrepare(t *testing.T) {
182+
store, dom := testkit.CreateMockStoreAndDomain(t)
183+
tk := testkit.NewTestKit(t, store)
184+
tk.MustExec(session.CreateAnalyzeJobs)
185+
tk.MustExec("create database example_schema")
186+
tk.MustExec("use example_schema")
187+
tk.MustExec("create table example_table (a int, b int, index idx(a)) partition by range (a) (partition p0 values less than (2), partition p1 values less than (4))")
188+
tableInfo, err := dom.InfoSchema().TableByName(context.Background(), model.NewCIStr("example_schema"), model.NewCIStr("example_table"))
189+
require.NoError(t, err)
190+
job := &priorityqueue.DynamicPartitionedTableAnalysisJob{
191+
SchemaName: "example_schema",
192+
GlobalTableID: tableInfo.Meta().ID,
193+
PartitionIDs: map[int64]struct{}{
194+
113: {},
195+
114: {},
196+
},
197+
Weight: 2,
198+
}
199+
initJobs(tk)
200+
insertMultipleFinishedJobs(tk, "example_table", "p0")
201+
se := tk.Session()
202+
sctx := se.(sessionctx.Context)
203+
valid, failReason := job.ValidateAndPrepare(sctx)
204+
require.True(t, valid)
205+
require.Equal(t, "", failReason)
206+
207+
// Insert some failed jobs.
208+
// Just failed.
209+
now := tk.MustQuery("select now()").Rows()[0][0].(string)
210+
insertFailedJobWithStartTime(tk, job.SchemaName, job.GlobalTableName, "p0", now)
211+
212+
// Execute LastFailedDurationQueryForPartition directly to check the query plan.
213+
tableSchema := job.SchemaName
214+
tableName := job.GlobalTableName
215+
partitionNames := []string{"p0", "p1"}
216+
217+
rows, _, err := util.ExecRows(sctx, "explain format='brief' "+priorityqueue.LastFailedDurationQueryForPartition, tableSchema, tableName, partitionNames)
218+
require.NoError(t, err)
219+
planRows := make([]string, 0, len(rows))
220+
for _, row := range rows {
221+
planRows = append(planRows, row.GetString(0))
222+
}
223+
plan := strings.Join(planRows, "\n")
224+
require.Contains(t, plan, "IndexJoin")
225+
require.Contains(t, plan, "IndexRangeScan")
226+
}

pkg/statistics/handle/autoanalyze/priorityqueue/interval.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -59,9 +59,11 @@ const lastFailedDurationQueryForTable = `
5959
LIMIT 1;
6060
`
6161

62+
// LastFailedDurationQueryForPartition is used to get the duration of the last failed analysis for each specified partition.
63+
// Exported for testing.
6264
// For multiple partitions, we only need to return the duration of the most recent failed analysis.
6365
// We pick the minimum duration of all failed analyses because we want to be conservative.
64-
const lastFailedDurationQueryForPartition = `
66+
const LastFailedDurationQueryForPartition = `
6567
SELECT
6668
MIN(TIMESTAMPDIFF(SECOND, aj.start_time, CURRENT_TIMESTAMP)) AS min_duration
6769
FROM (
@@ -128,7 +130,7 @@ func GetLastFailedAnalysisDuration(
128130
query = lastFailedDurationQueryForTable
129131
params = append(params, schema, tableName)
130132
} else {
131-
query = lastFailedDurationQueryForPartition
133+
query = LastFailedDurationQueryForPartition
132134
params = append(params, schema, tableName, partitionNames)
133135
}
134136

0 commit comments

Comments
 (0)