Skip to content

Commit

Permalink
Merge pull request mempool#64 from mempool/mononaut/relaxed-mempool-b…
Browse files Browse the repository at this point in the history
…atching

Relax error propagation for batch loading mempool txs
  • Loading branch information
wiz authored Nov 13, 2023
2 parents 6d5aa98 + 77732ae commit ca05b2d
Showing 1 changed file with 40 additions and 9 deletions.
49 changes: 40 additions & 9 deletions src/daemon.rs
Original file line number Diff line number Diff line change
Expand Up @@ -387,19 +387,46 @@ impl Daemon {
Ok(result)
}

fn handle_request_batch(&self, method: &str, params_list: &[Value]) -> Result<Vec<Value>> {
fn handle_request_batch(
&self,
method: &str,
params_list: &[Value],
failure_threshold: f64,
) -> Result<Vec<Value>> {
let id = self.message_id.next();
let chunks = params_list
.iter()
.map(|params| json!({"method": method, "params": params, "id": id}))
.chunks(50_000); // Max Amount of batched requests
let mut results = vec![];
let total_requests = params_list.len();
let mut failed_requests: u64 = 0;
let threshold = (failure_threshold * total_requests as f64).round() as u64;
let mut n = 0;

for chunk in &chunks {
let reqs = chunk.collect();
let mut replies = self.call_jsonrpc(method, &reqs)?;
if let Some(replies_vec) = replies.as_array_mut() {
for reply in replies_vec {
results.push(parse_jsonrpc_reply(reply.take(), method, id)?)
n += 1;
match parse_jsonrpc_reply(reply.take(), method, id) {
Ok(parsed_reply) => results.push(parsed_reply),
Err(e) => {
failed_requests += 1;
warn!(
"batch request {} {}/{} failed: {}",
method,
n,
total_requests,
e.to_string()
);
// abort and return the last error once a threshold number of requests have failed
if failed_requests > threshold {
return Err(e);
}
}
}
}
} else {
bail!("non-array replies: {:?}", replies);
Expand All @@ -409,9 +436,14 @@ impl Daemon {
Ok(results)
}

fn retry_request_batch(&self, method: &str, params_list: &[Value]) -> Result<Vec<Value>> {
fn retry_request_batch(
&self,
method: &str,
params_list: &[Value],
failure_threshold: f64,
) -> Result<Vec<Value>> {
loop {
match self.handle_request_batch(method, params_list) {
match self.handle_request_batch(method, params_list, failure_threshold) {
Err(Error(ErrorKind::Connection(msg), _)) => {
warn!("reconnecting to bitcoind: {}", msg);
self.signal.wait(Duration::from_secs(3), false)?;
Expand All @@ -425,13 +457,13 @@ impl Daemon {
}

fn request(&self, method: &str, params: Value) -> Result<Value> {
let mut values = self.retry_request_batch(method, &[params])?;
let mut values = self.retry_request_batch(method, &[params], 0.0)?;
assert_eq!(values.len(), 1);
Ok(values.remove(0))
}

fn requests(&self, method: &str, params_list: &[Value]) -> Result<Vec<Value>> {
self.retry_request_batch(method, params_list)
self.retry_request_batch(method, params_list, 0.0)
}

// bitcoind JSONRPC API:
Expand Down Expand Up @@ -506,13 +538,12 @@ impl Daemon {
.iter()
.map(|txhash| json!([txhash.to_hex(), /*verbose=*/ false]))
.collect();

let values = self.requests("getrawtransaction", &params_list)?;
let values = self.retry_request_batch("getrawtransaction", &params_list, 0.25)?;
let mut txs = vec![];
for value in values {
txs.push(tx_from_value(value)?);
}
assert_eq!(txhashes.len(), txs.len());
// missing transactions are skipped, so the number of txs returned may be less than the number of txids requested
Ok(txs)
}

Expand Down

0 comments on commit ca05b2d

Please sign in to comment.