Skip to content

Commit 11f9b24

Browse files
committed
Order mixclient and syncer shutdown
During clean shutdown, signal the mixing client to shutdown first, waiting for it to finish running, before closing the RPC and SPV syncers. Updates the mixing module to a version that supports continuing active mixes before terminating the mixing client.
1 parent 447644e commit 11f9b24

File tree

5 files changed

+92
-54
lines changed

5 files changed

+92
-54
lines changed

chain/sync.go

Lines changed: 27 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -542,9 +542,11 @@ func (s *Syncer) Run(ctx context.Context) (err error) {
542542

543543
params := s.wallet.ChainParams()
544544

545+
ntfnCtx, ntfnCtxCancel := context.WithCancel(context.Background())
546+
defer ntfnCtxCancel()
545547
s.notifier = &notifier{
546548
syncer: s,
547-
ctx: ctx,
549+
ctx: ntfnCtx,
548550
closed: make(chan struct{}),
549551
}
550552
addr, err := normalizeAddress(s.opts.Address, s.opts.DefaultPort)
@@ -589,12 +591,12 @@ func (s *Syncer) Run(ctx context.Context) (err error) {
589591
}
590592
opts = append(opts, wsrpc.WithTLSConfig(tc))
591593
}
592-
client, err := wsrpc.Dial(ctx, addr, opts...)
594+
wsClient, err := wsrpc.Dial(ctx, addr, opts...)
593595
if err != nil {
594596
return err
595597
}
596-
defer client.Close()
597-
s.rpc = dcrd.New(client)
598+
defer wsClient.Close()
599+
s.rpc = dcrd.New(wsClient)
598600

599601
// Verify that the server is running on the expected network.
600602
var netID wire.CurrencyNet
@@ -723,10 +725,27 @@ func (s *Syncer) Run(ctx context.Context) (err error) {
723725
return err
724726
}
725727

728+
defer func() {
729+
ntfnCtxCancel()
730+
731+
select {
732+
case <-ctx.Done():
733+
wsClient.Close()
734+
default:
735+
}
736+
737+
// Wait for notifications to finish before returning
738+
<-s.notifier.closed
739+
}()
740+
741+
// Ensure wallet.Run cleanly finishes/is canceled first when outer
742+
// context is canceled.
743+
walletCtx, walletCtxCancel := context.WithCancel(context.Background())
744+
defer walletCtxCancel()
726745
g.Go(func() error {
727746
// Run wallet background goroutines (currently, this just runs
728747
// mixclient).
729-
return s.wallet.Run(ctx)
748+
return s.wallet.Run(walletCtx)
730749
})
731750

732751
// Request notifications for mixing messages.
@@ -739,18 +758,13 @@ func (s *Syncer) Run(ctx context.Context) (err error) {
739758

740759
log.Infof("Blockchain sync completed, wallet ready for general usage.")
741760

742-
// Wait for notifications to finish before returning
743-
defer func() {
744-
<-s.notifier.closed
745-
}()
746-
747761
g.Go(func() error {
748762
select {
749763
case <-ctx.Done():
750-
client.Close()
764+
walletCtxCancel()
751765
return ctx.Err()
752-
case <-client.Done():
753-
return client.Err()
766+
case <-wsClient.Done():
767+
return wsClient.Err()
754768
}
755769
})
756770
return g.Wait()

dcrwallet.go

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -534,9 +534,10 @@ func spvLoop(ctx context.Context, w *wallet.Wallet) {
534534
for {
535535
err := syncer.Run(ctx)
536536
if done(ctx) {
537+
loggers.SyncLog.Infof("SPV synchronization stopped")
537538
return
538539
}
539-
log.Errorf("SPV synchronization ended: %v", err)
540+
loggers.SyncLog.Errorf("SPV synchronization stopped: %v", err)
540541
}
541542
}
542543

@@ -571,7 +572,11 @@ func rpcSyncLoop(ctx context.Context, w *wallet.Wallet) {
571572
syncer := chain.NewSyncer(w, rpcOptions)
572573
err := syncer.Run(ctx)
573574
if err != nil {
574-
loggers.SyncLog.Errorf("Wallet synchronization stopped: %v", err)
575+
if errors.Is(err, context.Canceled) || ctx.Err() != nil {
576+
loggers.SyncLog.Infof("RPC synchronization stopped")
577+
return
578+
}
579+
loggers.SyncLog.Errorf("RPC synchronization stopped: %v", err)
575580
select {
576581
case <-ctx.Done():
577582
return

go.mod

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ module decred.org/dcrwallet/v5
33
go 1.23
44

55
require (
6-
decred.org/cspp/v2 v2.3.0
6+
decred.org/cspp/v2 v2.4.0
77
github.com/decred/dcrd/addrmgr/v2 v2.0.4
88
github.com/decred/dcrd/blockchain/stake/v5 v5.0.1
99
github.com/decred/dcrd/blockchain/standalone/v2 v2.2.1
@@ -13,15 +13,15 @@ require (
1313
github.com/decred/dcrd/chaincfg/v3 v3.2.1
1414
github.com/decred/dcrd/connmgr/v3 v3.1.2
1515
github.com/decred/dcrd/crypto/blake256 v1.1.0
16-
github.com/decred/dcrd/crypto/rand v1.0.0
16+
github.com/decred/dcrd/crypto/rand v1.0.1
1717
github.com/decred/dcrd/crypto/ripemd160 v1.0.2
1818
github.com/decred/dcrd/dcrec v1.0.1
1919
github.com/decred/dcrd/dcrec/secp256k1/v4 v4.3.0
2020
github.com/decred/dcrd/dcrjson/v4 v4.1.0
2121
github.com/decred/dcrd/dcrutil/v4 v4.0.2
2222
github.com/decred/dcrd/gcs/v4 v4.1.0
2323
github.com/decred/dcrd/hdkeychain/v3 v3.1.2
24-
github.com/decred/dcrd/mixing v0.4.2
24+
github.com/decred/dcrd/mixing v0.5.0
2525
github.com/decred/dcrd/rpc/jsonrpc/types/v4 v4.3.0
2626
github.com/decred/dcrd/rpcclient/v8 v8.0.1
2727
github.com/decred/dcrd/txscript/v4 v4.1.1
@@ -36,9 +36,9 @@ require (
3636
github.com/jrick/logrotate v1.0.0
3737
github.com/jrick/wsrpc/v2 v2.3.8
3838
go.etcd.io/bbolt v1.3.11
39-
golang.org/x/crypto v0.31.0
40-
golang.org/x/sync v0.10.0
41-
golang.org/x/term v0.27.0
39+
golang.org/x/crypto v0.33.0
40+
golang.org/x/sync v0.11.0
41+
golang.org/x/term v0.29.0
4242
google.golang.org/grpc v1.67.3
4343
google.golang.org/grpc/cmd/protoc-gen-go-grpc v1.5.1
4444
google.golang.org/protobuf v1.36.1
@@ -54,8 +54,8 @@ require (
5454
github.com/decred/dcrd/dcrec/edwards/v2 v2.0.3 // indirect
5555
github.com/klauspost/cpuid/v2 v2.2.8 // indirect
5656
golang.org/x/net v0.33.0 // indirect
57-
golang.org/x/sys v0.28.0 // indirect
58-
golang.org/x/text v0.21.0 // indirect
57+
golang.org/x/sys v0.30.0 // indirect
58+
golang.org/x/text v0.22.0 // indirect
5959
google.golang.org/genproto/googleapis/rpc v0.0.0-20241223144023-3abc09e42ca8 // indirect
6060
lukechampine.com/blake3 v1.3.0 // indirect
6161
)

go.sum

Lines changed: 16 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
1-
decred.org/cspp/v2 v2.3.0 h1:GC8emJnLbOVAkgBTHK/1wy6o/m0AVsN1r4m1ZnZZWjo=
2-
decred.org/cspp/v2 v2.3.0/go.mod h1:9nO3bfvCheOPIFZw5f6sRQ42CjBFB5RKSaJ9Iq6G4MA=
1+
decred.org/cspp/v2 v2.4.0 h1:whb0YW+UELHJS/UfT5MBXSJXrKUVw5omhgKNhjzYix4=
2+
decred.org/cspp/v2 v2.4.0/go.mod h1:9nO3bfvCheOPIFZw5f6sRQ42CjBFB5RKSaJ9Iq6G4MA=
33
github.com/agl/ed25519 v0.0.0-20170116200512-5312a6153412 h1:w1UutsfOrms1J05zt7ISrnJIXKzwaspym5BTKGx93EI=
44
github.com/agl/ed25519 v0.0.0-20170116200512-5312a6153412/go.mod h1:WPjqKcmVOxf0XSf3YxCJs6N6AOSrOx3obionmG7T0y0=
55
github.com/companyzero/sntrup4591761 v0.0.0-20220309191932-9e0f3af2f07a h1:clYxJ3Os0EQUKDDVU8M0oipllX0EkuFNBfhVQuIfyF0=
@@ -30,8 +30,8 @@ github.com/decred/dcrd/container/lru v1.0.0 h1:7foQymtbu18aQWYiY9RnNIeE+kvpiN+fi
3030
github.com/decred/dcrd/container/lru v1.0.0/go.mod h1:vlPwj0l+IzAHhQSsbgQnJgO5Cte78+yI065V+Mc5PRQ=
3131
github.com/decred/dcrd/crypto/blake256 v1.1.0 h1:zPMNGQCm0g4QTY27fOCorQW7EryeQ/U0x++OzVrdms8=
3232
github.com/decred/dcrd/crypto/blake256 v1.1.0/go.mod h1:2OfgNZ5wDpcsFmHmCK5gZTPcCXqlm2ArzUIkw9czNJo=
33-
github.com/decred/dcrd/crypto/rand v1.0.0 h1:Ah9Asl36OZt09sGSMbJZuL1HfwGdlC38q/ZUeLDVKRg=
34-
github.com/decred/dcrd/crypto/rand v1.0.0/go.mod h1:coa7BbxSTiKH6esi257plGfMFYuGL4MTbQlLYnOdzpE=
33+
github.com/decred/dcrd/crypto/rand v1.0.1 h1:pYMgDRmRv1z1RNgAAs8izJstm4B+fLFiqGD5btOt2Wg=
34+
github.com/decred/dcrd/crypto/rand v1.0.1/go.mod h1:MsA2XySk/4KpCOYW6vsNYTGuOYRK1wpvulaWCuW7RyI=
3535
github.com/decred/dcrd/crypto/ripemd160 v1.0.2 h1:TvGTmUBHDU75OHro9ojPLK+Yv7gDl2hnUvRocRCjsys=
3636
github.com/decred/dcrd/crypto/ripemd160 v1.0.2/go.mod h1:uGfjDyePSpa75cSQLzNdVmWlbQMBuiJkvXw/MNKRY4M=
3737
github.com/decred/dcrd/database/v3 v3.0.2 h1:rgP7XNZemTs8ZC7bnTKO8JO79Woj5nq+yQYmB9ry7yM=
@@ -50,8 +50,8 @@ github.com/decred/dcrd/gcs/v4 v4.1.0 h1:tpW7JW53yJZlgNwl/n2NL1b8NxHaIPRUyNuLMkB/
5050
github.com/decred/dcrd/gcs/v4 v4.1.0/go.mod h1:nPTbGM/I3Ihe5KFvUmxZEqQP/jDZQjQ63+WEi/f4lqU=
5151
github.com/decred/dcrd/hdkeychain/v3 v3.1.2 h1:x25WuuE7zM/20EynuVMyOhL0K8BwGBBsexGq8xTiHFA=
5252
github.com/decred/dcrd/hdkeychain/v3 v3.1.2/go.mod h1:FnNJmZ7jqUDeAo6/c/xkQi5cuxh3EWtJeMmW6/Z8lcc=
53-
github.com/decred/dcrd/mixing v0.4.2 h1:mpt2pNIFTI6L1hXrieAWJTQJv5t9WzHcNnhI+tnAG90=
54-
github.com/decred/dcrd/mixing v0.4.2/go.mod h1:VF87lOn41kitgWVOwmXoB4qMYF7+bxItZXyw4JfW3EQ=
53+
github.com/decred/dcrd/mixing v0.5.0 h1:KEWr6ZKuUcnAMsuWyrwpdCuL48OrCkIZbKn5B1V+wCY=
54+
github.com/decred/dcrd/mixing v0.5.0/go.mod h1:264YZ7KgKsjQGwart40E1QiVzPvLiaKkd/T0c8jtzNI=
5555
github.com/decred/dcrd/rpc/jsonrpc/types/v4 v4.3.0 h1:l0DnCcILTNrpy8APF3FLN312ChpkQaAuW30aC/RgBaw=
5656
github.com/decred/dcrd/rpc/jsonrpc/types/v4 v4.3.0/go.mod h1:j+kkRPXPJB5S9VFOsx8SQLcU7PTFkPKRc1aCHN4ENzA=
5757
github.com/decred/dcrd/rpcclient/v8 v8.0.1 h1:hd81e4w1KSqvPcozJlnz6XJfWKDNuahgooH/N5E8vOU=
@@ -92,20 +92,20 @@ github.com/syndtr/goleveldb v1.0.1-0.20210819022825-2ae1ddf74ef7 h1:epCh84lMvA70
9292
github.com/syndtr/goleveldb v1.0.1-0.20210819022825-2ae1ddf74ef7/go.mod h1:q4W45IWZaF22tdD+VEXcAWRA037jwmWEB5VWYORlTpc=
9393
go.etcd.io/bbolt v1.3.11 h1:yGEzV1wPz2yVCLsD8ZAiGHhHVlczyC9d1rP43/VCRJ0=
9494
go.etcd.io/bbolt v1.3.11/go.mod h1:dksAq7YMXoljX0xu6VF5DMZGbhYYoLUalEiSySYAS4I=
95-
golang.org/x/crypto v0.31.0 h1:ihbySMvVjLAeSH1IbfcRTkD/iNscyz8rGzjF/E5hV6U=
96-
golang.org/x/crypto v0.31.0/go.mod h1:kDsLvtWBEx7MV9tJOj9bnXsPbxwJQ6csT/x4KIN4Ssk=
95+
golang.org/x/crypto v0.33.0 h1:IOBPskki6Lysi0lo9qQvbxiQ+FvsCC/YWOecCHAixus=
96+
golang.org/x/crypto v0.33.0/go.mod h1:bVdXmD7IV/4GdElGPozy6U7lWdRXA4qyRVGJV57uQ5M=
9797
golang.org/x/net v0.33.0 h1:74SYHlV8BIgHIFC/LrYkOGIwL19eTYXQ5wc6TBuO36I=
9898
golang.org/x/net v0.33.0/go.mod h1:HXLR5J+9DxmrqMwG9qjGCxZ+zKXxBru04zlTvWlWuN4=
99-
golang.org/x/sync v0.10.0 h1:3NQrjDixjgGwUOCaF8w2+VYHv0Ve/vGYSbdkTa98gmQ=
100-
golang.org/x/sync v0.10.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
99+
golang.org/x/sync v0.11.0 h1:GGz8+XQP4FvTTrjZPzNKTMFtSXH80RAzG+5ghFPgK9w=
100+
golang.org/x/sync v0.11.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
101101
golang.org/x/sys v0.0.0-20210320140829-1e4c9ba3b0c4/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
102102
golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
103-
golang.org/x/sys v0.28.0 h1:Fksou7UEQUWlKvIdsqzJmUmCX3cZuD2+P3XyyzwMhlA=
104-
golang.org/x/sys v0.28.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
105-
golang.org/x/term v0.27.0 h1:WP60Sv1nlK1T6SupCHbXzSaN0b9wUmsPoRS9b61A23Q=
106-
golang.org/x/term v0.27.0/go.mod h1:iMsnZpn0cago0GOrHO2+Y7u7JPn5AylBrcoWkElMTSM=
107-
golang.org/x/text v0.21.0 h1:zyQAAkrwaneQ066sspRyJaG9VNi/YJ1NfzcGB3hZ/qo=
108-
golang.org/x/text v0.21.0/go.mod h1:4IBbMaMmOPCJ8SecivzSH54+73PCFmPWxNTLm+vZkEQ=
103+
golang.org/x/sys v0.30.0 h1:QjkSwP/36a20jFYWkSue1YwXzLmsV5Gfq7Eiy72C1uc=
104+
golang.org/x/sys v0.30.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
105+
golang.org/x/term v0.29.0 h1:L6pJp37ocefwRRtYPKSWOWzOtWSxVajvz2ldH/xi3iU=
106+
golang.org/x/term v0.29.0/go.mod h1:6bl4lRlvVuDgSf3179VpIxBF0o10JUpXWOnI7nErv7s=
107+
golang.org/x/text v0.22.0 h1:bofq7m3/HAFvbF51jz3Q9wLg3jkvSPuiZu/pD1XwgtM=
108+
golang.org/x/text v0.22.0/go.mod h1:YRoo4H8PVmsu+E3Ou7cqLVH8oXWIHVoX0jqUWALQhfY=
109109
google.golang.org/genproto/googleapis/rpc v0.0.0-20241223144023-3abc09e42ca8 h1:TqExAhdPaB60Ux47Cn0oLV07rGnxZzIsaRhQaqS666A=
110110
google.golang.org/genproto/googleapis/rpc v0.0.0-20241223144023-3abc09e42ca8/go.mod h1:lcTa1sDdWEIHMWlITnIczmw5w60CF9ffkb8Z+DVmmjA=
111111
google.golang.org/grpc v1.67.3 h1:OgPcDAFKHnH8X3O4WcO4XUc8GRDeKsKReqbQtiCj7N8=

spv/sync.go

Lines changed: 34 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -321,7 +321,7 @@ func (s *Syncer) setRequiredHeight(tipHeight int32) {
321321
}
322322

323323
// Run synchronizes the wallet, returning when synchronization fails or the
324-
// context is cancelled.
324+
// context is canceled.
325325
func (s *Syncer) Run(ctx context.Context) (err error) {
326326
s.doneMu.Lock()
327327
s.done = make(chan struct{})
@@ -367,32 +367,43 @@ func (s *Syncer) Run(ctx context.Context) (err error) {
367367
}
368368

369369
// Start background handlers to read received messages from remote peers
370-
g, ctx := errgroup.WithContext(ctx)
371-
g.Go(func() error { return s.receiveGetData(ctx) })
372-
g.Go(func() error { return s.receiveInv(ctx) })
373-
g.Go(func() error { return s.receiveHeadersAnnouncements(ctx) })
374-
g.Go(func() error { return s.receiveMixMsgs(ctx) })
370+
g, gctx := errgroup.WithContext(context.Background())
371+
g.Go(func() error { return s.receiveGetData(gctx) })
372+
g.Go(func() error { return s.receiveInv(gctx) })
373+
g.Go(func() error { return s.receiveHeadersAnnouncements(gctx) })
374+
g.Go(func() error { return s.receiveMixMsgs(gctx) })
375375
s.lp.AddHandledMessages(p2p.MaskGetData | p2p.MaskInv)
376376

377377
if len(s.persistentPeers) != 0 {
378378
for i := range s.persistentPeers {
379379
raddr := s.persistentPeers[i]
380-
g.Go(func() error { return s.connectToPersistent(ctx, raddr) })
380+
g.Go(func() error { return s.connectToPersistent(gctx, raddr) })
381381
}
382382
} else {
383-
g.Go(func() error { return s.connectToCandidates(ctx) })
383+
g.Go(func() error { return s.connectToCandidates(gctx) })
384384
}
385385

386-
g.Go(func() error { return s.handleMempool(ctx) })
386+
g.Go(func() error { return s.handleMempool(gctx) })
387387

388388
s.wallet.SetNetworkBackend(s)
389389
defer s.wallet.SetNetworkBackend(nil)
390390

391+
// Ensure initial sync and wallet.Run cleanly finish/are canceled
392+
// first when outer context is canceled.
393+
walletCtx, walletCtxCancel := context.WithCancel(context.Background())
394+
go func() {
395+
select {
396+
case <-ctx.Done():
397+
case <-gctx.Done():
398+
}
399+
walletCtxCancel()
400+
}()
401+
391402
// Perform the initial startup sync.
392403
g.Go(func() error {
393404
// First step: fetch missing CFilters.
394405
progress := make(chan wallet.MissingCFilterProgress, 1)
395-
go s.wallet.FetchMissingCFiltersWithProgress(ctx, s, progress)
406+
go s.wallet.FetchMissingCFiltersWithProgress(walletCtx, s, progress)
396407

397408
log.Debugf("Fetching missing CFilters...")
398409
s.fetchMissingCfiltersStart()
@@ -408,14 +419,14 @@ func (s *Syncer) Run(ctx context.Context) (err error) {
408419
// Next: fetch headers and cfilters up to mainchain tip.
409420
s.fetchHeadersStart()
410421
log.Debugf("Fetching headers and CFilters...")
411-
err = s.initialSyncHeaders(ctx)
422+
err = s.initialSyncHeaders(walletCtx)
412423
if err != nil {
413424
return err
414425
}
415426
s.fetchHeadersFinished()
416427

417428
// Finally: Perform the initial rescan over the received blocks.
418-
err = s.initialSyncRescan(ctx)
429+
err = s.initialSyncRescan(walletCtx)
419430
if err != nil {
420431
return err
421432
}
@@ -425,10 +436,18 @@ func (s *Syncer) Run(ctx context.Context) (err error) {
425436
return nil
426437
})
427438

428-
// Run wallet background goroutines (currently, this just runs
429-
// mixclient).
430439
g.Go(func() error {
431-
return s.wallet.Run(ctx)
440+
// Run wallet background goroutines (currently, this just runs
441+
// mixclient).
442+
err := s.wallet.Run(walletCtx)
443+
if err != nil {
444+
return err
445+
}
446+
447+
// If gctx has not yet been canceled, do so here now.
448+
// walletCtx is canceled after either ctx or gctx is canceled.
449+
<-walletCtx.Done()
450+
return walletCtx.Err()
432451
})
433452

434453
// Wait until cancellation or a handler errors.

0 commit comments

Comments
 (0)