Skip to content

Commit

Permalink
Expose a Form::into_stream() method on async multipart forms.
Browse files Browse the repository at this point in the history
The async equivalent of the change in #2524.

An example use case is compressing multipart form data with zstd.
The entire contents of the payload have to be compressed together,
which requires accessing the contents of the Form.

The existing stream functionality mostly implements what we need,
but just isn't publicly accessible. This PR adds a pub method for it.
  • Loading branch information
obi1kenobi committed Jan 9, 2025
1 parent 8b8fdd2 commit 4f947ae
Showing 1 changed file with 14 additions and 2 deletions.
16 changes: 14 additions & 2 deletions src/async_impl/multipart.rs
Original file line number Diff line number Diff line change
Expand Up @@ -140,11 +140,23 @@ impl Form {
}

/// Consume this instance and transform into an instance of Body for use in a request.
pub(crate) fn stream(mut self) -> Body {
pub(crate) fn stream(self) -> Body {
if self.inner.fields.is_empty() {
return Body::empty();
}

Body::stream(self.into_stream())
}

/// Produce a stream of the bytes in this `Form`, consuming it.
pub fn into_stream(mut self) -> impl Stream<Item = Result<Bytes, crate::Error>> + Send + Sync {
if self.inner.fields.is_empty() {
let empty_stream: Pin<
Box<dyn Stream<Item = Result<Bytes, crate::Error>> + Send + Sync>,
> = Box::pin(futures_util::stream::empty());
return empty_stream;
}

// create initial part to init reduce chain
let (name, part) = self.inner.fields.remove(0);
let start = Box::pin(self.part_stream(name, part))
Expand All @@ -161,7 +173,7 @@ impl Form {
let last = stream::once(future::ready(Ok(
format!("--{}--\r\n", self.boundary()).into()
)));
Body::stream(stream.chain(last))
Box::pin(stream.chain(last))
}

/// Generate a hyper::Body stream for a single Part instance of a Form request.
Expand Down

0 comments on commit 4f947ae

Please sign in to comment.