
Commit ecc2f3e

ddl: cherrypick some PRs to fix MDL bug (pingcap#47079)
close pingcap#46920
1 parent 5d19713 commit ecc2f3e

4 files changed: +55 −26 lines

ddl/ddl.go

Lines changed: 24 additions & 4 deletions
@@ -152,6 +152,7 @@ const (
 	OnExistReplace
 
 	jobRecordCapacity = 16
+	jobOnceCapacity   = 1000
 )
 
 var (
@@ -286,14 +287,14 @@ type waitSchemaSyncedController struct {
 	mu  sync.RWMutex
 	job map[int64]struct{}
 
-	// true if this node is elected to the DDL owner, we should wait 2 * lease before it runs the first DDL job.
-	once *atomicutil.Bool
+	// Use to check if the DDL job is the first run on this owner.
+	onceMap map[int64]struct{}
 }
 
 func newWaitSchemaSyncedController() *waitSchemaSyncedController {
 	return &waitSchemaSyncedController{
-		job:  make(map[int64]struct{}, jobRecordCapacity),
-		once: atomicutil.NewBool(true),
+		job:     make(map[int64]struct{}, jobRecordCapacity),
+		onceMap: make(map[int64]struct{}, jobOnceCapacity),
 	}
 }
 
@@ -316,6 +317,25 @@ func (w *waitSchemaSyncedController) synced(job *model.Job) {
 	delete(w.job, job.ID)
 }
 
+// maybeAlreadyRunOnce returns true means that the job may be the first run on this owner.
+// Returns false means that the job must not be the first run on this owner.
+func (w *waitSchemaSyncedController) maybeAlreadyRunOnce(id int64) bool {
+	w.mu.Lock()
+	defer w.mu.Unlock()
+	_, ok := w.onceMap[id]
+	return ok
+}
+
+func (w *waitSchemaSyncedController) setAlreadyRunOnce(id int64) {
+	w.mu.Lock()
+	defer w.mu.Unlock()
+	if len(w.onceMap) > jobOnceCapacity {
+		// If the map is too large, we reset it. These jobs may need to check schema synced again, but it's ok.
+		w.onceMap = make(map[int64]struct{}, jobRecordCapacity)
+	}
+	w.onceMap[id] = struct{}{}
+}
+
 // ddlCtx is the context when we use worker to handle DDL jobs.
 type ddlCtx struct {
 	ctx context.Context
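
Review note: the change above replaces the single owner-wide `once` flag with a per-job map capped at `jobOnceCapacity`, so the "wait for schema sync on first run" check happens once per job instead of once per owner term. A minimal, self-contained sketch of that pattern (illustrative names, not the TiDB types):

package main

import (
	"fmt"
	"sync"
)

// seenOnce is an illustrative stand-in for waitSchemaSyncedController's onceMap:
// a mutex-guarded set of job IDs with a soft capacity.
type seenOnce struct {
	mu    sync.Mutex
	ids   map[int64]struct{}
	limit int
}

func newSeenOnce(limit int) *seenOnce {
	return &seenOnce{ids: make(map[int64]struct{}, limit), limit: limit}
}

// has reports whether the id was already recorded on this owner.
func (s *seenOnce) has(id int64) bool {
	s.mu.Lock()
	defer s.mu.Unlock()
	_, ok := s.ids[id]
	return ok
}

// record marks the id as handled once; when the set grows past the limit it is
// simply reset, which at worst triggers a redundant (harmless) sync check later.
func (s *seenOnce) record(id int64) {
	s.mu.Lock()
	defer s.mu.Unlock()
	if len(s.ids) > s.limit {
		s.ids = make(map[int64]struct{}, s.limit)
	}
	s.ids[id] = struct{}{}
}

func main() {
	s := newSeenOnce(1000)
	fmt.Println(s.has(42)) // false: first time this owner sees job 42
	s.record(42)
	fmt.Println(s.has(42)) // true: subsequent runs skip the extra wait
}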

ddl/ddl_worker.go

Lines changed: 16 additions & 14 deletions
@@ -1049,8 +1049,7 @@ func (w *worker) handleDDLJobQueue(d *ddlCtx) error {
 		// Here means the job enters another state (delete only, write only, public, etc...) or is cancelled.
 		// If the job is done or still running or rolling back, we will wait 2 * lease time to guarantee other servers to update
 		// the newest schema.
-		waitSchemaChanged(context.Background(), d, waitTime, schemaVer, job)
-
+		_ = waitSchemaChanged(d, waitTime, schemaVer, job)
 		if RunInGoTest {
 			// d.mu.hook is initialed from domain / test callback, which will force the owner host update schema diff synchronously.
 			d.mu.RLock()
@@ -1352,14 +1351,14 @@ func toTError(err error) *terror.Error {
 	return dbterror.ClassDDL.Synthesize(terror.CodeUnknown, err.Error())
 }
 
-// waitSchemaChanged waits for the completion of updating all servers' schema. In order to make sure that happens,
+// waitSchemaChanged waits for the completion of updating all servers' schema or MDL synced. In order to make sure that happens,
 // we wait at most 2 * lease time(sessionTTL, 90 seconds).
-func waitSchemaChanged(ctx context.Context, d *ddlCtx, waitTime time.Duration, latestSchemaVersion int64, job *model.Job) {
+func waitSchemaChanged(d *ddlCtx, waitTime time.Duration, latestSchemaVersion int64, job *model.Job) error {
 	if !job.IsRunning() && !job.IsRollingback() && !job.IsDone() && !job.IsRollbackDone() {
-		return
+		return nil
 	}
 	if waitTime == 0 {
-		return
+		return nil
 	}
 
 	timeStart := time.Now()
@@ -1370,29 +1369,33 @@ func waitSchemaChanged(ctx context.Context, d *ddlCtx, waitTime time.Duration, l
 
 	if latestSchemaVersion == 0 {
 		logutil.Logger(d.ctx).Info("[ddl] schema version doesn't change")
-		return
+		return nil
 	}
 
-	err = d.schemaSyncer.OwnerUpdateGlobalVersion(ctx, latestSchemaVersion)
+	err = d.schemaSyncer.OwnerUpdateGlobalVersion(d.ctx, latestSchemaVersion)
 	if err != nil {
 		logutil.Logger(d.ctx).Info("[ddl] update latest schema version failed", zap.Int64("ver", latestSchemaVersion), zap.Error(err))
+		if variable.EnableMDL.Load() {
+			return err
+		}
 		if terror.ErrorEqual(err, context.DeadlineExceeded) {
 			// If err is context.DeadlineExceeded, it means waitTime(2 * lease) is elapsed. So all the schemas are synced by ticker.
 			// There is no need to use etcd to sync. The function returns directly.
-			return
+			return nil
 		}
 	}
 
 	// OwnerCheckAllVersions returns only when all TiDB schemas are synced(exclude the isolated TiDB).
-	err = d.schemaSyncer.OwnerCheckAllVersions(context.Background(), job.ID, latestSchemaVersion)
+	err = d.schemaSyncer.OwnerCheckAllVersions(d.ctx, job.ID, latestSchemaVersion)
 	if err != nil {
 		logutil.Logger(d.ctx).Info("[ddl] wait latest schema version encounter error", zap.Int64("ver", latestSchemaVersion), zap.Error(err))
-		return
+		return err
 	}
 	logutil.Logger(d.ctx).Info("[ddl] wait latest schema version changed(get the metadata lock if tidb_enable_metadata_lock is true)",
 		zap.Int64("ver", latestSchemaVersion),
 		zap.Duration("take time", time.Since(timeStart)),
 		zap.String("job", job.String()))
+	return nil
 }
 
 // waitSchemaSyncedForMDL likes waitSchemaSynced, but it waits for getting the metadata lock of the latest version of this DDL.
@@ -1409,7 +1412,7 @@ func waitSchemaSyncedForMDL(d *ddlCtx, job *model.Job, latestSchemaVersion int64
 
 	timeStart := time.Now()
 	// OwnerCheckAllVersions returns only when all TiDB schemas are synced(exclude the isolated TiDB).
-	err := d.schemaSyncer.OwnerCheckAllVersions(context.Background(), job.ID, latestSchemaVersion)
+	err := d.schemaSyncer.OwnerCheckAllVersions(d.ctx, job.ID, latestSchemaVersion)
 	if err != nil {
 		logutil.Logger(d.ctx).Info("[ddl] wait latest schema version encounter error", zap.Int64("ver", latestSchemaVersion), zap.Error(err))
 		return err
@@ -1451,8 +1454,7 @@ func waitSchemaSynced(d *ddlCtx, job *model.Job, waitTime time.Duration) error {
 		}
 	})
 
-	waitSchemaChanged(context.Background(), d, waitTime, latestSchemaVersion, job)
-	return nil
+	return waitSchemaChanged(d, waitTime, latestSchemaVersion, job)
 }
 
 func buildPlacementAffects(oldIDs []int64, newIDs []int64) []*model.AffectedOption {
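
Review note: besides returning the error instead of swallowing it, the calls above switch from context.Background() to d.ctx, so the etcd waits are cancelled when the DDL module shuts down and the failure reaches the caller. A tiny self-contained illustration of that behaviour (slowCall is a stand-in, not a TiDB API):

package main

import (
	"context"
	"fmt"
	"time"
)

// slowCall stands in for a long etcd round-trip such as OwnerCheckAllVersions.
func slowCall(ctx context.Context) error {
	select {
	case <-time.After(3 * time.Second):
		return nil // all servers reported the latest version
	case <-ctx.Done():
		return ctx.Err() // the owner is shutting down
	}
}

func main() {
	// With context.Background() the wait always runs to completion (or its own
	// timeout). With a cancellable parent, cancellation aborts it immediately
	// and the error can be returned to the caller instead of being dropped.
	ctx, cancel := context.WithCancel(context.Background())
	go func() {
		time.Sleep(100 * time.Millisecond)
		cancel() // simulate DDL shutdown
	}()
	fmt.Println(slowCall(ctx)) // prints "context canceled"
}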

ddl/job_table.go

Lines changed: 11 additions & 6 deletions
@@ -174,7 +174,7 @@ func (d *ddl) startDispatchLoop() {
 			return
 		}
 		if !variable.EnableConcurrentDDL.Load() || !d.isOwner() || d.waiting.Load() {
-			d.once.Store(true)
+			d.onceMap = make(map[int64]struct{}, jobOnceCapacity)
 			time.Sleep(time.Second)
 			continue
 		}
@@ -234,7 +234,7 @@ func (d *ddl) delivery2worker(wk *worker, pool *workerPool, job *model.Job) {
 		metrics.DDLRunningJobCount.WithLabelValues(pool.tp().String()).Dec()
 	}()
 	// check if this ddl job is synced to all servers.
-	if !d.isSynced(job) || d.once.Load() {
+	if !job.NotStarted() && (!d.isSynced(job) || !d.maybeAlreadyRunOnce(job.ID)) {
 		if variable.EnableMDL.Load() {
 			exist, version, err := checkMDLInfo(job.ID, d.sessPool)
 			if err != nil {
@@ -249,7 +249,7 @@ func (d *ddl) delivery2worker(wk *worker, pool *workerPool, job *model.Job) {
 			if err != nil {
 				return
 			}
-			d.once.Store(false)
+			d.setAlreadyRunOnce(job.ID)
 			cleanMDLInfo(d.sessPool, job.ID, d.etcdCli)
 			// Don't have a worker now.
 			return
@@ -263,7 +263,7 @@ func (d *ddl) delivery2worker(wk *worker, pool *workerPool, job *model.Job) {
 				pool.put(wk)
 				return
 			}
-			d.once.Store(false)
+			d.setAlreadyRunOnce(job.ID)
 		}
 	}
 
@@ -282,9 +282,14 @@ func (d *ddl) delivery2worker(wk *worker, pool *workerPool, job *model.Job) {
 		})
 
 		// Here means the job enters another state (delete only, write only, public, etc...) or is cancelled.
-		// If the job is done or still running or rolling back, we will wait 2 * lease time to guarantee other servers to update
+		// If the job is done or still running or rolling back, we will wait 2 * lease time or util MDL synced to guarantee other servers to update
 		// the newest schema.
-		waitSchemaChanged(context.Background(), d.ddlCtx, d.lease*2, schemaVer, job)
+		err := waitSchemaChanged(d.ddlCtx, d.lease*2, schemaVer, job)
+		if err != nil {
+			// May be caused by server closing, shouldn't clean the MDL info.
+			logutil.BgLogger().Info("wait latest schema version error", zap.String("category", "ddl"), zap.Error(err))
+			return
+		}
 		cleanMDLInfo(d.sessPool, job.ID, d.etcdCli)
 		d.synced(job)
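
Review note: the tail of delivery2worker now checks the result of waitSchemaChanged before touching the MDL info. A condensed sketch of that flow (waitSchema, cleanMDL and markSynced are hypothetical stand-ins for the real calls):

package main

import "fmt"

// finishStep mirrors the new post-step handling: if waiting for the schema
// version (or MDL) fails, keep the MDL info so another owner can still find
// and wait on it; only on success clean it up and mark the job synced.
func finishStep(waitSchema func() error, cleanMDL, markSynced func()) error {
	if err := waitSchema(); err != nil {
		// Likely the server is closing; do not clean the MDL info.
		return err
	}
	cleanMDL()
	markSynced()
	return nil
}

func main() {
	err := finishStep(
		func() error { return nil },
		func() { fmt.Println("MDL info cleaned") },
		func() { fmt.Println("job marked synced") },
	)
	fmt.Println("err:", err)
}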

ddl/syncer/syncer.go

Lines changed: 4 additions & 2 deletions
@@ -310,13 +310,15 @@ func (s *schemaVersionSyncer) OwnerCheckAllVersions(ctx context.Context, jobID i
 	if variable.EnableMDL.Load() {
 		for _, kv := range resp.Kvs {
 			key := string(kv.Key)
+			tidbIDInResp := key[strings.LastIndex(key, "/")+1:]
 			ver, err := strconv.Atoi(string(kv.Value))
 			if err != nil {
 				logutil.BgLogger().Info("[ddl] syncer check all versions, convert value to int failed, continue checking.", zap.String("ddl", string(kv.Key)), zap.String("value", string(kv.Value)), zap.Error(err))
 				succ = false
 				break
 			}
-			if int64(ver) < latestVer {
+			// We need to check if the tidb ID is in the updatedMap, in case that deleting etcd is failed, and tidb server is down.
+			if int64(ver) < latestVer && updatedMap[tidbIDInResp] != "" {
 				if notMatchVerCnt%intervalCnt == 0 {
 					logutil.BgLogger().Info("[ddl] syncer check all versions, someone is not synced, continue checking",
 						zap.String("ddl", string(kv.Key)), zap.Int("currentVer", ver), zap.Int64("latestVer", latestVer))
@@ -325,7 +327,7 @@ func (s *schemaVersionSyncer) OwnerCheckAllVersions(ctx context.Context, jobID i
 				notMatchVerCnt++
 				break
 			}
-			delete(updatedMap, key[strings.LastIndex(key, "/")+1:])
+			delete(updatedMap, tidbIDInResp)
 		}
 		if len(updatedMap) > 0 {
 			succ = false
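
Review note: the syncer change only treats a stale version as blocking when the TiDB ID parsed from the etcd key is still present in updatedMap (the set built from currently known servers), so a leftover key whose deletion failed after that server went down no longer stalls the DDL. A rough sketch of that membership check (illustrative types and key paths, not the real syncer):

package main

import (
	"fmt"
	"strings"
)

// allSynced reports whether every live server has caught up with latestVer.
// reported maps etcd keys (".../<tidb-id>") to the version that node wrote;
// updated maps live tidb IDs to a non-empty marker and is consumed as we go.
func allSynced(reported map[string]int64, latestVer int64, updated map[string]string) bool {
	for key, ver := range reported {
		id := key[strings.LastIndex(key, "/")+1:]
		if ver < latestVer && updated[id] != "" {
			return false // a live server is still behind
		}
		// Either this node is synced, or it is no longer a live server.
		delete(updated, id)
	}
	// Anything left in updated is a live server that has not reported at all.
	return len(updated) == 0
}

func main() {
	reported := map[string]int64{
		"/ddl/schema_versions/node-a": 10,
		"/ddl/schema_versions/node-b": 9, // stale key left by a dead node
	}
	updated := map[string]string{"node-a": "alive"} // node-b is already gone
	fmt.Println(allSynced(reported, 10, updated))   // true: node-b does not block
}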
