Skip to content

Commit

Permalink
optimizations
Browse files Browse the repository at this point in the history
  • Loading branch information
Rohit Joshi committed Mar 18, 2019
1 parent 37265cc commit 0b794f4
Show file tree
Hide file tree
Showing 5 changed files with 53 additions and 31 deletions.
10 changes: 9 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,16 @@ publish = false
travis-ci = { repository = "https://github.com/rohitjoshi/gadu", branch = "master" }
codecov = { repository = "https://github.com/rohitjoshi/gadu", branch = "master", service = "github" }


[profile.release]
opt-level = 3
lto = true
codegen-units = 1
incremental = false


[dependencies]
log="0.4.6"
log = { version = "0.3", features = ["max_level_trace", "release_max_level_info"] }
url="1.7.2"
futures = "0.1.23"
mio = "0.6"
Expand Down
18 changes: 10 additions & 8 deletions src/conn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,8 @@ impl Conn {
addr: net_addr,
close: false,
reg_write: false,
input: Vec::new(),
output: Vec::new(),
input: Vec::with_capacity(32768),
output: Vec::with_capacity(32768),
tags: HashMap::with_capacity(2),
}
}
Expand Down Expand Up @@ -173,9 +173,10 @@ impl Conn {
match self.stream.write(self.output.as_slice()) {
Ok(n) => {
if n < self.output.len() {
let mut output = Vec::new();
output.extend_from_slice(&self.output[n..self.output.len()]);
self.output = output
// let mut output = Vec::new();
// output.extend_from_slice(&self.output[n..self.output.len()]);
// self.output = output
self.output.drain(0..n);
} else {
self.output.clear();
}
Expand Down Expand Up @@ -219,9 +220,10 @@ impl Conn {
self.input.extend_from_slice(&packet[0..n]);
//self.input.extend(&buffer);
debug!(
"Received data: {}. Length:{}",
String::from_utf8_lossy(&self.input),
n
"Received Length:{}, data: {}. ",
n,
String::from_utf8_lossy(&self.input)

);
}
}
Expand Down
40 changes: 22 additions & 18 deletions src/events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,9 @@ pub trait NetEvents {
&self,
id: usize,
conn_tags: &mut HashMap<String, String>,
buffer: &mut Vec<u8>,
) -> (Vec<u8>, bool);
input_buffer: &mut Vec<u8>,
output_buffer: &mut Vec<u8>,
) -> bool;
}

pub struct ServerEventHandler {
Expand Down Expand Up @@ -194,19 +195,21 @@ impl ConnEventHandler {
} else {
Ready::readable()
};

let poll_opt = PollOpt::empty(); //PollOpt::edge() | PollOpt::oneshot();
let res = match conn.get_stream() {
NetStream::UnsecuredTcpStream(ref stream) => {
self.poll
.register(stream, Token(id), flags, PollOpt::empty())
.register(stream, Token(id), flags, poll_opt)
}
NetStream::UdsStream(ref stream) => {
self.poll
.register(stream, Token(id), flags, PollOpt::empty())
.register(stream, Token(id), flags, poll_opt)
}
#[cfg(feature = "tls")]
NetStream::SslTcpStream(ref stream) => {
self.poll
.register(stream.get_ref(), Token(id), flags, PollOpt::empty())
.register(stream.get_ref(), Token(id), flags, poll_opt)
}
};
if let Err(e) = res {
Expand Down Expand Up @@ -257,13 +260,13 @@ impl ConnEventHandler {
T: NetEvents + ?Sized,
{
let mut streams: HashMap<usize, Conn> = HashMap::new();
let mut events = Events::with_capacity(2048);
let mut events = Events::with_capacity(1024);
let mut read_buffer = [0; 32768];

let mut timeout = Some(Duration::from_millis(5000));
let mut timeout = Some(Duration::from_millis(1));

if receiver.is_some() {
timeout = Some(Duration::from_millis(500));
timeout = Some(Duration::from_millis(1));
}

loop {
Expand Down Expand Up @@ -330,11 +333,12 @@ impl ConnEventHandler {
} else if !conn.close {
close = conn.read(&mut read_buffer);
// PROFILER.lock().unwrap().start("/tmp/my-prof.profile").expect("Couldn't start");
let (output, close_conn) =
event_handler.event_data(id, &mut conn.tags, &mut conn.input);
debug!("Invoking event_handler::event_data for connection id:{}", id);
let close_conn =
event_handler.event_data(id, &mut conn.tags, &mut conn.input, &mut conn.output);
// PROFILER.lock().unwrap().stop().expect("Couldn't stop");
debug!("event_data output:{}", String::from_utf8_lossy(&output));
conn.output.extend(&output);
debug!("event_data output:{}", String::from_utf8_lossy(&conn.output));
// conn.output.extend(&output);
conn.close = close_conn;
}
if !conn.close && !conn.output.is_empty() {
Expand Down Expand Up @@ -407,10 +411,10 @@ impl ConnEventHandler {
T: NetEvents + 'static + Sync + Send + Sized,
{
let mut streams: HashMap<usize, Conn> = HashMap::new();
let mut events = Events::with_capacity(2048);
let mut events = Events::with_capacity(1024);
let mut read_buffer = [0; 32768];

let timeout = Some(Duration::from_millis(5000));
let timeout = Some(Duration::from_millis(1));

loop {
//check if shutdown signal received
Expand Down Expand Up @@ -474,11 +478,11 @@ impl ConnEventHandler {
} else if !conn.close {
close = conn.read(&mut read_buffer);
// PROFILER.lock().unwrap().start("/tmp/my-prof.profile").expect("Couldn't start");
let (output, close_conn) =
event_handler.event_data(id, &mut conn.tags, &mut conn.input);
let close_conn =
event_handler.event_data(id, &mut conn.tags, &mut conn.input, &mut conn.output);
// PROFILER.lock().unwrap().stop().expect("Couldn't stop");
debug!("event_data output:{}", String::from_utf8_lossy(&output));
conn.output.extend(&output);
debug!("event_data output:{}", String::from_utf8_lossy(&conn.output));
//conn.output.extend(&output);
conn.close = close_conn;
}
if !conn.close && !conn.output.is_empty() {
Expand Down
9 changes: 5 additions & 4 deletions src/network_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use std::sync::Arc;
/// NetworkServer
///
pub struct NetworkServer {
//config : NetworkServerConfig,
conf : NetworkServerConfig,
server_event_handler: ServerEventHandler,
conn_handlers: Vec<Arc<ConnEventHandler>>,
pub shutdown: Arc<AtomicBool>,
Expand All @@ -37,12 +37,12 @@ impl NetworkServer {
ServerEventHandler::new(conf.server_id, conf.num_threads, &conf.server_config)?;
let mut conn_handlers = Vec::with_capacity(conf.num_threads);
for i in 0..conf.num_threads {
debug!("Initializing NetworkServer::ConnEventHandler {}", i);
debug!("Initializing {} NetworkServer::ConnEventHandler {}", conf.server_name, i);
let handler = ConnEventHandler::new()?;
conn_handlers.push(Arc::new(handler));
}
Ok(NetworkServer {
//config : conf.clone(),
conf : conf.clone(),
server_event_handler,
conn_handlers,
shutdown,
Expand Down Expand Up @@ -83,7 +83,7 @@ impl NetworkServer {
std::thread::sleep(std::time::Duration::from_millis(250));
}
}
info!("Shutdown received. Exiting NetworkServer server_loop...");
info!("Shutdown received. Exiting {} NetworkServer server_loop...", self.conf.server_name);
}

pub fn run_loop<T>(scope: &Scope, network_server: Arc<NetworkServer>, net_event_handler: Arc<T>, non_blocking:bool)
Expand All @@ -103,6 +103,7 @@ impl NetworkServer {
scope.spawn(move |_| network_server.server_loop(net_event_handler));
}else {
network_server.server_loop(net_event_handler)

}
}
}
7 changes: 7 additions & 0 deletions src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,13 @@ impl NetListener {
{
error!("Failed to set keepalive. Error:{:?}", e);
}

if let Err(e) =
s.0.set_nodelay(true)
{
error!("Failed to set nodelay to true. Error:{:?}", e);
}

Ok((
NetStream::UnsecuredTcpStream(s.0),
NetAddr::NetSocketAddress(s.1),
Expand Down

0 comments on commit 0b794f4

Please sign in to comment.