Skip to content

Commit 182efd8

Browse files
authored
ddl, owner: clean processing ddl jobs when owner retired (#50101)
close #50073
1 parent f19d876 commit 182efd8

File tree

4 files changed

+26
-1
lines changed

4 files changed

+26
-1
lines changed

pkg/ddl/ddl.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -792,6 +792,12 @@ func (d *ddl) Start(ctxPool *pools.ResourcePool) error {
792792
defer d.sessPool.Put(ctx)
793793

794794
ingest.InitGlobalLightningEnv()
795+
d.ownerManager.SetRetireOwnerHook(func() {
796+
// Since this instance is not DDL owner anymore, we clean up the processing job info.
797+
if ingest.LitBackCtxMgr != nil {
798+
ingest.LitBackCtxMgr.MarkJobFinish()
799+
}
800+
})
795801

796802
return nil
797803
}

pkg/ddl/job_table.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -225,6 +225,7 @@ func (d *ddl) getReorgJob(sess *sess.Session) (*model.Job, error) {
225225
return false, nil
226226
}
227227
if (job.Type == model.ActionAddIndex || job.Type == model.ActionAddPrimaryKey) &&
228+
job.State == model.JobStateQueueing &&
228229
job.ReorgMeta != nil &&
229230
job.ReorgMeta.IsFastReorg &&
230231
ingest.LitBackCtxMgr != nil {

pkg/owner/manager.go

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,8 @@ type Manager interface {
6565

6666
// SetBeOwnerHook sets a hook. The hook is called before becoming an owner.
6767
SetBeOwnerHook(hook func())
68+
// SetRetireOwnerHook will be called after retiring the owner.
69+
SetRetireOwnerHook(hook func())
6870
}
6971

7072
const (
@@ -114,8 +116,10 @@ type ownerManager struct {
114116
elec unsafe.Pointer
115117
sessionLease *atomicutil.Int64
116118
wg sync.WaitGroup
117-
beOwnerHook func()
118119
campaignCancel context.CancelFunc
120+
121+
beOwnerHook func()
122+
retireOwnerHook func()
119123
}
120124

121125
// NewOwnerManager creates a new Manager.
@@ -160,6 +164,10 @@ func (m *ownerManager) SetBeOwnerHook(hook func()) {
160164
m.beOwnerHook = hook
161165
}
162166

167+
func (m *ownerManager) SetRetireOwnerHook(hook func()) {
168+
m.retireOwnerHook = hook
169+
}
170+
163171
// ManagerSessionTTL is the etcd session's TTL in seconds. It's exported for testing.
164172
var ManagerSessionTTL = 60
165173

@@ -222,6 +230,9 @@ func (m *ownerManager) toBeOwner(elec *concurrency.Election) {
222230

223231
// RetireOwner make the manager to be a not owner.
224232
func (m *ownerManager) RetireOwner() {
233+
if m.retireOwnerHook != nil {
234+
m.retireOwnerHook()
235+
}
225236
atomic.StorePointer(&m.elec, nil)
226237
}
227238

pkg/owner/mock.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ type mockManager struct {
4242
wg sync.WaitGroup
4343
cancel context.CancelFunc
4444
beOwnerHook func()
45+
retireHook func()
4546
campaignDone chan struct{}
4647
resignDone chan struct{}
4748
}
@@ -168,10 +169,16 @@ func (*mockManager) RequireOwner(context.Context) error {
168169
return nil
169170
}
170171

172+
// SetBeOwnerHook implements Manager.SetBeOwnerHook interface.
171173
func (m *mockManager) SetBeOwnerHook(hook func()) {
172174
m.beOwnerHook = hook
173175
}
174176

177+
// SetRetireOwnerHook implements Manager.SetRetireOwnerHook interface.
178+
func (m *mockManager) SetRetireOwnerHook(hook func()) {
179+
m.retireHook = hook
180+
}
181+
175182
// CampaignCancel implements Manager.CampaignCancel interface
176183
func (m *mockManager) CampaignCancel() {
177184
m.campaignDone <- struct{}{}

0 commit comments

Comments
 (0)