Skip to content

Commit cfa52d0

Browse files
authored
statistics: add a priority queue API (#57385)
close #55063
1 parent 8382fdb commit cfa52d0

15 files changed

+241
-1
lines changed

pkg/server/handler/optimizor/BUILD.bazel

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ go_test(
4343
"statistics_handler_test.go",
4444
],
4545
flaky = True,
46-
shard_count = 6,
46+
shard_count = 7,
4747
deps = [
4848
":optimizor",
4949
"//pkg/config",
@@ -56,6 +56,7 @@ go_test(
5656
"//pkg/server/internal/testutil",
5757
"//pkg/server/internal/util",
5858
"//pkg/session",
59+
"//pkg/statistics/handle/types",
5960
"//pkg/statistics/handle/util",
6061
"//pkg/store/mockstore/unistore",
6162
"//pkg/testkit",

pkg/server/handler/optimizor/statistics_handler.go

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -144,3 +144,26 @@ func getSnapshotTableInfo(dom *domain.Domain, snapshot uint64, dbName, tblName s
144144
}
145145
return is.TableByName(context.Background(), model.NewCIStr(dbName), model.NewCIStr(tblName))
146146
}
147+
148+
// StatsPriorityQueueHandler is the handler for dumping the stats priority queue snapshot.
149+
type StatsPriorityQueueHandler struct {
150+
do *domain.Domain
151+
}
152+
153+
// NewStatsPriorityQueueHandler creates a new StatsPriorityQueueHandler.
154+
func NewStatsPriorityQueueHandler(do *domain.Domain) *StatsPriorityQueueHandler {
155+
return &StatsPriorityQueueHandler{do: do}
156+
}
157+
158+
// ServeHTTP dumps the stats priority queue snapshot to json.
159+
func (sh StatsPriorityQueueHandler) ServeHTTP(w http.ResponseWriter, _ *http.Request) {
160+
w.Header().Set("Content-Type", "application/json")
161+
162+
h := sh.do.StatsHandle()
163+
tables, err := h.GetPriorityQueueSnapshot()
164+
if err != nil {
165+
handler.WriteError(w, err)
166+
} else {
167+
handler.WriteData(w, tables)
168+
}
169+
}

pkg/server/handler/optimizor/statistics_handler_test.go

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ import (
3333
"github.com/pingcap/tidb/pkg/server/internal/testutil"
3434
"github.com/pingcap/tidb/pkg/server/internal/util"
3535
"github.com/pingcap/tidb/pkg/session"
36+
"github.com/pingcap/tidb/pkg/statistics/handle/types"
3637
util2 "github.com/pingcap/tidb/pkg/statistics/handle/util"
3738
"github.com/pingcap/tidb/pkg/testkit"
3839
"github.com/stretchr/testify/require"
@@ -289,3 +290,58 @@ func checkData(t *testing.T, path string, client *testserverclient.TestServerCli
289290
require.Equal(t, int64(4), count)
290291
require.NoError(t, rows.Close())
291292
}
293+
294+
func TestStatsPriorityQueueAPI(t *testing.T) {
295+
store := testkit.CreateMockStore(t)
296+
driver := server2.NewTiDBDriver(store)
297+
client := testserverclient.NewTestServerClient()
298+
cfg := util.NewTestConfig()
299+
cfg.Port = client.Port
300+
cfg.Status.StatusPort = client.StatusPort
301+
cfg.Status.ReportStatus = true
302+
cfg.Socket = fmt.Sprintf("/tmp/tidb-mock-%d.sock", time.Now().UnixNano())
303+
304+
server, err := server2.NewServer(cfg, driver)
305+
require.NoError(t, err)
306+
defer server.Close()
307+
308+
dom, err := session.GetDomain(store)
309+
require.NoError(t, err)
310+
server.SetDomain(dom)
311+
go func() {
312+
err := server.Run(nil)
313+
require.NoError(t, err)
314+
}()
315+
<-server2.RunInGoTestChan
316+
client.Port = testutil.GetPortFromTCPAddr(server.ListenAddr())
317+
client.StatusPort = testutil.GetPortFromTCPAddr(server.StatusListenerAddr())
318+
client.WaitUntilServerOnline()
319+
320+
router := mux.NewRouter()
321+
handler := optimizor.NewStatsPriorityQueueHandler(dom)
322+
router.Handle("/stats/priority-queue", handler)
323+
324+
resp, err := client.FetchStatus("/stats/priority-queue")
325+
require.NoError(t, err)
326+
defer resp.Body.Close()
327+
328+
js, err := io.ReadAll(resp.Body)
329+
require.NoError(t, err)
330+
require.Equal(t, "priority queue not initialized", string(js))
331+
332+
// Init the queue.
333+
handle := dom.StatsHandle()
334+
require.False(t, handle.HandleAutoAnalyze())
335+
336+
resp, err = client.FetchStatus("/stats/priority-queue")
337+
require.NoError(t, err)
338+
defer resp.Body.Close()
339+
340+
js, err = io.ReadAll(resp.Body)
341+
require.NoError(t, err)
342+
var snapshot types.PriorityQueueSnapshot
343+
err = json.Unmarshal(js, &snapshot)
344+
require.NoError(t, err)
345+
require.Empty(t, snapshot.CurrentJobs)
346+
require.Empty(t, snapshot.MustRetryTables)
347+
}

pkg/server/http_status.go

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -217,6 +217,8 @@ func (s *Server) startHTTPServer() {
217217
Name("StatsDump")
218218
router.Handle("/stats/dump/{db}/{table}/{snapshot}", s.newStatsHistoryHandler()).
219219
Name("StatsHistoryDump")
220+
router.Handle("/stats/priority-queue", s.newStatsPriorityQueueHandler()).
221+
Name("StatsPriorityQueue")
220222

221223
router.Handle("/plan_replayer/dump/{filename}", s.newPlanReplayerHandler()).Name("PlanReplayerDump")
222224
router.Handle("/extract_task/dump", s.newExtractServeHandler()).Name("ExtractTaskDump")
@@ -621,3 +623,17 @@ func (s *Server) newStatsHistoryHandler() *optimizor.StatsHistoryHandler {
621623
}
622624
return optimizor.NewStatsHistoryHandler(do)
623625
}
626+
627+
func (s *Server) newStatsPriorityQueueHandler() *optimizor.StatsPriorityQueueHandler {
628+
store, ok := s.driver.(*TiDBDriver)
629+
if !ok {
630+
panic("Illegal driver")
631+
}
632+
633+
do, err := session.GetDomain(store.store)
634+
if err != nil {
635+
panic("Failed to get domain")
636+
}
637+
638+
return optimizor.NewStatsPriorityQueueHandler(do)
639+
}

pkg/statistics/handle/autoanalyze/autoanalyze.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -317,6 +317,11 @@ func (sa *statsAnalyze) CheckAnalyzeVersion(tblInfo *model.TableInfo, physicalID
317317
return statistics.CheckAnalyzeVerOnTable(tbl, version)
318318
}
319319

320+
// GetPriorityQueueSnapshot returns the stats priority queue snapshot.
321+
func (sa *statsAnalyze) GetPriorityQueueSnapshot() (statstypes.PriorityQueueSnapshot, error) {
322+
return sa.refresher.GetPriorityQueueSnapshot()
323+
}
324+
320325
func (sa *statsAnalyze) handleAutoAnalyze(sctx sessionctx.Context) bool {
321326
defer func() {
322327
if r := recover(); r != nil {

pkg/statistics/handle/autoanalyze/priorityqueue/calculatoranalysis/calculator_analysis_test.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -289,3 +289,7 @@ func (j *TestJob) SetIndicators(indicators priorityqueue.Indicators) {
289289
func (j *TestJob) HasNewlyAddedIndex() bool {
290290
return false
291291
}
292+
293+
func (j *TestJob) AsJSON() types.AnalysisJobJSON {
294+
panic("unimplemented")
295+
}

pkg/statistics/handle/autoanalyze/priorityqueue/dynamic_partitioned_table_analysis_job.go

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -364,3 +364,20 @@ func getPartitionNames(partitionIndexes map[string][]string) []string {
364364
}
365365
return names
366366
}
367+
368+
// AsJSON converts the job to a JSON object.
369+
func (j *DynamicPartitionedTableAnalysisJob) AsJSON() statstypes.AnalysisJobJSON {
370+
partitionIDs := make([]int64, 0, len(j.PartitionIDs))
371+
for partition := range j.PartitionIDs {
372+
partitionIDs = append(partitionIDs, partition)
373+
}
374+
return statstypes.AnalysisJobJSON{
375+
Type: string(j.getAnalyzeType()),
376+
TableID: j.GlobalTableID,
377+
PartitionIDs: partitionIDs,
378+
PartitionIndexIDs: j.PartitionIndexIDs,
379+
Weight: j.Weight,
380+
Indicators: asJSONIndicators(j.Indicators),
381+
HasNewlyAddedIndex: j.HasNewlyAddedIndex(),
382+
}
383+
}

pkg/statistics/handle/autoanalyze/priorityqueue/heap_test.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,9 @@ func (t testHeapObject) RegisterSuccessHook(hook SuccessJobHook) {
6767
func (t testHeapObject) RegisterFailureHook(hook FailureJobHook) {
6868
panic("implement me")
6969
}
70+
func (t testHeapObject) AsJSON() statstypes.AnalysisJobJSON {
71+
panic("implement me")
72+
}
7073
func (t testHeapObject) String() string {
7174
panic("implement me")
7275
}

pkg/statistics/handle/autoanalyze/priorityqueue/job.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,9 @@ type AnalysisJob interface {
9191
// RegisterFailureHook registers a failureHook function that will be called after the job is marked as failed.
9292
RegisterFailureHook(hook FailureJobHook)
9393

94+
// AsJSON converts the job to a JSON object.
95+
AsJSON() statstypes.AnalysisJobJSON
96+
9497
fmt.Stringer
9598
}
9699

@@ -186,3 +189,12 @@ func IsDynamicPartitionedTableAnalysisJob(job AnalysisJob) bool {
186189
_, ok := job.(*DynamicPartitionedTableAnalysisJob)
187190
return ok
188191
}
192+
193+
// asJSONIndicators converts the indicators to a JSON object.
194+
func asJSONIndicators(indicators Indicators) statstypes.IndicatorsJSON {
195+
return statstypes.IndicatorsJSON{
196+
ChangePercentage: fmt.Sprintf("%.2f%%", indicators.ChangePercentage*100),
197+
TableSize: fmt.Sprintf("%.2f", indicators.TableSize),
198+
LastAnalysisDuration: fmt.Sprintf("%v", indicators.LastAnalysisDuration),
199+
}
200+
}

pkg/statistics/handle/autoanalyze/priorityqueue/non_partitioned_table_analysis_job.go

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -270,3 +270,19 @@ func (j *NonPartitionedTableAnalysisJob) GenSQLForAnalyzeIndex(index string) (st
270270

271271
return sql, params
272272
}
273+
274+
// AsJSON converts the job to a JSON object.
275+
func (j *NonPartitionedTableAnalysisJob) AsJSON() statstypes.AnalysisJobJSON {
276+
indexes := make([]int64, 0, len(j.IndexIDs))
277+
for index := range j.IndexIDs {
278+
indexes = append(indexes, index)
279+
}
280+
return statstypes.AnalysisJobJSON{
281+
Type: string(j.getAnalyzeType()),
282+
TableID: j.TableID,
283+
IndexIDs: indexes,
284+
Weight: j.Weight,
285+
Indicators: asJSONIndicators(j.Indicators),
286+
HasNewlyAddedIndex: j.HasNewlyAddedIndex(),
287+
}
288+
}

0 commit comments

Comments
 (0)