Skip to content

Commit d7fe7d6

Browse files
fishiuti-chi-bot
authored andcommitted
This is an automated cherry-pick of pingcap#56362
Signed-off-by: ti-chi-bot <[email protected]>
1 parent 849dcc0 commit d7fe7d6

File tree

3 files changed

+133
-4
lines changed

3 files changed

+133
-4
lines changed

pkg/owner/BUILD.bazel

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,11 @@ go_test(
3737
],
3838
embed = [":owner"],
3939
flaky = True,
40+
<<<<<<< HEAD
4041
shard_count = 5,
42+
=======
43+
shard_count = 9,
44+
>>>>>>> afdd5c2ecd5 (owner: fix data race on ownerManager.campaignCancel (#56362))
4145
deps = [
4246
"//pkg/ddl",
4347
"//pkg/infoschema",

pkg/owner/manager.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -192,7 +192,9 @@ func (m *ownerManager) CampaignOwner(withTTL ...int) error {
192192
}
193193
m.sessionLease.Store(int64(session.Lease()))
194194
m.wg.Add(1)
195-
go m.campaignLoop(session)
195+
var campaignContext context.Context
196+
campaignContext, m.campaignCancel = context.WithCancel(m.ctx)
197+
go m.campaignLoop(campaignContext, session)
196198
return nil
197199
}
198200

@@ -232,9 +234,7 @@ func (m *ownerManager) CampaignCancel() {
232234
m.wg.Wait()
233235
}
234236

235-
func (m *ownerManager) campaignLoop(etcdSession *concurrency.Session) {
236-
var campaignContext context.Context
237-
campaignContext, m.campaignCancel = context.WithCancel(m.ctx)
237+
func (m *ownerManager) campaignLoop(campaignContext context.Context, etcdSession *concurrency.Session) {
238238
defer func() {
239239
m.campaignCancel()
240240
if r := recover(); r != nil {

pkg/owner/manager_test.go

Lines changed: 125 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -313,3 +313,128 @@ func deleteLeader(cli *clientv3.Client, prefixKey string) error {
313313
_, err = cli.Delete(context.Background(), string(resp.Kvs[0].Key))
314314
return errors.Trace(err)
315315
}
316+
<<<<<<< HEAD
317+
=======
318+
319+
func TestImmediatelyCancel(t *testing.T) {
320+
if runtime.GOOS == "windows" {
321+
t.Skip("integration.NewClusterV3 will create file contains a colon which is not allowed on Windows")
322+
}
323+
integration.BeforeTestExternal(t)
324+
325+
tInfo := newTestInfo(t)
326+
d := tInfo.ddl
327+
defer tInfo.Close(t)
328+
ownerManager := d.OwnerManager()
329+
for i := 0; i < 10; i++ {
330+
err := ownerManager.CampaignOwner()
331+
require.NoError(t, err)
332+
ownerManager.CampaignCancel()
333+
}
334+
}
335+
336+
func TestAcquireDistributedLock(t *testing.T) {
337+
const addrFmt = "http://127.0.0.1:%d"
338+
cfg := embed.NewConfig()
339+
cfg.Dir = t.TempDir()
340+
// rand port in [20000, 60000)
341+
randPort := int(rand.Int31n(40000)) + 20000
342+
clientAddr := fmt.Sprintf(addrFmt, randPort)
343+
lcurl, _ := url.Parse(clientAddr)
344+
cfg.ListenClientUrls, cfg.AdvertiseClientUrls = []url.URL{*lcurl}, []url.URL{*lcurl}
345+
lpurl, _ := url.Parse(fmt.Sprintf(addrFmt, randPort+1))
346+
cfg.ListenPeerUrls, cfg.AdvertisePeerUrls = []url.URL{*lpurl}, []url.URL{*lpurl}
347+
cfg.InitialCluster = "default=" + lpurl.String()
348+
cfg.Logger = "zap"
349+
embedEtcd, err := embed.StartEtcd(cfg)
350+
require.NoError(t, err)
351+
<-embedEtcd.Server.ReadyNotify()
352+
t.Cleanup(func() {
353+
embedEtcd.Close()
354+
})
355+
makeEtcdCli := func(t *testing.T) (cli *clientv3.Client) {
356+
cli, err := clientv3.New(clientv3.Config{
357+
Endpoints: []string{lcurl.String()},
358+
})
359+
require.NoError(t, err)
360+
t.Cleanup(func() {
361+
cli.Close()
362+
})
363+
return cli
364+
}
365+
t.Run("acquire distributed lock with same client", func(t *testing.T) {
366+
cli := makeEtcdCli(t)
367+
getLock := make(chan struct{})
368+
ctx := context.Background()
369+
370+
release1, err := owner.AcquireDistributedLock(ctx, cli, "test-lock", 10)
371+
require.NoError(t, err)
372+
var wg util.WaitGroupWrapper
373+
wg.Run(func() {
374+
// Acquire another distributed lock will be blocked.
375+
release2, err := owner.AcquireDistributedLock(ctx, cli, "test-lock", 10)
376+
require.NoError(t, err)
377+
getLock <- struct{}{}
378+
release2()
379+
})
380+
timer := time.NewTimer(300 * time.Millisecond)
381+
select {
382+
case <-getLock:
383+
require.FailNow(t, "acquired same lock unexpectedly")
384+
case <-timer.C:
385+
release1()
386+
<-getLock
387+
}
388+
wg.Wait()
389+
390+
release1, err = owner.AcquireDistributedLock(ctx, cli, "test-lock/1", 10)
391+
require.NoError(t, err)
392+
release2, err := owner.AcquireDistributedLock(ctx, cli, "test-lock/2", 10)
393+
require.NoError(t, err)
394+
release1()
395+
release2()
396+
})
397+
398+
t.Run("acquire distributed lock with different clients", func(t *testing.T) {
399+
cli1 := makeEtcdCli(t)
400+
cli2 := makeEtcdCli(t)
401+
402+
getLock := make(chan struct{})
403+
ctx := context.Background()
404+
405+
release1, err := owner.AcquireDistributedLock(ctx, cli1, "test-lock", 10)
406+
require.NoError(t, err)
407+
var wg util.WaitGroupWrapper
408+
wg.Run(func() {
409+
// Acquire another distributed lock will be blocked.
410+
release2, err := owner.AcquireDistributedLock(ctx, cli2, "test-lock", 10)
411+
require.NoError(t, err)
412+
getLock <- struct{}{}
413+
release2()
414+
})
415+
timer := time.NewTimer(300 * time.Millisecond)
416+
select {
417+
case <-getLock:
418+
require.FailNow(t, "acquired same lock unexpectedly")
419+
case <-timer.C:
420+
release1()
421+
<-getLock
422+
}
423+
wg.Wait()
424+
})
425+
426+
t.Run("acquire distributed lock until timeout", func(t *testing.T) {
427+
cli1 := makeEtcdCli(t)
428+
cli2 := makeEtcdCli(t)
429+
ctx := context.Background()
430+
431+
_, err := owner.AcquireDistributedLock(ctx, cli1, "test-lock", 1)
432+
require.NoError(t, err)
433+
cli1.Close() // Note that release() is not invoked.
434+
435+
release2, err := owner.AcquireDistributedLock(ctx, cli2, "test-lock", 10)
436+
require.NoError(t, err)
437+
release2()
438+
})
439+
}
440+
>>>>>>> afdd5c2ecd5 (owner: fix data race on ownerManager.campaignCancel (#56362))

0 commit comments

Comments
 (0)