Skip to content

Commit

Permalink
Relaxing type bounds on coalesce_ranges and collect_bytes
Browse files Browse the repository at this point in the history
to allow using them with a wider range of Error types.
  • Loading branch information
sumerman committed Sep 7, 2023
1 parent 6fdbc26 commit 87d87e5
Showing 1 changed file with 13 additions and 6 deletions.
19 changes: 13 additions & 6 deletions object_store/src/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,13 @@ pub(crate) fn hmac_sha256(
}

/// Collect a stream into [`Bytes`] avoiding copying in the event of a single chunk
pub async fn collect_bytes<S>(mut stream: S, size_hint: Option<usize>) -> Result<Bytes>
pub async fn collect_bytes<S, E>(
mut stream: S,
size_hint: Option<usize>,
) -> Result<Bytes, E>
where
S: Stream<Item = Result<Bytes>> + Send + Unpin,
E: Send,
S: Stream<Item = Result<Bytes, E>> + Send + Unpin,
{
let first = stream.next().await.transpose()?.unwrap_or_default();

Expand Down Expand Up @@ -99,14 +103,15 @@ pub const OBJECT_STORE_COALESCE_PARALLEL: usize = 10;
/// * Combine ranges less than `coalesce` bytes apart into a single call to `fetch`
/// * Make multiple `fetch` requests in parallel (up to maximum of 10)
///
pub async fn coalesce_ranges<F, Fut>(
pub async fn coalesce_ranges<F, E, Fut>(
ranges: &[std::ops::Range<usize>],
fetch: F,
coalesce: usize,
) -> Result<Vec<Bytes>>
) -> Result<Vec<Bytes>, E>
where
F: Send + FnMut(std::ops::Range<usize>) -> Fut,
Fut: std::future::Future<Output = Result<Bytes>> + Send,
E: Send,
Fut: std::future::Future<Output = Result<Bytes, E>> + Send,
{
let fetch_ranges = merge_ranges(ranges, coalesce);

Expand Down Expand Up @@ -173,6 +178,8 @@ fn merge_ranges(

#[cfg(test)]
mod tests {
use crate::Error;

use super::*;
use rand::{thread_rng, Rng};
use std::ops::Range;
Expand All @@ -185,7 +192,7 @@ mod tests {
let src: Vec<_> = (0..max).map(|x| x as u8).collect();

let mut fetches = vec![];
let coalesced = coalesce_ranges(
let coalesced = coalesce_ranges::<_, Error, _>(
&ranges,
|range| {
fetches.push(range.clone());
Expand Down

0 comments on commit 87d87e5

Please sign in to comment.