Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Debugging signal server #124

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ Following https://keepachangelog.com/en/1.1.0/ and using

### Fixed
- reconnections should work better now, both for libc and wasm
- fixed signalling server to correctly close and remove timed-out connections
- removed mdns calls, so it doesn't flood my home network

### Changed
Expand Down
2 changes: 1 addition & 1 deletion cli/flsignal/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {

log::info!("Started listening on port 8765");
for msg in msgs {
log::debug!("{:?}", msg);
// log::debug!("Got message {:?}", msg);
if matches!(msg, SignalMessage::Output(SignalOutput::Stopped)) {
log::error!("Server stopped working - exiting");
return Ok(());
Expand Down
33 changes: 22 additions & 11 deletions flarch/src/web_rtc/libc/web_socket_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use futures::{
stream::{SplitSink, SplitStream},
SinkExt, StreamExt,
};
use std::sync::Arc;
use std::{collections::HashMap, sync::Arc};
use tokio::{
net::{TcpListener, TcpStream},
select,
Expand All @@ -19,14 +19,14 @@ use crate::web_rtc::websocket::{
};

pub struct WebSocketServer {
connections: Arc<Mutex<Vec<WSConnection>>>,
connections: Arc<Mutex<HashMap<usize, WSConnection>>>,
conn_thread: JoinHandle<()>,
}

impl WebSocketServer {
pub async fn new(port: u16) -> Result<Broker<WSServerMessage>, WSSError> {
let server = TcpListener::bind(format!("0.0.0.0:{}", port)).await?;
let connections = Arc::new(Mutex::new(Vec::new()));
let connections = Arc::new(Mutex::new(HashMap::new()));
let connections_cl = Arc::clone(&connections);
let mut broker = Broker::new();
let mut broker_cl = broker.clone();
Expand All @@ -37,17 +37,17 @@ impl WebSocketServer {
let broker_cl2 = broker_cl.clone();
match WSConnection::new(stream, broker_cl2, connection_id).await {
Ok(conn) => {
log::trace!("Got new connection");
connections_cl.lock().await.push(conn);
log::warn!("Got new connection {connection_id}");
(*connections_cl.lock().await).insert(connection_id, conn);
broker_cl
.emit_msg(WSServerMessage::Output(WSServerOutput::NewConnection(
connection_id,
)))
.expect("Error sending connect message");
connection_id += 1;
}
Err(e) => log::error!("Error while getting connection: {:?}", e),
}
connection_id += 1;
}
}
});
Expand All @@ -71,19 +71,28 @@ impl SubsystemHandler<WSServerMessage> for WebSocketServer {
match msg_in {
WSServerInput::Message(id, msg) => {
let mut connections = self.connections.lock().await;
if let Some(conn) = connections.get_mut(id) {
if let Some(conn) = connections.get_mut(&id) {
if let Err(e) = conn.send(msg).await {
log::error!("Error while sending: {e}");
conn.close();
connections.remove(id);
connections.remove(&id);
return vec![WSServerMessage::Output(
WSServerOutput::Disconnection(id),
)];
}
} else {
log::warn!("No connection found");
return vec![WSServerMessage::Output(WSServerOutput::Disconnection(
id,
))];
}
}
WSServerInput::Close(id) => {
log::warn!("Closing {id}");
let mut connections = self.connections.lock().await;
if let Some(conn) = connections.get_mut(id) {
if let Some(conn) = connections.get_mut(&id) {
conn.close();
connections.remove(id);
connections.remove(&id);
}
}
WSServerInput::Stop => {
Expand Down Expand Up @@ -133,6 +142,7 @@ impl WSConnection {
loop {
select! {
_ = (&mut rx) => {
log::warn!("In loop_read");
broker
.emit_msg(WSServerMessage::Output(WSServerOutput::Disconnection(id)))
.expect("While sending message to broker.");
Expand All @@ -146,12 +156,13 @@ impl WSConnection {
Some(WSServerMessage::Output(WSServerOutput::Message(id, s)))
}
Message::Close(_) => {
log::warn!("Websocket closed for {id}");
Some(WSServerMessage::Output(WSServerOutput::Disconnection(id)))
}
_ => None,
},
Err(e) => {
log::warn!("Closing connection because of error: {e:?}");
log::warn!("Closing connection {id} because of error: {e:?}");
Some(WSServerMessage::Output(WSServerOutput::Disconnection(id)))
}
} {
Expand Down
41 changes: 29 additions & 12 deletions flmodules/src/network/signal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,24 +47,27 @@
//! You can find an example of how the signalling server is used in
//! <https://github.com/ineiti/fledger/tree/0.7.0/cli/flsignal/src/main.rs>

use bimap::BiMap;
use bimap::{BiMap, Overwritten};
use serde::{Deserialize, Serialize};
use serde_with::{base64::Base64, serde_as};
use std::{
collections::HashMap,
fmt::{Error, Formatter},
};

use flarch::{
broker::{Broker, BrokerError, Subsystem, SubsystemHandler}, nodeids::{NodeID, U256}, platform_async_trait, web_rtc::{
messages::PeerInfo,
websocket::{WSServerInput, WSServerMessage, WSServerOutput},
}
};
use crate::{
nodeconfig::NodeInfo,
timer::{TimerBroker, TimerMessage},
};
use flarch::{
broker::{Broker, BrokerError, Subsystem, SubsystemHandler},
nodeids::{NodeID, U256},
platform_async_trait,
web_rtc::{
messages::PeerInfo,
websocket::{WSServerInput, WSServerMessage, WSServerOutput},
},
};

#[derive(Clone, Debug)]
/// The possible messages for the signalling server broker, including the
Expand Down Expand Up @@ -104,6 +107,7 @@ pub enum SignalOutput {
/// PeerInfo messages between nodes.
/// It also handles statistics by forwarding NodeStats to a listener.
pub struct SignalServer {
challenge_ids: BiMap<U256, usize>,
connection_ids: BiMap<U256, usize>,
info: HashMap<U256, NodeInfo>,
ttl: HashMap<usize, u64>,
Expand All @@ -124,6 +128,7 @@ impl SignalServer {
let mut broker = Broker::new();
broker
.add_subsystem(Subsystem::Handler(Box::new(SignalServer {
challenge_ids: BiMap::new(),
connection_ids: BiMap::new(),
info: HashMap::new(),
ttl: HashMap::new(),
Expand Down Expand Up @@ -216,7 +221,7 @@ impl SignalServer {
fn msg_ws_connect(&mut self, index: usize) -> Vec<SignalMessage> {
log::debug!("Sending challenge to new connection");
let challenge = U256::rnd();
self.connection_ids.insert(challenge, index);
self.challenge_ids.insert(challenge, index);
self.ttl.insert(index, self.ttl_minutes);
let challenge_msg =
serde_json::to_string(&WSSignalMessageToNode::Challenge(SIGNAL_VERSION, challenge))
Expand All @@ -225,7 +230,7 @@ impl SignalServer {
}

fn ws_announce(&mut self, index: usize, msg: MessageAnnounce) -> Vec<SignalMessage> {
let challenge = match self.connection_ids.get_by_right(&index) {
let challenge = match self.challenge_ids.get_by_right(&index) {
Some(id) => id,
None => {
log::warn!("Got an announcement message without challenge.");
Expand All @@ -237,15 +242,25 @@ impl SignalServer {
return vec![];
}
let id = msg.node_info.get_id();
self.connection_ids.insert(id, index);

let mut out = vec![SignalOutput::NewNode(id).into()];
for (id, _) in &self.connection_ids {
log::warn!("Have ID: {id}");
}
if let Overwritten::Left(_, old) = self.connection_ids.insert(id, index) {
log::warn!("Sending close for {old}");
out.push(SignalMessage::WSServer(WSServerMessage::Input(
WSServerInput::Close(old),
)));
}

log::info!("Registration of node-id {}: {}", id, msg.node_info.name);
self.info.insert(id, msg.node_info);
vec![SignalOutput::NewNode(id).into()]
out
}

fn ws_list_ids(&mut self, id: usize) -> Vec<SignalMessage> {
log::info!("Current list is: {:?}", self.info.values());
// log::info!("Current list is: {:?}", self.info.values());
self.send_msg_node(
id,
WSSignalMessageToNode::ListIDsReply(self.info.values().cloned().collect()),
Expand Down Expand Up @@ -278,6 +293,8 @@ impl SignalServer {
}

fn remove_node(&mut self, index: usize) {
log::warn!("Removing node {index}");
self.challenge_ids.remove_by_right(&index);
if let Some((id, _)) = self.connection_ids.remove_by_right(&index) {
self.info.remove(&id);
}
Expand Down
Loading