Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 24 additions & 13 deletions pkg/statistics/handle/ddl/subscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,21 +31,19 @@ import (
"go.uber.org/zap"
)

type handler struct {
type subscriber struct {
statsCache types.StatsCache
}

// NewHandlerAndRegister creates a new handler and registers it to the DDL
// notifier.
func NewHandlerAndRegister(
// NewSubscriber creates a new subscriber.
func NewSubscriber(
statsCache types.StatsCache,
registry *notifier.DDLNotifier,
) {
h := handler{statsCache: statsCache}
registry.RegisterHandler(notifier.StatsMetaHandlerID, h.handle)
) *subscriber {
h := subscriber{statsCache: statsCache}
return &h
}

func (h handler) handle(
func (h subscriber) handle(
ctx context.Context,
sctx sessionctx.Context,
change *notifier.SchemaChangeEvent,
Expand Down Expand Up @@ -235,6 +233,19 @@ func (h handler) handle(
return errors.Trace(storage.UpdateStatsVersion(ctx, sctx))
case model.ActionAddIndex:
// No need to update the stats meta for the adding index event.
case model.ActionDropSchema:
miniDBInfo := change.GetDropSchemaInfo()
intest.Assert(miniDBInfo != nil)
for _, table := range miniDBInfo.Tables {
// Try best effort to update the stats meta version for gc.
if err := h.delayedDeleteStats4PhysicalID(ctx, sctx, table.ID); err != nil {
logutil.StatsLogger().Error(
"Failed to update stats meta version for gc",
zap.Int64("tableID", table.ID),
zap.Error(err),
)
}
}
default:
intest.Assert(false)
logutil.StatsLogger().Error("Unhandled schema change event",
Expand All @@ -243,7 +254,7 @@ func (h handler) handle(
return nil
}

func (h handler) insertStats4PhysicalID(
func (h subscriber) insertStats4PhysicalID(
ctx context.Context,
sctx sessionctx.Context,
info *model.TableInfo,
Expand All @@ -256,7 +267,7 @@ func (h handler) insertStats4PhysicalID(
return errors.Trace(h.recordHistoricalStatsMeta(ctx, sctx, id, startTS))
}

func (h handler) recordHistoricalStatsMeta(
func (h subscriber) recordHistoricalStatsMeta(
ctx context.Context,
sctx sessionctx.Context,
id int64,
Expand Down Expand Up @@ -287,7 +298,7 @@ func (h handler) recordHistoricalStatsMeta(
)
}

func (h handler) delayedDeleteStats4PhysicalID(
func (h subscriber) delayedDeleteStats4PhysicalID(
ctx context.Context,
sctx sessionctx.Context,
id int64,
Expand All @@ -299,7 +310,7 @@ func (h handler) delayedDeleteStats4PhysicalID(
return errors.Trace(h.recordHistoricalStatsMeta(ctx, sctx, id, startTS))
}

func (h handler) insertStats4Col(
func (h subscriber) insertStats4Col(
ctx context.Context,
sctx sessionctx.Context,
physicalID int64,
Expand Down