
Commit 954d306

ddl: add lease for ingest distributed lock (#56184) (#59265)
close #55917
1 parent a8e02a8 commit 954d306
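
This commit replaces the hand-rolled etcd mutex in the ingest flush path with a new owner.AcquireDistributedLock helper whose backing concurrency session carries an explicit TTL (lease). If the lock holder exits without releasing (for example after a crash), the lease expires and other nodes can acquire the lock instead of waiting forever. Below is a minimal caller-side sketch of the new pattern, adapted from the backend.go hunk further down; the package, function name, and parameters are illustrative placeholders, while the key format, the 10-second lease, and owner.AcquireDistributedLock itself come from the diff.

package ingestsketch // hypothetical package, for illustration only

import (
	"context"
	"fmt"

	"github.com/pingcap/tidb/pkg/owner"
	clientv3 "go.etcd.io/etcd/client/v3"
)

// flushWithLock sketches how a caller serializes work across nodes with the
// new helper: acquire the lock keyed by job ID, defer the release, do the work.
func flushWithLock(ctx context.Context, etcdCli *clientv3.Client, jobID int64) error {
	key := fmt.Sprintf("/tidb/distributeLock/%d", jobID)
	// 10 is the lease TTL in seconds; the session (and thus the lock) expires
	// automatically if the holder dies before calling release.
	release, err := owner.AcquireDistributedLock(ctx, etcdCli, key, 10)
	if err != nil {
		return err
	}
	if release != nil {
		// release unlocks the mutex and closes the backing etcd session.
		defer release()
	}
	// ... perform the flush/import work while holding the lock ...
	return nil
}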

File tree: 5 files changed, +156 -30 lines

pkg/ddl/ingest/BUILD.bazel

Lines changed: 1 addition & 1 deletion
@@ -36,6 +36,7 @@ go_library(
         "//pkg/lightning/log",
         "//pkg/meta",
         "//pkg/metrics",
+        "//pkg/owner",
         "//pkg/parser/mysql",
         "//pkg/parser/terror",
         "//pkg/sessionctx",
@@ -54,7 +55,6 @@ go_library(
         "@com_github_tikv_client_go_v2//util",
         "@com_github_tikv_pd_client//:client",
         "@io_etcd_go_etcd_client_v3//:client",
-        "@io_etcd_go_etcd_client_v3//concurrency",
         "@org_golang_x_exp//maps",
         "@org_uber_go_atomic//:atomic",
         "@org_uber_go_zap//:zap",

pkg/ddl/ingest/backend.go

Lines changed: 9 additions & 28 deletions
@@ -29,14 +29,14 @@ import (
 	lightning "github.com/pingcap/tidb/pkg/lightning/config"
 	"github.com/pingcap/tidb/pkg/lightning/errormanager"
 	"github.com/pingcap/tidb/pkg/lightning/log"
+	"github.com/pingcap/tidb/pkg/owner"
 	"github.com/pingcap/tidb/pkg/parser/mysql"
 	"github.com/pingcap/tidb/pkg/parser/terror"
 	"github.com/pingcap/tidb/pkg/table"
 	"github.com/pingcap/tidb/pkg/util/dbterror"
 	"github.com/pingcap/tidb/pkg/util/generic"
 	"github.com/pingcap/tidb/pkg/util/logutil"
 	clientv3 "go.etcd.io/etcd/client/v3"
-	"go.etcd.io/etcd/client/v3/concurrency"
 	atomicutil "go.uber.org/atomic"
 	"go.uber.org/zap"
 )
@@ -169,15 +169,6 @@ func (bc *litBackendCtx) FinishImport(indexID int64, unique bool, tbl table.Tabl
 	return nil
 }
 
-func acquireLock(ctx context.Context, se *concurrency.Session, key string) (*concurrency.Mutex, error) {
-	mu := concurrency.NewMutex(se, key)
-	err := mu.Lock(ctx)
-	if err != nil {
-		return nil, err
-	}
-	return mu, nil
-}
-
 // Flush checks the disk quota and imports the current key-values in engine to the storage.
 func (bc *litBackendCtx) Flush(indexID int64, mode FlushMode) (flushed, imported bool, err error) {
 	ei, exist := bc.Load(indexID)
@@ -207,27 +198,15 @@ func (bc *litBackendCtx) Flush(indexID int64, mode FlushMode) (flushed, imported
 		return true, false, nil
 	}
 
-	// Use distributed lock if run in distributed mode).
 	if bc.etcdClient != nil {
-		distLockKey := fmt.Sprintf("/tidb/distributeLock/%d/%d", bc.jobID, indexID)
-		se, _ := concurrency.NewSession(bc.etcdClient)
-		mu, err := acquireLock(bc.ctx, se, distLockKey)
+		key := fmt.Sprintf("/tidb/distributeLock/%d", bc.jobID)
+		release, err := owner.AcquireDistributedLock(bc.ctx, bc.etcdClient, key, 10)
 		if err != nil {
-			return true, false, errors.Trace(err)
+			return true, false, err
+		}
+		if release != nil {
+			defer release()
 		}
-		logutil.Logger(bc.ctx).Info("acquire distributed flush lock success", zap.Int64("jobID", bc.jobID))
-		defer func() {
-			err = mu.Unlock(bc.ctx)
-			if err != nil {
-				logutil.Logger(bc.ctx).Warn("release distributed flush lock error", zap.Error(err), zap.Int64("jobID", bc.jobID))
-			} else {
-				logutil.Logger(bc.ctx).Info("release distributed flush lock success", zap.Int64("jobID", bc.jobID))
-			}
-			err = se.Close()
-			if err != nil {
-				logutil.Logger(bc.ctx).Warn("close session error", zap.Error(err))
-			}
-		}()
 	}
 	failpoint.Inject("mockDMLExecutionStateBeforeImport", func(_ failpoint.Value) {
 		if MockDMLExecutionStateBeforeImport != nil {
@@ -242,6 +221,8 @@ func (bc *litBackendCtx) Flush(indexID int64, mode FlushMode) (flushed, imported
 		return true, true, nil
 	}
 
+const distributedLockLease = 10 // Seconds
+
 func (bc *litBackendCtx) unsafeImportAndReset(ei *engineInfo) error {
 	logutil.Logger(bc.ctx).Info(LitInfoUnsafeImport, zap.Int64("index ID", ei.indexID),
 		zap.String("usage info", bc.diskRoot.UsageInfo()))

pkg/owner/BUILD.bazel

Lines changed: 2 additions & 1 deletion
@@ -37,7 +37,7 @@ go_test(
     ],
     embed = [":owner"],
     flaky = True,
-    shard_count = 7,
+    shard_count = 8,
     deps = [
         "//pkg/ddl",
         "//pkg/infoschema",
@@ -54,6 +54,7 @@ go_test(
         "@com_github_stretchr_testify//require",
         "@io_etcd_go_etcd_client_v3//:client",
         "@io_etcd_go_etcd_client_v3//concurrency",
+        "@io_etcd_go_etcd_server_v3//embed",
         "@io_etcd_go_etcd_tests_v3//integration",
         "@org_golang_google_grpc//:grpc",
         "@org_uber_go_goleak//:goleak",

pkg/owner/manager.go

Lines changed: 35 additions & 0 deletions
@@ -490,3 +490,38 @@ func init() {
 		logutil.BgLogger().Warn("set manager session TTL failed", zap.Error(err))
 	}
 }
+
+// AcquireDistributedLock creates a mutex with ETCD client, and returns a mutex release function.
+func AcquireDistributedLock(
+	ctx context.Context,
+	cli *clientv3.Client,
+	key string,
+	ttlInSec int,
+) (release func(), err error) {
+	se, err := concurrency.NewSession(cli, concurrency.WithTTL(ttlInSec))
+	if err != nil {
+		return nil, err
+	}
+	mu := concurrency.NewMutex(se, key)
+	err = mu.Lock(ctx)
+	if err != nil {
+		err1 := se.Close()
+		if err1 != nil {
+			logutil.Logger(ctx).Warn("close session error", zap.Error(err1))
+		}
+		return nil, err
+	}
+	logutil.Logger(ctx).Info("acquire distributed flush lock success", zap.String("key", key))
+	return func() {
+		err = mu.Unlock(ctx)
+		if err != nil {
+			logutil.Logger(ctx).Warn("release distributed flush lock error", zap.Error(err), zap.String("key", key))
+		} else {
+			logutil.Logger(ctx).Info("release distributed flush lock success", zap.String("key", key))
+		}
+		err = se.Close()
+		if err != nil {
+			logutil.Logger(ctx).Warn("close session error", zap.Error(err))
+		}
+	}, nil
+}
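
Note on the design shown above: the release closure returned by AcquireDistributedLock both unlocks the mutex and closes the concurrency session, so callers need only a single deferred call. Because the session is created with the given TTL, a holder that never calls release (for example, its process or etcd client dies) blocks other callers only until the lease expires, which is the case exercised by the "acquire distributed lock until timeout" subtest in manager_test.go below.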

pkg/owner/manager_test.go

Lines changed: 109 additions & 0 deletions
@@ -17,6 +17,8 @@ package owner_test
 import (
 	"context"
 	"fmt"
+	"math/rand"
+	"net/url"
 	"runtime"
 	"testing"
 	"time"
@@ -30,10 +32,12 @@ import (
 	"github.com/pingcap/tidb/pkg/parser/terror"
 	"github.com/pingcap/tidb/pkg/store/mockstore"
 	"github.com/pingcap/tidb/pkg/testkit"
+	"github.com/pingcap/tidb/pkg/util"
 	"github.com/pingcap/tidb/pkg/util/logutil"
 	"github.com/stretchr/testify/require"
 	clientv3 "go.etcd.io/etcd/client/v3"
 	"go.etcd.io/etcd/client/v3/concurrency"
+	"go.etcd.io/etcd/server/v3/embed"
 	"go.etcd.io/etcd/tests/v3/integration"
 )
 
@@ -413,3 +417,108 @@ func deleteLeader(cli *clientv3.Client, prefixKey string) error {
 	_, err = cli.Delete(context.Background(), string(resp.Kvs[0].Key))
 	return errors.Trace(err)
 }
+
+func TestAcquireDistributedLock(t *testing.T) {
+	const addrFmt = "http://127.0.0.1:%d"
+	cfg := embed.NewConfig()
+	cfg.Dir = t.TempDir()
+	// rand port in [20000, 60000)
+	randPort := int(rand.Int31n(40000)) + 20000
+	clientAddr := fmt.Sprintf(addrFmt, randPort)
+	lcurl, _ := url.Parse(clientAddr)
+	cfg.ListenClientUrls, cfg.AdvertiseClientUrls = []url.URL{*lcurl}, []url.URL{*lcurl}
+	lpurl, _ := url.Parse(fmt.Sprintf(addrFmt, randPort+1))
+	cfg.ListenPeerUrls, cfg.AdvertisePeerUrls = []url.URL{*lpurl}, []url.URL{*lpurl}
+	cfg.InitialCluster = "default=" + lpurl.String()
+	cfg.Logger = "zap"
+	embedEtcd, err := embed.StartEtcd(cfg)
+	require.NoError(t, err)
+	<-embedEtcd.Server.ReadyNotify()
+	t.Cleanup(func() {
+		embedEtcd.Close()
+	})
+	makeEtcdCli := func(t *testing.T) (cli *clientv3.Client) {
+		cli, err := clientv3.New(clientv3.Config{
+			Endpoints: []string{lcurl.String()},
+		})
+		require.NoError(t, err)
+		t.Cleanup(func() {
+			cli.Close()
+		})
+		return cli
+	}
+	t.Run("acquire distributed lock with same client", func(t *testing.T) {
+		cli := makeEtcdCli(t)
+		getLock := make(chan struct{})
+		ctx := context.Background()
+
+		release1, err := owner.AcquireDistributedLock(ctx, cli, "test-lock", 10)
+		require.NoError(t, err)
+		var wg util.WaitGroupWrapper
+		wg.Run(func() {
+			// Acquire another distributed lock will be blocked.
+			release2, err := owner.AcquireDistributedLock(ctx, cli, "test-lock", 10)
+			require.NoError(t, err)
+			getLock <- struct{}{}
+			release2()
+		})
+		timer := time.NewTimer(300 * time.Millisecond)
+		select {
+		case <-getLock:
+			require.FailNow(t, "acquired same lock unexpectedly")
+		case <-timer.C:
+			release1()
+			<-getLock
+		}
+		wg.Wait()
+
+		release1, err = owner.AcquireDistributedLock(ctx, cli, "test-lock/1", 10)
+		require.NoError(t, err)
+		release2, err := owner.AcquireDistributedLock(ctx, cli, "test-lock/2", 10)
+		require.NoError(t, err)
+		release1()
+		release2()
+	})
+
+	t.Run("acquire distributed lock with different clients", func(t *testing.T) {
+		cli1 := makeEtcdCli(t)
+		cli2 := makeEtcdCli(t)
+
+		getLock := make(chan struct{})
+		ctx := context.Background()
+
+		release1, err := owner.AcquireDistributedLock(ctx, cli1, "test-lock", 10)
+		require.NoError(t, err)
+		var wg util.WaitGroupWrapper
+		wg.Run(func() {
+			// Acquire another distributed lock will be blocked.
+			release2, err := owner.AcquireDistributedLock(ctx, cli2, "test-lock", 10)
+			require.NoError(t, err)
+			getLock <- struct{}{}
+			release2()
+		})
+		timer := time.NewTimer(300 * time.Millisecond)
+		select {
+		case <-getLock:
+			require.FailNow(t, "acquired same lock unexpectedly")
+		case <-timer.C:
+			release1()
+			<-getLock
+		}
+		wg.Wait()
+	})
+
+	t.Run("acquire distributed lock until timeout", func(t *testing.T) {
+		cli1 := makeEtcdCli(t)
+		cli2 := makeEtcdCli(t)
+		ctx := context.Background()
+
+		_, err := owner.AcquireDistributedLock(ctx, cli1, "test-lock", 1)
+		require.NoError(t, err)
+		cli1.Close() // Note that release() is not invoked.
+
+		release2, err := owner.AcquireDistributedLock(ctx, cli2, "test-lock", 10)
+		require.NoError(t, err)
+		release2()
+	})
+}
