diff --git a/src/daemon.rs b/src/daemon.rs index 0be3a8ff..b794a1f9 100644 --- a/src/daemon.rs +++ b/src/daemon.rs @@ -387,19 +387,46 @@ impl Daemon { Ok(result) } - fn handle_request_batch(&self, method: &str, params_list: &[Value]) -> Result> { + fn handle_request_batch( + &self, + method: &str, + params_list: &[Value], + failure_threshold: f64, + ) -> Result> { let id = self.message_id.next(); let chunks = params_list .iter() .map(|params| json!({"method": method, "params": params, "id": id})) .chunks(50_000); // Max Amount of batched requests let mut results = vec![]; + let total_requests = params_list.len(); + let mut failed_requests: u64 = 0; + let threshold = (failure_threshold * total_requests as f64).round() as u64; + let mut n = 0; + for chunk in &chunks { let reqs = chunk.collect(); let mut replies = self.call_jsonrpc(method, &reqs)?; if let Some(replies_vec) = replies.as_array_mut() { for reply in replies_vec { - results.push(parse_jsonrpc_reply(reply.take(), method, id)?) + n += 1; + match parse_jsonrpc_reply(reply.take(), method, id) { + Ok(parsed_reply) => results.push(parsed_reply), + Err(e) => { + failed_requests += 1; + warn!( + "batch request {} {}/{} failed: {}", + method, + n, + total_requests, + e.to_string() + ); + // abort and return the last error once a threshold number of requests have failed + if failed_requests > threshold { + return Err(e); + } + } + } } } else { bail!("non-array replies: {:?}", replies); @@ -409,9 +436,14 @@ impl Daemon { Ok(results) } - fn retry_request_batch(&self, method: &str, params_list: &[Value]) -> Result> { + fn retry_request_batch( + &self, + method: &str, + params_list: &[Value], + failure_threshold: f64, + ) -> Result> { loop { - match self.handle_request_batch(method, params_list) { + match self.handle_request_batch(method, params_list, failure_threshold) { Err(Error(ErrorKind::Connection(msg), _)) => { warn!("reconnecting to bitcoind: {}", msg); self.signal.wait(Duration::from_secs(3), false)?; @@ -425,13 +457,13 @@ impl Daemon { } fn request(&self, method: &str, params: Value) -> Result { - let mut values = self.retry_request_batch(method, &[params])?; + let mut values = self.retry_request_batch(method, &[params], 0.0)?; assert_eq!(values.len(), 1); Ok(values.remove(0)) } fn requests(&self, method: &str, params_list: &[Value]) -> Result> { - self.retry_request_batch(method, params_list) + self.retry_request_batch(method, params_list, 0.0) } // bitcoind JSONRPC API: @@ -506,13 +538,12 @@ impl Daemon { .iter() .map(|txhash| json!([txhash.to_hex(), /*verbose=*/ false])) .collect(); - - let values = self.requests("getrawtransaction", ¶ms_list)?; + let values = self.retry_request_batch("getrawtransaction", ¶ms_list, 0.25)?; let mut txs = vec![]; for value in values { txs.push(tx_from_value(value)?); } - assert_eq!(txhashes.len(), txs.len()); + // missing transactions are skipped, so the number of txs returned may be less than the number of txids requested Ok(txs) }