1 change: 1 addition & 0 deletions ddl/BUILD.bazel
@@ -219,6 +219,7 @@ go_test(
deps = [
"//autoid_service",
"//config",
"//ddl/ingest",
"//ddl/internal/callback",
"//ddl/placement",
"//ddl/schematracker",
37 changes: 37 additions & 0 deletions ddl/backfilling_test.go
@@ -16,9 +16,14 @@ package ddl

import (
"bytes"
"context"
"testing"

"github.com/pingcap/tidb/ddl/ingest"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/stretchr/testify/require"
)

@@ -43,3 +48,35 @@ func TestDoneTaskKeeper(t *testing.T) {
n.updateNextKey(6, kv.Key("h"))
require.True(t, bytes.Equal(n.nextKey, kv.Key("h")))
}

func TestPickBackfillType(t *testing.T) {
mockMgr := ingest.NewMockBackendCtxMgr(
func() sessionctx.Context {
return nil
})
ingest.LitBackCtxMgr = mockMgr
mockCtx := context.Background()
const uk = false
mockJob := &model.Job{
ID: 1,
ReorgMeta: &model.DDLReorgMeta{
ReorgTp: model.ReorgTypeTxn,
},
}
variable.EnableFastReorg.Store(true)
tp, err := pickBackfillType(mockCtx, mockJob, uk)
require.NoError(t, err)
require.Equal(t, tp, model.ReorgTypeTxn)

mockJob.ReorgMeta.ReorgTp = model.ReorgTypeNone
ingest.LitInitialized = false
tp, err = pickBackfillType(mockCtx, mockJob, uk)
require.NoError(t, err)
require.Equal(t, tp, model.ReorgTypeTxnMerge)

mockJob.ReorgMeta.ReorgTp = model.ReorgTypeNone
ingest.LitInitialized = true
tp, err = pickBackfillType(mockCtx, mockJob, uk)
require.NoError(t, err)
require.Equal(t, tp, model.ReorgTypeLitMerge)
}
72 changes: 46 additions & 26 deletions ddl/index.go
@@ -622,7 +622,11 @@ func (w *worker) onCreateIndex(d *ddlCtx, t *meta.Meta, job *model.Job, isPK boo
switch indexInfo.State {
case model.StateNone:
// none -> delete only
reorgTp := pickBackfillType(job)
var reorgTp model.ReorgType
reorgTp, err = pickBackfillType(w.ctx, job, indexInfo.Unique)
@Benjamin2037 (Collaborator) commented on Apr 20, 2023:

> If an error is returned, will add index still run? Or, when there is an error in lightning mode, can we always fall back to txn-merge?

The author replied:

> If an error is returned, will add index still run?

No. Users should check their configuration and fix the issue, or turn off tidb_ddl_enable_fast_reorg.

> Or, when there is an error in lightning mode, can we always fall back to txn-merge?

If something goes wrong halfway through the execution, it falls back to txn-merge mode.

if err != nil {
break
}
if reorgTp.NeedMergeProcess() {
// Increase telemetryAddIndexIngestUsage
telemetryAddIndexIngestUsage.Inc()
@@ -711,39 +715,50 @@ func (w *worker) onCreateIndex(d *ddlCtx, t *meta.Meta, job *model.Job, isPK boo
}

// pickBackfillType determines which backfill process will be used.
func pickBackfillType(job *model.Job) model.ReorgType {
func pickBackfillType(ctx context.Context, job *model.Job, unique bool) (model.ReorgType, error) {
if job.ReorgMeta.ReorgTp != model.ReorgTypeNone {
// The backfill task has been started.
// Don't switch the backfill process.
return job.ReorgMeta.ReorgTp
// Don't change the backfill type.
return job.ReorgMeta.ReorgTp, nil
}
if IsEnableFastReorg() {
var useIngest bool
if ingest.LitInitialized && ingest.LitBackCtxMgr.Available() {
cleanupSortPath(job.ID)
if !IsEnableFastReorg() {
job.ReorgMeta.ReorgTp = model.ReorgTypeTxn
return model.ReorgTypeTxn, nil
}
if ingest.LitInitialized {
available, err := ingest.LitBackCtxMgr.CheckAvailable()
if err != nil {
return model.ReorgTypeNone, err
}
if available {
err = cleanupSortPath(job.ID)
if err != nil {
return model.ReorgTypeNone, err
}
_, err = ingest.LitBackCtxMgr.Register(ctx, unique, job.ID)
if err != nil {
return model.ReorgTypeNone, err
}
job.ReorgMeta.ReorgTp = model.ReorgTypeLitMerge
return model.ReorgTypeLitMerge
return model.ReorgTypeLitMerge, nil
}
// The lightning environment is unavailable, but we can still use the txn-merge backfill.
logutil.BgLogger().Info("[ddl] fallback to txn-merge backfill process",
zap.Bool("lightning env initialized", ingest.LitInitialized),
zap.Bool("can use ingest", useIngest))
job.ReorgMeta.ReorgTp = model.ReorgTypeTxnMerge
return model.ReorgTypeTxnMerge
}
job.ReorgMeta.ReorgTp = model.ReorgTypeTxn
return model.ReorgTypeTxn
// The lightning environment is unavailable, but we can still use the txn-merge backfill.
logutil.BgLogger().Info("[ddl] fallback to txn-merge backfill process",
zap.Bool("lightning env initialized", false))
job.ReorgMeta.ReorgTp = model.ReorgTypeTxnMerge
return model.ReorgTypeTxnMerge, nil
}

// cleanupSortPath is used to clean up the temp data of the previous jobs.
// Because we don't remove all the files after the support of checkpoint,
// there may be some stale files in the sort path if TiDB is killed during the backfill process.
func cleanupSortPath(currentJobID int64) {
func cleanupSortPath(currentJobID int64) error {
sortPath := ingest.ConfigSortPath()
entries, err := os.ReadDir(sortPath)
if err != nil {
logutil.BgLogger().Warn("[ddl-ingest] cannot cleanup sort path", zap.Error(err))
return
logutil.BgLogger().Warn("[ddl-ingest] cannot read sort path", zap.Error(err))
return errors.Trace(err)
}
for _, entry := range entries {
if !entry.IsDir() {
@@ -762,10 +777,11 @@ func cleanupSortPath(currentJobID int64) {
err := os.RemoveAll(filepath.Join(sortPath, entry.Name()))
if err != nil {
logutil.BgLogger().Warn("[ddl-ingest] cannot cleanup sort path", zap.Error(err))
return
return nil
}
}
}
return nil
}

// IngestJobsNotExisted checks the ddl about `add index` with ingest method not existed.
@@ -824,17 +840,21 @@ func doReorgWorkForCreateIndexMultiSchema(w *worker, d *ddlCtx, t *meta.Meta, jo

func doReorgWorkForCreateIndex(w *worker, d *ddlCtx, t *meta.Meta, job *model.Job,
tbl table.Table, indexInfo *model.IndexInfo) (done bool, ver int64, err error) {
bfProcess := pickBackfillType(job)
if !bfProcess.NeedMergeProcess() {
var reorgTp model.ReorgType
reorgTp, err = pickBackfillType(w.ctx, job, indexInfo.Unique)
if err != nil {
return false, ver, err
}
if !reorgTp.NeedMergeProcess() {
return runReorgJobAndHandleErr(w, d, t, job, tbl, indexInfo, false)
}
switch indexInfo.BackfillState {
case model.BackfillStateRunning:
logutil.BgLogger().Info("[ddl] index backfill state running",
zap.Int64("job ID", job.ID), zap.String("table", tbl.Meta().Name.O),
zap.Bool("ingest mode", bfProcess == model.ReorgTypeLitMerge),
zap.Bool("ingest mode", reorgTp == model.ReorgTypeLitMerge),
zap.String("index", indexInfo.Name.O))
switch bfProcess {
switch reorgTp {
case model.ReorgTypeLitMerge:
if job.ReorgMeta.IsDistReorg {
done, ver, err = runIngestReorgJobDist(w, d, t, job, tbl, indexInfo)
Expand All @@ -854,7 +874,7 @@ func doReorgWorkForCreateIndex(w *worker, d *ddlCtx, t *meta.Meta, job *model.Jo
logutil.BgLogger().Info("[ddl] index backfill state ready to merge", zap.Int64("job ID", job.ID),
zap.String("table", tbl.Meta().Name.O), zap.String("index", indexInfo.Name.O))
indexInfo.BackfillState = model.BackfillStateMerging
if bfProcess == model.ReorgTypeLitMerge {
if reorgTp == model.ReorgTypeLitMerge {
ingest.LitBackCtxMgr.Unregister(job.ID)
}
job.SnapshotVer = 0 // Reset the snapshot version for merge index reorg.
14 changes: 9 additions & 5 deletions ddl/ingest/backend_mgr.go
@@ -30,7 +30,7 @@ import (

// BackendCtxMgr is used to manage the backend context.
type BackendCtxMgr interface {
Available() bool
CheckAvailable() (bool, error)
Register(ctx context.Context, unique bool, jobID int64) (BackendCtx, error)
Unregister(jobID int64)
Load(jobID int64) (BackendCtx, bool)
@@ -56,16 +56,20 @@ func newLitBackendCtxMgr(path string, memQuota uint64) BackendCtxMgr {
return mgr
}

// Available checks if the ingest backfill is available.
func (m *litBackendCtxMgr) Available() bool {
// CheckAvailable checks if the ingest backfill is available.
func (m *litBackendCtxMgr) CheckAvailable() (bool, error) {
// We only allow one task to use ingest at the same time, in order to limit the CPU usage.
activeJobIDs := m.Keys()
if len(activeJobIDs) > 0 {
logutil.BgLogger().Info("[ddl-ingest] ingest backfill is already in use by another DDL job",
zap.Int64("job ID", activeJobIDs[0]))
return false
return false, nil
}
return true
if err := m.diskRoot.PreCheckUsage(); err != nil {
logutil.BgLogger().Info("[ddl-ingest] ingest backfill is not available", zap.Error(err))
return false, err
}
return true, nil
}

// Register creates a new backend and registers it to the backend context.
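To make the interface change concrete, the intended caller-side flow (the same shape pickBackfillType follows above) is roughly the sketch below. This is an illustrative fragment, not code from this diff; pickIngestOrFallback is a hypothetical helper name and the error handling is simplified.

package example

import (
	"context"

	"github.com/pingcap/tidb/ddl/ingest"
	"github.com/pingcap/tidb/parser/model"
)

// pickIngestOrFallback mirrors the new flow: a hard error from CheckAvailable
// (for example, the sort path disk is nearly full) aborts the job, while a
// busy ingest backend merely falls back to the txn-merge backfill.
func pickIngestOrFallback(ctx context.Context, job *model.Job, unique bool) (model.ReorgType, error) {
	available, err := ingest.LitBackCtxMgr.CheckAvailable()
	if err != nil {
		return model.ReorgTypeNone, err
	}
	if !available {
		// Another DDL job already owns the ingest backend.
		return model.ReorgTypeTxnMerge, nil
	}
	if _, err := ingest.LitBackCtxMgr.Register(ctx, unique, job.ID); err != nil {
		return model.ReorgTypeNone, err
	}
	return model.ReorgTypeLitMerge, nil
}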
14 changes: 14 additions & 0 deletions ddl/ingest/disk_root.go
@@ -18,6 +18,7 @@ import (
"fmt"
"sync"

"github.com/pingcap/errors"
lcom "github.com/pingcap/tidb/br/pkg/lightning/common"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/util/logutil"
@@ -29,6 +30,7 @@ type DiskRoot interface {
UpdateUsage()
ShouldImport() bool
UsageInfo() string
PreCheckUsage() error
}

const capacityThreshold = 0.9
@@ -87,3 +89,15 @@ func (d *diskRootImpl) UsageInfo() string {
defer d.mu.RUnlock()
return fmt.Sprintf("disk usage: %d/%d, backend usage: %d", d.used, d.capacity, d.bcUsed)
}

// PreCheckUsage implements DiskRoot interface.
func (d *diskRootImpl) PreCheckUsage() error {
sz, err := lcom.GetStorageSize(d.path)
if err != nil {
return errors.Trace(err)
}
if float64(sz.Available) < (1-capacityThreshold)*float64(sz.Capacity) {
return errors.Errorf("%s, please clean up the disk and retry", d.UsageInfo())
}
return nil
}
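Reading the threshold: with capacityThreshold = 0.9, PreCheckUsage rejects ingest once free space on the sort path falls below 10% of the volume's capacity. A standalone sketch of the same arithmetic (illustrative only; the constant is copied from disk_root.go above):

package main

import "fmt"

const capacityThreshold = 0.9 // same value as in ddl/ingest/disk_root.go

// preCheckWouldFail mirrors the PreCheckUsage condition: the check fails when
// available space is below (1 - capacityThreshold) of the total capacity.
func preCheckWouldFail(availableBytes, capacityBytes uint64) bool {
	return float64(availableBytes) < (1-capacityThreshold)*float64(capacityBytes)
}

func main() {
	const gb = uint64(1) << 30
	fmt.Println(preCheckWouldFail(60*gb, 500*gb)) // false: 12% free, ingest is allowed
	fmt.Println(preCheckWouldFail(40*gb, 500*gb)) // true: 8% free, an error is returned
}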
6 changes: 3 additions & 3 deletions ddl/ingest/mock.go
@@ -39,9 +39,9 @@ func NewMockBackendCtxMgr(sessCtxProvider func() sessionctx.Context) *MockBacken
}
}

// Available implements BackendCtxMgr.Available interface.
func (*MockBackendCtxMgr) Available() bool {
return true
// CheckAvailable implements BackendCtxMgr.CheckAvailable interface.
func (*MockBackendCtxMgr) CheckAvailable() (bool, error) {
return true, nil
}

// Register implements BackendCtxMgr.Register interface.
2 changes: 1 addition & 1 deletion sessionctx/variable/sysvar.go
@@ -2185,7 +2185,7 @@ var defaultSysVars = []*SysVar{
return nil
}},
// This system var is set disk quota for lightning sort dir, from 100 GB to 1PB.
{Scope: ScopeGlobal, Name: TiDBDDLDiskQuota, Value: strconv.Itoa(DefTiDBDDLDiskQuota), Type: TypeInt, MinValue: DefTiDBDDLDiskQuota, MaxValue: 1024 * 1024 * DefTiDBDDLDiskQuota / 100, GetGlobal: func(_ context.Context, sv *SessionVars) (string, error) {
{Scope: ScopeGlobal, Name: TiDBDDLDiskQuota, Value: strconv.Itoa(DefTiDBDDLDiskQuota), Type: TypeInt, MinValue: MinTiDBDDLDiskQuota, MaxValue: 1024 * 1024 * DefTiDBDDLDiskQuota / 100, GetGlobal: func(_ context.Context, sv *SessionVars) (string, error) {
return strconv.FormatUint(DDLDiskQuota.Load(), 10), nil
}, SetGlobal: func(_ context.Context, s *SessionVars, val string) error {
DDLDiskQuota.Store(TidbOptUint64(val, DefTiDBDDLDiskQuota))
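For reference, the bounds this change gives tidb_ddl_disk_quota work out to 1 GiB through 1 PiB: MinValue is the new MinTiDBDDLDiskQuota, and MaxValue stays 1024 * 1024 * DefTiDBDDLDiskQuota / 100. A small sketch to verify the arithmetic (the constants are copied from tidb_vars.go below; this snippet is not part of the diff):

package main

import "fmt"

// Mirrors the constants used in sessionctx/variable/tidb_vars.go.
const (
	DefTiDBDDLDiskQuota = 100 * 1024 * 1024 * 1024 // 100 GiB
	MinTiDBDDLDiskQuota = 1 * 1024 * 1024 * 1024   // 1 GiB
)

func main() {
	var maxQuota int64 = 1024 * 1024 * DefTiDBDDLDiskQuota / 100 // MaxValue expression in sysvar.go
	fmt.Println(int64(MinTiDBDDLDiskQuota) == 1<<30)             // true: lower bound is 1 GiB
	fmt.Println(maxQuota == 1<<50)                               // true: upper bound is 1 PiB
}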
7 changes: 4 additions & 3 deletions sessionctx/variable/sysvar_test.go
@@ -770,6 +770,7 @@ func TestSetTIDBDiskQuota(t *testing.T) {
vars.GlobalVarsAccessor = mock
diskQuota := GetSysVar(TiDBDDLDiskQuota)
var (
mb int64 = 1024 * 1024
gb int64 = 1024 * 1024 * 1024
pb int64 = 1024 * 1024 * 1024 * 1024 * 1024
err error
Expand All @@ -778,12 +779,12 @@ func TestSetTIDBDiskQuota(t *testing.T) {
// Default 100 GB
require.Equal(t, diskQuota.Value, strconv.FormatInt(100*gb, 10))

// MinValue is 100 GB, set to 50 Gb is not allowed
err = mock.SetGlobalSysVar(context.Background(), TiDBDDLDiskQuota, strconv.FormatInt(50*gb, 10))
// MinValue is 1 GB, set to 500 MB is not allowed
err = mock.SetGlobalSysVar(context.Background(), TiDBDDLDiskQuota, strconv.FormatInt(500*mb, 10))
require.NoError(t, err)
val, err = mock.GetGlobalSysVar(TiDBDDLDiskQuota)
require.NoError(t, err)
require.Equal(t, strconv.FormatInt(100*gb, 10), val)
require.Equal(t, strconv.FormatInt(1*gb, 10), val)

// Set to 100 GB
err = mock.SetGlobalSysVar(context.Background(), TiDBDDLDiskQuota, strconv.FormatInt(100*gb, 10))
1 change: 1 addition & 0 deletions sessionctx/variable/tidb_vars.go
@@ -1199,6 +1199,7 @@ const (
DefMemoryUsageAlarmKeepRecordNum = 5
DefTiDBEnableFastReorg = true
DefTiDBDDLDiskQuota = 100 * 1024 * 1024 * 1024 // 100GB
MinTiDBDDLDiskQuota = 1 * 1024 * 1024 * 1024 // 1GB
DefExecutorConcurrency = 5
DefTiDBEnableNonPreparedPlanCache = false
DefTiDBNonPreparedPlanCacheSize = 100
4 changes: 3 additions & 1 deletion tests/realtikvtest/addindextest/integration_test.go
@@ -395,7 +395,9 @@ func TestAddIndexIngestCancel(t *testing.T) {
tk.MustGetErrCode("alter table t add index idx(b);", errno.ErrCancelledDDLJob)
require.True(t, cancelled)
dom.DDL().SetHook(defHook)
require.True(t, ingest.LitBackCtxMgr.Available())
ok, err := ingest.LitBackCtxMgr.CheckAvailable()
require.NoError(t, err)
require.True(t, ok)
}

func TestAddIndexSplitTableRanges(t *testing.T) {