-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
* added ability to store and replay can messages * added some ease of use features to the replay of the can messages. - added checks if replay file exists. - only allow recording when driving with controller - only listen to the CANContolIdentifiers instead of all messages - moved toggle recording to the manual mode - made some changes to replay script to make it more robust and easier to use * small improvements to cli arguments * ruff format * moved can recording to a script and updated braking_calibration to up-to-date conventions * removed unused vars * resolved comments
- Loading branch information
1 parent
c198659
commit aba5f1a
Showing
8 changed files
with
222 additions
and
25 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,94 @@ | ||
import can | ||
import logging | ||
import threading | ||
import time | ||
|
||
from datetime import datetime | ||
from pathlib import Path | ||
from typing import Any, Callable | ||
|
||
from src.constants import CANControlIdentifier | ||
from src.driving.can import CANController, get_can_bus | ||
from src.driving.gamepad import EventType, Gamepad, GamepadButton | ||
from src.driving.modes import ManualDriving | ||
|
||
|
||
class CANRecorder: | ||
"""Procedure to record CAN message. | ||
This procedure can be used to record messages send on the CAN bus. | ||
This will record all the messages we can send and not the ones we receive. | ||
""" | ||
|
||
__can_bus: can.Bus | ||
__recording: bool = False | ||
__recorder: threading.Thread = None | ||
|
||
def __init__(self) -> None: | ||
"""Procedure to record CAN messages.""" | ||
self.__can_bus = get_can_bus() | ||
self.__can_bus.set_filters( | ||
[{"can_id": can_id, "can_mask": 0xFFF, "extended": False} for can_id in CANControlIdentifier] | ||
) | ||
|
||
def toggle_recording(self) -> None: | ||
"""Toggle the recording of CAN messages. | ||
This function will start recording CAN messages if it is not already recording, | ||
and stop recording if it is already recording. | ||
""" | ||
if not self.__recording: | ||
path = Path(f"./data/can_recordings/{datetime.now().strftime("%m_%d_%Y_%H_%M_%S")}.asc") | ||
path.parent.mkdir(parents=True, exist_ok=True) | ||
|
||
logging.info("Recording CAN messages to %s", path) | ||
self.__recorder = threading.Thread(target=self.__recording_thread, args=(path,), daemon=True) | ||
self.__recorder.start() | ||
else: | ||
self.__recording = False | ||
self.__recorder.join(1) | ||
|
||
self.__can_bus.shutdown() | ||
self.__can_bus = None | ||
|
||
logging.info("Stopped recording CAN messages") | ||
|
||
def __recording_thread(self, filepath: Path) -> None: | ||
"""Record CAN messages into a .asc file.""" | ||
self.__recording = True | ||
|
||
with can.ASCWriter(filepath) as writer: | ||
while self.__recording: | ||
msg = self.__can_bus.recv(1) | ||
if msg is not None: | ||
writer.on_message_received(msg) | ||
|
||
|
||
def create_toggle_callback(can_recorder: CANRecorder, gamepad: Gamepad) -> Callable[[Any, Any], None]: | ||
"""Create the callback for the toggle of the CAN recording.""" | ||
|
||
def __toggle(*_args: Any, **_kwargs: Any) -> None: | ||
can_recorder.toggle_recording() | ||
gamepad.vibrate() | ||
|
||
return __toggle | ||
|
||
|
||
if __name__ == "__main__": | ||
logging.basicConfig(level=logging.INFO) | ||
|
||
can_bus = get_can_bus() | ||
can_controller = CANController(can_bus) | ||
can_controller.start() | ||
|
||
gamepad = Gamepad() | ||
gamepad.start() | ||
|
||
controller_driving = ManualDriving(gamepad, can_controller) | ||
controller_driving.start() | ||
|
||
can_recorder = CANRecorder() | ||
gamepad.add_listener(GamepadButton.LB, EventType.LONG_PRESS, create_toggle_callback(can_recorder, gamepad)) | ||
|
||
while True: | ||
time.sleep(1) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,92 @@ | ||
import argparse | ||
import logging | ||
import threading | ||
|
||
from pathlib import Path | ||
|
||
from src.constants import CANControlIdentifier, CANFeedbackIdentifier | ||
from src.driving.can import get_can_bus, replay_log | ||
|
||
|
||
def log_all_can_messages(stopped: threading.Event, blacklist: list[int], whitelist: list[int]) -> threading.Thread: | ||
"""Log all messages of the CAN bus.""" | ||
listen_can = get_can_bus() | ||
|
||
# create and apply filters to listen to only the messages we want to log | ||
allowed_ids = set([int(v) for v in CANControlIdentifier] + [int(v) for v in CANFeedbackIdentifier]) - set(blacklist) | ||
|
||
if len(whitelist) > 0: | ||
allowed_ids = set(whitelist) | ||
|
||
logging.info("Listening to messages with IDs: %s", [hex(i) for i in allowed_ids]) | ||
filters = [{"can_id": can_id, "can_mask": 0xFFF, "extended": False} for can_id in allowed_ids] | ||
listen_can.set_filters(filters) | ||
|
||
def __listen() -> None: | ||
"""Listen to the CAN bus and log all messages.""" | ||
while not stopped.is_set(): | ||
message = listen_can.recv() | ||
print(message) # noqa: T201 | ||
|
||
listen_can.shutdown() | ||
|
||
thread = threading.Thread(target=__listen, daemon=True) | ||
thread.start() | ||
return thread | ||
|
||
|
||
if __name__ == "__main__": | ||
"""Replay CAN messages from a file and log to a file. | ||
This script will replay CAN messages from a file and log them to a file. | ||
We will also log them to the console depending on the arguments. | ||
You can only use either --blacklist or --whitelist. | ||
Arguments: | ||
logfile: The path to the log file. | ||
--log: Log all CAN messages. | ||
--blacklist: The list of message IDs to ignore when logging. The IDs are in hexadecimal. | ||
--whitelist: The list of message IDs to listen to. The IDs are in hexadecimal. | ||
""" | ||
logging.basicConfig(level=logging.INFO) | ||
|
||
arg_parser = argparse.ArgumentParser(description="Replay CAN messages from a file.") | ||
arg_parser.add_argument("logfile", type=Path, help="The path to the log file.") | ||
id_list_group = arg_parser.add_mutually_exclusive_group() | ||
id_list_group.add_argument( | ||
"--blacklist", | ||
nargs="+", | ||
type=lambda x: int(x, 0), | ||
help="The list of message IDs to ignore when logging.", | ||
default=[], | ||
) | ||
id_list_group.add_argument( | ||
"--whitelist", nargs="+", type=lambda x: int(x, 0), help="The list of message IDs to listen to.", default=[] | ||
) | ||
id_list_group.add_argument("--log", action="store_true", help="Log all CAN messages.") | ||
args = arg_parser.parse_args() | ||
|
||
# Check if the log file exists. | ||
if not args.logfile.exists(): | ||
logging.error("The log file %s does not exist.", args.logfile) | ||
exit(1) | ||
|
||
can_bus = get_can_bus() | ||
|
||
# Setup and start the log thread if needed. If we want to log then we will set stop_event, so we can use stop_event | ||
# not being None to know if we want to log or not. | ||
log_thread = None | ||
stop_event = threading.Event() if args.log or len(args.blacklist) > 0 or len(args.whitelist) > 0 else None | ||
if stop_event is not None: | ||
log_thread = log_all_can_messages(stop_event, args.blacklist, args.whitelist) | ||
|
||
# Replay the log file. | ||
logging.info("Replaying CAN messages from file...") | ||
replay_log(can_bus, args.logfile) | ||
logging.info("Replaying finished.") | ||
|
||
if stop_event is not None: | ||
stop_event.set() | ||
log_thread.join(1) | ||
|
||
can_bus.shutdown() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,3 +1,4 @@ | ||
from .can_controller_interface import ICANController | ||
from .can_bus import get_can_bus | ||
from .can_bus import replay_log | ||
from .can_controller import CANController |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters