Skip to content

Commit

Permalink
[testing] Enhancement: Improve tcp-echo
Browse files Browse the repository at this point in the history
  • Loading branch information
iyzhang authored and anandbonde committed Nov 22, 2024
1 parent 38b7fdf commit f3a2109
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 32 deletions.
19 changes: 5 additions & 14 deletions examples/tcp-echo/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,6 @@ pub struct TcpEchoClient {
remote: SocketAddr,
/// List of pending operations.
qts: Vec<QToken>,
/// Reverse lookup table of pending operations.
qts_reverse: HashMap<QToken, QDesc>,
/// Start time.
start: Instant,
/// Statistics.
Expand All @@ -77,7 +75,6 @@ impl TcpEchoClient {
npushed: 0,
clients: HashMap::default(),
qts: Vec::default(),
qts_reverse: HashMap::default(),
start: Instant::now(),
stats: Histogram::new(7, 64)?,
});
Expand Down Expand Up @@ -185,7 +182,7 @@ impl TcpEchoClient {
// Set default linger to a short period, otherwise, this test will take a long time to complete.

let qt: QToken = self.libos.connect(qd, self.remote)?;
self.register_operation(qd, qt);
self.register_operation(qt);

// First client connects synchronously.
if i == 0 {
Expand Down Expand Up @@ -391,42 +388,36 @@ impl TcpEchoClient {
/// Issues a pop operation.
fn issue_pop(&mut self, qd: QDesc, size: Option<usize>) -> Result<()> {
let qt: QToken = self.libos.pop(qd, size)?;
self.register_operation(qd, qt);
self.register_operation(qt);
Ok(())
}

/// Issues a push operation
fn issue_push(&mut self, qd: QDesc) -> Result<()> {
let sga: demi_sgarray_t = self.mksga(self.bufsize)?;
let qt: QToken = self.libos.push(qd, &sga)?;
self.register_operation(qd, qt);
self.register_operation(qt);
// Ok to immediately free because the push clones the reference and keeps it until the push completes.
self.libos.sgafree(sga)?;
Ok(())
}

/// Handles a close operation.
fn handle_close(&mut self, qd: QDesc) -> Result<()> {
let qts_drained: HashMap<QToken, QDesc> = self.qts_reverse.extract_if(|_k, v| v == &qd).collect();
let _: Vec<_> = self.qts.extract_if(|x| qts_drained.contains_key(x)).collect();
self.clients.remove(&qd);
self.libos.close(qd)?;
println!("INFO: {} clients connected", self.clients.len());
Ok(())
}

// Registers an asynchronous I/O operation.
fn register_operation(&mut self, qd: QDesc, qt: QToken) {
self.qts_reverse.insert(qt, qd);
fn register_operation(&mut self, qt: QToken) {
self.qts.push(qt);
}

// Unregisters an asynchronous I/O operation.
fn unregister_operation(&mut self, index: usize) -> Result<()> {
let qt: QToken = self.qts.remove(index);
self.qts_reverse
.remove(&qt)
.ok_or(anyhow::anyhow!("unregistered queue token qt={:?}", qt))?;
let _: QToken = self.qts.remove(index);
Ok(())
}
}
Expand Down
27 changes: 9 additions & 18 deletions examples/tcp-echo/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,10 @@ use anyhow::Result;
use demikernel::{
demi_sgarray_t,
runtime::types::{demi_opcode_t, demi_qresult_t},
LibOS, QDesc, QToken,
timer, LibOS, QDesc, QToken,
};
use std::{
collections::{HashMap, HashSet},
collections::HashSet,
net::SocketAddr,
time::{Duration, Instant},
};
Expand All @@ -39,7 +39,6 @@ pub struct TcpEchoServer {
listening_sockqd: QDesc,
connected_clients: HashSet<QDesc>,
pending_qtokens: Vec<QToken>,
pending_qtokens_reverse: HashMap<QToken, QDesc>,
}

//======================================================================================================================
Expand Down Expand Up @@ -69,7 +68,6 @@ impl TcpEchoServer {
listening_sockqd,
connected_clients: HashSet::default(),
pending_qtokens: Vec::default(),
pending_qtokens_reverse: HashMap::default(),
});
}

Expand Down Expand Up @@ -123,19 +121,19 @@ impl TcpEchoServer {

fn issue_accept(&mut self) -> Result<()> {
let qt: QToken = self.libos.accept(self.listening_sockqd)?;
self.register_operation(self.listening_sockqd, qt);
self.register_operation(qt);
Ok(())
}

fn issue_push(&mut self, qd: QDesc, sga: &demi_sgarray_t) -> Result<()> {
let qt: QToken = self.libos.push(qd, &sga)?;
self.register_operation(qd, qt);
self.register_operation(qt);
Ok(())
}

fn issue_pop(&mut self, qd: QDesc) -> Result<()> {
let qt: QToken = self.libos.pop(qd, None)?;
self.register_operation(qd, qt);
self.register_operation(qt);
Ok(())
}

Expand Down Expand Up @@ -198,26 +196,19 @@ impl TcpEchoServer {
}

fn handle_close(&mut self, qd: QDesc) -> Result<()> {
let qts_drained: HashMap<QToken, QDesc> = self.pending_qtokens_reverse.extract_if(|_k, v| v == &qd).collect();
let _: Vec<_> = self
.pending_qtokens
.extract_if(|x| qts_drained.contains_key(x))
.collect();
self.connected_clients.remove(&qd);
self.libos.close(qd)?;
Ok(())
}

fn register_operation(&mut self, qd: QDesc, qt: QToken) {
self.pending_qtokens_reverse.insert(qt, qd);
fn register_operation(&mut self, qt: QToken) {
timer!("register");
self.pending_qtokens.push(qt);
}

fn unregister_operation(&mut self, index: usize) -> Result<()> {
let qt: QToken = self.pending_qtokens.remove(index);
self.pending_qtokens_reverse
.remove(&qt)
.ok_or(anyhow::anyhow!("unregistered queue token"))?;
timer!("unregister");
self.pending_qtokens.remove(index);
Ok(())
}
}
Expand Down

0 comments on commit f3a2109

Please sign in to comment.