From 19ab22ef9db6927bfa15f654f06c355dd2c80a8a Mon Sep 17 00:00:00 2001 From: glendc Date: Tue, 28 Nov 2023 15:21:29 +0100 Subject: [PATCH 1/3] init refactor of tokio usage into rama-rt mostly works except for the rama-rt-macros part, seems we are still missing something there... it's a fork of tokio-macros, which we can probably cleanup, as we always assume a specific setting (multithreaded for now), but anyway... something for later... --- Cargo.lock | 81 +-- Cargo.toml | 11 +- examples/tokio_tcp_echo_server.rs | 6 +- examples/tokio_tcp_hello.rs | 4 +- examples/tokio_tcp_http_hello.rs | 16 +- examples/tokio_tls_proxy.rs | 15 +- rama-book/book.toml | 2 +- rama-book/src/preface.md | 2 +- rama-cli/Cargo.toml | 1 + rama-rt-macros/Cargo.toml | 27 + rama-rt-macros/README.md | 61 ++ rama-rt-macros/src/entry.rs | 591 ++++++++++++++++++ rama-rt-macros/src/lib.rs | 484 ++++++++++++++ rama-rt-macros/src/select.rs | 109 ++++ rama-rt/Cargo.toml | 5 + rama-rt/README.md | 62 ++ rama-rt/src/lib.rs | 48 +- rama-rt/src/rt_tokio/mod.rs | 103 +++ .../src/rt_tokio}/tls/mod.rs | 0 rama-rt/src/rt_tokio/tls/rustls.rs | 9 + snippets/README.md | 0 .../proxy/http.rs => snippets/http-proxy.rs | 0 src/client/mod.rs | 1 - src/client/tcp/mod.rs | 10 - src/graceful.rs | 1 - src/{net => }/http/header_value.rs | 2 +- src/{net => }/http/headers/mod.rs | 2 +- .../middleware}/header_config.rs | 13 +- .../http/layer => http/middleware}/mod.rs | 0 src/{net => }/http/mod.rs | 3 + src/{server/tcp/http => http/server}/conn.rs | 75 +-- src/http/server/executor.rs | 39 ++ src/http/server/io.rs | 161 +++++ src/http/server/mod.rs | 8 + src/io.rs | 5 - src/lib.rs | 25 +- src/net/mod.rs | 6 - src/rt.rs | 13 + src/runtime.rs | 1 - src/server/http/mod.rs | 2 - src/server/http/service/mod.rs | 2 - src/server/http/service/router.rs | 19 - src/server/mod.rs | 4 - src/server/proxy/mod.rs | 2 - src/server/tcp/http/mod.rs | 2 - src/server/tls/rustls/mod.rs | 5 - src/service/hyper/service.rs | 6 +- src/service/mod.rs | 2 +- src/service/spawn.rs | 9 +- src/state.rs | 4 +- src/stream/layer/tracker/bytes.rs | 22 +- src/stream/layer/tracker/mod.rs | 2 +- src/stream/mod.rs | 4 +- src/stream/service/echo.rs | 10 +- src/stream/service/forward.rs | 22 +- src/sync.rs | 9 - src/{client/tcp => tcp/client}/forward.rs | 4 +- src/tcp/client/mod.rs | 10 + src/{net/tcp.rs => tcp/mod.rs} | 10 +- src/{server/tcp => tcp/server}/listener.rs | 25 +- src/{server/tcp => tcp/server}/mod.rs | 2 - src/tls/mod.rs | 1 + src/tls/server/mod.rs | 1 + .../service.rs => tls/server/rustls.rs} | 14 +- 64 files changed, 1898 insertions(+), 297 deletions(-) create mode 100644 rama-rt-macros/Cargo.toml create mode 100644 rama-rt-macros/README.md create mode 100644 rama-rt-macros/src/entry.rs create mode 100644 rama-rt-macros/src/lib.rs create mode 100644 rama-rt-macros/src/select.rs create mode 100644 rama-rt/README.md create mode 100644 rama-rt/src/rt_tokio/mod.rs rename {src/server => rama-rt/src/rt_tokio}/tls/mod.rs (100%) create mode 100644 rama-rt/src/rt_tokio/tls/rustls.rs create mode 100644 snippets/README.md rename src/server/proxy/http.rs => snippets/http-proxy.rs (100%) delete mode 100644 src/client/mod.rs delete mode 100644 src/client/tcp/mod.rs delete mode 100644 src/graceful.rs rename src/{net => }/http/header_value.rs (96%) rename src/{net => }/http/headers/mod.rs (97%) rename src/{server/http/layer => http/middleware}/header_config.rs (95%) rename src/{server/http/layer => http/middleware}/mod.rs (100%) rename src/{net => }/http/mod.rs (84%) rename src/{server/tcp/http => http/server}/conn.rs (93%) create mode 100644 src/http/server/executor.rs create mode 100644 src/http/server/io.rs create mode 100644 src/http/server/mod.rs delete mode 100644 src/io.rs delete mode 100644 src/net/mod.rs create mode 100644 src/rt.rs delete mode 100644 src/runtime.rs delete mode 100644 src/server/http/mod.rs delete mode 100644 src/server/http/service/mod.rs delete mode 100644 src/server/http/service/router.rs delete mode 100644 src/server/mod.rs delete mode 100644 src/server/proxy/mod.rs delete mode 100644 src/server/tcp/http/mod.rs delete mode 100644 src/server/tls/rustls/mod.rs delete mode 100644 src/sync.rs rename src/{client/tcp => tcp/client}/forward.rs (94%) create mode 100644 src/tcp/client/mod.rs rename src/{net/tcp.rs => tcp/mod.rs} (94%) rename src/{server/tcp => tcp/server}/listener.rs (94%) rename src/{server/tcp => tcp/server}/mod.rs (83%) create mode 100644 src/tls/mod.rs create mode 100644 src/tls/server/mod.rs rename src/{server/tls/rustls/service.rs => tls/server/rustls.rs} (88%) diff --git a/Cargo.lock b/Cargo.lock index e00f9374..3de6b32e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -117,12 +117,6 @@ version = "0.21.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "35636a1494ede3b646cc98f74f8e62c773a38a659ebc777a2cf26b9b74171df9" -[[package]] -name = "bitflags" -version = "1.3.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a" - [[package]] name = "bitflags" version = "2.4.1" @@ -596,16 +590,6 @@ version = "0.2.150" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "89d92a4743f9a61002fae18374ed11e7973f530cb3a3255fb354818118b2203c" -[[package]] -name = "lock_api" -version = "0.4.11" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3c168f8615b12bc01f9c17e2eb0cc07dcae1940121185446edc3744920e8ef45" -dependencies = [ - "autocfg", - "scopeguard", -] - [[package]] name = "log" version = "0.4.20" @@ -732,29 +716,6 @@ version = "2.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bb813b8af86854136c6922af0598d719255ecb2179515e6e7730d468f05c9cae" -[[package]] -name = "parking_lot" -version = "0.12.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3742b2c103b9f06bc9fff0a37ff4912935851bee6d36f3c02bcc755bcfec228f" -dependencies = [ - "lock_api", - "parking_lot_core", -] - -[[package]] -name = "parking_lot_core" -version = "0.9.9" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4c42a9226546d68acdd9c0a280d17ce19bfe27a46bf68784e4066115788d008e" -dependencies = [ - "cfg-if", - "libc", - "redox_syscall", - "smallvec", - "windows-targets", -] - [[package]] name = "pem" version = "3.0.2" @@ -847,15 +808,13 @@ dependencies = [ "pin-project-lite", "rama-hyper", "rama-hyper-util", + "rama-rt", + "rama-rt-macros", "rcgen", "rustls", "rustls-pki-types", "serde", "serde_urlencoded", - "tokio", - "tokio-graceful", - "tokio-rustls", - "tokio-test", "tower-async", "tower-async-http", "tracing", @@ -899,8 +858,6 @@ dependencies = [ "http-body", "pin-project-lite", "rama-hyper", - "socket2", - "tokio", "tower", "tower-service", "tracing", @@ -909,6 +866,22 @@ dependencies = [ [[package]] name = "rama-rt" version = "0.2.0" +dependencies = [ + "tokio", + "tokio-graceful", + "tokio-rustls", + "tokio-test", +] + +[[package]] +name = "rama-rt-macros" +version = "0.2.0" +dependencies = [ + "proc-macro2", + "quote", + "rama", + "syn", +] [[package]] name = "rcgen" @@ -922,15 +895,6 @@ dependencies = [ "yasna", ] -[[package]] -name = "redox_syscall" -version = "0.4.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4722d768eff46b75989dd134e5c353f0d6296e5aaa3132e776cbdb56be7731aa" -dependencies = [ - "bitflags 1.3.2", -] - [[package]] name = "regex" version = "1.10.2" @@ -1059,12 +1023,6 @@ version = "1.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e1cf6437eb19a8f4a6cc0f7dca544973b0b78843adbfeb3683d1a94a0024a294" -[[package]] -name = "scopeguard" -version = "1.2.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "94143f37725109f92c262ed2cf5e59bce7498c01bcc1502d7b9afe439a4e9f49" - [[package]] name = "serde" version = "1.0.193" @@ -1230,7 +1188,6 @@ dependencies = [ "libc", "mio", "num_cpus", - "parking_lot", "pin-project-lite", "signal-hook-registry", "socket2", @@ -1349,7 +1306,7 @@ dependencies = [ "async-compression", "async-lock", "base64", - "bitflags 2.4.1", + "bitflags", "bytes", "futures-core", "futures-util", diff --git a/Cargo.toml b/Cargo.toml index 5f950ac4..5ca04c79 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,5 +1,5 @@ [workspace] -members = [".", "rama-cli", "rama-rt"] +members = [".", "rama-cli", "rama-rt", "rama-rt-macros"] [workspace.package] version = "0.2.0" @@ -28,15 +28,14 @@ http-body = "1" http-body-util = "0.1" httparse = "1.8" hyper = { package = "rama-hyper", version = "0.1000001", features = ["http1", "http2", "server"] } -hyper-util = { package = "rama-hyper-util", version = "0.1001", features = ["tokio", "server-auto"] } +hyper-util = { package = "rama-hyper-util", version = "0.1001", features = ["server-auto"] } matchit = "0.4.2" pin-project-lite = "0.2.13" +rama-rt = { version = "0.2", path = "./rama-rt" } +rama-rt-macros = { version = "0.2", path = "./rama-rt-macros" } rustls = "0.22.0-alpha.3" serde = { version = "1.0", features = ["derive"] } serde_urlencoded = "0.7" -tokio = { version = "1.33", features = ["net", "io-util"] } -tokio-graceful = "0.1" -tokio-rustls = "0.25.0-alpha.1" tower-async = { version = "0.2", features = [ "util", "timeout", @@ -50,6 +49,4 @@ tracing = "0.1" futures = "0.3.29" pki-types = { package = "rustls-pki-types", version = "0.2.1" } rcgen = "0.11.3" -tokio = { version = "1.33.0", features = ["full"] } -tokio-test = "0.4.3" tracing-subscriber = { version = "0.3.17", features = ["env-filter"] } diff --git a/examples/tokio_tcp_echo_server.rs b/examples/tokio_tcp_echo_server.rs index ebb39bf5..21215c97 100644 --- a/examples/tokio_tcp_echo_server.rs +++ b/examples/tokio_tcp_echo_server.rs @@ -1,18 +1,18 @@ use std::time::Duration; use rama::{ - graceful::Shutdown, - server::tcp::TcpListener, + rt::graceful::Shutdown, service::{limit::ConcurrentPolicy, Layer, Service}, state::Extendable, stream::layer::BytesRWTrackerHandle, stream::service::EchoService, + tcp::server::TcpListener, }; use tracing::metadata::LevelFilter; use tracing_subscriber::{fmt, prelude::*, EnvFilter}; -#[rama::main] +#[rama::rt::main] async fn main() { tracing_subscriber::registry() .with(fmt::layer()) diff --git a/examples/tokio_tcp_hello.rs b/examples/tokio_tcp_hello.rs index 1efa5ce1..a1971742 100644 --- a/examples/tokio_tcp_hello.rs +++ b/examples/tokio_tcp_hello.rs @@ -1,6 +1,6 @@ -use rama::{server::tcp::TcpListener, stream::AsyncWriteExt}; +use rama::{rt::io::AsyncWriteExt, tcp::server::TcpListener}; -#[rama::main] +#[rama::rt::main] async fn main() { TcpListener::bind("127.0.0.1:9000") .await diff --git a/examples/tokio_tcp_http_hello.rs b/examples/tokio_tcp_http_hello.rs index fd60a23b..b1ee5be1 100644 --- a/examples/tokio_tcp_http_hello.rs +++ b/examples/tokio_tcp_http_hello.rs @@ -1,20 +1,20 @@ use std::{convert::Infallible, time::Duration}; use rama::{ - graceful::Shutdown, - net::http::StatusCode, - net::TcpStream, - server::tcp::http, - server::tcp::TcpListener, + http::server as http, + http::StatusCode, + rt::graceful::Shutdown, service::{limit::ConcurrentPolicy, Layer, Service}, state::Extendable, stream::{layer::BytesRWTrackerHandle, Stream}, + tcp::server::TcpListener, + tcp::TcpStream, }; use tracing::metadata::LevelFilter; use tracing_subscriber::{fmt, prelude::*, EnvFilter}; -#[rama::main] +#[rama::rt::main] async fn main() { tracing_subscriber::registry() .with(fmt::layer()) @@ -158,11 +158,11 @@ impl WebServer { } async fn render_page_fast(&self) -> Response { - self.render_page(rama::net::http::StatusCode::OK, "This was a fast response.") + self.render_page(StatusCode::OK, "This was a fast response.") } async fn render_page_slow(&self) -> Response { - tokio::time::sleep(std::time::Duration::from_secs(5)).await; + rama::rt::time::sleep(std::time::Duration::from_secs(5)).await; self.render_page(StatusCode::OK, "This was a slow response.") } diff --git a/examples/tokio_tls_proxy.rs b/examples/tokio_tls_proxy.rs index 79782979..4b1ab2b6 100644 --- a/examples/tokio_tls_proxy.rs +++ b/examples/tokio_tls_proxy.rs @@ -1,19 +1,16 @@ use std::time::Duration; use rama::{ - graceful::Shutdown, - server::{ - tcp::TcpListener, - tls::rustls::{RustlsAcceptorLayer, RustlsServerConfig}, - }, - stream::AsyncWriteExt, + rt::{graceful::Shutdown, io::AsyncWriteExt}, + tcp::server::TcpListener, + tls::server::rustls::{RustlsAcceptorLayer, RustlsServerConfig}, }; use pki_types::{CertificateDer, PrivatePkcs8KeyDer}; use tracing::metadata::LevelFilter; use tracing_subscriber::{fmt, prelude::*, EnvFilter}; -#[rama::main] +#[rama::rt::main] async fn main() { tracing_subscriber::registry() .with(fmt::layer()) @@ -73,8 +70,8 @@ async fn main() { )) .serve_fn_graceful(guard, |mut stream| async move { let result = async { - let mut target_stream = rama::client::tcp::connect("127.0.0.1:8080").await?; - match rama::io::copy_bidirectional(&mut stream, &mut target_stream).await { + let mut target_stream = rama::tcp::client::connect("127.0.0.1:8080").await?; + match rama::rt::io::copy_bidirectional(&mut stream, &mut target_stream).await { Ok(_) => Ok(()), Err(err) => { if err.kind() == std::io::ErrorKind::ConnectionReset { diff --git a/rama-book/book.toml b/rama-book/book.toml index 00e847a4..49e7e5fb 100644 --- a/rama-book/book.toml +++ b/rama-book/book.toml @@ -3,4 +3,4 @@ authors = ["Glen De Cauwsemaecker "] language = "en" multilingual = false src = "src" -title = "Rama Book" +title = "Rama" diff --git a/rama-book/src/preface.md b/rama-book/src/preface.md index 7bf5c29b..a426f9b4 100644 --- a/rama-book/src/preface.md +++ b/rama-book/src/preface.md @@ -1,6 +1,6 @@ [![rama banner](https://raw.githubusercontent.com/plabayo/rama/main/docs/img/banner.svg)](https://raw.githubusercontent.com/plabayo/rama/main/docs/img/banner.svg) -# Rama Book +# Rama [![Crates.io][crates-badge]][crates-url] [![Docs.rs][docs-badge]][docs-url] diff --git a/rama-cli/Cargo.toml b/rama-cli/Cargo.toml index 1c3732f8..9ac8aac7 100644 --- a/rama-cli/Cargo.toml +++ b/rama-cli/Cargo.toml @@ -1,5 +1,6 @@ [package] name = "rama-cli" +description = "binary version of and cli utility for rama, a distortion proxy framework" version = { workspace = true } license = { workspace = true } edition = { workspace = true } diff --git a/rama-rt-macros/Cargo.toml b/rama-rt-macros/Cargo.toml new file mode 100644 index 00000000..e4603d84 --- /dev/null +++ b/rama-rt-macros/Cargo.toml @@ -0,0 +1,27 @@ +[package] +name = "rama-rt-macros" +description = "macros in function of rama-rt" +version = { workspace = true } +license = { workspace = true } +edition = { workspace = true } +repository = { workspace = true } +keywords = { workspace = true } +categories = { workspace = true } +authors = { workspace = true } +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[lib] +proc-macro = true + +[features] + +[dependencies] +proc-macro2 = "1.0.70" +quote = "1" +syn = { version = "2.0", features = ["full"] } + +[dev-dependencies] +rama = { version = "0.2", path = ".." } + +[package.metadata.docs.rs] +all-features = true diff --git a/rama-rt-macros/README.md b/rama-rt-macros/README.md new file mode 100644 index 00000000..87ae11e0 --- /dev/null +++ b/rama-rt-macros/README.md @@ -0,0 +1,61 @@ +[![rama banner](https://raw.githubusercontent.com/plabayo/rama/main/docs/img/banner.svg)](https://ramaproxy.org/) + +[![Crates.io][crates-badge]][crates-url] +[![Docs.rs][docs-badge]][docs-url] +[![MIT License][license-mit-badge]][license-mit-url] +[![Apache 2.0 License][license-apache-badge]][license-apache-url] +[![Build Status][actions-badge]][actions-url] + +[![Discord][discord-badge]][discord-url] +[![Buy Me A Coffee][bmac-badge]][bmac-url] +[![GitHub Sponsors][ghs-badge]][ghs-url] + +[crates-badge]: https://img.shields.io/crates/v/rama-rt-macros.svg +[crates-url]: https://crates.io/crates/rama-rt-macros +[docs-badge]: https://img.shields.io/docsrs/rama-rt-macros/latest +[docs-url]: https://docs.rs/rama-rt-macros/latest/rama_rt_macros/index.html +[license-mit-badge]: https://img.shields.io/badge/license-MIT-blue.svg +[license-mit-url]: https://github.com/plabayo/rama/blob/main/LICENSE-MIT +[license-apache-badge]: https://img.shields.io/badge/license-APACHE-blue.svg +[license-apache-url]: https://github.com/plabayo/rama/blob/main/LICENSE-APACHE +[actions-badge]: https://github.com/plabayo/rama/workflows/CI/badge.svg +[actions-url]: https://github.com/plabayo/rama/actions?query=workflow%3ACI+branch%main + +[discord-badge]: https://img.shields.io/badge/Discord-%235865F2.svg?style=for-the-badge&logo=discord&logoColor=white +[discord-url]: https://discord.gg/29EetaSYCD +[bmac-badge]: https://img.shields.io/badge/Buy%20Me%20a%20Coffee-ffdd00?style=for-the-badge&logo=buy-me-a-coffee&logoColor=black +[bmac-url]: https://www.buymeacoffee.com/plabayo +[ghs-badge]: https://img.shields.io/badge/sponsor-30363D?style=for-the-badge&logo=GitHub-Sponsors&logoColor=#EA4AAA +[ghs-url]: https://github.com/sponsors/plabayo + +# rama-rt-macros + +Macros for the runtime definitions and implementation in function of Rama. + +> rama is early work in progress, use at your own risk. +> +> Not everything that exists is documented and not everything that is documented is implemented. + +Rama's full documentatuon can be found at . + +## Contributing + +:balloon: Thanks for your help improving the project! We are so happy to have +you! We have a [contributing guide][contributing] to help you get involved in the +`rama` project. + +Should you want to contribure this project but you do not yet know how to program in Rust, you could start learning Rust with as goal to contribute as soon as possible to `rama` by using "[the Rust 101 Learning Guide](https://rust-lang.guide/)" as your study companion. Glen can also be hired as a mentor or teacher to give you paid 1-on-1 lessons and other similar consultancy services. You can find his contact details at . + +## License + +This project is dual-licensed under both the [MIT license][mit-license] and [Apache 2.0 License][apache-license]. + +### Contribution + +Unless you explicitly state otherwise, any contribution intentionally submitted +for inclusion in `rama` by you, shall be licensed as both [MIT][mit-license] and [Apache 2.0][apache-license], +without any additional terms or conditions. + +[contributing]: https://github.com/plabayo/rama/blob/main/CONTRIBUTING.md +[mit-license]: https://github.com/plabayo/rama/blob/main/LICENSE-MIT +[apache-license]: https://github.com/plabayo/rama/blob/main/LICENSE-APACHE diff --git a/rama-rt-macros/src/entry.rs b/rama-rt-macros/src/entry.rs new file mode 100644 index 00000000..d1c760c6 --- /dev/null +++ b/rama-rt-macros/src/entry.rs @@ -0,0 +1,591 @@ +use proc_macro2::{Span, TokenStream, TokenTree}; +use quote::{quote, quote_spanned, ToTokens}; +use syn::parse::{Parse, ParseStream, Parser}; +use syn::{braced, Attribute, Ident, Path, Signature, Visibility}; + +// syn::AttributeArgs does not implement syn::Parse +type AttributeArgs = syn::punctuated::Punctuated; + +#[derive(Clone, Copy, PartialEq)] +enum RuntimeFlavor { + CurrentThread, + Threaded, +} + +impl RuntimeFlavor { + fn from_str(s: &str) -> Result { + match s { + "current_thread" => Ok(RuntimeFlavor::CurrentThread), + "multi_thread" => Ok(RuntimeFlavor::Threaded), + "single_thread" => Err("The single threaded runtime flavor is called `current_thread`.".to_string()), + "basic_scheduler" => Err("The `basic_scheduler` runtime flavor has been renamed to `current_thread`.".to_string()), + "threaded_scheduler" => Err("The `threaded_scheduler` runtime flavor has been renamed to `multi_thread`.".to_string()), + _ => Err(format!("No such runtime flavor `{}`. The runtime flavors are `current_thread` and `multi_thread`.", s)), + } + } +} + +struct FinalConfig { + flavor: RuntimeFlavor, + worker_threads: Option, + start_paused: Option, + crate_name: Option, +} + +/// Config used in case of the attribute not being able to build a valid config +const DEFAULT_ERROR_CONFIG: FinalConfig = FinalConfig { + flavor: RuntimeFlavor::CurrentThread, + worker_threads: None, + start_paused: None, + crate_name: None, +}; + +struct Configuration { + rt_multi_thread_available: bool, + default_flavor: RuntimeFlavor, + flavor: Option, + worker_threads: Option<(usize, Span)>, + start_paused: Option<(bool, Span)>, + is_test: bool, + crate_name: Option, +} + +impl Configuration { + fn new(is_test: bool, rt_multi_thread: bool) -> Self { + Configuration { + rt_multi_thread_available: rt_multi_thread, + default_flavor: match is_test { + true => RuntimeFlavor::CurrentThread, + false => RuntimeFlavor::Threaded, + }, + flavor: None, + worker_threads: None, + start_paused: None, + is_test, + crate_name: None, + } + } + + fn set_flavor(&mut self, runtime: syn::Lit, span: Span) -> Result<(), syn::Error> { + if self.flavor.is_some() { + return Err(syn::Error::new(span, "`flavor` set multiple times.")); + } + + let runtime_str = parse_string(runtime, span, "flavor")?; + let runtime = + RuntimeFlavor::from_str(&runtime_str).map_err(|err| syn::Error::new(span, err))?; + self.flavor = Some(runtime); + Ok(()) + } + + fn set_worker_threads( + &mut self, + worker_threads: syn::Lit, + span: Span, + ) -> Result<(), syn::Error> { + if self.worker_threads.is_some() { + return Err(syn::Error::new( + span, + "`worker_threads` set multiple times.", + )); + } + + let worker_threads = parse_int(worker_threads, span, "worker_threads")?; + if worker_threads == 0 { + return Err(syn::Error::new(span, "`worker_threads` may not be 0.")); + } + self.worker_threads = Some((worker_threads, span)); + Ok(()) + } + + fn set_start_paused(&mut self, start_paused: syn::Lit, span: Span) -> Result<(), syn::Error> { + if self.start_paused.is_some() { + return Err(syn::Error::new(span, "`start_paused` set multiple times.")); + } + + let start_paused = parse_bool(start_paused, span, "start_paused")?; + self.start_paused = Some((start_paused, span)); + Ok(()) + } + + fn set_crate_name(&mut self, name: syn::Lit, span: Span) -> Result<(), syn::Error> { + if self.crate_name.is_some() { + return Err(syn::Error::new(span, "`crate` set multiple times.")); + } + let name_path = parse_path(name, span, "crate")?; + self.crate_name = Some(name_path); + Ok(()) + } + + fn macro_name(&self) -> &'static str { + if self.is_test { + "rama::rt::test" + } else { + "rama::rt::main" + } + } + + fn build(&self) -> Result { + use RuntimeFlavor as F; + + let flavor = self.flavor.unwrap_or(self.default_flavor); + let worker_threads = match (flavor, self.worker_threads) { + (F::CurrentThread, Some((_, worker_threads_span))) => { + let msg = format!( + "The `worker_threads` option requires the `multi_thread` runtime flavor. Use `#[{}(flavor = \"multi_thread\")]`", + self.macro_name(), + ); + return Err(syn::Error::new(worker_threads_span, msg)); + } + (F::CurrentThread, None) => None, + (F::Threaded, worker_threads) if self.rt_multi_thread_available => { + worker_threads.map(|(val, _span)| val) + } + (F::Threaded, _) => { + let msg = if self.flavor.is_none() { + "The default runtime flavor is `multi_thread`, but the `rt-multi-thread` feature is disabled." + } else { + "The runtime flavor `multi_thread` requires the `rt-multi-thread` feature." + }; + return Err(syn::Error::new(Span::call_site(), msg)); + } + }; + + let start_paused = match (flavor, self.start_paused) { + (F::Threaded, Some((_, start_paused_span))) => { + let msg = format!( + "The `start_paused` option requires the `current_thread` runtime flavor. Use `#[{}(flavor = \"current_thread\")]`", + self.macro_name(), + ); + return Err(syn::Error::new(start_paused_span, msg)); + } + (F::CurrentThread, Some((start_paused, _))) => Some(start_paused), + (_, None) => None, + }; + + Ok(FinalConfig { + crate_name: self.crate_name.clone(), + flavor, + worker_threads, + start_paused, + }) + } +} + +fn parse_int(int: syn::Lit, span: Span, field: &str) -> Result { + match int { + syn::Lit::Int(lit) => match lit.base10_parse::() { + Ok(value) => Ok(value), + Err(e) => Err(syn::Error::new( + span, + format!("Failed to parse value of `{}` as integer: {}", field, e), + )), + }, + _ => Err(syn::Error::new( + span, + format!("Failed to parse value of `{}` as integer.", field), + )), + } +} + +fn parse_string(int: syn::Lit, span: Span, field: &str) -> Result { + match int { + syn::Lit::Str(s) => Ok(s.value()), + syn::Lit::Verbatim(s) => Ok(s.to_string()), + _ => Err(syn::Error::new( + span, + format!("Failed to parse value of `{}` as string.", field), + )), + } +} + +fn parse_path(lit: syn::Lit, span: Span, field: &str) -> Result { + match lit { + syn::Lit::Str(s) => { + let err = syn::Error::new( + span, + format!( + "Failed to parse value of `{}` as path: \"{}\"", + field, + s.value() + ), + ); + s.parse::().map_err(|_| err.clone()) + } + _ => Err(syn::Error::new( + span, + format!("Failed to parse value of `{}` as path.", field), + )), + } +} + +fn parse_bool(bool: syn::Lit, span: Span, field: &str) -> Result { + match bool { + syn::Lit::Bool(b) => Ok(b.value), + _ => Err(syn::Error::new( + span, + format!("Failed to parse value of `{}` as bool.", field), + )), + } +} + +fn build_config( + input: &ItemFn, + args: AttributeArgs, + is_test: bool, + rt_multi_thread: bool, +) -> Result { + if input.sig.asyncness.is_none() { + let msg = "the `async` keyword is missing from the function declaration"; + return Err(syn::Error::new_spanned(input.sig.fn_token, msg)); + } + + let mut config = Configuration::new(is_test, rt_multi_thread); + let macro_name = config.macro_name(); + + for arg in args { + match arg { + syn::Meta::NameValue(namevalue) => { + let ident = namevalue + .path + .get_ident() + .ok_or_else(|| { + syn::Error::new_spanned(&namevalue, "Must have specified ident") + })? + .to_string() + .to_lowercase(); + let lit = match &namevalue.value { + syn::Expr::Lit(syn::ExprLit { lit, .. }) => lit, + expr => return Err(syn::Error::new_spanned(expr, "Must be a literal")), + }; + match ident.as_str() { + "worker_threads" => { + config.set_worker_threads(lit.clone(), syn::spanned::Spanned::span(lit))?; + } + "flavor" => { + config.set_flavor(lit.clone(), syn::spanned::Spanned::span(lit))?; + } + "start_paused" => { + config.set_start_paused(lit.clone(), syn::spanned::Spanned::span(lit))?; + } + "core_threads" => { + let msg = "Attribute `core_threads` is renamed to `worker_threads`"; + return Err(syn::Error::new_spanned(namevalue, msg)); + } + "crate" => { + config.set_crate_name(lit.clone(), syn::spanned::Spanned::span(lit))?; + } + name => { + let msg = format!( + "Unknown attribute {} is specified; expected one of: `flavor`, `worker_threads`, `start_paused`, `crate`", + name, + ); + return Err(syn::Error::new_spanned(namevalue, msg)); + } + } + } + syn::Meta::Path(path) => { + let name = path + .get_ident() + .ok_or_else(|| syn::Error::new_spanned(&path, "Must have specified ident"))? + .to_string() + .to_lowercase(); + let msg = match name.as_str() { + "threaded_scheduler" | "multi_thread" => { + format!( + "Set the runtime flavor with #[{}(flavor = \"multi_thread\")].", + macro_name + ) + } + "basic_scheduler" | "current_thread" | "single_threaded" => { + format!( + "Set the runtime flavor with #[{}(flavor = \"current_thread\")].", + macro_name + ) + } + "flavor" | "worker_threads" | "start_paused" => { + format!("The `{}` attribute requires an argument.", name) + } + name => { + format!("Unknown attribute {} is specified; expected one of: `flavor`, `worker_threads`, `start_paused`, `crate`", name) + } + }; + return Err(syn::Error::new_spanned(path, msg)); + } + other => { + return Err(syn::Error::new_spanned( + other, + "Unknown attribute inside the macro", + )); + } + } + } + + config.build() +} + +fn parse_knobs(mut input: ItemFn, is_test: bool, config: FinalConfig) -> TokenStream { + input.sig.asyncness = None; + + // If type mismatch occurs, the current rustc points to the last statement. + let (last_stmt_start_span, last_stmt_end_span) = { + let mut last_stmt = input.stmts.last().cloned().unwrap_or_default().into_iter(); + + // `Span` on stable Rust has a limitation that only points to the first + // token, not the whole tokens. We can work around this limitation by + // using the first/last span of the tokens like + // `syn::Error::new_spanned` does. + let start = last_stmt.next().map_or_else(Span::call_site, |t| t.span()); + let end = last_stmt.last().map_or(start, |t| t.span()); + (start, end) + }; + + let crate_path = config + .crate_name + .map(ToTokens::into_token_stream) + .unwrap_or_else(|| Ident::new("rama", last_stmt_start_span).into_token_stream()); + + let mut rt = match config.flavor { + RuntimeFlavor::CurrentThread => quote_spanned! {last_stmt_start_span=> + #crate_path::rt::Builder::new_current_thread() + }, + RuntimeFlavor::Threaded => quote_spanned! {last_stmt_start_span=> + #crate_path::rt::Builder::new_multi_thread() + }, + }; + if let Some(v) = config.worker_threads { + rt = quote_spanned! {last_stmt_start_span=> #rt.worker_threads(#v) }; + } + if let Some(v) = config.start_paused { + rt = quote_spanned! {last_stmt_start_span=> #rt.start_paused(#v) }; + } + + let header = if is_test { + quote! { + #[::core::prelude::v1::test] + } + } else { + quote! {} + }; + + let body_ident = quote! { body }; + let last_block = quote_spanned! {last_stmt_end_span=> + #[allow(clippy::expect_used, clippy::diverging_sub_expression)] + { + return #rt + .enable_all() + .build() + .expect("Failed building the Runtime") + .block_on(#body_ident); + } + }; + + let body = input.body(); + + // For test functions pin the body to the stack and use `Pin<&mut dyn + // Future>` to reduce the amount of `rt::block_on` (and related + // functions) copies we generate during compilation due to the generic + // parameter `F` (the future to block on). This could have an impact on + // performance, but because it's only for testing it's unlikely to be very + // large. + // + // We don't do this for the main function as it should only be used once so + // there will be no benefit. + let body = if is_test { + let output_type = match &input.sig.output { + // For functions with no return value syn doesn't print anything, + // but that doesn't work as `Output` for our boxed `Future`, so + // default to `()` (the same type as the function output). + syn::ReturnType::Default => quote! { () }, + syn::ReturnType::Type(_, ret_type) => quote! { #ret_type }, + }; + quote! { + let body = async #body; + #crate_path::pin!(body); + let body: ::core::pin::Pin<&mut dyn ::core::future::Future> = body; + } + } else { + quote! { + let body = async #body; + } + }; + + input.into_tokens(header, body, last_block) +} + +fn token_stream_with_error(mut tokens: TokenStream, error: syn::Error) -> TokenStream { + tokens.extend(error.into_compile_error()); + tokens +} + +#[cfg(not(test))] // Work around for rust-lang/rust#62127 +pub(crate) fn main(args: TokenStream, item: TokenStream, rt_multi_thread: bool) -> TokenStream { + // If any of the steps for this macro fail, we still want to expand to an item that is as close + // to the expected output as possible. This helps out IDEs such that completions and other + // related features keep working. + let input: ItemFn = match syn::parse2(item.clone()) { + Ok(it) => it, + Err(e) => return token_stream_with_error(item, e), + }; + + let config = if input.sig.ident == "main" && !input.sig.inputs.is_empty() { + let msg = "the main function cannot accept arguments"; + Err(syn::Error::new_spanned(&input.sig.ident, msg)) + } else { + AttributeArgs::parse_terminated + .parse2(args) + .and_then(|args| build_config(&input, args, false, rt_multi_thread)) + }; + + match config { + Ok(config) => parse_knobs(input, false, config), + Err(e) => token_stream_with_error(parse_knobs(input, false, DEFAULT_ERROR_CONFIG), e), + } +} + +pub(crate) fn test(args: TokenStream, item: TokenStream, rt_multi_thread: bool) -> TokenStream { + // If any of the steps for this macro fail, we still want to expand to an item that is as close + // to the expected output as possible. This helps out IDEs such that completions and other + // related features keep working. + let input: ItemFn = match syn::parse2(item.clone()) { + Ok(it) => it, + Err(e) => return token_stream_with_error(item, e), + }; + let config = if let Some(attr) = input.attrs().find(|attr| attr.meta.path().is_ident("test")) { + let msg = "second test attribute is supplied"; + Err(syn::Error::new_spanned(attr, msg)) + } else { + AttributeArgs::parse_terminated + .parse2(args) + .and_then(|args| build_config(&input, args, true, rt_multi_thread)) + }; + + match config { + Ok(config) => parse_knobs(input, true, config), + Err(e) => token_stream_with_error(parse_knobs(input, true, DEFAULT_ERROR_CONFIG), e), + } +} + +struct ItemFn { + outer_attrs: Vec, + vis: Visibility, + sig: Signature, + brace_token: syn::token::Brace, + inner_attrs: Vec, + stmts: Vec, +} + +impl ItemFn { + /// Access all attributes of the function item. + fn attrs(&self) -> impl Iterator { + self.outer_attrs.iter().chain(self.inner_attrs.iter()) + } + + /// Get the body of the function item in a manner so that it can be + /// conveniently used with the `quote!` macro. + fn body(&self) -> Body<'_> { + Body { + brace_token: self.brace_token, + stmts: &self.stmts, + } + } + + /// Convert our local function item into a token stream. + fn into_tokens( + self, + header: proc_macro2::TokenStream, + body: proc_macro2::TokenStream, + last_block: proc_macro2::TokenStream, + ) -> TokenStream { + let mut tokens = proc_macro2::TokenStream::new(); + header.to_tokens(&mut tokens); + + // Outer attributes are simply streamed as-is. + for attr in self.outer_attrs { + attr.to_tokens(&mut tokens); + } + + // Inner attributes require extra care, since they're not supported on + // blocks (which is what we're expanded into) we instead lift them + // outside of the function. This matches the behaviour of `syn`. + for mut attr in self.inner_attrs { + attr.style = syn::AttrStyle::Outer; + attr.to_tokens(&mut tokens); + } + + self.vis.to_tokens(&mut tokens); + self.sig.to_tokens(&mut tokens); + + self.brace_token.surround(&mut tokens, |tokens| { + body.to_tokens(tokens); + last_block.to_tokens(tokens); + }); + + tokens + } +} + +impl Parse for ItemFn { + #[inline] + fn parse(input: ParseStream<'_>) -> syn::Result { + // This parse implementation has been largely lifted from `syn`, with + // the exception of: + // * We don't have access to the plumbing necessary to parse inner + // attributes in-place. + // * We do our own statements parsing to avoid recursively parsing + // entire statements and only look for the parts we're interested in. + + let outer_attrs = input.call(Attribute::parse_outer)?; + let vis: Visibility = input.parse()?; + let sig: Signature = input.parse()?; + + let content; + let brace_token = braced!(content in input); + let inner_attrs = Attribute::parse_inner(&content)?; + + let mut buf = proc_macro2::TokenStream::new(); + let mut stmts = Vec::new(); + + while !content.is_empty() { + if let Some(semi) = content.parse::>()? { + semi.to_tokens(&mut buf); + stmts.push(buf); + buf = proc_macro2::TokenStream::new(); + continue; + } + + // Parse a single token tree and extend our current buffer with it. + // This avoids parsing the entire content of the sub-tree. + buf.extend([content.parse::()?]); + } + + if !buf.is_empty() { + stmts.push(buf); + } + + Ok(Self { + outer_attrs, + vis, + sig, + brace_token, + inner_attrs, + stmts, + }) + } +} + +struct Body<'a> { + brace_token: syn::token::Brace, + // Statements, with terminating `;`. + stmts: &'a [TokenStream], +} + +impl ToTokens for Body<'_> { + fn to_tokens(&self, tokens: &mut proc_macro2::TokenStream) { + self.brace_token.surround(tokens, |tokens| { + for stmt in self.stmts { + stmt.to_tokens(tokens); + } + }); + } +} diff --git a/rama-rt-macros/src/lib.rs b/rama-rt-macros/src/lib.rs new file mode 100644 index 00000000..669043eb --- /dev/null +++ b/rama-rt-macros/src/lib.rs @@ -0,0 +1,484 @@ +#![allow(clippy::needless_doctest_main)] +#![warn( + missing_debug_implementations, + missing_docs, + rust_2018_idioms, + unreachable_pub +)] +#![doc(test( + no_crate_inject, + attr(deny(warnings, rust_2018_idioms), allow(dead_code, unused_variables)) +))] + +//! Macros for use with Rama + +// This `extern` is required for older `rustc` versions but newer `rustc` +// versions warn about the unused `extern crate`. +#[allow(unused_extern_crates)] +extern crate proc_macro; + +mod entry; +mod select; + +use proc_macro::TokenStream; + +/// Marks async function to be executed by the selected runtime. This macro +/// helps set up a `Runtime` without requiring the user to use +/// Runtime or Builder directly. +/// +/// Note: This macro is designed to be simplistic and targets applications that +/// do not require a complex setup. If the provided functionality is not +/// sufficient, you may be interested in using Builder, which provides a more +/// powerful interface. +/// +/// Note: This macro can be used on any function and not just the `main` +/// function. Using it on a non-main function makes the function behave as if it +/// was synchronous by starting a new runtime each time it is called. If the +/// function is called often, it is preferable to create the runtime using the +/// runtime builder so the runtime can be reused across calls. +/// +/// # Non-worker async function +/// +/// Note that the async function marked with this macro does not run as a +/// worker. The expectation is that other tasks are spawned by the function here. +/// Awaiting on other futures from the function provided here will not +/// perform as fast as those spawned as workers. +/// +/// # Multi-threaded runtime +/// +/// To use the multi-threaded runtime, the macro can be configured using +/// +/// ``` +/// #[rama::rt::main(flavor = "multi_thread", worker_threads = 10)] +/// # async fn main() {} +/// ``` +/// +/// The `worker_threads` option configures the number of worker threads, and +/// defaults to the number of cpus on the system. This is the default flavor. +/// +/// Note: The multi-threaded runtime requires the `rt-multi-thread` feature +/// flag. +/// +/// # Current thread runtime +/// +/// To use the single-threaded runtime known as the `current_thread` runtime, +/// the macro can be configured using +/// +/// ``` +/// #[rama::rt::main(flavor = "current_thread")] +/// # async fn main() {} +/// ``` +/// +/// ## Function arguments: +/// +/// Arguments are allowed for any functions aside from `main` which is special +/// +/// ## Usage +/// +/// ### Using the multi-thread runtime +/// +/// ```rust +/// #[rama::rt::main] +/// async fn main() { +/// println!("Hello world"); +/// } +/// ``` +/// +/// Equivalent code not using `#[rama::rt::main]` +/// +/// ```rust +/// fn main() { +/// rama::rt::Builder::new_multi_thread() +/// .enable_all() +/// .build() +/// .unwrap() +/// .block_on(async { +/// println!("Hello world"); +/// }) +/// } +/// ``` +/// +/// ### Using current thread runtime +/// +/// The basic scheduler is single-threaded. +/// +/// ```rust +/// #[rama::rt::main(flavor = "current_thread")] +/// async fn main() { +/// println!("Hello world"); +/// } +/// ``` +/// +/// Equivalent code not using `#[rama::rt::main]` +/// +/// ```rust +/// fn main() { +/// rama::rt::Builder::new_current_thread() +/// .enable_all() +/// .build() +/// .unwrap() +/// .block_on(async { +/// println!("Hello world"); +/// }) +/// } +/// ``` +/// +/// ### Set number of worker threads +/// +/// ```rust +/// #[rama::rt::main(worker_threads = 2)] +/// async fn main() { +/// println!("Hello world"); +/// } +/// ``` +/// +/// Equivalent code not using `#[rama::rt::main]` +/// +/// ```rust +/// fn main() { +/// rama::rt::Builder::new_multi_thread() +/// .worker_threads(2) +/// .enable_all() +/// .build() +/// .unwrap() +/// .block_on(async { +/// println!("Hello world"); +/// }) +/// } +/// ``` +/// +/// ### Configure the runtime to start with time paused +/// +/// ```rust +/// #[rama::rt::main(flavor = "current_thread", start_paused = true)] +/// async fn main() { +/// println!("Hello world"); +/// } +/// ``` +/// +/// Equivalent code not using `#[rama::rt::main]` +/// +/// ```rust +/// fn main() { +/// rama::rt::Builder::new_current_thread() +/// .enable_all() +/// .start_paused(true) +/// .build() +/// .unwrap() +/// .block_on(async { +/// println!("Hello world"); +/// }) +/// } +/// ``` +/// +/// Note that `start_paused` requires the `test-util` feature to be enabled. +/// +/// ### Rename package +/// +/// ```rust +/// use rama as rama1; +/// +/// #[rama1::main(crate = "rama1")] +/// async fn main() { +/// println!("Hello world"); +/// } +/// ``` +/// +/// Equivalent code not using `#[rama::rt::main]` +/// +/// ```rust +/// use rama as rama1; +/// +/// fn main() { +/// rama1::rt::Builder::new_multi_thread() +/// .enable_all() +/// .build() +/// .unwrap() +/// .block_on(async { +/// println!("Hello world"); +/// }) +/// } +/// ``` +#[proc_macro_attribute] +#[cfg(not(test))] // Work around for rust-lang/rust#62127 +pub fn main(args: TokenStream, item: TokenStream) -> TokenStream { + entry::main(args.into(), item.into(), true).into() +} + +/// Marks async function to be executed by selected runtime. This macro helps set up a `Runtime` +/// without requiring the user to use Runtime or Builder directly. +/// +/// ## Function arguments: +/// +/// Arguments are allowed for any functions aside from `main` which is special +/// +/// ## Usage +/// +/// ### Using default +/// +/// ```rust +/// #[rama::rt::main(flavor = "current_thread")] +/// async fn main() { +/// println!("Hello world"); +/// } +/// ``` +/// +/// Equivalent code not using `#[rama::rt::main]` +/// +/// ```rust +/// fn main() { +/// rama::rt::Builder::new_current_thread() +/// .enable_all() +/// .build() +/// .unwrap() +/// .block_on(async { +/// println!("Hello world"); +/// }) +/// } +/// ``` +/// +/// ### Rename package +/// +/// ```rust +/// use rama as rama1; +/// +/// #[rama1::main(crate = "rama")] +/// async fn main() { +/// println!("Hello world"); +/// } +/// ``` +/// +/// Equivalent code not using `#[rama::rt::main]` +/// +/// ```rust +/// use rama as rama1; +/// +/// fn main() { +/// rama1::rt::Builder::new_multi_thread() +/// .enable_all() +/// .build() +/// .unwrap() +/// .block_on(async { +/// println!("Hello world"); +/// }) +/// } +/// ``` +#[proc_macro_attribute] +#[cfg(not(test))] // Work around for rust-lang/rust#62127 +pub fn main_rt(args: TokenStream, item: TokenStream) -> TokenStream { + entry::main(args.into(), item.into(), false).into() +} + +/// Marks async function to be executed by runtime, suitable to test environment. +/// This macro helps set up a `Runtime` without requiring the user to use +/// Runtime or Builder directly. +/// +/// Note: This macro is designed to be simplistic and targets applications that +/// do not require a complex setup. If the provided functionality is not +/// sufficient, you may be interested in using Builder, which provides a more +/// powerful interface. +/// +/// # Multi-threaded runtime +/// +/// To use the multi-threaded runtime, the macro can be configured using +/// +/// ```no_run +/// #[rama::rt::test(flavor = "multi_thread", worker_threads = 1)] +/// async fn my_test() { +/// assert!(true); +/// } +/// ``` +/// +/// The `worker_threads` option configures the number of worker threads, and +/// defaults to the number of cpus on the system. +/// +/// Note: The multi-threaded runtime requires the `rt-multi-thread` feature +/// flag. +/// +/// # Current thread runtime +/// +/// The default test runtime is single-threaded. Each test gets a +/// separate current-thread runtime. +/// +/// ```no_run +/// #[rama::rt::test] +/// async fn my_test() { +/// assert!(true); +/// } +/// ``` +/// +/// ## Usage +/// +/// ### Using the multi-thread runtime +/// +/// ```no_run +/// #[rama::rt::test(flavor = "multi_thread")] +/// async fn my_test() { +/// assert!(true); +/// } +/// ``` +/// +/// Equivalent code not using `#[rama::rt::test]` +/// +/// ```no_run +/// #[test] +/// fn my_test() { +/// rama::rt::Builder::new_multi_thread() +/// .enable_all() +/// .build() +/// .unwrap() +/// .block_on(async { +/// assert!(true); +/// }) +/// } +/// ``` +/// +/// ### Using current thread runtime +/// +/// ```no_run +/// #[rama::rt::test] +/// async fn my_test() { +/// assert!(true); +/// } +/// ``` +/// +/// Equivalent code not using `#[rama::rt::test]` +/// +/// ```no_run +/// #[test] +/// fn my_test() { +/// rama::rt::Builder::new_current_thread() +/// .enable_all() +/// .build() +/// .unwrap() +/// .block_on(async { +/// assert!(true); +/// }) +/// } +/// ``` +/// +/// ### Set number of worker threads +/// +/// ```no_run +/// #[rama::rt::test(flavor ="multi_thread", worker_threads = 2)] +/// async fn my_test() { +/// assert!(true); +/// } +/// ``` +/// +/// Equivalent code not using `#[rama::rt::test]` +/// +/// ```no_run +/// #[test] +/// fn my_test() { +/// rama::rt::Builder::new_multi_thread() +/// .worker_threads(2) +/// .enable_all() +/// .build() +/// .unwrap() +/// .block_on(async { +/// assert!(true); +/// }) +/// } +/// ``` +/// +/// ### Configure the runtime to start with time paused +/// +/// ```no_run +/// #[rama::rt::test(start_paused = true)] +/// async fn my_test() { +/// assert!(true); +/// } +/// ``` +/// +/// Equivalent code not using `#[rama::rt::test]` +/// +/// ```no_run +/// #[test] +/// fn my_test() { +/// rama::rt::Builder::new_current_thread() +/// .enable_all() +/// .start_paused(true) +/// .build() +/// .unwrap() +/// .block_on(async { +/// assert!(true); +/// }) +/// } +/// ``` +/// +/// Note that `start_paused` requires the `test-util` feature to be enabled. +/// +/// ### Rename package +/// +/// ```rust +/// use rama as rama1; +/// +/// #[rama1::test(crate = "rama1")] +/// async fn my_test() { +/// println!("Hello world"); +/// } +/// ``` +#[proc_macro_attribute] +pub fn test(args: TokenStream, item: TokenStream) -> TokenStream { + entry::test(args.into(), item.into(), true).into() +} + +/// Marks async function to be executed by runtime, suitable to test environment +/// +/// ## Usage +/// +/// ```no_run +/// #[rama::rt::test] +/// async fn my_test() { +/// assert!(true); +/// } +/// ``` +#[proc_macro_attribute] +pub fn test_rt(args: TokenStream, item: TokenStream) -> TokenStream { + entry::test(args.into(), item.into(), false).into() +} + +/// Always fails with the error message below. +/// ```text +/// The #[rama::rt::main] macro requires rt or rt-multi-thread. +/// ``` +#[proc_macro_attribute] +pub fn main_fail(_args: TokenStream, _item: TokenStream) -> TokenStream { + syn::Error::new( + proc_macro2::Span::call_site(), + "The #[rama::rt::main] macro requires rt or rt-multi-thread.", + ) + .to_compile_error() + .into() +} + +/// Always fails with the error message below. +/// ```text +/// The #[rama::rt::test] macro requires rt or rt-multi-thread. +/// ``` +#[proc_macro_attribute] +pub fn test_fail(_args: TokenStream, _item: TokenStream) -> TokenStream { + syn::Error::new( + proc_macro2::Span::call_site(), + "The #[rama::rt::test] macro requires rt or rt-multi-thread.", + ) + .to_compile_error() + .into() +} + +/// Implementation detail of the `select!` macro. This macro is **not** intended +/// to be used as part of the public API and is permitted to change. +#[proc_macro] +#[doc(hidden)] +pub fn select_priv_declare_output_enum(input: TokenStream) -> TokenStream { + select::declare_output_enum(input) +} + +/// Implementation detail of the `select!` macro. This macro is **not** intended +/// to be used as part of the public API and is permitted to change. +#[proc_macro] +#[doc(hidden)] +pub fn select_priv_clean_pattern(input: TokenStream) -> TokenStream { + select::clean_pattern_macro(input) +} diff --git a/rama-rt-macros/src/select.rs b/rama-rt-macros/src/select.rs new file mode 100644 index 00000000..324b8f94 --- /dev/null +++ b/rama-rt-macros/src/select.rs @@ -0,0 +1,109 @@ +use proc_macro::{TokenStream, TokenTree}; +use proc_macro2::Span; +use quote::quote; +use syn::{parse::Parser, Ident}; + +pub(crate) fn declare_output_enum(input: TokenStream) -> TokenStream { + // passed in is: `(_ _ _)` with one `_` per branch + let branches = match input.into_iter().next() { + Some(TokenTree::Group(group)) => group.stream().into_iter().count(), + _ => panic!("unexpected macro input"), + }; + + let variants = (0..branches) + .map(|num| Ident::new(&format!("_{}", num), Span::call_site())) + .collect::>(); + + // Use a bitfield to track which futures completed + let mask = Ident::new( + if branches <= 8 { + "u8" + } else if branches <= 16 { + "u16" + } else if branches <= 32 { + "u32" + } else if branches <= 64 { + "u64" + } else { + panic!("up to 64 branches supported"); + }, + Span::call_site(), + ); + + TokenStream::from(quote! { + pub(super) enum Out<#( #variants ),*> { + #( #variants(#variants), )* + // Include a `Disabled` variant signifying that all select branches + // failed to resolve. + Disabled, + } + + pub(super) type Mask = #mask; + }) +} + +pub(crate) fn clean_pattern_macro(input: TokenStream) -> TokenStream { + // If this isn't a pattern, we return the token stream as-is. The select! + // macro is using it in a location requiring a pattern, so an error will be + // emitted there. + let mut input: syn::Pat = match syn::Pat::parse_single.parse(input.clone()) { + Ok(it) => it, + Err(_) => return input, + }; + + clean_pattern(&mut input); + quote::ToTokens::into_token_stream(input).into() +} + +// Removes any occurrences of ref or mut in the provided pattern. +fn clean_pattern(pat: &mut syn::Pat) { + match pat { + syn::Pat::Lit(_literal) => {} + syn::Pat::Macro(_macro) => {} + syn::Pat::Path(_path) => {} + syn::Pat::Range(_range) => {} + syn::Pat::Rest(_rest) => {} + syn::Pat::Verbatim(_tokens) => {} + syn::Pat::Wild(_underscore) => {} + syn::Pat::Ident(ident) => { + ident.by_ref = None; + ident.mutability = None; + if let Some((_at, pat)) = &mut ident.subpat { + clean_pattern(&mut *pat); + } + } + syn::Pat::Or(or) => { + for case in &mut or.cases { + clean_pattern(case); + } + } + syn::Pat::Slice(slice) => { + for elem in &mut slice.elems { + clean_pattern(elem); + } + } + syn::Pat::Struct(struct_pat) => { + for field in &mut struct_pat.fields { + clean_pattern(&mut field.pat); + } + } + syn::Pat::Tuple(tuple) => { + for elem in &mut tuple.elems { + clean_pattern(elem); + } + } + syn::Pat::TupleStruct(tuple) => { + for elem in &mut tuple.elems { + clean_pattern(elem); + } + } + syn::Pat::Reference(reference) => { + reference.mutability = None; + clean_pattern(&mut reference.pat); + } + syn::Pat::Type(type_pat) => { + clean_pattern(&mut type_pat.pat); + } + _ => {} + } +} diff --git a/rama-rt/Cargo.toml b/rama-rt/Cargo.toml index d4a3015a..4e9daed3 100644 --- a/rama-rt/Cargo.toml +++ b/rama-rt/Cargo.toml @@ -1,5 +1,6 @@ [package] name = "rama-rt" +description = "async-first runtime for rama, a distortion proxy framework" version = { workspace = true } license = { workspace = true } edition = { workspace = true } @@ -10,3 +11,7 @@ authors = { workspace = true } # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] +tokio = { version = "1.33", features = ["net", "io-util", "rt-multi-thread"] } +tokio-graceful = "0.1" +tokio-rustls = "0.25.0-alpha.1" +tokio-test = "0.4.3" diff --git a/rama-rt/README.md b/rama-rt/README.md new file mode 100644 index 00000000..1e618726 --- /dev/null +++ b/rama-rt/README.md @@ -0,0 +1,62 @@ +[![rama banner](https://raw.githubusercontent.com/plabayo/rama/main/docs/img/banner.svg)](https://ramaproxy.org/) + +[![Crates.io][crates-badge]][crates-url] +[![Docs.rs][docs-badge]][docs-url] +[![MIT License][license-mit-badge]][license-mit-url] +[![Apache 2.0 License][license-apache-badge]][license-apache-url] +[![Build Status][actions-badge]][actions-url] + +[![Discord][discord-badge]][discord-url] +[![Buy Me A Coffee][bmac-badge]][bmac-url] +[![GitHub Sponsors][ghs-badge]][ghs-url] + +[crates-badge]: https://img.shields.io/crates/v/rama-rt.svg +[crates-url]: https://crates.io/crates/rama-rt +[docs-badge]: https://img.shields.io/docsrs/rama-rt/latest +[docs-url]: https://docs.rs/rama-rt/latest/rama_rt/index.html +[license-mit-badge]: https://img.shields.io/badge/license-MIT-blue.svg +[license-mit-url]: https://github.com/plabayo/rama/blob/main/LICENSE-MIT +[license-apache-badge]: https://img.shields.io/badge/license-APACHE-blue.svg +[license-apache-url]: https://github.com/plabayo/rama/blob/main/LICENSE-APACHE +[actions-badge]: https://github.com/plabayo/rama/workflows/CI/badge.svg +[actions-url]: https://github.com/plabayo/rama/actions?query=workflow%3ACI+branch%main + +[discord-badge]: https://img.shields.io/badge/Discord-%235865F2.svg?style=for-the-badge&logo=discord&logoColor=white +[discord-url]: https://discord.gg/29EetaSYCD +[bmac-badge]: https://img.shields.io/badge/Buy%20Me%20a%20Coffee-ffdd00?style=for-the-badge&logo=buy-me-a-coffee&logoColor=black +[bmac-url]: https://www.buymeacoffee.com/plabayo +[ghs-badge]: https://img.shields.io/badge/sponsor-30363D?style=for-the-badge&logo=GitHub-Sponsors&logoColor=#EA4AAA +[ghs-url]: https://github.com/sponsors/plabayo + +# rama-rt + +Runtime definitions and implementation for Rama. + +> rama is early work in progress, use at your own risk. +> +> Not everything that exists is documented and not everything that is documented is implemented. + +Rama's full documentatuon can be found at . + + +## Contributing + +:balloon: Thanks for your help improving the project! We are so happy to have +you! We have a [contributing guide][contributing] to help you get involved in the +`rama` project. + +Should you want to contribure this project but you do not yet know how to program in Rust, you could start learning Rust with as goal to contribute as soon as possible to `rama` by using "[the Rust 101 Learning Guide](https://rust-lang.guide/)" as your study companion. Glen can also be hired as a mentor or teacher to give you paid 1-on-1 lessons and other similar consultancy services. You can find his contact details at . + +## License + +This project is dual-licensed under both the [MIT license][mit-license] and [Apache 2.0 License][apache-license]. + +### Contribution + +Unless you explicitly state otherwise, any contribution intentionally submitted +for inclusion in `rama` by you, shall be licensed as both [MIT][mit-license] and [Apache 2.0][apache-license], +without any additional terms or conditions. + +[contributing]: https://github.com/plabayo/rama/blob/main/CONTRIBUTING.md +[mit-license]: https://github.com/plabayo/rama/blob/main/LICENSE-MIT +[apache-license]: https://github.com/plabayo/rama/blob/main/LICENSE-APACHE diff --git a/rama-rt/src/lib.rs b/rama-rt/src/lib.rs index 7d12d9af..1ebe10b6 100644 --- a/rama-rt/src/lib.rs +++ b/rama-rt/src/lib.rs @@ -1,14 +1,34 @@ -pub fn add(left: usize, right: usize) -> usize { - left + right -} - -#[cfg(test)] -mod tests { - use super::*; - - #[test] - fn it_works() { - let result = add(2, 2); - assert_eq!(result, 4); - } -} +//! Rama Runtime +//! +//! This crate provides a runtime for Rama applications. +//! +//! For now only Tokio is implemented and supported. +//! There is no plan to support other runtimes. +//! If you want to use other runtimes, you can +//! provide feedback, input and motivation at +//! Result, std::io::Error> { - let stream = TokioTcpStream::connect(address).await?; - Ok(TcpStream::new(stream)) -} diff --git a/src/graceful.rs b/src/graceful.rs deleted file mode 100644 index f75b7481..00000000 --- a/src/graceful.rs +++ /dev/null @@ -1 +0,0 @@ -pub use tokio_graceful::{Shutdown, ShutdownGuard, WeakShutdownGuard}; diff --git a/src/net/http/header_value.rs b/src/http/header_value.rs similarity index 96% rename from src/net/http/header_value.rs rename to src/http/header_value.rs index d05ce4dc..657be343 100644 --- a/src/net/http/header_value.rs +++ b/src/http/header_value.rs @@ -1,4 +1,4 @@ -use crate::net::http::{header::AsHeaderName, HeaderMap, Request, Response}; +use crate::http::{header::AsHeaderName, HeaderMap, Request, Response}; pub trait HeaderValueGetter { fn header_str(&self, key: K) -> Result<&str, HeaderValueErr> diff --git a/src/net/http/headers/mod.rs b/src/http/headers/mod.rs similarity index 97% rename from src/net/http/headers/mod.rs rename to src/http/headers/mod.rs index 0389104a..d7a1bc1f 100644 --- a/src/net/http/headers/mod.rs +++ b/src/http/headers/mod.rs @@ -19,7 +19,7 @@ //! but it represents that via the numerals 1 and 0. //! //! ```rust -//! use rama::net::http::{headers::Header, HeaderName, HeaderValue}; +//! use rama::http::{headers::Header, HeaderName, HeaderValue}; //! struct Dnt(bool); //! //! impl Header for Dnt { diff --git a/src/server/http/layer/header_config.rs b/src/http/middleware/header_config.rs similarity index 95% rename from src/server/http/layer/header_config.rs rename to src/http/middleware/header_config.rs index e57e65fe..ffb2d548 100644 --- a/src/server/http/layer/header_config.rs +++ b/src/http/middleware/header_config.rs @@ -3,8 +3,9 @@ use std::marker::PhantomData; use serde::de::DeserializeOwned; use crate::{ - net::http::{HeaderValueGetter, Request}, - service::{BoxError, Layer, Service}, + http::{HeaderValueGetter, Request}, + service::{Layer, Service}, + BoxError, }; #[derive(Debug)] @@ -87,11 +88,11 @@ where mod test { use serde::Deserialize; - use crate::net::http::Method; + use crate::http::Method; use super::*; - #[tokio::test] + #[crate::rt::test] async fn test_header_config_happy_path() { let request = Request::builder() .method(Method::GET) @@ -116,7 +117,7 @@ mod test { service.call(request).await.unwrap(); } - #[tokio::test] + #[crate::rt::test] async fn test_header_config_missing_header() { let request = Request::builder() .method(Method::GET) @@ -135,7 +136,7 @@ mod test { assert!(result.is_err()); } - #[tokio::test] + #[crate::rt::test] async fn test_header_config_invalid_config() { let request = Request::builder() .method(Method::GET) diff --git a/src/server/http/layer/mod.rs b/src/http/middleware/mod.rs similarity index 100% rename from src/server/http/layer/mod.rs rename to src/http/middleware/mod.rs diff --git a/src/net/http/mod.rs b/src/http/mod.rs similarity index 84% rename from src/net/http/mod.rs rename to src/http/mod.rs index 200e911f..3435d94b 100644 --- a/src/net/http/mod.rs +++ b/src/http/mod.rs @@ -7,3 +7,6 @@ pub use http::{ }; pub mod headers; + +pub mod middleware; +pub mod server; diff --git a/src/server/tcp/http/conn.rs b/src/http/server/conn.rs similarity index 93% rename from src/server/tcp/http/conn.rs rename to src/http/server/conn.rs index bc7e2ffa..9faa203f 100644 --- a/src/server/tcp/http/conn.rs +++ b/src/http/server/conn.rs @@ -2,24 +2,25 @@ use std::error::Error as StdError; use hyper::server::conn::http1::Builder as Http1Builder; use hyper::server::conn::http2::Builder as Http2Builder; -use hyper_util::rt::TokioIo; use hyper_util::server::conn::auto::Builder as AutoBuilder; -use crate::net::TcpStream; +use crate::tcp::TcpStream; -type H2Executor = hyper_util::rt::TokioExecutor; +use super::{GlobalExecutor, HyperIo}; use crate::service::{ http::ServiceBuilderExt, hyper::{HyperBody, TowerHyperServiceExt}, util::Identity, - BoxError, Layer, Service, ServiceBuilder, + Layer, Service, ServiceBuilder, }; +use crate::BoxError; + pub type ServeResult = Result<(), BoxError>; -pub use crate::net::http::Response; -pub type Request = crate::net::http::Request; +pub use crate::http::Response; +pub type Request = crate::http::Request; #[derive(Debug)] pub struct HttpConnector { @@ -56,26 +57,26 @@ where } } -impl HttpConnector, S, Identity> +impl HttpConnector, S, Identity> where S: crate::stream::Stream + Send + 'static, { pub fn h2(stream: TcpStream) -> Self { Self { - builder: Http2Builder::new(H2Executor::new()), + builder: Http2Builder::new(GlobalExecutor::new()), stream, service_builder: ServiceBuilder::new(), } } } -impl HttpConnector, S, Identity> +impl HttpConnector, S, Identity> where S: crate::stream::Stream + Send + 'static, { pub fn auto(stream: TcpStream) -> Self { Self { - builder: AutoBuilder::new(H2Executor::new()), + builder: AutoBuilder::new(GlobalExecutor::new()), stream, service_builder: ServiceBuilder::new(), } @@ -163,7 +164,7 @@ impl HttpConnector { /// [`tower_async_http::propagate_header`]: crate::service::http::propagate_header pub fn propagate_header( self, - header: crate::net::http::HeaderName, + header: crate::http::HeaderName, ) -> HttpConnector< B, S, @@ -401,7 +402,7 @@ impl HttpConnector { >, > where - I: IntoIterator, + I: IntoIterator, { HttpConnector { builder: self.builder, @@ -418,7 +419,7 @@ impl HttpConnector { /// [`tower_async_http::sensitive_headers`]: crate::service::http::sensitive_headers pub fn sensitive_request_headers( self, - headers: std::sync::Arc<[crate::net::http::HeaderName]>, + headers: std::sync::Arc<[crate::http::HeaderName]>, ) -> HttpConnector< B, S, @@ -442,7 +443,7 @@ impl HttpConnector { /// [`tower_async_http::sensitive_headers`]: crate::service::http::sensitive_headers pub fn sensitive_response_headers( self, - headers: std::sync::Arc<[crate::net::http::HeaderName]>, + headers: std::sync::Arc<[crate::http::HeaderName]>, ) -> HttpConnector< B, S, @@ -468,7 +469,7 @@ impl HttpConnector { /// [`tower_async_http::set_header`]: crate::service::http::set_header pub fn override_request_header( self, - header_name: crate::net::http::HeaderName, + header_name: crate::http::HeaderName, make: M, ) -> HttpConnector< B, @@ -493,7 +494,7 @@ impl HttpConnector { /// [`tower_async_http::set_header`]: crate::service::http::set_header pub fn append_request_header( self, - header_name: crate::net::http::HeaderName, + header_name: crate::http::HeaderName, make: M, ) -> HttpConnector< B, @@ -516,7 +517,7 @@ impl HttpConnector { /// [`tower_async_http::set_header`]: crate::service::http::set_header pub fn insert_request_header_if_not_present( self, - header_name: crate::net::http::HeaderName, + header_name: crate::http::HeaderName, make: M, ) -> HttpConnector< B, @@ -542,7 +543,7 @@ impl HttpConnector { /// [`tower_async_http::set_header`]: crate::service::http::set_header pub fn override_response_header( self, - header_name: crate::net::http::HeaderName, + header_name: crate::http::HeaderName, make: M, ) -> HttpConnector< B, @@ -567,7 +568,7 @@ impl HttpConnector { /// [`tower_async_http::set_header`]: crate::service::http::set_header pub fn append_response_header( self, - header_name: crate::net::http::HeaderName, + header_name: crate::http::HeaderName, make: M, ) -> HttpConnector< B, @@ -590,7 +591,7 @@ impl HttpConnector { /// [`tower_async_http::set_header`]: crate::service::http::set_header pub fn insert_response_header_if_not_present( self, - header_name: crate::net::http::HeaderName, + header_name: crate::http::HeaderName, make: M, ) -> HttpConnector< B, @@ -613,7 +614,7 @@ impl HttpConnector { /// [`tower_async_http::request_id`]: crate::service::http::request_id pub fn set_request_id( self, - header_name: crate::net::http::HeaderName, + header_name: crate::http::HeaderName, make_request_id: M, ) -> HttpConnector< B, @@ -662,7 +663,7 @@ impl HttpConnector { /// [`tower_async_http::request_id`]: crate::service::http::request_id pub fn propagate_request_id( self, - header_name: crate::net::http::HeaderName, + header_name: crate::http::HeaderName, ) -> HttpConnector< B, S, @@ -833,7 +834,7 @@ pub trait HyperConnServer { where S: crate::stream::Stream + Send + 'static, Service: hyper::service::Service< - crate::net::http::Request, + crate::http::Request, Response = Response, > + Send + Sync @@ -854,7 +855,7 @@ pub trait HyperConnWithUpgradesServer { where S: crate::stream::Stream + Send + 'static, Service: hyper::service::Service< - crate::net::http::Request, + crate::http::Request, Response = Response, > + Send + Sync @@ -876,7 +877,7 @@ impl HyperConnServer for Http1Builder { where S: crate::stream::Stream + Send + 'static, Service: hyper::service::Service< - crate::net::http::Request, + crate::http::Request, Response = Response, > + Send + Sync @@ -888,7 +889,7 @@ impl HyperConnServer for Http1Builder { Body::Error: Into>, { let io = Box::pin(io); - let stream = TokioIo::new(io); + let stream = HyperIo::new(io); self.serve_connection(stream, service).await?; Ok(()) } @@ -904,7 +905,7 @@ impl HyperConnWithUpgradesServer for Http1Builder { where S: crate::stream::Stream + Send + 'static, Service: hyper::service::Service< - crate::net::http::Request, + crate::http::Request, Response = Response, > + Send + Sync @@ -916,7 +917,7 @@ impl HyperConnWithUpgradesServer for Http1Builder { Body::Error: Into>, { let io = Box::pin(io); - let stream = TokioIo::new(io); + let stream = HyperIo::new(io); self.serve_connection(stream, service) .with_upgrades() .await?; @@ -924,7 +925,7 @@ impl HyperConnWithUpgradesServer for Http1Builder { } } -impl HyperConnServer for Http2Builder { +impl HyperConnServer for Http2Builder { #[inline] async fn hyper_serve_connection( &self, @@ -934,7 +935,7 @@ impl HyperConnServer for Http2Builder { where S: crate::stream::Stream + Send + 'static, Service: hyper::service::Service< - crate::net::http::Request, + crate::http::Request, Response = Response, > + Send + Sync @@ -946,13 +947,13 @@ impl HyperConnServer for Http2Builder { Body::Error: Into>, { let io = Box::pin(io); - let stream = TokioIo::new(io); + let stream = HyperIo::new(io); self.serve_connection(stream, service).await?; Ok(()) } } -impl HyperConnServer for AutoBuilder { +impl HyperConnServer for AutoBuilder { #[inline] async fn hyper_serve_connection( &self, @@ -962,7 +963,7 @@ impl HyperConnServer for AutoBuilder { where S: crate::stream::Stream + Send + 'static, Service: hyper::service::Service< - crate::net::http::Request, + crate::http::Request, Response = Response, > + Send + Sync @@ -974,13 +975,13 @@ impl HyperConnServer for AutoBuilder { Body::Error: Into>, { let io = Box::pin(io); - let stream = TokioIo::new(io); + let stream = HyperIo::new(io); self.serve_connection(stream, service).await?; Ok(()) } } -impl HyperConnWithUpgradesServer for AutoBuilder { +impl HyperConnWithUpgradesServer for AutoBuilder { #[inline] async fn hyper_serve_connection_with_upgrades( &self, @@ -990,7 +991,7 @@ impl HyperConnWithUpgradesServer for AutoBuilder { where S: crate::stream::Stream + Send + 'static, Service: hyper::service::Service< - crate::net::http::Request, + crate::http::Request, Response = Response, > + Send + Sync @@ -1002,7 +1003,7 @@ impl HyperConnWithUpgradesServer for AutoBuilder { Body::Error: Into>, { let io = Box::pin(io); - let stream = TokioIo::new(io); + let stream = HyperIo::new(io); self.serve_connection_with_upgrades(stream, service).await?; Ok(()) } diff --git a/src/http/server/executor.rs b/src/http/server/executor.rs new file mode 100644 index 00000000..07c46d97 --- /dev/null +++ b/src/http/server/executor.rs @@ -0,0 +1,39 @@ +/// Future executor that is used to spawn futures in hyper services, such as h2 web services. +#[non_exhaustive] +#[derive(Default, Debug, Clone)] +pub struct GlobalExecutor; + +impl hyper::rt::Executor for GlobalExecutor +where + Fut: std::future::Future + Send + 'static, + Fut::Output: Send + 'static, +{ + fn execute(&self, fut: Fut) { + crate::rt::spawn(fut); + } +} + +impl GlobalExecutor { + /// Create new executor that relies on [`crate::rt::spawn`] to execute futures. + pub fn new() -> Self { + Self + } +} + +#[cfg(test)] +mod tests { + use super::GlobalExecutor; + + use crate::rt::sync::oneshot; + use hyper::rt::Executor; + + #[crate::rt::test] + async fn simple_execute() -> Result<(), Box> { + let (tx, rx) = oneshot::channel(); + let executor = GlobalExecutor::new(); + executor.execute(async move { + tx.send(()).unwrap(); + }); + rx.await.map_err(Into::into) + } +} diff --git a/src/http/server/io.rs b/src/http/server/io.rs new file mode 100644 index 00000000..7d1f0760 --- /dev/null +++ b/src/http/server/io.rs @@ -0,0 +1,161 @@ +use std::{ + pin::Pin, + task::{Context, Poll}, +}; + +use pin_project_lite::pin_project; + +pin_project! { + /// A wrapping implementing hyper IO traits for a type that + /// implements crate::rt::io's traits. + #[derive(Debug)] + pub struct HyperIo { + #[pin] + inner: T, + } +} + +impl HyperIo { + /// Wrap a type implementing crate::rt::io's traits. + pub fn new(inner: T) -> Self { + Self { inner } + } + + /// Borrow the inner type. + pub fn inner(&self) -> &T { + &self.inner + } + + /// Consume this wrapper and get the inner type. + pub fn into_inner(self) -> T { + self.inner + } +} + +impl hyper::rt::Read for HyperIo +where + T: crate::rt::io::AsyncRead, +{ + fn poll_read( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + mut buf: hyper::rt::ReadBufCursor<'_>, + ) -> Poll> { + let n = unsafe { + let mut tbuf = crate::rt::io::ReadBuf::uninit(buf.as_mut()); + match crate::rt::io::AsyncRead::poll_read(self.project().inner, cx, &mut tbuf) { + Poll::Ready(Ok(())) => tbuf.filled().len(), + other => return other, + } + }; + + unsafe { + buf.advance(n); + } + Poll::Ready(Ok(())) + } +} + +impl hyper::rt::Write for HyperIo +where + T: crate::rt::io::AsyncWrite, +{ + fn poll_write( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &[u8], + ) -> Poll> { + crate::rt::io::AsyncWrite::poll_write(self.project().inner, cx, buf) + } + + fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + crate::rt::io::AsyncWrite::poll_flush(self.project().inner, cx) + } + + fn poll_shutdown( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll> { + crate::rt::io::AsyncWrite::poll_shutdown(self.project().inner, cx) + } + + fn is_write_vectored(&self) -> bool { + crate::rt::io::AsyncWrite::is_write_vectored(&self.inner) + } + + fn poll_write_vectored( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + bufs: &[std::io::IoSlice<'_>], + ) -> Poll> { + crate::rt::io::AsyncWrite::poll_write_vectored(self.project().inner, cx, bufs) + } +} + +impl crate::rt::io::AsyncRead for HyperIo +where + T: hyper::rt::Read, +{ + fn poll_read( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + tbuf: &mut crate::rt::io::ReadBuf<'_>, + ) -> Poll> { + //let init = tbuf.initialized().len(); + let filled = tbuf.filled().len(); + let sub_filled = unsafe { + let mut buf = hyper::rt::ReadBuf::uninit(tbuf.unfilled_mut()); + + match hyper::rt::Read::poll_read(self.project().inner, cx, buf.unfilled()) { + Poll::Ready(Ok(())) => buf.filled().len(), + other => return other, + } + }; + + let n_filled = filled + sub_filled; + // At least sub_filled bytes had to have been initialized. + let n_init = sub_filled; + unsafe { + tbuf.assume_init(n_init); + tbuf.set_filled(n_filled); + } + + Poll::Ready(Ok(())) + } +} + +impl crate::rt::io::AsyncWrite for HyperIo +where + T: hyper::rt::Write, +{ + fn poll_write( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &[u8], + ) -> Poll> { + hyper::rt::Write::poll_write(self.project().inner, cx, buf) + } + + fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + hyper::rt::Write::poll_flush(self.project().inner, cx) + } + + fn poll_shutdown( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll> { + hyper::rt::Write::poll_shutdown(self.project().inner, cx) + } + + fn is_write_vectored(&self) -> bool { + hyper::rt::Write::is_write_vectored(&self.inner) + } + + fn poll_write_vectored( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + bufs: &[std::io::IoSlice<'_>], + ) -> Poll> { + hyper::rt::Write::poll_write_vectored(self.project().inner, cx, bufs) + } +} diff --git a/src/http/server/mod.rs b/src/http/server/mod.rs new file mode 100644 index 00000000..2dcf9a13 --- /dev/null +++ b/src/http/server/mod.rs @@ -0,0 +1,8 @@ +mod conn; +pub use conn::{HttpConnector, Request, Response, ServeResult}; + +mod executor; +pub use executor::GlobalExecutor; + +mod io; +pub use io::HyperIo; diff --git a/src/io.rs b/src/io.rs deleted file mode 100644 index 4c5620ce..00000000 --- a/src/io.rs +++ /dev/null @@ -1,5 +0,0 @@ -pub use tokio::io::{ - copy, copy_bidirectional, copy_buf, duplex, empty, repeat, sink, split, AsyncBufReadExt, - AsyncReadExt, AsyncSeekExt, AsyncWriteExt, BufReader, BufStream, BufWriter, DuplexStream, - Empty, Lines, ReadHalf, Repeat, Sink, Split, Take, WriteHalf, -}; diff --git a/src/lib.rs b/src/lib.rs index 7353c0bb..5108fdeb 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,19 +1,24 @@ #![feature(return_type_notation)] #![allow(incomplete_features)] -pub mod client; -pub mod graceful; -pub mod io; -pub mod net; -pub mod runtime; -pub mod server; +extern crate self as rama; + +pub mod rt; + pub mod service; pub mod state; + pub mod stream; -pub mod sync; -pub use tokio::main; +pub mod tcp; + +pub mod http; +pub mod tls; -pub use tokio::pin; +#[allow(unreachable_pub)] +mod sealed { + pub trait Sealed {} +} -pub use tokio::{select, spawn}; +/// Alias for a type-erased error type. +pub type BoxError = Box; diff --git a/src/net/mod.rs b/src/net/mod.rs deleted file mode 100644 index 6c8b1086..00000000 --- a/src/net/mod.rs +++ /dev/null @@ -1,6 +0,0 @@ -mod tcp; -pub use tcp::TcpStream; - -pub mod http; - -pub use tokio::net::ToSocketAddrs; diff --git a/src/rt.rs b/src/rt.rs new file mode 100644 index 00000000..3544d97c --- /dev/null +++ b/src/rt.rs @@ -0,0 +1,13 @@ +//! Rama Runtime +//! +//! This crate provides a runtime for Rama applications. +//! +//! For now only Tokio is implemented and supported. +//! There is no plan to support other runtimes. +//! If you want to use other runtimes, you can +//! provide feedback, input and motivation at +//! , -// } - -// struct Router { - -// } - -// struct Endpoint { -// method: Method, -// service: Box, Response = Response, Error = E>>, -// } diff --git a/src/server/mod.rs b/src/server/mod.rs deleted file mode 100644 index 093439b1..00000000 --- a/src/server/mod.rs +++ /dev/null @@ -1,4 +0,0 @@ -pub mod http; -pub mod proxy; -pub mod tcp; -pub mod tls; diff --git a/src/server/proxy/mod.rs b/src/server/proxy/mod.rs deleted file mode 100644 index b998dd64..00000000 --- a/src/server/proxy/mod.rs +++ /dev/null @@ -1,2 +0,0 @@ -mod http; -pub use http::{HttpProxyConfig, HttpProxyError, HttpProxyLayer, HttpProxyService}; diff --git a/src/server/tcp/http/mod.rs b/src/server/tcp/http/mod.rs deleted file mode 100644 index 9011c604..00000000 --- a/src/server/tcp/http/mod.rs +++ /dev/null @@ -1,2 +0,0 @@ -mod conn; -pub use conn::{HttpConnector, Request, Response, ServeResult}; diff --git a/src/server/tls/rustls/mod.rs b/src/server/tls/rustls/mod.rs deleted file mode 100644 index 6a6301ba..00000000 --- a/src/server/tls/rustls/mod.rs +++ /dev/null @@ -1,5 +0,0 @@ -pub use rustls::server::WebPkiClientVerifier; -pub use rustls::ServerConfig as RustlsServerConfig; - -mod service; -pub use service::{RustlsAcceptorError, RustlsAcceptorLayer, RustlsAcceptorService}; diff --git a/src/service/hyper/service.rs b/src/service/hyper/service.rs index 3a386b5d..32c26caa 100644 --- a/src/service/hyper/service.rs +++ b/src/service/hyper/service.rs @@ -3,7 +3,7 @@ use std::sync::Arc; use hyper::service::Service as HyperService; -use tower_async::Service; +use crate::service::Service; /// Trait to convert a [`tower_async::Service`] into a [`hyper::service::Service`]. /// @@ -69,7 +69,7 @@ mod test { s } - #[tokio::test] + #[crate::rt::test] async fn test_into_hyper_service() { let service = tower_async::service_fn(|req: &'static str| async move { Ok::<_, Infallible>(req) }); @@ -78,7 +78,7 @@ mod test { inner_test_hyper_service(hyper_service).await; } - #[tokio::test] + #[crate::rt::test] async fn test_into_layered_hyper_service() { let service = tower_async::ServiceBuilder::new() .timeout(std::time::Duration::from_secs(5)) diff --git a/src/service/mod.rs b/src/service/mod.rs index fc4ce644..ef92a407 100644 --- a/src/service/mod.rs +++ b/src/service/mod.rs @@ -1,6 +1,6 @@ //! `async fn(&self, Request) -> Result` -pub use tower_async::{service_fn, util::ServiceFn, BoxError, Layer, Service, ServiceBuilder}; +pub use tower_async::{service_fn, util::ServiceFn, Layer, Service, ServiceBuilder}; pub mod util { //! Various utility types and functions that are generally used with a `Service`. diff --git a/src/service/spawn.rs b/src/service/spawn.rs index 7054ed52..fbfc077f 100644 --- a/src/service/spawn.rs +++ b/src/service/spawn.rs @@ -1,7 +1,8 @@ use crate::{ - graceful::ShutdownGuard, - service::{BoxError, Layer, Service}, + rt::graceful::ShutdownGuard, + service::{Layer, Service}, state::Extendable, + BoxError, }; pub struct SpawnService { @@ -33,7 +34,9 @@ where } }); } else { - crate::spawn(async move { + // TODO: ideally we spawn not using this global spawn handle, + // and instead get it from the request. + crate::rt::spawn(async move { if let Err(err) = service.call(request).await { let err = err.into(); tracing::error!(error = err, "service error"); diff --git a/src/state.rs b/src/state.rs index 30f75306..d9cd80c2 100644 --- a/src/state.rs +++ b/src/state.rs @@ -15,7 +15,7 @@ impl Extendable for Extensions { } } -impl Extendable for crate::net::http::Request { +impl Extendable for crate::http::Request { fn extensions(&self) -> &Extensions { self.extensions() } @@ -25,7 +25,7 @@ impl Extendable for crate::net::http::Request { } } -impl Extendable for crate::net::http::Response { +impl Extendable for crate::http::Response { fn extensions(&self) -> &Extensions { self.extensions() } diff --git a/src/stream/layer/tracker/bytes.rs b/src/stream/layer/tracker/bytes.rs index 20512ee9..976d8cbc 100644 --- a/src/stream/layer/tracker/bytes.rs +++ b/src/stream/layer/tracker/bytes.rs @@ -19,7 +19,7 @@ use std::{ task::{Context, Poll}, }; -use crate::stream::{AsyncRead, AsyncWrite, ReadBuf}; +use crate::rt::io::{AsyncRead, AsyncWrite, ReadBuf}; use pin_project_lite::pin_project; @@ -177,10 +177,10 @@ impl BytesRWTrackerHandle { mod tests { use super::*; - use tokio::io::{AsyncReadExt, AsyncWriteExt}; - use tokio_test::io::Builder; + use crate::rt::io::{AsyncReadExt, AsyncWriteExt}; + use crate::rt::test_util::io::Builder; - #[tokio::test] + #[crate::rt::test] async fn test_read_tracker() { let stream = Builder::new() .read(b"foo") @@ -204,7 +204,7 @@ mod tests { assert_eq!(tracker.written(), 0); } - #[tokio::test] + #[crate::rt::test] async fn test_written_tracker() { let stream = Builder::new() .write(b"foo") @@ -227,7 +227,7 @@ mod tests { assert_eq!(tracker.written(), 9); } - #[tokio::test] + #[crate::rt::test] async fn test_rw_tracker() { let stream = Builder::new() .read(b"foo") @@ -263,7 +263,7 @@ mod tests { assert_eq!(tracker.written(), 9); } - #[tokio::test] + #[crate::rt::test] async fn test_rw_handle_tracker() { let stream = Builder::new() .read(b"foo") @@ -280,11 +280,11 @@ mod tests { assert_eq!(handle.read(), 0); assert_eq!(handle.written(), 0); - let (action_tx, mut action_rx) = tokio::sync::mpsc::channel(1); - let (check_tx, mut check_rx) = tokio::sync::broadcast::channel(1); + let (action_tx, mut action_rx) = crate::rt::sync::mpsc::channel(1); + let (check_tx, mut check_rx) = crate::rt::sync::broadcast::channel(1); let check_rx_2 = check_tx.subscribe(); - let task_1 = crate::spawn(async move { + let task_1 = crate::rt::spawn(async move { let mut tracker = tracker; let mut buf = [0u8; 3]; @@ -316,7 +316,7 @@ mod tests { let task_2 = { let handle = handle.clone(); let mut check_rx = check_rx_2; - crate::spawn(async move { + crate::rt::spawn(async move { check_rx.recv().await.unwrap(); assert_eq!(handle.read(), 3); diff --git a/src/stream/layer/tracker/mod.rs b/src/stream/layer/tracker/mod.rs index 247ff327..39eb21dc 100644 --- a/src/stream/layer/tracker/mod.rs +++ b/src/stream/layer/tracker/mod.rs @@ -1,6 +1,6 @@ use crate::{ - net::TcpStream, service::{Layer, Service}, + tcp::TcpStream, }; mod bytes; diff --git a/src/stream/mod.rs b/src/stream/mod.rs index 52157700..cd3fa0b4 100644 --- a/src/stream/mod.rs +++ b/src/stream/mod.rs @@ -1,4 +1,4 @@ -pub use tokio::io::{AsyncRead, AsyncWrite, ReadBuf}; +use crate::rt::io::{AsyncRead, AsyncWrite}; pub mod layer; pub mod service; @@ -6,5 +6,3 @@ pub mod service; pub trait Stream: AsyncRead + AsyncWrite {} impl Stream for T where T: AsyncRead + AsyncWrite {} - -pub use tokio::io::{AsyncReadExt, AsyncWriteExt}; diff --git a/src/stream/service/echo.rs b/src/stream/service/echo.rs index 5cb49b78..1f505057 100644 --- a/src/stream/service/echo.rs +++ b/src/stream/service/echo.rs @@ -11,7 +11,7 @@ use crate::{service::Service, stream::Stream}; /// /// # #[rama::main] /// # async fn main() -> Result<(), Box> { -/// # let stream = tokio_test::io::Builder::new().read(b"hello world").write(b"hello world").build(); +/// # let stream = crate::rt::test_util::io::Builder::new().read(b"hello world").write(b"hello world").build(); /// let service = EchoService::new(); /// /// let bytes_copied = service.call(stream).await?; @@ -45,8 +45,8 @@ where type Error = Error; async fn call(&self, stream: S) -> Result { - let (mut reader, mut writer) = tokio::io::split(stream); - tokio::io::copy(&mut reader, &mut writer).await + let (mut reader, mut writer) = crate::rt::io::split(stream); + crate::rt::io::copy(&mut reader, &mut writer).await } } @@ -54,9 +54,9 @@ where mod tests { use super::*; - use tokio_test::io::Builder; + use crate::rt::test_util::io::Builder; - #[tokio::test] + #[crate::rt::test] async fn test_echo() { let stream = Builder::new() .read(b"one") diff --git a/src/stream/service/forward.rs b/src/stream/service/forward.rs index 95f81390..022f5f44 100644 --- a/src/stream/service/forward.rs +++ b/src/stream/service/forward.rs @@ -1,10 +1,6 @@ -use std::{io::Error, pin::Pin}; +use std::{io::Error, pin::Pin, sync::Arc}; -use crate::{ - service::Service, - stream::Stream, - sync::{Arc, AsyncMutex}, -}; +use crate::{rt::sync::Mutex as AsyncMutex, service::Service, stream::Stream}; /// Async service which forwards the incoming connection bytes to the given destination, /// and forwards the response back from the destination to the incoming connection. @@ -14,10 +10,10 @@ use crate::{ /// ```rust /// use rama::{service::Service, stream::service::ForwardService}; /// -/// # #[tokio::main] +/// # #[crate::rt::main] /// # async fn main() -> Result<(), Box> { -/// # let destination = tokio_test::io::Builder::new().write(b"hello world").read(b"hello world").build(); -/// # let stream = tokio_test::io::Builder::new().read(b"hello world").write(b"hello world").build(); +/// # let destination = crate::rt::test_util::io::Builder::new().write(b"hello world").read(b"hello world").build(); +/// # let stream = crate::rt::test_util::io::Builder::new().read(b"hello world").write(b"hello world").build(); /// let service = ForwardService::new(destination); /// /// let (bytes_copied_to, bytes_copied_from) = service.call(stream).await?; @@ -60,10 +56,10 @@ where type Error = Error; async fn call(&self, source: S) -> Result { - crate::pin!(source); + crate::rt::pin!(source); let mut destination_guard = self.destination.lock().await; let mut destination = destination_guard.as_mut(); - tokio::io::copy_bidirectional(&mut source, &mut destination).await + crate::rt::io::copy_bidirectional(&mut source, &mut destination).await } } @@ -71,9 +67,9 @@ where mod tests { use super::*; - use tokio_test::io::Builder; + use crate::rt::test_util::io::Builder; - #[tokio::test] + #[crate::rt::test] async fn test_forwarder() { let destination = Builder::new() .write(b"to(1)") diff --git a/src/sync.rs b/src/sync.rs deleted file mode 100644 index 3059579b..00000000 --- a/src/sync.rs +++ /dev/null @@ -1,9 +0,0 @@ -pub use std::sync::{ - atomic::{ - AtomicBool, AtomicI16, AtomicI32, AtomicI64, AtomicI8, AtomicIsize, AtomicPtr, AtomicU16, - AtomicU32, AtomicU64, AtomicU8, AtomicUsize, Ordering as AtomicOrdering, - }, - Arc, Mutex, -}; - -pub use tokio::sync::Mutex as AsyncMutex; diff --git a/src/client/tcp/forward.rs b/src/tcp/client/forward.rs similarity index 94% rename from src/client/tcp/forward.rs rename to src/tcp/client/forward.rs index fc3034b5..11124f92 100644 --- a/src/client/tcp/forward.rs +++ b/src/tcp/client/forward.rs @@ -26,9 +26,9 @@ impl Service> for ForwardService { mod tests { use super::*; - use tokio_test::io::Builder; + use crate::rt::test_util::io::Builder; - #[tokio::test] + #[rama::rt::test] async fn test_forwarder() { let destination = Builder::new() .write(b"to(1)") diff --git a/src/tcp/client/mod.rs b/src/tcp/client/mod.rs new file mode 100644 index 00000000..c4d13903 --- /dev/null +++ b/src/tcp/client/mod.rs @@ -0,0 +1,10 @@ +use super::TcpStream; + +use crate::rt::net::{TcpStream as AsyncTcpStream, ToSocketAddrs}; + +pub async fn connect( + address: impl ToSocketAddrs, +) -> Result, std::io::Error> { + let stream = AsyncTcpStream::connect(address).await?; + Ok(TcpStream::new(stream)) +} diff --git a/src/net/tcp.rs b/src/tcp/mod.rs similarity index 94% rename from src/net/tcp.rs rename to src/tcp/mod.rs index d471bc31..fa8faa06 100644 --- a/src/net/tcp.rs +++ b/src/tcp/mod.rs @@ -5,13 +5,17 @@ use std::{ task::{Context, Poll}, }; -use tokio::net::TcpStream as TokioTcpStream; - use crate::{ + rt::{ + io::{AsyncRead, AsyncWrite, ReadBuf}, + net::TcpStream as TokioTcpStream, + }, state::{Extendable, Extensions}, - stream::{AsyncRead, AsyncWrite, ReadBuf}, }; +pub mod client; +pub mod server; + pin_project_lite::pin_project! { #[derive(Debug)] pub struct TcpStream { diff --git a/src/server/tcp/listener.rs b/src/tcp/server/listener.rs similarity index 94% rename from src/server/tcp/listener.rs rename to src/tcp/server/listener.rs index b414b293..2ccfdded 100644 --- a/src/server/tcp/listener.rs +++ b/src/tcp/server/listener.rs @@ -1,19 +1,22 @@ use std::{future::Future, io, net::SocketAddr}; -use tokio::net::{TcpListener as TokioTcpListener, TcpStream as TokioTcpStream}; +use crate::rt::{ + graceful::ShutdownGuard, + net::{TcpListener as AsyncTcpListener, TcpStream as AsyncTcpStream, ToSocketAddrs}, +}; use crate::{ - graceful::ShutdownGuard, - net::{TcpStream, ToSocketAddrs}, service::{ util::{Identity, Stack}, - BoxError, Layer, Service, ServiceBuilder, + Layer, Service, ServiceBuilder, }, state::Extendable, + tcp::TcpStream, + BoxError, }; pub struct TcpListener { - inner: TokioTcpListener, + inner: AsyncTcpListener, builder: ServiceBuilder, } @@ -26,7 +29,7 @@ impl TcpListener { /// to this listener. The port allocated can be queried via the `local_addr` /// method. pub async fn bind(addr: A) -> io::Result { - let inner = TokioTcpListener::bind(addr).await?; + let inner = AsyncTcpListener::bind(addr).await?; let builder = ServiceBuilder::new(); Ok(TcpListener { inner, builder }) } @@ -166,7 +169,7 @@ impl TcpListener { pub async fn serve(self, service: S) -> TcpServeResult<()> where L: Layer, - L::Service: Service, Response = T, Error = E>, + L::Service: Service, Response = T, Error = E>, E: Into, { let service = self.builder.service(service); @@ -187,7 +190,7 @@ impl TcpListener { pub async fn serve_fn(self, service: F) -> TcpServeResult<()> where L: Layer>, - L::Service: Service, Response = T, Error = E>, + L::Service: Service, Response = T, Error = E>, E: Into, F: Fn(TcpStream) -> Fut, Fut: Future>, @@ -208,14 +211,14 @@ impl TcpListener { ) -> TcpServeResult<()> where L: Layer, - L::Service: Service, Response = T, Error = E>, + L::Service: Service, Response = T, Error = E>, E: Into, { let service = self.builder.service(service); loop { let guard = guard.clone(); - crate::select! { + crate::rt::select! { _ = guard.cancelled() => { tracing::info!("signal received: initiate graceful shutdown"); break Ok(()); @@ -246,7 +249,7 @@ impl TcpListener { ) -> TcpServeResult<()> where L: Layer>, - L::Service: Service, Response = T, Error = E>, + L::Service: Service, Response = T, Error = E>, E: Into, F: Fn(TcpStream) -> Fut, Fut: Future>, diff --git a/src/server/tcp/mod.rs b/src/tcp/server/mod.rs similarity index 83% rename from src/server/tcp/mod.rs rename to src/tcp/server/mod.rs index b446e6e7..9c14cd02 100644 --- a/src/server/tcp/mod.rs +++ b/src/tcp/server/mod.rs @@ -1,4 +1,2 @@ mod listener; pub use listener::{TcpListener, TcpServeError, TcpServeResult}; - -pub mod http; diff --git a/src/tls/mod.rs b/src/tls/mod.rs new file mode 100644 index 00000000..74f47ad3 --- /dev/null +++ b/src/tls/mod.rs @@ -0,0 +1 @@ +pub mod server; diff --git a/src/tls/server/mod.rs b/src/tls/server/mod.rs new file mode 100644 index 00000000..3480c12e --- /dev/null +++ b/src/tls/server/mod.rs @@ -0,0 +1 @@ +pub mod rustls; diff --git a/src/server/tls/rustls/service.rs b/src/tls/server/rustls.rs similarity index 88% rename from src/server/tls/rustls/service.rs rename to src/tls/server/rustls.rs index 55f97856..cfe965ab 100644 --- a/src/server/tls/rustls/service.rs +++ b/src/tls/server/rustls.rs @@ -1,15 +1,17 @@ use std::sync::Arc; -use tokio_rustls::TlsAcceptor; - use crate::{ - net::TcpStream, + rt::tls::rustls::{server::TlsStream, TlsAcceptor}, service::{Layer, Service}, stream::Stream, + tcp::TcpStream, }; +pub use rustls::server::WebPkiClientVerifier; +pub use rustls::ServerConfig as RustlsServerConfig; + pub struct RustlsAcceptorService { - acceptor: tokio_rustls::TlsAcceptor, + acceptor: TlsAcceptor, inner: S, } @@ -33,7 +35,7 @@ where impl Service> for RustlsAcceptorService where - S: Service>>, + S: Service>>, I: Stream + Unpin, { type Response = S::Response; @@ -80,7 +82,7 @@ pub struct RustlsAcceptorLayer { } impl RustlsAcceptorLayer { - pub fn new(config: super::RustlsServerConfig) -> Self { + pub fn new(config: RustlsServerConfig) -> Self { Self { acceptor: TlsAcceptor::from(Arc::new(config)), } From 2e4cd0888043875ab0f13b97d6bab75726fbe6c2 Mon Sep 17 00:00:00 2001 From: glendc Date: Tue, 28 Nov 2023 22:32:09 +0100 Subject: [PATCH 2/3] clean up rama-rt-macros --- Cargo.lock | 4 +- rama-rt-macros/src/entry.rs | 200 +----------- rama-rt-macros/src/lib.rs | 449 +-------------------------- rama-rt-macros/src/select.rs | 109 ------- snippets/README.md | 5 + src/http/middleware/header_config.rs | 6 +- src/http/server/executor.rs | 2 +- src/lib.rs | 2 - src/rt.rs | 2 +- src/service/http/mod.rs | 2 +- src/service/hyper/service.rs | 4 +- src/stream/layer/tracker/bytes.rs | 8 +- src/stream/service/echo.rs | 6 +- src/stream/service/forward.rs | 8 +- src/tcp/client/forward.rs | 2 +- src/tcp/server/listener.rs | 2 +- 16 files changed, 42 insertions(+), 769 deletions(-) delete mode 100644 rama-rt-macros/src/select.rs diff --git a/Cargo.lock b/Cargo.lock index 3de6b32e..67ad86ee 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1546,9 +1546,9 @@ checksum = "7ab9b36309365056cd639da3134bf87fa8f3d86008abf99e612384a6eecd459f" [[package]] name = "web-sys" -version = "0.3.65" +version = "0.3.66" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5db499c5f66323272151db0e666cd34f78617522fb0c1604d31a27c50c206a85" +checksum = "50c24a44ec86bb68fbecd1b3efed7e85ea5621b39b35ef2766b66cd984f8010f" dependencies = [ "js-sys", "wasm-bindgen", diff --git a/rama-rt-macros/src/entry.rs b/rama-rt-macros/src/entry.rs index d1c760c6..641b649a 100644 --- a/rama-rt-macros/src/entry.rs +++ b/rama-rt-macros/src/entry.rs @@ -6,78 +6,30 @@ use syn::{braced, Attribute, Ident, Path, Signature, Visibility}; // syn::AttributeArgs does not implement syn::Parse type AttributeArgs = syn::punctuated::Punctuated; -#[derive(Clone, Copy, PartialEq)] -enum RuntimeFlavor { - CurrentThread, - Threaded, -} - -impl RuntimeFlavor { - fn from_str(s: &str) -> Result { - match s { - "current_thread" => Ok(RuntimeFlavor::CurrentThread), - "multi_thread" => Ok(RuntimeFlavor::Threaded), - "single_thread" => Err("The single threaded runtime flavor is called `current_thread`.".to_string()), - "basic_scheduler" => Err("The `basic_scheduler` runtime flavor has been renamed to `current_thread`.".to_string()), - "threaded_scheduler" => Err("The `threaded_scheduler` runtime flavor has been renamed to `multi_thread`.".to_string()), - _ => Err(format!("No such runtime flavor `{}`. The runtime flavors are `current_thread` and `multi_thread`.", s)), - } - } -} - struct FinalConfig { - flavor: RuntimeFlavor, worker_threads: Option, - start_paused: Option, crate_name: Option, } /// Config used in case of the attribute not being able to build a valid config const DEFAULT_ERROR_CONFIG: FinalConfig = FinalConfig { - flavor: RuntimeFlavor::CurrentThread, worker_threads: None, - start_paused: None, crate_name: None, }; struct Configuration { - rt_multi_thread_available: bool, - default_flavor: RuntimeFlavor, - flavor: Option, worker_threads: Option<(usize, Span)>, - start_paused: Option<(bool, Span)>, - is_test: bool, crate_name: Option, } impl Configuration { - fn new(is_test: bool, rt_multi_thread: bool) -> Self { + fn new() -> Self { Configuration { - rt_multi_thread_available: rt_multi_thread, - default_flavor: match is_test { - true => RuntimeFlavor::CurrentThread, - false => RuntimeFlavor::Threaded, - }, - flavor: None, worker_threads: None, - start_paused: None, - is_test, crate_name: None, } } - fn set_flavor(&mut self, runtime: syn::Lit, span: Span) -> Result<(), syn::Error> { - if self.flavor.is_some() { - return Err(syn::Error::new(span, "`flavor` set multiple times.")); - } - - let runtime_str = parse_string(runtime, span, "flavor")?; - let runtime = - RuntimeFlavor::from_str(&runtime_str).map_err(|err| syn::Error::new(span, err))?; - self.flavor = Some(runtime); - Ok(()) - } - fn set_worker_threads( &mut self, worker_threads: syn::Lit, @@ -98,16 +50,6 @@ impl Configuration { Ok(()) } - fn set_start_paused(&mut self, start_paused: syn::Lit, span: Span) -> Result<(), syn::Error> { - if self.start_paused.is_some() { - return Err(syn::Error::new(span, "`start_paused` set multiple times.")); - } - - let start_paused = parse_bool(start_paused, span, "start_paused")?; - self.start_paused = Some((start_paused, span)); - Ok(()) - } - fn set_crate_name(&mut self, name: syn::Lit, span: Span) -> Result<(), syn::Error> { if self.crate_name.is_some() { return Err(syn::Error::new(span, "`crate` set multiple times.")); @@ -117,57 +59,12 @@ impl Configuration { Ok(()) } - fn macro_name(&self) -> &'static str { - if self.is_test { - "rama::rt::test" - } else { - "rama::rt::main" - } - } - fn build(&self) -> Result { - use RuntimeFlavor as F; - - let flavor = self.flavor.unwrap_or(self.default_flavor); - let worker_threads = match (flavor, self.worker_threads) { - (F::CurrentThread, Some((_, worker_threads_span))) => { - let msg = format!( - "The `worker_threads` option requires the `multi_thread` runtime flavor. Use `#[{}(flavor = \"multi_thread\")]`", - self.macro_name(), - ); - return Err(syn::Error::new(worker_threads_span, msg)); - } - (F::CurrentThread, None) => None, - (F::Threaded, worker_threads) if self.rt_multi_thread_available => { - worker_threads.map(|(val, _span)| val) - } - (F::Threaded, _) => { - let msg = if self.flavor.is_none() { - "The default runtime flavor is `multi_thread`, but the `rt-multi-thread` feature is disabled." - } else { - "The runtime flavor `multi_thread` requires the `rt-multi-thread` feature." - }; - return Err(syn::Error::new(Span::call_site(), msg)); - } - }; - - let start_paused = match (flavor, self.start_paused) { - (F::Threaded, Some((_, start_paused_span))) => { - let msg = format!( - "The `start_paused` option requires the `current_thread` runtime flavor. Use `#[{}(flavor = \"current_thread\")]`", - self.macro_name(), - ); - return Err(syn::Error::new(start_paused_span, msg)); - } - (F::CurrentThread, Some((start_paused, _))) => Some(start_paused), - (_, None) => None, - }; + let worker_threads = self.worker_threads.map(|t| t.0); Ok(FinalConfig { crate_name: self.crate_name.clone(), - flavor, worker_threads, - start_paused, }) } } @@ -188,17 +85,6 @@ fn parse_int(int: syn::Lit, span: Span, field: &str) -> Result Result { - match int { - syn::Lit::Str(s) => Ok(s.value()), - syn::Lit::Verbatim(s) => Ok(s.to_string()), - _ => Err(syn::Error::new( - span, - format!("Failed to parse value of `{}` as string.", field), - )), - } -} - fn parse_path(lit: syn::Lit, span: Span, field: &str) -> Result { match lit { syn::Lit::Str(s) => { @@ -219,29 +105,13 @@ fn parse_path(lit: syn::Lit, span: Span, field: &str) -> Result Result { - match bool { - syn::Lit::Bool(b) => Ok(b.value), - _ => Err(syn::Error::new( - span, - format!("Failed to parse value of `{}` as bool.", field), - )), - } -} - -fn build_config( - input: &ItemFn, - args: AttributeArgs, - is_test: bool, - rt_multi_thread: bool, -) -> Result { +fn build_config(input: &ItemFn, args: AttributeArgs) -> Result { if input.sig.asyncness.is_none() { let msg = "the `async` keyword is missing from the function declaration"; return Err(syn::Error::new_spanned(input.sig.fn_token, msg)); } - let mut config = Configuration::new(is_test, rt_multi_thread); - let macro_name = config.macro_name(); + let mut config = Configuration::new(); for arg in args { match arg { @@ -262,56 +132,18 @@ fn build_config( "worker_threads" => { config.set_worker_threads(lit.clone(), syn::spanned::Spanned::span(lit))?; } - "flavor" => { - config.set_flavor(lit.clone(), syn::spanned::Spanned::span(lit))?; - } - "start_paused" => { - config.set_start_paused(lit.clone(), syn::spanned::Spanned::span(lit))?; - } - "core_threads" => { - let msg = "Attribute `core_threads` is renamed to `worker_threads`"; - return Err(syn::Error::new_spanned(namevalue, msg)); - } "crate" => { config.set_crate_name(lit.clone(), syn::spanned::Spanned::span(lit))?; } name => { let msg = format!( - "Unknown attribute {} is specified; expected one of: `flavor`, `worker_threads`, `start_paused`, `crate`", + "Unknown attribute {} is specified; expected one of: `worker_threads``, `crate`", name, ); return Err(syn::Error::new_spanned(namevalue, msg)); } } } - syn::Meta::Path(path) => { - let name = path - .get_ident() - .ok_or_else(|| syn::Error::new_spanned(&path, "Must have specified ident"))? - .to_string() - .to_lowercase(); - let msg = match name.as_str() { - "threaded_scheduler" | "multi_thread" => { - format!( - "Set the runtime flavor with #[{}(flavor = \"multi_thread\")].", - macro_name - ) - } - "basic_scheduler" | "current_thread" | "single_threaded" => { - format!( - "Set the runtime flavor with #[{}(flavor = \"current_thread\")].", - macro_name - ) - } - "flavor" | "worker_threads" | "start_paused" => { - format!("The `{}` attribute requires an argument.", name) - } - name => { - format!("Unknown attribute {} is specified; expected one of: `flavor`, `worker_threads`, `start_paused`, `crate`", name) - } - }; - return Err(syn::Error::new_spanned(path, msg)); - } other => { return Err(syn::Error::new_spanned( other, @@ -345,20 +177,12 @@ fn parse_knobs(mut input: ItemFn, is_test: bool, config: FinalConfig) -> TokenSt .map(ToTokens::into_token_stream) .unwrap_or_else(|| Ident::new("rama", last_stmt_start_span).into_token_stream()); - let mut rt = match config.flavor { - RuntimeFlavor::CurrentThread => quote_spanned! {last_stmt_start_span=> - #crate_path::rt::Builder::new_current_thread() - }, - RuntimeFlavor::Threaded => quote_spanned! {last_stmt_start_span=> - #crate_path::rt::Builder::new_multi_thread() - }, + let mut rt = quote_spanned! { + last_stmt_start_span => #crate_path::rt::Builder::new_multi_thread() }; if let Some(v) = config.worker_threads { rt = quote_spanned! {last_stmt_start_span=> #rt.worker_threads(#v) }; } - if let Some(v) = config.start_paused { - rt = quote_spanned! {last_stmt_start_span=> #rt.start_paused(#v) }; - } let header = if is_test { quote! { @@ -401,7 +225,7 @@ fn parse_knobs(mut input: ItemFn, is_test: bool, config: FinalConfig) -> TokenSt }; quote! { let body = async #body; - #crate_path::pin!(body); + #crate_path::rt::pin!(body); let body: ::core::pin::Pin<&mut dyn ::core::future::Future> = body; } } else { @@ -419,7 +243,7 @@ fn token_stream_with_error(mut tokens: TokenStream, error: syn::Error) -> TokenS } #[cfg(not(test))] // Work around for rust-lang/rust#62127 -pub(crate) fn main(args: TokenStream, item: TokenStream, rt_multi_thread: bool) -> TokenStream { +pub(crate) fn main(args: TokenStream, item: TokenStream) -> TokenStream { // If any of the steps for this macro fail, we still want to expand to an item that is as close // to the expected output as possible. This helps out IDEs such that completions and other // related features keep working. @@ -434,7 +258,7 @@ pub(crate) fn main(args: TokenStream, item: TokenStream, rt_multi_thread: bool) } else { AttributeArgs::parse_terminated .parse2(args) - .and_then(|args| build_config(&input, args, false, rt_multi_thread)) + .and_then(|args| build_config(&input, args)) }; match config { @@ -443,7 +267,7 @@ pub(crate) fn main(args: TokenStream, item: TokenStream, rt_multi_thread: bool) } } -pub(crate) fn test(args: TokenStream, item: TokenStream, rt_multi_thread: bool) -> TokenStream { +pub(crate) fn test(args: TokenStream, item: TokenStream) -> TokenStream { // If any of the steps for this macro fail, we still want to expand to an item that is as close // to the expected output as possible. This helps out IDEs such that completions and other // related features keep working. @@ -457,7 +281,7 @@ pub(crate) fn test(args: TokenStream, item: TokenStream, rt_multi_thread: bool) } else { AttributeArgs::parse_terminated .parse2(args) - .and_then(|args| build_config(&input, args, true, rt_multi_thread)) + .and_then(|args| build_config(&input, args)) }; match config { diff --git a/rama-rt-macros/src/lib.rs b/rama-rt-macros/src/lib.rs index 669043eb..62aad7e5 100644 --- a/rama-rt-macros/src/lib.rs +++ b/rama-rt-macros/src/lib.rs @@ -18,467 +18,22 @@ extern crate proc_macro; mod entry; -mod select; use proc_macro::TokenStream; /// Marks async function to be executed by the selected runtime. This macro /// helps set up a `Runtime` without requiring the user to use /// Runtime or Builder directly. -/// -/// Note: This macro is designed to be simplistic and targets applications that -/// do not require a complex setup. If the provided functionality is not -/// sufficient, you may be interested in using Builder, which provides a more -/// powerful interface. -/// -/// Note: This macro can be used on any function and not just the `main` -/// function. Using it on a non-main function makes the function behave as if it -/// was synchronous by starting a new runtime each time it is called. If the -/// function is called often, it is preferable to create the runtime using the -/// runtime builder so the runtime can be reused across calls. -/// -/// # Non-worker async function -/// -/// Note that the async function marked with this macro does not run as a -/// worker. The expectation is that other tasks are spawned by the function here. -/// Awaiting on other futures from the function provided here will not -/// perform as fast as those spawned as workers. -/// -/// # Multi-threaded runtime -/// -/// To use the multi-threaded runtime, the macro can be configured using -/// -/// ``` -/// #[rama::rt::main(flavor = "multi_thread", worker_threads = 10)] -/// # async fn main() {} -/// ``` -/// -/// The `worker_threads` option configures the number of worker threads, and -/// defaults to the number of cpus on the system. This is the default flavor. -/// -/// Note: The multi-threaded runtime requires the `rt-multi-thread` feature -/// flag. -/// -/// # Current thread runtime -/// -/// To use the single-threaded runtime known as the `current_thread` runtime, -/// the macro can be configured using -/// -/// ``` -/// #[rama::rt::main(flavor = "current_thread")] -/// # async fn main() {} -/// ``` -/// -/// ## Function arguments: -/// -/// Arguments are allowed for any functions aside from `main` which is special -/// -/// ## Usage -/// -/// ### Using the multi-thread runtime -/// -/// ```rust -/// #[rama::rt::main] -/// async fn main() { -/// println!("Hello world"); -/// } -/// ``` -/// -/// Equivalent code not using `#[rama::rt::main]` -/// -/// ```rust -/// fn main() { -/// rama::rt::Builder::new_multi_thread() -/// .enable_all() -/// .build() -/// .unwrap() -/// .block_on(async { -/// println!("Hello world"); -/// }) -/// } -/// ``` -/// -/// ### Using current thread runtime -/// -/// The basic scheduler is single-threaded. -/// -/// ```rust -/// #[rama::rt::main(flavor = "current_thread")] -/// async fn main() { -/// println!("Hello world"); -/// } -/// ``` -/// -/// Equivalent code not using `#[rama::rt::main]` -/// -/// ```rust -/// fn main() { -/// rama::rt::Builder::new_current_thread() -/// .enable_all() -/// .build() -/// .unwrap() -/// .block_on(async { -/// println!("Hello world"); -/// }) -/// } -/// ``` -/// -/// ### Set number of worker threads -/// -/// ```rust -/// #[rama::rt::main(worker_threads = 2)] -/// async fn main() { -/// println!("Hello world"); -/// } -/// ``` -/// -/// Equivalent code not using `#[rama::rt::main]` -/// -/// ```rust -/// fn main() { -/// rama::rt::Builder::new_multi_thread() -/// .worker_threads(2) -/// .enable_all() -/// .build() -/// .unwrap() -/// .block_on(async { -/// println!("Hello world"); -/// }) -/// } -/// ``` -/// -/// ### Configure the runtime to start with time paused -/// -/// ```rust -/// #[rama::rt::main(flavor = "current_thread", start_paused = true)] -/// async fn main() { -/// println!("Hello world"); -/// } -/// ``` -/// -/// Equivalent code not using `#[rama::rt::main]` -/// -/// ```rust -/// fn main() { -/// rama::rt::Builder::new_current_thread() -/// .enable_all() -/// .start_paused(true) -/// .build() -/// .unwrap() -/// .block_on(async { -/// println!("Hello world"); -/// }) -/// } -/// ``` -/// -/// Note that `start_paused` requires the `test-util` feature to be enabled. -/// -/// ### Rename package -/// -/// ```rust -/// use rama as rama1; -/// -/// #[rama1::main(crate = "rama1")] -/// async fn main() { -/// println!("Hello world"); -/// } -/// ``` -/// -/// Equivalent code not using `#[rama::rt::main]` -/// -/// ```rust -/// use rama as rama1; -/// -/// fn main() { -/// rama1::rt::Builder::new_multi_thread() -/// .enable_all() -/// .build() -/// .unwrap() -/// .block_on(async { -/// println!("Hello world"); -/// }) -/// } -/// ``` #[proc_macro_attribute] #[cfg(not(test))] // Work around for rust-lang/rust#62127 pub fn main(args: TokenStream, item: TokenStream) -> TokenStream { - entry::main(args.into(), item.into(), true).into() -} - -/// Marks async function to be executed by selected runtime. This macro helps set up a `Runtime` -/// without requiring the user to use Runtime or Builder directly. -/// -/// ## Function arguments: -/// -/// Arguments are allowed for any functions aside from `main` which is special -/// -/// ## Usage -/// -/// ### Using default -/// -/// ```rust -/// #[rama::rt::main(flavor = "current_thread")] -/// async fn main() { -/// println!("Hello world"); -/// } -/// ``` -/// -/// Equivalent code not using `#[rama::rt::main]` -/// -/// ```rust -/// fn main() { -/// rama::rt::Builder::new_current_thread() -/// .enable_all() -/// .build() -/// .unwrap() -/// .block_on(async { -/// println!("Hello world"); -/// }) -/// } -/// ``` -/// -/// ### Rename package -/// -/// ```rust -/// use rama as rama1; -/// -/// #[rama1::main(crate = "rama")] -/// async fn main() { -/// println!("Hello world"); -/// } -/// ``` -/// -/// Equivalent code not using `#[rama::rt::main]` -/// -/// ```rust -/// use rama as rama1; -/// -/// fn main() { -/// rama1::rt::Builder::new_multi_thread() -/// .enable_all() -/// .build() -/// .unwrap() -/// .block_on(async { -/// println!("Hello world"); -/// }) -/// } -/// ``` -#[proc_macro_attribute] -#[cfg(not(test))] // Work around for rust-lang/rust#62127 -pub fn main_rt(args: TokenStream, item: TokenStream) -> TokenStream { - entry::main(args.into(), item.into(), false).into() + entry::main(args.into(), item.into()).into() } /// Marks async function to be executed by runtime, suitable to test environment. /// This macro helps set up a `Runtime` without requiring the user to use /// Runtime or Builder directly. -/// -/// Note: This macro is designed to be simplistic and targets applications that -/// do not require a complex setup. If the provided functionality is not -/// sufficient, you may be interested in using Builder, which provides a more -/// powerful interface. -/// -/// # Multi-threaded runtime -/// -/// To use the multi-threaded runtime, the macro can be configured using -/// -/// ```no_run -/// #[rama::rt::test(flavor = "multi_thread", worker_threads = 1)] -/// async fn my_test() { -/// assert!(true); -/// } -/// ``` -/// -/// The `worker_threads` option configures the number of worker threads, and -/// defaults to the number of cpus on the system. -/// -/// Note: The multi-threaded runtime requires the `rt-multi-thread` feature -/// flag. -/// -/// # Current thread runtime -/// -/// The default test runtime is single-threaded. Each test gets a -/// separate current-thread runtime. -/// -/// ```no_run -/// #[rama::rt::test] -/// async fn my_test() { -/// assert!(true); -/// } -/// ``` -/// -/// ## Usage -/// -/// ### Using the multi-thread runtime -/// -/// ```no_run -/// #[rama::rt::test(flavor = "multi_thread")] -/// async fn my_test() { -/// assert!(true); -/// } -/// ``` -/// -/// Equivalent code not using `#[rama::rt::test]` -/// -/// ```no_run -/// #[test] -/// fn my_test() { -/// rama::rt::Builder::new_multi_thread() -/// .enable_all() -/// .build() -/// .unwrap() -/// .block_on(async { -/// assert!(true); -/// }) -/// } -/// ``` -/// -/// ### Using current thread runtime -/// -/// ```no_run -/// #[rama::rt::test] -/// async fn my_test() { -/// assert!(true); -/// } -/// ``` -/// -/// Equivalent code not using `#[rama::rt::test]` -/// -/// ```no_run -/// #[test] -/// fn my_test() { -/// rama::rt::Builder::new_current_thread() -/// .enable_all() -/// .build() -/// .unwrap() -/// .block_on(async { -/// assert!(true); -/// }) -/// } -/// ``` -/// -/// ### Set number of worker threads -/// -/// ```no_run -/// #[rama::rt::test(flavor ="multi_thread", worker_threads = 2)] -/// async fn my_test() { -/// assert!(true); -/// } -/// ``` -/// -/// Equivalent code not using `#[rama::rt::test]` -/// -/// ```no_run -/// #[test] -/// fn my_test() { -/// rama::rt::Builder::new_multi_thread() -/// .worker_threads(2) -/// .enable_all() -/// .build() -/// .unwrap() -/// .block_on(async { -/// assert!(true); -/// }) -/// } -/// ``` -/// -/// ### Configure the runtime to start with time paused -/// -/// ```no_run -/// #[rama::rt::test(start_paused = true)] -/// async fn my_test() { -/// assert!(true); -/// } -/// ``` -/// -/// Equivalent code not using `#[rama::rt::test]` -/// -/// ```no_run -/// #[test] -/// fn my_test() { -/// rama::rt::Builder::new_current_thread() -/// .enable_all() -/// .start_paused(true) -/// .build() -/// .unwrap() -/// .block_on(async { -/// assert!(true); -/// }) -/// } -/// ``` -/// -/// Note that `start_paused` requires the `test-util` feature to be enabled. -/// -/// ### Rename package -/// -/// ```rust -/// use rama as rama1; -/// -/// #[rama1::test(crate = "rama1")] -/// async fn my_test() { -/// println!("Hello world"); -/// } -/// ``` #[proc_macro_attribute] pub fn test(args: TokenStream, item: TokenStream) -> TokenStream { - entry::test(args.into(), item.into(), true).into() -} - -/// Marks async function to be executed by runtime, suitable to test environment -/// -/// ## Usage -/// -/// ```no_run -/// #[rama::rt::test] -/// async fn my_test() { -/// assert!(true); -/// } -/// ``` -#[proc_macro_attribute] -pub fn test_rt(args: TokenStream, item: TokenStream) -> TokenStream { - entry::test(args.into(), item.into(), false).into() -} - -/// Always fails with the error message below. -/// ```text -/// The #[rama::rt::main] macro requires rt or rt-multi-thread. -/// ``` -#[proc_macro_attribute] -pub fn main_fail(_args: TokenStream, _item: TokenStream) -> TokenStream { - syn::Error::new( - proc_macro2::Span::call_site(), - "The #[rama::rt::main] macro requires rt or rt-multi-thread.", - ) - .to_compile_error() - .into() -} - -/// Always fails with the error message below. -/// ```text -/// The #[rama::rt::test] macro requires rt or rt-multi-thread. -/// ``` -#[proc_macro_attribute] -pub fn test_fail(_args: TokenStream, _item: TokenStream) -> TokenStream { - syn::Error::new( - proc_macro2::Span::call_site(), - "The #[rama::rt::test] macro requires rt or rt-multi-thread.", - ) - .to_compile_error() - .into() -} - -/// Implementation detail of the `select!` macro. This macro is **not** intended -/// to be used as part of the public API and is permitted to change. -#[proc_macro] -#[doc(hidden)] -pub fn select_priv_declare_output_enum(input: TokenStream) -> TokenStream { - select::declare_output_enum(input) -} - -/// Implementation detail of the `select!` macro. This macro is **not** intended -/// to be used as part of the public API and is permitted to change. -#[proc_macro] -#[doc(hidden)] -pub fn select_priv_clean_pattern(input: TokenStream) -> TokenStream { - select::clean_pattern_macro(input) + entry::test(args.into(), item.into()).into() } diff --git a/rama-rt-macros/src/select.rs b/rama-rt-macros/src/select.rs deleted file mode 100644 index 324b8f94..00000000 --- a/rama-rt-macros/src/select.rs +++ /dev/null @@ -1,109 +0,0 @@ -use proc_macro::{TokenStream, TokenTree}; -use proc_macro2::Span; -use quote::quote; -use syn::{parse::Parser, Ident}; - -pub(crate) fn declare_output_enum(input: TokenStream) -> TokenStream { - // passed in is: `(_ _ _)` with one `_` per branch - let branches = match input.into_iter().next() { - Some(TokenTree::Group(group)) => group.stream().into_iter().count(), - _ => panic!("unexpected macro input"), - }; - - let variants = (0..branches) - .map(|num| Ident::new(&format!("_{}", num), Span::call_site())) - .collect::>(); - - // Use a bitfield to track which futures completed - let mask = Ident::new( - if branches <= 8 { - "u8" - } else if branches <= 16 { - "u16" - } else if branches <= 32 { - "u32" - } else if branches <= 64 { - "u64" - } else { - panic!("up to 64 branches supported"); - }, - Span::call_site(), - ); - - TokenStream::from(quote! { - pub(super) enum Out<#( #variants ),*> { - #( #variants(#variants), )* - // Include a `Disabled` variant signifying that all select branches - // failed to resolve. - Disabled, - } - - pub(super) type Mask = #mask; - }) -} - -pub(crate) fn clean_pattern_macro(input: TokenStream) -> TokenStream { - // If this isn't a pattern, we return the token stream as-is. The select! - // macro is using it in a location requiring a pattern, so an error will be - // emitted there. - let mut input: syn::Pat = match syn::Pat::parse_single.parse(input.clone()) { - Ok(it) => it, - Err(_) => return input, - }; - - clean_pattern(&mut input); - quote::ToTokens::into_token_stream(input).into() -} - -// Removes any occurrences of ref or mut in the provided pattern. -fn clean_pattern(pat: &mut syn::Pat) { - match pat { - syn::Pat::Lit(_literal) => {} - syn::Pat::Macro(_macro) => {} - syn::Pat::Path(_path) => {} - syn::Pat::Range(_range) => {} - syn::Pat::Rest(_rest) => {} - syn::Pat::Verbatim(_tokens) => {} - syn::Pat::Wild(_underscore) => {} - syn::Pat::Ident(ident) => { - ident.by_ref = None; - ident.mutability = None; - if let Some((_at, pat)) = &mut ident.subpat { - clean_pattern(&mut *pat); - } - } - syn::Pat::Or(or) => { - for case in &mut or.cases { - clean_pattern(case); - } - } - syn::Pat::Slice(slice) => { - for elem in &mut slice.elems { - clean_pattern(elem); - } - } - syn::Pat::Struct(struct_pat) => { - for field in &mut struct_pat.fields { - clean_pattern(&mut field.pat); - } - } - syn::Pat::Tuple(tuple) => { - for elem in &mut tuple.elems { - clean_pattern(elem); - } - } - syn::Pat::TupleStruct(tuple) => { - for elem in &mut tuple.elems { - clean_pattern(elem); - } - } - syn::Pat::Reference(reference) => { - reference.mutability = None; - clean_pattern(&mut reference.pat); - } - syn::Pat::Type(type_pat) => { - clean_pattern(&mut type_pat.pat); - } - _ => {} - } -} diff --git a/snippets/README.md b/snippets/README.md index e69de29b..7b4d978e 100644 --- a/snippets/README.md +++ b/snippets/README.md @@ -0,0 +1,5 @@ +# Snippets + +Snippets / files that were taken out in a refactor, +and that might be either removed at some point, +or reintegrated into an active codebase in one way or another. diff --git a/src/http/middleware/header_config.rs b/src/http/middleware/header_config.rs index ffb2d548..6b4a061b 100644 --- a/src/http/middleware/header_config.rs +++ b/src/http/middleware/header_config.rs @@ -92,7 +92,7 @@ mod test { use super::*; - #[crate::rt::test] + #[crate::rt::test(crate = "crate")] async fn test_header_config_happy_path() { let request = Request::builder() .method(Method::GET) @@ -117,7 +117,7 @@ mod test { service.call(request).await.unwrap(); } - #[crate::rt::test] + #[crate::rt::test(crate = "crate")] async fn test_header_config_missing_header() { let request = Request::builder() .method(Method::GET) @@ -136,7 +136,7 @@ mod test { assert!(result.is_err()); } - #[crate::rt::test] + #[crate::rt::test(crate = "crate")] async fn test_header_config_invalid_config() { let request = Request::builder() .method(Method::GET) diff --git a/src/http/server/executor.rs b/src/http/server/executor.rs index 07c46d97..6322709b 100644 --- a/src/http/server/executor.rs +++ b/src/http/server/executor.rs @@ -27,7 +27,7 @@ mod tests { use crate::rt::sync::oneshot; use hyper::rt::Executor; - #[crate::rt::test] + #[crate::rt::test(crate = "crate")] async fn simple_execute() -> Result<(), Box> { let (tx, rx) = oneshot::channel(); let executor = GlobalExecutor::new(); diff --git a/src/lib.rs b/src/lib.rs index 5108fdeb..b1329bda 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,8 +1,6 @@ #![feature(return_type_notation)] #![allow(incomplete_features)] -extern crate self as rama; - pub mod rt; pub mod service; diff --git a/src/rt.rs b/src/rt.rs index 3544d97c..0c8a6f0c 100644 --- a/src/rt.rs +++ b/src/rt.rs @@ -6,7 +6,7 @@ //! There is no plan to support other runtimes. //! If you want to use other runtimes, you can //! provide feedback, input and motivation at -//! . pub use rama_rt::*; diff --git a/src/service/http/mod.rs b/src/service/http/mod.rs index 683b183a..02384ec0 100644 --- a/src/service/http/mod.rs +++ b/src/service/http/mod.rs @@ -177,7 +177,7 @@ pub mod timeout { //! # Differences from [`crate::service::timeout`] //! //! [`crate::service::timeout::Timeout`] middleware uses an error to signal timeout, i.e. - //! it changes the error type to [`crate::service::BoxError`]`. For HTTP services that is rarely + //! it changes the error type to [`crate::BoxError`]`. For HTTP services that is rarely //! what you want as returning errors will terminate the connection without sending a response. //! //! This middleware won't change the error type and instead return a `408 Request Timeout` diff --git a/src/service/hyper/service.rs b/src/service/hyper/service.rs index 32c26caa..1aefa20f 100644 --- a/src/service/hyper/service.rs +++ b/src/service/hyper/service.rs @@ -69,7 +69,7 @@ mod test { s } - #[crate::rt::test] + #[crate::rt::test(crate = "crate")] async fn test_into_hyper_service() { let service = tower_async::service_fn(|req: &'static str| async move { Ok::<_, Infallible>(req) }); @@ -78,7 +78,7 @@ mod test { inner_test_hyper_service(hyper_service).await; } - #[crate::rt::test] + #[crate::rt::test(crate = "crate")] async fn test_into_layered_hyper_service() { let service = tower_async::ServiceBuilder::new() .timeout(std::time::Duration::from_secs(5)) diff --git a/src/stream/layer/tracker/bytes.rs b/src/stream/layer/tracker/bytes.rs index 976d8cbc..17e5562f 100644 --- a/src/stream/layer/tracker/bytes.rs +++ b/src/stream/layer/tracker/bytes.rs @@ -180,7 +180,7 @@ mod tests { use crate::rt::io::{AsyncReadExt, AsyncWriteExt}; use crate::rt::test_util::io::Builder; - #[crate::rt::test] + #[crate::rt::test(crate = "crate")] async fn test_read_tracker() { let stream = Builder::new() .read(b"foo") @@ -204,7 +204,7 @@ mod tests { assert_eq!(tracker.written(), 0); } - #[crate::rt::test] + #[crate::rt::test(crate = "crate")] async fn test_written_tracker() { let stream = Builder::new() .write(b"foo") @@ -227,7 +227,7 @@ mod tests { assert_eq!(tracker.written(), 9); } - #[crate::rt::test] + #[crate::rt::test(crate = "crate")] async fn test_rw_tracker() { let stream = Builder::new() .read(b"foo") @@ -263,7 +263,7 @@ mod tests { assert_eq!(tracker.written(), 9); } - #[crate::rt::test] + #[crate::rt::test(crate = "crate")] async fn test_rw_handle_tracker() { let stream = Builder::new() .read(b"foo") diff --git a/src/stream/service/echo.rs b/src/stream/service/echo.rs index 1f505057..8d3324d9 100644 --- a/src/stream/service/echo.rs +++ b/src/stream/service/echo.rs @@ -9,9 +9,9 @@ use crate::{service::Service, stream::Stream}; /// ```rust /// use rama::{service::Service, stream::service::EchoService}; /// -/// # #[rama::main] +/// # #[rama::rt::main] /// # async fn main() -> Result<(), Box> { -/// # let stream = crate::rt::test_util::io::Builder::new().read(b"hello world").write(b"hello world").build(); +/// # let stream = rama::rt::test_util::io::Builder::new().read(b"hello world").write(b"hello world").build(); /// let service = EchoService::new(); /// /// let bytes_copied = service.call(stream).await?; @@ -56,7 +56,7 @@ mod tests { use crate::rt::test_util::io::Builder; - #[crate::rt::test] + #[crate::rt::test(crate = "crate")] async fn test_echo() { let stream = Builder::new() .read(b"one") diff --git a/src/stream/service/forward.rs b/src/stream/service/forward.rs index 022f5f44..d115b9cb 100644 --- a/src/stream/service/forward.rs +++ b/src/stream/service/forward.rs @@ -10,10 +10,10 @@ use crate::{rt::sync::Mutex as AsyncMutex, service::Service, stream::Stream}; /// ```rust /// use rama::{service::Service, stream::service::ForwardService}; /// -/// # #[crate::rt::main] +/// # #[rama::rt::main] /// # async fn main() -> Result<(), Box> { -/// # let destination = crate::rt::test_util::io::Builder::new().write(b"hello world").read(b"hello world").build(); -/// # let stream = crate::rt::test_util::io::Builder::new().read(b"hello world").write(b"hello world").build(); +/// # let destination = rama::rt::test_util::io::Builder::new().write(b"hello world").read(b"hello world").build(); +/// # let stream = rama::rt::test_util::io::Builder::new().read(b"hello world").write(b"hello world").build(); /// let service = ForwardService::new(destination); /// /// let (bytes_copied_to, bytes_copied_from) = service.call(stream).await?; @@ -69,7 +69,7 @@ mod tests { use crate::rt::test_util::io::Builder; - #[crate::rt::test] + #[crate::rt::test(crate = "crate")] async fn test_forwarder() { let destination = Builder::new() .write(b"to(1)") diff --git a/src/tcp/client/forward.rs b/src/tcp/client/forward.rs index 11124f92..8cf575e7 100644 --- a/src/tcp/client/forward.rs +++ b/src/tcp/client/forward.rs @@ -28,7 +28,7 @@ mod tests { use crate::rt::test_util::io::Builder; - #[rama::rt::test] + #[crate::rt::test(crate = "crate")] async fn test_forwarder() { let destination = Builder::new() .write(b"to(1)") diff --git a/src/tcp/server/listener.rs b/src/tcp/server/listener.rs index 2ccfdded..e527e49b 100644 --- a/src/tcp/server/listener.rs +++ b/src/tcp/server/listener.rs @@ -202,7 +202,7 @@ impl TcpListener { /// Serve gracefully connections from this listener with the given service. /// /// This method does the same as [`Self::serve`] but it - /// will respect the given [`crate::graceful::ShutdownGuard`], and also pass + /// will respect the given [`crate::rt::graceful::ShutdownGuard`], and also pass /// it to the service. pub async fn serve_graceful( self, From 1d75eacc77cd369bb783886c5bd168448b706217 Mon Sep 17 00:00:00 2001 From: glendc Date: Tue, 28 Nov 2023 22:35:41 +0100 Subject: [PATCH 3/3] update READMES of crates --- rama-rt-macros/README.md | 3 +++ rama-rt/README.md | 4 +++- 2 files changed, 6 insertions(+), 1 deletion(-) diff --git a/rama-rt-macros/README.md b/rama-rt-macros/README.md index 87ae11e0..8adb8c25 100644 --- a/rama-rt-macros/README.md +++ b/rama-rt-macros/README.md @@ -32,6 +32,9 @@ Macros for the runtime definitions and implementation in function of Rama. +This crate and its macros are not created to be used on itself. Please make use +of these macros by using `rama::rt::*` as part of your _rama_ usage. E.g. `rama::rt::main` and `rama::rt::test`. + > rama is early work in progress, use at your own risk. > > Not everything that exists is documented and not everything that is documented is implemented. diff --git a/rama-rt/README.md b/rama-rt/README.md index 1e618726..c83debb0 100644 --- a/rama-rt/README.md +++ b/rama-rt/README.md @@ -32,13 +32,15 @@ Runtime definitions and implementation for Rama. +This crate is not created to be used on itself. Please make use +of this crate's functionality by using `rama::rt::*` as part of your _rama_ usage. + > rama is early work in progress, use at your own risk. > > Not everything that exists is documented and not everything that is documented is implemented. Rama's full documentatuon can be found at . - ## Contributing :balloon: Thanks for your help improving the project! We are so happy to have