Skip to content

Commit

Permalink
refactor: remove req_res_handler()
Browse files Browse the repository at this point in the history
  • Loading branch information
nwtgck committed Jan 14, 2024
1 parent ebcee50 commit e9adf3b
Show file tree
Hide file tree
Showing 5 changed files with 31 additions and 59 deletions.
1 change: 0 additions & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
pub mod dynamic_resources;
mod macros;
pub mod piping_server;
pub mod req_res_handler;
pub mod util;
14 changes: 6 additions & 8 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ use tokio::net::TcpListener;
use tokio_rustls::TlsAcceptor;

use piping_server::piping_server::PipingServer;
use piping_server::req_res_handler::req_res_handler;
use piping_server::util;

/// Piping Server in Rust
Expand Down Expand Up @@ -86,10 +85,9 @@ async fn main() -> std::io::Result<()> {
});
let https_svc = make_service_fn(move |_| {
let piping_server = piping_server.clone();
let handler = req_res_handler(move |req, res_sender| {
piping_server.handler(true, req, res_sender)
});
futures::future::ok::<_, Infallible>(service_fn(handler))
futures::future::ok::<_, Infallible>(service_fn(move |req| {
piping_server.handler(true, req)
}))
});
let https_server = Server::builder(util::HyperAcceptor {
acceptor: incoming_tls_stream,
Expand All @@ -107,9 +105,9 @@ async fn main() -> std::io::Result<()> {

let http_svc = make_service_fn(|_| {
let piping_server = piping_server.clone();
let handler =
req_res_handler(move |req, res_sender| piping_server.handler(false, req, res_sender));
futures::future::ok::<_, Infallible>(service_fn(handler))
futures::future::ok::<_, Infallible>(service_fn(move |req| {
piping_server.handler(false, req)
}))
});
let http_server = Server::bind(&(args.host, args.http_port).into()).serve(http_svc);

Expand Down
42 changes: 22 additions & 20 deletions src/piping_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,9 @@ impl PipingServer {
&self,
uses_https: bool,
req: Request<Body>,
res_sender: oneshot::Sender<Response<Body>>,
) -> impl std::future::Future<Output = ()> {
) -> impl std::future::Future<Output = Result<Response<Body>, oneshot::Canceled>> /* TODO: use better Error instead of oneshot::Canceled */
{
let (res_sender, res_receiver) = oneshot::channel::<Response<Body>>();
let path_to_sender = Arc::clone(&self.path_to_sender);
let path_to_receiver = Arc::clone(&self.path_to_receiver);
async move {
Expand All @@ -81,7 +82,7 @@ impl PipingServer {
.body(Body::from(dynamic_resources::index()))
.unwrap();
res_sender.send(res).unwrap();
return;
return res_receiver.await;
}
reserved_paths::NO_SCRIPT => {
let query_params = query_param_to_hash_map(req.uri().query());
Expand All @@ -102,7 +103,7 @@ impl PipingServer {
.body(Body::from(html))
.unwrap();
res_sender.send(res).unwrap();
return;
return res_receiver.await;
}
reserved_paths::VERSION => {
let version: &'static str = env!("CARGO_PKG_VERSION");
Expand All @@ -113,7 +114,7 @@ impl PipingServer {
.body(Body::from(format!("{version} (Rust)\n")))
.unwrap();
res_sender.send(res).unwrap();
return;
return res_receiver.await;
}
reserved_paths::HELP => {
let host: &str = req
Expand Down Expand Up @@ -142,12 +143,12 @@ impl PipingServer {
.body(Body::from(help))
.unwrap();
res_sender.send(res).unwrap();
return;
return res_receiver.await;
}
reserved_paths::FAVICON_ICO => {
let res = Response::builder().status(204).body(Body::empty()).unwrap();
res_sender.send(res).unwrap();
return;
return res_receiver.await;
}
reserved_paths::ROBOTS_TXT => {
let res = Response::builder()
Expand All @@ -157,7 +158,7 @@ impl PipingServer {
.body(Body::empty())
.unwrap();
res_sender.send(res).unwrap();
return;
return res_receiver.await;
}
_ => {}
}
Expand All @@ -173,7 +174,7 @@ impl PipingServer {
"[ERROR] Service Worker registration is rejected.\n",
)))
.unwrap();
return;
return res_receiver.await;
}
}
let query_params = query_param_to_hash_map(req.uri().query());
Expand All @@ -184,7 +185,7 @@ impl PipingServer {
"[ERROR] Invalid \"n\" query parameter\n",
)))
.unwrap();
return;
return res_receiver.await;
}
let n_receivers = n_receivers_result.unwrap();
if n_receivers <= 0 {
Expand All @@ -193,15 +194,15 @@ impl PipingServer {
"[ERROR] n should > 0, but n = {n_receivers}.\n"
))))
.unwrap();
return;
return res_receiver.await;
}
if n_receivers > 1 {
res_sender
.send(rejection_response(Body::from(
"[ERROR] n > 1 not supported yet.\n",
)))
.unwrap();
return;
return res_receiver.await;
}
let receiver_connected: bool = path_to_receiver.contains_key(path);
// If a receiver has been connected already
Expand All @@ -211,7 +212,7 @@ impl PipingServer {
"[ERROR] Another receiver has been connected on '{path}'.\n",
))))
.unwrap();
return;
return res_receiver.await;
}
let sender = path_to_sender.remove(path);
match sender {
Expand All @@ -236,7 +237,7 @@ impl PipingServer {
if reserved_paths::VALUES.contains(&path) {
// Reject reserved path sending
res_sender.send(rejection_response(Body::from(format!("[ERROR] Cannot send to the reserved path '{path}'. (e.g. '/mypath123')\n")))).unwrap();
return;
return res_receiver.await;
}
// Notify that Content-Range is not supported
// In the future, resumable upload using Content-Range might be supported
Expand All @@ -249,7 +250,7 @@ impl PipingServer {
req.method()
))))
.unwrap();
return;
return res_receiver.await;
}
let query_params = query_param_to_hash_map(req.uri().query());
let n_receivers_result: Result<u32, _> = get_n_receivers_result(&query_params);
Expand All @@ -259,7 +260,7 @@ impl PipingServer {
"[ERROR] Invalid \"n\" query parameter\n",
)))
.unwrap();
return;
return res_receiver.await;
}
let n_receivers = n_receivers_result.unwrap();
if n_receivers <= 0 {
Expand All @@ -268,15 +269,15 @@ impl PipingServer {
"[ERROR] n should > 0, but n = {n_receivers}.\n"
))))
.unwrap();
return;
return res_receiver.await;
}
if n_receivers > 1 {
res_sender
.send(rejection_response(Body::from(
"[ERROR] n > 1 not supported yet.\n",
)))
.unwrap();
return;
return res_receiver.await;
}
let sender_connected: bool = path_to_sender.contains_key(path);
// If a sender has been connected already
Expand All @@ -286,7 +287,7 @@ impl PipingServer {
"[ERROR] Another sender has been connected on '{path}'.\n",
))))
.unwrap();
return;
return res_receiver.await;
}

let (mut res_body_sender, body) = Body::channel();
Expand Down Expand Up @@ -381,7 +382,8 @@ impl PipingServer {
.unwrap();
res_sender.send(res).unwrap();
}
}
};
return res_receiver.await;
}
}
}
Expand Down
25 changes: 0 additions & 25 deletions src/req_res_handler.rs

This file was deleted.

8 changes: 3 additions & 5 deletions tests/piping_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ use specit::tokio_it as it;
use std::convert::Infallible;

use piping_server::piping_server::PipingServer;
use piping_server::req_res_handler::req_res_handler;
use std::net::SocketAddr;
use std::time;

Expand Down Expand Up @@ -52,10 +51,9 @@ async fn serve() -> Serve {
tokio::spawn(async move {
let http_svc = make_service_fn(|_| {
let piping_server = piping_server.clone();
let handler = req_res_handler(move |req, res_sender| {
piping_server.handler(false, req, res_sender)
});
futures::future::ok::<_, Infallible>(service_fn(handler))
futures::future::ok::<_, Infallible>(service_fn(move |req| {
piping_server.handler(false, req)
}))
});
let http_server = Server::bind(&([127, 0, 0, 1], 0).into()).serve(http_svc);
addr_tx
Expand Down

0 comments on commit e9adf3b

Please sign in to comment.