Skip to content

Commit ab74543

Browse files
fishiuti-chi-bot
authored andcommitted
This is an automated cherry-pick of pingcap#56362
Signed-off-by: ti-chi-bot <[email protected]>
1 parent 0627e65 commit ab74543

File tree

3 files changed

+133
-4
lines changed

3 files changed

+133
-4
lines changed

owner/BUILD.bazel

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,11 @@ go_test(
3535
],
3636
embed = [":owner"],
3737
flaky = True,
38+
<<<<<<< HEAD:owner/BUILD.bazel
3839
shard_count = 6,
40+
=======
41+
shard_count = 9,
42+
>>>>>>> afdd5c2ecd5 (owner: fix data race on ownerManager.campaignCancel (#56362)):pkg/owner/BUILD.bazel
3943
deps = [
4044
"//ddl",
4145
"//infoschema",

owner/manager.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -186,7 +186,9 @@ func (m *ownerManager) CampaignOwner(withTTL ...int) error {
186186
}
187187
m.sessionLease.Store(int64(session.Lease()))
188188
m.wg.Add(1)
189-
go m.campaignLoop(session)
189+
var campaignContext context.Context
190+
campaignContext, m.campaignCancel = context.WithCancel(m.ctx)
191+
go m.campaignLoop(campaignContext, session)
190192
return nil
191193
}
192194

@@ -226,9 +228,7 @@ func (m *ownerManager) CampaignCancel() {
226228
m.wg.Wait()
227229
}
228230

229-
func (m *ownerManager) campaignLoop(etcdSession *concurrency.Session) {
230-
var campaignContext context.Context
231-
campaignContext, m.campaignCancel = context.WithCancel(m.ctx)
231+
func (m *ownerManager) campaignLoop(campaignContext context.Context, etcdSession *concurrency.Session) {
232232
defer func() {
233233
m.campaignCancel()
234234
if r := recover(); r != nil {

owner/manager_test.go

Lines changed: 125 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -379,3 +379,128 @@ func deleteLeader(cli *clientv3.Client, prefixKey string) error {
379379
_, err = cli.Delete(context.Background(), string(resp.Kvs[0].Key))
380380
return errors.Trace(err)
381381
}
382+
<<<<<<< HEAD:owner/manager_test.go
383+
=======
384+
385+
func TestImmediatelyCancel(t *testing.T) {
386+
if runtime.GOOS == "windows" {
387+
t.Skip("integration.NewClusterV3 will create file contains a colon which is not allowed on Windows")
388+
}
389+
integration.BeforeTestExternal(t)
390+
391+
tInfo := newTestInfo(t)
392+
d := tInfo.ddl
393+
defer tInfo.Close(t)
394+
ownerManager := d.OwnerManager()
395+
for i := 0; i < 10; i++ {
396+
err := ownerManager.CampaignOwner()
397+
require.NoError(t, err)
398+
ownerManager.CampaignCancel()
399+
}
400+
}
401+
402+
func TestAcquireDistributedLock(t *testing.T) {
403+
const addrFmt = "http://127.0.0.1:%d"
404+
cfg := embed.NewConfig()
405+
cfg.Dir = t.TempDir()
406+
// rand port in [20000, 60000)
407+
randPort := int(rand.Int31n(40000)) + 20000
408+
clientAddr := fmt.Sprintf(addrFmt, randPort)
409+
lcurl, _ := url.Parse(clientAddr)
410+
cfg.ListenClientUrls, cfg.AdvertiseClientUrls = []url.URL{*lcurl}, []url.URL{*lcurl}
411+
lpurl, _ := url.Parse(fmt.Sprintf(addrFmt, randPort+1))
412+
cfg.ListenPeerUrls, cfg.AdvertisePeerUrls = []url.URL{*lpurl}, []url.URL{*lpurl}
413+
cfg.InitialCluster = "default=" + lpurl.String()
414+
cfg.Logger = "zap"
415+
embedEtcd, err := embed.StartEtcd(cfg)
416+
require.NoError(t, err)
417+
<-embedEtcd.Server.ReadyNotify()
418+
t.Cleanup(func() {
419+
embedEtcd.Close()
420+
})
421+
makeEtcdCli := func(t *testing.T) (cli *clientv3.Client) {
422+
cli, err := clientv3.New(clientv3.Config{
423+
Endpoints: []string{lcurl.String()},
424+
})
425+
require.NoError(t, err)
426+
t.Cleanup(func() {
427+
cli.Close()
428+
})
429+
return cli
430+
}
431+
t.Run("acquire distributed lock with same client", func(t *testing.T) {
432+
cli := makeEtcdCli(t)
433+
getLock := make(chan struct{})
434+
ctx := context.Background()
435+
436+
release1, err := owner.AcquireDistributedLock(ctx, cli, "test-lock", 10)
437+
require.NoError(t, err)
438+
var wg util.WaitGroupWrapper
439+
wg.Run(func() {
440+
// Acquire another distributed lock will be blocked.
441+
release2, err := owner.AcquireDistributedLock(ctx, cli, "test-lock", 10)
442+
require.NoError(t, err)
443+
getLock <- struct{}{}
444+
release2()
445+
})
446+
timer := time.NewTimer(300 * time.Millisecond)
447+
select {
448+
case <-getLock:
449+
require.FailNow(t, "acquired same lock unexpectedly")
450+
case <-timer.C:
451+
release1()
452+
<-getLock
453+
}
454+
wg.Wait()
455+
456+
release1, err = owner.AcquireDistributedLock(ctx, cli, "test-lock/1", 10)
457+
require.NoError(t, err)
458+
release2, err := owner.AcquireDistributedLock(ctx, cli, "test-lock/2", 10)
459+
require.NoError(t, err)
460+
release1()
461+
release2()
462+
})
463+
464+
t.Run("acquire distributed lock with different clients", func(t *testing.T) {
465+
cli1 := makeEtcdCli(t)
466+
cli2 := makeEtcdCli(t)
467+
468+
getLock := make(chan struct{})
469+
ctx := context.Background()
470+
471+
release1, err := owner.AcquireDistributedLock(ctx, cli1, "test-lock", 10)
472+
require.NoError(t, err)
473+
var wg util.WaitGroupWrapper
474+
wg.Run(func() {
475+
// Acquire another distributed lock will be blocked.
476+
release2, err := owner.AcquireDistributedLock(ctx, cli2, "test-lock", 10)
477+
require.NoError(t, err)
478+
getLock <- struct{}{}
479+
release2()
480+
})
481+
timer := time.NewTimer(300 * time.Millisecond)
482+
select {
483+
case <-getLock:
484+
require.FailNow(t, "acquired same lock unexpectedly")
485+
case <-timer.C:
486+
release1()
487+
<-getLock
488+
}
489+
wg.Wait()
490+
})
491+
492+
t.Run("acquire distributed lock until timeout", func(t *testing.T) {
493+
cli1 := makeEtcdCli(t)
494+
cli2 := makeEtcdCli(t)
495+
ctx := context.Background()
496+
497+
_, err := owner.AcquireDistributedLock(ctx, cli1, "test-lock", 1)
498+
require.NoError(t, err)
499+
cli1.Close() // Note that release() is not invoked.
500+
501+
release2, err := owner.AcquireDistributedLock(ctx, cli2, "test-lock", 10)
502+
require.NoError(t, err)
503+
release2()
504+
})
505+
}
506+
>>>>>>> afdd5c2ecd5 (owner: fix data race on ownerManager.campaignCancel (#56362)):pkg/owner/manager_test.go

0 commit comments

Comments
 (0)