diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml index dd12acf..e2d4aa3 100644 --- a/.github/workflows/ci.yaml +++ b/.github/workflows/ci.yaml @@ -38,3 +38,19 @@ jobs: echo "$formatted_files" exit 1 fi + test: + runs-on: ubuntu-latest + steps: + - name: ๐Ÿš€ Checkout + uses: actions/checkout@v4 + - name: ๐ŸŒฟ Setup Bazel + uses: bazel-contrib/setup-bazel@0.8.0 + with: + bazelisk-cache: true + disk-cache: ${{ github.workflow }} + repository-cache: true + bazelrc: | + build --color=yes + build --show_timestamps + - name: ๐Ÿงช Run Tests + run: bazel test //src/... diff --git a/BUILD.bazel b/BUILD.bazel index a7c10f9..9a323e1 100644 --- a/BUILD.bazel +++ b/BUILD.bazel @@ -12,15 +12,15 @@ compile_pip_requirements( # Set up commands to build all images in the project command( - name = "api_v1_img", - command = "//src/api/v1:tarball", + name = "api_v2_img", + command = "//src/api/v2:tarball", visibility = ["//visibility:public"], ) multirun( name = "build_all_imgs", commands = [ - ":api_v1_img", + ":api_v2_img", ], jobs = 0, ) diff --git a/MODULE.bazel.lock b/MODULE.bazel.lock index 6c6c56b..20f57f2 100644 --- a/MODULE.bazel.lock +++ b/MODULE.bazel.lock @@ -3973,7 +3973,7 @@ "os:linux,arch:amd64": { "bzlTransitiveDigest": "O6takntg2fy8btRIFrv9hsGqQu89+vUPw5N/OGj+EuI=", "recordedFileInputs": { - "@@//requirements_lock.txt": "76f9aa2120c2d28e6160be8d9a73b3a2510209fd5f4f20e659b0aca1fc0e7d4e" + "@@//requirements_lock.txt": "7f619226d8d5919afa9c944dbbecb48c08251f8ce7838b5c0d2a87db13e7d81f" }, "recordedDirentsInputs": {}, "envVariables": {}, diff --git a/requirements.in b/requirements.in index 061757c..7eec5ea 100644 --- a/requirements.in +++ b/requirements.in @@ -1,6 +1,7 @@ asyncio fastapi jinja2 +numpy opencv-python uvicorn[standard] websockets \ No newline at end of file diff --git a/requirements_lock.txt b/requirements_lock.txt index 221d807..2f1ffd1 100644 --- a/requirements_lock.txt +++ b/requirements_lock.txt @@ -177,7 +177,9 @@ numpy==1.26.4 \ --hash=sha256:edd8b5fe47dab091176d21bb6de568acdd906d1887a4584a15a9a96a1dca06ef \ --hash=sha256:f870204a840a60da0b12273ef34f7051e98c3b5961b61b0c2c1be6dfd64fbcd3 \ --hash=sha256:ffa75af20b44f8dba823498024771d5ac50620e6915abac414251bd971b4529f - # via opencv-python + # via + # -r requirements.in + # opencv-python opencv-python==4.9.0.80 \ --hash=sha256:1a9f0e6267de3a1a1db0c54213d022c7c8b5b9ca4b580e80bdc58516c922c9e1 \ --hash=sha256:3f16f08e02b2a2da44259c7cc712e779eff1dd8b55fdb0323e8cab09548086c0 \ diff --git a/src/api/v1/api.py b/src/api/v1/api.py index 114a458..8c10512 100644 --- a/src/api/v1/api.py +++ b/src/api/v1/api.py @@ -27,7 +27,9 @@ motors = [ DynamixelMotor(10, DYNAMIXEL_MX_12_ADDR_CONFIG, Position2D(0, 0, 0)), - DynamixelMotor(11, DYNAMIXEL_MX_12_ADDR_CONFIG, Position2D(0, 0, 0), True), + DynamixelMotor(11, DYNAMIXEL_MX_12_ADDR_CONFIG, Position2D(0, 0, 0)), + DynamixelMotor(12, DYNAMIXEL_MX_12_ADDR_CONFIG, Position2D(0, 0, 0), True), + DynamixelMotor(13, DYNAMIXEL_MX_12_ADDR_CONFIG, Position2D(0, 0, 0), True), ] ctrl = RobotControl("/dev/ttyUSB0", 1, motors) ctrl.init(1000000) diff --git a/src/api/v1/robot.py b/src/api/v1/robot.py index 9f84c8e..4561830 100644 --- a/src/api/v1/robot.py +++ b/src/api/v1/robot.py @@ -120,6 +120,9 @@ def __init__( self.initialized = False def __validate_command(self): + """ + Run validation against the motor command + """ if not self.initialized: raise Exception("Motor needs to be initialized before commanding") @@ -182,10 +185,15 @@ def __read_bytes(self, offset: int, size: int) -> Optional[int]: def __get_memory_property(self, property: str) -> int: self.__validate_command() - value = self.__read_bytes( - getattr(self.address_config, property).byte_offset, - getattr(self.address_config, property).byte_size, - ) + try: + config: MemorySegment = getattr(self.address_config, property) + except: + # Swallow the missing attribute error and throw a NotImplementedError for the motor instead + raise NotImplementedError( + f"FATAL | DXID: '{self.id}' | Motor does not support {property} attribute!" + ) + + value = self.__read_bytes(config.byte_offset, config.byte_size) if value is None: raise Exception(f"FATAL | DXID: '{self.id}' | Value could not be read!") @@ -195,9 +203,17 @@ def __get_memory_property(self, property: str) -> int: def __set_memory_property(self, property: str, value: int): self.__validate_command() + try: + config: MemorySegment = getattr(self.address_config, property) + except: + # Swallow the missing attribute error and throw a NotImplementedError for the motor instead + raise NotImplementedError( + f"FATAL | DXID: '{self.id}' | Motor does not support {property} attribute!" + ) + self.__write_bytes( - getattr(self.address_config, property).byte_offset, - getattr(self.address_config, property).byte_size, + config.byte_offset, + config.byte_size, value, ) @@ -375,12 +391,27 @@ def set_velocity(self, command: Position2D) -> bool: math.pi / 2 ) - for motor in self.motors: + # rear left + motor_0_speed = min(max(clamped_y - clamped_x - command.rotation.theta, -1), 1) + + # front left + motor_1_speed = min(max(clamped_y + clamped_x - command.rotation.theta, -1), 1) + + # front right + motor_2_speed = min(max(clamped_y - clamped_x - command.rotation.theta, -1), 1) + + # rear right + motor_3_speed = min(max(clamped_y + clamped_x - command.rotation.theta, -1), 1) + + speeds = [motor_0_speed, motor_1_speed, motor_2_speed, motor_3_speed] + + for i, motor in enumerate(self.motors): + speed = speeds[i] # TODO actual kinematics here - if clamped_y < 0: - motor.set_moving_speed((abs(clamped_y) * 1023) + 1023) + if speed < 0: + motor.set_moving_speed((abs(speed) * 1023) + 1023) else: - motor.set_moving_speed(clamped_y * 1023) + motor.set_moving_speed(speed * 1023) return True diff --git a/src/api/v2/api.py b/src/api/v2/api.py index 870ef9d..b679c16 100644 --- a/src/api/v2/api.py +++ b/src/api/v2/api.py @@ -1,27 +1,41 @@ -from typing import Union from fastapi import FastAPI, Request, WebSocket, WebSocketDisconnect from websockets.exceptions import ConnectionClosed from fastapi.templating import Jinja2Templates +from fastapi.middleware.cors import CORSMiddleware +from robot import ( + Position2D, + RobotControl, + DynamixelMotor, + DYNAMIXEL_MX_12_ADDR_CONFIG, +) import asyncio import cv2 app = FastAPI() -camera = cv2.VideoCapture(0, cv2.CAP_DSHOW) +camera = cv2.VideoCapture("/dev/video2") templates = Jinja2Templates(directory="templates") +origins = [ + "http://localhost:3000", +] -@app.get("/") -def read_root(): - return {"Hello": "World"} +app.add_middleware( + CORSMiddleware, + allow_origins=origins, + allow_credentials=True, + allow_methods=["*"], # Allow all HTTP methods + allow_headers=["*"], # Allow all headers +) - -@app.get("/items/{item_id}") -def read_item(item_id: int, q: Union[str, None] = None): - return {"item_id": item_id, "q": q} +motors = [ + DynamixelMotor(10, DYNAMIXEL_MX_12_ADDR_CONFIG, Position2D(0, 0, 0)), + DynamixelMotor(11, DYNAMIXEL_MX_12_ADDR_CONFIG, Position2D(0, 0, 0), True), +] +ctrl = RobotControl("/dev/ttyUSB0", 1, motors) +ctrl.init(1000000) # https://stackoverflow.com/a/70626324 -# Not actually sure if this works with bazel @app.websocket("/ws") async def get_stream(websocket: WebSocket): await websocket.accept() @@ -36,3 +50,15 @@ async def get_stream(websocket: WebSocket): await asyncio.sleep(0.03) except (WebSocketDisconnect, ConnectionClosed): print("Client disconnected") + + +@app.post("/command") +async def command(request: Request): + data = await request.json() + + x_val = -data.get("left", 0) + data.get("right", 0) + y_val = -data.get("down", 0) + data.get("up", 0) + + ctrl.set_velocity(Position2D(x_val, y_val, 0)) + + return {"status": "success"} diff --git a/src/robot/v1/BUILD.bazel b/src/robot/v1/BUILD.bazel new file mode 100644 index 0000000..c1429aa --- /dev/null +++ b/src/robot/v1/BUILD.bazel @@ -0,0 +1,18 @@ +load("@rules_python//python:defs.bzl", "py_library") + +# Put all robot files in this library +py_library( + name = "robot_src", + srcs = [ + "config.py", + "robot.py", + "structs.py", + ], + deps = [ + "@pypi//fastapi:pkg", + "@pypi//jinja2:pkg", + "@pypi//numpy:pkg", + "@pypi//opencv_python:pkg", + "@pypi//websockets:pkg", + ], +) diff --git a/src/robot/v1/config.py b/src/robot/v1/config.py new file mode 100644 index 0000000..d5ce2a0 --- /dev/null +++ b/src/robot/v1/config.py @@ -0,0 +1,24 @@ +from structs import DynamixelMotorAddressConfig, MemorySegment + +# Add all Dynamixel Motor Configs here +DYNAMIXEL_MX_12_ADDR_CONFIG = DynamixelMotorAddressConfig( + torque_enable=MemorySegment(24, 1), + led_enable=MemorySegment(25, 1), + d_gain=MemorySegment(26, 1), + i_gain=MemorySegment(27, 1), + p_gain=MemorySegment(28, 1), + goal_position=MemorySegment(30, 2), + moving_speed=MemorySegment(32, 2), + torque_limit=MemorySegment(34, 2), + present_position=MemorySegment(36, 2), + present_speed=MemorySegment(38, 2), + present_load=MemorySegment(40, 2), + present_input_voltage=MemorySegment(42, 1), + present_temperature=MemorySegment(43, 1), + registered=MemorySegment(44, 1), + moving=MemorySegment(46, 1), + lock=MemorySegment(47, 1), + punch=MemorySegment(48, 2), + realtime_tick=MemorySegment(50, 2), + goal_acceleration=MemorySegment(73, 1), +) diff --git a/src/robot/v1/robot.py b/src/robot/v1/robot.py new file mode 100644 index 0000000..81ef8c5 --- /dev/null +++ b/src/robot/v1/robot.py @@ -0,0 +1,325 @@ +# from dynamixel_sdk import * +import math +from typing import Optional +import dynamixel_sdk as dynamixel +import logging + +logger = logging.getLogger(__file__) + + +class DynamixelMotor: + def __init__( + self, + id: int, + address_config: MotorAddressConfig, + position: Position2D, + inverted: bool = False, + ): + self.id = id + self.address_config: MotorAddressConfig = address_config + self.position = position + self.inverted = inverted + self.port_handler = None + self.packet_handler = None + self.initialized = False + + def __validate_command(self): + """ + Run validation against the motor command + """ + if not self.initialized: + raise Exception("Motor needs to be initialized before commanding") + + def __write_bytes(self, offset: int, size: int, value: int): + if size == 1: + dxl_comm_result, dxl_error = self.packet_handler.write1ByteTxRx( + self.port_handler, self.id, offset, value + ) + elif size == 2: + dxl_comm_result, dxl_error = self.packet_handler.write2ByteTxRx( + self.port_handler, self.id, offset, value + ) + elif size == 4: + dxl_comm_result, dxl_error = self.packet_handler.write4ByteTxRx( + self.port_handler, self.id, offset, value + ) + else: + raise NotImplementedError + + if dxl_comm_result != dynamixel.COMM_SUCCESS: + logger.error( + f"FAILURE | DYNAMIXEL ID: '{self.id}' | WriteBytesDXLComm: '{self.packet_handler.getTxRxResult(dxl_comm_result)}'" + ) + return False + elif dxl_error != 0: + logger.error( + f"FAILURE | DYNAMIXEL ID: '{self.id}' | WriteBytesDXLError: '{self.packet_handler.getRxPacketError(dxl_error)}'" + ) + return False + return True + + def __read_bytes(self, offset: int, size: int) -> Optional[int]: + if size == 1: + dxl_value, dxl_comm_result, dxl_error = self.packet_handler.read1ByteTxRx( + self.port_handler, self.id, offset + ) + elif size == 2: + dxl_value, dxl_comm_result, dxl_error = self.packet_handler.read2ByteTxRx( + self.port_handler, self.id, offset + ) + elif size == 4: + dxl_value, dxl_comm_result, dxl_error = self.packet_handler.read4ByteTxRx( + self.port_handler, self.id, offset + ) + else: + raise NotImplementedError + + if dxl_comm_result != dynamixel.COMM_SUCCESS: + logger.error( + f"FAILURE | DXID: '{self.id}' | ReadBytesDXLComm: '{self.packet_handler.getTxRxResult(dxl_comm_result)}'" + ) + return False + elif dxl_error != 0: + logger.error( + f"FAILURE | DXID: '{self.id}' | ReadBytesDXLError: '{self.packet_handler.getRxPacketError(dxl_error)}'" + ) + return None + return dxl_value + + def __get_memory_property(self, property: str) -> int: + self.__validate_command() + + try: + config: MemorySegment = getattr(self.address_config, property) + except: + # Swallow the missing attribute error and throw a NotImplementedError for the motor instead + raise NotImplementedError( + f"FATAL | DXID: '{self.id}' | Motor does not support {property} attribute!" + ) + + value = self.__read_bytes(config.byte_offset, config.byte_size) + + if value is None: + raise Exception(f"FATAL | DXID: '{self.id}' | Value could not be read!") + + return value + + def __set_memory_property(self, property: str, value: int): + self.__validate_command() + + try: + config: MemorySegment = getattr(self.address_config, property) + except: + # Swallow the missing attribute error and throw a NotImplementedError for the motor instead + raise NotImplementedError( + f"FATAL | DXID: '{self.id}' | Motor does not support {property} attribute!" + ) + + self.__write_bytes( + config.byte_offset, + config.byte_size, + value, + ) + + def get_torque_enable(self) -> int: + return self.__get_memory_property("torque_enable") + + def set_torque_enable(self, is_enabled: int): + self.__set_memory_property("torque_enable", is_enabled) + + def get_led_enable(self) -> int: + return self.__get_memory_property("led_enable") + + def set_led_enable(self, is_enabled: int): + self.__set_memory_property("led_enable", is_enabled) + + def get_pid_gain(self) -> tuple[int, int, int]: + p = self.__get_memory_property("p_gain") + i = self.__get_memory_property("i_gain") + d = self.__get_memory_property("d_gain") + + return (p, i, d) + + def set_pid_gain(self, p: int, i: int, d: int): + self.__set_memory_property("p_gain", p) + self.__set_memory_property("i_gain", i) + self.__set_memory_property("d_gain", d) + + def get_goal_position(self) -> int: + return self.__get_memory_property("goal_position") + + def set_goal_position(self, position: int): + self.__set_memory_property("goal_position", position) + + def get_moving_speed(self) -> int: + return self.__get_memory_property("moving_speed") + + def set_moving_speed(self, speed: int): + if self.inverted: + if speed >= 1024: + speed -= 1024 + else: + speed += 1024 + + self.__set_memory_property("moving_speed", speed) + + def get_torque_limit(self) -> int: + return self.__get_memory_property("torque_limit") + + def set_torque_limit(self, limit: int): + self.__set_memory_property("torque_limit", limit) + + def get_present_position(self) -> int: + return self.__get_memory_property("present_position") + + def get_present_speed(self) -> int: + return self.__get_memory_property("present_speed") + + def get_present_load(self) -> int: + return self.__get_memory_property("present_load") + + def get_present_input_voltage(self) -> int: + return self.__get_memory_property("present_input_voltage") + + def get_present_temperature(self) -> int: + return self.__get_memory_property("present_temperature") + + def get_moving(self) -> int: + return self.__get_memory_property("moving") + + def get_realtime_tick(self) -> int: + return self.__get_memory_property("realtime_tick") + + def get_goal_acceleration(self) -> int: + return self.__get_memory_property("goal_acceleration") + + def set_goal_acceleration(self, acceleration: int): + self.__set_memory_property("goal_acceleration") + + def init(self, port_handler, packet_handler): + self.port_handler = port_handler + self.packet_handler = packet_handler + self.initialized = True + + def configure_default(self): + # Enable LED + self.set_led_enable(1) + + # Set default PID values intenral to the motor + self.set_pid_gain(8, 0, 8) + + # Enable the motor, and set to 100% max torque + self.set_torque_enable(1) + self.set_torque_limit(1023) + + # If we've made it this far without throwing an exception, + # we've configured our defaults successfully + logger.info( + f"SUCCESS | DYNAMIXEL ID: '{self.id}' | Default configuration applied!" + ) + + +class RobotControl: + def __init__( + self, + device_name: str, + protocol_version: int, + motors: list[DynamixelMotor], + ): + self.port_handler = dynamixel.PortHandler(device_name) + self.packet_handler = dynamixel.PacketHandler(protocol_version) + self.motors = motors + self.position: Vector2D = Vector2D(0, 0) + + self.__validate_motors() + + def __validate_motors(self): + # Confirm each motor has a unique ID + motor_ids = set([motor.id for motor in self.motors]) + + if len(motor_ids) != len(self.motors): + raise Exception("One or more motors have duplicate IDs defined!") + + def __open_port(self) -> bool: + try: + self.port_handler.openPort() + logger.info(f"SUCCESS | OpenPort") + except: + logger.error(f"FAILURE | OpenPort") + return False + return True + + def __set_baud(self, baud) -> bool: + """ + Set baud rate of the motor + + Args: + port_handler: Object responsible for handling port communications with the Dynamixel motors + + Returns: + bool: True if the command was sent successfully + """ + try: + self.port_handler.setBaudRate(baud) + logger.info(f"SUCCESS | SetBaud: '{baud}'") + except: + logger.error(f"FAILURE | SetBaud: '{baud}'") + return False + return True + + def init(self, baud): + if not self.__open_port(): + raise Exception("Unable to open port for Dynamixel!") + + if not self.__set_baud(baud): + raise Exception("Unable to set baud rate for Dynamixel!") + + for motor in self.motors: + motor.init(self.port_handler, self.packet_handler) + motor.configure_default() + + # TODO fix the verbiage here + def set_velocity(self, command: Position2D) -> bool: + """ + Translates a command of unit vectors and rotations into commands for + the motors on the robot. + + Args: + command: RobotCommand - Command of a unit vector and rotation with the + desired velocity of the robot. + """ + clamped_x = min(max(command.location.x, -1), 1) + clamped_y = min(max(command.location.y, -1), 1) + # TODO figure out how to get the smallest angle correctly + smallest_theta = ((command.rotation.theta + (math.pi / 2)) % math.pi) - ( + math.pi / 2 + ) + + for motor in self.motors: + # TODO actual kinematics here + if clamped_y < 0: + motor.set_moving_speed((abs(clamped_y) * 1023) + 1023) + else: + motor.set_moving_speed(clamped_y * 1023) + + return True + + +if __name__ == "__main__": + """ + NOTE: This isn't for actual execution. This entrypoint is only for + testing. + """ + # TODO set proper locations of the motors, once determined in CAD. + motors = [ + DynamixelMotor(10, DYNAMIXEL_MX_12_ADDR_CONFIG, Position2D(0, 0, 0)), + DynamixelMotor(11, DYNAMIXEL_MX_12_ADDR_CONFIG, Position2D(0, 0, 0), True), + ] + + # Set up the robot controller, with the correct baud rate for the MX12 motors + ctrl = RobotControl("/dev/ttyUSB0", 1, motors) + ctrl.init(1000000) + + from IPython import embed + + embed() diff --git a/src/robot/v1/structs.py b/src/robot/v1/structs.py new file mode 100644 index 0000000..056bd92 --- /dev/null +++ b/src/robot/v1/structs.py @@ -0,0 +1,30 @@ +from dataclasses import dataclass + + +@dataclass +class MemorySegment: + byte_offset: int + byte_size: int + + +@dataclass +class DynamixelMotorAddressConfig: + torque_enable: MemorySegment + led_enable: MemorySegment + d_gain: MemorySegment + i_gain: MemorySegment + p_gain: MemorySegment + goal_position: MemorySegment + moving_speed: MemorySegment + torque_limit: MemorySegment + present_position: MemorySegment + present_speed: MemorySegment + present_load: MemorySegment + present_input_voltage: MemorySegment + present_temperature: MemorySegment + registered: MemorySegment + moving: MemorySegment + lock: MemorySegment + punch: MemorySegment + realtime_tick: MemorySegment + goal_acceleration: MemorySegment diff --git a/src/robot/v1/structs_test.py b/src/robot/v1/structs_test.py new file mode 100644 index 0000000..b047b3b --- /dev/null +++ b/src/robot/v1/structs_test.py @@ -0,0 +1,70 @@ +# @dataclass +# class Vector2D: +# x: float +# y: float + +# def __add__(self, other): +# assert isinstance(other, Vector2D) + +# return Vector2D(self.x + other.x, self.y + other.y) + +# def __iadd__(self, other): +# assert isinstance(other, Vector2D) + +# self.x += other.x +# self.y += other.y + +# def __sub__(self, other): +# assert isinstance(other, Vector2D) + +# return Vector2D(self.x - other.x, self.y - other.y) + +# def __isub__(self, other): +# assert isinstance(other, Vector2D) + +# self.x -= other.x +# self.y -= other.y + + +# @dataclass +# class Rotation2D: +# theta: float + + +# @dataclass +# class Position2D: +# location: Vector2D +# rotation: Rotation2D + +# def __init__(self, x, y, theta): +# self.location = Vector2D(x, y) +# self.rotation = Rotation2D(theta) + + +# @dataclass +# class MemorySegment: +# byte_offset: int +# byte_size: int + + +# @dataclass +# class DynamixelMotorAddressConfig: +# torque_enable: MemorySegment +# led_enable: MemorySegment +# d_gain: MemorySegment +# i_gain: MemorySegment +# p_gain: MemorySegment +# goal_position: MemorySegment +# moving_speed: MemorySegment +# torque_limit: MemorySegment +# present_position: MemorySegment +# present_speed: MemorySegment +# present_load: MemorySegment +# present_input_voltage: MemorySegment +# present_temperature: MemorySegment +# registered: MemorySegment +# moving: MemorySegment +# lock: MemorySegment +# punch: MemorySegment +# realtime_tick: MemorySegment +# goal_acceleration: MemorySegment diff --git a/src/robot/v1/README.md b/src/robot/v2/README.md similarity index 100% rename from src/robot/v1/README.md rename to src/robot/v2/README.md diff --git a/src/robot/v1/__init__.py b/src/robot/v2/__init__.py similarity index 100% rename from src/robot/v1/__init__.py rename to src/robot/v2/__init__.py diff --git a/src/robot/v1/publisher.py b/src/robot/v2/publisher.py similarity index 100% rename from src/robot/v1/publisher.py rename to src/robot/v2/publisher.py diff --git a/src/robot/v1/robot_control.py b/src/robot/v2/robot_control.py similarity index 100% rename from src/robot/v1/robot_control.py rename to src/robot/v2/robot_control.py diff --git a/src/robot/v1/subscriber.py b/src/robot/v2/subscriber.py similarity index 100% rename from src/robot/v1/subscriber.py rename to src/robot/v2/subscriber.py diff --git a/src/util/BUILD.bazel b/src/util/BUILD.bazel new file mode 100644 index 0000000..af408fe --- /dev/null +++ b/src/util/BUILD.bazel @@ -0,0 +1,15 @@ +load("@rules_python//python:defs.bzl", "py_library", "py_test") + +py_library( + name = "linalg_src", + srcs = ["linalg.py"], + deps = [ + "@pypi//numpy:pkg", + ], +) + +py_test( + name = "linalg_test", + srcs = ["linalg_test.py"], + deps = [":linalg_src"], +) diff --git a/src/util/linalg.py b/src/util/linalg.py new file mode 100644 index 0000000..7082389 --- /dev/null +++ b/src/util/linalg.py @@ -0,0 +1,61 @@ +from typing import Union +import numpy as np +import math + +M_PI = math.pi +M_PI_2 = M_PI / 2 +M_PI_3 = M_PI / 3 +M_PI_4 = M_PI / 4 + + +def vector2d(x: Union[int, float], y: Union[int, float]): + """ + Returns a 2D vector as a numpy array. + + Params: + x [Union[int, float]]: X component of the vector + y [Union[int, float]]: Y component of the vector + """ + return np.array([x, y]) + + +def rotation2d(theta: Union[int, float]): + """ + Return a 2D rotation matrix as a numpy array, provided an angle in radians. + From https://scipython.com/book/chapter-6-numpy/examples/creating-a-rotation-matrix-in-numpy/ + + Positive rotations move vectors clockwise, whereas negative rotations move + vectors counterclockwise + + Params: + theta [Union[int, float]]: Angle in radians + """ + c, s = np.cos(theta), np.sin(theta) + return np.array(((c, -s), (s, c))) + + +def smallest_angle(theta: Union[int, float]): + """ + Given an angle in radians, returns the smallest rotation for the angle, + between -PI and PI + + Params: + theta [Union[int, float]]: Angle in radians + """ + # Reduce the angle down to 0-PI + reduced_theta = abs(theta) % (M_PI) + + # If the value is between pi and 2pi (modulus 2pi to not count multiple + # rotations), we need to use 360 minus the angle's value + opposite_hemisphere = (abs(theta) // M_PI) % 2 != 0 + + if opposite_hemisphere: + # If the shortest angle is in the opposite hemisphere, we reverse + # the angle and take the opposite of the angle between 0-180. + # Ex. For 210deg, the shortest angle is actually -150deg, whereas + # if directly flipped, it would be reported as -30deg + return math.copysign(M_PI - reduced_theta, -theta) + else: + # If we are in the same hemisphere, we can just remove any extra + # rotations and keep the same sign. + return math.copysign(reduced_theta, theta) diff --git a/src/util/linalg_test.py b/src/util/linalg_test.py new file mode 100644 index 0000000..5ce87ce --- /dev/null +++ b/src/util/linalg_test.py @@ -0,0 +1,229 @@ +import unittest +import math +from src.util.linalg import ( + vector2d, + rotation2d, + smallest_angle, + M_PI, + M_PI_2, + M_PI_3, + M_PI_4, +) + + +class TestVector2D(unittest.TestCase): + def test_add(self): + total = vector2d(1, 2) + vector2d(3, 4) + self.assertEqual(total[0], 4) + self.assertEqual(total[1], 6) + + def test_iadd(self): + vec_a = vector2d(1, 2) + vec_a += vector2d(5, 5) + self.assertEqual(vec_a[0], 6) + self.assertEqual(vec_a[1], 7) + + def test_sub(self): + total = vector2d(5, 5) - vector2d(3, 4) + self.assertEqual(total[0], 2) + self.assertEqual(total[1], 1) + + def test_isub(self): + vec_a = vector2d(5, 5) + vec_a -= vector2d(1, 2) + self.assertEqual(vec_a[0], 4) + self.assertEqual(vec_a[1], 3) + + def test_scalar_multiply(self): + total = vector2d(1, 2) * 3 + self.assertEqual(total[0], 3) + self.assertEqual(total[1], 6) + + def test_scalar_divide(self): + total = vector2d(8, 4) / 2 + self.assertEqual(total[0], 4) + self.assertEqual(total[1], 2) + + def test_45deg_rotation(self): + angle = M_PI_4 + starting_vec = vector2d(1, 2) + + # Figure out the expected angles programmatically + r = math.sqrt(5) + expected_angle_cw = math.atan2(starting_vec[1], starting_vec[0]) - M_PI_4 + expected_vec_cw = vector2d( + r * math.cos(expected_angle_cw), r * math.sin(expected_angle_cw) + ) + + expected_angle_ccw = math.atan2(starting_vec[1], starting_vec[0]) + M_PI_4 + expected_vec_ccw = vector2d( + r * math.cos(expected_angle_ccw), r * math.sin(expected_angle_ccw) + ) + + for theta, expected in [(angle, expected_vec_cw), (-angle, expected_vec_ccw)]: + rotation = rotation2d(theta) + + rotated = starting_vec.dot(rotation) + self.assertEqual(rotated.shape, (2,)) + self.assertAlmostEqual(rotated[0], expected[0]) + self.assertAlmostEqual(rotated[1], expected[1]) + + def test_90deg_rotation(self): + angle = M_PI_2 + starting_vec = vector2d(1, 2) + expected_vec = vector2d(2, -1) + + for theta, expected in [(angle, expected_vec), (-angle, -expected_vec)]: + rotation = rotation2d(theta) + + rotated = starting_vec.dot(rotation) + self.assertEqual(rotated.shape, (2,)) + self.assertAlmostEqual(rotated[0], expected[0]) + self.assertAlmostEqual(rotated[1], expected[1]) + + def test_180deg_rotation(self): + angle = M_PI + starting_vec = vector2d(1, 2) + expected_vec = vector2d(-1, -2) + + for theta, expected in [(angle, expected_vec), (-angle, expected_vec)]: + rotation = rotation2d(theta) + + rotated = starting_vec.dot(rotation) + self.assertEqual(rotated.shape, (2,)) + self.assertAlmostEqual(rotated[0], expected[0]) + self.assertAlmostEqual(rotated[1], expected[1]) + + def test_270deg_rotation(self): + angle = 3 * M_PI_2 + starting_vec = vector2d(1, 2) + expected_vec = vector2d(-2, 1) + + for theta, expected in [(angle, expected_vec), (-angle, -expected_vec)]: + rotation = rotation2d(theta) + + rotated = starting_vec.dot(rotation) + self.assertEqual(rotated.shape, (2,)) + self.assertAlmostEqual(rotated[0], expected[0]) + self.assertAlmostEqual(rotated[1], expected[1]) + + def test_360deg_rotation(self): + angle = 2 * M_PI + starting_vec = vector2d(1, 2) + expected_vec = vector2d(1, 2) + + for theta, expected in [(angle, expected_vec), (-angle, expected_vec)]: + rotation = rotation2d(theta) + + rotated = starting_vec.dot(rotation) + self.assertEqual(rotated.shape, (2,)) + self.assertAlmostEqual(rotated[0], expected[0]) + self.assertAlmostEqual(rotated[1], expected[1]) + + +class TestRotation2D(unittest.TestCase): + def test_add_90deg_90deg(self): + """ + Test that adding two 90 degree rotations adds to a + 180 degree rotation. + """ + rot_a = rotation2d(M_PI_2) + rot_b = rotation2d(M_PI_2) + expected_total = rotation2d(M_PI) + + total = rot_a.dot(rot_b) + self.assertTrue((total == expected_total).all()) + + def test_add_neg90deg_neg90deg(self): + """ + Test that adding two -90 degree rotations adds to a + -180 degree rotation. + """ + rot_a = rotation2d(-M_PI_2) + rot_b = rotation2d(-M_PI_2) + expected_total = rotation2d(-M_PI) + + total = rot_a.dot(rot_b) + self.assertTrue((total == expected_total).all()) + + def test_add_90deg_180deg(self): + """ + Test that adding a 90 and 180 degree rotation adds to + a 270 degree rotation + """ + rot_a = rotation2d(M_PI_2) + rot_b = rotation2d(M_PI) + expected_total = rotation2d(3 * M_PI_2) + + total = rot_a.dot(rot_b) + self.assertTrue((total == expected_total).all()) + + +class TestSmallestAngle(unittest.TestCase): + def test_small_positive_angle(self): + """ + Test a small, positive angle (between 0 and PI) to confirm the + same angle gets returned + """ + angle = smallest_angle(M_PI_4) + self.assertAlmostEqual(angle, M_PI_4) + + def test_small_negative_angle(self): + """ + Test a small, negative angle (between 0 and -PI) to confirm the + same angle gets returned + """ + angle = smallest_angle(-M_PI_4) + self.assertAlmostEqual(angle, -M_PI_4) + + def test_mid_positive_angle(self): + """ + Test a medium, positive angle (between PI and 2PI) to confirm that + the smaller negative angle is returned + """ + angle = smallest_angle(5 * M_PI_3) + self.assertAlmostEqual(angle, -M_PI_3) + + def test_mid_negative_angle(self): + """ + Test a medium, negative angle (between -PI and -2PI) to confirm that + the smaller positive angle is returned + """ + angle = smallest_angle(-5 * M_PI_3) + self.assertAlmostEqual(angle, M_PI_3) + + def test_large_positive_angle(self): + """ + Test a large, positive angle (between 2PI and 3PI) to confirm + that the same positive angle is returned + """ + angle = smallest_angle(5 * M_PI_2) + self.assertAlmostEqual(angle, M_PI_2) + + def test_large_negative_angle(self): + """ + Test a large, negative angle (between -2PI and -3PI) to confirm + that the same negative angle is returned + """ + angle = smallest_angle(-5 * M_PI_2) + self.assertAlmostEqual(angle, -M_PI_2) + + def test_multi_rollover_positive_angle(self): + """ + Test a large, positive angle (0 and PI, + n*2PI) to confirm + that the reduced positive angle is returned + """ + angle = smallest_angle(M_PI_3 + (14 * 2 * M_PI)) + self.assertAlmostEqual(angle, M_PI_3) + + def test_multi_rollover_negative_angle(self): + """ + Test a large, negative angle (0 and -PI, + n*-2PI) to confirm + that the reduced negative angle is returned + """ + angle = smallest_angle(-M_PI_3 + (14 * -2 * M_PI)) + self.assertAlmostEqual(angle, -M_PI_3) + + +if __name__ == "__main__": + unittest.main() diff --git a/tools/lint/BUILD.bazel b/tools/lint/BUILD.bazel index c35d65d..336d71e 100644 --- a/tools/lint/BUILD.bazel +++ b/tools/lint/BUILD.bazel @@ -18,6 +18,8 @@ alias( "@bazel_tools//src/conditions:linux_aarch64": "@ruff_aarch64-unknown-linux-gnu//:ruff", "@bazel_tools//src/conditions:darwin_arm64": "@ruff_aarch64-apple-darwin//:ruff", "@bazel_tools//src/conditions:darwin_x86_64": "@ruff_x86_64-apple-darwin//:ruff", + "@bazel_tools//src/conditions:windows_x64": "@ruff_x86_64-pc-windows-msvc//:ruff", + "@bazel_tools//src/conditions:windows_arm64": "@ruff_aarch64-pc-windows-msvc//:ruff", }), )