Skip to content
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion ddl/partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -2062,6 +2062,7 @@ func (w *worker) onDropTablePartition(d *ddlCtx, t *meta.Meta, job *model.Job) (
if tblInfo.TiFlashReplica != nil {
removeTiFlashAvailablePartitionIDs(tblInfo, physicalTableIDs)
}
droppedDefs := tblInfo.Partition.DroppingDefinitions
tblInfo.Partition.DroppingDefinitions = nil
// used by ApplyDiff in updateSchemaVersion
job.CtxVars = []interface{}{physicalTableIDs}
Expand All @@ -2071,7 +2072,7 @@ func (w *worker) onDropTablePartition(d *ddlCtx, t *meta.Meta, job *model.Job) (
}
job.SchemaState = model.StateNone
job.FinishTableJob(model.JobStateDone, model.StateNone, ver, tblInfo)
asyncNotifyEvent(d, &util.Event{Tp: model.ActionDropTablePartition, TableInfo: tblInfo, PartInfo: &model.PartitionInfo{Definitions: tblInfo.Partition.Definitions}})
asyncNotifyEvent(d, &util.Event{Tp: model.ActionDropTablePartition, TableInfo: tblInfo, PartInfo: &model.PartitionInfo{Definitions: droppedDefs}})
// A background job will be created to delete old partition data.
job.Args = []interface{}{physicalTableIDs}
default:
Expand Down
5 changes: 5 additions & 0 deletions ddl/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -388,6 +388,11 @@ func onDropTableOrView(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, _ er
job.FinishTableJob(model.JobStateDone, model.StateNone, ver, tblInfo)
startKey := tablecodec.EncodeTablePrefix(job.TableID)
job.Args = append(job.Args, startKey, oldIDs, ruleIDs)
if tblInfo.IsSequence() {
asyncNotifyEvent(d, &util.Event{Tp: model.ActionDropSequence, TableInfo: tblInfo})
} else if !tblInfo.IsView() {
asyncNotifyEvent(d, &util.Event{Tp: model.ActionDropTable, TableInfo: tblInfo})
}
default:
return ver, errors.Trace(dbterror.ErrInvalidDDLState.GenWithStackByArgs("table", tblInfo.State))
}
Expand Down
24 changes: 23 additions & 1 deletion session/bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -728,6 +728,9 @@ const (
// The variable name in mysql.tidb table and it records the default value of
// oom-action when upgrade from v3.0.x to v4.0.11+.
tidbDefOOMAction = "default_oom_action"

tiDBStatsGCLastTS = "tidb_stats_gc_last_ts"
tiDBStatsGCLastTSComment = "the previous gc timestamp for statistics"
// Const for TiDB server version 2.
version2 = 2
version3 = 3
Expand Down Expand Up @@ -975,11 +978,15 @@ const (
version172 = 172
// version 173 add column `summary` to `mysql.tidb_background_subtask`.
version173 = 173
// version 174
// add new `variable tidb_analyze_last_gc_point` to mysql.tidb
// used for reduce the pressure the statistics's GC jobs.
version174 = 174
)

// currentBootstrapVersion is defined as a variable, so we can modify its value for testing.
// please make sure this is the largest version
var currentBootstrapVersion int64 = version173
var currentBootstrapVersion int64 = version174

// DDL owner key's expired time is ManagerSessionTTL seconds, we should wait the time and give more time to have a chance to finish it.
var internalSQLTimeout = owner.ManagerSessionTTL + 15
Expand Down Expand Up @@ -1121,6 +1128,7 @@ var (
upgradeToVer171,
upgradeToVer172,
upgradeToVer173,
upgradeToVer174,
}
)

Expand Down Expand Up @@ -2721,6 +2729,18 @@ func upgradeToVer173(s Session, ver int64) {
doReentrantDDL(s, "ALTER TABLE mysql.tidb_background_subtask ADD COLUMN `summary` JSON", infoschema.ErrColumnExists)
}

func upgradeToVer174(s Session, ver int64) {
if ver >= version174 {
return
}
writeStatsGCLastPos(s)
}

func writeStatsGCLastPos(s Session) {
mustExecute(s, "INSERT HIGH_PRIORITY INTO %n.%n VALUES(%?, %?, %?) ON DUPLICATE KEY UPDATE VARIABLE_VALUE = %?",
mysql.SystemDB, mysql.TiDBTable, tiDBStatsGCLastTS, 0, tiDBStatsGCLastTSComment, 0)
}

func writeOOMAction(s Session) {
comment := "oom-action is `log` by default in v3.0.x, `cancel` by default in v4.0.11+"
mustExecute(s, `INSERT HIGH_PRIORITY INTO %n.%n VALUES (%?, %?, %?) ON DUPLICATE KEY UPDATE VARIABLE_VALUE= %?`,
Expand Down Expand Up @@ -2974,6 +2994,8 @@ func doDMLWorks(s Session) {

writeStmtSummaryVars(s)

writeStatsGCLastPos(s)

ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnBootstrap)
_, err := s.ExecuteInternal(ctx, "COMMIT")
if err != nil {
Expand Down
42 changes: 42 additions & 0 deletions statistics/handle/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,13 @@ func (h *Handle) HandleDDLEvent(t *util.Event) error {
return err
}
}
case model.ActionDropTable:
ids := h.getInitStateTableIDs(t.TableInfo)
for _, id := range ids {
if err := h.resetTableStats2KVForDrop(id); err != nil {
return err
}
}
case model.ActionAddColumn, model.ActionModifyColumn:
ids := h.getInitStateTableIDs(t.TableInfo)
for _, id := range ids {
Expand All @@ -63,6 +70,11 @@ func (h *Handle) HandleDDLEvent(t *util.Event) error {
return err
}
}
for _, def := range t.PartInfo.Definitions {
if err := h.resetTableStats2KVForDrop(def.ID); err != nil {
return err
}
}
case model.ActionReorganizePartition:
for _, def := range t.PartInfo.Definitions {
// TODO: Should we trigger analyze instead of adding 0s?
Expand Down Expand Up @@ -320,6 +332,36 @@ func (h *Handle) insertTableStats2KV(info *model.TableInfo, physicalID int64) (e
return nil
}

// resetTableStats2KV resets the count to 0.
func (h *Handle) resetTableStats2KVForDrop(physicalID int64) (err error) {
statsVer := uint64(0)
defer func() {
if err == nil && statsVer != 0 {
h.recordHistoricalStatsMeta(physicalID, statsVer, StatsMetaHistorySourceSchemaChange)
}
}()
h.mu.Lock()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we use the h.session.pool to avoid the lock the handle?

defer h.mu.Unlock()
ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnStats)
exec := h.mu.ctx.(sqlexec.SQLExecutor)
_, err = exec.ExecuteInternal(ctx, "begin")
if err != nil {
return errors.Trace(err)
}
defer func() {
err = finishTransaction(ctx, exec, err)
}()
txn, err := h.mu.ctx.Txn(true)
if err != nil {
return errors.Trace(err)
}
startTS := txn.StartTS()
if _, err := exec.ExecuteInternal(ctx, "update mysql.stats_meta set count=0, modify_count=0, version=%? where table_id =%?", startTS, physicalID); err != nil {
return err
}
return nil
}

// insertColStats2KV insert a record to stats_histograms with distinct_count 1 and insert a bucket to stats_buckets with default value.
// This operation also updates version.
func (h *Handle) insertColStats2KV(physicalID int64, colInfos []*model.ColumnInfo) (err error) {
Expand Down
10 changes: 7 additions & 3 deletions statistics/handle/ddl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,11 +190,16 @@ func TestDDLHistogram(t *testing.T) {
func TestDDLPartition(t *testing.T) {
store, do := testkit.CreateMockStoreAndDomain(t)
testKit := testkit.NewTestKit(t, store)
for _, pruneMode := range []string{"static", "dynamic"} {
for i, pruneMode := range []string{"static", "dynamic"} {
testKit.MustExec("set @@tidb_partition_prune_mode=`" + pruneMode + "`")
testKit.MustExec("set global tidb_partition_prune_mode=`" + pruneMode + "`")
testKit.MustExec("use test")
testKit.MustExec("drop table if exists t")
h := do.StatsHandle()
if i == 1 {
err := h.HandleDDLEvent(<-h.DDLEventCh())
require.NoError(t, err)
}
createTable := `CREATE TABLE t (a int, b int, primary key(a), index idx(b))
PARTITION BY RANGE ( a ) (
PARTITION p0 VALUES LESS THAN (6),
Expand All @@ -207,14 +212,13 @@ PARTITION BY RANGE ( a ) (
tbl, err := is.TableByName(model.NewCIStr("test"), model.NewCIStr("t"))
require.NoError(t, err)
tableInfo := tbl.Meta()
h := do.StatsHandle()
err = h.HandleDDLEvent(<-h.DDLEventCh())
require.NoError(t, err)
require.Nil(t, h.Update(is))
pi := tableInfo.GetPartitionInfo()
for _, def := range pi.Definitions {
statsTbl := h.GetPartitionStats(tableInfo, def.ID)
require.False(t, statsTbl.Pseudo)
require.False(t, statsTbl.Pseudo, "for %v", pruneMode)
}

testKit.MustExec("insert into t values (1,2),(6,2),(11,2),(16,2)")
Expand Down
43 changes: 41 additions & 2 deletions statistics/handle/gc.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package handle
import (
"context"
"encoding/json"
"strconv"
"time"

"github.com/pingcap/errors"
Expand All @@ -34,9 +35,11 @@ import (
"go.uber.org/zap"
)

const gcLastTSVarName = "tidb_stats_gc_last_ts"

// GCStats will garbage collect the useless stats info. For dropped tables, we will first update their version so that
// other tidb could know that table is deleted.
func (h *Handle) GCStats(is infoschema.InfoSchema, ddlLease time.Duration) error {
func (h *Handle) GCStats(is infoschema.InfoSchema, ddlLease time.Duration) (err error) {
ctx := context.Background()
// To make sure that all the deleted tables' schema and stats info have been acknowledged to all tidb,
// we only garbage collect version before 10 lease.
Expand All @@ -47,7 +50,17 @@ func (h *Handle) GCStats(is infoschema.InfoSchema, ddlLease time.Duration) error
return nil
}
gcVer := now - offset
rows, _, err := h.execRestrictedSQL(ctx, "select table_id from mysql.stats_meta where version < %?", gcVer)
lastGC, err := h.GetLastGCTimestamp(ctx)
if err != nil {
return err
}
defer func() {
if err != nil {
return
}
err = h.writeGCTimestampToKV(ctx, gcVer)
}()
rows, _, err := h.execRestrictedSQL(ctx, "select table_id from mysql.stats_meta where version >= %? and version < %?", lastGC, gcVer)
if err != nil {
return errors.Trace(err)
}
Expand All @@ -70,6 +83,32 @@ func (h *Handle) GCStats(is infoschema.InfoSchema, ddlLease time.Duration) error
return h.removeDeletedExtendedStats(gcVer)
}

// GetLastGCTimestamp loads the last gc time from mysql.tidb.
func (h *Handle) GetLastGCTimestamp(ctx context.Context) (uint64, error) {
rows, _, err := h.execRestrictedSQL(ctx, "SELECT HIGH_PRIORITY variable_value FROM mysql.tidb WHERE variable_name=%?", gcLastTSVarName)
if err != nil {
return 0, errors.Trace(err)
}
if len(rows) != 1 {
return 0, errors.New("can not get 'tidb_stat_gc_last_ts' from table mysql.tidb")
}
lastGcTSString := rows[0].GetString(0)
lastGcTS, err := strconv.ParseUint(lastGcTSString, 10, 64)
if err != nil {
return 0, errors.Trace(err)
}
return lastGcTS, nil
}

func (h *Handle) writeGCTimestampToKV(ctx context.Context, newTS uint64) error {
_, _, err := h.execRestrictedSQL(ctx,
"update mysql.tidb set variable_value = %? where variable_name = %?",
newTS,
gcLastTSVarName,
)
return err
}

func (h *Handle) gcTableStats(is infoschema.InfoSchema, physicalID int64) error {
ctx := context.Background()
rows, _, err := h.execRestrictedSQL(ctx, "select is_index, hist_id from mysql.stats_histograms where table_id = %?", physicalID)
Expand Down
4 changes: 3 additions & 1 deletion statistics/handle/gc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,8 @@ func TestGCExtendedStats(t *testing.T) {
testKit.MustExec("insert into t values (1,1,1),(2,2,2),(3,3,3)")
testKit.MustExec("alter table t add stats_extended s1 correlation(a,b)")
testKit.MustExec("alter table t add stats_extended s2 correlation(b,c)")
h := dom.StatsHandle()
require.Nil(t, h.HandleDDLEvent(<-h.DDLEventCh()))
testKit.MustExec("analyze table t")

testKit.MustQuery("select name, type, column_ids, stats, status from mysql.stats_extended").Sort().Check(testkit.Rows(
Expand All @@ -114,7 +116,6 @@ func TestGCExtendedStats(t *testing.T) {
"s1 2 [1,2] 1.000000 1",
"s2 2 [2,3] 1.000000 1",
))
h := dom.StatsHandle()
ddlLease := time.Duration(0)
require.Nil(t, h.GCStats(dom.InfoSchema(), ddlLease))
testKit.MustQuery("select name, type, column_ids, stats, status from mysql.stats_extended").Sort().Check(testkit.Rows(
Expand All @@ -130,6 +131,7 @@ func TestGCExtendedStats(t *testing.T) {
testKit.MustQuery("select name, type, column_ids, stats, status from mysql.stats_extended").Sort().Check(testkit.Rows(
"s2 2 [2,3] 1.000000 1",
))
require.Nil(t, h.HandleDDLEvent(<-h.DDLEventCh()))
require.Nil(t, h.GCStats(dom.InfoSchema(), ddlLease))
testKit.MustQuery("select name, type, column_ids, stats, status from mysql.stats_extended").Sort().Check(testkit.Rows(
"s2 2 [2,3] 1.000000 2",
Expand Down