Skip to content

Commit

Permalink
Merge branch 'dev' of https://github.com/Basicprogrammer10/afire into…
Browse files Browse the repository at this point in the history
… dev
  • Loading branch information
connorslade committed Dec 19, 2023
2 parents 19bd97e + 1cd6606 commit 65d0544
Show file tree
Hide file tree
Showing 9 changed files with 94 additions and 44 deletions.
4 changes: 4 additions & 0 deletions Changelog.md
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,10 @@ Coming Soon
- Don't store whole stream to get its length in Head ext
- Added sync_route extension
- Make threadpool more robust
- Add a Stream trait to allow using different socket impls
- Allow using custom event loop
- The custom event loop with the Stream trait should allow a separate crate to add tls support to an afire server


# 2.2.1

Expand Down
2 changes: 1 addition & 1 deletion lib/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ impl<State: 'static + Send + Sync> Context<State> {

// TODO: NOT CALLING POST_RAW
for i in &self.server.middleware {
i.post(&self.req.clone(), &mut *self.response.force_lock());
i.post(&self.req.clone(), &mut self.response.force_lock());
}

self.response
Expand Down
2 changes: 1 addition & 1 deletion lib/extensions/range.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ struct RangeResponse {
impl Middleware for Range {
// Inject the Accept-Ranges header into the response.
fn post(&self, req: &Request, res: &mut Response) -> MiddleResult {
if req.method != Method::GET || req.method != Method::GET {
if req.method != Method::GET {
return MiddleResult::Continue;
}

Expand Down
11 changes: 6 additions & 5 deletions lib/internal/handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,18 +23,19 @@ pub(crate) type Writeable = Box<RefCell<dyn Read + Send>>;
/// Handles a socket.
///
/// <https://open.spotify.com/track/50txng2W8C9SycOXKIQP0D>
pub(crate) fn handle<State>(stream: TcpStream, this: Arc<Server<State>>)
pub(crate) fn handle<State>(stream: Arc<Socket>, this: Arc<Server<State>>)
where
State: 'static + Send + Sync,
{
let mut socket = stream.force_lock();
trace!(
Level::Debug,
"Opening socket {:?}",
LazyFmt(|| stream.peer_addr())
LazyFmt(|| socket.peer_addr())
);
stream.set_read_timeout(this.socket_timeout).unwrap();
stream.set_write_timeout(this.socket_timeout).unwrap();
let stream = Arc::new(Socket::new(stream));
socket.set_timeout(this.socket_timeout).unwrap();
drop(socket);

'outer: loop {
let mut keep_alive = false;
let mut req = Request::from_socket(stream.clone());
Expand Down
5 changes: 2 additions & 3 deletions lib/proto/websocket/frame.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
use std::{
convert::{TryFrom, TryInto},
io::{self, Write},
net::TcpStream,
};

use super::xor_mask;
use crate::trace::LazyFmt;
use crate::{socket::SocketStream, trace::LazyFmt};

/// ## Frame Layout
/// ```plain
Expand Down Expand Up @@ -126,7 +125,7 @@ impl Frame {
buf
}

pub fn write(&self, socket: &mut TcpStream) -> io::Result<()> {
pub fn write(&self, socket: &mut SocketStream) -> io::Result<()> {
let buf = self.to_bytes();
trace!(Level::Debug, "[WS] Writing: {:?}", buf);

Expand Down
6 changes: 3 additions & 3 deletions lib/response.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,14 @@ use std::cell::RefCell;
use std::fmt::{self, Debug, Display, Formatter};
use std::io::{ErrorKind, Read, Write};
use std::mem;
use std::net::TcpStream;

use std::sync::Arc;

use crate::consts;
use crate::header::{HeaderName, Headers};
use crate::internal::sync::ForceLockMutex;
use crate::proto::http::status::Status;
use crate::socket::Socket;
use crate::socket::{Socket, SocketStream};
use crate::{
error::Result, header::headers_to_string, internal::handle::Writeable, Content, Header,
SetCookie,
Expand Down Expand Up @@ -366,7 +366,7 @@ impl ResponseBody {

/// Writes a ResponseBody to a TcpStream.
/// Either in one go if it is static or in chunks if it is a stream.
fn write(&mut self, stream: &mut TcpStream) -> Result<()> {
fn write(&mut self, stream: &mut SocketStream) -> Result<()> {
match self {
ResponseBody::Empty => {}
ResponseBody::Static(data) => stream.write_all(data)?,
Expand Down
65 changes: 41 additions & 24 deletions lib/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use crate::{
header::Headers,
internal::misc::ToHostAddress,
route::Route,
socket::Socket,
thread_pool::ThreadPool,
trace::emoji,
Content, Context, Header, HeaderName, Method, Middleware, Request, Response, Status, VERSION,
Expand All @@ -33,6 +34,8 @@ pub struct Server<State: 'static + Send + Sync = ()> {
/// Ip address to listen on.
pub ip: IpAddr,

pub event_loop: Box<dyn EventLoop<State>>,

/// Routes to handle.
pub routes: Vec<Route<State>>,

Expand Down Expand Up @@ -83,6 +86,7 @@ impl<State: Send + Sync> Server<State> {
Server {
port,
ip: raw_ip.to_address().unwrap(),
event_loop: Box::new(TcpEventLoop),
routes: Vec::new(),
middleware: Vec::new(),

Expand Down Expand Up @@ -121,7 +125,7 @@ impl<State: Send + Sync> Server<State> {
pub fn run(self) -> Result<()> {
let threads = self.thread_pool.threads();
if threads == 0 {
// self.thread_pool.resize(1); // TODO: this
self.thread_pool.resize_exact(1);
}

trace!(
Expand All @@ -144,31 +148,11 @@ impl<State: Send + Sync> Server<State> {
.into());
}

let listener = TcpListener::bind(SocketAddr::new(self.ip, self.port))?;
let addr = SocketAddr::new(self.ip, self.port);
let this = Arc::new(self);
let event_loop = TcpEventLoop;

for event in listener.incoming() {
if !this.running.load(Ordering::Relaxed) {
trace!(
Level::Debug,
"Stopping event loop. No more connections will be accepted."
);
break;
}

let this2 = this.clone();
this.thread_pool.execute(move || {
let event = match event {
Ok(event) => event,
Err(err) => {
trace!(Level::Error, "Error accepting connection: {err}");
return;
}
};

handle(event, this2)
});
}
event_loop.run(this, addr)?;

trace!("{}Server Stopped", emoji("🛑"));
Ok(())
Expand Down Expand Up @@ -371,3 +355,36 @@ impl<State: Send + Sync> Server<State> {
let _ = TcpStream::connect(addr);
}
}

pub trait EventLoop<State: Send + Sync> {
fn run(&self, server: Arc<Server<State>>, addr: SocketAddr) -> Result<()>;
}

pub struct TcpEventLoop;

impl<State: Send + Sync> EventLoop<State> for TcpEventLoop {
fn run(&self, server: Arc<Server<State>>, addr: SocketAddr) -> Result<()> {
let listener = TcpListener::bind(addr)?;
for i in listener.incoming() {
if !server.running.load(Ordering::Relaxed) {
trace!(
Level::Debug,
"Stopping event loop. No more connections will be accepted."
);
break;
}

let event = match i {
Ok(event) => event,
Err(err) => {
trace!(Level::Error, "Error accepting connection: {err}");
continue;
}
};

let event = Arc::new(Socket::new(event));
handle(event, server.clone());
}
Ok(())
}
}
42 changes: 36 additions & 6 deletions lib/socket.rs
Original file line number Diff line number Diff line change
@@ -1,21 +1,31 @@
use std::{
net::TcpStream,
io::{self, Read, Write},
net::{Shutdown, SocketAddr, TcpStream},
ops::Deref,
sync::{
atomic::{AtomicBool, AtomicU64, Ordering},
Arc, Mutex, RwLock,
},
}, time::Duration,
};

use crate::{
internal::sync::{ForceLockRwLock, SingleBarrier},
response::ResponseFlag,
};

pub type SocketStream = Box<dyn Stream + Send + Sync>;

pub trait Stream: Read + Write {
fn peer_addr(&self) -> io::Result<SocketAddr>;
fn try_clone(&self) -> io::Result<SocketStream>;
fn shutdown(&self, shutdown: Shutdown) -> io::Result<()>;
fn set_timeout(&self, duration: Option<Duration>) -> io::Result<()>;
}

/// Socket is a wrapper around TcpStream that allows for sending a response from other threads.
pub struct Socket {
/// The internal TcpStream.
pub socket: Mutex<TcpStream>,
pub socket: Mutex<Box<dyn Stream + Send + Sync>>,
/// A unique identifier that uniquely identifies this socket.
pub id: u64,
/// A barrier that is used to wait for the response to be sent in the case of a guaranteed send.
Expand All @@ -32,10 +42,10 @@ pub struct Socket {
impl Socket {
/// Create a new `Socket` from a `TcpStream`.
/// Will also create a new unique identifier for the socket.
pub(crate) fn new(socket: TcpStream) -> Self {
pub(crate) fn new(socket: impl Stream + Send + Sync + 'static) -> Self {
static ID: AtomicU64 = AtomicU64::new(0);
Self {
socket: Mutex::new(socket),
socket: Mutex::new(Box::new(socket)),
id: ID.fetch_add(1, Ordering::Relaxed),
barrier: Arc::new(SingleBarrier::new()),
raw: AtomicBool::new(false),
Expand Down Expand Up @@ -75,8 +85,28 @@ impl Socket {
}
}

impl Stream for TcpStream {
fn peer_addr(&self) -> io::Result<SocketAddr> {
self.peer_addr()
}

fn try_clone(&self) -> io::Result<SocketStream> {
Ok(self.try_clone().map(Box::new)?)
}

fn shutdown(&self, shutdown: Shutdown) -> io::Result<()> {
self.shutdown(shutdown)
}

fn set_timeout(&self, duration: Option<Duration>) -> io::Result<()> {
self.set_read_timeout(duration)?;
self.set_write_timeout(duration)?;
Ok(())
}
}

impl Deref for Socket {
type Target = Mutex<TcpStream>;
type Target = Mutex<Box<dyn Stream + Send + Sync>>;

fn deref(&self) -> &Self::Target {
&self.socket
Expand Down
1 change: 0 additions & 1 deletion lib/thread_pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
//! Used for handling multiple connections at once.
use std::{
fs::DirBuilder,
panic,
sync::{
atomic::{AtomicUsize, Ordering},
Expand Down

0 comments on commit 65d0544

Please sign in to comment.