Skip to content

Commit ed39273

Browse files
authored
statistics: handle deleted tables correctly in the PQ (#57649) (#57674)
close #57648
1 parent a8adedb commit ed39273

File tree

4 files changed

+66
-5
lines changed

4 files changed

+66
-5
lines changed

pkg/statistics/handle/autoanalyze/priorityqueue/analysis_job_factory.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ func (f *AnalysisJobFactory) CreateNonPartitionedTableAnalysisJob(
5757
tblInfo *model.TableInfo,
5858
tblStats *statistics.Table,
5959
) AnalysisJob {
60-
if !tblStats.IsEligibleForAnalysis() {
60+
if tblStats == nil || !tblStats.IsEligibleForAnalysis() {
6161
return nil
6262
}
6363

@@ -92,7 +92,7 @@ func (f *AnalysisJobFactory) CreateStaticPartitionAnalysisJob(
9292
partitionID int64,
9393
partitionStats *statistics.Table,
9494
) AnalysisJob {
95-
if !partitionStats.IsEligibleForAnalysis() {
95+
if partitionStats == nil || !partitionStats.IsEligibleForAnalysis() {
9696
return nil
9797
}
9898

@@ -128,7 +128,7 @@ func (f *AnalysisJobFactory) CreateDynamicPartitionedTableAnalysisJob(
128128
globalTblStats *statistics.Table,
129129
partitionStats map[PartitionIDAndName]*statistics.Table,
130130
) AnalysisJob {
131-
if !globalTblStats.IsEligibleForAnalysis() {
131+
if globalTblStats == nil || !globalTblStats.IsEligibleForAnalysis() {
132132
return nil
133133
}
134134

pkg/statistics/handle/autoanalyze/priorityqueue/queue.go

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -644,14 +644,16 @@ func (pq *AnalysisPriorityQueue) RefreshLastAnalysisDuration() {
644644
zap.Int64("tableID", job.GetTableID()),
645645
zap.String("job", job.String()),
646646
)
647-
// TODO: Remove this after handling the DDL event.
647+
// Delete the job from the queue since its table is missing. This is a safeguard -
648+
// DDL events should have already cleaned up jobs for dropped tables.
648649
err := pq.syncFields.inner.delete(job)
649650
if err != nil {
650651
statslogutil.StatsLogger().Error("Failed to delete job from priority queue",
651652
zap.Error(err),
652653
zap.String("job", job.String()),
653654
)
654655
}
656+
continue
655657
}
656658
indicators.LastAnalysisDuration = jobFactory.GetTableLastAnalyzeDuration(tableStats)
657659
job.SetIndicators(indicators)
@@ -752,11 +754,24 @@ func (pq *AnalysisPriorityQueue) Pop() (AnalysisJob, error) {
752754
job.RegisterSuccessHook(func(j AnalysisJob) {
753755
pq.syncFields.mu.Lock()
754756
defer pq.syncFields.mu.Unlock()
757+
// During owner switch, the priority queue is closed and its fields are reset to nil.
758+
// We allow running jobs to complete normally rather than stopping them, so this nil
759+
// check is expected when the job finishes after the switch.
760+
if pq.syncFields.runningJobs == nil {
761+
return
762+
}
755763
delete(pq.syncFields.runningJobs, j.GetTableID())
756764
})
757765
job.RegisterFailureHook(func(j AnalysisJob, needRetry bool) {
758766
pq.syncFields.mu.Lock()
759767
defer pq.syncFields.mu.Unlock()
768+
// During owner switch, the priority queue is closed and its fields are reset to nil.
769+
// We allow running jobs to complete normally rather than stopping them, so this nil check
770+
// is expected when jobs finish after the switch. Failed jobs will be handled by the next
771+
// initialization, so we can safely ignore them here.
772+
if pq.syncFields.runningJobs == nil || pq.syncFields.mustRetryJobs == nil {
773+
return
774+
}
760775
// Mark the job as failed and remove it from the running jobs.
761776
delete(pq.syncFields.runningJobs, j.GetTableID())
762777
if needRetry {

pkg/statistics/handle/autoanalyze/priorityqueue/queue_test.go

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ import (
1919
"testing"
2020
"time"
2121

22+
"github.com/pingcap/tidb/pkg/meta/model"
2223
pmodel "github.com/pingcap/tidb/pkg/parser/model"
2324
"github.com/pingcap/tidb/pkg/sessionctx"
2425
"github.com/pingcap/tidb/pkg/statistics"
@@ -608,3 +609,47 @@ func TestPQCanBeClosedAndReInitialized(t *testing.T) {
608609
// Check if the priority queue is initialized.
609610
require.True(t, pq.IsInitialized())
610611
}
612+
613+
func TestPQHandlesTableDeletionGracefully(t *testing.T) {
614+
store, dom := testkit.CreateMockStoreAndDomain(t)
615+
handle := dom.StatsHandle()
616+
617+
tk := testkit.NewTestKit(t, store)
618+
tk.MustExec("use test")
619+
tk.MustExec("create table t1 (a int)")
620+
tk.MustExec("insert into t1 values (1)")
621+
statistics.AutoAnalyzeMinCnt = 0
622+
defer func() {
623+
statistics.AutoAnalyzeMinCnt = 1000
624+
}()
625+
626+
ctx := context.Background()
627+
require.NoError(t, handle.DumpStatsDeltaToKV(true))
628+
require.NoError(t, handle.Update(ctx, dom.InfoSchema()))
629+
pq := priorityqueue.NewAnalysisPriorityQueue(handle)
630+
defer pq.Close()
631+
require.NoError(t, pq.Initialize())
632+
633+
// Check the priority queue is not empty.
634+
l, err := pq.Len()
635+
require.NoError(t, err)
636+
require.NotEqual(t, 0, l)
637+
638+
tbl, err := dom.InfoSchema().TableByName(ctx, pmodel.NewCIStr("test"), pmodel.NewCIStr("t1"))
639+
require.NoError(t, err)
640+
641+
// Drop the table and simulate the table stats being removed from the cache.
642+
tk.MustExec("drop table t1")
643+
deleteEvent := findEvent(handle.DDLEventCh(), model.ActionDropTable)
644+
require.NotNil(t, deleteEvent)
645+
require.NoError(t, handle.HandleDDLEvent(deleteEvent))
646+
require.NoError(t, handle.Update(ctx, dom.InfoSchema()))
647+
648+
// Make sure handle.Get() returns false.
649+
_, ok := handle.Get(tbl.Meta().ID)
650+
require.False(t, ok)
651+
652+
require.NotPanics(t, func() {
653+
pq.RefreshLastAnalysisDuration()
654+
})
655+
}

pkg/statistics/handle/autoanalyze/refresher/refresher.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -261,6 +261,7 @@ func (*Refresher) OnBecomeOwner() {
261261

262262
// OnRetireOwner is used to handle the event when the current TiDB instance retires from being the stats owner.
263263
func (r *Refresher) OnRetireOwner() {
264-
// Stop the worker and close the queue.
264+
// Theoretically we should stop the worker here, but stopping analysis jobs can be time-consuming.
265+
// To avoid blocking etcd leader re-election, we only close the priority queue.
265266
r.jobs.Close()
266267
}

0 commit comments

Comments
 (0)