Commit ab15d98

spv: Process header batches in parallel

This switches the initial SPV header fetching process so that each batch of headers is processed in parallel through its various stages.

1 parent 6693162 commit ab15d98
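The change is structured as a three-stage pipeline built on golang.org/x/sync/errgroup: each stage runs as a goroutine, stages hand batches to one another over unbuffered channels, and a final batch with the done flag set drains the pipeline. The sketch below shows only that skeleton; the batch type and the trivial stage bodies are illustrative stand-ins for the real header, cfilter, and chain-switch work, not wallet code.

package main

import (
	"context"
	"fmt"

	"golang.org/x/sync/errgroup"
)

// batch is an illustrative stand-in for the wallet's headersBatch.
type batch struct {
	id   int
	done bool // final batch; tells each stage to finish
}

func main() {
	g, ctx := errgroup.WithContext(context.Background())

	headers := make(chan batch)  // stage 1 -> stage 2
	cfilters := make(chan batch) // stage 2 -> stage 3

	// Stage 1: produce batches; the last one carries done=true.
	g.Go(func() error {
		for i := 0; i < 3; i++ {
			select {
			case headers <- batch{id: i, done: i == 2}:
			case <-ctx.Done():
				return ctx.Err()
			}
		}
		return nil
	})

	// Stage 2: do per-batch work, then forward the batch downstream.
	g.Go(func() error {
		for {
			var b batch
			select {
			case b = <-headers:
			case <-ctx.Done():
				return ctx.Err()
			}
			select {
			case cfilters <- b:
			case <-ctx.Done():
				return ctx.Err()
			}
			if b.done {
				return nil
			}
		}
	})

	// Stage 3: consume batches until the final one arrives.
	g.Go(func() error {
		for {
			select {
			case b := <-cfilters:
				fmt.Println("processed batch", b.id)
				if b.done {
					return nil
				}
			case <-ctx.Done():
				return ctx.Err()
			}
		}
	})

	// Wait returns the first error from any stage; the derived ctx is
	// canceled when a stage fails, unblocking the other stages.
	if err := g.Wait(); err != nil {
		fmt.Println("pipeline error:", err)
	}
}

Because every send and receive also selects on ctx.Done(), a failure in one stage cancels the group context and the other stages exit instead of blocking forever. The real diff below layers batch invalidation on top of this skeleton.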

2 files changed: +217 −63 lines
spv/backend.go

Lines changed: 3 additions & 3 deletions
@@ -148,7 +148,7 @@ func (s *Syncer) cfiltersV2FromNodes(ctx context.Context, nodes []*wallet.BlockNode
 	// watchdog interval means we'll try at least 4 different peers before
 	// resetting.
 	const watchdogTimeoutInterval = 2 * time.Minute
-	watchdogCtx, cancelWatchdog := context.WithTimeout(ctx, time.Minute)
+	watchdogCtx, cancelWatchdog := context.WithTimeout(ctx, watchdogTimeoutInterval)
 	defer cancelWatchdog()
 
 nextTry:
@@ -226,7 +226,7 @@ type headersBatch struct {
 //
 // This function returns a batch with the done flag set to true when no peers
 // have more recent blocks for syncing.
-func (s *Syncer) getHeaders(ctx context.Context) (*headersBatch, error) {
+func (s *Syncer) getHeaders(ctx context.Context, likelyBestChain []*wallet.BlockNode) (*headersBatch, error) {
 	cnet := s.wallet.ChainParams().Net
 
 nextbatch:
@@ -245,7 +245,7 @@
 		log.Tracef("Attempting next batch of headers from %v", rp)
 
 		// Request headers from the selected peer.
-		locators, locatorHeight, err := s.wallet.BlockLocators(ctx, nil)
+		locators, locatorHeight, err := s.wallet.BlockLocators(ctx, likelyBestChain)
 		if err != nil {
 			return nil, err
 		}
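Two things change in this file: the cfilter watchdog now actually arms its timeout with the 2-minute watchdogTimeoutInterval constant that its own comment describes (it previously used time.Minute), and getHeaders gains a likelyBestChain parameter so the caller can derive block locators from a batch that has not finished processing yet. The watchdog shape is simple enough to isolate; the following is a hedged sketch, with fetchWithWatchdog and the per-peer fetch callback invented for illustration:

package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// A single deadline covers the whole multi-peer operation; with a shorter
// per-peer try interval, a 2-minute watchdog lets several peers be tried
// before the operation resets.
const watchdogTimeoutInterval = 2 * time.Minute

// fetchWithWatchdog retries fetch (a stand-in for one per-peer network
// attempt) until it succeeds or the watchdog deadline expires.
func fetchWithWatchdog(ctx context.Context, fetch func(context.Context) error) error {
	watchdogCtx, cancelWatchdog := context.WithTimeout(ctx, watchdogTimeoutInterval)
	defer cancelWatchdog()

	for {
		if err := fetch(watchdogCtx); err == nil {
			return nil
		}
		// Stop retrying once the watchdog (or the caller) cancels.
		if err := watchdogCtx.Err(); err != nil {
			return err
		}
		// Otherwise, select another peer and try again.
	}
}

func main() {
	tries := 0
	err := fetchWithWatchdog(context.Background(), func(ctx context.Context) error {
		tries++
		if tries < 3 {
			return errors.New("peer timed out")
		}
		return nil
	})
	fmt.Println("tries:", tries, "err:", err)
}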

spv/sync.go

Lines changed: 214 additions & 60 deletions
@@ -1322,81 +1322,235 @@ var hashStop chainhash.Hash
 // is up to date with all connected peers. This is part of the startup sync
 // process.
 func (s *Syncer) initialSyncHeaders(ctx context.Context) error {
-	startTime := time.Now()
+	// The strategy for fetching initial headers is to split each batch of
+	// headers into 3 stages that are run in parallel, with each batch
+	// flowing through the pipeline:
+	//
+	// 1. Fetch headers
+	// 2. Fetch cfilters
+	// 3. Chain switch
+	//
+	// In the positive path during IBD, this allows processing the chain
+	// much faster, because the bottleneck is generally the latency for
+	// completing steps 1 and 2, and thus running them in parallel for
+	// each batch speeds up the overall sync time.
+	//
+	// The pipelining is accomplished by assuming that the next batch of
+	// headers will be a successor of the most recently fetched batch and
+	// requesting this next batch using locators derived from this (not yet
+	// completely processed) batch. For all but the most recent blocks in
+	// the main chain, this assumption will be true.
+	//
+	// Occasionally, in case of misbehaving peers, the sync process might be
+	// entirely reset and started from scratch using only the wallet derived
+	// block locators.
+	//
+	// The three stages are run as goroutines in the following g.
+	g, ctx := errgroup.WithContext(ctx)
 
-nextbatch:
-	for ctx.Err() == nil {
-		// Fetch a batch of headers.
-		batch, err := s.getHeaders(ctx)
-		if err != nil {
-			return err
-		}
-		if batch.done {
-			// All done.
-			log.Debugf("Initial sync completed in %s",
-				time.Since(startTime).Round(time.Second))
-			return nil
+	// invalidateBatchChan is closed when one of the later stages (cfilter
+	// fetching or chain switch) fails and the header fetching stage should
+	// restart without using the cached chain.
+	invalidateBatchChan := make(chan struct{})
+	var invalidateBatchMu sync.Mutex
+	invalidateBatch := func() {
+		invalidateBatchMu.Lock()
+		select {
+		case <-invalidateBatchChan:
+		default:
+			close(invalidateBatchChan)
+			invalidateBatchChan = make(chan struct{})
 		}
+		invalidateBatchMu.Unlock()
+	}
+	nextInvalidateBatchChan := func() chan struct{} {
+		invalidateBatchMu.Lock()
+		res := invalidateBatchChan
+		invalidateBatchMu.Unlock()
+		return res
+	}
 
-		bestChain := batch.bestChain
+	// Stage 1: fetch headers.
+	headersChan := make(chan *headersBatch)
+	g.Go(func() error {
+		var batch *headersBatch
+		var err error
+		invalidateChan := nextInvalidateBatchChan()
+		for {
+			// If we have a previous batch, the next batch is
+			// likely to be a successor to it.
+			var likelyBestChain []*wallet.BlockNode
+			if batch != nil {
+				likelyBestChain = batch.bestChain
+			}
 
-		// Determine which nodes don't have cfilters yet.
-		s.sidechainMu.Lock()
-		var missingCfilter []*wallet.BlockNode
-		for i := range bestChain {
-			if bestChain[i].FilterV2 == nil {
-				missingCfilter = bestChain[i:]
-				break
+			// Fetch a batch of headers.
+			batch, err = s.getHeaders(ctx, likelyBestChain)
+			if err != nil {
+				return err
 			}
-		}
-		s.sidechainMu.Unlock()
 
-		// Fetch Missing CFilters.
-		err = s.cfiltersV2FromNodes(ctx, missingCfilter)
-		if err != nil {
-			log.Debugf("Unable to fetch missing cfilters from %v: %v",
-				batch.rp, err)
-			continue nextbatch
-		}
-		if len(missingCfilter) > 0 {
-			log.Debugf("Fetched %d new cfilters(s) ending at height %d",
-				len(missingCfilter), missingCfilter[len(missingCfilter)-1].Header.Height)
+			// Before sending to the next stage, check if the
+			// last one has already been invalidated while we
+			// were waiting for a getHeaders response.
+			select {
+			case <-invalidateChan:
+				invalidateChan = nextInvalidateBatchChan()
+				batch = nil
+				continue
+			default:
+			}
+
+			// Otherwise, send to next stage.
+			select {
+			case headersChan <- batch:
+			case <-invalidateChan:
+				invalidateChan = nextInvalidateBatchChan()
+				batch = nil
+				continue
+			case <-ctx.Done():
+				return ctx.Err()
+			}
+
+			if batch.done {
+				return nil
+			}
 		}
+	})
 
-		// Switch the best chain, now that all cfilters have been
-		// fetched for it.
-		s.sidechainMu.Lock()
-		prevChain, err := s.wallet.ChainSwitch(ctx, &s.sidechains, bestChain, nil)
-		if err != nil {
+	// Stage 2: fetch cfilters.
+	cfiltersChan := make(chan *headersBatch)
+	g.Go(func() error {
+		var batch *headersBatch
+		var err error
+		for {
+			// Wait for a batch of headers.
+			select {
+			case batch = <-headersChan:
+			case <-ctx.Done():
+				return ctx.Err()
+			}
+
+			// Once done, send the last batch forward and return.
+			if batch.done {
+				select {
+				case cfiltersChan <- batch:
+				case <-ctx.Done():
+					return ctx.Err()
+				}
+				return nil
+			}
+
+			// Determine which nodes don't have cfilters yet.
+			s.sidechainMu.Lock()
+			var missingCfilter []*wallet.BlockNode
+			for i := range batch.bestChain {
+				if batch.bestChain[i].FilterV2 == nil {
+					missingCfilter = batch.bestChain[i:]
+					break
+				}
+			}
 			s.sidechainMu.Unlock()
-			batch.rp.Disconnect(err)
-			continue nextbatch
-		}
 
-		if len(prevChain) != 0 {
-			log.Infof("Reorganize from %v to %v (total %d block(s) reorged)",
-				prevChain[len(prevChain)-1].Hash, bestChain[len(bestChain)-1].Hash, len(prevChain))
-			for _, n := range prevChain {
-				s.sidechains.AddBlockNode(n)
+			// Fetch Missing CFilters.
+			err = s.cfiltersV2FromNodes(ctx, missingCfilter)
+			if errors.Is(err, errCfilterWatchdogTriggered) {
+				log.Debugf("Unable to fetch missing cfilters from %v: %v",
+					batch.rp, err)
+				invalidateBatch()
+				continue
+			}
+			if err != nil {
+				return err
+			}
+
+			if len(missingCfilter) > 0 {
+				lastHeight := missingCfilter[len(missingCfilter)-1].Header.Height
+				log.Debugf("Fetched %d new cfilters(s) ending at height %d",
+					len(missingCfilter), lastHeight)
+			}
+
+			// Pass the batch to the next stage.
+			select {
+			case cfiltersChan <- batch:
+			case <-ctx.Done():
+				return ctx.Err()
 			}
 		}
-		tip := bestChain[len(bestChain)-1]
-		if len(bestChain) == 1 {
-			log.Infof("Connected block %v, height %d", tip.Hash, tip.Header.Height)
-		} else {
-			log.Infof("Connected %d blocks, new tip %v, height %d, date %v",
-				len(bestChain), tip.Hash, tip.Header.Height, tip.Header.Timestamp)
-		}
-		s.fetchHeadersProgress(tip.Header)
+	})
 
-		s.sidechainMu.Unlock()
+	// Stage 3: chain switch.
+	g.Go(func() error {
+		startTime := time.Now()
+		for ctx.Err() == nil {
+			// Fetch a batch with cfilters filled in.
+			var batch *headersBatch
+			select {
+			case batch = <-cfiltersChan:
+			case <-ctx.Done():
+				return ctx.Err()
+			}
 
-		// Peers should not be significantly behind the new tip.
-		s.setRequiredHeight(int32(tip.Header.Height))
-		s.disconnectStragglers(int32(tip.Header.Height))
-	}
+			if batch.done {
+				// All done.
+				log.Debugf("Initial sync completed in %s",
+					time.Since(startTime).Round(time.Second))
+				return nil
+			}
+
+			// Switch the best chain, now that all cfilters have been
+			// fetched for it.
+			s.sidechainMu.Lock()
+			bestChain := batch.bestChain
+
+			// When the first N blocks of bestChain have already been added
+			// to mainchain tip, they don't have to be added again. This
+			// happens when requesting batches ahead of time.
+			tipHash, tipHeight := s.wallet.MainChainTip(ctx)
+			tipIndex := tipHeight - int32(bestChain[0].Header.Height)
+			if tipIndex > 0 && tipIndex < int32(len(bestChain)-1) && *bestChain[tipIndex].Hash == tipHash {
+				log.Tracef("Updating bestChain to tipIndex %d from %d to %d",
+					tipIndex, bestChain[0].Header.Height,
+					bestChain[tipIndex+1].Header.Height)
+				bestChain = bestChain[tipIndex+1:]
+			}
+
+			// Switch to the new main chain.
+			prevChain, err := s.wallet.ChainSwitch(ctx, &s.sidechains, bestChain, nil)
+			if err != nil {
+				s.sidechainMu.Unlock()
+				batch.rp.Disconnect(err)
+				invalidateBatch()
+				continue
+			}
 
-	return ctx.Err()
+			if len(prevChain) != 0 {
+				log.Infof("Reorganize from %v to %v (total %d block(s) reorged)",
+					prevChain[len(prevChain)-1].Hash, bestChain[len(bestChain)-1].Hash, len(prevChain))
+				for _, n := range prevChain {
+					s.sidechains.AddBlockNode(n)
+				}
+			}
+			tip := bestChain[len(bestChain)-1]
+			if len(bestChain) == 1 {
+				log.Infof("Connected block %v, height %d", tip.Hash, tip.Header.Height)
+			} else {
+				log.Infof("Connected %d blocks, new tip %v, height %d, date %v",
+					len(bestChain), tip.Hash, tip.Header.Height, tip.Header.Timestamp)
+			}
+			s.fetchHeadersProgress(tip.Header)
+
+			s.sidechainMu.Unlock()
+
+			// Peers should not be significantly behind the new tip.
+			s.setRequiredHeight(int32(tip.Header.Height))
+			s.disconnectStragglers(int32(tip.Header.Height))
+		}
+
+		return ctx.Err()
+	})
+
+	return g.Wait()
 }
 
 // initialSyncRescan performs account and address discovery and rescans blocks