Skip to content

Commit

Permalink
refactor: use res_sender and res_receiver in only necessary scopes
Browse files Browse the repository at this point in the history
  • Loading branch information
nwtgck committed Jan 14, 2024
1 parent e9adf3b commit 08508a8
Showing 1 changed file with 53 additions and 96 deletions.
149 changes: 53 additions & 96 deletions src/piping_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,6 @@ impl PipingServer {
req: Request<Body>,
) -> impl std::future::Future<Output = Result<Response<Body>, oneshot::Canceled>> /* TODO: use better Error instead of oneshot::Canceled */
{
let (res_sender, res_receiver) = oneshot::channel::<Response<Body>>();
let path_to_sender = Arc::clone(&self.path_to_sender);
let path_to_receiver = Arc::clone(&self.path_to_receiver);
async move {
Expand All @@ -75,14 +74,12 @@ impl PipingServer {
if req.method() == Method::GET || req.method() == Method::HEAD {
match path {
reserved_paths::INDEX => {
let res = Response::builder()
return Ok(Response::builder()
.status(200)
.header("Content-Type", "text/html")
.header("Access-Control-Allow-Origin", "*")
.body(Body::from(dynamic_resources::index()))
.unwrap();
res_sender.send(res).unwrap();
return res_receiver.await;
.unwrap());
}
reserved_paths::NO_SCRIPT => {
let query_params = query_param_to_hash_map(req.uri().query());
Expand All @@ -92,7 +89,7 @@ impl PipingServer {
base64::engine::general_purpose::STANDARD.encode(nonce_bytes)
};
let html = dynamic_resources::no_script_html(&query_params, &style_nonce);
let res = Response::builder()
return Ok(Response::builder()
.status(200)
.header("Content-Type", "text/html")
.header("Access-Control-Allow-Origin", "*")
Expand All @@ -101,20 +98,16 @@ impl PipingServer {
format!("default-src 'none'; style-src 'nonce-{style_nonce}'"),
)
.body(Body::from(html))
.unwrap();
res_sender.send(res).unwrap();
return res_receiver.await;
.unwrap());
}
reserved_paths::VERSION => {
let version: &'static str = env!("CARGO_PKG_VERSION");
let res = Response::builder()
return Ok(Response::builder()
.status(200)
.header("Content-Type", "text/plain")
.header("Access-Control-Allow-Origin", "*")
.body(Body::from(format!("{version} (Rust)\n")))
.unwrap();
res_sender.send(res).unwrap();
return res_receiver.await;
.unwrap());
}
reserved_paths::HELP => {
let host: &str = req
Expand All @@ -136,29 +129,23 @@ impl PipingServer {
let base_url = Url::parse(format!("{schema}://{host}").as_str())
.unwrap_or_else(|_| "http://hostname/".parse().unwrap());
let help = dynamic_resources::help(&base_url);
let res = Response::builder()
return Ok(Response::builder()
.status(200)
.header("Content-Type", "text/plain")
.header("Access-Control-Allow-Origin", "*")
.body(Body::from(help))
.unwrap();
res_sender.send(res).unwrap();
return res_receiver.await;
.unwrap());
}
reserved_paths::FAVICON_ICO => {
let res = Response::builder().status(204).body(Body::empty()).unwrap();
res_sender.send(res).unwrap();
return res_receiver.await;
return Ok(Response::builder().status(204).body(Body::empty()).unwrap());
}
reserved_paths::ROBOTS_TXT => {
let res = Response::builder()
return Ok(Response::builder()
.status(404)
// explicit `content-length: 0`: https://github.com/hyperium/hyper/pull/2836
.header("Content-Length", 0)
.body(Body::empty())
.unwrap();
res_sender.send(res).unwrap();
return res_receiver.await;
.unwrap());
}
_ => {}
}
Expand All @@ -169,51 +156,37 @@ impl PipingServer {
if let Some(value) = req.headers().get("service-worker") {
if value == http::HeaderValue::from_static("script") {
// Reject Service Worker registration
res_sender
.send(rejection_response(Body::from(
"[ERROR] Service Worker registration is rejected.\n",
)))
.unwrap();
return res_receiver.await;
return Ok(rejection_response(Body::from(
"[ERROR] Service Worker registration is rejected.\n",
)));
}
}
let query_params = query_param_to_hash_map(req.uri().query());
let n_receivers_result: Result<u32, _> = get_n_receivers_result(&query_params);
if let Err(_) = n_receivers_result {
res_sender
.send(rejection_response(Body::from(
"[ERROR] Invalid \"n\" query parameter\n",
)))
.unwrap();
return res_receiver.await;
return Ok(rejection_response(Body::from(
"[ERROR] Invalid \"n\" query parameter\n",
)));
}
let n_receivers = n_receivers_result.unwrap();
if n_receivers <= 0 {
res_sender
.send(rejection_response(Body::from(format!(
"[ERROR] n should > 0, but n = {n_receivers}.\n"
))))
.unwrap();
return res_receiver.await;
return Ok(rejection_response(Body::from(format!(
"[ERROR] n should > 0, but n = {n_receivers}.\n"
))));
}
if n_receivers > 1 {
res_sender
.send(rejection_response(Body::from(
"[ERROR] n > 1 not supported yet.\n",
)))
.unwrap();
return res_receiver.await;
return Ok(rejection_response(Body::from(
"[ERROR] n > 1 not supported yet.\n",
)));
}
let receiver_connected: bool = path_to_receiver.contains_key(path);
// If a receiver has been connected already
if receiver_connected {
res_sender
.send(rejection_response(Body::from(format!(
"[ERROR] Another receiver has been connected on '{path}'.\n",
))))
.unwrap();
return res_receiver.await;
return Ok(rejection_response(Body::from(format!(
"[ERROR] Another receiver has been connected on '{path}'.\n",
))));
}
let (res_sender, res_receiver) = oneshot::channel::<Response<Body>>();
let sender = path_to_sender.remove(path);
match sender {
// If sender is found
Expand All @@ -231,65 +204,51 @@ impl PipingServer {
None => {
path_to_receiver.insert(path.to_string(), DataReceiver { res_sender });
}
}
};
res_receiver.await
}
&Method::POST | &Method::PUT => {
if reserved_paths::VALUES.contains(&path) {
// Reject reserved path sending
res_sender.send(rejection_response(Body::from(format!("[ERROR] Cannot send to the reserved path '{path}'. (e.g. '/mypath123')\n")))).unwrap();
return res_receiver.await;
return Ok(rejection_response(Body::from(format!("[ERROR] Cannot send to the reserved path '{path}'. (e.g. '/mypath123')\n"))));
}
// Notify that Content-Range is not supported
// In the future, resumable upload using Content-Range might be supported
// ref: https://github.com/httpwg/http-core/pull/653
if req.headers().contains_key("content-range") {
// Reject reserved path sending
res_sender
.send(rejection_response(Body::from(format!(
"[ERROR] Content-Range is not supported for now in {}\n",
req.method()
))))
.unwrap();
return res_receiver.await;
return Ok(rejection_response(Body::from(format!(
"[ERROR] Content-Range is not supported for now in {}\n",
req.method()
))));
}
let query_params = query_param_to_hash_map(req.uri().query());
let n_receivers_result: Result<u32, _> = get_n_receivers_result(&query_params);
if let Err(_) = n_receivers_result {
res_sender
.send(rejection_response(Body::from(
"[ERROR] Invalid \"n\" query parameter\n",
)))
.unwrap();
return res_receiver.await;
return Ok(rejection_response(Body::from(
"[ERROR] Invalid \"n\" query parameter\n",
)));
}
let n_receivers = n_receivers_result.unwrap();
if n_receivers <= 0 {
res_sender
.send(rejection_response(Body::from(format!(
"[ERROR] n should > 0, but n = {n_receivers}.\n"
))))
.unwrap();
return res_receiver.await;
return Ok(rejection_response(Body::from(format!(
"[ERROR] n should > 0, but n = {n_receivers}.\n"
))));
}
if n_receivers > 1 {
res_sender
.send(rejection_response(Body::from(
"[ERROR] n > 1 not supported yet.\n",
)))
.unwrap();
return res_receiver.await;
return Ok(rejection_response(Body::from(
"[ERROR] n > 1 not supported yet.\n",
)));
}
let sender_connected: bool = path_to_sender.contains_key(path);
// If a sender has been connected already
if sender_connected {
res_sender
.send(rejection_response(Body::from(format!(
"[ERROR] Another sender has been connected on '{path}'.\n",
))))
.unwrap();
return res_receiver.await;
return Ok(rejection_response(Body::from(format!(
"[ERROR] Another sender has been connected on '{path}'.\n",
))));
}

let (res_sender, res_receiver) = oneshot::channel::<Response<Body>>();
let (mut res_body_sender, body) = Body::channel();
let sender_res = Response::builder()
.header("Content-Type", "text/plain")
Expand Down Expand Up @@ -334,10 +293,11 @@ impl PipingServer {
);
}
}
res_receiver.await
}
&Method::OPTIONS => {
// Response for Preflight request
let res = Response::builder()
Ok(Response::builder()
.status(200)
.header("Access-Control-Allow-Origin", "*")
.header(
Expand Down Expand Up @@ -367,23 +327,20 @@ impl PipingServer {
.header("Access-Control-Max-Age", 86400)
.header("Content-Length", 0)
.body(Body::empty())
.unwrap();
res_sender.send(res).unwrap();
.unwrap())
}
_ => {
log::info!("Unsupported method: {}", req.method());
let res = Response::builder()
Ok(Response::builder()
.status(405)
.header("Access-Control-Allow-Origin", "*")
.body(Body::from(format!(
"[ERROR] Unsupported method: {}.\n",
req.method()
)))
.unwrap();
res_sender.send(res).unwrap();
.unwrap())
}
};
return res_receiver.await;
}
}
}
}
Expand Down

0 comments on commit 08508a8

Please sign in to comment.