From 009fbf21c5c3d3f1a6be6ff67c119eea1153d4b4 Mon Sep 17 00:00:00 2001
From: Rustin170506
Date: Sat, 23 Nov 2024 16:34:59 +0800
Subject: [PATCH 1/4] statistics: handle deleted tables correctly in the PQ

Signed-off-by: Rustin170506
---
 .../priorityqueue/analysis_job_factory.go     |  9 ++++
 .../handle/autoanalyze/priorityqueue/queue.go |  4 +-
 .../autoanalyze/priorityqueue/queue_test.go   | 45 +++++++++++++++++++
 .../handle/autoanalyze/refresher/refresher.go |  3 ++
 4 files changed, 60 insertions(+), 1 deletion(-)

diff --git a/pkg/statistics/handle/autoanalyze/priorityqueue/analysis_job_factory.go b/pkg/statistics/handle/autoanalyze/priorityqueue/analysis_job_factory.go
index fd52909eff1fe..30bd0b6097d09 100644
--- a/pkg/statistics/handle/autoanalyze/priorityqueue/analysis_job_factory.go
+++ b/pkg/statistics/handle/autoanalyze/priorityqueue/analysis_job_factory.go
@@ -57,6 +57,9 @@ func (f *AnalysisJobFactory) CreateNonPartitionedTableAnalysisJob(
 	tblInfo *model.TableInfo,
 	tblStats *statistics.Table,
 ) AnalysisJob {
+	if tblStats == nil {
+		return nil
+	}
 	if !tblStats.IsEligibleForAnalysis() {
 		return nil
 	}
@@ -92,6 +95,9 @@ func (f *AnalysisJobFactory) CreateStaticPartitionAnalysisJob(
 	partitionID int64,
 	partitionStats *statistics.Table,
 ) AnalysisJob {
+	if partitionStats == nil {
+		return nil
+	}
 	if !partitionStats.IsEligibleForAnalysis() {
 		return nil
 	}
@@ -128,6 +134,9 @@ func (f *AnalysisJobFactory) CreateDynamicPartitionedTableAnalysisJob(
 	globalTblStats *statistics.Table,
 	partitionStats map[PartitionIDAndName]*statistics.Table,
 ) AnalysisJob {
+	if globalTblStats == nil {
+		return nil
+	}
 	if !globalTblStats.IsEligibleForAnalysis() {
 		return nil
 	}
diff --git a/pkg/statistics/handle/autoanalyze/priorityqueue/queue.go b/pkg/statistics/handle/autoanalyze/priorityqueue/queue.go
index a64ef48933669..e6e0433fb59a8 100644
--- a/pkg/statistics/handle/autoanalyze/priorityqueue/queue.go
+++ b/pkg/statistics/handle/autoanalyze/priorityqueue/queue.go
@@ -644,7 +644,8 @@ func (pq *AnalysisPriorityQueue) RefreshLastAnalysisDuration() {
 				zap.Int64("tableID", job.GetTableID()),
 				zap.String("job", job.String()),
 			)
-			// TODO: Remove this after handling the DDL event.
+			// Delete the job from the queue since its table is missing. This is a safeguard -
+			// DDL events should have already cleaned up jobs for dropped tables.
 			err := pq.syncFields.inner.delete(job)
 			if err != nil {
 				statslogutil.StatsLogger().Error("Failed to delete job from priority queue",
@@ -652,6 +653,7 @@ func (pq *AnalysisPriorityQueue) RefreshLastAnalysisDuration() {
 					zap.String("job", job.String()),
 				)
 			}
+			continue
 		}
 		indicators.LastAnalysisDuration = jobFactory.GetTableLastAnalyzeDuration(tableStats)
 		job.SetIndicators(indicators)
diff --git a/pkg/statistics/handle/autoanalyze/priorityqueue/queue_test.go b/pkg/statistics/handle/autoanalyze/priorityqueue/queue_test.go
index 29f8f0b6b7f34..0e32590ff6aa3 100644
--- a/pkg/statistics/handle/autoanalyze/priorityqueue/queue_test.go
+++ b/pkg/statistics/handle/autoanalyze/priorityqueue/queue_test.go
@@ -19,6 +19,7 @@ import (
 	"testing"
 	"time"

+	"github.com/pingcap/tidb/pkg/meta/model"
 	pmodel "github.com/pingcap/tidb/pkg/parser/model"
 	"github.com/pingcap/tidb/pkg/sessionctx"
 	"github.com/pingcap/tidb/pkg/statistics"
@@ -608,3 +609,47 @@ func TestPQCanBeClosedAndReInitialized(t *testing.T) {
 	// Check if the priority queue is initialized.
 	require.True(t, pq.IsInitialized())
 }
+
+func TestPQHandlesTableDeletionGracefully(t *testing.T) {
+	store, dom := testkit.CreateMockStoreAndDomain(t)
+	handle := dom.StatsHandle()
+
+	tk := testkit.NewTestKit(t, store)
+	tk.MustExec("use test")
+	tk.MustExec("create table t1 (a int)")
+	tk.MustExec("insert into t1 values (1)")
+	statistics.AutoAnalyzeMinCnt = 0
+	defer func() {
+		statistics.AutoAnalyzeMinCnt = 1000
+	}()
+
+	ctx := context.Background()
+	require.NoError(t, handle.DumpStatsDeltaToKV(true))
+	require.NoError(t, handle.Update(ctx, dom.InfoSchema()))
+	pq := priorityqueue.NewAnalysisPriorityQueue(handle)
+	defer pq.Close()
+	require.NoError(t, pq.Initialize())
+
+	// Check that the priority queue is not empty.
+	l, err := pq.Len()
+	require.NoError(t, err)
+	require.NotEqual(t, 0, l)
+
+	tbl, err := dom.InfoSchema().TableByName(ctx, pmodel.NewCIStr("test"), pmodel.NewCIStr("t1"))
+	require.NoError(t, err)
+
+	// Drop the table so that its stats are removed from the cache.
+	tk.MustExec("drop table t1")
+	deleteEvent := findEvent(handle.DDLEventCh(), model.ActionDropTable)
+	require.NotNil(t, deleteEvent)
+	require.NoError(t, handle.HandleDDLEvent(deleteEvent))
+	require.NoError(t, handle.Update(ctx, dom.InfoSchema()))
+
+	// Make sure handle.Get() returns false.
+	_, ok := handle.Get(tbl.Meta().ID)
+	require.False(t, ok)
+
+	require.NotPanics(t, func() {
+		pq.RefreshLastAnalysisDuration()
+	})
+}
diff --git a/pkg/statistics/handle/autoanalyze/refresher/refresher.go b/pkg/statistics/handle/autoanalyze/refresher/refresher.go
index 4bbb0d975a665..bf2e95a823e3b 100644
--- a/pkg/statistics/handle/autoanalyze/refresher/refresher.go
+++ b/pkg/statistics/handle/autoanalyze/refresher/refresher.go
@@ -262,5 +262,8 @@ func (*Refresher) OnBecomeOwner() {
 // OnRetireOwner is used to handle the event when the current TiDB instance retires from being the stats owner.
 func (r *Refresher) OnRetireOwner() {
 	// Stop the worker and close the queue.
+	// Note: we have to guarantee that the worker is stopped before closing the queue.
+	// Otherwise, the worker may still access the queue after it is closed.
+	r.worker.Stop()
 	r.jobs.Close()
 }

From 1a7bc2fd41bb755167e1953be4e0dc03d418d8b7 Mon Sep 17 00:00:00 2001
From: Rustin170506
Date: Sat, 23 Nov 2024 16:46:07 +0800
Subject: [PATCH 2/4] fix: do not block etcd

Signed-off-by: Rustin170506
---
 pkg/statistics/handle/autoanalyze/priorityqueue/queue.go | 6 +++---
 pkg/statistics/handle/autoanalyze/refresher/refresher.go | 6 ++----
 2 files changed, 5 insertions(+), 7 deletions(-)

diff --git a/pkg/statistics/handle/autoanalyze/priorityqueue/queue.go b/pkg/statistics/handle/autoanalyze/priorityqueue/queue.go
index e6e0433fb59a8..583b2ff11307c 100644
--- a/pkg/statistics/handle/autoanalyze/priorityqueue/queue.go
+++ b/pkg/statistics/handle/autoanalyze/priorityqueue/queue.go
@@ -854,9 +854,9 @@ func (pq *AnalysisPriorityQueue) Close() {
 	pq.syncFields.initialized = false
 	// The rest fields will be reset when the priority queue is initialized.
 	// But we do it here for double safety.
-	pq.syncFields.inner = nil
-	pq.syncFields.runningJobs = nil
-	pq.syncFields.mustRetryJobs = nil
+	pq.syncFields.inner = newHeap()
+	pq.syncFields.runningJobs = make(map[int64]struct{})
+	pq.syncFields.mustRetryJobs = make(map[int64]struct{})
 	pq.syncFields.lastDMLUpdateFetchTimestamp = 0
 	pq.syncFields.cancel = nil
 }
diff --git a/pkg/statistics/handle/autoanalyze/refresher/refresher.go b/pkg/statistics/handle/autoanalyze/refresher/refresher.go
index bf2e95a823e3b..06cf3ca0b36a9 100644
--- a/pkg/statistics/handle/autoanalyze/refresher/refresher.go
+++ b/pkg/statistics/handle/autoanalyze/refresher/refresher.go
@@ -261,9 +261,7 @@ func (*Refresher) OnBecomeOwner() {

 // OnRetireOwner is used to handle the event when the current TiDB instance retires from being the stats owner.
 func (r *Refresher) OnRetireOwner() {
-	// Stop the worker and close the queue.
-	// Note: we have to guarantee that the worker is stopped before closing the queue.
-	// Otherwise, the worker may still access the queue after it is closed.
-	r.worker.Stop()
+	// Theoretically we should stop the worker here, but stopping analysis jobs can be time-consuming.
+	// To avoid blocking etcd leader re-election, we only close the priority queue.
 	r.jobs.Close()
 }

From 51eef0defbcae252205fbc27a334603ea0f24d9b Mon Sep 17 00:00:00 2001
From: Rustin170506
Date: Sat, 23 Nov 2024 16:50:30 +0800
Subject: [PATCH 3/4] refactor: better code

Signed-off-by: Rustin170506
---
 .../priorityqueue/analysis_job_factory.go | 15 +++------------
 1 file changed, 3 insertions(+), 12 deletions(-)

diff --git a/pkg/statistics/handle/autoanalyze/priorityqueue/analysis_job_factory.go b/pkg/statistics/handle/autoanalyze/priorityqueue/analysis_job_factory.go
index 30bd0b6097d09..3a20597c02b91 100644
--- a/pkg/statistics/handle/autoanalyze/priorityqueue/analysis_job_factory.go
+++ b/pkg/statistics/handle/autoanalyze/priorityqueue/analysis_job_factory.go
@@ -57,10 +57,7 @@ func (f *AnalysisJobFactory) CreateNonPartitionedTableAnalysisJob(
 	tblInfo *model.TableInfo,
 	tblStats *statistics.Table,
 ) AnalysisJob {
-	if tblStats == nil {
-		return nil
-	}
-	if !tblStats.IsEligibleForAnalysis() {
+	if tblStats == nil || !tblStats.IsEligibleForAnalysis() {
 		return nil
 	}

@@ -92,10 +89,7 @@ func (f *AnalysisJobFactory) CreateStaticPartitionAnalysisJob(
 	partitionID int64,
 	partitionStats *statistics.Table,
 ) AnalysisJob {
-	if partitionStats == nil {
-		return nil
-	}
-	if !partitionStats.IsEligibleForAnalysis() {
+	if partitionStats == nil || !partitionStats.IsEligibleForAnalysis() {
 		return nil
 	}

@@ -128,10 +122,7 @@ func (f *AnalysisJobFactory) CreateDynamicPartitionedTableAnalysisJob(
 	globalTblStats *statistics.Table,
 	partitionStats map[PartitionIDAndName]*statistics.Table,
 ) AnalysisJob {
-	if globalTblStats == nil {
-		return nil
-	}
-	if !globalTblStats.IsEligibleForAnalysis() {
+	if globalTblStats == nil || !globalTblStats.IsEligibleForAnalysis() {
 		return nil
 	}

From fda8e188e8e61eb9883d2a801796563f4f9e4707 Mon Sep 17 00:00:00 2001
From: Rustin170506
Date: Sat, 23 Nov 2024 17:07:20 +0800
Subject: [PATCH 4/4] refactor: better way to fix the issue

Signed-off-by: Rustin170506
---
 .../handle/autoanalyze/priorityqueue/queue.go | 19 ++++++++++++++++---
 1 file changed, 16 insertions(+), 3 deletions(-)

diff --git a/pkg/statistics/handle/autoanalyze/priorityqueue/queue.go b/pkg/statistics/handle/autoanalyze/priorityqueue/queue.go
index 583b2ff11307c..39b75ed7af376 100644
--- a/pkg/statistics/handle/autoanalyze/priorityqueue/queue.go
+++ b/pkg/statistics/handle/autoanalyze/priorityqueue/queue.go
@@ -754,11 +754,24 @@ func (pq *AnalysisPriorityQueue) Pop() (AnalysisJob, error) {
 	job.RegisterSuccessHook(func(j AnalysisJob) {
 		pq.syncFields.mu.Lock()
 		defer pq.syncFields.mu.Unlock()
+		// During owner switch, the priority queue is closed and its fields are reset to nil.
+		// We allow running jobs to complete normally rather than stopping them, so this nil
+		// check is expected when the job finishes after the switch.
+		if pq.syncFields.runningJobs == nil {
+			return
+		}
 		delete(pq.syncFields.runningJobs, j.GetTableID())
 	})
 	job.RegisterFailureHook(func(j AnalysisJob, needRetry bool) {
 		pq.syncFields.mu.Lock()
 		defer pq.syncFields.mu.Unlock()
+		// During owner switch, the priority queue is closed and its fields are reset to nil.
+		// We allow running jobs to complete normally rather than stopping them, so this nil check
+		// is expected when jobs finish after the switch. Failed jobs will be handled by the next
+		// initialization, so we can safely ignore them here.
+		if pq.syncFields.runningJobs == nil || pq.syncFields.mustRetryJobs == nil {
+			return
+		}
 		// Mark the job as failed and remove it from the running jobs.
 		delete(pq.syncFields.runningJobs, j.GetTableID())
 		if needRetry {
@@ -854,9 +867,9 @@ func (pq *AnalysisPriorityQueue) Close() {
 	pq.syncFields.initialized = false
 	// The rest fields will be reset when the priority queue is initialized.
 	// But we do it here for double safety.
-	pq.syncFields.inner = newHeap()
-	pq.syncFields.runningJobs = make(map[int64]struct{})
-	pq.syncFields.mustRetryJobs = make(map[int64]struct{})
+	pq.syncFields.inner = nil
+	pq.syncFields.runningJobs = nil
+	pq.syncFields.mustRetryJobs = nil
 	pq.syncFields.lastDMLUpdateFetchTimestamp = 0
 	pq.syncFields.cancel = nil
 }
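
Two small sketches of the patterns this series relies on follow. Both are minimal, self-contained Go programs with illustrative names only; they are not the TiDB code and use none of its types.

Patch 1's change to RefreshLastAnalysisDuration drops a job whose table has vanished from the stats cache and then skips the rest of the loop body, so the missing stats are never dereferenced:

package main

import "fmt"

// refreshDurations mimics the shape of the RefreshLastAnalysisDuration
// loop after patch 1 (all names here are illustrative): for each queued
// job, look up the table's stats; if the cache no longer knows the
// table, drop the job and move on instead of touching missing stats.
func refreshDurations(jobs map[int64]string, statsCache map[int64]int64) {
	for tableID, job := range jobs {
		lastAnalyzed, ok := statsCache[tableID]
		if !ok {
			// Safeguard: a DDL event should already have removed this
			// job, but delete it here too in case one was missed.
			fmt.Printf("dropping stale job %q for missing table %d\n", job, tableID)
			delete(jobs, tableID) // deleting during range is safe in Go
			continue              // the crucial line: skip the stats access below
		}
		fmt.Printf("job %q: table last analyzed at %d\n", job, lastAnalyzed)
	}
}

func main() {
	jobs := map[int64]string{1: "analyze table t1", 2: "analyze table t2"}
	statsCache := map[int64]int64{2: 1732350000} // table 1 was dropped
	refreshDurations(jobs, statsCache)
}

Without the continue, the original code would have gone on to compute the last-analysis duration from stats that do not exist, which is exactly the panic the new TestPQHandlesTableDeletionGracefully test guards against.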
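Patch 4 moves the protection into the job hooks instead: Close() resets the maps to nil (cheap, so retiring the owner never blocks etcd leader re-election), and a hook that fires late treats nil maps as "queue closed". The nil check matters most on the retry path, because deleting from a nil Go map is a no-op but writing to one panics:

package main

import (
	"fmt"
	"sync"
)

// queue is a toy stand-in for the priority queue's synchronized state
// (illustrative names, not the TiDB types). close resets the maps to
// nil instead of stopping in-flight jobs, and hooks that fire after
// close check for nil before touching the maps.
type queue struct {
	mu            sync.Mutex
	runningJobs   map[int64]struct{}
	mustRetryJobs map[int64]struct{}
}

// failureHook mirrors the RegisterFailureHook callback in patch 4.
func (q *queue) failureHook(tableID int64, needRetry bool) {
	q.mu.Lock()
	defer q.mu.Unlock()
	if q.runningJobs == nil || q.mustRetryJobs == nil {
		// The queue was closed while the job was still running; the next
		// initialization rebuilds this state, so a late hook does nothing.
		return
	}
	delete(q.runningJobs, tableID)
	if needRetry {
		q.mustRetryJobs[tableID] = struct{}{} // would panic on a nil map
	}
}

// close mirrors the relevant part of AnalysisPriorityQueue.Close.
func (q *queue) close() {
	q.mu.Lock()
	defer q.mu.Unlock()
	q.runningJobs = nil
	q.mustRetryJobs = nil
}

func main() {
	q := &queue{
		runningJobs:   map[int64]struct{}{42: {}},
		mustRetryJobs: make(map[int64]struct{}),
	}
	q.close()               // owner retired: queue closed while job 42 still runs
	q.failureHook(42, true) // late hook is a harmless no-op
	fmt.Println("late failure hook returned without panicking")
}

The program exits cleanly even though the failure hook fires after the queue was closed, which is the owner-switch scenario patch 4 defends against; this is also why patch 4 can afford to reset the fields to nil in Close() rather than reallocating them as patch 2 did.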