Skip to content

Commit

Permalink
lint
Browse files Browse the repository at this point in the history
  • Loading branch information
WT-MM committed Oct 22, 2024
1 parent 57bde99 commit 447e8fe
Show file tree
Hide file tree
Showing 3 changed files with 105 additions and 77 deletions.
20 changes: 10 additions & 10 deletions actuator/rust/bindings/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@ use pyo3::prelude::*;
use pyo3_stub_gen::define_stub_info_gatherer;
use pyo3_stub_gen::derive::{gen_stub_pyclass, gen_stub_pymethods};
use robstride::{
motor_type_from_str as robstride_motor_type_from_str, motor_mode_from_str as robstride_motor_mode_from_str,
motor_mode_from_str as robstride_motor_mode_from_str,
motor_type_from_str as robstride_motor_type_from_str,
MotorControlParams as RobstrideMotorControlParams, MotorFeedback as RobstrideMotorFeedback,
MotorType as RobstrideMotorType, Motors as RobstrideMotors,
MotorsSupervisor as RobstrideMotorsSupervisor,
Expand Down Expand Up @@ -230,15 +231,14 @@ impl PyRobstrideMotorsSupervisor {
})
.collect::<PyResult<HashMap<u8, RobstrideMotorType>>>()?;

let controller =
RobstrideMotorsSupervisor::new(
&port_name,
&motor_infos,
verbose,
target_update_rate,
zero_on_init,
)
.map_err(|e| PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(e.to_string()))?;
let controller = RobstrideMotorsSupervisor::new(
&port_name,
&motor_infos,
verbose,
target_update_rate,
zero_on_init,
)
.map_err(|e| PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(e.to_string()))?;

Ok(PyRobstrideMotorsSupervisor { inner: controller })
}
Expand Down
137 changes: 82 additions & 55 deletions actuator/sim/robstride.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
"""Simulated Robstride motors."""

import time
from enum import Enum, auto

Expand All @@ -12,27 +13,30 @@ class MotorType(Enum):
Type04 = auto()

@classmethod
def from_str(cls, s: str) -> 'MotorType':
def from_str(cls, s: str) -> "MotorType":
try:
return cls[f"Type{s}"]
except KeyError:
raise ValueError(f"Invalid motor type: {s}")


class MotorConfig:
def __init__(self,
p_min: float,
p_max: float,
v_min: float,
v_max: float,
kp_min: float,
kp_max: float,
kd_min: float,
kd_max: float,
t_min: float,
t_max: float,
zero_on_init: bool,
can_timeout_command: int,
can_timeout_factor: float) -> None:
def __init__(
self,
p_min: float,
p_max: float,
v_min: float,
v_max: float,
kp_min: float,
kp_max: float,
kd_min: float,
kd_max: float,
t_min: float,
t_max: float,
zero_on_init: bool,
can_timeout_command: int,
can_timeout_factor: float,
) -> None:
self.p_min = p_min
self.p_max = p_max
self.v_min = v_min
Expand All @@ -47,49 +51,71 @@ def __init__(self,
self.can_timeout_command = can_timeout_command
self.can_timeout_factor = can_timeout_factor


ROBSTRIDE_CONFIGS = {
MotorType.Type01: MotorConfig(
p_min=-12.5, p_max=12.5,
v_min=-44.0, v_max=44.0,
kp_min=0.0, kp_max=500.0,
kd_min=0.0, kd_max=5.0,
t_min=-12.0, t_max=12.0,
p_min=-12.5,
p_max=12.5,
v_min=-44.0,
v_max=44.0,
kp_min=0.0,
kp_max=500.0,
kd_min=0.0,
kd_max=5.0,
t_min=-12.0,
t_max=12.0,
zero_on_init=True,
can_timeout_command=0x200c,
can_timeout_factor=12000.0
can_timeout_command=0x200C,
can_timeout_factor=12000.0,
),
MotorType.Type02: MotorConfig(
p_min=-12.5, p_max=12.5,
v_min=-44.0, v_max=44.0,
kp_min=0.0, kp_max=500.0,
kd_min=0.0, kd_max=5.0,
t_min=-12.0, t_max=12.0,
p_min=-12.5,
p_max=12.5,
v_min=-44.0,
v_max=44.0,
kp_min=0.0,
kp_max=500.0,
kd_min=0.0,
kd_max=5.0,
t_min=-12.0,
t_max=12.0,
zero_on_init=False,
can_timeout_command=0x200b,
can_timeout_factor=12000.0
can_timeout_command=0x200B,
can_timeout_factor=12000.0,
),
MotorType.Type03: MotorConfig(
p_min=-12.5, p_max=12.5,
v_min=-44.0, v_max=44.0,
kp_min=0.0, kp_max=500.0,
kd_min=0.0, kd_max=5.0,
t_min=-12.0, t_max=12.0,
p_min=-12.5,
p_max=12.5,
v_min=-44.0,
v_max=44.0,
kp_min=0.0,
kp_max=500.0,
kd_min=0.0,
kd_max=5.0,
t_min=-12.0,
t_max=12.0,
zero_on_init=False,
can_timeout_command=0x200b,
can_timeout_factor=12000.0
can_timeout_command=0x200B,
can_timeout_factor=12000.0,
),
MotorType.Type04: MotorConfig(
p_min=-12.5, p_max=12.5,
v_min=-44.0, v_max=44.0,
kp_min=0.0, kp_max=500.0,
kd_min=0.0, kd_max=5.0,
t_min=-12.0, t_max=12.0,
p_min=-12.5,
p_max=12.5,
v_min=-44.0,
v_max=44.0,
kp_min=0.0,
kp_max=500.0,
kd_min=0.0,
kd_max=5.0,
t_min=-12.0,
t_max=12.0,
zero_on_init=False,
can_timeout_command=0x200b,
can_timeout_factor=12000.0
)
can_timeout_command=0x200B,
can_timeout_factor=12000.0,
),
}


class _MotorSim:
def __init__(self, motor_id: int) -> None:
self.motor_id = motor_id
Expand All @@ -108,12 +134,7 @@ def update_state(self, params: RobstrideMotorControlParams) -> None:

def get_feedback(self) -> RobstrideMotorFeedback:
feedback = RobstrideMotorFeedback.create_feedback(
self.motor_id,
self.position,
self.velocity,
self.torque,
"Motor",
0
self.motor_id, self.position, self.velocity, self.torque, "Motor", 0
)
return feedback

Expand All @@ -122,8 +143,7 @@ class RobstrideMotorsSim:
def __init__(self, port_name: str, motor_infos: dict[int, str], verbose: bool = False) -> None:
self.port_name = port_name
self.motor_configs = {
id: ROBSTRIDE_CONFIGS[MotorType.from_str(motor_type)]
for id, motor_type in motor_infos.items()
id: ROBSTRIDE_CONFIGS[MotorType.from_str(motor_type)] for id, motor_type in motor_infos.items()
}
self.verbose = verbose

Expand All @@ -140,7 +160,9 @@ def send_starts(self) -> None:
if self.verbose:
print("Motors started")

def send_motor_controls(self, motor_controls: dict[int, RobstrideMotorControlParams], serial: bool) -> dict[int, RobstrideMotorFeedback]:
def send_motor_controls(
self, motor_controls: dict[int, RobstrideMotorControlParams], serial: bool
) -> dict[int, RobstrideMotorFeedback]:
feedback: dict[int, RobstrideMotorFeedback] = {}
for motor_id, params in motor_controls.items():
self.motors[motor_id].update_state(params)
Expand All @@ -150,13 +172,16 @@ def send_motor_controls(self, motor_controls: dict[int, RobstrideMotorControlPar
def __repr__(self) -> str:
return f"RobstrideMotorsSim(port_name={self.port_name}, motor_configs={self.motor_configs})"


class RobstrideMotorsSupervisorSim:
def __init__(self, motor_infos: dict[int, str], verbose: bool = False) -> None:
self.verbose = verbose
self.delay = 0.01 # Default delay in seconds

self.target_params = {motor_id: RobstrideMotorControlParams(0, 0, 0, 0, 0) for motor_id in motor_infos}
self.latest_feedback = {motor_id: RobstrideMotorFeedback.create_feedback(0, 0, 0, 0, "Reset", 0) for motor_id in motor_infos}
self.latest_feedback = {
motor_id: RobstrideMotorFeedback.create_feedback(0, 0, 0, 0, "Reset", 0) for motor_id in motor_infos
}

self.running = True
self.motors_sim = RobstrideMotorsSim("virtual_port", motor_infos, verbose)
Expand All @@ -165,7 +190,9 @@ def set_delay(self, delay: float) -> None:
"""Set the delay for blocking communication methods."""
self.delay = delay

def send_motor_controls(self, motor_controls: dict[int, RobstrideMotorControlParams]) -> dict[int, RobstrideMotorFeedback]:
def send_motor_controls(
self, motor_controls: dict[int, RobstrideMotorControlParams]
) -> dict[int, RobstrideMotorFeedback]:
"""Simulate sending motor controls with a blocking delay."""
time.sleep(self.delay) # Simulate blocking communication
feedback = self.motors_sim.send_motor_controls(motor_controls, serial=False)
Expand Down
25 changes: 13 additions & 12 deletions examples/sim.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
"""Example use of simulated actuators."""

import math
import time

Expand All @@ -8,10 +9,7 @@

def test_robstride_motors_sim() -> None:
# Define motor information
motor_infos = {
1: "01",
2: "02"
}
motor_infos = {1: "01", 2: "02"}

# Initialize the simulator
motors_sim = RobstrideMotorsSim("virtual_port", motor_infos, verbose=True)
Expand All @@ -22,7 +20,7 @@ def test_robstride_motors_sim() -> None:
# Define control parameters for the motors
motor_controls = {
1: RobstrideMotorControlParams(position=1.0, velocity=0.0, kp=10.0, kd=1.0, torque=0.0),
2: RobstrideMotorControlParams(position=-1.0, velocity=0.0, kp=10.0, kd=1.0, torque=0.0)
2: RobstrideMotorControlParams(position=-1.0, velocity=0.0, kp=10.0, kd=1.0, torque=0.0),
}

# Send motor controls
Expand All @@ -32,12 +30,10 @@ def test_robstride_motors_sim() -> None:
# Send reset command
motors_sim.send_resets()


def test_robstride_motors_supervisor_sim() -> None:
# Define motor information
motor_infos = {
1: "01",
2: "02"
}
motor_infos = {1: "01", 2: "02"}

# Initialize the supervisor simulator
supervisor_sim = RobstrideMotorsSupervisorSim(motor_infos, verbose=True)
Expand All @@ -48,7 +44,7 @@ def test_robstride_motors_supervisor_sim() -> None:
# Define control parameters for the motors
motor_controls = {
1: RobstrideMotorControlParams(position=1.0, velocity=0.0, kp=10.0, kd=1.0, torque=0.0),
2: RobstrideMotorControlParams(position=-1.0, velocity=0.0, kp=10.0, kd=1.0, torque=0.0)
2: RobstrideMotorControlParams(position=-1.0, velocity=0.0, kp=10.0, kd=1.0, torque=0.0),
}

start_time = time.time()
Expand All @@ -57,8 +53,12 @@ def test_robstride_motors_supervisor_sim() -> None:
while supervisor_sim.is_running():
# Sinusoidal set point
goal_pos = math.sin(2 * math.pi * (time.time() - start_time) / 5.0)
motor_controls[1] = RobstrideMotorControlParams(position=goal_pos, velocity=0.0, kp=10.0, kd=1.0, torque=0.0)
motor_controls[2] = RobstrideMotorControlParams(position=-goal_pos, velocity=0.0, kp=10.0, kd=1.0, torque=0.0)
motor_controls[1] = RobstrideMotorControlParams(
position=goal_pos, velocity=0.0, kp=10.0, kd=1.0, torque=0.0
)
motor_controls[2] = RobstrideMotorControlParams(
position=-goal_pos, velocity=0.0, kp=10.0, kd=1.0, torque=0.0
)
# Send motor controls
feedback = supervisor_sim.send_motor_controls(motor_controls)
print("Feedback:", feedback)
Expand All @@ -71,6 +71,7 @@ def test_robstride_motors_supervisor_sim() -> None:
# Stop the supervisor
supervisor_sim.stop()


if __name__ == "__main__":
print("Testing Robstride Motors Sim")
test_robstride_motors_sim()
Expand Down

0 comments on commit 447e8fe

Please sign in to comment.