Skip to content

Commit a22fc59

Browse files
authored
statistics: add the refresher as a stats owner listener (#56998)
ref #55906
1 parent 2f8b11a commit a22fc59

File tree

9 files changed

+86
-17
lines changed

9 files changed

+86
-17
lines changed

pkg/domain/domain.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2356,7 +2356,7 @@ func (do *Domain) UpdateTableStatsLoop(ctx, initStatsCtx sessionctx.Context) err
23562356
variable.EnableStatsOwner = do.enableStatsOwner
23572357
variable.DisableStatsOwner = do.disableStatsOwner
23582358
do.statsOwner = do.newOwnerManager(handle.StatsPrompt, handle.StatsOwnerKey)
2359-
do.statsOwner.SetListener(do.ddlNotifier)
2359+
do.statsOwner.SetListener(owner.NewListenersWrapper(statsHandle, do.ddlNotifier))
23602360
do.wg.Run(func() {
23612361
do.indexUsageWorker()
23622362
}, "indexUsageWorker")

pkg/owner/BUILD.bazel

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ go_test(
3737
],
3838
embed = [":owner"],
3939
flaky = True,
40-
shard_count = 9,
40+
shard_count = 10,
4141
deps = [
4242
"//pkg/ddl",
4343
"//pkg/infoschema",

pkg/owner/manager.go

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -554,3 +554,28 @@ func AcquireDistributedLock(
554554
}
555555
}, nil
556556
}
557+
558+
// ListenersWrapper is a list of listeners.
559+
// A way to broadcast events to multiple listeners.
560+
type ListenersWrapper struct {
561+
listeners []Listener
562+
}
563+
564+
// OnBecomeOwner broadcasts the OnBecomeOwner event to all listeners.
565+
func (ol *ListenersWrapper) OnBecomeOwner() {
566+
for _, l := range ol.listeners {
567+
l.OnBecomeOwner()
568+
}
569+
}
570+
571+
// OnRetireOwner broadcasts the OnRetireOwner event to all listeners.
572+
func (ol *ListenersWrapper) OnRetireOwner() {
573+
for _, l := range ol.listeners {
574+
l.OnRetireOwner()
575+
}
576+
}
577+
578+
// NewListenersWrapper creates a new OwnerListeners.
579+
func NewListenersWrapper(listeners ...Listener) *ListenersWrapper {
580+
return &ListenersWrapper{listeners: listeners}
581+
}

pkg/owner/manager_test.go

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -560,3 +560,19 @@ func TestAcquireDistributedLock(t *testing.T) {
560560
release2()
561561
})
562562
}
563+
564+
func TestListenersWrapper(t *testing.T) {
565+
lis1 := &listener{}
566+
lis2 := &listener{}
567+
wrapper := owner.NewListenersWrapper(lis1, lis2)
568+
569+
// Test OnBecomeOwner
570+
wrapper.OnBecomeOwner()
571+
require.True(t, lis1.val.Load())
572+
require.True(t, lis2.val.Load())
573+
574+
// Test OnRetireOwner
575+
wrapper.OnRetireOwner()
576+
require.False(t, lis1.val.Load())
577+
require.False(t, lis2.val.Load())
578+
}

pkg/statistics/handle/autoanalyze/autoanalyze.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -138,6 +138,16 @@ func (sa *statsAnalyze) CleanupCorruptedAnalyzeJobsOnDeadInstances() error {
138138
}, statsutil.FlagWrapTxn)
139139
}
140140

141+
// OnBecomeOwner is used to handle the event when the current TiDB instance becomes the stats owner.
142+
func (sa *statsAnalyze) OnBecomeOwner() {
143+
sa.refresher.OnBecomeOwner()
144+
}
145+
146+
// OnRetireOwner is used to handle the event when the current TiDB instance retires from being the stats owner.
147+
func (sa *statsAnalyze) OnRetireOwner() {
148+
sa.refresher.OnRetireOwner()
149+
}
150+
141151
// SelectAnalyzeJobsOnCurrentInstanceSQL is the SQL to select the analyze jobs whose
142152
// state is `pending` or `running` and the update time is more than 10 minutes ago
143153
// and the instance is current instance.

pkg/statistics/handle/autoanalyze/priorityqueue/queue.go

Lines changed: 17 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -72,18 +72,20 @@ type pqHeap interface {
7272
//
7373
//nolint:fieldalignment
7474
type AnalysisPriorityQueue struct {
75+
ctx context.Context
7576
statsHandle statstypes.StatsHandle
7677
calculator *PriorityCalculator
7778

78-
ctx context.Context
79-
cancel context.CancelFunc
80-
wg util.WaitGroupWrapper
79+
wg util.WaitGroupWrapper
8180

8281
// syncFields is a substructure to hold fields protected by mu.
8382
syncFields struct {
8483
// mu is used to protect the following fields.
85-
mu sync.RWMutex
86-
inner pqHeap
84+
mu sync.RWMutex
85+
// Because the Initialize and Close functions can be called concurrently,
86+
// so we need to protect the cancel function to avoid data race.
87+
cancel context.CancelFunc
88+
inner pqHeap
8789
// runningJobs is a map to store the running jobs. Used to avoid duplicate jobs.
8890
runningJobs map[int64]struct{}
8991
// lastDMLUpdateFetchTimestamp is the timestamp of the last DML update fetch.
@@ -97,20 +99,11 @@ type AnalysisPriorityQueue struct {
9799

98100
// NewAnalysisPriorityQueue creates a new AnalysisPriorityQueue2.
99101
func NewAnalysisPriorityQueue(handle statstypes.StatsHandle) *AnalysisPriorityQueue {
100-
ctx, cancel := context.WithCancel(context.Background())
101-
102102
queue := &AnalysisPriorityQueue{
103103
statsHandle: handle,
104104
calculator: NewPriorityCalculator(),
105-
ctx: ctx,
106-
cancel: cancel,
107105
}
108106

109-
queue.syncFields.mu.Lock()
110-
queue.syncFields.runningJobs = make(map[int64]struct{})
111-
queue.syncFields.failedJobs = make(map[int64]struct{})
112-
queue.syncFields.mu.Unlock()
113-
114107
return queue
115108
}
116109

@@ -144,6 +137,12 @@ func (pq *AnalysisPriorityQueue) Initialize() error {
144137
pq.Close()
145138
return errors.Trace(err)
146139
}
140+
141+
ctx, cancel := context.WithCancel(context.Background())
142+
pq.ctx = ctx
143+
pq.syncFields.cancel = cancel
144+
pq.syncFields.runningJobs = make(map[int64]struct{})
145+
pq.syncFields.failedJobs = make(map[int64]struct{})
147146
pq.syncFields.initialized = true
148147
pq.syncFields.mu.Unlock()
149148

@@ -813,6 +812,9 @@ func (pq *AnalysisPriorityQueue) Close() {
813812
return
814813
}
815814

816-
pq.cancel()
815+
// It is possible that the priority queue is not initialized.
816+
if pq.syncFields.cancel != nil {
817+
pq.syncFields.cancel()
818+
}
817819
pq.wg.Wait()
818820
}

pkg/statistics/handle/autoanalyze/refresher/refresher.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -251,3 +251,15 @@ func (r *Refresher) Close() {
251251
r.jobs.Close()
252252
}
253253
}
254+
255+
// OnBecomeOwner is used to handle the event when the current TiDB instance becomes the stats owner.
256+
func (*Refresher) OnBecomeOwner() {
257+
// No action is taken when becoming the stats owner.
258+
// Initialization of the Refresher can fail, so operations are deferred until the first auto-analyze check.
259+
}
260+
261+
// OnRetireOwner is used to handle the event when the current TiDB instance retires from being the stats owner.
262+
func (r *Refresher) OnRetireOwner() {
263+
// Stop the worker and close the queue.
264+
r.jobs.Close()
265+
}

pkg/statistics/handle/types/BUILD.bazel

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ go_library(
99
"//pkg/ddl/notifier",
1010
"//pkg/infoschema",
1111
"//pkg/meta/model",
12+
"//pkg/owner",
1213
"//pkg/parser/ast",
1314
"//pkg/sessionctx",
1415
"//pkg/sessionctx/stmtctx",

pkg/statistics/handle/types/interfaces.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ import (
2222
"github.com/pingcap/tidb/pkg/ddl/notifier"
2323
"github.com/pingcap/tidb/pkg/infoschema"
2424
"github.com/pingcap/tidb/pkg/meta/model"
25+
"github.com/pingcap/tidb/pkg/owner"
2526
"github.com/pingcap/tidb/pkg/parser/ast"
2627
"github.com/pingcap/tidb/pkg/sessionctx"
2728
"github.com/pingcap/tidb/pkg/sessionctx/stmtctx"
@@ -119,6 +120,8 @@ type StatsHistory interface {
119120

120121
// StatsAnalyze is used to handle auto-analyze and manage analyze jobs.
121122
type StatsAnalyze interface {
123+
owner.Listener
124+
122125
// InsertAnalyzeJob inserts analyze job into mysql.analyze_jobs and gets job ID for further updating job.
123126
InsertAnalyzeJob(job *statistics.AnalyzeJob, instance string, procID uint64) error
124127

0 commit comments

Comments
 (0)