Skip to content
This repository has been archived by the owner on Apr 25, 2019. It is now read-only.

Commit

Permalink
Multithread
Browse files Browse the repository at this point in the history
  • Loading branch information
losfair committed Aug 12, 2017
1 parent c1ff4d0 commit c43781a
Show file tree
Hide file tree
Showing 4 changed files with 65 additions and 25 deletions.
2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,3 +27,5 @@ sequence_trie = "0.3"
byteorder = "1.1"
lazy_static = "0.2"
cervus = "0.2"
net2 = "0.2"
num_cpus = "1.6"
68 changes: 52 additions & 16 deletions src/ice_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ use std;
use std::sync::{Arc, RwLock, Mutex};
use std::rc::Rc;
use std::cell::RefCell;
use std::net::SocketAddr;
use std::os::raw::c_void;
use hyper;
use hyper::server::{Http, Request, Response, Service};
use futures;
Expand All @@ -10,6 +12,10 @@ use futures::Stream;
use delegates;
use router;
use tokio_core;
use net2;
use net2::unix::UnixTcpBuilderExt;
use num_cpus;
use cervus;
use static_file;
use logging;
use session_storage::SessionStorage;
Expand All @@ -29,6 +35,7 @@ pub struct IceServer {
pub struct Preparation {
pub router: Arc<Mutex<router::Router>>,
pub static_dir: RwLock<Option<String>>,
pub session_storage: Arc<SessionStorage>,
pub session_cookie_name: Mutex<String>,
pub session_timeout_ms: RwLock<u64>,
pub templates: Arc<TemplateStorage>,
Expand Down Expand Up @@ -73,6 +80,7 @@ impl IceServer {
prep: Arc::new(Preparation {
router: Arc::new(Mutex::new(router::Router::new())),
static_dir: RwLock::new(None),
session_storage: Arc::new(SessionStorage::new()),
session_cookie_name: Mutex::new(config::DEFAULT_SESSION_COOKIE_NAME.to_string()),
session_timeout_ms: RwLock::new(600000),
templates: Arc::new(TemplateStorage::new()),
Expand All @@ -86,17 +94,15 @@ impl IceServer {
}
}

pub fn listen_in_this_thread(&self, addr: &str) {
pub fn listen_in_this_thread(&self, addr: &SocketAddr, protocol: &Http) {
let logger = logging::Logger::new("IceServer::listen_in_this_thread");

let addr = addr.parse().unwrap();

let mut ev_loop = tokio_core::reactor::Core::new().unwrap();

let (control_tx, control_rx) = std::sync::mpsc::channel();
let remote_handle = ev_loop.handle().remote().clone();

let session_storage = Arc::new(SessionStorage::new());
let session_storage = self.prep.session_storage.clone();

let ctx = Arc::new(Context {
ev_loop_remote: remote_handle.clone(),
Expand All @@ -114,9 +120,6 @@ impl IceServer {
modules: self.prep.modules.clone()
});

//let modules = self.prep.modules.clone();
//modules.read().unwrap().run_hook(Hook::ContextInit(ctx.clone()));

let local_ctx = Rc::new(LocalContext {
ev_loop_handle: ev_loop.handle(),
static_file_worker_control_tx: control_tx,
Expand All @@ -126,19 +129,25 @@ impl IceServer {
let ctx_cloned = ctx.clone();
let _ = std::thread::spawn(move || static_file::worker(ctx_cloned, remote_handle, control_rx));

let session_timeout_ms = *self.prep.session_timeout_ms.read().unwrap();
let _ = std::thread::spawn(move || session_storage.run_gc(session_timeout_ms, config::SESSION_GC_PERIOD_MS));

let this_handle = ev_loop.handle();

let listener = tokio_core::net::TcpListener::bind(&addr, &this_handle).unwrap();
let listener = net2::TcpBuilder::new_v4().unwrap()
.reuse_port(true).unwrap()
.bind(addr).unwrap()
.listen(128).unwrap();

let listener = tokio_core::net::TcpListener::from_listener(
listener,
addr,
&this_handle
).unwrap();

let server = listener.incoming().for_each(|(sock, addr)| {
let s = HttpService {
context: ctx.clone(),
local_context: local_ctx.clone()
};
Http::new().bind_connection(&this_handle, sock, addr, s);
protocol.bind_connection(&this_handle, sock, addr, s);

Ok(())
});
Expand All @@ -148,12 +157,39 @@ impl IceServer {
ev_loop.run(server).unwrap();
}

pub fn listen(&self, addr: &str) -> std::thread::JoinHandle<()> {
let addr = addr.to_string();
pub fn listen(&self, addr: &str) {
let protocol = Arc::new(Http::new());
let addr: SocketAddr = addr.parse().unwrap();

self.export_symbols();

let target = self.clone();
let session_timeout_ms = *self.prep.session_timeout_ms.read().unwrap();
let session_storage = self.prep.session_storage.clone();
std::thread::spawn(move || session_storage.run_gc(session_timeout_ms, config::SESSION_GC_PERIOD_MS));

for _ in 0..num_cpus::get() - 1 {
let addr = addr.clone();
let target = self.clone();
let protocol = protocol.clone();

std::thread::spawn(move || target.listen_in_this_thread(&addr, &protocol));
}
}

std::thread::spawn(move || target.listen_in_this_thread(addr.as_str()))
fn export_symbols(&self) {
unsafe {
cervus::engine::add_global_symbol("ice_glue_create_response", glue::response::ice_glue_create_response as *const c_void);
cervus::engine::add_global_symbol("ice_glue_response_add_header", glue::response::ice_glue_response_add_header as *const c_void);
cervus::engine::add_global_symbol("ice_glue_response_set_cookie", glue::response::ice_glue_response_set_cookie as *const c_void);
cervus::engine::add_global_symbol("ice_glue_response_set_body", glue::response::ice_glue_response_set_body as *const c_void);
cervus::engine::add_global_symbol("ice_glue_response_set_file", glue::response::ice_glue_response_set_file as *const c_void);
cervus::engine::add_global_symbol("ice_glue_response_set_status", glue::response::ice_glue_response_set_status as *const c_void);
cervus::engine::add_global_symbol("ice_glue_response_consume_rendered_template", glue::response::ice_glue_response_consume_rendered_template as *const c_void);
cervus::engine::add_global_symbol("ice_glue_response_stream", glue::response::ice_glue_response_stream as *const c_void);
cervus::engine::add_global_symbol("ice_glue_custom_properties_set", glue::common::ice_glue_custom_properties_set as *const c_void);
cervus::engine::add_global_symbol("ice_glue_custom_properties_get", glue::common::ice_glue_custom_properties_get as *const c_void);
cervus::engine::add_global_symbol("ice_glue_response_borrow_custom_properties", glue::response::ice_glue_response_borrow_custom_properties as *const c_void);
}
}
}

Expand Down
6 changes: 4 additions & 2 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ extern crate ansi_term;
extern crate etag;
extern crate sequence_trie;
extern crate byteorder;
extern crate net2;
extern crate num_cpus;

extern crate cervus;

Expand Down Expand Up @@ -53,9 +55,9 @@ pub unsafe fn ice_server_listen(handle: ServerHandle, addr: *const c_char) -> *m
let handle = &*handle;

let server = handle.lock().unwrap();
let thread_handle = Box::new(server.listen(CStr::from_ptr(addr).to_str().unwrap()));
server.listen(CStr::from_ptr(addr).to_str().unwrap());

Box::into_raw(thread_handle)
std::ptr::null_mut()
}

#[no_mangle]
Expand Down
14 changes: 7 additions & 7 deletions src/module_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use std::cell::RefCell;
use std::any::Any;
use std::ops::{Deref, DerefMut};
use std::collections::HashMap;
use std::sync::{Arc, Weak, Mutex};
use std::sync::{Arc, Weak, Mutex, RwLock};
use std::ffi::CStr;
use std::os::raw::{c_void, c_char};
use cervus;
Expand All @@ -26,7 +26,7 @@ pub struct ModuleRuntime {
pub struct ModuleContext {
all_hooks: Arc<Mutex<HashMap<String, Vec<Weak<ModuleContext>>>>>,
hooks: Mutex<HashMap<String, extern fn (*const HookContext)>>,
context_mem: Mutex<Option<Vec<u8>>>
context_mem: RwLock<Option<Vec<u8>>>
}

impl ModuleRuntime {
Expand All @@ -43,7 +43,7 @@ impl ModuleContext {
ModuleContext {
all_hooks: m.all_hooks.clone(),
hooks: Mutex::new(HashMap::new()),
context_mem: Mutex::new(None)
context_mem: RwLock::new(None)
}
}
}
Expand Down Expand Up @@ -254,12 +254,12 @@ unsafe extern fn _reset_context_mem(
) -> *mut u8 {
let m = (&*m).downcast_ref::<Weak<ModuleContext>>().unwrap().upgrade().unwrap();
if size == 0 {
*m.context_mem.lock().unwrap() = None;
*m.context_mem.write().unwrap() = None;
std::ptr::null_mut()
} else {
let mut v = vec![0; size as usize];
let addr = v.as_mut_ptr();
*m.context_mem.lock().unwrap() = Some(v);
*m.context_mem.write().unwrap() = Some(v);
addr
}
}
Expand All @@ -268,8 +268,8 @@ unsafe extern fn _get_context_mem(
m: *const cervus::engine::ModuleResource
) -> *mut u8 {
let m = (&*m).downcast_ref::<Weak<ModuleContext>>().unwrap().upgrade().unwrap();
let ret = match *m.context_mem.lock().unwrap() {
Some(ref mut v) => v.as_mut_ptr(),
let ret = match *m.context_mem.read().unwrap() {
Some(ref v) => v.as_ptr() as *mut u8,
None => std::ptr::null_mut()
};
ret
Expand Down

0 comments on commit c43781a

Please sign in to comment.