Skip to content

Commit d57bc50

Browse files
committed
Revert "ddl: query since min job id to mitigate slowness due to un-compacted deleted kv (#54122)"
This reverts commit a33a868.
1 parent 5c98028 commit d57bc50

File tree

7 files changed

+4
-200
lines changed

7 files changed

+4
-200
lines changed

Makefile

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -496,7 +496,6 @@ gen_mock: mockgen
496496
tools/bin/mockgen -package mock github.com/pingcap/tidb/pkg/util/sqlexec RestrictedSQLExecutor > pkg/util/sqlexec/mock/restricted_sql_executor_mock.go
497497
tools/bin/mockgen -package mockstorage github.com/pingcap/tidb/br/pkg/storage ExternalStorage > br/pkg/mock/storage/storage.go
498498
tools/bin/mockgen -package mock github.com/pingcap/tidb/pkg/ddl SchemaLoader > pkg/ddl/mock/schema_loader_mock.go
499-
tools/bin/mockgen -package mock github.com/pingcap/tidb/pkg/ddl/systable Manager > pkg/ddl/mock/systable_manager_mock.go
500499

501500
# There is no FreeBSD environment for GitHub actions. So cross-compile on Linux
502501
# but that doesn't work with CGO_ENABLED=1, so disable cgo. The reason to have

pkg/ddl/job_scheduler_test.go

Lines changed: 0 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -61,39 +61,3 @@ func TestMustReloadSchemas(t *testing.T) {
6161
sch.mustReloadSchemas()
6262
require.True(t, ctrl.Satisfied())
6363
}
64-
65-
func TestRefreshMinJobID(t *testing.T) {
66-
reduceIntervals(t)
67-
ctrl := gomock.NewController(t)
68-
defer ctrl.Finish()
69-
mgr := mock.NewMockManager(ctrl)
70-
71-
sch := &jobScheduler{
72-
schCtx: context.Background(),
73-
sysTblMgr: mgr,
74-
}
75-
// success
76-
start := time.Now()
77-
mgr.EXPECT().GetMinJobID(gomock.Any(), int64(0)).Return(int64(1), nil)
78-
sch.refreshMinJobID()
79-
require.EqualValues(t, 1, sch.currMinJobID)
80-
require.GreaterOrEqual(t, sch.lastRefreshMinIDTime, start)
81-
require.True(t, ctrl.Satisfied())
82-
// not refresh too fast
83-
schedulerLoopRetryInterval = time.Hour
84-
sch.refreshMinJobID()
85-
require.True(t, ctrl.Satisfied())
86-
// ignore refresh error
87-
sch.lastRefreshMinIDTime = time.Time{}
88-
mgr.EXPECT().GetMinJobID(gomock.Any(), int64(1)).Return(int64(0), errors.New("mock err"))
89-
sch.refreshMinJobID()
90-
require.EqualValues(t, 1, sch.currMinJobID)
91-
require.True(t, ctrl.Satisfied())
92-
// success again
93-
sch.lastRefreshMinIDTime = time.Time{}
94-
mgr.EXPECT().GetMinJobID(gomock.Any(), int64(1)).Return(int64(100), nil)
95-
sch.refreshMinJobID()
96-
require.EqualValues(t, 100, sch.currMinJobID)
97-
require.GreaterOrEqual(t, sch.lastRefreshMinIDTime, start)
98-
require.True(t, ctrl.Satisfied())
99-
}

pkg/ddl/job_table.go

Lines changed: 2 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -128,10 +128,6 @@ type jobScheduler struct {
128128
runningJobs *runningJobs
129129
sysTblMgr systable.Manager
130130
schemaLoader SchemaLoader
131-
// currMinJobID is the minimal job ID in tidb_ddl_job table, we use it to mitigate
132-
// this issue https://github.com/pingcap/tidb/issues/52905
133-
currMinJobID int64
134-
lastRefreshMinIDTime time.Time
135131

136132
// those fields are created on start
137133
reorgWorkerPool *workerPool
@@ -198,15 +194,14 @@ func (s *jobScheduler) getJob(se *sess.Session, tp jobType) (*model.Job, error)
198194
not = ""
199195
label = "get_job_reorg"
200196
}
201-
// TODO replace this sub-query with memory implementation.
202197
const getJobSQL = `select job_meta, processing from mysql.tidb_ddl_job where job_id in
203-
(select min(job_id) from mysql.tidb_ddl_job where job_id >= %d group by schema_ids, table_ids, processing)
198+
(select min(job_id) from mysql.tidb_ddl_job group by schema_ids, table_ids, processing)
204199
and %s reorg %s order by processing desc, job_id`
205200
var excludedJobIDs string
206201
if ids := s.runningJobs.allIDs(); len(ids) > 0 {
207202
excludedJobIDs = fmt.Sprintf("and job_id not in (%s)", ids)
208203
}
209-
sql := fmt.Sprintf(getJobSQL, s.currMinJobID, not, excludedJobIDs)
204+
sql := fmt.Sprintf(getJobSQL, not, excludedJobIDs)
210205
rows, err := se.Execute(context.Background(), sql, label)
211206
if err != nil {
212207
return nil, errors.Trace(err)
@@ -398,7 +393,6 @@ func (s *jobScheduler) startDispatch() error {
398393
if err := s.checkAndUpdateClusterState(false); err != nil {
399394
continue
400395
}
401-
s.refreshMinJobID()
402396
failpoint.InjectCall("beforeAllLoadDDLJobAndRun")
403397
s.loadDDLJobAndRun(se, s.generalDDLWorkerPool, jobTypeGeneral)
404398
s.loadDDLJobAndRun(se, s.reorgWorkerPool, jobTypeReorg)
@@ -657,21 +651,6 @@ func (*jobScheduler) markJobProcessing(se *sess.Session, job *model.Job) error {
657651
return errors.Trace(err)
658652
}
659653

660-
func (s *jobScheduler) refreshMinJobID() {
661-
now := time.Now()
662-
if now.Sub(s.lastRefreshMinIDTime) < dispatchLoopWaitingDuration {
663-
return
664-
}
665-
s.lastRefreshMinIDTime = now
666-
minID, err := s.sysTblMgr.GetMinJobID(s.schCtx, s.currMinJobID)
667-
if err != nil {
668-
logutil.DDLLogger().Info("get min job ID failed", zap.Error(err))
669-
return
670-
}
671-
// use max, in case all job are finished to avoid the currMinJobID go back.
672-
s.currMinJobID = max(s.currMinJobID, minID)
673-
}
674-
675654
func (d *ddl) getTableByTxn(r autoid.Requirement, schemaID, tableID int64) (*model.DBInfo, table.Table, error) {
676655
var tbl table.Table
677656
var dbInfo *model.DBInfo

pkg/ddl/mock/BUILD.bazel

Lines changed: 2 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -2,14 +2,8 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library")
22

33
go_library(
44
name = "mock",
5-
srcs = [
6-
"schema_loader_mock.go",
7-
"systable_manager_mock.go",
8-
],
5+
srcs = ["schema_loader_mock.go"],
96
importpath = "github.com/pingcap/tidb/pkg/ddl/mock",
107
visibility = ["//visibility:public"],
11-
deps = [
12-
"//pkg/parser/model",
13-
"@org_uber_go_mock//gomock",
14-
],
8+
deps = ["@org_uber_go_mock//gomock"],
159
)

pkg/ddl/mock/systable_manager_mock.go

Lines changed: 0 additions & 91 deletions
This file was deleted.

pkg/ddl/systable/manager.go

Lines changed: 0 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -38,10 +38,6 @@ type Manager interface {
3838
GetJobByID(ctx context.Context, jobID int64) (*model.Job, error)
3939
// GetMDLVer gets the MDL version by job ID, returns ErrNotFound if the MDL info does not exist.
4040
GetMDLVer(ctx context.Context, jobID int64) (int64, error)
41-
// GetMinJobID gets current minimum job ID in the job table for job_id >= prevMinJobID,
42-
// if no jobs, returns 0. prevMinJobID is used to avoid full table scan, see
43-
// https://github.com/pingcap/tidb/issues/52905
44-
GetMinJobID(ctx context.Context, prevMinJobID int64) (int64, error)
4541
}
4642

4743
type manager struct {
@@ -109,22 +105,3 @@ func (mgr *manager) GetMDLVer(ctx context.Context, jobID int64) (int64, error) {
109105
}
110106
return ver, nil
111107
}
112-
113-
func (mgr *manager) GetMinJobID(ctx context.Context, prevMinJobID int64) (int64, error) {
114-
var minID int64
115-
if err := mgr.withNewSession(func(se *session.Session) error {
116-
sql := fmt.Sprintf(`select min(job_id) from mysql.tidb_ddl_job where job_id >= %d`, prevMinJobID)
117-
rows, err := se.Execute(ctx, sql, "get-min-job-id")
118-
if err != nil {
119-
return errors.Trace(err)
120-
}
121-
if len(rows) == 0 {
122-
return nil
123-
}
124-
minID = rows[0].GetInt64(0)
125-
return nil
126-
}); err != nil {
127-
return 0, err
128-
}
129-
return minID, nil
130-
}

pkg/ddl/systable/manager_test.go

Lines changed: 0 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -64,22 +64,4 @@ func TestManager(t *testing.T) {
6464
require.NoError(t, err)
6565
require.EqualValues(t, 123, ver)
6666
})
67-
68-
t.Run("GetMinJobID", func(t *testing.T) {
69-
tk.MustExec("delete from mysql.tidb_ddl_job")
70-
id, err := mgr.GetMinJobID(ctx, 0)
71-
require.NoError(t, err)
72-
require.EqualValues(t, 0, id)
73-
tk.MustExec(`insert into mysql.tidb_ddl_job(job_id, reorg, schema_ids, table_ids, job_meta, type, processing)
74-
values(123456, 0, '1', '1', '{"id":9998}', 1, 0)`)
75-
id, err = mgr.GetMinJobID(ctx, 0)
76-
require.NoError(t, err)
77-
require.EqualValues(t, 123456, id)
78-
id, err = mgr.GetMinJobID(ctx, 123456)
79-
require.NoError(t, err)
80-
require.EqualValues(t, 123456, id)
81-
id, err = mgr.GetMinJobID(ctx, 123457)
82-
require.NoError(t, err)
83-
require.EqualValues(t, 0, id)
84-
})
8567
}

0 commit comments

Comments
 (0)