diff --git a/object_store/src/util.rs b/object_store/src/util.rs index 25b0fc343d31..764582a67f95 100644 --- a/object_store/src/util.rs +++ b/object_store/src/util.rs @@ -47,9 +47,13 @@ pub(crate) fn hmac_sha256( } /// Collect a stream into [`Bytes`] avoiding copying in the event of a single chunk -pub async fn collect_bytes(mut stream: S, size_hint: Option) -> Result +pub async fn collect_bytes( + mut stream: S, + size_hint: Option, +) -> Result where - S: Stream> + Send + Unpin, + E: Send, + S: Stream> + Send + Unpin, { let first = stream.next().await.transpose()?.unwrap_or_default(); @@ -99,14 +103,15 @@ pub const OBJECT_STORE_COALESCE_PARALLEL: usize = 10; /// * Combine ranges less than `coalesce` bytes apart into a single call to `fetch` /// * Make multiple `fetch` requests in parallel (up to maximum of 10) /// -pub async fn coalesce_ranges( +pub async fn coalesce_ranges( ranges: &[std::ops::Range], fetch: F, coalesce: usize, -) -> Result> +) -> Result, E> where F: Send + FnMut(std::ops::Range) -> Fut, - Fut: std::future::Future> + Send, + E: Send, + Fut: std::future::Future> + Send, { let fetch_ranges = merge_ranges(ranges, coalesce); @@ -173,6 +178,8 @@ fn merge_ranges( #[cfg(test)] mod tests { + use crate::Error; + use super::*; use rand::{thread_rng, Rng}; use std::ops::Range; @@ -185,7 +192,7 @@ mod tests { let src: Vec<_> = (0..max).map(|x| x as u8).collect(); let mut fetches = vec![]; - let coalesced = coalesce_ranges( + let coalesced = coalesce_ranges::<_, Error, _>( &ranges, |range| { fetches.push(range.clone());