Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pkg/ddl/backfilling.go
Original file line number Diff line number Diff line change
Expand Up @@ -700,7 +700,7 @@ func (dc *ddlCtx) runAddIndexInLocalIngestMode(
discovery := dc.store.(tikv.Storage).GetRegionCache().PDClient().GetServiceDiscovery()
importConc := job.ReorgMeta.GetConcurrencyOrDefault(int(variable.GetDDLReorgWorkerCounter()))
bcCtx, err := ingest.LitBackCtxMgr.Register(
ctx, job.ID, hasUnique, dc.etcdCli, discovery, job.ReorgMeta.ResourceGroupName, importConc, job.RealStartTS)
ctx, job.ID, hasUnique, nil, discovery, job.ReorgMeta.ResourceGroupName, importConc, job.RealStartTS)
if err != nil {
return errors.Trace(err)
}
Expand Down
64 changes: 37 additions & 27 deletions pkg/ddl/ingest/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,15 +158,6 @@ func (bc *litBackendCtx) collectRemoteDuplicateRows(indexID int64, tbl table.Tab
return bc.handleErrorAfterCollectRemoteDuplicateRows(err, indexID, tbl, hasDupe)
}

func acquireLock(ctx context.Context, se *concurrency.Session, key string) (*concurrency.Mutex, error) {
mu := concurrency.NewMutex(se, key)
err := mu.Lock(ctx)
if err != nil {
return nil, err
}
return mu, nil
}

// Flush implements FlushController.
func (bc *litBackendCtx) Flush(mode FlushMode) (flushed, imported bool, err error) {
shouldFlush, shouldImport := bc.checkFlush(mode)
Expand All @@ -193,28 +184,14 @@ func (bc *litBackendCtx) Flush(mode FlushMode) (flushed, imported bool, err erro
return true, false, nil
}

// Use distributed lock if run in distributed mode).
if bc.etcdClient != nil {
distLockKey := fmt.Sprintf("/tidb/distributeLock/%d", bc.jobID)
se, _ := concurrency.NewSession(bc.etcdClient)
mu, err := acquireLock(bc.ctx, se, distLockKey)
cleanup, err := acquireDistributedLock(bc.ctx, bc.etcdClient, bc.jobID)
if err != nil {
return true, false, errors.Trace(err)
return true, false, err
}
logutil.Logger(bc.ctx).Info("acquire distributed flush lock success", zap.Int64("jobID", bc.jobID))
defer func() {
err = mu.Unlock(bc.ctx)
if err != nil {
logutil.Logger(bc.ctx).Warn("release distributed flush lock error", zap.Error(err), zap.Int64("jobID", bc.jobID))
} else {
logutil.Logger(bc.ctx).Info("release distributed flush lock success", zap.Int64("jobID", bc.jobID))
}
err = se.Close()
if err != nil {
logutil.Logger(bc.ctx).Warn("close session error", zap.Error(err))
}
}()
defer cleanup()
}

failpoint.Inject("mockDMLExecutionStateBeforeImport", func(_ failpoint.Value) {
if MockDMLExecutionStateBeforeImport != nil {
MockDMLExecutionStateBeforeImport()
Expand Down Expand Up @@ -257,6 +234,39 @@ func (bc *litBackendCtx) Flush(mode FlushMode) (flushed, imported bool, err erro
return true, true, err
}

const distributedLockLease = 10 // Seconds

func acquireDistributedLock(ctx context.Context, cli *clientv3.Client, jobID int64) (cleanup func(), err error) {
leaseGrantResp, err := cli.Grant(ctx, distributedLockLease)
if err != nil {
return nil, errors.Trace(err)
}
distLockKey := fmt.Sprintf("/tidb/distributeLock/%d", jobID)
se, _ := concurrency.NewSession(cli, concurrency.WithLease(leaseGrantResp.ID))
mu := concurrency.NewMutex(se, distLockKey)
err = mu.Lock(ctx)
if err != nil {
return nil, err
}
logutil.Logger(ctx).Info("acquire distributed flush lock success", zap.Int64("jobID", jobID))
return func() {
err = mu.Unlock(ctx)
if err != nil {
logutil.Logger(ctx).Warn("release distributed flush lock error", zap.Error(err), zap.Int64("jobID", jobID))
} else {
logutil.Logger(ctx).Info("release distributed flush lock success", zap.Int64("jobID", jobID))
}
err = se.Close()
if err != nil {
logutil.Logger(ctx).Warn("close session error", zap.Error(err))
}
_, err := cli.Revoke(ctx, leaseGrantResp.ID)
if err != nil {
logutil.Logger(ctx).Warn("revoke lease error", zap.Error(err))
}
}, nil
}

func (bc *litBackendCtx) unsafeImportAndReset(ei *engineInfo) error {
logger := log.FromContext(bc.ctx).With(
zap.Stringer("engineUUID", ei.uuid),
Expand Down