Skip to content

Commit 0f05777

Browse files
authored
ddl: fix runnable ingest job checking (#52503) (#59761)
close #52475
1 parent 9336ad0 commit 0f05777

File tree

4 files changed

+39
-19
lines changed

4 files changed

+39
-19
lines changed

ddl/ddl.go

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -740,6 +740,7 @@ func (d *ddl) Start(ctxPool *pools.ResourcePool) error {
740740
if err != nil {
741741
logutil.BgLogger().Error("error when getting the ddl history count", zap.Error(err))
742742
}
743+
d.runningJobs.clear()
743744
d.ddlCtx.setOwnerTS(time.Now().Unix())
744745
})
745746

@@ -769,9 +770,6 @@ func (d *ddl) Start(ctxPool *pools.ResourcePool) error {
769770
d.wg.Run(d.PollTiFlashRoutine)
770771

771772
ingest.InitGlobalLightningEnv()
772-
d.ownerManager.SetRetireOwnerHook(func() {
773-
d.runningJobs = newRunningJobs()
774-
})
775773

776774
return nil
777775
}

ddl/ddl_running_jobs.go

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,11 @@ import (
2222
"strconv"
2323
"strings"
2424
"sync"
25+
"time"
2526

2627
"github.com/pingcap/tidb/parser/model"
28+
"github.com/pingcap/tidb/util/logutil"
29+
"go.uber.org/zap"
2730
)
2831

2932
type runningJobs struct {
@@ -36,6 +39,11 @@ type runningJobs struct {
3639
// It is not necessarily being processed by a worker.
3740
unfinishedIDs map[int64]struct{}
3841
unfinishedSchema map[string]map[string]struct{} // database -> table -> struct{}
42+
43+
// processingIngestJobID records the ID of the ingest job that is being processed by a worker.
44+
// TODO(tangenta): remove this when we support running multiple concurrent ingest jobs.
45+
processingIngestJobID int64
46+
lastLoggingTime time.Time
3947
}
4048

4149
func newRunningJobs() *runningJobs {
@@ -46,11 +54,21 @@ func newRunningJobs() *runningJobs {
4654
}
4755
}
4856

57+
func (j *runningJobs) clear() {
58+
j.Lock()
59+
defer j.Unlock()
60+
j.unfinishedIDs = make(map[int64]struct{})
61+
j.unfinishedSchema = make(map[string]map[string]struct{})
62+
}
63+
4964
func (j *runningJobs) add(job *model.Job) {
5065
j.Lock()
5166
defer j.Unlock()
5267
j.processingIDs[job.ID] = struct{}{}
5368
j.updateInternalRunningJobIDs()
69+
if isIngestJob(job) {
70+
j.processingIngestJobID = job.ID
71+
}
5472

5573
if _, ok := j.unfinishedIDs[job.ID]; ok {
5674
// Already exists, no need to add it again.
@@ -70,6 +88,9 @@ func (j *runningJobs) remove(job *model.Job) {
7088
defer j.Unlock()
7189
delete(j.processingIDs, job.ID)
7290
j.updateInternalRunningJobIDs()
91+
if isIngestJob(job) && job.ID == j.processingIngestJobID {
92+
j.processingIngestJobID = 0
93+
}
7394

7495
if job.IsFinished() || job.IsSynced() {
7596
delete(j.unfinishedIDs, job.ID)
@@ -110,6 +131,16 @@ func (j *runningJobs) checkRunnable(job *model.Job) bool {
110131
// Already being processed by a worker. Skip running it again.
111132
return false
112133
}
134+
if isIngestJob(job) && j.processingIngestJobID != 0 {
135+
// We only allow one task to use ingest at the same time in order to limit the CPU/memory usage.
136+
if time.Since(j.lastLoggingTime) > 1*time.Minute {
137+
logutil.BgLogger().Info("ingest backfill worker is already in used by another DDL job",
138+
zap.String("category", "ddl-ingest"),
139+
zap.Int64("processing job ID", j.processingIngestJobID))
140+
j.lastLoggingTime = time.Now()
141+
}
142+
return false
143+
}
113144
for _, info := range job.GetInvolvingSchemaInfo() {
114145
if _, ok := j.unfinishedSchema[model.InvolvingAll]; ok {
115146
return false
@@ -131,3 +162,9 @@ func (j *runningJobs) checkRunnable(job *model.Job) bool {
131162
}
132163
return true
133164
}
165+
166+
func isIngestJob(job *model.Job) bool {
167+
return (job.Type == model.ActionAddIndex || job.Type == model.ActionAddPrimaryKey) &&
168+
job.ReorgMeta != nil &&
169+
IsEnableFastReorg()
170+
}

owner/manager.go

Lines changed: 1 addition & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -59,8 +59,6 @@ type Manager interface {
5959

6060
// SetBeOwnerHook sets a hook. The hook is called before becoming an owner.
6161
SetBeOwnerHook(hook func())
62-
// SetRetireOwnerHook will be called after retiring the owner.
63-
SetRetireOwnerHook(hook func())
6462
}
6563

6664
const (
@@ -87,8 +85,7 @@ type ownerManager struct {
8785
wg sync.WaitGroup
8886
campaignCancel context.CancelFunc
8987

90-
beOwnerHook func()
91-
retireOwnerHook func()
88+
beOwnerHook func()
9289
}
9390

9491
// NewOwnerManager creates a new Manager.
@@ -132,10 +129,6 @@ func (m *ownerManager) SetBeOwnerHook(hook func()) {
132129
m.beOwnerHook = hook
133130
}
134131

135-
func (m *ownerManager) SetRetireOwnerHook(hook func()) {
136-
m.retireOwnerHook = hook
137-
}
138-
139132
// ManagerSessionTTL is the etcd session's TTL in seconds. It's exported for testing.
140133
var ManagerSessionTTL = 60
141134

@@ -195,9 +188,6 @@ func (m *ownerManager) toBeOwner(elec *concurrency.Election) {
195188

196189
// RetireOwner makes the manager stop being the owner.
197190
func (m *ownerManager) RetireOwner() {
198-
if m.retireOwnerHook != nil {
199-
m.retireOwnerHook()
200-
}
201191
atomic.StorePointer(&m.elec, nil)
202192
}
203193

owner/mock.go

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -102,11 +102,6 @@ func (m *mockManager) SetBeOwnerHook(hook func()) {
102102
m.beOwnerHook = hook
103103
}
104104

105-
// SetRetireOwnerHook implements Manager.SetRetireOwnerHook interface.
106-
func (m *mockManager) SetRetireOwnerHook(hook func()) {
107-
m.retireHook = hook
108-
}
109-
110105
// CampaignCancel implements Manager.CampaignCancel interface
111106
func (m *mockManager) CampaignCancel() {
112107
// do nothing

0 commit comments

Comments
 (0)