@@ -428,3 +428,128 @@ func deleteLeader(cli *clientv3.Client, prefixKey string) error {
428
428
_ , err = cli .Delete (context .Background (), string (resp .Kvs [0 ].Key ))
429
429
return errors .Trace (err )
430
430
}
431
+ << << << < HEAD
432
+ == == == =
433
+
434
+ func TestImmediatelyCancel (t * testing.T ) {
435
+ if runtime .GOOS == "windows" {
436
+ t .Skip ("integration.NewClusterV3 will create file contains a colon which is not allowed on Windows" )
437
+ }
438
+ integration .BeforeTestExternal (t )
439
+
440
+ tInfo := newTestInfo (t )
441
+ d := tInfo .ddl
442
+ defer tInfo .Close (t )
443
+ ownerManager := d .OwnerManager ()
444
+ for i := 0 ; i < 10 ; i ++ {
445
+ err := ownerManager .CampaignOwner ()
446
+ require .NoError (t , err )
447
+ ownerManager .CampaignCancel ()
448
+ }
449
+ }
450
+
451
+ func TestAcquireDistributedLock (t * testing.T ) {
452
+ const addrFmt = "http://127.0.0.1:%d"
453
+ cfg := embed .NewConfig ()
454
+ cfg .Dir = t .TempDir ()
455
+ // rand port in [20000, 60000)
456
+ randPort := int (rand .Int31n (40000 )) + 20000
457
+ clientAddr := fmt .Sprintf (addrFmt , randPort )
458
+ lcurl , _ := url .Parse (clientAddr )
459
+ cfg .ListenClientUrls , cfg .AdvertiseClientUrls = []url.URL {* lcurl }, []url.URL {* lcurl }
460
+ lpurl , _ := url .Parse (fmt .Sprintf (addrFmt , randPort + 1 ))
461
+ cfg .ListenPeerUrls , cfg .AdvertisePeerUrls = []url.URL {* lpurl }, []url.URL {* lpurl }
462
+ cfg .InitialCluster = "default=" + lpurl .String ()
463
+ cfg .Logger = "zap"
464
+ embedEtcd , err := embed .StartEtcd (cfg )
465
+ require .NoError (t , err )
466
+ <- embedEtcd .Server .ReadyNotify ()
467
+ t .Cleanup (func () {
468
+ embedEtcd .Close ()
469
+ })
470
+ makeEtcdCli := func (t * testing.T ) (cli * clientv3.Client ) {
471
+ cli , err := clientv3 .New (clientv3.Config {
472
+ Endpoints : []string {lcurl .String ()},
473
+ })
474
+ require .NoError (t , err )
475
+ t .Cleanup (func () {
476
+ cli .Close ()
477
+ })
478
+ return cli
479
+ }
480
+ t .Run ("acquire distributed lock with same client" , func (t * testing.T ) {
481
+ cli := makeEtcdCli (t )
482
+ getLock := make (chan struct {})
483
+ ctx := context .Background ()
484
+
485
+ release1 , err := owner .AcquireDistributedLock (ctx , cli , "test-lock" , 10 )
486
+ require .NoError (t , err )
487
+ var wg util.WaitGroupWrapper
488
+ wg .Run (func () {
489
+ // Acquire another distributed lock will be blocked.
490
+ release2 , err := owner .AcquireDistributedLock (ctx , cli , "test-lock" , 10 )
491
+ require .NoError (t , err )
492
+ getLock <- struct {}{}
493
+ release2 ()
494
+ })
495
+ timer := time .NewTimer (300 * time .Millisecond )
496
+ select {
497
+ case <- getLock :
498
+ require .FailNow (t , "acquired same lock unexpectedly" )
499
+ case <- timer .C :
500
+ release1 ()
501
+ <- getLock
502
+ }
503
+ wg .Wait ()
504
+
505
+ release1 , err = owner .AcquireDistributedLock (ctx , cli , "test-lock/1" , 10 )
506
+ require .NoError (t , err )
507
+ release2 , err := owner .AcquireDistributedLock (ctx , cli , "test-lock/2" , 10 )
508
+ require .NoError (t , err )
509
+ release1 ()
510
+ release2 ()
511
+ })
512
+
513
+ t .Run ("acquire distributed lock with different clients" , func (t * testing.T ) {
514
+ cli1 := makeEtcdCli (t )
515
+ cli2 := makeEtcdCli (t )
516
+
517
+ getLock := make (chan struct {})
518
+ ctx := context .Background ()
519
+
520
+ release1 , err := owner .AcquireDistributedLock (ctx , cli1 , "test-lock" , 10 )
521
+ require .NoError (t , err )
522
+ var wg util.WaitGroupWrapper
523
+ wg .Run (func () {
524
+ // Acquire another distributed lock will be blocked.
525
+ release2 , err := owner .AcquireDistributedLock (ctx , cli2 , "test-lock" , 10 )
526
+ require .NoError (t , err )
527
+ getLock <- struct {}{}
528
+ release2 ()
529
+ })
530
+ timer := time .NewTimer (300 * time .Millisecond )
531
+ select {
532
+ case <- getLock :
533
+ require .FailNow (t , "acquired same lock unexpectedly" )
534
+ case <- timer .C :
535
+ release1 ()
536
+ <- getLock
537
+ }
538
+ wg .Wait ()
539
+ })
540
+
541
+ t .Run ("acquire distributed lock until timeout" , func (t * testing.T ) {
542
+ cli1 := makeEtcdCli (t )
543
+ cli2 := makeEtcdCli (t )
544
+ ctx := context .Background ()
545
+
546
+ _ , err := owner .AcquireDistributedLock (ctx , cli1 , "test-lock" , 1 )
547
+ require .NoError (t , err )
548
+ cli1 .Close () // Note that release() is not invoked.
549
+
550
+ release2 , err := owner .AcquireDistributedLock (ctx , cli2 , "test-lock" , 10 )
551
+ require .NoError (t , err )
552
+ release2 ()
553
+ })
554
+ }
555
+ >> >> >> > afdd5c2ecd5 (owner : fix data race on ownerManager.campaignCancel (#56362 ))
0 commit comments