Skip to content

Commit 01f1e12

Browse files
committed
[release-v2.0] Add context wrapping for syncer disconnections
Any long-lived process (such as mixing) must return if the network backend (e.g. the dcrd RPC connection) is disconnected or the syncer encounters another error. However, it was observed that mixclient calls were continuing to execute despite the syncer erroring, resulting in locked outputs that could not be mixed after the syncer reconnected and restarted. To avoid this, the network backend interface gains additional methods to select on when disconnect occurs, and a new context wrapping function is added, which creates a derived context that is canceled (or "done") after the syncer exits. Wrapped contexts are added in the ticket purchasing and account mixing paths, including those being performed by the autobuyer. Backport of 4adf892.
1 parent 45d7841 commit 01f1e12

File tree

9 files changed

+149
-1
lines changed

9 files changed

+149
-1
lines changed

chain/backend.go

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -87,3 +87,24 @@ func (s *Syncer) ExistsLiveTickets(ctx context.Context, tickets []*chainhash.Has
8787
func (s *Syncer) UsedAddresses(ctx context.Context, addrs []stdaddr.Address) (bitset.Bytes, error) {
8888
return s.rpc.UsedAddresses(ctx, addrs)
8989
}
90+
91+
func (s *Syncer) Done() <-chan struct{} {
92+
s.doneMu.Lock()
93+
c := s.done
94+
s.doneMu.Unlock()
95+
return c
96+
}
97+
98+
func (s *Syncer) Err() error {
99+
s.doneMu.Lock()
100+
c := s.done
101+
err := s.err
102+
s.doneMu.Unlock()
103+
104+
select {
105+
case <-c:
106+
return err
107+
default:
108+
return nil
109+
}
110+
}

chain/sync.go

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,10 @@ type Syncer struct {
5555
relevantTxs map[chainhash.Hash][]*wire.MsgTx
5656

5757
cb *Callbacks
58+
59+
done chan struct{}
60+
err error
61+
doneMu sync.Mutex
5862
}
5963

6064
// RPCOptions specifies the network and security settings for establishing a
@@ -525,6 +529,17 @@ func (s *Syncer) Run(ctx context.Context) (err error) {
525529
}
526530
}()
527531

532+
s.doneMu.Lock()
533+
s.done = make(chan struct{})
534+
s.err = nil
535+
s.doneMu.Unlock()
536+
defer func() {
537+
s.doneMu.Lock()
538+
close(s.done)
539+
s.err = err
540+
s.doneMu.Unlock()
541+
}()
542+
528543
params := s.wallet.ChainParams()
529544

530545
s.notifier = &notifier{

spv/backend.go

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -619,3 +619,24 @@ func (s *Syncer) Rescan(ctx context.Context, blockHashes []chainhash.Hash, save
619619
func (s *Syncer) StakeDifficulty(ctx context.Context) (dcrutil.Amount, error) {
620620
return 0, errors.E(errors.Invalid, "stake difficulty is not queryable over wire protocol")
621621
}
622+
623+
func (s *Syncer) Done() <-chan struct{} {
624+
s.doneMu.Lock()
625+
c := s.done
626+
s.doneMu.Unlock()
627+
return c
628+
}
629+
630+
func (s *Syncer) Err() error {
631+
s.doneMu.Lock()
632+
c := s.done
633+
err := s.err
634+
s.doneMu.Unlock()
635+
636+
select {
637+
case <-c:
638+
return err
639+
default:
640+
return nil
641+
}
642+
}

spv/sync.go

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,10 @@ type Syncer struct {
9191
// Mempool for non-wallet-relevant transactions.
9292
mempool sync.Map // k=chainhash.Hash v=*wire.MsgTx
9393
mempoolAdds chan *chainhash.Hash
94+
95+
done chan struct{}
96+
err error
97+
doneMu sync.Mutex
9498
}
9599

96100
// Notifications struct to contain all of the upcoming callbacks that will
@@ -318,7 +322,18 @@ func (s *Syncer) setRequiredHeight(tipHeight int32) {
318322

319323
// Run synchronizes the wallet, returning when synchronization fails or the
320324
// context is cancelled.
321-
func (s *Syncer) Run(ctx context.Context) error {
325+
func (s *Syncer) Run(ctx context.Context) (err error) {
326+
s.doneMu.Lock()
327+
s.done = make(chan struct{})
328+
s.err = nil
329+
s.doneMu.Unlock()
330+
defer func() {
331+
s.doneMu.Lock()
332+
close(s.done)
333+
s.err = err
334+
s.doneMu.Unlock()
335+
}()
336+
322337
tipHash, tipHeight := s.wallet.MainChainTip(ctx)
323338
s.setRequiredHeight(tipHeight)
324339
rescanPoint, err := s.wallet.RescanPoint(ctx)

ticketbuyer/tb.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -227,6 +227,8 @@ func (tb *TB) buy(ctx context.Context, passphrase []byte, tip *wire.BlockHeader,
227227
if err != nil {
228228
return err
229229
}
230+
ctx, cancel := wallet.WrapNetworkBackendContext(n, ctx)
231+
defer cancel()
230232

231233
if len(passphrase) > 0 {
232234
// Ensure wallet is unlocked with the current passphrase. If the passphase

wallet/mixing.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -275,6 +275,13 @@ func (w *Wallet) MixOutput(ctx context.Context, output *wire.OutPoint, changeAcc
275275
return errors.E(op, errors.Invalid, s)
276276
}
277277

278+
nb, err := w.NetworkBackend()
279+
if err != nil {
280+
return err
281+
}
282+
ctx, cancel := WrapNetworkBackendContext(nb, ctx)
283+
defer cancel()
284+
278285
sdiff, err := w.NextStakeDifficulty(ctx)
279286
if err != nil {
280287
return errors.E(op, err)

wallet/network.go

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ package wallet
66

77
import (
88
"context"
9+
"sync"
910

1011
"decred.org/dcrwallet/v4/errors"
1112
"github.com/decred/dcrd/chaincfg/chainhash"
@@ -49,6 +50,12 @@ type NetworkBackend interface {
4950
// the wallet to the underlying network, and if not, it returns the
5051
// target height that it is attempting to sync to.
5152
Synced(ctx context.Context) (bool, int32)
53+
54+
// Done return a channel that is closed after the syncer disconnects.
55+
// The error (if any) can be returned via Err.
56+
// These semantics match that of context.Context.
57+
Done() <-chan struct{}
58+
Err() error
5259
}
5360

5461
// NetworkBackend returns the currently associated network backend of the
@@ -73,6 +80,47 @@ func (w *Wallet) SetNetworkBackend(n NetworkBackend) {
7380
w.networkBackendMu.Unlock()
7481
}
7582

83+
type networkContext struct {
84+
context.Context
85+
err error
86+
mu sync.Mutex
87+
}
88+
89+
func (c *networkContext) Err() error {
90+
c.mu.Lock()
91+
err := c.err
92+
c.mu.Unlock()
93+
94+
if err != nil {
95+
return err
96+
}
97+
return c.Context.Err()
98+
}
99+
100+
// WrapNetworkBackendContext returns a derived context that is canceled when
101+
// the NetworkBackend is disconnected. The cancel func must be called
102+
// (e.g. using defer) otherwise a goroutine leak may occur.
103+
func WrapNetworkBackendContext(nb NetworkBackend, ctx context.Context) (context.Context, context.CancelFunc) {
104+
childCtx, cancel := context.WithCancel(ctx)
105+
nbContext := &networkContext{
106+
Context: childCtx,
107+
}
108+
109+
go func() {
110+
select {
111+
case <-nb.Done():
112+
err := nb.Err()
113+
nbContext.mu.Lock()
114+
nbContext.err = err
115+
nbContext.mu.Unlock()
116+
case <-childCtx.Done():
117+
}
118+
cancel()
119+
}()
120+
121+
return nbContext, cancel
122+
}
123+
76124
// Caller provides a client interface to perform remote procedure calls.
77125
// Serialization and calling conventions are implementation-specific.
78126
type Caller interface {
@@ -122,6 +170,20 @@ func (o OfflineNetworkBackend) Synced(ctx context.Context) (bool, int32) {
122170
return true, 0
123171
}
124172

173+
var closedDone = make(chan struct{})
174+
175+
func init() {
176+
close(closedDone)
177+
}
178+
179+
func (o OfflineNetworkBackend) Done() <-chan struct{} {
180+
return closedDone
181+
}
182+
183+
func (o OfflineNetworkBackend) Err() error {
184+
return errors.E("offline")
185+
}
186+
125187
// Compile time check to ensure OfflineNetworkBackend fulfills the
126188
// NetworkBackend interface.
127189
var _ NetworkBackend = OfflineNetworkBackend{}

wallet/network_test.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,3 +35,5 @@ func (mockNetwork) Rescan(ctx context.Context, blocks []chainhash.Hash, save fun
3535
}
3636
func (mockNetwork) StakeDifficulty(ctx context.Context) (dcrutil.Amount, error) { return 0, nil }
3737
func (mockNetwork) Synced(ctx context.Context) (bool, int32) { return false, 0 }
38+
func (mockNetwork) Done() <-chan struct{} { return nil }
39+
func (mockNetwork) Err() error { return nil }

wallet/wallet.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1608,6 +1608,9 @@ func (w *Wallet) PurchaseTickets(ctx context.Context, n NetworkBackend,
16081608

16091609
const op errors.Op = "wallet.PurchaseTickets"
16101610

1611+
ctx, cancel := WrapNetworkBackendContext(n, ctx)
1612+
defer cancel()
1613+
16111614
resp, err := w.purchaseTickets(ctx, op, n, req)
16121615
if err == nil || !errors.Is(err, errVSPFeeRequiresUTXOSplit) || req.DontSignTx {
16131616
return resp, err

0 commit comments

Comments
 (0)