Skip to content

Commit

Permalink
Expose a Form::into_stream() method on async multipart forms.
Browse files Browse the repository at this point in the history
The async equivalent of the change in #2524.

An example use case is compressing multipart form data with zstd.
The entire contents of the payload have to be compressed together,
which requires accessing the contents of the Form.

The existing stream functionality mostly implements what we need,
but just isn't publicly accessible. This PR adds a pub method for it.
  • Loading branch information
obi1kenobi committed Jan 9, 2025
1 parent 8b8fdd2 commit 97f0b3a
Showing 1 changed file with 12 additions and 2 deletions.
14 changes: 12 additions & 2 deletions src/async_impl/multipart.rs
Original file line number Diff line number Diff line change
Expand Up @@ -140,11 +140,21 @@ impl Form {
}

/// Consume this instance and transform into an instance of Body for use in a request.
pub(crate) fn stream(mut self) -> Body {
pub(crate) fn stream(self) -> Body {
if self.inner.fields.is_empty() {
return Body::empty();
}

Body::stream(self.into_stream())
}

/// Produce a stream of the bytes in this `Form`, consuming it.
pub fn into_stream(mut self) -> impl Stream<Item = Result<Bytes, crate::Error>> + Send + Sync {
if self.inner.fields.is_empty() {
let empty_stream: Pin<Box<dyn Stream<Item = Result<Bytes, crate::Error>> + Send + Sync>> = Box::pin(futures_util::stream::empty());
return empty_stream;
}

// create initial part to init reduce chain
let (name, part) = self.inner.fields.remove(0);
let start = Box::pin(self.part_stream(name, part))
Expand All @@ -161,7 +171,7 @@ impl Form {
let last = stream::once(future::ready(Ok(
format!("--{}--\r\n", self.boundary()).into()
)));
Body::stream(stream.chain(last))
Box::pin(stream.chain(last))
}

/// Generate a hyper::Body stream for a single Part instance of a Form request.
Expand Down

0 comments on commit 97f0b3a

Please sign in to comment.