diff --git a/chain/pruned_block_dispatcher.go b/chain/pruned_block_dispatcher.go index bfead3e674..243adef1fc 100644 --- a/chain/pruned_block_dispatcher.go +++ b/chain/pruned_block_dispatcher.go @@ -134,7 +134,7 @@ type PrunedBlockDispatcher struct { // workManager handles satisfying all of our incoming pruned block // requests. - workManager *query.WorkManager + workManager query.WorkManager // blocksQueried represents the set of pruned blocks we've been // requested to query. Each block maps to a list of clients waiting to @@ -191,7 +191,7 @@ func NewPrunedBlockDispatcher(cfg *PrunedBlockDispatcherConfig) ( peersConnected := make(chan query.Peer) return &PrunedBlockDispatcher{ cfg: *cfg, - workManager: query.New(&query.Config{ + workManager: query.NewWorkManager(&query.Config{ ConnectedPeers: func() (<-chan query.Peer, func(), error) { return peersConnected, func() {}, nil }, diff --git a/spv/neutrino.go b/spv/neutrino.go index fe740dea36..2cd3e91cd3 100644 --- a/spv/neutrino.go +++ b/spv/neutrino.go @@ -667,7 +667,7 @@ type ChainService struct { // nolint:maligned utxoScanner *UtxoScanner broadcaster *pushtx.Broadcaster banStore banman.Store - workManager *query.WorkManager + workManager query.WorkManager // peerSubscribers is a slice of active peer subscriptions, that we // will notify each time a new peer is connected. @@ -743,7 +743,7 @@ func NewChainService(cfg Config) (*ChainService, error) { persistToDisk: cfg.PersistToDisk, broadcastTimeout: cfg.BroadcastTimeout, } - s.workManager = query.New(&query.Config{ + s.workManager = query.NewWorkManager(&query.Config{ ConnectedPeers: s.ConnectedPeers, NewWorker: query.NewWorker, Ranking: query.NewPeerRanking(), diff --git a/spv/query/interface.go b/spv/query/interface.go index 5ef6767f36..0564c8287d 100644 --- a/spv/query/interface.go +++ b/spv/query/interface.go @@ -112,6 +112,19 @@ type Request struct { HandleResp func(req, resp wire.Message, peer string) Progress } +// WorkManager defines an API for a manager that dispatches queries to bitcoin +// peers that must be started and stopped in order to perform these queries. +type WorkManager interface { + Dispatcher + + // Start sets up any resources that the WorkManager requires. It must + // be called before any of the Dispatcher calls can be made. + Start() error + + // Stop cleans up the resources held by the WorkManager. + Stop() error +} + // Dispatcher is an interface defining the API for dispatching queries to // bitcoin peers. type Dispatcher interface { diff --git a/spv/query/workmanager.go b/spv/query/workmanager.go index 76e801a1d6..4fd3e1c0b9 100644 --- a/spv/query/workmanager.go +++ b/spv/query/workmanager.go @@ -96,10 +96,11 @@ type Config struct { Ranking PeerRanking } -// WorkManager is the main access point for outside callers, and satisfies the -// QueryAccess API. It receives queries to pass to peers, and schedules them -// among available workers, orchestrating where to send them. -type WorkManager struct { +// peerWorkManager is the main access point for outside callers, and satisfies +// the QueryAccess API. It receives queries to pass to peers, and schedules them +// among available workers, orchestrating where to send them. It implements the +// WorkManager interface. +type peerWorkManager struct { cfg *Config // newBatches is a channel where new batches of queries will be sent to @@ -114,12 +115,13 @@ type WorkManager struct { wg sync.WaitGroup } -// Compile time check to ensure WorkManager satisfies the Dispatcher interface. -var _ Dispatcher = (*WorkManager)(nil) +// Compile time check to ensure peerWorkManager satisfies the WorkManager interface. +var _ WorkManager = (*peerWorkManager)(nil) -// New returns a new WorkManager with the regular worker implementation. -func New(cfg *Config) *WorkManager { - return &WorkManager{ +// NewWorkManager returns a new WorkManager with the regular worker +// implementation. +func NewWorkManager(cfg *Config) WorkManager { + return &peerWorkManager{ cfg: cfg, newBatches: make(chan *batch), jobResults: make(chan *jobResult), @@ -127,8 +129,10 @@ func New(cfg *Config) *WorkManager { } } -// Start starts the WorkManager. -func (w *WorkManager) Start() error { +// Start starts the peerWorkManager. +// +// NOTE: this is part of the WorkManager interface. +func (w *peerWorkManager) Start() error { w.wg.Add(1) go w.workDispatcher() @@ -136,7 +140,10 @@ func (w *WorkManager) Start() error { } // Stop stops the WorkManager and all underlying goroutines. -func (w *WorkManager) Stop() error { +// Stop stops the peerWorkManager and all underlying goroutines. +// +// NOTE: this is part of the WorkManager interface. +func (w *peerWorkManager) Stop() error { close(w.quit) w.wg.Wait() @@ -149,7 +156,7 @@ func (w *WorkManager) Stop() error { // will be attempted completed first. // // NOTE: MUST be run as a goroutine. -func (w *WorkManager) workDispatcher() { +func (w *peerWorkManager) workDispatcher() { defer w.wg.Done() // Get a peer subscription. We do it in this goroutine rather than @@ -428,8 +435,8 @@ Loop: // Query distributes the slice of requests to the set of connected peers. // -// NOTO: Part of the Dispatcher interface. -func (w *WorkManager) Query(requests []*Request, +// NOTE: this is part of the WorkManager interface. +func (w *peerWorkManager) Query(requests []*Request, options ...QueryOption) chan error { qo := defaultQueryOptions() diff --git a/spv/query/workmanager_test.go b/spv/query/workmanager_test.go index b9609121d7..c99b5af032 100644 --- a/spv/query/workmanager_test.go +++ b/spv/query/workmanager_test.go @@ -5,6 +5,8 @@ import ( "sort" "testing" "time" + + "github.com/stretchr/testify/require" ) type mockWorker struct { @@ -63,7 +65,7 @@ func (p *mockPeerRanking) Reward(peer string) { // startWorkManager starts a new workmanager with the given number of mock // workers. -func startWorkManager(t *testing.T, numWorkers int) (*WorkManager, +func startWorkManager(t *testing.T, numWorkers int) (WorkManager, []*mockWorker) { // We set up a custom NewWorker closure for the WorkManager, such that @@ -71,7 +73,7 @@ func startWorkManager(t *testing.T, numWorkers int) (*WorkManager, workerChan := make(chan *mockWorker) peerChan := make(chan Peer) - wm := New(&Config{ + wm := NewWorkManager(&Config{ ConnectedPeers: func() (<-chan Peer, func(), error) { return peerChan, func() {}, nil }, @@ -379,13 +381,16 @@ func TestWorkManagerCancelBatch(t *testing.T) { } } -// TestWorkManaferWorkRankingScheduling checks that the work manager schedules +// TestWorkManagerWorkRankingScheduling checks that the work manager schedules // jobs among workers according to the peer ranking. func TestWorkManagerWorkRankingScheduling(t *testing.T) { const numQueries = 4 const numWorkers = 8 - wm, workers := startWorkManager(t, numWorkers) + workMgr, workers := startWorkManager(t, numWorkers) + + require.IsType(t, workMgr, &peerWorkManager{}) + wm := workMgr.(*peerWorkManager) //nolint:forcetypeassert // Set up the ranking to prioritize lower numbered workers. wm.cfg.Ranking.(*mockPeerRanking).less = func(i, j string) bool {