Skip to content

Commit

Permalink
Merge branch 'main' into kill_moar
Browse files Browse the repository at this point in the history
  • Loading branch information
benjaminsavage committed May 13, 2024
2 parents 5600301 + a6924d8 commit 393eb7f
Show file tree
Hide file tree
Showing 32 changed files with 860 additions and 1,025 deletions.
2 changes: 1 addition & 1 deletion ipa-core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ ipa-macros = { version = "*", path = "../ipa-macros" }
aes = "0.8.3"
async-trait = "0.1.79"
async-scoped = { version = "0.9.0", features = ["use-tokio"], optional = true }
axum = { version = "0.5.17", optional = true, features = ["http2"] }
axum = { version = "0.6", optional = true, features = ["http2", "macros"] }
# The following is a temporary version until we can stabilize the build on a higher version
# of axum, rustls and the http stack.
axum-server = { git = "https://github.com/cberkhoff/axum-server/", branch = "rustls-0.23", version = "0.5.3", optional = true, features = [
Expand Down
34 changes: 20 additions & 14 deletions ipa-core/src/helpers/transport/stream/axum_body.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,12 @@ use std::{
task::{Context, Poll},
};

use axum::extract::{BodyStream, FromRequest, RequestParts};
use bytes::Bytes;
#[cfg(feature = "real-world-infra")]
use axum::RequestExt;
use axum::{
extract::{BodyStream, FromRequest},
http::Request,
};
use futures::{Stream, TryStreamExt};
use hyper::Body;
use pin_project::pin_project;
Expand All @@ -13,6 +17,8 @@ use crate::error::BoxError;

type AxumInner = futures::stream::MapErr<BodyStream, fn(axum::Error) -> crate::error::BoxError>;

/// This struct is a simple wrapper so that both in-memory-infra and real-world-infra have a
/// unified interface for streams consumed by transport layer.
#[pin_project]
pub struct WrappedAxumBodyStream(#[pin] AxumInner);

Expand All @@ -32,7 +38,7 @@ impl WrappedAxumBodyStream {

#[must_use]
pub fn empty() -> Self {
Self::from_body(Bytes::new())
Self::from_body(Body::empty())
}
}

Expand Down Expand Up @@ -61,12 +67,10 @@ impl WrappedAxumBodyStream {
// `BodyStream` never blocks, and it's not clear why it would need to, so it seems safe to
// resolve the future with `now_or_never`.
Self::new_internal(
futures::FutureExt::now_or_never(BodyStream::from_request(&mut RequestParts::new(
hyper::Request::builder()
.uri("/ignored")
.body(body.into())
.unwrap(),
)))
futures::FutureExt::now_or_never(BodyStream::from_request(
Request::builder().body(body.into()).unwrap(),
&(),
))
.unwrap()
.unwrap(),
)
Expand All @@ -75,12 +79,14 @@ impl WrappedAxumBodyStream {

#[cfg(feature = "real-world-infra")]
#[async_trait::async_trait]
impl<B: hyper::body::HttpBody<Data = bytes::Bytes, Error = hyper::Error> + Send + 'static>
FromRequest<B> for WrappedAxumBodyStream
impl<
S: Send + Sync,
B: hyper::body::HttpBody<Data = bytes::Bytes, Error = hyper::Error> + Send + 'static,
> FromRequest<S, B> for WrappedAxumBodyStream
{
type Rejection = <BodyStream as FromRequest<B>>::Rejection;
type Rejection = <BodyStream as FromRequest<S, B>>::Rejection;

async fn from_request(req: &mut RequestParts<B>) -> Result<Self, Self::Rejection> {
Ok(Self::new_internal(req.extract::<BodyStream>().await?))
async fn from_request(req: hyper::Request<B>, _state: &S) -> Result<Self, Self::Rejection> {
Ok(Self::new_internal(req.extract::<BodyStream, _>().await?))
}
}
14 changes: 8 additions & 6 deletions ipa-core/src/helpers/transport/stream/box_body.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ use std::{
task::{Context, Poll},
};

#[cfg(all(feature = "in-memory-infra", feature = "web-app"))]
use axum::RequestExt;
use bytes::Bytes;
use futures::{stream::StreamExt, Stream};

Expand Down Expand Up @@ -52,14 +54,14 @@ impl<Buf: Into<bytes::Bytes>> From<Buf> for WrappedBoxBodyStream {

#[cfg(all(feature = "in-memory-infra", feature = "web-app"))]
#[async_trait::async_trait]
impl<B: hyper::body::HttpBody<Data = bytes::Bytes, Error = hyper::Error> + Send + 'static>
axum::extract::FromRequest<B> for WrappedBoxBodyStream
impl<
S: Send + Sync,
B: hyper::body::HttpBody<Data = bytes::Bytes, Error = hyper::Error> + Send + 'static,
> axum::extract::FromRequest<S, B> for WrappedBoxBodyStream
{
type Rejection = <axum::extract::BodyStream as axum::extract::FromRequest<B>>::Rejection;
type Rejection = <axum::extract::BodyStream as axum::extract::FromRequest<S, B>>::Rejection;

async fn from_request(
req: &mut axum::extract::RequestParts<B>,
) -> Result<Self, Self::Rejection> {
async fn from_request(req: hyper::Request<B>, _state: &S) -> Result<Self, Self::Rejection> {
Ok(Self::new(req.extract().await?))
}
}
3 changes: 0 additions & 3 deletions ipa-core/src/net/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,6 @@ pub enum Error {
#[error("bad path: {0}")]
BadPathString(#[source] BoxError),
#[error(transparent)]
BodyAlreadyExtracted(#[from] axum::extract::rejection::BodyAlreadyExtracted),
#[error(transparent)]
MissingExtension(#[from] axum::extract::rejection::ExtensionRejection),
#[error("query id not found: {}", .0.as_ref())]
QueryIdNotFound(QueryId),
Expand Down Expand Up @@ -151,7 +149,6 @@ impl IntoResponse for Error {
| Self::HyperHttpPassthrough(_)
| Self::FailedHttpRequest { .. }
| Self::InvalidUri(_)
| Self::BodyAlreadyExtracted(_)
| Self::MissingExtension(_) => StatusCode::INTERNAL_SERVER_ERROR,

Self::Application { code, .. } => code,
Expand Down
Loading

0 comments on commit 393eb7f

Please sign in to comment.