Skip to content

Commit

Permalink
Relaxing type bounds on coalesce_ranges and collect_bytes
Browse files Browse the repository at this point in the history
to allow using them with a wider range of Error types.
  • Loading branch information
sumerman committed Sep 7, 2023
1 parent 6fdbc26 commit 60b9e85
Showing 1 changed file with 12 additions and 6 deletions.
18 changes: 12 additions & 6 deletions object_store/src/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ use super::Result;
use bytes::Bytes;
use futures::{stream::StreamExt, Stream, TryStreamExt};

use std::result::Result as StdResult;

#[cfg(any(feature = "azure", feature = "http"))]
pub static RFC1123_FMT: &str = "%a, %d %h %Y %T GMT";

Expand Down Expand Up @@ -47,9 +49,10 @@ pub(crate) fn hmac_sha256(
}

/// Collect a stream into [`Bytes`] avoiding copying in the event of a single chunk
pub async fn collect_bytes<S>(mut stream: S, size_hint: Option<usize>) -> Result<Bytes>
pub async fn collect_bytes<S, E>(mut stream: S, size_hint: Option<usize>) -> StdResult<Bytes, E>
where
S: Stream<Item = Result<Bytes>> + Send + Unpin,
E: Send,
S: Stream<Item = StdResult<Bytes, E>> + Send + Unpin,
{
let first = stream.next().await.transpose()?.unwrap_or_default();

Expand Down Expand Up @@ -99,14 +102,15 @@ pub const OBJECT_STORE_COALESCE_PARALLEL: usize = 10;
/// * Combine ranges less than `coalesce` bytes apart into a single call to `fetch`
/// * Make multiple `fetch` requests in parallel (up to maximum of 10)
///
pub async fn coalesce_ranges<F, Fut>(
pub async fn coalesce_ranges<F, E, Fut>(
ranges: &[std::ops::Range<usize>],
fetch: F,
coalesce: usize,
) -> Result<Vec<Bytes>>
) -> StdResult<Vec<Bytes>, E>
where
F: Send + FnMut(std::ops::Range<usize>) -> Fut,
Fut: std::future::Future<Output = Result<Bytes>> + Send,
E: Send,
Fut: std::future::Future<Output = StdResult<Bytes, E>> + Send,
{
let fetch_ranges = merge_ranges(ranges, coalesce);

Expand Down Expand Up @@ -173,6 +177,8 @@ fn merge_ranges(

#[cfg(test)]
mod tests {
use crate::Error;

use super::*;
use rand::{thread_rng, Rng};
use std::ops::Range;
Expand All @@ -185,7 +191,7 @@ mod tests {
let src: Vec<_> = (0..max).map(|x| x as u8).collect();

let mut fetches = vec![];
let coalesced = coalesce_ranges(
let coalesced = coalesce_ranges::<_, Error, _>(
&ranges,
|range| {
fetches.push(range.clone());
Expand Down

0 comments on commit 60b9e85

Please sign in to comment.