From a32cb52c064579b92fb06729c042d637648552e6 Mon Sep 17 00:00:00 2001 From: Olaoluwa Osuntokun Date: Fri, 26 Jan 2024 17:03:40 -0800 Subject: [PATCH] Merge pull request #903 from ziggie1984/prune-node-deadlock-blocksync Plus https://github.com/lightninglabs/neutrino/pull/273 Author: Elle Mouton Date: Mon May 22 15:53:49 2023 +0200 chain: PrunedBlockDispatcher bugfix. --- chain/bitcoind_conn.go | 28 ++++- chain/pruned_block_dispatcher.go | 111 +++++++++++++++++--- chain/pruned_block_dispatcher_test.go | 143 ++++++++++++++++++++++---- spv/blockmanager.go | 2 +- spv/query/interface.go | 30 ++++++ spv/query/worker.go | 1 + spv/query/workmanager.go | 47 +++++++-- 7 files changed, 323 insertions(+), 39 deletions(-) diff --git a/chain/bitcoind_conn.go b/chain/bitcoind_conn.go index 9fea27eef2..59de1aaae0 100644 --- a/chain/bitcoind_conn.go +++ b/chain/bitcoind_conn.go @@ -469,10 +469,16 @@ func (c *BitcoindConn) GetBlock(hash *chainhash.Hash) (*wire.MsgBlock, error) { return nil, err } + // cancelChan is needed in case a block request with the same hash is + // already registered and fails via the returned errChan. Because we + // don't register a new request in this case this cancelChan is used + // to signal the failure of the dependant block request. + cancelChan := make(chan error, 1) + // Now that we know the block has been pruned for sure, request it from // our backend peers. blockChan, errChan := c.prunedBlockDispatcher.Query( - []*chainhash.Hash{hash}, + []*chainhash.Hash{hash}, cancelChan, ) for { @@ -482,12 +488,32 @@ func (c *BitcoindConn) GetBlock(hash *chainhash.Hash) (*wire.MsgBlock, error) { case err := <-errChan: if err != nil { + // An error was returned for this block request. + // We have to make sure we remove the blockhash + // from the list of queried blocks. + // Moreover because in case a block is requested + // more than once no redundant block requests + // are registered but rather a reply channel is + // added to the pending block request. This + // means we need to cancel all dependent + // `GetBlock` calls via the cancel channel when + // the fetching of the block was NOT successful. + c.prunedBlockDispatcher.CancelRequest( + *hash, err, + ) + return nil, err } // errChan fired before blockChan with a nil error, wait // for the block now. + // The cancelChan is only used when there is already a pending + // block request for this hash and that block request fails via + // the error channel above. + case err := <-cancelChan: + return nil, err + case <-c.quit: return nil, ErrBitcoindClientShuttingDown } diff --git a/chain/pruned_block_dispatcher.go b/chain/pruned_block_dispatcher.go index 1e73ccea58..543dec3a9a 100644 --- a/chain/pruned_block_dispatcher.go +++ b/chain/pruned_block_dispatcher.go @@ -7,6 +7,7 @@ import ( "fmt" "math/rand" "net" + "sort" "sync" "time" @@ -143,7 +144,14 @@ type PrunedBlockDispatcher struct { // NOTE: The blockMtx lock must always be held when accessing this // field. blocksQueried map[chainhash.Hash][]chan *wire.MsgBlock - blockMtx sync.Mutex + + // blockQueryCancel signals the cancellation of a `GetBlock` request. + // + // NOTE: The blockMtx lock must always be held when accessing this + // field. + blockQueryCancel map[chainhash.Hash][]chan<- error + + blockMtx sync.Mutex // currentPeers represents the set of peers we're currently connected // to. Each peer found here will have a worker spawned within the @@ -198,12 +206,13 @@ func NewPrunedBlockDispatcher(cfg *PrunedBlockDispatcherConfig) ( NewWorker: query.NewWorker, Ranking: query.NewPeerRanking(), }), - blocksQueried: make(map[chainhash.Hash][]chan *wire.MsgBlock), - currentPeers: make(map[string]*peer.Peer), - bannedPeers: make(map[string]struct{}), - peersConnected: peersConnected, - timeSource: blockchain.NewMedianTime(), - quit: make(chan struct{}), + blocksQueried: make(map[chainhash.Hash][]chan *wire.MsgBlock), + blockQueryCancel: make(map[chainhash.Hash][]chan<- error), + currentPeers: make(map[string]*peer.Peer), + bannedPeers: make(map[string]struct{}), + peersConnected: peersConnected, + timeSource: blockchain.NewMedianTime(), + quit: make(chan struct{}), }, nil } @@ -386,8 +395,26 @@ func (d *PrunedBlockDispatcher) connectToPeer(addr string) (bool, error) { // requests, i.e., any peer which is not considered a segwit-enabled // "full-node". func filterPeers(peers []btcjson.GetPeerInfoResult) ([]string, error) { - var eligible []string + var eligible []string // nolint:prealloc + + // First we sort the peers by the measured ping time, to choose the best + // peers to fetch blocks from. + sort.Slice(peers, func(i, j int) bool { + return peers[i].PingTime < peers[j].PingTime + }) + for _, peer := range peers { + // We cannot use the inbound peers here because the referenced + // port in the `addr` field is not the listen port for the p2p + // connection but a random outgoing port of the peer. + if peer.Inbound { + log.Debugf("Inbound peer %v not considering for "+ + "outbound connection to fetch pruned blocks", + peer) + + continue + } + rawServices, err := hex.DecodeString(peer.Services) if err != nil { return nil, err @@ -502,9 +529,10 @@ func (d *PrunedBlockDispatcher) banPeer(peer string) { // Query submits a request to query the information of the given blocks. func (d *PrunedBlockDispatcher) Query(blocks []*chainhash.Hash, + cancelChan chan<- error, opts ...query.QueryOption) (<-chan *wire.MsgBlock, <-chan error) { - reqs, blockChan, err := d.newRequest(blocks) + reqs, blockChan, err := d.newRequest(blocks, cancelChan) if err != nil { errChan := make(chan error, 1) errChan <- err @@ -515,14 +543,18 @@ func (d *PrunedBlockDispatcher) Query(blocks []*chainhash.Hash, if len(reqs) > 0 { errChan = d.workManager.Query(reqs, opts...) } + return blockChan, errChan } // newRequest construct a new query request for the given blocks to submit to // the internal workManager. A channel is also returned through which the // requested blocks are sent through. -func (d *PrunedBlockDispatcher) newRequest(blocks []*chainhash.Hash) ( - []*query.Request, <-chan *wire.MsgBlock, error) { +// +// NOTE: The cancelChan must be buffered. +func (d *PrunedBlockDispatcher) newRequest(blocks []*chainhash.Hash, + cancelChan chan<- error) ([]*query.Request, <-chan *wire.MsgBlock, + error) { // Make sure the channel is buffered enough to handle all blocks. blockChan := make(chan *wire.MsgBlock, len(blocks)) @@ -550,6 +582,10 @@ func (d *PrunedBlockDispatcher) newRequest(blocks []*chainhash.Hash) ( } else { log.Debugf("Received new request for pending query of "+ "block %v", *block) + + d.blockQueryCancel[*block] = append( + d.blockQueryCancel[*block], cancelChan, + ) } d.blocksQueried[*block] = append( @@ -614,6 +650,8 @@ func (d *PrunedBlockDispatcher) handleResp(req, resp wire.Message, Finished: false, } } + copyblockChans := make([]chan *wire.MsgBlock, len(blockChans)) + copy(copyblockChans, blockChans) err := blockchain.CheckBlockSanity( ltcutil.NewBlock(block), d.cfg.ChainParams.PowLimit, @@ -667,7 +705,7 @@ func (d *PrunedBlockDispatcher) handleResp(req, resp wire.Message, go func() { defer d.wg.Done() - for _, blockChan := range blockChans { + for _, blockChan := range copyblockChans { select { case blockChan <- block: case <-d.quit: @@ -678,3 +716,52 @@ func (d *PrunedBlockDispatcher) handleResp(req, resp wire.Message, return progress } + +// CancelRequest removes all information regarding a failed block request. +// When for example the Peer disconnects or runs in a timeout we make sure +// that all related information is deleted and a new request for this block +// can be registered. Moreover will also cancel all depending goroutines. +func (d *PrunedBlockDispatcher) CancelRequest(blockHash chainhash.Hash, + err error) { + + // failDependant is a helper function which fails all dependant + // goroutines via their cancel channels. + failDependant := func(cancelChans []chan<- error) { + defer d.wg.Done() + + for _, cancel := range cancelChans { + select { + case cancel <- err: + case <-d.quit: + return + } + } + } + + d.blockMtx.Lock() + + // Before removing the block hash we get the cancelChans which were + // registered for block requests that had already an ongoing pending + // request. + cancelChans, ok := d.blockQueryCancel[blockHash] + var copycancelChans []chan<- error + if ok { + copycancelChans = make([]chan<- error, len(cancelChans)) + copy(copycancelChans, cancelChans) + } + + // Remove all data related to this block request to make sure the same + // block can be registered again in the future. + delete(d.blocksQueried, blockHash) + delete(d.blockQueryCancel, blockHash) + + d.blockMtx.Unlock() + + // In case there are goroutines depending on this block request we + // make sure we cancel them. + // We do this in a goroutine to not block the initial request. + if ok { + d.wg.Add(1) + go failDependant(copycancelChans) + } +} diff --git a/chain/pruned_block_dispatcher_test.go b/chain/pruned_block_dispatcher_test.go index f0e1cd8057..4253fef53d 100644 --- a/chain/pruned_block_dispatcher_test.go +++ b/chain/pruned_block_dispatcher_test.go @@ -10,6 +10,7 @@ import ( "testing" "time" + "github.com/dcrlabs/ltcwallet/spv/query" "github.com/ltcsuite/lnd/ticker" "github.com/ltcsuite/ltcd/btcjson" "github.com/ltcsuite/ltcd/chaincfg" @@ -258,12 +259,18 @@ func (h *prunedBlockDispatcherHarness) newPeer() *peer.Peer { } // query requests the given blocks from the PrunedBlockDispatcher. -func (h *prunedBlockDispatcherHarness) query(blocks []*chainhash.Hash) ( - <-chan *wire.MsgBlock, <-chan error) { +func (h *prunedBlockDispatcherHarness) query(blocks []*chainhash.Hash, + opts ...query.QueryOption) ( + <-chan *wire.MsgBlock, <-chan error, <-chan error) { h.t.Helper() - blockChan, errChan := h.dispatcher.Query(blocks) + // cancelChan will receive an error msg in case the dependant block + // request fails. This is used for block requests already have a pending + // request registered and this request fails. + cancelChan := make(chan error, 1) + + blockChan, errChan := h.dispatcher.Query(blocks, cancelChan, opts...) select { case err := <-errChan: require.NoError(h.t, err) @@ -274,7 +281,7 @@ func (h *prunedBlockDispatcherHarness) query(blocks []*chainhash.Hash) ( h.blocksQueried[*block]++ } - return blockChan, errChan + return blockChan, errChan, cancelChan } // disablePeerReplies prevents the query peer from replying. @@ -363,7 +370,7 @@ func (h *prunedBlockDispatcherHarness) assertPeerQueried() { // assertPeerReplied asserts that the query peer replies with a block the // PrunedBlockDispatcher queried for. func (h *prunedBlockDispatcherHarness) assertPeerReplied( - blockChan <-chan *wire.MsgBlock, errChan <-chan error, + blockChan <-chan *wire.MsgBlock, errChan, cancelChan <-chan error, expectCompletionSignal bool) { h.t.Helper() @@ -385,10 +392,18 @@ func (h *prunedBlockDispatcherHarness) assertPeerReplied( delete(h.blocksQueried, blockHash) } + // We need to check the errChan after a timeout because when a request + // was successful a nil error is signaled via the errChan and this + // might happen even before the block is received. case <-time.After(5 * time.Second): select { case err := <-errChan: h.t.Fatalf("received unexpected error send: %v", err) + + case err := <-cancelChan: + h.t.Fatalf("received unexpected cancel request with "+ + "error: %v", err) + default: } h.t.Fatal("expected reply from peer") @@ -406,6 +421,38 @@ func (h *prunedBlockDispatcherHarness) assertPeerReplied( } } +// assertPeerFailed asserts that the query request fails with an expected +// error. +func (h *prunedBlockDispatcherHarness) assertPeerFailed( + blockChan <-chan *wire.MsgBlock, errChan, cancelChan <-chan error, + expectedErr error) { + + h.t.Helper() + + select { + case <-blockChan: + h.t.Fatalf("expected no reply from peer") + + case err := <-errChan: + require.ErrorIs(h.t, err, expectedErr) + for _, hash := range h.hashes { + h.dispatcher.CancelRequest(*hash, err) + // The corresponding block is deleted from the request + // queue in `CancelRequest` so we delete it from the + // harness as well. + delete(h.blocksQueried, *hash) + } + + case err := <-cancelChan: + require.ErrorIs(h.t, err, expectedErr) + + case <-time.After(5 * time.Second): + h.t.Fatalf("expected the error for the block request: %v", + expectedErr) + } + +} + // assertNoPeerDialed asserts that the PrunedBlockDispatcher hasn't established // a new peer connection. func (h *prunedBlockDispatcherHarness) assertNoPeerDialed() { @@ -420,15 +467,21 @@ func (h *prunedBlockDispatcherHarness) assertNoPeerDialed() { // assertNoReply asserts that the peer hasn't replied to a query. func (h *prunedBlockDispatcherHarness) assertNoReply( - blockChan <-chan *wire.MsgBlock, errChan <-chan error) { + blockChan <-chan *wire.MsgBlock, errChan, cancelChan <-chan error) { h.t.Helper() select { case block := <-blockChan: h.t.Fatalf("received unexpected block %v", block.BlockHash()) + case err := <-errChan: h.t.Fatalf("received unexpected error send: %v", err) + + case err := <-cancelChan: + h.t.Fatalf("received unexpected cancel request with error: %v", + err) + case <-time.After(2 * time.Second): } } @@ -449,16 +502,64 @@ func TestPrunedBlockDispatcherQuerySameBlock(t *testing.T) { // Queue all the block requests one by one. blockChans := make([]<-chan *wire.MsgBlock, 0, numRequests) errChans := make([]<-chan error, 0, numRequests) + cancelChans := make([]<-chan error, 0, numRequests) + + for i := 0; i < numRequests; i++ { + blockChan, errChan, cancelChan := h.query(h.hashes) + blockChans = append(blockChans, blockChan) + errChans = append(errChans, errChan) + cancelChans = append(cancelChans, cancelChan) + + } + + // We should only see one query. + h.assertPeerQueried() + for i := 0; i < numRequests; i++ { + h.assertPeerReplied(blockChans[i], errChans[i], cancelChans[i], + i == 0) + } +} + +// TestPrunedBlockDispatcherQuerySameBlock tests that client requests for the +// same block result in only fetching the block once while pending. +func TestPrunedBlockDispatcherQueryFailSameBlock(t *testing.T) { + t.Parallel() + + const numBlocks = 1 + const numPeers = 5 + const numRequests = numBlocks * numPeers + + h := newNetworkBlockTestHarness(t, numBlocks, numPeers, numPeers) + h.start() + defer h.stop() + + // Queue all the block requests one by one. + blockChans := make([]<-chan *wire.MsgBlock, 0, numRequests) + errChans := make([]<-chan error, 0, numRequests) + cancelChans := make([]<-chan error, 0, numRequests) + + // We want to force a timeout. + h.disablePeerReplies() + for i := 0; i < numRequests; i++ { - blockChan, errChan := h.query(h.hashes) + // The default retry number is 2 and is defined in the neutrino + // package. We want to fail the request therefore we use 1. + // Moreover the default timeout of a single request is 2 seconds + // and currently not configurable so we have to make sure when + // asserting not not timeout before. + blockChan, errChan, cancelChan := h.query( + h.hashes, query.NumRetries(1), + ) blockChans = append(blockChans, blockChan) errChans = append(errChans, errChan) + cancelChans = append(cancelChans, cancelChan) } // We should only see one query. h.assertPeerQueried() for i := 0; i < numRequests; i++ { - h.assertPeerReplied(blockChans[i], errChans[i], i == 0) + h.assertPeerFailed(blockChans[i], errChans[i], cancelChans[i], + query.ErrQueryTimeout) } } @@ -476,7 +577,7 @@ func TestPrunedBlockDispatcherMultipleGetData(t *testing.T) { defer h.stop() // Request all blocks. - blockChan, errChan := h.query(h.hashes) + blockChan, errChan, cancelChan := h.query(h.hashes) // Since we have more blocks than can fit in a single GetData message, // we should expect multiple queries. For each query, we should expect @@ -491,7 +592,8 @@ func TestPrunedBlockDispatcherMultipleGetData(t *testing.T) { for j := 0; j < maxRequestInvs; j++ { expectCompletionSignal := blocksRecvd == numBlocks-1 h.assertPeerReplied( - blockChan, errChan, expectCompletionSignal, + blockChan, errChan, cancelChan, + expectCompletionSignal, ) blocksRecvd++ @@ -517,16 +619,21 @@ func TestPrunedBlockDispatcherMultipleQueryPeers(t *testing.T) { // Queue all the block requests one by one. blockChans := make([]<-chan *wire.MsgBlock, 0, numBlocks) errChans := make([]<-chan error, 0, numBlocks) + cancelChans := make([]<-chan error, 0, numBlocks) + for i := 0; i < numBlocks; i++ { - blockChan, errChan := h.query(h.hashes[i : i+1]) + blockChan, errChan, cancelChan := h.query(h.hashes[i : i+1]) blockChans = append(blockChans, blockChan) errChans = append(errChans, errChan) + cancelChans = append(cancelChans, cancelChan) + } // We should see one query per block. for i := 0; i < numBlocks; i++ { h.assertPeerQueried() - h.assertPeerReplied(blockChans[i], errChans[i], true) + h.assertPeerReplied(blockChans[i], errChans[i], cancelChans[i], + true) } } @@ -544,7 +651,7 @@ func TestPrunedBlockDispatcherPeerPoller(t *testing.T) { h.assertNoPeerDialed() // We'll then query for a block. - blockChan, errChan := h.query(h.hashes) + blockChan, errChan, cancelChan := h.query(h.hashes) // Refresh our peers. This would dial some peers, but we don't have any // yet. @@ -563,7 +670,7 @@ func TestPrunedBlockDispatcherPeerPoller(t *testing.T) { // Disconnect our peer and re-enable replies. h.disconnectPeer(peer, false) h.enablePeerReplies() - h.assertNoReply(blockChan, errChan) + h.assertNoReply(blockChan, errChan, cancelChan) // Force a refresh once again. Since the peer has disconnected, a new // connection should be made and the peer should be queried again. @@ -579,7 +686,7 @@ func TestPrunedBlockDispatcherPeerPoller(t *testing.T) { // Now that we know we've connected to the peer, we should be able to // receive their response. - h.assertPeerReplied(blockChan, errChan, true) + h.assertPeerReplied(blockChan, errChan, cancelChan, true) } // TestPrunedBlockDispatcherInvalidBlock ensures that validation is performed on @@ -597,9 +704,9 @@ func TestPrunedBlockDispatcherInvalidBlock(t *testing.T) { // We'll then query for a block. We shouldn't see a response as the // block should have failed validation. - blockChan, errChan := h.query(h.hashes) + blockChan, errChan, cancelChan := h.query(h.hashes) h.assertPeerQueried() - h.assertNoReply(blockChan, errChan) + h.assertNoReply(blockChan, errChan, cancelChan) // Since the peer sent us an invalid block, they should have been // disconnected and banned. Refreshing our peers shouldn't result in a @@ -618,7 +725,7 @@ func TestPrunedBlockDispatcherInvalidBlock(t *testing.T) { h.refreshPeers() h.assertPeerDialed() h.assertPeerQueried() - h.assertPeerReplied(blockChan, errChan, true) + h.assertPeerReplied(blockChan, errChan, cancelChan, true) } func TestSatisfiesRequiredServices(t *testing.T) { diff --git a/spv/blockmanager.go b/spv/blockmanager.go index 6135c7b34b..6a51e9e286 100644 --- a/spv/blockmanager.go +++ b/spv/blockmanager.go @@ -1051,7 +1051,7 @@ func (b *blockManager) getCheckpointedCFHeaders(checkpoints []*chainhash.Hash, // Hand the queries to the work manager, and consume the verified // responses as they come back. errChan := b.cfg.QueryDispatcher.Query( - q.requests(), query.Cancel(b.quit), + q.requests(), query.Cancel(b.quit), query.NoRetryMax(), ) // Keep waiting for more headers as long as we haven't received an diff --git a/spv/query/interface.go b/spv/query/interface.go index 0564c8287d..ed3729b272 100644 --- a/spv/query/interface.go +++ b/spv/query/interface.go @@ -14,6 +14,10 @@ const ( // defaultQueryEncoding specifies the default encoding (witness or not) // for `getdata` and other similar messages. defaultQueryEncoding = wire.WitnessEncoding + + // defaultNumRetries is the default number of times that a query job + // will be retried. + defaultNumRetries = 2 ) // queries are a set of options that can be modified per-query, unlike global @@ -30,6 +34,15 @@ type queryOptions struct { // cancelChan is an optional channel that can be closed to indicate // that the query should be canceled. cancelChan chan struct{} + + // numRetries tells the query how many times to retry asking each peer + // the query. + numRetries uint8 + + // noRetryMax is set if no cap should be applied to the number of times + // that a query can be retried. If this is set then numRetries has no + // effect. + noRetryMax bool } // QueryOption is a functional option argument to any of the network query @@ -43,6 +56,7 @@ func defaultQueryOptions() *queryOptions { return &queryOptions{ timeout: defaultQueryTimeout, encoding: defaultQueryEncoding, + numRetries: defaultNumRetries, cancelChan: nil, } } @@ -62,6 +76,22 @@ func Timeout(timeout time.Duration) QueryOption { } } +// NumRetries is a query option that lets the query know the maximum number of +// times each peer should be queried. The default is one. +func NumRetries(numRetries uint8) QueryOption { + return func(qo *queryOptions) { + qo.numRetries = numRetries + } +} + +// NoRetryMax is a query option that can be used to disable the cap on the +// number of retries. If this is set then NumRetries has no effect. +func NoRetryMax() QueryOption { + return func(qo *queryOptions) { + qo.noRetryMax = true + } +} + // Encoding is a query option that allows the caller to set a message encoding // for the query messages. func Encoding(encoding wire.MessageEncoding) QueryOption { diff --git a/spv/query/worker.go b/spv/query/worker.go index feb3d764cc..496d710891 100644 --- a/spv/query/worker.go +++ b/spv/query/worker.go @@ -24,6 +24,7 @@ var ( // queryJob is the internal struct that wraps the Query to work on, in // addition to some information about the query. type queryJob struct { + tries uint8 index uint64 timeout time.Duration encoding wire.MessageEncoding diff --git a/spv/query/workmanager.go b/spv/query/workmanager.go index 4fd3e1c0b9..bbe07fb64f 100644 --- a/spv/query/workmanager.go +++ b/spv/query/workmanager.go @@ -176,9 +176,11 @@ func (w *peerWorkManager) workDispatcher() { heap.Init(work) type batchProgress struct { - timeout <-chan time.Time - rem int - errChan chan error + noRetryMax bool + maxRetries uint8 + timeout <-chan time.Time + rem int + errChan chan error } // We set up a batch index counter to keep track of batches that still @@ -330,11 +332,40 @@ Loop: } // If the query ended with any other error, put it back - // into the work queue. + // into the work queue if it has not reached the + // maximum number of retries. case result.err != nil: // Punish the peer for the failed query. w.cfg.Ranking.Punish(result.peer.Addr()) + if batch != nil && !batch.noRetryMax { + result.job.tries++ + } + + // Check if this query has reached its maximum + // number of retries. If so, remove it from the + // batch and don't reschedule it. + if batch != nil && !batch.noRetryMax && + result.job.tries >= batch.maxRetries { + + log.Warnf("Query(%d) from peer %v "+ + "failed and reached maximum "+ + "number of retries, not "+ + "rescheduling: %v", + result.job.index, + result.peer.Addr(), result.err) + + // Return the error and cancel the + // batch. + batch.errChan <- result.err + delete(currentBatches, batchNum) + + log.Debugf("Canceled batch %v", + batchNum) + + continue Loop + } + log.Warnf("Query(%d) from peer %v failed, "+ "rescheduling: %v", result.job.index, result.peer.Addr(), result.err) @@ -421,9 +452,11 @@ Loop: } currentBatches[batchIndex] = &batchProgress{ - timeout: time.After(batch.options.timeout), - rem: len(batch.requests), - errChan: batch.errChan, + noRetryMax: batch.options.noRetryMax, + maxRetries: batch.options.numRetries, + timeout: time.After(batch.options.timeout), + rem: len(batch.requests), + errChan: batch.errChan, } batchIndex++