diff --git a/Cargo.toml b/Cargo.toml index fa89a44..612ec5a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -27,3 +27,5 @@ sequence_trie = "0.3" byteorder = "1.1" lazy_static = "0.2" cervus = "0.2" +net2 = "0.2" +num_cpus = "1.6" diff --git a/src/ice_server.rs b/src/ice_server.rs index 4eecf57..5721c01 100644 --- a/src/ice_server.rs +++ b/src/ice_server.rs @@ -2,6 +2,8 @@ use std; use std::sync::{Arc, RwLock, Mutex}; use std::rc::Rc; use std::cell::RefCell; +use std::net::SocketAddr; +use std::os::raw::c_void; use hyper; use hyper::server::{Http, Request, Response, Service}; use futures; @@ -10,6 +12,10 @@ use futures::Stream; use delegates; use router; use tokio_core; +use net2; +use net2::unix::UnixTcpBuilderExt; +use num_cpus; +use cervus; use static_file; use logging; use session_storage::SessionStorage; @@ -29,6 +35,7 @@ pub struct IceServer { pub struct Preparation { pub router: Arc>, pub static_dir: RwLock>, + pub session_storage: Arc, pub session_cookie_name: Mutex, pub session_timeout_ms: RwLock, pub templates: Arc, @@ -73,6 +80,7 @@ impl IceServer { prep: Arc::new(Preparation { router: Arc::new(Mutex::new(router::Router::new())), static_dir: RwLock::new(None), + session_storage: Arc::new(SessionStorage::new()), session_cookie_name: Mutex::new(config::DEFAULT_SESSION_COOKIE_NAME.to_string()), session_timeout_ms: RwLock::new(600000), templates: Arc::new(TemplateStorage::new()), @@ -86,17 +94,15 @@ impl IceServer { } } - pub fn listen_in_this_thread(&self, addr: &str) { + pub fn listen_in_this_thread(&self, addr: &SocketAddr, protocol: &Http) { let logger = logging::Logger::new("IceServer::listen_in_this_thread"); - let addr = addr.parse().unwrap(); - let mut ev_loop = tokio_core::reactor::Core::new().unwrap(); let (control_tx, control_rx) = std::sync::mpsc::channel(); let remote_handle = ev_loop.handle().remote().clone(); - let session_storage = Arc::new(SessionStorage::new()); + let session_storage = self.prep.session_storage.clone(); let ctx = Arc::new(Context { ev_loop_remote: remote_handle.clone(), @@ -114,9 +120,6 @@ impl IceServer { modules: self.prep.modules.clone() }); - //let modules = self.prep.modules.clone(); - //modules.read().unwrap().run_hook(Hook::ContextInit(ctx.clone())); - let local_ctx = Rc::new(LocalContext { ev_loop_handle: ev_loop.handle(), static_file_worker_control_tx: control_tx, @@ -126,19 +129,25 @@ impl IceServer { let ctx_cloned = ctx.clone(); let _ = std::thread::spawn(move || static_file::worker(ctx_cloned, remote_handle, control_rx)); - let session_timeout_ms = *self.prep.session_timeout_ms.read().unwrap(); - let _ = std::thread::spawn(move || session_storage.run_gc(session_timeout_ms, config::SESSION_GC_PERIOD_MS)); - let this_handle = ev_loop.handle(); - let listener = tokio_core::net::TcpListener::bind(&addr, &this_handle).unwrap(); + let listener = net2::TcpBuilder::new_v4().unwrap() + .reuse_port(true).unwrap() + .bind(addr).unwrap() + .listen(128).unwrap(); + + let listener = tokio_core::net::TcpListener::from_listener( + listener, + addr, + &this_handle + ).unwrap(); let server = listener.incoming().for_each(|(sock, addr)| { let s = HttpService { context: ctx.clone(), local_context: local_ctx.clone() }; - Http::new().bind_connection(&this_handle, sock, addr, s); + protocol.bind_connection(&this_handle, sock, addr, s); Ok(()) }); @@ -148,12 +157,39 @@ impl IceServer { ev_loop.run(server).unwrap(); } - pub fn listen(&self, addr: &str) -> std::thread::JoinHandle<()> { - let addr = addr.to_string(); + pub fn listen(&self, addr: &str) { + let protocol = Arc::new(Http::new()); + let addr: SocketAddr = addr.parse().unwrap(); + + self.export_symbols(); - let target = self.clone(); + let session_timeout_ms = *self.prep.session_timeout_ms.read().unwrap(); + let session_storage = self.prep.session_storage.clone(); + std::thread::spawn(move || session_storage.run_gc(session_timeout_ms, config::SESSION_GC_PERIOD_MS)); + + for _ in 0..num_cpus::get() - 1 { + let addr = addr.clone(); + let target = self.clone(); + let protocol = protocol.clone(); + + std::thread::spawn(move || target.listen_in_this_thread(&addr, &protocol)); + } + } - std::thread::spawn(move || target.listen_in_this_thread(addr.as_str())) + fn export_symbols(&self) { + unsafe { + cervus::engine::add_global_symbol("ice_glue_create_response", glue::response::ice_glue_create_response as *const c_void); + cervus::engine::add_global_symbol("ice_glue_response_add_header", glue::response::ice_glue_response_add_header as *const c_void); + cervus::engine::add_global_symbol("ice_glue_response_set_cookie", glue::response::ice_glue_response_set_cookie as *const c_void); + cervus::engine::add_global_symbol("ice_glue_response_set_body", glue::response::ice_glue_response_set_body as *const c_void); + cervus::engine::add_global_symbol("ice_glue_response_set_file", glue::response::ice_glue_response_set_file as *const c_void); + cervus::engine::add_global_symbol("ice_glue_response_set_status", glue::response::ice_glue_response_set_status as *const c_void); + cervus::engine::add_global_symbol("ice_glue_response_consume_rendered_template", glue::response::ice_glue_response_consume_rendered_template as *const c_void); + cervus::engine::add_global_symbol("ice_glue_response_stream", glue::response::ice_glue_response_stream as *const c_void); + cervus::engine::add_global_symbol("ice_glue_custom_properties_set", glue::common::ice_glue_custom_properties_set as *const c_void); + cervus::engine::add_global_symbol("ice_glue_custom_properties_get", glue::common::ice_glue_custom_properties_get as *const c_void); + cervus::engine::add_global_symbol("ice_glue_response_borrow_custom_properties", glue::response::ice_glue_response_borrow_custom_properties as *const c_void); + } } } diff --git a/src/lib.rs b/src/lib.rs index 99999f5..aab1b1f 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -17,6 +17,8 @@ extern crate ansi_term; extern crate etag; extern crate sequence_trie; extern crate byteorder; +extern crate net2; +extern crate num_cpus; extern crate cervus; @@ -53,9 +55,9 @@ pub unsafe fn ice_server_listen(handle: ServerHandle, addr: *const c_char) -> *m let handle = &*handle; let server = handle.lock().unwrap(); - let thread_handle = Box::new(server.listen(CStr::from_ptr(addr).to_str().unwrap())); + server.listen(CStr::from_ptr(addr).to_str().unwrap()); - Box::into_raw(thread_handle) + std::ptr::null_mut() } #[no_mangle] diff --git a/src/module_manager.rs b/src/module_manager.rs index 590fcc0..0144f48 100644 --- a/src/module_manager.rs +++ b/src/module_manager.rs @@ -4,7 +4,7 @@ use std::cell::RefCell; use std::any::Any; use std::ops::{Deref, DerefMut}; use std::collections::HashMap; -use std::sync::{Arc, Weak, Mutex}; +use std::sync::{Arc, Weak, Mutex, RwLock}; use std::ffi::CStr; use std::os::raw::{c_void, c_char}; use cervus; @@ -26,7 +26,7 @@ pub struct ModuleRuntime { pub struct ModuleContext { all_hooks: Arc>>>>, hooks: Mutex>, - context_mem: Mutex>> + context_mem: RwLock>> } impl ModuleRuntime { @@ -43,7 +43,7 @@ impl ModuleContext { ModuleContext { all_hooks: m.all_hooks.clone(), hooks: Mutex::new(HashMap::new()), - context_mem: Mutex::new(None) + context_mem: RwLock::new(None) } } } @@ -254,12 +254,12 @@ unsafe extern fn _reset_context_mem( ) -> *mut u8 { let m = (&*m).downcast_ref::>().unwrap().upgrade().unwrap(); if size == 0 { - *m.context_mem.lock().unwrap() = None; + *m.context_mem.write().unwrap() = None; std::ptr::null_mut() } else { let mut v = vec![0; size as usize]; let addr = v.as_mut_ptr(); - *m.context_mem.lock().unwrap() = Some(v); + *m.context_mem.write().unwrap() = Some(v); addr } } @@ -268,8 +268,8 @@ unsafe extern fn _get_context_mem( m: *const cervus::engine::ModuleResource ) -> *mut u8 { let m = (&*m).downcast_ref::>().unwrap().upgrade().unwrap(); - let ret = match *m.context_mem.lock().unwrap() { - Some(ref mut v) => v.as_mut_ptr(), + let ret = match *m.context_mem.read().unwrap() { + Some(ref v) => v.as_ptr() as *mut u8, None => std::ptr::null_mut() }; ret