Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add simulated Robstride actuator #34

Merged
merged 2 commits into from
Oct 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions actuator/rust/bindings/bindings.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,10 @@ class PyRobstrideMotorFeedback:
def __repr__(self) -> str:
...

@staticmethod
def create_feedback(can_id:int, position:float, velocity:float, torque:float, mode:str, faults:int) -> PyRobstrideMotorFeedback:
...


class PyRobstrideMotors:
def __new__(cls,port_name,motor_infos,verbose = ...): ...
Expand Down
39 changes: 30 additions & 9 deletions actuator/rust/bindings/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use pyo3::prelude::*;
use pyo3_stub_gen::define_stub_info_gatherer;
use pyo3_stub_gen::derive::{gen_stub_pyclass, gen_stub_pymethods};
use robstride::{
motor_mode_from_str as robstride_motor_mode_from_str,
motor_type_from_str as robstride_motor_type_from_str,
MotorControlParams as RobstrideMotorControlParams, MotorFeedback as RobstrideMotorFeedback,
MotorType as RobstrideMotorType, Motors as RobstrideMotors,
Expand Down Expand Up @@ -106,6 +107,27 @@ impl PyRobstrideMotorFeedback {
self.can_id, self.position, self.velocity, self.torque, self.mode, self.faults
))
}

#[staticmethod]
fn create_feedback(
can_id: u8,
position: f32,
velocity: f32,
torque: f32,
mode: String,
faults: u16,
) -> PyResult<Self> {
let feedback = RobstrideMotorFeedback {
can_id,
position,
velocity,
torque,
mode: robstride_motor_mode_from_str(mode.as_str())?,
faults,
};

Ok(feedback.into())
}
}

impl From<RobstrideMotorFeedback> for PyRobstrideMotorFeedback {
Expand Down Expand Up @@ -209,15 +231,14 @@ impl PyRobstrideMotorsSupervisor {
})
.collect::<PyResult<HashMap<u8, RobstrideMotorType>>>()?;

let controller =
RobstrideMotorsSupervisor::new(
&port_name,
&motor_infos,
verbose,
target_update_rate,
zero_on_init,
)
.map_err(|e| PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(e.to_string()))?;
let controller = RobstrideMotorsSupervisor::new(
&port_name,
&motor_infos,
verbose,
target_update_rate,
zero_on_init,
)
.map_err(|e| PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(e.to_string()))?;

Ok(PyRobstrideMotorsSupervisor { inner: controller })
}
Expand Down
21 changes: 20 additions & 1 deletion actuator/rust/robstride/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,13 @@ pub const CAN_ID_MOTOR_DEFAULT: u8 = 0x7F;
pub const CAN_ID_BROADCAST: u8 = 0xFE;
pub const CAN_ID_DEBUG_UI: u8 = 0xFD;

pub const BAUDRATE: nix::sys::termios::BaudRate = nix::sys::termios::BaudRate::B921600;
#[cfg(target_os = "linux")]
pub const BAUD_RATE: nix::sys::termios::BaudRate = nix::sys::termios::BaudRate::B921600;

// WARNING: NOT A VALID BAUDRATE
// This is just a configuration to build on MacOS
#[cfg(target_os = "macos")]
pub const BAUD_RATE: nix::sys::termios::BaudRate = nix::sys::termios::BaudRate::B115200;

pub struct MotorConfig {
pub p_min: f32,
Expand Down Expand Up @@ -386,6 +392,19 @@ pub enum MotorType {
Type04,
}

pub fn motor_mode_from_str(s: &str) -> Result<MotorMode, std::io::Error> {
match s {
"Reset" => Ok(MotorMode::Reset),
"Cali" => Ok(MotorMode::Cali),
"Motor" => Ok(MotorMode::Motor),
"Brake" => Ok(MotorMode::Brake),
_ => Err(std::io::Error::new(
std::io::ErrorKind::InvalidInput,
"Invalid motor mode",
)),
}
}

pub fn motor_type_from_str(s: &str) -> Result<MotorType, std::io::Error> {
match s {
"01" => Ok(MotorType::Type01),
Expand Down
9 changes: 8 additions & 1 deletion actuator/rust/robstride/src/port.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,14 @@ use serialport::{SerialPort, TTYPort};
use std::os::unix::io::FromRawFd;
use std::time::Duration;

const BAUD_RATE: termios::BaudRate = termios::BaudRate::B921600;
#[cfg(target_os = "linux")]
pub const BAUD_RATE: nix::sys::termios::BaudRate = nix::sys::termios::BaudRate::B921600;

// WARNING: NOT A VALID BAUDRATE
// This is just a configuration to build on MacOS
#[cfg(target_os = "macos")]
pub const BAUD_RATE: nix::sys::termios::BaudRate = nix::sys::termios::BaudRate::B115200;

const TIMEOUT: Duration = Duration::from_millis(10);

pub fn init_serial_port(device: &str) -> Result<TTYPort, std::io::Error> {
Expand Down
Empty file added actuator/sim/__init__.py
Empty file.
213 changes: 213 additions & 0 deletions actuator/sim/robstride.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,213 @@
"""Simulated Robstride motors."""

import time
from enum import Enum, auto

from actuator import RobstrideMotorControlParams, RobstrideMotorFeedback


class MotorType(Enum):
Type01 = auto()
Type02 = auto()
Type03 = auto()
Type04 = auto()

@classmethod
def from_str(cls, s: str) -> "MotorType":
try:
return cls[f"Type{s}"]
except KeyError:
raise ValueError(f"Invalid motor type: {s}")


class MotorConfig:
def __init__(
self,
p_min: float,
p_max: float,
v_min: float,
v_max: float,
kp_min: float,
kp_max: float,
kd_min: float,
kd_max: float,
t_min: float,
t_max: float,
zero_on_init: bool,
can_timeout_command: int,
can_timeout_factor: float,
) -> None:
self.p_min = p_min
self.p_max = p_max
self.v_min = v_min
self.v_max = v_max
self.kp_min = kp_min
self.kp_max = kp_max
self.kd_min = kd_min
self.kd_max = kd_max
self.t_min = t_min
self.t_max = t_max
self.zero_on_init = zero_on_init
self.can_timeout_command = can_timeout_command
self.can_timeout_factor = can_timeout_factor


ROBSTRIDE_CONFIGS = {
WT-MM marked this conversation as resolved.
Show resolved Hide resolved
MotorType.Type01: MotorConfig(
p_min=-12.5,
p_max=12.5,
v_min=-44.0,
v_max=44.0,
kp_min=0.0,
kp_max=500.0,
kd_min=0.0,
kd_max=5.0,
t_min=-12.0,
t_max=12.0,
zero_on_init=True,
can_timeout_command=0x200C,
can_timeout_factor=12000.0,
),
MotorType.Type02: MotorConfig(
p_min=-12.5,
p_max=12.5,
v_min=-44.0,
v_max=44.0,
kp_min=0.0,
kp_max=500.0,
kd_min=0.0,
kd_max=5.0,
t_min=-12.0,
t_max=12.0,
zero_on_init=False,
can_timeout_command=0x200B,
can_timeout_factor=12000.0,
),
MotorType.Type03: MotorConfig(
p_min=-12.5,
p_max=12.5,
v_min=-44.0,
v_max=44.0,
kp_min=0.0,
kp_max=500.0,
kd_min=0.0,
kd_max=5.0,
t_min=-12.0,
t_max=12.0,
zero_on_init=False,
can_timeout_command=0x200B,
can_timeout_factor=12000.0,
),
MotorType.Type04: MotorConfig(
p_min=-12.5,
p_max=12.5,
v_min=-44.0,
v_max=44.0,
kp_min=0.0,
kp_max=500.0,
kd_min=0.0,
kd_max=5.0,
t_min=-12.0,
t_max=12.0,
zero_on_init=False,
can_timeout_command=0x200B,
can_timeout_factor=12000.0,
),
}


class _MotorSim:
def __init__(self, motor_id: int) -> None:
self.motor_id = motor_id
self.position = 0.0
self.velocity = 0.0
self.torque = 0.0
self.kp = 0.0
self.kd = 0.0

def update_state(self, params: RobstrideMotorControlParams) -> None:
self.position = params.position
self.velocity = params.velocity
self.torque = params.torque
self.kp = params.kp
self.kd = params.kd

def get_feedback(self) -> RobstrideMotorFeedback:
feedback = RobstrideMotorFeedback.create_feedback(
self.motor_id, self.position, self.velocity, self.torque, "Motor", 0
)
return feedback


class RobstrideMotorsSim:
def __init__(self, port_name: str, motor_infos: dict[int, str], verbose: bool = False) -> None:
self.port_name = port_name
self.motor_configs = {
id: ROBSTRIDE_CONFIGS[MotorType.from_str(motor_type)] for id, motor_type in motor_infos.items()
}
self.verbose = verbose

self.motors = {motor_id: _MotorSim(motor_id) for motor_id in motor_infos} # Use MotorSim for each motor

def send_get_mode(self) -> dict[int, str]:
return {motor_id: "Motor" for motor_id in self.motors}

def send_resets(self) -> None:
for motor_id in self.motors:
self.motors[motor_id] = _MotorSim(motor_id)

def send_starts(self) -> None:
if self.verbose:
print("Motors started")

def send_motor_controls(
self, motor_controls: dict[int, RobstrideMotorControlParams], serial: bool
) -> dict[int, RobstrideMotorFeedback]:
feedback: dict[int, RobstrideMotorFeedback] = {}
for motor_id, params in motor_controls.items():
self.motors[motor_id].update_state(params)
feedback[motor_id] = self.motors[motor_id].get_feedback()
return feedback

def __repr__(self) -> str:
return f"RobstrideMotorsSim(port_name={self.port_name}, motor_configs={self.motor_configs})"


class RobstrideMotorsSupervisorSim:
def __init__(self, motor_infos: dict[int, str], verbose: bool = False) -> None:
self.verbose = verbose
self.delay = 0.01 # Default delay in seconds

self.target_params = {motor_id: RobstrideMotorControlParams(0, 0, 0, 0, 0) for motor_id in motor_infos}
self.latest_feedback = {
motor_id: RobstrideMotorFeedback.create_feedback(0, 0, 0, 0, "Reset", 0) for motor_id in motor_infos
}

self.running = True
self.motors_sim = RobstrideMotorsSim("virtual_port", motor_infos, verbose)
WT-MM marked this conversation as resolved.
Show resolved Hide resolved

def set_delay(self, delay: float) -> None:
"""Set the delay for blocking communication methods."""
self.delay = delay

def send_motor_controls(
self, motor_controls: dict[int, RobstrideMotorControlParams]
) -> dict[int, RobstrideMotorFeedback]:
"""Simulate sending motor controls with a blocking delay."""
time.sleep(self.delay) # Simulate blocking communication
feedback = self.motors_sim.send_motor_controls(motor_controls, serial=False)
self.latest_feedback.update(feedback)
return self.latest_feedback.copy()

def get_latest_feedback(self) -> dict[int, RobstrideMotorFeedback]:
"""Get the latest feedback from the motors."""
return self.latest_feedback.copy()

def stop(self) -> None:
"""Stop the supervisor."""
self.running = False
self.motors_sim.send_resets()

def is_running(self) -> bool:
"""Check if the supervisor is running."""
return self.running
Loading
Loading