Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add downsampling interceptor POC #660

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions commons/zenoh-config/src/defaults.rs
Original file line number Diff line number Diff line change
Expand Up @@ -211,3 +211,14 @@ impl Default for SharedMemoryConf {
Self { enabled: false }
}
}

impl Default for DownsamplingItemConf {
fn default() -> Self {
Self {
keyexprs: None,
interfaces: None,
strategy: Some("ratelimit".to_string()),
threshold_ms: None,
}
}
}
17 changes: 17 additions & 0 deletions commons/zenoh-config/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,18 @@ impl Zeroize for SecretString {

pub type SecretValue = Secret<SecretString>;

#[derive(Debug, Deserialize, Serialize, Clone)]
pub struct DownsamplingItemConf {
/// A list of key-expressions to which the downsampling will be applied.
pub keyexprs: Option<Vec<OwnedKeyExpr>>,
/// A list of interfaces to which the downsampling will be applied.
pub interfaces: Option<Vec<String>>,
/// Downsampling strategy (default: ratelimit).
pub strategy: Option<String>,
/// Minimim timeout between two messages for ratelimit starategy
pub threshold_ms: Option<u64>,
}

pub trait ConfigValidator: Send + Sync {
fn check_config(
&self,
Expand Down Expand Up @@ -405,6 +417,11 @@ validated_struct::validator! {
},

},
/// Configuration of the downsampling.
pub downsampling: #[derive(Default)]
DownsamplingConf {
items: Vec<DownsamplingItemConf>,
},
/// A list of directories where plugins may be searched for if no `__path__` was specified for them.
/// The executable's current directory will be added to the search paths.
plugins_search_dirs: Vec<String>, // TODO (low-prio): Switch this String to a PathBuf? (applies to other paths in the config as well)
Expand Down
192 changes: 90 additions & 102 deletions commons/zenoh-util/src/std_only/net/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,39 @@ pub fn set_linger(socket: &TcpStream, dur: Option<Duration>) -> ZResult<()> {
}
}

#[cfg(windows)]
unsafe fn get_adapters_adresses(af_spec: i32) -> ZResult<Vec<u8>> {
use winapi::um::iptypes::IP_ADAPTER_ADDRESSES_LH;

let mut ret;
let mut retries = 0;
let mut size: u32 = *WINDOWS_GET_ADAPTERS_ADDRESSES_BUF_SIZE;
let mut buffer: Vec<u8>;
loop {
buffer = Vec::with_capacity(size as usize);
ret = winapi::um::iphlpapi::GetAdaptersAddresses(
af_spec.try_into().unwrap(),
0,
std::ptr::null_mut(),
buffer.as_mut_ptr() as *mut IP_ADAPTER_ADDRESSES_LH,
&mut size,
);
if ret != winapi::shared::winerror::ERROR_BUFFER_OVERFLOW {
break;
}
if retries >= *WINDOWS_GET_ADAPTERS_ADDRESSES_MAX_RETRIES {
break;
}
retries += 1;
}

if ret != 0 {
bail!("GetAdaptersAddresses returned {}", ret)
}

Ok(buffer)
}

pub fn get_interface(name: &str) -> ZResult<Option<IpAddr>> {
#[cfg(unix)]
{
Expand All @@ -117,31 +150,7 @@ pub fn get_interface(name: &str) -> ZResult<Option<IpAddr>> {
use crate::ffi;
use winapi::um::iptypes::IP_ADAPTER_ADDRESSES_LH;

let mut ret;
let mut retries = 0;
let mut size: u32 = *WINDOWS_GET_ADAPTERS_ADDRESSES_BUF_SIZE;
let mut buffer: Vec<u8>;
loop {
buffer = Vec::with_capacity(size as usize);
ret = winapi::um::iphlpapi::GetAdaptersAddresses(
winapi::shared::ws2def::AF_INET.try_into().unwrap(),
0,
std::ptr::null_mut(),
buffer.as_mut_ptr() as *mut IP_ADAPTER_ADDRESSES_LH,
&mut size,
);
if ret != winapi::shared::winerror::ERROR_BUFFER_OVERFLOW {
break;
}
if retries >= *WINDOWS_GET_ADAPTERS_ADDRESSES_MAX_RETRIES {
break;
}
retries += 1;
}

if ret != 0 {
bail!("GetAdaptersAddresses returned {}", ret)
}
let buffer = get_adapters_adresses(winapi::shared::ws2def::AF_INET)?;

let mut next_iface = (buffer.as_ptr() as *mut IP_ADAPTER_ADDRESSES_LH).as_ref();
while let Some(iface) = next_iface {
Expand Down Expand Up @@ -218,33 +227,9 @@ pub fn get_local_addresses() -> ZResult<Vec<IpAddr>> {
use crate::ffi;
use winapi::um::iptypes::IP_ADAPTER_ADDRESSES_LH;

let mut result = vec![];
let mut ret;
let mut retries = 0;
let mut size: u32 = *WINDOWS_GET_ADAPTERS_ADDRESSES_BUF_SIZE;
let mut buffer: Vec<u8>;
loop {
buffer = Vec::with_capacity(size as usize);
ret = winapi::um::iphlpapi::GetAdaptersAddresses(
winapi::shared::ws2def::AF_UNSPEC.try_into().unwrap(),
0,
std::ptr::null_mut(),
buffer.as_mut_ptr() as *mut IP_ADAPTER_ADDRESSES_LH,
&mut size,
);
if ret != winapi::shared::winerror::ERROR_BUFFER_OVERFLOW {
break;
}
if retries >= *WINDOWS_GET_ADAPTERS_ADDRESSES_MAX_RETRIES {
break;
}
retries += 1;
}

if ret != 0 {
bail!("GetAdaptersAddresses returned {}", ret)
}
let buffer = get_adapters_adresses(winapi::shared::ws2def::AF_UNSPEC)?;

let mut result = vec![];
let mut next_iface = (buffer.as_ptr() as *mut IP_ADAPTER_ADDRESSES_LH).as_ref();
while let Some(iface) = next_iface {
let mut next_ucast_addr = iface.FirstUnicastAddress.as_ref();
Expand Down Expand Up @@ -317,33 +302,9 @@ pub fn get_unicast_addresses_of_interface(name: &str) -> ZResult<Vec<IpAddr>> {
use crate::ffi;
use winapi::um::iptypes::IP_ADAPTER_ADDRESSES_LH;

let mut addrs = vec![];
let mut ret;
let mut retries = 0;
let mut size: u32 = *WINDOWS_GET_ADAPTERS_ADDRESSES_BUF_SIZE;
let mut buffer: Vec<u8>;
loop {
buffer = Vec::with_capacity(size as usize);
ret = winapi::um::iphlpapi::GetAdaptersAddresses(
winapi::shared::ws2def::AF_INET.try_into().unwrap(),
0,
std::ptr::null_mut(),
buffer.as_mut_ptr() as *mut IP_ADAPTER_ADDRESSES_LH,
&mut size,
);
if ret != winapi::shared::winerror::ERROR_BUFFER_OVERFLOW {
break;
}
if retries >= *WINDOWS_GET_ADAPTERS_ADDRESSES_MAX_RETRIES {
break;
}
retries += 1;
}

if ret != 0 {
bail!("GetAdaptersAddresses returned {}", ret);
}
let buffer = get_adapters_adresses(winapi::shared::ws2def::AF_INET)?;

let mut addrs = vec![];
let mut next_iface = (buffer.as_ptr() as *mut IP_ADAPTER_ADDRESSES_LH).as_ref();
while let Some(iface) = next_iface {
if name == ffi::pstr_to_string(iface.AdapterName)
Expand Down Expand Up @@ -380,31 +341,7 @@ pub fn get_index_of_interface(addr: IpAddr) -> ZResult<u32> {
use crate::ffi;
use winapi::um::iptypes::IP_ADAPTER_ADDRESSES_LH;

let mut ret;
let mut retries = 0;
let mut size: u32 = *WINDOWS_GET_ADAPTERS_ADDRESSES_BUF_SIZE;
let mut buffer: Vec<u8>;
loop {
buffer = Vec::with_capacity(size as usize);
ret = winapi::um::iphlpapi::GetAdaptersAddresses(
winapi::shared::ws2def::AF_INET.try_into().unwrap(),
0,
std::ptr::null_mut(),
buffer.as_mut_ptr() as *mut IP_ADAPTER_ADDRESSES_LH,
&mut size,
);
if ret != winapi::shared::winerror::ERROR_BUFFER_OVERFLOW {
break;
}
if retries >= *WINDOWS_GET_ADAPTERS_ADDRESSES_MAX_RETRIES {
break;
}
retries += 1;
}

if ret != 0 {
bail!("GetAdaptersAddresses returned {}", ret)
}
let buffer = get_adapters_adresses(winapi::shared::ws2def::AF_INET)?;

let mut next_iface = (buffer.as_ptr() as *mut IP_ADAPTER_ADDRESSES_LH).as_ref();
while let Some(iface) = next_iface {
Expand All @@ -424,6 +361,57 @@ pub fn get_index_of_interface(addr: IpAddr) -> ZResult<u32> {
}
}

pub fn get_interfaces_by_addr(addr: IpAddr) -> ZResult<Vec<String>> {
#[cfg(unix)]
{
if addr.is_unspecified() {
Ok(pnet_datalink::interfaces()
.iter()
.map(|iface| iface.name.clone())
.collect::<Vec<String>>())
} else {
Ok(pnet_datalink::interfaces()
.iter()
.filter(|iface| iface.ips.iter().any(|ipnet| ipnet.ip() == addr))
.map(|iface| iface.name.clone())
.collect::<Vec<String>>())
}
}
#[cfg(windows)]
{
let mut result = vec![];
unsafe {
use crate::ffi;
use winapi::um::iptypes::IP_ADAPTER_ADDRESSES_LH;

let buffer = get_adapters_adresses(winapi::shared::ws2def::AF_UNSPEC)?;

if addr.is_unspecified() {
let mut next_iface = (buffer.as_ptr() as *mut IP_ADAPTER_ADDRESSES_LH).as_ref();
while let Some(iface) = next_iface {
result.push(ffi::pstr_to_string(iface.AdapterName));
next_iface = iface.Next.as_ref();
}
} else {
let mut next_iface = (buffer.as_ptr() as *mut IP_ADAPTER_ADDRESSES_LH).as_ref();
while let Some(iface) = next_iface {
let mut next_ucast_addr = iface.FirstUnicastAddress.as_ref();
while let Some(ucast_addr) = next_ucast_addr {
if let Ok(ifaddr) = ffi::win::sockaddr_to_addr(ucast_addr.Address) {
if ifaddr.ip() == addr {
result.push(ffi::pstr_to_string(iface.AdapterName));
}
}
next_ucast_addr = ucast_addr.Next.as_ref();
}
next_iface = iface.Next.as_ref();
}
}
}
Ok(result)
}
}

pub fn get_ipv4_ipaddrs() -> Vec<IpAddr> {
get_local_addresses()
.unwrap_or_else(|_| vec![])
Expand Down
5 changes: 4 additions & 1 deletion io/zenoh-link-commons/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ extern crate alloc;
mod multicast;
mod unicast;

use alloc::{borrow::ToOwned, boxed::Box, string::String};
use alloc::{borrow::ToOwned, boxed::Box, string::String, vec, vec::Vec};
use async_trait::async_trait;
use core::{cmp::PartialEq, fmt, hash::Hash};
pub use multicast::*;
Expand All @@ -43,6 +43,7 @@ pub struct Link {
pub mtu: u16,
pub is_reliable: bool,
pub is_streamed: bool,
pub interfaces: Vec<String>,
}

#[async_trait]
Expand Down Expand Up @@ -71,6 +72,7 @@ impl From<&LinkUnicast> for Link {
mtu: link.get_mtu(),
is_reliable: link.is_reliable(),
is_streamed: link.is_streamed(),
interfaces: link.get_interfaces(),
}
}
}
Expand All @@ -90,6 +92,7 @@ impl From<&LinkMulticast> for Link {
mtu: link.get_mtu(),
is_reliable: link.is_reliable(),
is_streamed: false,
interfaces: vec![],
}
}
}
Expand Down
3 changes: 2 additions & 1 deletion io/zenoh-link-commons/src/unicast.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
// Contributors:
// ZettaScale Zenoh Team, <[email protected]>
//
use alloc::{boxed::Box, sync::Arc, vec::Vec};
use alloc::{boxed::Box, string::String, sync::Arc, vec::Vec};
use async_trait::async_trait;
use core::{
fmt,
Expand Down Expand Up @@ -45,6 +45,7 @@ pub trait LinkUnicastTrait: Send + Sync {
fn get_dst(&self) -> &Locator;
fn is_reliable(&self) -> bool;
fn is_streamed(&self) -> bool;
fn get_interfaces(&self) -> Vec<String>;
async fn write(&self, buffer: &[u8]) -> ZResult<usize>;
async fn write_all(&self, buffer: &[u8]) -> ZResult<()>;
async fn read(&self, buffer: &mut [u8]) -> ZResult<usize>;
Expand Down
6 changes: 6 additions & 0 deletions io/zenoh-links/zenoh-link-quic/src/unicast.rs
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,12 @@ impl LinkUnicastTrait for LinkUnicastQuic {
*QUIC_DEFAULT_MTU
}

#[inline(always)]
fn get_interfaces(&self) -> Vec<String> {
// Not supported for now
vec![]
}

#[inline(always)]
fn is_reliable(&self) -> bool {
true
Expand Down
6 changes: 6 additions & 0 deletions io/zenoh-links/zenoh-link-serial/src/unicast.rs
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,12 @@ impl LinkUnicastTrait for LinkUnicastSerial {
*SERIAL_DEFAULT_MTU
}

#[inline(always)]
fn get_interfaces(&self) -> Vec<String> {
// Not supported for now
vec![]
}

#[inline(always)]
fn is_reliable(&self) -> bool {
false
Expand Down
22 changes: 22 additions & 0 deletions io/zenoh-links/zenoh-link-tcp/src/unicast.rs
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,28 @@ impl LinkUnicastTrait for LinkUnicastTcp {
*TCP_DEFAULT_MTU
}

#[inline(always)]
fn get_interfaces(&self) -> Vec<String> {
match zenoh_util::net::get_interfaces_by_addr(self.src_addr.ip()) {
Ok(interfaces) => {
log::debug!(
"get_interfaces for {:?}: {:?}",
self.src_addr.ip(),
interfaces
);
interfaces
}
Err(e) => {
log::error!(
"get_interfaces for {:?} failed: {:?}",
self.src_addr.ip(),
e
);
vec![]
}
}
}

#[inline(always)]
fn is_reliable(&self) -> bool {
true
Expand Down
6 changes: 6 additions & 0 deletions io/zenoh-links/zenoh-link-tls/src/unicast.rs
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,12 @@ impl LinkUnicastTrait for LinkUnicastTls {
*TLS_DEFAULT_MTU
}

#[inline(always)]
fn get_interfaces(&self) -> Vec<String> {
// Not supported for now
vec![]
}

#[inline(always)]
fn is_reliable(&self) -> bool {
true
Expand Down
Loading
Loading