From 49b5b5f6740576d4a0fe2eebed597d55f38501d6 Mon Sep 17 00:00:00 2001 From: Tanner Rogalsky Date: Wed, 29 Nov 2023 14:52:57 +0000 Subject: [PATCH 1/2] Update Axum example to use axum 0.7 and hyper 1. --- examples/axum/Cargo.toml | 10 +++- examples/axum/src/main.rs | 114 ++++++++++++++++++++++---------------- 2 files changed, 74 insertions(+), 50 deletions(-) diff --git a/examples/axum/Cargo.toml b/examples/axum/Cargo.toml index fb37d7d..2b3dc33 100644 --- a/examples/axum/Cargo.toml +++ b/examples/axum/Cargo.toml @@ -5,11 +5,15 @@ edition = "2021" publish = false [dependencies] -axum = "0.6" -hyper = { version = "0.14", features = ["full"] } -async-stream = "0.3" +axum = "0.7" +hyper = { version = "1", features = ["full"] } +hyper-util = { git = "https://github.com/hyperium/hyper-util.git", features = [ + "full", +] } +http-body-util = "0.1" turmoil = { path = "../.." } tracing = "0.1" tracing-subscriber = "0.3" tokio = "1" tower = "0.4" +pin-project-lite = "0.2" diff --git a/examples/axum/src/main.rs b/examples/axum/src/main.rs index 0b5e43b..2d2a3e0 100644 --- a/examples/axum/src/main.rs +++ b/examples/axum/src/main.rs @@ -1,12 +1,7 @@ -use axum::extract::Path; -use axum::response::Response; -use axum::routing::get; -use axum::Router; -use axum::{body::Body, http::Request}; -use hyper::server::accept::from_stream; -use hyper::{Client, Server, Uri}; +use axum::{body::Body, extract::Path, http::Request, routing::get, Router}; +use http_body_util::BodyExt as _; +use hyper_util::{client::legacy::Client, rt::TokioExecutor}; use std::net::{IpAddr, Ipv4Addr}; -use tower::make::Shared; use tracing::{info_span, Instrument}; use turmoil::{net, Builder}; @@ -29,15 +24,27 @@ fn main() { sim.host("server", move || { let router = router.clone(); async move { - Server::builder(from_stream(async_stream::stream! { - let listener = net::TcpListener::bind(addr).await?; - loop { - yield listener.accept().await.map(|(s, _)| s); + let listener = net::TcpListener::bind(addr).await?; + loop { + let (tcp_stream, _remote_addr) = listener.accept().await?; + let tcp_stream = hyper_util::rt::TokioIo::new(tcp_stream); + + let hyper_service = hyper_util::service::TowerToHyperService::new(router.clone()); + + let result = hyper_util::server::conn::auto::Builder::new( + hyper_util::rt::TokioExecutor::new(), + ) + .serve_connection_with_upgrades(tcp_stream, hyper_service) + .await; + if result.is_err() { + // This error only appears when the client doesn't send a request and + // terminate the connection. + // + // If client sends one request then terminate connection whenever, it doesn't + // appear. + break; } - })) - .serve(Shared::new(router)) - .await - .unwrap(); + } Ok(()) } @@ -47,15 +54,15 @@ fn main() { sim.client( "client", async move { - let client = Client::builder().build(connector::connector()); + let client = Client::builder(TokioExecutor::new()).build(connector::connector()); let mut request = Request::new(Body::empty()); - *request.uri_mut() = Uri::from_static("http://server:9999/greet/foo"); + *request.uri_mut() = hyper::Uri::from_static("http://server:9999/greet/foo"); let res = client.request(request).await?; let (parts, body) = res.into_parts(); - let body = hyper::body::to_bytes(body).await?; - let res = Response::from_parts(parts, body); + let body = body.collect().await?.to_bytes(); + let res = hyper::Response::from_parts(parts, body); tracing::info!("Got response: {:?}", res); @@ -68,68 +75,81 @@ fn main() { } mod connector { - use std::{future::Future, pin::Pin}; - - use hyper::{ - client::connect::{Connected, Connection}, - Uri, - }; - use tokio::io::{AsyncRead, AsyncWrite}; + use hyper::Uri; + use pin_project_lite::pin_project; + use std::{future::Future, io::Error, pin::Pin}; + use tokio::io::AsyncWrite; use tower::Service; use turmoil::net::TcpStream; - type Fut = Pin> + Send>>; + type Fut = Pin> + Send>>; pub fn connector( - ) -> impl Service + Clone - { + ) -> impl Service + Clone { tower::service_fn(|uri: Uri| { Box::pin(async move { let conn = TcpStream::connect(uri.authority().unwrap().as_str()).await?; - Ok::<_, std::io::Error>(TurmoilConnection(conn)) + Ok::<_, Error>(TurmoilConnection { fut: conn }) }) as Fut }) } - pub struct TurmoilConnection(turmoil::net::TcpStream); + pin_project! { + pub struct TurmoilConnection{ + #[pin] + fut: turmoil::net::TcpStream + } + } - impl AsyncRead for TurmoilConnection { + impl hyper::rt::Read for TurmoilConnection { fn poll_read( - mut self: std::pin::Pin<&mut Self>, + self: Pin<&mut Self>, cx: &mut std::task::Context<'_>, - buf: &mut tokio::io::ReadBuf<'_>, - ) -> std::task::Poll> { - Pin::new(&mut self.0).poll_read(cx, buf) + mut buf: hyper::rt::ReadBufCursor<'_>, + ) -> std::task::Poll> { + let n = unsafe { + let mut tbuf = tokio::io::ReadBuf::uninit(buf.as_mut()); + let result = tokio::io::AsyncRead::poll_read(self.project().fut, cx, &mut tbuf); + match result { + std::task::Poll::Ready(Ok(())) => tbuf.filled().len(), + other => return other, + } + }; + + unsafe { + buf.advance(n); + } + std::task::Poll::Ready(Ok(())) } } - impl AsyncWrite for TurmoilConnection { + impl hyper::rt::Write for TurmoilConnection { fn poll_write( mut self: Pin<&mut Self>, cx: &mut std::task::Context<'_>, buf: &[u8], - ) -> std::task::Poll> { - Pin::new(&mut self.0).poll_write(cx, buf) + ) -> std::task::Poll> { + Pin::new(&mut self.fut).poll_write(cx, buf) } fn poll_flush( mut self: Pin<&mut Self>, cx: &mut std::task::Context<'_>, - ) -> std::task::Poll> { - Pin::new(&mut self.0).poll_flush(cx) + ) -> std::task::Poll> { + Pin::new(&mut self.fut).poll_flush(cx) } fn poll_shutdown( mut self: Pin<&mut Self>, cx: &mut std::task::Context<'_>, - ) -> std::task::Poll> { - Pin::new(&mut self.0).poll_shutdown(cx) + ) -> std::task::Poll> { + Pin::new(&mut self.fut).poll_shutdown(cx) } } - impl Connection for TurmoilConnection { - fn connected(&self) -> hyper::client::connect::Connected { - Connected::new() + impl hyper_util::client::legacy::connect::Connection for TurmoilConnection { + fn connected(&self) -> hyper_util::client::legacy::connect::Connected { + hyper_util::client::legacy::connect::Connected::new() } } } From 74e015bc9aced1900995d3c9ff8d06e07f7b575d Mon Sep 17 00:00:00 2001 From: Tanner Rogalsky Date: Thu, 21 Dec 2023 18:40:19 +0000 Subject: [PATCH 2/2] update hyper-util dependency --- examples/axum/Cargo.toml | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/examples/axum/Cargo.toml b/examples/axum/Cargo.toml index 2b3dc33..a02a379 100644 --- a/examples/axum/Cargo.toml +++ b/examples/axum/Cargo.toml @@ -7,9 +7,7 @@ publish = false [dependencies] axum = "0.7" hyper = { version = "1", features = ["full"] } -hyper-util = { git = "https://github.com/hyperium/hyper-util.git", features = [ - "full", -] } +hyper-util = { version = "0.1.2", features = ["full"] } http-body-util = "0.1" turmoil = { path = "../.." } tracing = "0.1"