From 25f71ca51867778d13a703daecf9ba3128d3cde3 Mon Sep 17 00:00:00 2001 From: Connor Slade Date: Tue, 31 Oct 2023 08:14:51 -0400 Subject: [PATCH 1/3] Dynamic dispatch on socket type --- lib/extensions/range.rs | 2 +- lib/proto/websocket/frame.rs | 4 ++-- lib/response.rs | 4 ++-- lib/socket.rs | 33 ++++++++++++++++++++++++++++----- 4 files changed, 33 insertions(+), 10 deletions(-) diff --git a/lib/extensions/range.rs b/lib/extensions/range.rs index 269becd..6337289 100644 --- a/lib/extensions/range.rs +++ b/lib/extensions/range.rs @@ -43,7 +43,7 @@ struct RangeResponse { impl Middleware for Range { // Inject the Accept-Ranges header into the response. fn post(&self, req: &Request, res: &mut Response) -> MiddleResult { - if req.method != Method::GET || req.method != Method::GET { + if req.method != Method::GET { return MiddleResult::Continue; } diff --git a/lib/proto/websocket/frame.rs b/lib/proto/websocket/frame.rs index 8938795..d695454 100644 --- a/lib/proto/websocket/frame.rs +++ b/lib/proto/websocket/frame.rs @@ -5,7 +5,7 @@ use std::{ }; use super::xor_mask; -use crate::trace::LazyFmt; +use crate::{trace::LazyFmt, socket::SocketStream}; /// ## Frame Layout /// ```plain @@ -126,7 +126,7 @@ impl Frame { buf } - pub fn write(&self, socket: &mut TcpStream) -> io::Result<()> { + pub fn write(&self, socket: &mut SocketStream) -> io::Result<()> { let buf = self.to_bytes(); trace!(Level::Debug, "[WS] Writing: {:?}", buf); diff --git a/lib/response.rs b/lib/response.rs index fe862ae..399052b 100644 --- a/lib/response.rs +++ b/lib/response.rs @@ -9,7 +9,7 @@ use crate::consts; use crate::header::{HeaderName, Headers}; use crate::internal::sync::ForceLockMutex; use crate::proto::http::status::Status; -use crate::socket::Socket; +use crate::socket::{Socket, Stream, SocketStream}; use crate::{ error::Result, header::headers_to_string, internal::handle::Writeable, Content, Header, SetCookie, @@ -366,7 +366,7 @@ impl ResponseBody { /// Writes a ResponseBody to a TcpStream. /// Either in one go if it is static or in chunks if it is a stream. - fn write(&mut self, stream: &mut TcpStream) -> Result<()> { + fn write(&mut self, stream: &mut SocketStream) -> Result<()> { match self { ResponseBody::Empty => {} ResponseBody::Static(data) => stream.write_all(data)?, diff --git a/lib/socket.rs b/lib/socket.rs index 9e85542..e65940f 100644 --- a/lib/socket.rs +++ b/lib/socket.rs @@ -1,5 +1,6 @@ use std::{ - net::TcpStream, + io::{self, Read, Write}, + net::{IpAddr, Shutdown, SocketAddr, TcpStream}, ops::Deref, sync::{ atomic::{AtomicBool, AtomicU64, Ordering}, @@ -12,10 +13,32 @@ use crate::{ response::ResponseFlag, }; +pub type SocketStream = Box; + +pub trait Stream: Read + Write { + fn peer_addr(&self) -> io::Result; + fn try_clone(&self) -> io::Result; + fn shutdown(&self, shutdown: Shutdown) -> io::Result<()>; +} + +impl Stream for TcpStream { + fn peer_addr(&self) -> io::Result { + self.peer_addr() + } + + fn try_clone(&self) -> io::Result { + Ok(self.try_clone().map(Box::new)?) + } + + fn shutdown(&self, shutdown: Shutdown) -> io::Result<()> { + self.shutdown(shutdown) + } +} + /// Socket is a wrapper around TcpStream that allows for sending a response from other threads. pub struct Socket { /// The internal TcpStream. - pub socket: Mutex, + pub socket: Mutex>, /// A unique identifier that uniquely identifies this socket. pub id: u64, /// A barrier that is used to wait for the response to be sent in the case of a guaranteed send. @@ -32,10 +55,10 @@ pub struct Socket { impl Socket { /// Create a new `Socket` from a `TcpStream`. /// Will also create a new unique identifier for the socket. - pub(crate) fn new(socket: TcpStream) -> Self { + pub(crate) fn new(socket: impl Stream + Send + Sync + 'static) -> Self { static ID: AtomicU64 = AtomicU64::new(0); Self { - socket: Mutex::new(socket), + socket: Mutex::new(Box::new(socket)), id: ID.fetch_add(1, Ordering::Relaxed), barrier: Arc::new(SingleBarrier::new()), raw: AtomicBool::new(false), @@ -76,7 +99,7 @@ impl Socket { } impl Deref for Socket { - type Target = Mutex; + type Target = Mutex>; fn deref(&self) -> &Self::Target { &self.socket From 794189eb396b4727ba0e72961fdafe5aecfcd151 Mon Sep 17 00:00:00 2001 From: Connor Slade Date: Tue, 31 Oct 2023 14:00:34 -0400 Subject: [PATCH 2/3] Start on custom event loop --- lib/context.rs | 2 +- lib/extensions/range.rs | 2 +- lib/internal/handle.rs | 11 +++--- lib/proto/websocket/frame.rs | 3 +- lib/response.rs | 4 +- lib/server.rs | 71 ++++++++++++++++++++++++------------ lib/socket.rs | 39 ++++++++++++-------- lib/thread_pool.rs | 1 - 8 files changed, 81 insertions(+), 52 deletions(-) diff --git a/lib/context.rs b/lib/context.rs index 32b93e0..5fc5a10 100644 --- a/lib/context.rs +++ b/lib/context.rs @@ -147,7 +147,7 @@ impl Context { // TODO: NOT CALLING POST_RAW for i in &self.server.middleware { - i.post(&self.req.clone(), &mut *self.response.force_lock()); + i.post(&self.req.clone(), &mut self.response.force_lock()); } self.response diff --git a/lib/extensions/range.rs b/lib/extensions/range.rs index 6337289..b186c05 100644 --- a/lib/extensions/range.rs +++ b/lib/extensions/range.rs @@ -246,7 +246,7 @@ fn singlepart_response(res: &RangeResponse) -> Response { )) } -fn multipart_response(res: &RangeResponse) -> Response { +fn multipart_response(_res: &RangeResponse) -> Response { todo!() } diff --git a/lib/internal/handle.rs b/lib/internal/handle.rs index 01c3799..b4eea2f 100644 --- a/lib/internal/handle.rs +++ b/lib/internal/handle.rs @@ -23,18 +23,19 @@ pub(crate) type Writeable = Box>; /// Handles a socket. /// /// -pub(crate) fn handle(stream: TcpStream, this: Arc>) +pub(crate) fn handle(stream: Arc, this: Arc>) where State: 'static + Send + Sync, { + let mut socket = stream.force_lock(); trace!( Level::Debug, "Opening socket {:?}", - LazyFmt(|| stream.peer_addr()) + LazyFmt(|| socket.peer_addr()) ); - stream.set_read_timeout(this.socket_timeout).unwrap(); - stream.set_write_timeout(this.socket_timeout).unwrap(); - let stream = Arc::new(Socket::new(stream)); + socket.set_timeout(this.socket_timeout).unwrap(); + drop(socket); + 'outer: loop { let mut keep_alive = false; let mut req = Request::from_socket(stream.clone()); diff --git a/lib/proto/websocket/frame.rs b/lib/proto/websocket/frame.rs index d695454..07a193e 100644 --- a/lib/proto/websocket/frame.rs +++ b/lib/proto/websocket/frame.rs @@ -1,11 +1,10 @@ use std::{ convert::{TryFrom, TryInto}, io::{self, Write}, - net::TcpStream, }; use super::xor_mask; -use crate::{trace::LazyFmt, socket::SocketStream}; +use crate::{socket::SocketStream, trace::LazyFmt}; /// ## Frame Layout /// ```plain diff --git a/lib/response.rs b/lib/response.rs index 399052b..7612a83 100644 --- a/lib/response.rs +++ b/lib/response.rs @@ -2,14 +2,14 @@ use std::cell::RefCell; use std::fmt::{self, Debug, Display, Formatter}; use std::io::{ErrorKind, Read, Write}; use std::mem; -use std::net::TcpStream; + use std::sync::Arc; use crate::consts; use crate::header::{HeaderName, Headers}; use crate::internal::sync::ForceLockMutex; use crate::proto::http::status::Status; -use crate::socket::{Socket, Stream, SocketStream}; +use crate::socket::{Socket, SocketStream}; use crate::{ error::Result, header::headers_to_string, internal::handle::Writeable, Content, Header, SetCookie, diff --git a/lib/server.rs b/lib/server.rs index 4114bee..d31fcd4 100644 --- a/lib/server.rs +++ b/lib/server.rs @@ -16,6 +16,7 @@ use crate::{ header::Headers, internal::misc::ToHostAddress, route::Route, + socket::Socket, thread_pool::ThreadPool, trace::emoji, Content, Context, Header, HeaderName, Method, Middleware, Request, Response, Status, VERSION, @@ -33,6 +34,8 @@ pub struct Server { /// Ip address to listen on. pub ip: IpAddr, + pub event_loop: Box, + /// Routes to handle. pub routes: Vec>, @@ -83,6 +86,7 @@ impl Server { Server { port, ip: raw_ip.to_address().unwrap(), + event_loop: TcpEventLoop, routes: Vec::new(), middleware: Vec::new(), @@ -121,7 +125,7 @@ impl Server { pub fn run(self) -> Result<()> { let threads = self.thread_pool.threads(); if threads == 0 { - // self.thread_pool.resize(1); // TODO: this + self.thread_pool.resize_exact(1); } trace!( @@ -144,31 +148,11 @@ impl Server { .into()); } - let listener = TcpListener::bind(SocketAddr::new(self.ip, self.port))?; + let addr = SocketAddr::new(self.ip, self.port); let this = Arc::new(self); + let event_loop = TcpEventLoop; - for event in listener.incoming() { - if !this.running.load(Ordering::Relaxed) { - trace!( - Level::Debug, - "Stopping event loop. No more connections will be accepted." - ); - break; - } - - let this2 = this.clone(); - this.thread_pool.execute(move || { - let event = match event { - Ok(event) => event, - Err(err) => { - trace!(Level::Error, "Error accepting connection: {err}"); - return; - } - }; - - handle(event, this2) - }); - } + event_loop.run(this, addr, |this, event| handle(event, this))?; trace!("{}Server Stopped", emoji("🛑")); Ok(()) @@ -371,3 +355,42 @@ impl Server { let _ = TcpStream::connect(addr); } } + +pub trait EventLoop { + fn run( + &self, + addr: SocketAddr, + ) -> Result<()>; +} + +pub struct TcpEventLoop; + +impl EventLoop for TcpEventLoop { + fn run( + &self, + addr: SocketAddr, + ) -> Result<()> { + let listener = TcpListener::bind(addr)?; + for i in listener.incoming() { + // if !this.running.load(Ordering::Relaxed) { + // trace!( + // Level::Debug, + // "Stopping event loop. No more connections will be accepted." + // ); + // break; + // } + + let event = match i { + Ok(event) => event, + Err(err) => { + trace!(Level::Error, "Error accepting connection: {err}"); + continue; + } + }; + + let event = Arc::new(Socket::new(event)); + handle(this, event); + } + Ok(()) + } +} diff --git a/lib/socket.rs b/lib/socket.rs index e65940f..f1cd2be 100644 --- a/lib/socket.rs +++ b/lib/socket.rs @@ -1,11 +1,11 @@ use std::{ io::{self, Read, Write}, - net::{IpAddr, Shutdown, SocketAddr, TcpStream}, + net::{Shutdown, SocketAddr, TcpStream}, ops::Deref, sync::{ atomic::{AtomicBool, AtomicU64, Ordering}, Arc, Mutex, RwLock, - }, + }, time::Duration, }; use crate::{ @@ -19,20 +19,7 @@ pub trait Stream: Read + Write { fn peer_addr(&self) -> io::Result; fn try_clone(&self) -> io::Result; fn shutdown(&self, shutdown: Shutdown) -> io::Result<()>; -} - -impl Stream for TcpStream { - fn peer_addr(&self) -> io::Result { - self.peer_addr() - } - - fn try_clone(&self) -> io::Result { - Ok(self.try_clone().map(Box::new)?) - } - - fn shutdown(&self, shutdown: Shutdown) -> io::Result<()> { - self.shutdown(shutdown) - } + fn set_timeout(&self, duration: Option) -> io::Result<()>; } /// Socket is a wrapper around TcpStream that allows for sending a response from other threads. @@ -98,6 +85,26 @@ impl Socket { } } +impl Stream for TcpStream { + fn peer_addr(&self) -> io::Result { + self.peer_addr() + } + + fn try_clone(&self) -> io::Result { + Ok(self.try_clone().map(Box::new)?) + } + + fn shutdown(&self, shutdown: Shutdown) -> io::Result<()> { + self.shutdown(shutdown) + } + + fn set_timeout(&self, duration: Option) -> io::Result<()> { + self.set_read_timeout(duration)?; + self.set_write_timeout(duration)?; + Ok(()) + } +} + impl Deref for Socket { type Target = Mutex>; diff --git a/lib/thread_pool.rs b/lib/thread_pool.rs index 377ddca..f521872 100644 --- a/lib/thread_pool.rs +++ b/lib/thread_pool.rs @@ -2,7 +2,6 @@ //! Used for handling multiple connections at once. use std::{ - fs::DirBuilder, panic, sync::{ atomic::{AtomicUsize, Ordering}, From 1cd6606aff3dd9458da46f112348749feda60bfc Mon Sep 17 00:00:00 2001 From: Connor Slade Date: Tue, 19 Dec 2023 11:48:41 -0500 Subject: [PATCH 3/3] Working event loop trait --- Changelog.md | 4 ++++ lib/server.rs | 36 +++++++++++++++--------------------- 2 files changed, 19 insertions(+), 21 deletions(-) diff --git a/Changelog.md b/Changelog.md index aa2b248..45632fb 100644 --- a/Changelog.md +++ b/Changelog.md @@ -67,6 +67,10 @@ Coming Soon - Don't store whole stream to get its length in Head ext - Added sync_route extension - Make threadpool more robust +- Add a Stream trait to allow using different socket impls +- Allow using custom event loop + - The custom event loop with the Stream trait should allow a separate crate to add tls support to an afire server + # 2.2.1 diff --git a/lib/server.rs b/lib/server.rs index d31fcd4..dc186e7 100644 --- a/lib/server.rs +++ b/lib/server.rs @@ -34,7 +34,7 @@ pub struct Server { /// Ip address to listen on. pub ip: IpAddr, - pub event_loop: Box, + pub event_loop: Box>, /// Routes to handle. pub routes: Vec>, @@ -86,7 +86,7 @@ impl Server { Server { port, ip: raw_ip.to_address().unwrap(), - event_loop: TcpEventLoop, + event_loop: Box::new(TcpEventLoop), routes: Vec::new(), middleware: Vec::new(), @@ -152,7 +152,7 @@ impl Server { let this = Arc::new(self); let event_loop = TcpEventLoop; - event_loop.run(this, addr, |this, event| handle(event, this))?; + event_loop.run(this, addr)?; trace!("{}Server Stopped", emoji("🛑")); Ok(()) @@ -356,29 +356,23 @@ impl Server { } } -pub trait EventLoop { - fn run( - &self, - addr: SocketAddr, - ) -> Result<()>; +pub trait EventLoop { + fn run(&self, server: Arc>, addr: SocketAddr) -> Result<()>; } pub struct TcpEventLoop; -impl EventLoop for TcpEventLoop { - fn run( - &self, - addr: SocketAddr, - ) -> Result<()> { +impl EventLoop for TcpEventLoop { + fn run(&self, server: Arc>, addr: SocketAddr) -> Result<()> { let listener = TcpListener::bind(addr)?; for i in listener.incoming() { - // if !this.running.load(Ordering::Relaxed) { - // trace!( - // Level::Debug, - // "Stopping event loop. No more connections will be accepted." - // ); - // break; - // } + if !server.running.load(Ordering::Relaxed) { + trace!( + Level::Debug, + "Stopping event loop. No more connections will be accepted." + ); + break; + } let event = match i { Ok(event) => event, @@ -389,7 +383,7 @@ impl EventLoop for TcpEventLoop { }; let event = Arc::new(Socket::new(event)); - handle(this, event); + handle(event, server.clone()); } Ok(()) }