Skip to content

Commit

Permalink
Merge branch 'feature/remove-req-res-handler' into develop
Browse files Browse the repository at this point in the history
  • Loading branch information
nwtgck committed Jan 16, 2024
2 parents ebcee50 + aee492a commit 2cf1c3b
Show file tree
Hide file tree
Showing 5 changed files with 63 additions and 134 deletions.
1 change: 0 additions & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
pub mod dynamic_resources;
mod macros;
pub mod piping_server;
pub mod req_res_handler;
pub mod util;
14 changes: 6 additions & 8 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ use tokio::net::TcpListener;
use tokio_rustls::TlsAcceptor;

use piping_server::piping_server::PipingServer;
use piping_server::req_res_handler::req_res_handler;
use piping_server::util;

/// Piping Server in Rust
Expand Down Expand Up @@ -86,10 +85,9 @@ async fn main() -> std::io::Result<()> {
});
let https_svc = make_service_fn(move |_| {
let piping_server = piping_server.clone();
let handler = req_res_handler(move |req, res_sender| {
piping_server.handler(true, req, res_sender)
});
futures::future::ok::<_, Infallible>(service_fn(handler))
futures::future::ok::<_, Infallible>(service_fn(move |req| {
piping_server.handler(true, req)
}))
});
let https_server = Server::builder(util::HyperAcceptor {
acceptor: incoming_tls_stream,
Expand All @@ -107,9 +105,9 @@ async fn main() -> std::io::Result<()> {

let http_svc = make_service_fn(|_| {
let piping_server = piping_server.clone();
let handler =
req_res_handler(move |req, res_sender| piping_server.handler(false, req, res_sender));
futures::future::ok::<_, Infallible>(service_fn(handler))
futures::future::ok::<_, Infallible>(service_fn(move |req| {
piping_server.handler(false, req)
}))
});
let http_server = Server::bind(&(args.host, args.http_port).into()).serve(http_svc);

Expand Down
149 changes: 54 additions & 95 deletions src/piping_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,8 @@ impl PipingServer {
&self,
uses_https: bool,
req: Request<Body>,
res_sender: oneshot::Sender<Response<Body>>,
) -> impl std::future::Future<Output = ()> {
) -> impl std::future::Future<Output = Result<Response<Body>, oneshot::Canceled>> + Send /* TODO: use better Error instead of oneshot::Canceled */
{
let path_to_sender = Arc::clone(&self.path_to_sender);
let path_to_receiver = Arc::clone(&self.path_to_receiver);
async move {
Expand All @@ -74,14 +74,12 @@ impl PipingServer {
if req.method() == Method::GET || req.method() == Method::HEAD {
match path {
reserved_paths::INDEX => {
let res = Response::builder()
return Ok(Response::builder()
.status(200)
.header("Content-Type", "text/html")
.header("Access-Control-Allow-Origin", "*")
.body(Body::from(dynamic_resources::index()))
.unwrap();
res_sender.send(res).unwrap();
return;
.unwrap());
}
reserved_paths::NO_SCRIPT => {
let query_params = query_param_to_hash_map(req.uri().query());
Expand All @@ -91,7 +89,7 @@ impl PipingServer {
base64::engine::general_purpose::STANDARD.encode(nonce_bytes)
};
let html = dynamic_resources::no_script_html(&query_params, &style_nonce);
let res = Response::builder()
return Ok(Response::builder()
.status(200)
.header("Content-Type", "text/html")
.header("Access-Control-Allow-Origin", "*")
Expand All @@ -100,20 +98,16 @@ impl PipingServer {
format!("default-src 'none'; style-src 'nonce-{style_nonce}'"),
)
.body(Body::from(html))
.unwrap();
res_sender.send(res).unwrap();
return;
.unwrap());
}
reserved_paths::VERSION => {
let version: &'static str = env!("CARGO_PKG_VERSION");
let res = Response::builder()
return Ok(Response::builder()
.status(200)
.header("Content-Type", "text/plain")
.header("Access-Control-Allow-Origin", "*")
.body(Body::from(format!("{version} (Rust)\n")))
.unwrap();
res_sender.send(res).unwrap();
return;
.unwrap());
}
reserved_paths::HELP => {
let host: &str = req
Expand All @@ -135,29 +129,23 @@ impl PipingServer {
let base_url = Url::parse(format!("{schema}://{host}").as_str())
.unwrap_or_else(|_| "http://hostname/".parse().unwrap());
let help = dynamic_resources::help(&base_url);
let res = Response::builder()
return Ok(Response::builder()
.status(200)
.header("Content-Type", "text/plain")
.header("Access-Control-Allow-Origin", "*")
.body(Body::from(help))
.unwrap();
res_sender.send(res).unwrap();
return;
.unwrap());
}
reserved_paths::FAVICON_ICO => {
let res = Response::builder().status(204).body(Body::empty()).unwrap();
res_sender.send(res).unwrap();
return;
return Ok(Response::builder().status(204).body(Body::empty()).unwrap());
}
reserved_paths::ROBOTS_TXT => {
let res = Response::builder()
return Ok(Response::builder()
.status(404)
// explicit `content-length: 0`: https://github.com/hyperium/hyper/pull/2836
.header("Content-Length", 0)
.body(Body::empty())
.unwrap();
res_sender.send(res).unwrap();
return;
.unwrap());
}
_ => {}
}
Expand All @@ -168,51 +156,37 @@ impl PipingServer {
if let Some(value) = req.headers().get("service-worker") {
if value == http::HeaderValue::from_static("script") {
// Reject Service Worker registration
res_sender
.send(rejection_response(Body::from(
"[ERROR] Service Worker registration is rejected.\n",
)))
.unwrap();
return;
return Ok(rejection_response(Body::from(
"[ERROR] Service Worker registration is rejected.\n",
)));
}
}
let query_params = query_param_to_hash_map(req.uri().query());
let n_receivers_result: Result<u32, _> = get_n_receivers_result(&query_params);
if let Err(_) = n_receivers_result {
res_sender
.send(rejection_response(Body::from(
"[ERROR] Invalid \"n\" query parameter\n",
)))
.unwrap();
return;
return Ok(rejection_response(Body::from(
"[ERROR] Invalid \"n\" query parameter\n",
)));
}
let n_receivers = n_receivers_result.unwrap();
if n_receivers <= 0 {
res_sender
.send(rejection_response(Body::from(format!(
"[ERROR] n should > 0, but n = {n_receivers}.\n"
))))
.unwrap();
return;
return Ok(rejection_response(Body::from(format!(
"[ERROR] n should > 0, but n = {n_receivers}.\n"
))));
}
if n_receivers > 1 {
res_sender
.send(rejection_response(Body::from(
"[ERROR] n > 1 not supported yet.\n",
)))
.unwrap();
return;
return Ok(rejection_response(Body::from(
"[ERROR] n > 1 not supported yet.\n",
)));
}
let receiver_connected: bool = path_to_receiver.contains_key(path);
// If a receiver has been connected already
if receiver_connected {
res_sender
.send(rejection_response(Body::from(format!(
"[ERROR] Another receiver has been connected on '{path}'.\n",
))))
.unwrap();
return;
return Ok(rejection_response(Body::from(format!(
"[ERROR] Another receiver has been connected on '{path}'.\n",
))));
}
let (res_sender, res_receiver) = oneshot::channel::<Response<Body>>();
let sender = path_to_sender.remove(path);
match sender {
// If sender is found
Expand All @@ -230,65 +204,51 @@ impl PipingServer {
None => {
path_to_receiver.insert(path.to_string(), DataReceiver { res_sender });
}
}
};
res_receiver.await
}
&Method::POST | &Method::PUT => {
if reserved_paths::VALUES.contains(&path) {
// Reject reserved path sending
res_sender.send(rejection_response(Body::from(format!("[ERROR] Cannot send to the reserved path '{path}'. (e.g. '/mypath123')\n")))).unwrap();
return;
return Ok(rejection_response(Body::from(format!("[ERROR] Cannot send to the reserved path '{path}'. (e.g. '/mypath123')\n"))));
}
// Notify that Content-Range is not supported
// In the future, resumable upload using Content-Range might be supported
// ref: https://github.com/httpwg/http-core/pull/653
if req.headers().contains_key("content-range") {
// Reject reserved path sending
res_sender
.send(rejection_response(Body::from(format!(
"[ERROR] Content-Range is not supported for now in {}\n",
req.method()
))))
.unwrap();
return;
return Ok(rejection_response(Body::from(format!(
"[ERROR] Content-Range is not supported for now in {}\n",
req.method()
))));
}
let query_params = query_param_to_hash_map(req.uri().query());
let n_receivers_result: Result<u32, _> = get_n_receivers_result(&query_params);
if let Err(_) = n_receivers_result {
res_sender
.send(rejection_response(Body::from(
"[ERROR] Invalid \"n\" query parameter\n",
)))
.unwrap();
return;
return Ok(rejection_response(Body::from(
"[ERROR] Invalid \"n\" query parameter\n",
)));
}
let n_receivers = n_receivers_result.unwrap();
if n_receivers <= 0 {
res_sender
.send(rejection_response(Body::from(format!(
"[ERROR] n should > 0, but n = {n_receivers}.\n"
))))
.unwrap();
return;
return Ok(rejection_response(Body::from(format!(
"[ERROR] n should > 0, but n = {n_receivers}.\n"
))));
}
if n_receivers > 1 {
res_sender
.send(rejection_response(Body::from(
"[ERROR] n > 1 not supported yet.\n",
)))
.unwrap();
return;
return Ok(rejection_response(Body::from(
"[ERROR] n > 1 not supported yet.\n",
)));
}
let sender_connected: bool = path_to_sender.contains_key(path);
// If a sender has been connected already
if sender_connected {
res_sender
.send(rejection_response(Body::from(format!(
"[ERROR] Another sender has been connected on '{path}'.\n",
))))
.unwrap();
return;
return Ok(rejection_response(Body::from(format!(
"[ERROR] Another sender has been connected on '{path}'.\n",
))));
}

let (res_sender, res_receiver) = oneshot::channel::<Response<Body>>();
let (mut res_body_sender, body) = Body::channel();
let sender_res = Response::builder()
.header("Content-Type", "text/plain")
Expand Down Expand Up @@ -333,10 +293,11 @@ impl PipingServer {
);
}
}
res_receiver.await
}
&Method::OPTIONS => {
// Response for Preflight request
let res = Response::builder()
Ok(Response::builder()
.status(200)
.header("Access-Control-Allow-Origin", "*")
.header(
Expand Down Expand Up @@ -366,20 +327,18 @@ impl PipingServer {
.header("Access-Control-Max-Age", 86400)
.header("Content-Length", 0)
.body(Body::empty())
.unwrap();
res_sender.send(res).unwrap();
.unwrap())
}
_ => {
log::info!("Unsupported method: {}", req.method());
let res = Response::builder()
Ok(Response::builder()
.status(405)
.header("Access-Control-Allow-Origin", "*")
.body(Body::from(format!(
"[ERROR] Unsupported method: {}.\n",
req.method()
)))
.unwrap();
res_sender.send(res).unwrap();
.unwrap())
}
}
}
Expand Down
25 changes: 0 additions & 25 deletions src/req_res_handler.rs

This file was deleted.

8 changes: 3 additions & 5 deletions tests/piping_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ use specit::tokio_it as it;
use std::convert::Infallible;

use piping_server::piping_server::PipingServer;
use piping_server::req_res_handler::req_res_handler;
use std::net::SocketAddr;
use std::time;

Expand Down Expand Up @@ -52,10 +51,9 @@ async fn serve() -> Serve {
tokio::spawn(async move {
let http_svc = make_service_fn(|_| {
let piping_server = piping_server.clone();
let handler = req_res_handler(move |req, res_sender| {
piping_server.handler(false, req, res_sender)
});
futures::future::ok::<_, Infallible>(service_fn(handler))
futures::future::ok::<_, Infallible>(service_fn(move |req| {
piping_server.handler(false, req)
}))
});
let http_server = Server::bind(&([127, 0, 0, 1], 0).into()).serve(http_svc);
addr_tx
Expand Down

0 comments on commit 2cf1c3b

Please sign in to comment.