Skip to content

Commit

Permalink
refactor: multipart body without Box
Browse files Browse the repository at this point in the history
  • Loading branch information
nwtgck committed Jan 17, 2024
1 parent ac5d828 commit c278b16
Showing 1 changed file with 21 additions and 10 deletions.
31 changes: 21 additions & 10 deletions src/piping_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,21 @@ struct DataSender {
res_body_tx: futures::channel::mpsc::Sender<Result<http_body::Frame<Bytes>, anyhow::Error>>,
}

#[derive(Debug)]
type IncomingMapErrBody =
http_body_util::combinators::MapErr<hyper::body::Incoming, fn(hyper::Error) -> anyhow::Error>;

type BodyStreamNewMapToBytesStream = futures::stream::Map<
http_body_util::BodyStream<IncomingMapErrBody>,
fn(anyhow::Result<hyper::body::Frame<Bytes>>) -> anyhow::Result<Bytes>,
>;

type MultipartFieldMapToFrameStream = futures::stream::Map<
mpart_async::server::MultipartField<BodyStreamNewMapToBytesStream, anyhow::Error>,
fn(
Result<Bytes, mpart_async::server::MultipartError>,
) -> anyhow::Result<hyper::body::Frame<Bytes>>,
>;

#[auto_enums::enum_derive(http_body1::Body)]
enum TransferRequestBody {
Incoming(
Expand All @@ -42,8 +56,8 @@ enum TransferRequestBody {
fn(hyper::Error) -> anyhow::Error,
>,
),
// only for multipart for now
// TODO: not use Box but type is super complex
Multipart(http_body_util::StreamBody<MultipartFieldMapToFrameStream>),
#[allow(dead_code)]
Box(http_body_util::combinators::BoxBody<Bytes, anyhow::Error>),
}

Expand Down Expand Up @@ -404,7 +418,6 @@ impl PipingServer {
}
}

#[derive(Debug)]
struct TransferRequest {
content_type: Option<http::HeaderValue>,
content_length: Option<http::HeaderValue>,
Expand Down Expand Up @@ -447,13 +460,13 @@ async fn get_transfer_request(
if mime_type.essence_str() != "multipart/form-data" {
return Ok(TransferRequest::from_hyper_incoming(headers, body));
}
let body = body.map_err(|e| e.into());
let body: IncomingMapErrBody = body.map_err(|e| e.into());
let boundary = mime_type
.get_param("boundary")
.map(|b| b.to_string())
.ok_or_else(|| std::io::Error::new(std::io::ErrorKind::Other, "boundary not found"))?;

let body_stream = http_body_util::BodyStream::new(body).map(
let body_stream: BodyStreamNewMapToBytesStream = http_body_util::BodyStream::new(body).map(
|result: Result<hyper::body::Frame<Bytes>, anyhow::Error>| {
let b: Bytes = result?
.into_data()
Expand All @@ -466,7 +479,7 @@ async fn get_transfer_request(
while let Ok(Some(field)) = multipart_stream.try_next().await {
// NOTE: Only first one is transferred
let headers = field.headers().clone();
let frame_stream = field.map(|result| {
let frame_stream: MultipartFieldMapToFrameStream = field.map(|result| {
result
.map(|b| hyper::body::Frame::data(b))
.map_err(|e| e.into())
Expand All @@ -475,9 +488,7 @@ async fn get_transfer_request(
content_type: headers.get("content-type").cloned(),
content_length: headers.get("content-length").cloned(),
content_disposition: headers.get("content-disposition").cloned(),
body: TransferRequestBody::Box(http_body_util::BodyExt::boxed(
http_body_util::StreamBody::new(frame_stream),
)),
body: TransferRequestBody::Multipart(http_body_util::StreamBody::new(frame_stream)),
});
}
anyhow::bail!("multipart error")
Expand Down

0 comments on commit c278b16

Please sign in to comment.