Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Relaxing type bounds on coalesce_ranges and collect_bytes #4787

Merged
merged 1 commit into from
Sep 7, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 13 additions & 6 deletions object_store/src/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,13 @@ pub(crate) fn hmac_sha256(
}

/// Collect a stream into [`Bytes`] avoiding copying in the event of a single chunk
pub async fn collect_bytes<S>(mut stream: S, size_hint: Option<usize>) -> Result<Bytes>
pub async fn collect_bytes<S, E>(
mut stream: S,
size_hint: Option<usize>,
) -> Result<Bytes, E>
where
S: Stream<Item = Result<Bytes>> + Send + Unpin,
E: Send,
S: Stream<Item = Result<Bytes, E>> + Send + Unpin,
{
let first = stream.next().await.transpose()?.unwrap_or_default();

Expand Down Expand Up @@ -99,14 +103,15 @@ pub const OBJECT_STORE_COALESCE_PARALLEL: usize = 10;
/// * Combine ranges less than `coalesce` bytes apart into a single call to `fetch`
/// * Make multiple `fetch` requests in parallel (up to maximum of 10)
///
pub async fn coalesce_ranges<F, Fut>(
pub async fn coalesce_ranges<F, E, Fut>(
ranges: &[std::ops::Range<usize>],
fetch: F,
coalesce: usize,
) -> Result<Vec<Bytes>>
) -> Result<Vec<Bytes>, E>
where
F: Send + FnMut(std::ops::Range<usize>) -> Fut,
Fut: std::future::Future<Output = Result<Bytes>> + Send,
E: Send,
Fut: std::future::Future<Output = Result<Bytes, E>> + Send,
{
let fetch_ranges = merge_ranges(ranges, coalesce);

Expand Down Expand Up @@ -173,6 +178,8 @@ fn merge_ranges(

#[cfg(test)]
mod tests {
use crate::Error;

use super::*;
use rand::{thread_rng, Rng};
use std::ops::Range;
Expand All @@ -185,7 +192,7 @@ mod tests {
let src: Vec<_> = (0..max).map(|x| x as u8).collect();

let mut fetches = vec![];
let coalesced = coalesce_ranges(
let coalesced = coalesce_ranges::<_, Error, _>(
&ranges,
|range| {
fetches.push(range.clone());
Expand Down
Loading