Skip to content

Commit

Permalink
[skip ci] Start draft implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
sashacmc committed Feb 27, 2024
1 parent 90617ff commit 199219f
Show file tree
Hide file tree
Showing 2 changed files with 62 additions and 7 deletions.
10 changes: 10 additions & 0 deletions commons/zenoh-config/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
//! Configuration to pass to `zenoh::open()` and `zenoh::scout()` functions and associated constants.
pub mod defaults;
mod include;
use core::time;
use include::recursive_include;
use secrecy::{CloneableSecret, DebugSecret, Secret, SerializableSecret, Zeroize};
use serde::{
Expand Down Expand Up @@ -97,6 +98,15 @@ pub struct DownsamplingItemConf {
pub flow: DownsamplingFlow,
}

#[derive(Debug, Deserialize, Serialize, Clone)]
pub struct ConnectionRetryConf {
//TODO(sashacmc): add comments
// Do we need some skip/fail/etc.?
pub count: i32,
pub timeout_ms: time::Duration,
pub timeout_increase_factor: f32,
}

pub trait ConfigValidator: Send + Sync {
fn check_config(
&self,
Expand Down
59 changes: 52 additions & 7 deletions zenoh/src/net/runtime/orchestrator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
use super::{Runtime, RuntimeSession};
use async_std::net::UdpSocket;
use async_std::prelude::FutureExt;
use core::time;
use futures::prelude::*;
use socket2::{Domain, Socket, Type};
use std::net::{IpAddr, Ipv6Addr, SocketAddr};
Expand Down Expand Up @@ -291,24 +292,67 @@ impl Runtime {
Ok(())
}

fn get_retry_config(&self, endpoint: &EndPoint) -> zenoh_config::ConnectionRetryConf {
let guard = &self.state.config.lock();
let config = endpoint.config();
let retry = config.get("TODO");

zenoh_config::ConnectionRetryConf {
count: 0,
timeout_ms: time::Duration::from_micros(0),
timeout_increase_factor: 1.,
}
}

async fn bind_listeners(&self, listeners: &[EndPoint]) -> ZResult<()> {
for listener in listeners {
let endpoint = listener.clone();
match self.manager().add_listener(endpoint).await {
Ok(listener) => log::debug!("Listener added: {}", listener),
Err(err) => {
log::error!("Unable to open listener {}: {}", listener, err);
return Err(err);
}
let retry_config = self.get_retry_config(&endpoint);
if retry_config.count > 0 {
self.spawn_add_listener(endpoint, retry_config).await?
} else {
self.add_listener(endpoint).await?
}
}

self.print_locators();

Ok(())
}

async fn spawn_add_listener(
&self,
listener: EndPoint,
retry_config: zenoh_config::ConnectionRetryConf,
) -> ZResult<()> {
let this = self.clone();
self.spawn(async move {
// TODO(sashacmc): do retry
if this.add_listener(listener).await.is_ok() {
this.print_locators();
}
});
Ok(())
}

async fn add_listener(&self, listener: EndPoint) -> ZResult<()> {
let endpoint = listener.clone();
match self.manager().add_listener(endpoint).await {
Ok(listener) => log::debug!("Listener added: {}", listener),
Err(err) => {
log::error!("Unable to open listener {}: {}", listener, err);
return Err(err);
}
}
Ok(())
}

fn print_locators(&self) {
let mut locators = self.state.locators.write().unwrap();
*locators = self.manager().get_locators();
for locator in &*locators {
log::info!("Zenoh can be reached at: {}", locator);
}
Ok(())
}

pub fn get_interfaces(names: &str) -> Vec<IpAddr> {
Expand Down Expand Up @@ -499,6 +543,7 @@ impl Runtime {
);
}
}
//TODO(sashacmc): rework
async_std::task::sleep(delay).await;
delay *= CONNECTION_RETRY_PERIOD_INCREASE_FACTOR;
if delay > CONNECTION_RETRY_MAX_PERIOD {
Expand Down

0 comments on commit 199219f

Please sign in to comment.