Skip to content

Commit a33a868

Browse files
authored
ddl: query since min job id to mitigate slowness due to un-compacted deleted kv (#54122)
ref #53246
1 parent 3023aea commit a33a868

File tree

7 files changed

+200
-4
lines changed

7 files changed

+200
-4
lines changed

Makefile

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -496,6 +496,7 @@ gen_mock: mockgen
496496
tools/bin/mockgen -package mock github.com/pingcap/tidb/pkg/util/sqlexec RestrictedSQLExecutor > pkg/util/sqlexec/mock/restricted_sql_executor_mock.go
497497
tools/bin/mockgen -package mockstorage github.com/pingcap/tidb/br/pkg/storage ExternalStorage > br/pkg/mock/storage/storage.go
498498
tools/bin/mockgen -package mock github.com/pingcap/tidb/pkg/ddl SchemaLoader > pkg/ddl/mock/schema_loader_mock.go
499+
tools/bin/mockgen -package mock github.com/pingcap/tidb/pkg/ddl/systable Manager > pkg/ddl/mock/systable_manager_mock.go
499500

500501
# There is no FreeBSD environment for GitHub actions. So cross-compile on Linux
501502
# but that doesn't work with CGO_ENABLED=1, so disable cgo. The reason to have

pkg/ddl/job_scheduler_test.go

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,3 +61,39 @@ func TestMustReloadSchemas(t *testing.T) {
6161
sch.mustReloadSchemas()
6262
require.True(t, ctrl.Satisfied())
6363
}
64+
65+
func TestRefreshMinJobID(t *testing.T) {
66+
reduceIntervals(t)
67+
ctrl := gomock.NewController(t)
68+
defer ctrl.Finish()
69+
mgr := mock.NewMockManager(ctrl)
70+
71+
sch := &jobScheduler{
72+
schCtx: context.Background(),
73+
sysTblMgr: mgr,
74+
}
75+
// success
76+
start := time.Now()
77+
mgr.EXPECT().GetMinJobID(gomock.Any(), int64(0)).Return(int64(1), nil)
78+
sch.refreshMinJobID()
79+
require.EqualValues(t, 1, sch.currMinJobID)
80+
require.GreaterOrEqual(t, sch.lastRefreshMinIDTime, start)
81+
require.True(t, ctrl.Satisfied())
82+
// not refresh too fast
83+
schedulerLoopRetryInterval = time.Hour
84+
sch.refreshMinJobID()
85+
require.True(t, ctrl.Satisfied())
86+
// ignore refresh error
87+
sch.lastRefreshMinIDTime = time.Time{}
88+
mgr.EXPECT().GetMinJobID(gomock.Any(), int64(1)).Return(int64(0), errors.New("mock err"))
89+
sch.refreshMinJobID()
90+
require.EqualValues(t, 1, sch.currMinJobID)
91+
require.True(t, ctrl.Satisfied())
92+
// success again
93+
sch.lastRefreshMinIDTime = time.Time{}
94+
mgr.EXPECT().GetMinJobID(gomock.Any(), int64(1)).Return(int64(100), nil)
95+
sch.refreshMinJobID()
96+
require.EqualValues(t, 100, sch.currMinJobID)
97+
require.GreaterOrEqual(t, sch.lastRefreshMinIDTime, start)
98+
require.True(t, ctrl.Satisfied())
99+
}

pkg/ddl/job_table.go

Lines changed: 23 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -128,6 +128,10 @@ type jobScheduler struct {
128128
runningJobs *runningJobs
129129
sysTblMgr systable.Manager
130130
schemaLoader SchemaLoader
131+
// currMinJobID is the minimal job ID in tidb_ddl_job table, we use it to mitigate
132+
// this issue https://github.com/pingcap/tidb/issues/52905
133+
currMinJobID int64
134+
lastRefreshMinIDTime time.Time
131135

132136
// those fields are created on start
133137
reorgWorkerPool *workerPool
@@ -191,14 +195,15 @@ func (s *jobScheduler) getJob(se *sess.Session, tp jobType) (*model.Job, error)
191195
not = ""
192196
label = "get_job_reorg"
193197
}
198+
// TODO replace this sub-query with memory implementation.
194199
const getJobSQL = `select job_meta, processing from mysql.tidb_ddl_job where job_id in
195-
(select min(job_id) from mysql.tidb_ddl_job group by schema_ids, table_ids, processing)
200+
(select min(job_id) from mysql.tidb_ddl_job where job_id >= %d group by schema_ids, table_ids, processing)
196201
and %s reorg %s order by processing desc, job_id`
197202
var excludedJobIDs string
198203
if ids := s.runningJobs.allIDs(); len(ids) > 0 {
199204
excludedJobIDs = fmt.Sprintf("and job_id not in (%s)", ids)
200205
}
201-
sql := fmt.Sprintf(getJobSQL, not, excludedJobIDs)
206+
sql := fmt.Sprintf(getJobSQL, s.currMinJobID, not, excludedJobIDs)
202207
rows, err := se.Execute(context.Background(), sql, label)
203208
if err != nil {
204209
return nil, errors.Trace(err)
@@ -387,6 +392,7 @@ func (s *jobScheduler) startDispatch() error {
387392
if err := s.checkAndUpdateClusterState(false); err != nil {
388393
continue
389394
}
395+
s.refreshMinJobID()
390396
failpoint.InjectCall("beforeAllLoadDDLJobAndRun")
391397
s.loadDDLJobAndRun(se, s.generalDDLWorkerPool, jobTypeGeneral)
392398
s.loadDDLJobAndRun(se, s.reorgWorkerPool, jobTypeReorg)
@@ -645,6 +651,21 @@ func (*jobScheduler) markJobProcessing(se *sess.Session, job *model.Job) error {
645651
return errors.Trace(err)
646652
}
647653

654+
func (s *jobScheduler) refreshMinJobID() {
655+
now := time.Now()
656+
if now.Sub(s.lastRefreshMinIDTime) < dispatchLoopWaitingDuration {
657+
return
658+
}
659+
s.lastRefreshMinIDTime = now
660+
minID, err := s.sysTblMgr.GetMinJobID(s.schCtx, s.currMinJobID)
661+
if err != nil {
662+
logutil.DDLLogger().Info("get min job ID failed", zap.Error(err))
663+
return
664+
}
665+
// use max, in case all job are finished to avoid the currMinJobID go back.
666+
s.currMinJobID = max(s.currMinJobID, minID)
667+
}
668+
648669
func (d *ddl) getTableByTxn(r autoid.Requirement, schemaID, tableID int64) (*model.DBInfo, table.Table, error) {
649670
var tbl table.Table
650671
var dbInfo *model.DBInfo

pkg/ddl/mock/BUILD.bazel

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,14 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library")
22

33
go_library(
44
name = "mock",
5-
srcs = ["schema_loader_mock.go"],
5+
srcs = [
6+
"schema_loader_mock.go",
7+
"systable_manager_mock.go",
8+
],
69
importpath = "github.com/pingcap/tidb/pkg/ddl/mock",
710
visibility = ["//visibility:public"],
8-
deps = ["@org_uber_go_mock//gomock"],
11+
deps = [
12+
"//pkg/parser/model",
13+
"@org_uber_go_mock//gomock",
14+
],
915
)

pkg/ddl/mock/systable_manager_mock.go

Lines changed: 91 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

pkg/ddl/systable/manager.go

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,10 @@ type Manager interface {
3838
GetJobByID(ctx context.Context, jobID int64) (*model.Job, error)
3939
// GetMDLVer gets the MDL version by job ID, returns ErrNotFound if the MDL info does not exist.
4040
GetMDLVer(ctx context.Context, jobID int64) (int64, error)
41+
// GetMinJobID gets current minimum job ID in the job table for job_id >= prevMinJobID,
42+
// if no jobs, returns 0. prevMinJobID is used to avoid full table scan, see
43+
// https://github.com/pingcap/tidb/issues/52905
44+
GetMinJobID(ctx context.Context, prevMinJobID int64) (int64, error)
4145
}
4246

4347
type manager struct {
@@ -105,3 +109,22 @@ func (mgr *manager) GetMDLVer(ctx context.Context, jobID int64) (int64, error) {
105109
}
106110
return ver, nil
107111
}
112+
113+
func (mgr *manager) GetMinJobID(ctx context.Context, prevMinJobID int64) (int64, error) {
114+
var minID int64
115+
if err := mgr.withNewSession(func(se *session.Session) error {
116+
sql := fmt.Sprintf(`select min(job_id) from mysql.tidb_ddl_job where job_id >= %d`, prevMinJobID)
117+
rows, err := se.Execute(ctx, sql, "get-min-job-id")
118+
if err != nil {
119+
return errors.Trace(err)
120+
}
121+
if len(rows) == 0 {
122+
return nil
123+
}
124+
minID = rows[0].GetInt64(0)
125+
return nil
126+
}); err != nil {
127+
return 0, err
128+
}
129+
return minID, nil
130+
}

pkg/ddl/systable/manager_test.go

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,4 +64,22 @@ func TestManager(t *testing.T) {
6464
require.NoError(t, err)
6565
require.EqualValues(t, 123, ver)
6666
})
67+
68+
t.Run("GetMinJobID", func(t *testing.T) {
69+
tk.MustExec("delete from mysql.tidb_ddl_job")
70+
id, err := mgr.GetMinJobID(ctx, 0)
71+
require.NoError(t, err)
72+
require.EqualValues(t, 0, id)
73+
tk.MustExec(`insert into mysql.tidb_ddl_job(job_id, reorg, schema_ids, table_ids, job_meta, type, processing)
74+
values(123456, 0, '1', '1', '{"id":9998}', 1, 0)`)
75+
id, err = mgr.GetMinJobID(ctx, 0)
76+
require.NoError(t, err)
77+
require.EqualValues(t, 123456, id)
78+
id, err = mgr.GetMinJobID(ctx, 123456)
79+
require.NoError(t, err)
80+
require.EqualValues(t, 123456, id)
81+
id, err = mgr.GetMinJobID(ctx, 123457)
82+
require.NoError(t, err)
83+
require.EqualValues(t, 0, id)
84+
})
6785
}

0 commit comments

Comments
 (0)