Skip to content

Commit

Permalink
Implemented cross-clock synchronization for hardware timestamping.
Browse files Browse the repository at this point in the history
  • Loading branch information
davidv1992 committed Sep 28, 2023
1 parent 56e67d2 commit 0458544
Show file tree
Hide file tree
Showing 6 changed files with 160 additions and 30 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion statime-linux/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,5 +19,5 @@ rand = { version = "0.8.5", default-features = false, features = ["std", "std_rn
serde = { version = "1.0.188", features = ["derive"] }


clock-steering = { git = "https://github.com/pendulum-project/clock-steering.git", rev = "3ab6721" }
clock-steering = { git = "https://github.com/pendulum-project/clock-steering.git", rev = "46974d3" }
timestamped-socket = { git = "https://github.com/pendulum-project/timestamped-socket.git", rev = "021bebb", features = ["serde"] }
12 changes: 1 addition & 11 deletions statime-linux/src/clock/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use statime::{Clock, Duration, Time, TimePropertiesDS};

#[derive(Debug, Clone)]
pub struct LinuxClock {
clock: clock_steering::unix::UnixClock,
pub clock: clock_steering::unix::UnixClock,
}

impl LinuxClock {
Expand All @@ -20,16 +20,6 @@ impl LinuxClock {

Ok(Self { clock })
}

pub fn timespec(&self) -> std::io::Result<libc::timespec> {
use clock_steering::Clock;

let now = self.clock.now()?;
Ok(libc::timespec {
tv_sec: now.seconds,
tv_nsec: now.nanos as _,
})
}
}

impl clock_steering::Clock for LinuxClock {
Expand Down
3 changes: 2 additions & 1 deletion statime-linux/src/config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ pub struct Config {
pub domain: u8,
pub priority1: u8,
pub priority2: u8,
pub hardware_clock: Option<String>,
#[serde(rename = "port")]
pub ports: Vec<PortConfig>,
}
Expand All @@ -22,6 +21,8 @@ pub struct Config {
pub struct PortConfig {
pub interface: InterfaceName,
#[serde(default)]
pub hardware_clock: Option<String>,
#[serde(default)]
pub network_mode: NetworkMode,
pub announce_interval: i8,
pub sync_interval: i8,
Expand Down
165 changes: 151 additions & 14 deletions statime-linux/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use std::{
collections::HashMap,
future::Future,
path::PathBuf,
pin::{pin, Pin},
Expand All @@ -9,8 +10,9 @@ use clap::Parser;
use fern::colors::Color;
use rand::{rngs::StdRng, SeedableRng};
use statime::{
BasicFilter, ClockIdentity, InBmca, InstanceConfig, Port, PortAction, PortActionIterator,
PtpInstance, SdoId, Time, TimePropertiesDS, TimeSource, TimestampContext, MAX_DATA_LEN,
BasicFilter, ClockIdentity, Filter, InBmca, InstanceConfig, Measurement, Port, PortAction,
PortActionIterator, PtpInstance, SdoId, Time, TimePropertiesDS, TimeSource, TimestampContext,
MAX_DATA_LEN,
};
use statime_linux::{
clock::LinuxClock,
Expand All @@ -23,7 +25,7 @@ use statime_linux::{
use timestamped_socket::{
interface::InterfaceIterator,
networkaddress::NetworkAddress,
socket::{Open, Socket},
socket::{InterfaceTimestampMode, Open, Socket},
};
use tokio::{
sync::mpsc::{Receiver, Sender},
Expand Down Expand Up @@ -152,6 +154,102 @@ impl Future for Timer {
}
}

#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Default)]
enum ClockSyncMode {
#[default]
FromSystem,
ToSystem,
}

fn start_clock_task(clock: LinuxClock) -> tokio::sync::watch::Sender<ClockSyncMode> {
let (mode_sender, mode_receiver) = tokio::sync::watch::channel(ClockSyncMode::FromSystem);

tokio::spawn(clock_task(clock, mode_receiver));

mode_sender
}

fn clock_timestamp_to_time(t: clock_steering::Timestamp) -> statime::Time {
Time::from_nanos((t.seconds as u64) * 1_000_000_000 + (t.nanos as u64))
}

async fn clock_task(
clock: LinuxClock,
mut mode_receiver: tokio::sync::watch::Receiver<ClockSyncMode>,
) {
let mut measurement_timer = pin!(Timer::new());
let mut update_timer = pin!(Timer::new());

measurement_timer.as_mut().reset(std::time::Duration::ZERO);

let mut filter = BasicFilter::new(0.25);

let mut current_mode = *mode_receiver.borrow_and_update();
let mut filter_clock = match current_mode {
ClockSyncMode::FromSystem => clock.clone(),
ClockSyncMode::ToSystem => LinuxClock::CLOCK_REALTIME,
};
loop {
tokio::select! {
() = &mut measurement_timer => {
let (t1, t2, t3) = clock.clock.system_offset().expect("Unable to determine offset from system clock");
let t1 = clock_timestamp_to_time(t1);
let t2 = clock_timestamp_to_time(t2);
let t3 = clock_timestamp_to_time(t3);

log::debug!("Interclock measurement: {} {} {}", t1, t2, t3);

let delay = (t3-t1)/2;
let offset_a = t2 - t1;
let offset_b = t3 - t2;

let m = match current_mode {
ClockSyncMode::FromSystem => Measurement {
event_time: t2,
offset: Some(offset_a - delay),
delay: Some(delay),
raw_sync_offset: Some(offset_a),
raw_delay_offset: Some(-offset_b),
},
ClockSyncMode::ToSystem => Measurement {
event_time: t1+delay,
offset: Some(offset_b - delay),
delay: Some(delay),
raw_sync_offset: Some(offset_b),
raw_delay_offset: Some(-offset_a),
},
};

let update = filter.measurement(m, &mut filter_clock);
if let Some(timeout) = update.next_update {
update_timer.as_mut().reset(timeout);
}

measurement_timer.as_mut().reset(std::time::Duration::from_millis(250));
}
() = &mut update_timer => {
let update = filter.update(&mut filter_clock);
if let Some(timeout) = update.next_update {
update_timer.as_mut().reset(timeout);
}
}
_ = mode_receiver.changed() => {
let new_mode = *mode_receiver.borrow_and_update();
if new_mode != current_mode {
let mut new_filter = BasicFilter::new(0.25);
std::mem::swap(&mut filter, &mut new_filter);
new_filter.demobilize(&mut filter_clock);
match new_mode {
ClockSyncMode::FromSystem => filter_clock = clock.clone(),
ClockSyncMode::ToSystem => filter_clock = LinuxClock::CLOCK_REALTIME,
}
current_mode = new_mode;
}
}
}
}
}

#[tokio::main]
async fn main() {
actual_main().await;
Expand All @@ -170,12 +268,6 @@ async fn actual_main() {
let log_level = log::LevelFilter::from_str(&config.loglevel).unwrap();
setup_logger(log_level).expect("could not setup logging");

let local_clock = if let Some(hardware_clock) = config.hardware_clock {
LinuxClock::open(hardware_clock).expect("could not open hardware clock")
} else {
LinuxClock::CLOCK_REALTIME
};

let clock_identity = ClockIdentity(get_clock_id().expect("could not get clock identity"));

let instance_config = InstanceConfig {
Expand All @@ -202,11 +294,37 @@ async fn actual_main() {
let mut main_task_senders = Vec::with_capacity(config.ports.len());
let mut main_task_receivers = Vec::with_capacity(config.ports.len());

let mut internal_sync_senders = vec![];

let mut clock_name_map = HashMap::new();
let mut clock_port_map = Vec::with_capacity(config.ports.len());

for port_config in config.ports {
let interface = port_config.interface;
let network_mode = port_config.network_mode;
let (port_clock, timestamping) = match &port_config.hardware_clock {
Some(path) => {
let clock = LinuxClock::open(path).expect("Unable to open clock");
if let Some(id) = clock_name_map.get(path) {
clock_port_map.push(Some(*id));
} else {
let id = internal_sync_senders.len();
clock_port_map.push(Some(id));
clock_name_map.insert(path.clone(), id);
internal_sync_senders.push(start_clock_task(clock.clone()));
}
(clock, InterfaceTimestampMode::HardwarePTPAll)
}
None => {
clock_port_map.push(None);
(
LinuxClock::CLOCK_REALTIME,
InterfaceTimestampMode::SoftwareAll,
)
}
};
let rng = StdRng::from_entropy();
let port = instance.add_port(port_config.into(), 0.25, local_clock.clone(), rng);
let port = instance.add_port(port_config.into(), 0.25, port_clock, rng);

let (main_task_sender, port_task_receiver) = tokio::sync::mpsc::channel(1);
let (port_task_sender, main_task_receiver) = tokio::sync::mpsc::channel(1);
Expand All @@ -221,8 +339,8 @@ async fn actual_main() {

match network_mode {
statime_linux::config::NetworkMode::Ipv4 => {
let event_socket =
open_ipv4_event_socket(interface).expect("Could not open event socket");
let event_socket = open_ipv4_event_socket(interface, timestamping)
.expect("Could not open event socket");
let general_socket =
open_ipv4_general_socket(interface).expect("Could not open general socket");

Expand All @@ -235,8 +353,8 @@ async fn actual_main() {
));
}
statime_linux::config::NetworkMode::Ipv6 => {
let event_socket =
open_ipv6_event_socket(interface).expect("Could not open event socket");
let event_socket = open_ipv6_event_socket(interface, timestamping)
.expect("Could not open event socket");
let general_socket =
open_ipv6_general_socket(interface).expect("Could not open general socket");

Expand All @@ -256,6 +374,8 @@ async fn actual_main() {
bmca_notify_sender,
main_task_receivers,
main_task_senders,
internal_sync_senders,
clock_port_map,
)
.await
}
Expand All @@ -265,6 +385,8 @@ async fn run(
bmca_notify_sender: tokio::sync::watch::Sender<bool>,
mut main_task_receivers: Vec<Receiver<Port<InBmca<'static>, StdRng, LinuxClock, BasicFilter>>>,
main_task_senders: Vec<Sender<Port<InBmca<'static>, StdRng, LinuxClock, BasicFilter>>>,
internal_sync_senders: Vec<tokio::sync::watch::Sender<ClockSyncMode>>,
clock_port_map: Vec<Option<usize>>,
) -> ! {
// run bmca over all of the ports at the same time. The ports don't perform
// their normal actions at this time: bmca is stop-the-world!
Expand Down Expand Up @@ -300,6 +422,21 @@ async fn run(

instance.bmca(&mut mut_bmca_ports);

let mut clock_states: Vec<_> = internal_sync_senders
.iter()
.map(|_| ClockSyncMode::FromSystem)
.collect();
for (idx, port) in mut_bmca_ports.iter().enumerate() {
if port.is_steering() {
if let Some(id) = clock_port_map[idx] {
clock_states[id] = ClockSyncMode::ToSystem;
}
}
}
for (mode, sender) in clock_states.into_iter().zip(internal_sync_senders.iter()) {
sender.send(mode).expect("Clock mode change failed");
}

drop(mut_bmca_ports);

for (port, sender) in bmca_ports.into_iter().zip(main_task_senders.iter()) {
Expand Down
6 changes: 4 additions & 2 deletions statime-linux/src/socket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,9 @@ impl PtpTargetAddress for SocketAddrV6 {

pub fn open_ipv4_event_socket(
interface: InterfaceName,
timestamping: InterfaceTimestampMode,
) -> std::io::Result<Socket<SocketAddrV4, Open>> {
let socket = open_interface_udp4(interface, EVENT_PORT, InterfaceTimestampMode::SoftwareAll)?;
let socket = open_interface_udp4(interface, EVENT_PORT, timestamping)?;
socket.join_multicast(SocketAddrV4::new(IPV4_PRIMARY_MULTICAST, 0))?;
socket.join_multicast(SocketAddrV4::new(IPV4_PDELAY_MULTICAST, 0))?;
Ok(socket)
Expand All @@ -60,8 +61,9 @@ pub fn open_ipv4_general_socket(

pub fn open_ipv6_event_socket(
interface: InterfaceName,
timestamping: InterfaceTimestampMode,
) -> std::io::Result<Socket<SocketAddrV6, Open>> {
let socket = open_interface_udp6(interface, EVENT_PORT, InterfaceTimestampMode::SoftwareAll)?;
let socket = open_interface_udp6(interface, EVENT_PORT, timestamping)?;
socket.join_multicast(SocketAddrV6::new(IPV6_PRIMARY_MULTICAST, 0, 0, 0))?;
socket.join_multicast(SocketAddrV6::new(IPV6_PDELAY_MULTICAST, 0, 0, 0))?;
Ok(socket)
Expand Down

0 comments on commit 0458544

Please sign in to comment.