Skip to content
This repository has been archived by the owner on Sep 12, 2023. It is now read-only.

Commit

Permalink
Implement WebSocket via tokio-tungstenite. Closes #145
Browse files Browse the repository at this point in the history
  • Loading branch information
PooyaEimandar committed Nov 20, 2021
1 parent 644e17d commit d2db214
Showing 1 changed file with 203 additions and 17 deletions.
220 changes: 203 additions & 17 deletions wolf-system/src/net/tcp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -193,18 +193,66 @@ where
break;
}

let want_to_close_conn: anyhow::Result<()>;
let elapsed_secs = socket_live_time.elapsed().as_secs_f64();
let msg_vec = msg.clone().into_data();
msg_size = msg_vec.len();

msg_buf = msg_vec.as_slice().try_into()?;
let want_to_close_conn = p_on_msg_callback.run(
&elapsed_secs,
&p_peer_address,
&mut msg_type,
&mut msg_size,
&mut msg_buf,
);
if msg.is_text() {
msg_type = MessageType::TEXT;
let msg_res = msg.into_text();
match msg_res {
Ok(str) => {
unsafe {
let src_len = str.len();
let src_ptr = str.as_bytes().as_ptr();
let dst_ptr = msg_buf.as_mut_ptr();

std::ptr::copy_nonoverlapping(
src_ptr, dst_ptr, src_len,
);
}
want_to_close_conn = p_on_msg_callback.run(
&elapsed_secs,
&p_peer_address,
&mut msg_type,
&mut msg_size,
&mut msg_buf,
);
}
Err(e) => {
close_msg = format!("websocket connection is going to close. Reason: Unsupported text message. {:?}", e);
close_code = CloseCode::Unsupported;
break;
}
};
} else if msg.is_binary() {
msg_type = MessageType::BINARY;
let msg_vec = msg.clone().into_data();
let msg_res: core::result::Result<
&[u8; 1024],
std::array::TryFromSliceError,
> = msg_vec.as_slice().try_into();
match msg_res {
Ok(arr) => {
msg_buf.copy_from_slice(arr);
want_to_close_conn = p_on_msg_callback.run(
&elapsed_secs,
&p_peer_address,
&mut msg_type,
&mut msg_size,
&mut msg_buf,
);
}
Err(e) => {
close_msg = format!("websocket connection is going to close. Reason: Unsupported binary message. {:?}", e);
close_code = CloseCode::Unsupported;
break;
}
};
} else {
close_msg = "websocket connection is going to close. Reason: Unsupported message type".to_string();
close_code = CloseCode::Unsupported;
break;
}

if want_to_close_conn.is_err() {
close_msg = format!("websocket connection is going to close because of the p_on_msg_callback request. Reason: {:?}", want_to_close_conn);
close_code = CloseCode::Normal;
Expand All @@ -213,6 +261,8 @@ where

match msg_type {
MessageType::TEXT => {
//fill other data to zero for better converting array to string
msg_buf[msg_size..1024].fill(0u8);
let str_res = std::str::from_utf8(&msg_buf);
if str_res.is_ok() {
let r = futures::SinkExt::send(
Expand Down Expand Up @@ -557,7 +607,7 @@ pub async fn client(

#[tokio::main]
#[test]
async fn test() -> () {
async fn test_native() -> () {
use std::sync::mpsc::channel;
use std::time::Duration;

Expand Down Expand Up @@ -607,11 +657,11 @@ async fn test() -> () {
println!("client: received buffer is {}", msg);
}
//now store new bytes for write
let msg = "hello...world!\0"; //make sure append NULL terminate
let msg = "hello...world!"; //make sure append NULL terminate
p_msg_buf[0..msg.len()].copy_from_slice(msg.as_bytes());
*p_msg_size = msg.len();

if p_socket_time_in_secs > &10.0 {
if p_socket_time_in_secs > &5.0 {
anyhow::bail!("closing socket");
}
Ok(())
Expand Down Expand Up @@ -665,7 +715,7 @@ async fn test() -> () {
println!("server: received buffer is {}", msg);

//now store new bytes for write
let msg = "hello client!\0"; //make sure append NULL terminate
let msg = "hello client!"; //make sure append NULL terminate
p_msg_buf[0..msg.len()].copy_from_slice(msg.as_bytes());
*p_msg_size = msg.len();
}
Expand All @@ -676,7 +726,7 @@ async fn test() -> () {
let on_close_connection = OnCloseSocketCallback::new(Box::new(
|p_socket_address: &SocketAddr, p_close_msg: &str| -> anyhow::Result<()> {
println!(
"server: remote address with peer id {:?} just disconnected because of {}",
"server: remote address with peer id {:?} just disconnected. close message is {}",
p_socket_address, p_close_msg
);
Ok(())
Expand Down Expand Up @@ -709,6 +759,142 @@ async fn test() -> () {
on_close_socket,
)
.await;
assert!(ret.is_ok(), "{:?}", ret);

println!("native tcp tests were done");
}

#[tokio::main]
#[test]
async fn test_ws() -> () {
//run client code for this test
/*
<!DOCTYPE html>
<html>
<body>
<h1>Hello Wolf</h1>
</body>
<script>
let socket = new WebSocket("ws://127.0.0.1:8000");
socket.onopen = function (e) {
alert("[open] Connection established");
alert("Sending to server");
socket.send("Hello Wolf from WS");
};
socket.onmessage = function (event) {
console.log(`[message] Data received from server: ${event.data}`);
};
socket.onclose = function (event) {
if (event.wasClean) {
alert(`[close] Connection closed cleanly, code=${event.code} reason=${event.reason}`);
} else {
// e.g. server process killed or network down
// event.code is usually 1006 in this case
alert('[close] Connection died');
}
};
socket.onerror = function (error) {
alert(`[error] ${error.message}`);
};
</script>
</html>
*/

use std::sync::mpsc::channel;

lazy_static::lazy_static! {
static ref CHANNEL_MUTEX: Mutex<(Sender<bool>, Receiver<bool>)> = Mutex::new(channel::<bool>());
}

// block main thread with tcp server
let on_bind_socket = OnSocketCallback::new(Box::new(
|p_socket_address: &SocketAddr| -> anyhow::Result<()> {
println!("server: socket {:?} just binded", p_socket_address);
Ok(())
},
));

let on_accept_connection = OnSocketCallback::new(Box::new(
|p_socket_address: &SocketAddr| -> anyhow::Result<()> {
println!(
"server: remote address with peer id {:?} just connected",
p_socket_address
);
Ok(())
},
));

let on_msg_callback = OnMessageCallback::new(Box::new(
|p_socket_time_in_secs: &f64,
p_peer_address: &SocketAddr,
_p_msg_type: &mut MessageType,
p_msg_size: &mut usize,
p_msg_buf: &mut [u8]|
-> anyhow::Result<()> {
println!(
"server: number of received byte(s) from {:?} is {}. socket live time {}",
p_peer_address, *p_msg_size, p_socket_time_in_secs
);
if *p_msg_size > 0 {
let msg = std::str::from_utf8(&p_msg_buf)?;
println!("server: received buffer is {}", msg);

//now store new bytes for write
let msg = "hello websocket client!";
p_msg_buf[0..msg.len()].copy_from_slice(msg.as_bytes());
*p_msg_size = msg.len();
}
Err(anyhow!("close"))
},
));

let on_close_connection = OnCloseSocketCallback::new(Box::new(
|p_socket_address: &SocketAddr, p_close_msg: &str| -> anyhow::Result<()> {
println!(
"server: remote address with peer id {:?} just disconnected. close message is {}",
p_socket_address, p_close_msg
);

//send request to close the server socket
let _ = CHANNEL_MUTEX.lock().and_then(|channel| {
let _ = channel.0.send(true).and_then(|_| Ok(())).or_else(|e| {
println!("could not send data to close_sig_channel. error: {:?}", e);
Err(e)
});
Ok(())
});

Ok(())
},
));

let on_close_socket = OnSocketCallback::new(Box::new(
|p_socket_address: &SocketAddr| -> anyhow::Result<()> {
println!("server: socket {:?} just closed", p_socket_address);
Ok(())
},
));

let ret = server(
TcpProtocol::TcpWebsocket,
"0.0.0.0",
8000,
100,
5.0, //5 seconds
3.0, // 3 seconds
false,
None,
None,
None,
&CHANNEL_MUTEX,
on_bind_socket,
on_accept_connection,
on_msg_callback,
on_close_connection,
on_close_socket,
)
.await;
assert!(ret.is_ok(), "{:?}", ret);

println!("tcp tests done {:?}", ret);
println!("websocket tests were done");
}

0 comments on commit d2db214

Please sign in to comment.