Skip to content

Commit

Permalink
Combines two commits from different repos
Browse files Browse the repository at this point in the history
lightninglabs/neutrino#273

Author: Elle Mouton <[email protected]>
Date:   Tue May 9 12:33:19 2023 +0200

and

btcsuite#884

Author: Olaoluwa Osuntokun <[email protected]>
Date:   Fri Aug 4 11:46:12 2023 -0700
  • Loading branch information
buck54321 committed Apr 21, 2024
1 parent 687d078 commit ee357c4
Show file tree
Hide file tree
Showing 5 changed files with 48 additions and 23 deletions.
4 changes: 2 additions & 2 deletions chain/pruned_block_dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ type PrunedBlockDispatcher struct {

// workManager handles satisfying all of our incoming pruned block
// requests.
workManager *query.WorkManager
workManager query.WorkManager

// blocksQueried represents the set of pruned blocks we've been
// requested to query. Each block maps to a list of clients waiting to
Expand Down Expand Up @@ -191,7 +191,7 @@ func NewPrunedBlockDispatcher(cfg *PrunedBlockDispatcherConfig) (
peersConnected := make(chan query.Peer)
return &PrunedBlockDispatcher{
cfg: *cfg,
workManager: query.New(&query.Config{
workManager: query.NewWorkManager(&query.Config{
ConnectedPeers: func() (<-chan query.Peer, func(), error) {
return peersConnected, func() {}, nil
},
Expand Down
4 changes: 2 additions & 2 deletions spv/neutrino.go
Original file line number Diff line number Diff line change
Expand Up @@ -667,7 +667,7 @@ type ChainService struct { // nolint:maligned
utxoScanner *UtxoScanner
broadcaster *pushtx.Broadcaster
banStore banman.Store
workManager *query.WorkManager
workManager query.WorkManager

// peerSubscribers is a slice of active peer subscriptions, that we
// will notify each time a new peer is connected.
Expand Down Expand Up @@ -743,7 +743,7 @@ func NewChainService(cfg Config) (*ChainService, error) {
persistToDisk: cfg.PersistToDisk,
broadcastTimeout: cfg.BroadcastTimeout,
}
s.workManager = query.New(&query.Config{
s.workManager = query.NewWorkManager(&query.Config{
ConnectedPeers: s.ConnectedPeers,
NewWorker: query.NewWorker,
Ranking: query.NewPeerRanking(),
Expand Down
13 changes: 13 additions & 0 deletions spv/query/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,19 @@ type Request struct {
HandleResp func(req, resp wire.Message, peer string) Progress
}

// WorkManager defines an API for a manager that dispatches queries to bitcoin
// peers that must be started and stopped in order to perform these queries.
type WorkManager interface {
Dispatcher

// Start sets up any resources that the WorkManager requires. It must
// be called before any of the Dispatcher calls can be made.
Start() error

// Stop cleans up the resources held by the WorkManager.
Stop() error
}

// Dispatcher is an interface defining the API for dispatching queries to
// bitcoin peers.
type Dispatcher interface {
Expand Down
37 changes: 22 additions & 15 deletions spv/query/workmanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,10 +96,11 @@ type Config struct {
Ranking PeerRanking
}

// WorkManager is the main access point for outside callers, and satisfies the
// QueryAccess API. It receives queries to pass to peers, and schedules them
// among available workers, orchestrating where to send them.
type WorkManager struct {
// peerWorkManager is the main access point for outside callers, and satisfies
// the QueryAccess API. It receives queries to pass to peers, and schedules them
// among available workers, orchestrating where to send them. It implements the
// WorkManager interface.
type peerWorkManager struct {
cfg *Config

// newBatches is a channel where new batches of queries will be sent to
Expand All @@ -114,29 +115,35 @@ type WorkManager struct {
wg sync.WaitGroup
}

// Compile time check to ensure WorkManager satisfies the Dispatcher interface.
var _ Dispatcher = (*WorkManager)(nil)
// Compile time check to ensure peerWorkManager satisfies the WorkManager interface.
var _ WorkManager = (*peerWorkManager)(nil)

// New returns a new WorkManager with the regular worker implementation.
func New(cfg *Config) *WorkManager {
return &WorkManager{
// NewWorkManager returns a new WorkManager with the regular worker
// implementation.
func NewWorkManager(cfg *Config) WorkManager {
return &peerWorkManager{
cfg: cfg,
newBatches: make(chan *batch),
jobResults: make(chan *jobResult),
quit: make(chan struct{}),
}
}

// Start starts the WorkManager.
func (w *WorkManager) Start() error {
// Start starts the peerWorkManager.
//
// NOTE: this is part of the WorkManager interface.
func (w *peerWorkManager) Start() error {
w.wg.Add(1)
go w.workDispatcher()

return nil
}

// Stop stops the WorkManager and all underlying goroutines.
func (w *WorkManager) Stop() error {
// Stop stops the peerWorkManager and all underlying goroutines.
//
// NOTE: this is part of the WorkManager interface.
func (w *peerWorkManager) Stop() error {
close(w.quit)
w.wg.Wait()

Expand All @@ -149,7 +156,7 @@ func (w *WorkManager) Stop() error {
// will be attempted completed first.
//
// NOTE: MUST be run as a goroutine.
func (w *WorkManager) workDispatcher() {
func (w *peerWorkManager) workDispatcher() {
defer w.wg.Done()

// Get a peer subscription. We do it in this goroutine rather than
Expand Down Expand Up @@ -428,8 +435,8 @@ Loop:

// Query distributes the slice of requests to the set of connected peers.
//
// NOTO: Part of the Dispatcher interface.
func (w *WorkManager) Query(requests []*Request,
// NOTE: this is part of the WorkManager interface.
func (w *peerWorkManager) Query(requests []*Request,
options ...QueryOption) chan error {

qo := defaultQueryOptions()
Expand Down
13 changes: 9 additions & 4 deletions spv/query/workmanager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import (
"sort"
"testing"
"time"

"github.com/stretchr/testify/require"
)

type mockWorker struct {
Expand Down Expand Up @@ -63,15 +65,15 @@ func (p *mockPeerRanking) Reward(peer string) {

// startWorkManager starts a new workmanager with the given number of mock
// workers.
func startWorkManager(t *testing.T, numWorkers int) (*WorkManager,
func startWorkManager(t *testing.T, numWorkers int) (WorkManager,
[]*mockWorker) {

// We set up a custom NewWorker closure for the WorkManager, such that
// we can start mockWorkers when it is called.
workerChan := make(chan *mockWorker)

peerChan := make(chan Peer)
wm := New(&Config{
wm := NewWorkManager(&Config{
ConnectedPeers: func() (<-chan Peer, func(), error) {
return peerChan, func() {}, nil
},
Expand Down Expand Up @@ -379,13 +381,16 @@ func TestWorkManagerCancelBatch(t *testing.T) {
}
}

// TestWorkManaferWorkRankingScheduling checks that the work manager schedules
// TestWorkManagerWorkRankingScheduling checks that the work manager schedules
// jobs among workers according to the peer ranking.
func TestWorkManagerWorkRankingScheduling(t *testing.T) {
const numQueries = 4
const numWorkers = 8

wm, workers := startWorkManager(t, numWorkers)
workMgr, workers := startWorkManager(t, numWorkers)

require.IsType(t, workMgr, &peerWorkManager{})
wm := workMgr.(*peerWorkManager) //nolint:forcetypeassert

// Set up the ranking to prioritize lower numbered workers.
wm.cfg.Ranking.(*mockPeerRanking).less = func(i, j string) bool {
Expand Down

0 comments on commit ee357c4

Please sign in to comment.