Skip to content

Commit

Permalink
Dont get stuck in connecting
Browse files Browse the repository at this point in the history
  • Loading branch information
MarkusPettersson98 committed Jan 28, 2025
1 parent 6cdc050 commit b88cd5c
Show file tree
Hide file tree
Showing 3 changed files with 37 additions and 16 deletions.
42 changes: 27 additions & 15 deletions talpid-routing/src/unix/android.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@ use std::sync::Mutex;
use crate::imp::RouteManagerCommand;
use futures::{
channel::mpsc::{self, UnboundedReceiver, UnboundedSender},
stream::StreamExt, Stream,
stream::StreamExt, select_biased,
future::FutureExt,
};
use ipnetwork::IpNetwork;
use jnix::{
Expand Down Expand Up @@ -56,7 +57,7 @@ pub enum RoutesUpdate {
// TODO: This is le actor state
/// Stub route manager for Android
pub struct RouteManagerImpl {
routes_udates: UnboundedReceiver<RoutesUpdate>,
routes_updates: UnboundedReceiver<RoutesUpdate>,
listeners: Vec<UnboundedSender<RoutesUpdate>>,
}

Expand All @@ -73,7 +74,7 @@ impl RouteManagerImpl {
// TODO: What id `ROUTE_UPDATES_TX` has already been initialized?
*ROUTE_UPDATES_TX.lock().unwrap() = Some(tx);
Ok(RouteManagerImpl {
routes_udates: rx,
routes_updates: rx,
listeners: Default::default(),
})
}
Expand All @@ -83,20 +84,31 @@ impl RouteManagerImpl {
manage_rx: mpsc::UnboundedReceiver<RouteManagerCommand>,
) -> Result<(), Error> {
let mut manage_rx = manage_rx.fuse();
while let Some(command) = manage_rx.next().await {
match command {
RouteManagerCommand::NewChangeListener(tx) => {
// register a listener for new route updates
self.listeners.push(tx);
loop {
select_biased! {
command = manage_rx.next().fuse() => {
let Some(command) = command else { break };

match command {
RouteManagerCommand::NewChangeListener(tx) => {
// register a listener for new route updates
self.listeners.push(tx);
}
RouteManagerCommand::Shutdown(tx) => {
tx.send(()).map_err(|()| Error::Send)?; // TODO: Surely we can do better than this
break;
}
RouteManagerCommand::AddRoutes(_routes, tx) => {
tx.send(Ok(())).map_err(|_x| Error::Send)?;
}
RouteManagerCommand::ClearRoutes => (),
}
}
RouteManagerCommand::Shutdown(tx) => {
tx.send(()).map_err(|()| Error::Send)?; // TODO: Surely we can do better than this
break;
}
RouteManagerCommand::AddRoutes(_routes, tx) => {
tx.send(Ok(())).map_err(|_x| Error::Send)?;

route_update = self.routes_updates.next().fuse() => {
let Some(route_update) = route_update else { break };
self.notify_change_listeners(route_update);
}
RouteManagerCommand::ClearRoutes => (),
}
}
Ok(())
Expand Down
1 change: 1 addition & 0 deletions talpid-routing/src/unix/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,7 @@ impl RouteManagerHandle {
.unbounded_send(RouteManagerCommand::NewChangeListener(stream_tx))
.map_err(|_| Error::RouteManagerDown).unwrap(); //?;

log::info!("comparing routes with {routes:?}");
stream_rx.map(move |change| {
use std::collections::HashSet;

Expand Down
10 changes: 9 additions & 1 deletion talpid-wireguard/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -479,7 +479,15 @@ impl WireguardMonitor {

// Wait for routes to come up
// TODO: Time out (eventually) and return proper error
route_updates.any(|routes_are_correct| async move { routes_are_correct }).await;
let route_update = route_updates
.inspect(|x| {
log::info!("routes_are_correct: {x}");
})
.any(|routes_are_correct| async move { routes_are_correct });
if let Err(_) = tokio::time::timeout(std::time::Duration::from_secs(4), route_update).await {
todo!("fixme");
//return Err(Error::SetupRoutingError(talpid_routing::Error::RouteManagerDown)); // TODO: Wrong error. Expose "routes are not up" error
}

if should_negotiate_ephemeral_peer {
let ephemeral_obfs_sender = close_obfs_sender.clone();
Expand Down

0 comments on commit b88cd5c

Please sign in to comment.