From 1617766691c2a05d74586eab14fa8d40ebedc4f3 Mon Sep 17 00:00:00 2001 From: Linus Gasser Date: Thu, 19 Sep 2024 19:54:32 +0200 Subject: [PATCH] Debugging signal server Making the handling of the connections cleaner. --- CHANGELOG.md | 1 + cli/flsignal/src/main.rs | 2 +- flarch/src/web_rtc/libc/web_socket_server.rs | 33 ++++++++++------ flmodules/src/network/signal.rs | 41 ++++++++++++++------ 4 files changed, 53 insertions(+), 24 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 53790d79..4e222a7f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,6 +10,7 @@ Following https://keepachangelog.com/en/1.1.0/ and using ### Fixed - reconnections should work better now, both for libc and wasm +- fixed signalling server to correctly close and remove timed-out connections ## [0.8.0] - 2024-09-09 diff --git a/cli/flsignal/src/main.rs b/cli/flsignal/src/main.rs index 0858b7ac..80793af8 100644 --- a/cli/flsignal/src/main.rs +++ b/cli/flsignal/src/main.rs @@ -26,7 +26,7 @@ async fn main() -> Result<(), Box> { log::info!("Started listening on port 8765"); for msg in msgs { - log::debug!("{:?}", msg); + // log::debug!("Got message {:?}", msg); if matches!(msg, SignalMessage::Output(SignalOutput::Stopped)) { log::error!("Server stopped working - exiting"); return Ok(()); diff --git a/flarch/src/web_rtc/libc/web_socket_server.rs b/flarch/src/web_rtc/libc/web_socket_server.rs index ea69568b..ebbfe24c 100644 --- a/flarch/src/web_rtc/libc/web_socket_server.rs +++ b/flarch/src/web_rtc/libc/web_socket_server.rs @@ -4,7 +4,7 @@ use futures::{ stream::{SplitSink, SplitStream}, SinkExt, StreamExt, }; -use std::sync::Arc; +use std::{collections::HashMap, sync::Arc}; use tokio::{ net::{TcpListener, TcpStream}, select, @@ -19,14 +19,14 @@ use crate::web_rtc::websocket::{ }; pub struct WebSocketServer { - connections: Arc>>, + connections: Arc>>, conn_thread: JoinHandle<()>, } impl WebSocketServer { pub async fn new(port: u16) -> Result, WSSError> { let server = TcpListener::bind(format!("0.0.0.0:{}", port)).await?; - let connections = Arc::new(Mutex::new(Vec::new())); + let connections = Arc::new(Mutex::new(HashMap::new())); let connections_cl = Arc::clone(&connections); let mut broker = Broker::new(); let mut broker_cl = broker.clone(); @@ -37,17 +37,17 @@ impl WebSocketServer { let broker_cl2 = broker_cl.clone(); match WSConnection::new(stream, broker_cl2, connection_id).await { Ok(conn) => { - log::trace!("Got new connection"); - connections_cl.lock().await.push(conn); + log::warn!("Got new connection {connection_id}"); + (*connections_cl.lock().await).insert(connection_id, conn); broker_cl .emit_msg(WSServerMessage::Output(WSServerOutput::NewConnection( connection_id, ))) .expect("Error sending connect message"); + connection_id += 1; } Err(e) => log::error!("Error while getting connection: {:?}", e), } - connection_id += 1; } } }); @@ -71,19 +71,28 @@ impl SubsystemHandler for WebSocketServer { match msg_in { WSServerInput::Message(id, msg) => { let mut connections = self.connections.lock().await; - if let Some(conn) = connections.get_mut(id) { + if let Some(conn) = connections.get_mut(&id) { if let Err(e) = conn.send(msg).await { log::error!("Error while sending: {e}"); conn.close(); - connections.remove(id); + connections.remove(&id); + return vec![WSServerMessage::Output( + WSServerOutput::Disconnection(id), + )]; } + } else { + log::warn!("No connection found"); + return vec![WSServerMessage::Output(WSServerOutput::Disconnection( + id, + ))]; } } WSServerInput::Close(id) => { + log::warn!("Closing {id}"); let mut connections = self.connections.lock().await; - if let Some(conn) = connections.get_mut(id) { + if let Some(conn) = connections.get_mut(&id) { conn.close(); - connections.remove(id); + connections.remove(&id); } } WSServerInput::Stop => { @@ -133,6 +142,7 @@ impl WSConnection { loop { select! { _ = (&mut rx) => { + log::warn!("In loop_read"); broker .emit_msg(WSServerMessage::Output(WSServerOutput::Disconnection(id))) .expect("While sending message to broker."); @@ -146,12 +156,13 @@ impl WSConnection { Some(WSServerMessage::Output(WSServerOutput::Message(id, s))) } Message::Close(_) => { + log::warn!("Websocket closed for {id}"); Some(WSServerMessage::Output(WSServerOutput::Disconnection(id))) } _ => None, }, Err(e) => { - log::warn!("Closing connection because of error: {e:?}"); + log::warn!("Closing connection {id} because of error: {e:?}"); Some(WSServerMessage::Output(WSServerOutput::Disconnection(id))) } } { diff --git a/flmodules/src/network/signal.rs b/flmodules/src/network/signal.rs index a91fc639..74e17429 100644 --- a/flmodules/src/network/signal.rs +++ b/flmodules/src/network/signal.rs @@ -47,7 +47,7 @@ //! You can find an example of how the signalling server is used in //! -use bimap::BiMap; +use bimap::{BiMap, Overwritten}; use serde::{Deserialize, Serialize}; use serde_with::{base64::Base64, serde_as}; use std::{ @@ -55,16 +55,19 @@ use std::{ fmt::{Error, Formatter}, }; -use flarch::{ - broker::{Broker, BrokerError, Subsystem, SubsystemHandler}, nodeids::{NodeID, U256}, platform_async_trait, web_rtc::{ - messages::PeerInfo, - websocket::{WSServerInput, WSServerMessage, WSServerOutput}, - } -}; use crate::{ nodeconfig::NodeInfo, timer::{TimerBroker, TimerMessage}, }; +use flarch::{ + broker::{Broker, BrokerError, Subsystem, SubsystemHandler}, + nodeids::{NodeID, U256}, + platform_async_trait, + web_rtc::{ + messages::PeerInfo, + websocket::{WSServerInput, WSServerMessage, WSServerOutput}, + }, +}; #[derive(Clone, Debug)] /// The possible messages for the signalling server broker, including the @@ -104,6 +107,7 @@ pub enum SignalOutput { /// PeerInfo messages between nodes. /// It also handles statistics by forwarding NodeStats to a listener. pub struct SignalServer { + challenge_ids: BiMap, connection_ids: BiMap, info: HashMap, ttl: HashMap, @@ -124,6 +128,7 @@ impl SignalServer { let mut broker = Broker::new(); broker .add_subsystem(Subsystem::Handler(Box::new(SignalServer { + challenge_ids: BiMap::new(), connection_ids: BiMap::new(), info: HashMap::new(), ttl: HashMap::new(), @@ -216,7 +221,7 @@ impl SignalServer { fn msg_ws_connect(&mut self, index: usize) -> Vec { log::debug!("Sending challenge to new connection"); let challenge = U256::rnd(); - self.connection_ids.insert(challenge, index); + self.challenge_ids.insert(challenge, index); self.ttl.insert(index, self.ttl_minutes); let challenge_msg = serde_json::to_string(&WSSignalMessageToNode::Challenge(SIGNAL_VERSION, challenge)) @@ -225,7 +230,7 @@ impl SignalServer { } fn ws_announce(&mut self, index: usize, msg: MessageAnnounce) -> Vec { - let challenge = match self.connection_ids.get_by_right(&index) { + let challenge = match self.challenge_ids.get_by_right(&index) { Some(id) => id, None => { log::warn!("Got an announcement message without challenge."); @@ -237,15 +242,25 @@ impl SignalServer { return vec![]; } let id = msg.node_info.get_id(); - self.connection_ids.insert(id, index); + + let mut out = vec![SignalOutput::NewNode(id).into()]; + for (id, _) in &self.connection_ids { + log::warn!("Have ID: {id}"); + } + if let Overwritten::Left(_, old) = self.connection_ids.insert(id, index) { + log::warn!("Sending close for {old}"); + out.push(SignalMessage::WSServer(WSServerMessage::Input( + WSServerInput::Close(old), + ))); + } log::info!("Registration of node-id {}: {}", id, msg.node_info.name); self.info.insert(id, msg.node_info); - vec![SignalOutput::NewNode(id).into()] + out } fn ws_list_ids(&mut self, id: usize) -> Vec { - log::info!("Current list is: {:?}", self.info.values()); + // log::info!("Current list is: {:?}", self.info.values()); self.send_msg_node( id, WSSignalMessageToNode::ListIDsReply(self.info.values().cloned().collect()), @@ -278,6 +293,8 @@ impl SignalServer { } fn remove_node(&mut self, index: usize) { + log::warn!("Removing node {index}"); + self.challenge_ids.remove_by_right(&index); if let Some((id, _)) = self.connection_ids.remove_by_right(&index) { self.info.remove(&id); }