diff --git a/README.md b/README.md
index 063f9a7..ec02a5c 100644
--- a/README.md
+++ b/README.md
@@ -69,8 +69,8 @@ python3 -m pylint robot_sf
 ### 5. Run Visual Debugging of Pre-Trained Demo Models
 
 ```sh
-python3 demo_offensive.py
-python3 demo_defensive.py
+python3 examples/demo_offensive.py
+python3 examples/demo_defensive.py
 ```
 
 ### 6. Run StableBaselines Training (Docker)
diff --git a/demo_defensive.py b/examples/demo_defensive.py
similarity index 98%
rename from demo_defensive.py
rename to examples/demo_defensive.py
index 0c00829..9d024e5 100644
--- a/demo_defensive.py
+++ b/examples/demo_defensive.py
@@ -2,8 +2,8 @@
 from gymnasium import spaces
 from stable_baselines3 import PPO
 
-from robot_sf.robot_env import RobotEnv, OBS_DRIVE_STATE, OBS_RAYS
-from robot_sf.sim_config import EnvSettings
+from robot_sf.gym_env.robot_env import RobotEnv, OBS_DRIVE_STATE, OBS_RAYS
+from robot_sf.gym_env.env_config import EnvSettings
 from robot_sf.sim.sim_config import SimulationSettings
 from robot_sf.robot.differential_drive import DifferentialDriveSettings
diff --git a/demo_offensive.py b/examples/demo_offensive.py
similarity index 88%
rename from demo_offensive.py
rename to examples/demo_offensive.py
index 34d873b..59fd510 100644
--- a/demo_offensive.py
+++ b/examples/demo_offensive.py
@@ -1,6 +1,6 @@
 from stable_baselines3 import PPO
 
-from robot_sf.robot_env import RobotEnv
-from robot_sf.sim_config import EnvSettings
+from robot_sf.gym_env.robot_env import RobotEnv
+from robot_sf.gym_env.env_config import EnvSettings
 from robot_sf.sim.sim_config import SimulationSettings
 from robot_sf.robot.bicycle_drive import BicycleDriveSettings
diff --git a/robot_sf/classes.svg b/robot_sf/classes.svg
index bc9c5c2..2cf75cc 100644
--- a/robot_sf/classes.svg
+++ b/robot_sf/classes.svg
@@ -4,874 +4,777 @@
 [auto-generated class diagram (Graphviz SVG); textual diff omitted. The diagram was regenerated to reflect the restructuring: RobotState now lives in robot_sf.robot.robot_state instead of robot_sf.robot_env, the robot_sf.robot_env and robot_sf.sim_config nodes are gone, and robot_sf.simple_robot_env.SimpleRobotEnv was added.]
diff --git a/robot_sf/feature_extractor.py b/robot_sf/feature_extractor.py
index f1046ae..659f96c 100644
--- a/robot_sf/feature_extractor.py
+++ b/robot_sf/feature_extractor.py
@@ -9,7 +9,7 @@
 from torch import nn
 
 from stable_baselines3.common.torch_layers import BaseFeaturesExtractor
 
-from robot_sf.robot_env import OBS_DRIVE_STATE, OBS_RAYS
+from robot_sf.sensor.sensor_fusion import OBS_RAYS, OBS_DRIVE_STATE
 
 
 class DynamicsExtractor(BaseFeaturesExtractor):
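With `OBS_DRIVE_STATE` and `OBS_RAYS` now exported by `robot_sf.sensor.sensor_fusion`, the feature extractor no longer imports from the environment module. A minimal sketch of how the extractor can be plugged into a StableBaselines3 policy; the policy name and hyperparameters are illustrative, following the usual SB3 pattern for dict observation spaces rather than code in this diff:

```python
from stable_baselines3 import PPO
from robot_sf.gym_env.robot_env import RobotEnv
from robot_sf.feature_extractor import DynamicsExtractor

env = RobotEnv()
# route the dict observation through the custom extractor
policy_kwargs = dict(features_extractor_class=DynamicsExtractor)
model = PPO("MultiInputPolicy", env, policy_kwargs=policy_kwargs)
model.learn(total_timesteps=1_000)
```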
+""" + +from typing import Tuple, Callable +from copy import deepcopy + +import numpy as np + +from gymnasium import Env +from gymnasium.utils import seeding + +from robot_sf.robot.robot_state import RobotState +from robot_sf.gym_env.env_config import EnvSettings +from robot_sf.sensor.range_sensor import lidar_ray_scan + +from robot_sf.sim.sim_view import ( + SimulationView, + VisualizableAction, + VisualizableSimState) +from robot_sf.sim.simulator import init_simulators +from robot_sf.gym_env.reward import simple_reward +from robot_sf.gym_env.env_util import init_collision_and_sensors, init_spaces + +Vec2D = Tuple[float, float] +PolarVec2D = Tuple[float, float] +RobotPose = Tuple[Vec2D, float] + + +class EmptyRobotEnv(Env): + """ + Representing a Gymnasium environment for training a self-driving robot + with reinforcement learning. + """ + + def __init__( + self, + env_config: EnvSettings = EnvSettings(), + reward_func: Callable[[dict], float] = simple_reward, + debug: bool = False + ): + """ + Initialize the Robot Environment. + + Parameters: + - env_config (EnvSettings): Configuration for environment settings. + - reward_func (Callable[[dict], float]): Reward function that takes + a dictionary as input and returns a float as reward. + - debug (bool): If True, enables debugging information such as + visualizations. + """ + + # Environment configuration details + self.env_config = env_config + + # Extract first map definition; currently only supports using the first map + map_def = env_config.map_pool.map_defs["uni_campus_big"] + + # Initialize spaces based on the environment configuration and map + self.action_space, self.observation_space, orig_obs_space = \ + init_spaces(env_config, map_def) + + # Assign the reward function and debug flag + self.reward_func, self.debug = reward_func, debug + + # Initialize simulator with a random start position + self.simulator = init_simulators( + env_config, + map_def, + random_start_pos=True + )[0] + + # Delta time per simulation step and maximum episode time + d_t = env_config.sim_config.time_per_step_in_secs + max_ep_time = env_config.sim_config.sim_time_in_secs + + # Initialize collision detectors and sensor data processors + occupancies, sensors = init_collision_and_sensors( + self.simulator, + env_config, + orig_obs_space + ) + + # Setup initial state of the robot + self.state = RobotState( + self.simulator.robot_navs[0], + occupancies[0], + sensors[0], + d_t, + max_ep_time) + + # Store last action executed by the robot + self.last_action = None + + # If in debug mode, create a simulation view to visualize the state + if debug: + self.sim_ui = SimulationView( + scaling=10, + obstacles=map_def.obstacles, + robot_radius=env_config.robot_config.radius, + ped_radius=env_config.sim_config.ped_radius, + goal_radius=env_config.sim_config.goal_radius) + + # Display the simulation UI + self.sim_ui.show() + + def step(self, action): + """ + Execute one time step within the environment. + + Parameters: + - action: Action to be executed. + + Returns: + - obs: Observation after taking the action. + - reward: Calculated reward for the taken action. + - term: Boolean indicating if the episode has terminated. + - info: Additional information as dictionary. 
+ """ + # Process the action through the simulator + action = self.simulator.robots[0].parse_action(action) + self.last_action = action + # Perform simulation step + self.simulator.step_once([action]) + # Get updated observation + obs = self.state.step() + # Fetch metadata about the current state + meta = self.state.meta_dict() + # Determine if the episode has reached terminal state + term = self.state.is_terminal + # Compute the reward using the provided reward function + reward = self.reward_func(meta) + return obs, reward, term, {"step": meta["step"], "meta": meta} + + def reset(self): + """ + Reset the environment state to start a new episode. + + Returns: + - obs: The initial observation after resetting the environment. + """ + # Reset internal simulator state + self.simulator.reset_state() + # Reset the environment's state and return the initial observation + obs = self.state.reset() + return obs + + def render(self): + """ + Render the environment visually if in debug mode. + + Raises RuntimeError if debug mode is not enabled. + """ + if not self.sim_ui: + raise RuntimeError( + 'Debug mode is not activated! Consider setting ' + 'debug=True!') + + # Prepare action visualization, if any action was executed + action = None if not self.last_action else VisualizableAction( + self.simulator.robot_poses[0], + self.last_action, + self.simulator.goal_pos[0]) + + # Robot position and LIDAR scanning visualization preparation + robot_pos = self.simulator.robot_poses[0][0] + distances, directions = lidar_ray_scan( + self.simulator.robot_poses[0], + self.state.occupancy, + self.env_config.lidar_config) + + # Construct ray vectors for visualization + ray_vecs = zip( + np.cos(directions) * distances, + np.sin(directions) * distances + ) + ray_vecs_np = np.array([[ + [robot_pos[0], robot_pos[1]], + [robot_pos[0] + x, robot_pos[1] + y] + ] for x, y in ray_vecs] + ) + + # Prepare pedestrian action visualization + ped_actions = zip( + self.simulator.pysf_sim.peds.pos(), + self.simulator.pysf_sim.peds.pos() + + self.simulator.pysf_sim.peds.vel() * 2) + ped_actions_np = np.array([[pos, vel] for pos, vel in ped_actions]) + + # Package the state for visualization + state = VisualizableSimState( + self.state.timestep, action, self.simulator.robot_poses[0], + deepcopy(self.simulator.ped_pos), ray_vecs_np, ped_actions_np) + + # Execute rendering of the state through the simulation UI + self.sim_ui.render(state) + + def seed(self, seed=None): + """ + Set the seed for this env's random number generator(s). + + Note: + Some environments use multiple pseudorandom number generators. + We want to capture all such seeds used in order to ensure that + there aren't accidental correlations between multiple generators. + + Returns: + list: Returns the list of seeds used in this env's random + number generators. The first value in the list should be the + "main" seed, or the value which a reproducer should pass to + 'seed'. Often, the main seed equals the provided 'seed', but + this won't be true if seed=None, for example. + + TODO: validate this method + """ + self.np_random, seed = seeding.np_random(seed) + return [seed] + + def exit(self): + """ + Clean up and exit the simulation UI, if it exists. 
+ """ + if self.sim_ui: + self.sim_ui.exit() diff --git a/robot_sf/sim_config.py b/robot_sf/gym_env/env_config.py similarity index 87% rename from robot_sf/sim_config.py rename to robot_sf/gym_env/env_config.py index 3267b51..77a3080 100644 --- a/robot_sf/sim_config.py +++ b/robot_sf/gym_env/env_config.py @@ -19,7 +19,9 @@ from robot_sf.nav.map_config import MapDefinitionPool from robot_sf.sensor.range_sensor import LidarScannerSettings -from robot_sf.robot.differential_drive import DifferentialDriveSettings, DifferentialDriveRobot +from robot_sf.robot.differential_drive import ( + DifferentialDriveSettings, + DifferentialDriveRobot) from robot_sf.robot.bicycle_drive import BicycleDriveSettings, BicycleDriveRobot from robot_sf.sim.sim_config import SimulationSettings @@ -37,7 +39,8 @@ class EnvSettings: def __post_init__(self): """ - Check if any of the properties are not initialized (None) and raise an error if so. + Check if any of the properties are not initialized (None) and raise an + error if so. """ if not self.sim_config or not self.lidar_config \ or not self.robot_config or not self.map_pool: @@ -45,12 +48,15 @@ def __post_init__(self): def robot_factory(self) -> Union[DifferentialDriveRobot, BicycleDriveRobot]: """ - Factory method to create a robot instance based on the type of robot configuration provided. + Factory method to create a robot instance based on the type of robot + configuration provided. :return: robot instance. """ + if isinstance(self.robot_config, DifferentialDriveSettings): return DifferentialDriveRobot(self.robot_config) elif isinstance(self.robot_config, BicycleDriveSettings): return BicycleDriveRobot(self.robot_config) else: - raise NotImplementedError(f"unsupported robot type {type(self.robot_config)}!") + raise NotImplementedError( + f"unsupported robot type {type(self.robot_config)}!") diff --git a/robot_sf/gym_env/env_util.py b/robot_sf/gym_env/env_util.py new file mode 100644 index 0000000..76a6df7 --- /dev/null +++ b/robot_sf/gym_env/env_util.py @@ -0,0 +1,112 @@ +""" +env_util +""" +from typing import List + +from gymnasium import spaces + +from robot_sf.gym_env.env_config import EnvSettings +from robot_sf.nav.map_config import MapDefinition +from robot_sf.nav.occupancy import ContinuousOccupancy +from robot_sf.sensor.range_sensor import lidar_ray_scan, lidar_sensor_space +from robot_sf.sensor.goal_sensor import target_sensor_obs, target_sensor_space +from robot_sf.sensor.sensor_fusion import fused_sensor_space, SensorFusion +from robot_sf.sim.simulator import Simulator + +def init_collision_and_sensors( + sim: Simulator, + env_config: EnvSettings, + orig_obs_space: spaces.Dict + ): + """ + Initialize collision detection and sensor fusion for the robots in the + simulator. + + Parameters: + sim (Simulator): The simulator object. + env_config (EnvSettings): Configuration settings for the environment. + orig_obs_space (spaces.Dict): Original observation space. + + Returns: + Tuple[List[ContinuousOccupancy], List[SensorFusion]]: + A tuple containing a list of occupancy objects for collision detection + and a list of sensor fusion objects for sensor data handling. 
+ """ + + # Get the number of robots, simulation configuration, + # robot configuration, and lidar configuration + num_robots = len(sim.robots) + sim_config = env_config.sim_config + robot_config = env_config.robot_config + lidar_config = env_config.lidar_config + + # Initialize occupancy objects for each robot for collision detection + occupancies = [ContinuousOccupancy( + sim.map_def.width, sim.map_def.height, + lambda: sim.robot_pos[i], lambda: sim.goal_pos[i], + lambda: sim.pysf_sim.env.obstacles_raw[:, :4], lambda: sim.ped_pos, + robot_config.radius, sim_config.ped_radius, sim_config.goal_radius) + for i in range(num_robots)] + + # Initialize sensor fusion objects for each robot for sensor data handling + sensor_fusions: List[SensorFusion] = [] + for r_id in range(num_robots): + # Define the ray sensor, target sensor, and speed sensor for each robot + ray_sensor = lambda r_id=r_id: lidar_ray_scan( + sim.robots[r_id].pose, occupancies[r_id], lidar_config)[0] + target_sensor = lambda r_id=r_id: target_sensor_obs( + sim.robots[r_id].pose, sim.goal_pos[r_id], sim.next_goal_pos[r_id]) + speed_sensor = lambda r_id=r_id: sim.robots[r_id].current_speed + + # Create the sensor fusion object and add it to the list + sensor_fusions.append(SensorFusion( + ray_sensor, speed_sensor, target_sensor, + orig_obs_space, sim_config.use_next_goal)) + + return occupancies, sensor_fusions + + +def init_spaces(env_config: EnvSettings, map_def: MapDefinition): + """ + Initialize the action and observation spaces for the environment. + + This function creates action and observation space using the factory method + provided in the environment + configuration, and then uses the robot's action space and observation space + as the basis for the environment's action and observation spaces. + The observation space is + further extended with additional sensors. + + Parameters + ---------- + env_config : EnvSettings + The configuration settings for the environment. + map_def : MapDefinition + The definition of the map for the environment. + + Returns + ------- + Tuple[Space, Space, Space] + A tuple containing the action space, the extended observation space, and + the original observation space of the robot. + """ + + # Create a robot using the factory method in the environment configuration + robot = env_config.robot_factory() + + # Get the action space from the robot + action_space = robot.action_space + + # Extend the robot's observation space with additional sensors + observation_space, orig_obs_space = fused_sensor_space( + env_config.sim_config.stack_steps, + robot.observation_space, + target_sensor_space(map_def.max_target_dist), + lidar_sensor_space( + env_config.lidar_config.num_rays, + env_config.lidar_config.max_scan_dist) + ) + + # Return the action space, the extended observation space, and the original + # observation space + return action_space, observation_space, orig_obs_space diff --git a/robot_sf/gym_env/multi_robot_env.py b/robot_sf/gym_env/multi_robot_env.py new file mode 100644 index 0000000..7345ca9 --- /dev/null +++ b/robot_sf/gym_env/multi_robot_env.py @@ -0,0 +1,123 @@ +""" +`MultiRobotEnv`: A class that extends `RobotEnv` to handle multiple robots in the environment. +It overrides the `step_async` method to apply actions to all robots in the environment. 
+""" + +from typing import Callable, List + +from multiprocessing.pool import ThreadPool + +import numpy as np + +from gymnasium.vector import VectorEnv +from gymnasium import spaces + +from robot_sf.robot.robot_state import RobotState +from robot_sf.gym_env.env_config import EnvSettings +from robot_sf.sensor.sensor_fusion import OBS_RAYS, OBS_DRIVE_STATE +from robot_sf.sim.simulator import init_simulators +from robot_sf.gym_env.reward import simple_reward + +from robot_sf.gym_env.env_util import init_collision_and_sensors, init_spaces + + +class MultiRobotEnv(VectorEnv): + """Representing a Gymnasium environment for training + multiple self-driving robots with reinforcement learning""" + + def __init__( + self, env_config: EnvSettings = EnvSettings(), + reward_func: Callable[[dict], float] = simple_reward, + debug: bool = False, num_robots: int = 1): + + map_def = env_config.map_pool.map_defs["uni_campus_big"] # info: only use first map + action_space, observation_space, orig_obs_space = init_spaces( + env_config, + map_def + ) + super(MultiRobotEnv, self).__init__( + num_robots, + observation_space, + action_space + ) + self.action_space = spaces.Box( + low=np.array( + [self.single_action_space.low for _ in range(num_robots)] + ), + high=np.array( + [self.single_action_space.high for _ in range(num_robots)] + ), + dtype=self.single_action_space.low.dtype) + + self.reward_func, self.debug = reward_func, debug + self.simulators = init_simulators( + env_config, + map_def, + num_robots, + random_start_pos=False + ) + self.states: List[RobotState] = [] + d_t = env_config.sim_config.time_per_step_in_secs + max_ep_time = env_config.sim_config.sim_time_in_secs + + for sim in self.simulators: + occupancies, sensors = init_collision_and_sensors( + sim, env_config, + orig_obs_space) + states = [ + RobotState(nav, occ, sen, d_t, max_ep_time) + for nav, occ, sen in zip(sim.robot_navs, occupancies, sensors) + ] + self.states.extend(states) + + self.sim_worker_pool = ThreadPool(len(self.simulators)) + self.obs_worker_pool = ThreadPool(num_robots) + + def step(self, actions): + actions = [self.simulators[0].robots[0].parse_action(a) for a in actions] + i = 0 + actions_per_simulator = [] + for sim in self.simulators: + num_robots = len(sim.robots) + actions_per_simulator.append(actions[i:i+num_robots]) + i += num_robots + + self.sim_worker_pool.map( + lambda s_a: s_a[0].step_once(s_a[1]), + zip(self.simulators, actions_per_simulator)) + + obs = self.obs_worker_pool.map(lambda s: s.step(), self.states) + + metas = [state.meta_dict() for state in self.states] + masked_metas = [{ "step": meta["step"], "meta": meta } for meta in metas] + masked_metas = (*masked_metas,) + terms = [state.is_terminal for state in self.states] + rewards = [self.reward_func(meta) for meta in metas] + + for i, (sim, state, term) in enumerate( + zip(self.simulators, self.states, terms) + ): + if term: + sim.reset_state() + obs[i] = state.reset() + + obs = { OBS_DRIVE_STATE: np.array([o[OBS_DRIVE_STATE] for o in obs]), + OBS_RAYS: np.array([o[OBS_RAYS] for o in obs])} + + return obs, rewards, terms, masked_metas + + def reset(self): + self.sim_worker_pool.map(lambda sim: sim.reset_state(), self.simulators) + obs = self.obs_worker_pool.map(lambda s: s.reset(), self.states) + + obs = { OBS_DRIVE_STATE: np.array([o[OBS_DRIVE_STATE] for o in obs]), + OBS_RAYS: np.array([o[OBS_RAYS] for o in obs]) } + return obs + + def render(self, robot_id: int=0): + # TODO: add support for PyGame rendering + pass + + def close_extras(self, **kwargs): 
+        self.sim_worker_pool.close()
+        self.obs_worker_pool.close()
\ No newline at end of file
diff --git a/robot_sf/gym_env/reward.py b/robot_sf/gym_env/reward.py
new file mode 100644
index 0000000..dd0c4dc
--- /dev/null
+++ b/robot_sf/gym_env/reward.py
@@ -0,0 +1,36 @@
+def simple_reward(
+        meta: dict,
+        max_episode_step_discount: float=-0.1,
+        ped_coll_penalty: float=-5,
+        obst_coll_penalty: float=-2,
+        reach_waypoint_reward: float=1) -> float:
+    """
+    Calculate the reward for the robot's current state.
+
+    Parameters:
+    meta (dict): Metadata containing information about the robot's current state.
+    max_episode_step_discount (float): Total step penalty, spread evenly across
+        the episode's maximum number of simulation steps.
+    ped_coll_penalty (float): Penalty for colliding with a pedestrian.
+    obst_coll_penalty (float): Penalty for colliding with an obstacle.
+    reach_waypoint_reward (float): Reward for reaching a waypoint.
+
+    Returns:
+    float: The calculated reward.
+    """
+
+    # Initialize reward with a discount based on the maximum simulation steps
+    reward = max_episode_step_discount / meta["max_sim_steps"]
+
+    # If there's a collision with a pedestrian or another robot, apply penalty
+    if meta["is_pedestrian_collision"] or meta["is_robot_collision"]:
+        reward += ped_coll_penalty
+
+    # If there's a collision with an obstacle, apply penalty
+    if meta["is_obstacle_collision"]:
+        reward += obst_coll_penalty
+
+    # If the robot has reached its goal, apply reward
+    if meta["is_robot_at_goal"]:
+        reward += reach_waypoint_reward
+
+    return reward
diff --git a/robot_sf/gym_env/robot_env.py b/robot_sf/gym_env/robot_env.py
new file mode 100644
index 0000000..54cb561
--- /dev/null
+++ b/robot_sf/gym_env/robot_env.py
@@ -0,0 +1,233 @@
+"""
+`robot_env.py` is a module that defines the simulation environment for a robot or multiple robots.
+It includes classes and protocols for defining the robot's state, actions, and
+observations within the environment.
+
+`RobotEnv`: A class that represents the robot's environment. It inherits from `Env`
+from the `gymnasium` library, the standard base class for single-agent environments.
+It includes methods for stepping through the environment, resetting it, rendering it,
+and closing it.
+It also defines the action and observation spaces for the robot.
+"""
+
+from typing import Tuple, Callable
+from copy import deepcopy
+
+import numpy as np
+
+from gymnasium import Env
+from gymnasium.utils import seeding
+
+from robot_sf.robot.robot_state import RobotState
+from robot_sf.gym_env.env_config import EnvSettings
+from robot_sf.sensor.range_sensor import lidar_ray_scan
+
+from robot_sf.sim.sim_view import (
+    SimulationView,
+    VisualizableAction,
+    VisualizableSimState)
+from robot_sf.sim.simulator import init_simulators
+from robot_sf.gym_env.reward import simple_reward
+from robot_sf.gym_env.env_util import init_collision_and_sensors, init_spaces
+
+Vec2D = Tuple[float, float]
+PolarVec2D = Tuple[float, float]
+RobotPose = Tuple[Vec2D, float]
+
+
+class RobotEnv(Env):
+    """
+    Representing a Gymnasium environment for training a self-driving robot
+    with reinforcement learning.
+    """
+
+    def __init__(
+            self,
+            env_config: EnvSettings = EnvSettings(),
+            reward_func: Callable[[dict], float] = simple_reward,
+            debug: bool = False
+            ):
+        """
+        Initialize the Robot Environment.
+
+        Parameters:
+        - env_config (EnvSettings): Configuration for environment settings.
+ - reward_func (Callable[[dict], float]): Reward function that takes + a dictionary as input and returns a float as reward. + - debug (bool): If True, enables debugging information such as + visualizations. + """ + + # Environment configuration details + self.env_config = env_config + + # Extract first map definition; currently only supports using the first map + map_def = env_config.map_pool.map_defs["uni_campus_big"] + + # Initialize spaces based on the environment configuration and map + self.action_space, self.observation_space, orig_obs_space = \ + init_spaces(env_config, map_def) + + # Assign the reward function and debug flag + self.reward_func = reward_func + self.debug = debug + + # Initialize simulator with a random start position + self.simulator = init_simulators( + env_config, + map_def, + random_start_pos=True + )[0] + + # Delta time per simulation step and maximum episode time + d_t = env_config.sim_config.time_per_step_in_secs + max_ep_time = env_config.sim_config.sim_time_in_secs + + # Initialize collision detectors and sensor data processors + occupancies, sensors = init_collision_and_sensors( + self.simulator, + env_config, + orig_obs_space + ) + + # Setup initial state of the robot + self.state = RobotState( + self.simulator.robot_navs[0], + occupancies[0], + sensors[0], + d_t, + max_ep_time) + + # Store last action executed by the robot + self.last_action = None + + # If in debug mode, create a simulation view to visualize the state + if debug: + self.sim_ui = SimulationView( + scaling=10, + obstacles=map_def.obstacles, + robot_radius=env_config.robot_config.radius, + ped_radius=env_config.sim_config.ped_radius, + goal_radius=env_config.sim_config.goal_radius) + + # Display the simulation UI + self.sim_ui.show() + + def step(self, action): + """ + Execute one time step within the environment. + + Parameters: + - action: Action to be executed. + + Returns: + - obs: Observation after taking the action. + - reward: Calculated reward for the taken action. + - term: Boolean indicating if the episode has terminated. + - info: Additional information as dictionary. + """ + # Process the action through the simulator + action = self.simulator.robots[0].parse_action(action) + self.last_action = action + # Perform simulation step + self.simulator.step_once([action]) + # Get updated observation + obs = self.state.step() + # Fetch metadata about the current state + meta = self.state.meta_dict() + # Determine if the episode has reached terminal state + term = self.state.is_terminal + # Compute the reward using the provided reward function + reward = self.reward_func(meta) + return obs, reward, term, {"step": meta["step"], "meta": meta} + + def reset(self): + """ + Reset the environment state to start a new episode. + + Returns: + - obs: The initial observation after resetting the environment. + """ + # Reset internal simulator state + self.simulator.reset_state() + # Reset the environment's state and return the initial observation + obs = self.state.reset() + return obs + + def render(self): + """ + Render the environment visually if in debug mode. + + Raises RuntimeError if debug mode is not enabled. + """ + if not self.sim_ui: + raise RuntimeError( + 'Debug mode is not activated! 
Consider setting '
+                'debug=True!')
+
+        # Prepare action visualization, if any action was executed
+        action = None if not self.last_action else VisualizableAction(
+            self.simulator.robot_poses[0],
+            self.last_action,
+            self.simulator.goal_pos[0])
+
+        # Robot position and LIDAR scanning visualization preparation
+        robot_pos = self.simulator.robot_poses[0][0]
+        distances, directions = lidar_ray_scan(
+            self.simulator.robot_poses[0],
+            self.state.occupancy,
+            self.env_config.lidar_config)
+
+        # Construct ray vectors for visualization
+        ray_vecs = zip(
+            np.cos(directions) * distances,
+            np.sin(directions) * distances
+        )
+        ray_vecs_np = np.array([[
+                [robot_pos[0], robot_pos[1]],
+                [robot_pos[0] + x, robot_pos[1] + y]
+            ] for x, y in ray_vecs]
+        )
+
+        # Prepare pedestrian action visualization
+        ped_actions = zip(
+            self.simulator.pysf_sim.peds.pos(),
+            self.simulator.pysf_sim.peds.pos() +
+            self.simulator.pysf_sim.peds.vel() * 2)
+        ped_actions_np = np.array([[pos, vel] for pos, vel in ped_actions])
+
+        # Package the state for visualization
+        state = VisualizableSimState(
+            self.state.timestep, action, self.simulator.robot_poses[0],
+            deepcopy(self.simulator.ped_pos), ray_vecs_np, ped_actions_np)
+
+        # Execute rendering of the state through the simulation UI
+        self.sim_ui.render(state)
+
+    def seed(self, seed=None):
+        """
+        Set the seed for this env's random number generator(s).
+
+        Note:
+            Some environments use multiple pseudorandom number generators.
+            We want to capture all such seeds used in order to ensure that
+            there aren't accidental correlations between multiple generators.
+
+        Returns:
+            list: Returns the list of seeds used in this env's random
+            number generators. The first value in the list should be the
+            "main" seed, or the value which a reproducer should pass to
+            'seed'. Often, the main seed equals the provided 'seed', but
+            this won't be true if seed=None, for example.
+
+        TODO: validate this method
+        """
+        self.np_random, seed = seeding.np_random(seed)
+        return [seed]
+
+    def exit(self):
+        """
+        Clean up and exit the simulation UI, if it exists.
+        """
+        if self.sim_ui:
+            self.sim_ui.exit()
diff --git a/robot_sf/gym_env/simple_robot_env.py b/robot_sf/gym_env/simple_robot_env.py
new file mode 100644
index 0000000..bad8e35
--- /dev/null
+++ b/robot_sf/gym_env/simple_robot_env.py
@@ -0,0 +1,82 @@
+"""
+Module to write a simple robot environment that is compatible with gymnasium.
+The focus of this module is to use more "off-the-shelf" components from sb3 and
+to be natively compatible with gymnasium.
+"""
+
+import gymnasium
+
+from robot_sf.gym_env.env_config import EnvSettings
+
+
+class SimpleRobotEnv(gymnasium.Env):
+    """
+    A simple robot environment based on gymnasium.Env.
+    """
+
+    def __init__(
+            self,
+            env_config: EnvSettings = EnvSettings()
+            ):
+
+        self.info = {}
+
+        # Action space
+        # TODO: Action space depends on the robot type? Differential drive or bicycle model?
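+        # An illustrative continuous alternative (assumption, not wired up here):
+        #   gymnasium.spaces.Box(low=-1.0, high=1.0, shape=(2,), dtype="float32")
+        # would encode (linear, angular) commands for a differential-drive robot.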
+        self.action_space = self._define_action_space()
+
+        # Observation Space
+        self.observation_space = self._define_observation_space()
+
+    def _define_action_space(self):
+        """
+        Action Space:
+        - 2 actions: move forward, turn
+        """
+        return gymnasium.spaces.Discrete(2)
+
+    def _define_observation_space(self):
+        """
+        Observation Space:
+        - x, y, theta, v_x, v_y, omega
+        - goal_x, goal_y
+        - distance and angle to goal
+        - lidar observations
+        """
+        return gymnasium.spaces.Discrete(5)
+
+    def _get_observation(self):
+        """
+        Get the current observation
+        """
+        return self.observation
+
+    def _get_info(self):
+        """
+        Get the current info
+        """
+        return self.info
+
+    def step(self, action):
+        pass
+
+    def reset(self, seed=None):
+        # seed for self.np_random
+        super().reset(seed=seed)
+
+        # Stub: sample a random placeholder observation until real sensors are wired up
+        self.observation = self.observation_space.sample()
+
+        observation = self._get_observation()
+        info = self._get_info()
+        return observation, info
+
+    def render(self, mode='human'):
+        pass
+
+    def close(self):
+        pass
diff --git a/robot_sf/nav/map_config.py b/robot_sf/nav/map_config.py
index baa6f54..5e60d8b 100644
--- a/robot_sf/nav/map_config.py
+++ b/robot_sf/nav/map_config.py
@@ -220,8 +220,10 @@ class MapDefinition:
     def __post_init__(self):
         """
-        Validates the map definition and initializes the obstacles_pysf and robot_routes_by_spawn_id attributes.
-        Raises a ValueError if the width or height is less than 0, if the robot spawn zones or goal zones are empty,
+        Validates the map definition and initializes the obstacles_pysf and
+        robot_routes_by_spawn_id attributes.
+        Raises a ValueError if the width or height is less than 0,
+        if the robot spawn zones or goal zones are empty,
         or if the bounds are not exactly 4.
         """
         obstacle_lines = [line for o in self.obstacles for line in o.lines]
@@ -284,8 +286,8 @@ class MapDefinitionPool:
     ----------
     maps_folder : str
         The directory where the map files are located.
-    map_defs : List[MapDefinition]
-        The list of map definitions.
+    map_defs : Dict[str, MapDefinition]
+        The dictionary of map definitions, with the map name as the key.
 
     Methods
     -------
@@ -296,7 +298,7 @@ class MapDefinitionPool:
     """
 
     maps_folder: str = os.path.join(os.path.dirname(os.path.dirname(__file__)), "maps")
-    map_defs: List[MapDefinition] = field(default_factory=list)
+    map_defs: Dict[str, MapDefinition] = field(default_factory=dict)
 
     def __post_init__(self):
         """
@@ -322,7 +324,9 @@ def load_json(path: str) -> dict:
         map_files = [os.path.join(self.maps_folder, f) for f in os.listdir(self.maps_folder)]
 
         # Load the map definitions from the files
-        self.map_defs = [serialize_map(load_json(f)) for f in map_files]
+        self.map_defs = {
+            os.path.splitext(os.path.basename(f))[0]: serialize_map(load_json(f))
+            for f in map_files}
 
         # If map_defs is still empty, raise an error
         if not self.map_defs:
@@ -338,7 +342,7 @@ def choose_random_map(self) -> MapDefinition:
             A random map definition.
         """
 
-        return random.choice(self.map_defs)
+        return random.choice(list(self.map_defs.values()))
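With `map_defs` now a dictionary keyed by file name (without the `.json` extension), maps can be selected by name. A short sketch; the map name is the one used by the environments above, and no other map files are assumed:

```python
from robot_sf.nav.map_config import MapDefinitionPool

pool = MapDefinitionPool()                  # loads every JSON map in the maps folder
campus = pool.map_defs["uni_campus_big"]    # look up a specific map by name
random_map = pool.choose_random_map()       # or draw one at random
```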
     c_2 : Circle2D
-        The second circle, represented as a tuple of the center coordinates and the radius.
+        The second circle, represented as a tuple of the center coordinates and
+        the radius.
 
     Returns
     -------
diff --git a/robot_sf/packages.svg b/robot_sf/packages.svg
index ed8a54a..de66af5 100644
--- a/robot_sf/packages.svg
+++ b/robot_sf/packages.svg
[packages.svg: regenerated Graphviz package-dependency diagram; raw SVG markup
omitted. The update removes the robot_sf.robot_env and robot_sf.sim_config
nodes along with their dependency edges, and adds robot_sf.robot.robot_state
(with edges to robot_sf.nav.navigation, robot_sf.nav.occupancy and
robot_sf.sensor.sensor_fusion) as well as a robot_sf.simple_robot_env node.]
diff --git a/robot_sf/robot/robot_state.py b/robot_sf/robot/robot_state.py
new file mode 100644
index 0000000..fef11af
--- /dev/null
+++ b/robot_sf/robot/robot_state.py
@@ -0,0 +1,124 @@
+"""
+`RobotState`: A data class that represents the state of a robot in the simulation environment.
+It includes information about navigation, occupancy (for collision detection),
+sensor fusion, and simulation time. It also tracks conditions such as collision states,
+the timeout condition, elapsed simulation time, and the timestep count.
+"""
+
+from math import ceil
+from dataclasses import dataclass, field
+
+from robot_sf.nav.navigation import RouteNavigator
+from robot_sf.sensor.sensor_fusion import SensorFusion
+from robot_sf.nav.occupancy import ContinuousOccupancy
+
+
+@dataclass
+class RobotState:
+    """
+    Represents the state of a robot in a simulated environment, including
+    information about navigation, occupancy, sensor fusion, and simulation time.
+
+    Attributes:
+        nav (RouteNavigator): Object managing navigation and route planning.
+        occupancy (ContinuousOccupancy): Object tracking spatial occupancy,
+            used for collision detection.
+        sensors (SensorFusion): Object managing sensor data and fusion.
+        d_t (float): The simulation timestep duration.
+        sim_time_limit (float): The maximum allowed simulation time.
+
+    Additional attributes, initialized to False or 0 by default, represent
+    conditions such as collision states, the timeout condition, elapsed
+    simulation time, and the timestep count. These are updated during
+    the simulation.
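+
+    Example (illustrative sketch only; `nav`, `occupancy` and `sensors` are
+    assumed to be pre-built helpers from the simulation setup, and the numeric
+    values are placeholders):
+
+        state = RobotState(nav, occupancy, sensors, d_t=0.1, sim_time_limit=200.0)
+        obs = state.reset()    # start a new episode, returns the first observation
+        obs = state.step()     # advance one timestep, returns the next observation
+        done = state.is_terminal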
+ """ + nav: RouteNavigator + occupancy: ContinuousOccupancy + sensors: SensorFusion + d_t: float + sim_time_limit: float + episode: int = field(init=False, default=0) + is_at_goal: bool = field(init=False, default=False) + is_collision_with_ped: bool = field(init=False, default=False) + is_collision_with_obst: bool = field(init=False, default=False) + is_collision_with_robot: bool = field(init=False, default=False) + is_timeout: bool = field(init=False, default=False) + sim_time_elapsed: float = field(init=False, default=0.0) + timestep: int = field(init=False, default=0) + + @property + def max_sim_steps(self) -> int: + """Calculates the maximum number of simulation steps based on time limit.""" + return int(ceil(self.sim_time_limit / self.d_t)) + + @property + def is_terminal(self) -> bool: + """ + Checks if the current state is terminal, i.e., if the robot has reached its goal, + timed out, or collided with any object or other robots. + """ + return (self.is_at_goal or self.is_timeout or self.is_collision_with_robot or + self.is_collision_with_ped or self.is_collision_with_obst) + + @property + def is_waypoint_complete(self) -> bool: + """Indicates whether the current waypoint has been reached.""" + return self.nav.reached_waypoint + + @property + def is_route_complete(self) -> bool: + """Indicates whether the entire planned route has been completed.""" + return self.nav.reached_destination + + def reset(self): + """ + Resets the robot's state for a new simulation episode, incrementing the episode counter, + resetting the timestep and elapsed time, clearing collision and goal flags, and refreshing + sensor data for the initial observation. + """ + self.episode += 1 + self.timestep = 0 + self.sim_time_elapsed = 0.0 + self.is_collision_with_ped = False + self.is_collision_with_obst = False + self.is_collision_with_robot = False + self.is_at_goal = False + self.is_timeout = False + self.sensors.reset_cache() + return self.sensors.next_obs() + + def step(self): + """ + Advances the robot's state by one simulation timestep, updating the elapsed time, + checking for collisions, goal achievement, and timeout. Returns the next observation + from sensors. + """ + # TODO: add check for robot-robot collisions as well + self.timestep += 1 + self.sim_time_elapsed += self.d_t + self.is_collision_with_ped = self.occupancy.is_pedestrian_collision + self.is_collision_with_obst = self.occupancy.is_obstacle_collision + self.is_collision_with_robot = self.occupancy.is_robot_robot_collision + self.is_at_goal = self.occupancy.is_robot_at_goal + self.is_timeout = self.sim_time_elapsed > self.sim_time_limit + return self.sensors.next_obs() + + def meta_dict(self) -> dict: + """ + Compiles a dictionary of metadata about the robot's state for logging or + monitoring purposes. + Includes information such as episode number, current timestep, collision status, + goal achievement, and completion status of the route. 
+ """ + return { + "step": self.episode * self.max_sim_steps, + "episode": self.episode, + "step_of_episode": self.timestep, + "is_pedestrian_collision": self.is_collision_with_ped, + "is_robot_collision": self.is_collision_with_robot, + "is_obstacle_collision": self.is_collision_with_obst, + "is_robot_at_goal": self.is_waypoint_complete, + "is_route_complete": self.is_route_complete, + "is_timesteps_exceeded": self.is_timeout, + "max_sim_steps": self.max_sim_steps + } diff --git a/robot_sf/robot_env.py b/robot_sf/robot_env.py deleted file mode 100644 index 923af19..0000000 --- a/robot_sf/robot_env.py +++ /dev/null @@ -1,641 +0,0 @@ -""" -`robot_env.py` is a module that defines the simulation environment for a robot or multiple robots. -It includes classes and protocols for defining the robot's state, actions, and -observations within the environment. - -Key components of this module include: - -1. `Robot`: A protocol that outlines the necessary properties and methods a robot should have. -These include observation space, action space, and methods to apply actions, -reset state, and parse actions. - -2. `RobotState`: A data class that represents the state of a robot in the simulation environment. -It includes information about navigation, occupancy (for collision detection), -sensor fusion, and simulation time. It also tracks various conditions such as collision states, -timeout condition, simulation time elapsed, and timestep count. - -3. `RobotEnv`: A class that represents the robot's environment. It inherits from `VectorEnv` -from the `gymnasium` library, which is a base class for environments that operate over -vectorized actions and observations. It includes methods for stepping through the environment, -resetting it, rendering it, and closing it. -It also defines the action and observation spaces for the robot. - -4. `MultiRobotEnv`: A class that extends `RobotEnv` to handle multiple robots in the environment. -It overrides the `step_async` method to apply actions to all robots in the environment. 
-""" - -from math import ceil -from typing import Tuple, Callable, List, Protocol, Any -from dataclasses import dataclass, field -from copy import deepcopy -from multiprocessing.pool import ThreadPool - -import numpy as np -from gymnasium.vector import VectorEnv -from gymnasium import Env, spaces -from gymnasium.utils import seeding -from robot_sf.nav.map_config import MapDefinition -from robot_sf.nav.navigation import RouteNavigator - -from robot_sf.sim_config import EnvSettings -from robot_sf.nav.occupancy import ContinuousOccupancy -from robot_sf.sensor.range_sensor import lidar_ray_scan, lidar_sensor_space -from robot_sf.sensor.goal_sensor import target_sensor_obs, target_sensor_space -from robot_sf.sensor.sensor_fusion import ( - fused_sensor_space, SensorFusion, OBS_RAYS, OBS_DRIVE_STATE) -from robot_sf.sim.sim_view import SimulationView, VisualizableAction, VisualizableSimState -from robot_sf.sim.simulator import Simulator - - -Vec2D = Tuple[float, float] -PolarVec2D = Tuple[float, float] -RobotPose = Tuple[Vec2D, float] - - -class Robot(Protocol): - @property - def observation_space(self) -> spaces.Box: - raise NotImplementedError() - - @property - def action_space(self) -> spaces.Box: - raise NotImplementedError() - - @property - def pos(self) -> Vec2D: - raise NotImplementedError() - - @property - def pose(self) -> RobotPose: - raise NotImplementedError() - - @property - def current_speed(self) -> PolarVec2D: - raise NotImplementedError() - - def apply_action(self, action: Any, d_t: float): - raise NotImplementedError() - - def reset_state(self, new_pose: RobotPose): - raise NotImplementedError() - - def parse_action(self, action: Any) -> Any: - raise NotImplementedError() - - -@dataclass -class RobotState: - """ - Represents the state of a robot in a simulated environment, - including information about navigation, - occupancy, sensor fusion, and simulation time. - - Attributes: - nav (RouteNavigator): Object managing navigation and route planning. - occupancy (ContinuousOccupancy): Object tracking spatial occupation, - for collision detection. - sensors (SensorFusion): Object for managing sensor data and fusion. - d_t (float): The simulation timestep duration. - sim_time_limit (float): The maximum allowed simulation time. - - Additional attributes with default values initialized to False or 0 representing various - conditions such as collision states, timeout condition, simulation time elapsed, - and timestep count. - These are updated during the simulation. - """ - nav: RouteNavigator - occupancy: ContinuousOccupancy - sensors: SensorFusion - d_t: float - sim_time_limit: float - episode: int = field(init=False, default=0) - is_at_goal: bool = field(init=False, default=False) - is_collision_with_ped: bool = field(init=False, default=False) - is_collision_with_obst: bool = field(init=False, default=False) - is_collision_with_robot: bool = field(init=False, default=False) - is_timeout: bool = field(init=False, default=False) - sim_time_elapsed: float = field(init=False, default=0.0) - timestep: int = field(init=False, default=0) - - @property - def max_sim_steps(self) -> int: - """Calculates the maximum number of simulation steps based on time limit.""" - return int(ceil(self.sim_time_limit / self.d_t)) - - @property - def is_terminal(self) -> bool: - """ - Checks if the current state is terminal, i.e., if the robot has reached its goal, - timed out, or collided with any object or other robots. 
- """ - return (self.is_at_goal or self.is_timeout or self.is_collision_with_robot or - self.is_collision_with_ped or self.is_collision_with_obst) - - @property - def is_waypoint_complete(self) -> bool: - """Indicates whether the current waypoint has been reached.""" - return self.nav.reached_waypoint - - @property - def is_route_complete(self) -> bool: - """Indicates whether the entire planned route has been completed.""" - return self.nav.reached_destination - - def reset(self): - """ - Resets the robot's state for a new simulation episode, incrementing the episode counter, - resetting the timestep and elapsed time, clearing collision and goal flags, and refreshing - sensor data for the initial observation. - """ - self.episode += 1 - self.timestep = 0 - self.sim_time_elapsed = 0.0 - self.is_collision_with_ped = False - self.is_collision_with_obst = False - self.is_collision_with_robot = False - self.is_at_goal = False - self.is_timeout = False - self.sensors.reset_cache() - return self.sensors.next_obs() - - def step(self): - """ - Advances the robot's state by one simulation timestep, updating the elapsed time, - checking for collisions, goal achievement, and timeout. Returns the next observation - from sensors. - """ - # TODO: add check for robot-robot collisions as well - self.timestep += 1 - self.sim_time_elapsed += self.d_t - self.is_collision_with_ped = self.occupancy.is_pedestrian_collision - self.is_collision_with_obst = self.occupancy.is_obstacle_collision - self.is_collision_with_robot = self.occupancy.is_robot_robot_collision - self.is_at_goal = self.occupancy.is_robot_at_goal - self.is_timeout = self.sim_time_elapsed > self.sim_time_limit - return self.sensors.next_obs() - - def meta_dict(self) -> dict: - """ - Compiles a dictionary of metadata about the robot's state for logging or - monitoring purposes. - Includes information such as episode number, current timestep, collision status, - goal achievement, and completion status of the route. - """ - return { - "step": self.episode * self.max_sim_steps, - "episode": self.episode, - "step_of_episode": self.timestep, - "is_pedestrian_collision": self.is_collision_with_ped, - "is_robot_collision": self.is_collision_with_robot, - "is_obstacle_collision": self.is_collision_with_obst, - "is_robot_at_goal": self.is_waypoint_complete, - "is_route_complete": self.is_route_complete, - "is_timesteps_exceeded": self.is_timeout, - "max_sim_steps": self.max_sim_steps - } - - -def simple_reward( - meta: dict, - max_episode_step_discount: float=-0.1, - ped_coll_penalty: float=-5, - obst_coll_penalty: float=-2, - reach_waypoint_reward: float=1) -> float: - """ - Calculate the reward for the robot's current state. - - Parameters: - meta (dict): Metadata containing information about the robot's current state. - max_episode_step_discount (float): Discount factor for each step in the episode. - ped_coll_penalty (float): Penalty for colliding with a pedestrian. - obst_coll_penalty (float): Penalty for colliding with an obstacle. - reach_waypoint_reward (float): Reward for reaching a waypoint. - - Returns: - float: The calculated reward. 
- """ - - # Initialize reward with a discount based on the maximum simulation steps - reward = max_episode_step_discount / meta["max_sim_steps"] - - # If there's a collision with a pedestrian or another robot, apply penalty - if meta["is_pedestrian_collision"] or meta["is_robot_collision"]: - reward += ped_coll_penalty - - # If there's a collision with an obstacle, apply penalty - if meta["is_obstacle_collision"]: - reward += obst_coll_penalty - - # If the robot has reached its goal, apply reward - if meta["is_robot_at_goal"]: - reward += reach_waypoint_reward - - return reward - - -def init_simulators( - env_config: EnvSettings, - map_def: MapDefinition, - num_robots: int = 1, - random_start_pos: bool = True - ) -> List[Simulator]: - """ - Initialize simulators for the robot environment. - - Parameters: - env_config (EnvSettings): Configuration settings for the environment. - map_def (MapDefinition): Definition of the map for the environment. - num_robots (int): Number of robots in the environment. - random_start_pos (bool): Whether to start the robots at random positions. - - Returns: - List[Simulator]: A list of initialized Simulator objects. - """ - - # Calculate the number of simulators needed based on the number of robots and start positions - num_sims = ceil(num_robots / map_def.num_start_pos) - - # Calculate the proximity to the goal based on the robot radius and goal radius - goal_proximity = env_config.robot_config.radius + env_config.sim_config.goal_radius - - # Initialize an empty list to hold the simulators - sims: List[Simulator] = [] - - # Create the required number of simulators - for i in range(num_sims): - # Determine the number of robots for this simulator - n = map_def.num_start_pos if i < num_sims - 1 \ - else max(1, num_robots % map_def.num_start_pos) - - # Create the robots for this simulator - sim_robots = [env_config.robot_factory() for _ in range(n)] - - # Create the simulator with the robots and add it to the list - sim = Simulator( - env_config.sim_config, map_def, sim_robots, - goal_proximity, random_start_pos) - sims.append(sim) - - return sims - - -def init_collision_and_sensors( - sim: Simulator, env_config: EnvSettings, orig_obs_space: spaces.Dict): - """ - Initialize collision detection and sensor fusion for the robots in the simulator. - - Parameters: - sim (Simulator): The simulator object. - env_config (EnvSettings): Configuration settings for the environment. - orig_obs_space (spaces.Dict): Original observation space. - - Returns: - Tuple[List[ContinuousOccupancy], List[SensorFusion]]: - A tuple containing a list of occupancy objects for collision detection - and a list of sensor fusion objects for sensor data handling. 
- """ - - # Get the number of robots, simulation configuration, - # robot configuration, and lidar configuration - num_robots = len(sim.robots) - sim_config = env_config.sim_config - robot_config = env_config.robot_config - lidar_config = env_config.lidar_config - - # Initialize occupancy objects for each robot for collision detection - occupancies = [ContinuousOccupancy( - sim.map_def.width, sim.map_def.height, - lambda: sim.robot_pos[i], lambda: sim.goal_pos[i], - lambda: sim.pysf_sim.env.obstacles_raw[:, :4], lambda: sim.ped_pos, - robot_config.radius, sim_config.ped_radius, sim_config.goal_radius) - for i in range(num_robots)] - - # Initialize sensor fusion objects for each robot for sensor data handling - sensor_fusions: List[SensorFusion] = [] - for r_id in range(num_robots): - # Define the ray sensor, target sensor, and speed sensor for each robot - ray_sensor = lambda r_id=r_id: lidar_ray_scan( - sim.robots[r_id].pose, occupancies[r_id], lidar_config)[0] - target_sensor = lambda r_id=r_id: target_sensor_obs( - sim.robots[r_id].pose, sim.goal_pos[r_id], sim.next_goal_pos[r_id]) - speed_sensor = lambda r_id=r_id: sim.robots[r_id].current_speed - - # Create the sensor fusion object and add it to the list - sensor_fusions.append(SensorFusion( - ray_sensor, speed_sensor, target_sensor, - orig_obs_space, sim_config.use_next_goal)) - - return occupancies, sensor_fusions - - -def init_spaces(env_config: EnvSettings, map_def: MapDefinition): - """ - Initialize the action and observation spaces for the environment. - - This function creates action and observation space using the factory method - provided in the environment - configuration, and then uses the robot's action space and observation space as the - basis for the environment's action and observation spaces. The observation space is - further extended with additional sensors. - - Parameters - ---------- - env_config : EnvSettings - The configuration settings for the environment. - map_def : MapDefinition - The definition of the map for the environment. - - Returns - ------- - Tuple[Space, Space, Space] - A tuple containing the action space, the extended observation space, and the - original observation space of the robot. - """ - # Create a robot using the factory method in the environment configuration - robot = env_config.robot_factory() - # Get the action space from the robot - action_space = robot.action_space - - # Extend the robot's observation space with additional sensors - observation_space, orig_obs_space = fused_sensor_space( - env_config.sim_config.stack_steps, - robot.observation_space, - target_sensor_space(map_def.max_target_dist), - lidar_sensor_space( - env_config.lidar_config.num_rays, - env_config.lidar_config.max_scan_dist) - ) - - # Return the action space, the extended observation space, and the original - # observation space - return action_space, observation_space, orig_obs_space - - -class RobotEnv(Env): - """ - Representing a Gymnasium environment for training a self-driving robot - with reinforcement learning. - """ - - def __init__( - self, - env_config: EnvSettings = EnvSettings(), - reward_func: Callable[[dict], float] = simple_reward, - debug: bool = False - ): - """ - Initialize the Robot Environment. - - Parameters: - - env_config (EnvSettings): Configuration for environment settings. - - reward_func (Callable[[dict], float]): Reward function that takes - a dictionary as input and returns a float as reward. - - debug (bool): If True, enables debugging information such as - visualizations. 
- """ - - # Environment configuration details - self.env_config = env_config - # Extract first map definition; currently only supports using the first map - map_def = env_config.map_pool.map_defs[0] - - # Initialize spaces based on the environment configuration and map - self.action_space, self.observation_space, orig_obs_space = \ - init_spaces(env_config, map_def) - - # Assign the reward function and debug flag - self.reward_func, self.debug = reward_func, debug - - # Initialize simulator with a random start position - self.simulator = init_simulators( - env_config, - map_def, - random_start_pos=True - )[0] - - # Delta time per simulation step and maximum episode time - d_t = env_config.sim_config.time_per_step_in_secs - max_ep_time = env_config.sim_config.sim_time_in_secs - - # Initialize collision detectors and sensor data processors - occupancies, sensors = init_collision_and_sensors( - self.simulator, - env_config, - orig_obs_space - ) - - # Setup initial state of the robot - self.state = RobotState( - self.simulator.robot_navs[0], - occupancies[0], - sensors[0], - d_t, - max_ep_time) - - # Store last action executed by the robot - self.last_action = None - - # If in debug mode, create a simulation view to visualize the state - if debug: - self.sim_ui = SimulationView( - scaling=10, - obstacles=map_def.obstacles, - robot_radius=env_config.robot_config.radius, - ped_radius=env_config.sim_config.ped_radius, - goal_radius=env_config.sim_config.goal_radius) - - # Display the simulation UI - self.sim_ui.show() - - def step(self, action): - """ - Execute one time step within the environment. - - Parameters: - - action: Action to be executed. - - Returns: - - obs: Observation after taking the action. - - reward: Calculated reward for the taken action. - - term: Boolean indicating if the episode has terminated. - - info: Additional information as dictionary. - """ - # Process the action through the simulator - action = self.simulator.robots[0].parse_action(action) - self.last_action = action - # Perform simulation step - self.simulator.step_once([action]) - # Get updated observation - obs = self.state.step() - # Fetch metadata about the current state - meta = self.state.meta_dict() - # Determine if the episode has reached terminal state - term = self.state.is_terminal - # Compute the reward using the provided reward function - reward = self.reward_func(meta) - return obs, reward, term, {"step": meta["step"], "meta": meta} - - def reset(self): - """ - Reset the environment state to start a new episode. - - Returns: - - obs: The initial observation after resetting the environment. - """ - # Reset internal simulator state - self.simulator.reset_state() - # Reset the environment's state and return the initial observation - obs = self.state.reset() - return obs - - def render(self): - """ - Render the environment visually if in debug mode. - - Raises RuntimeError if debug mode is not enabled. - """ - if not self.sim_ui: - raise RuntimeError( - 'Debug mode is not activated! 
Consider setting ' - 'debug=True!') - - # Prepare action visualization, if any action was executed - action = None if not self.last_action else VisualizableAction( - self.simulator.robot_poses[0], self.last_action, - self.simulator.goal_pos[0]) - - # Robot position and LIDAR scanning visualization preparation - robot_pos = self.simulator.robot_poses[0][0] - distances, directions = lidar_ray_scan( - self.simulator.robot_poses[0], self.state.occupancy, - self.env_config.lidar_config) - - # Construct ray vectors for visualization - ray_vecs = zip(np.cos(directions) * distances, np.sin(directions) * distances) - ray_vecs_np = np.array([[ - [robot_pos[0], robot_pos[1]], - [robot_pos[0] + x, robot_pos[1] + y] - ] for x, y in ray_vecs]) - - # Prepare pedestrian action visualization - ped_actions = zip( - self.simulator.pysf_sim.peds.pos(), - self.simulator.pysf_sim.peds.pos() + - self.simulator.pysf_sim.peds.vel() * 2) - ped_actions_np = np.array([[pos, vel] for pos, vel in ped_actions]) - - # Package the state for visualization - state = VisualizableSimState( - self.state.timestep, action, self.simulator.robot_poses[0], - deepcopy(self.simulator.ped_pos), ray_vecs_np, ped_actions_np) - - # Execute rendering of the state through the simulation UI - self.sim_ui.render(state) - - def seed(self, seed=None): - """ - Set the seed for this env's random number generator(s). - - Note: - Some environments use multiple pseudorandom number generators. - We want to capture all such seeds used in order to ensure that - there aren't accidental correlations between multiple generators. - - Returns: - list: Returns the list of seeds used in this env's random - number generators. The first value in the list should be the - "main" seed, or the value which a reproducer should pass to - 'seed'. Often, the main seed equals the provided 'seed', but - this won't be true if seed=None, for example. - - TODO: validate this method - """ - self.np_random, seed = seeding.np_random(seed) - return [seed] - - def exit(self): - """ - Clean up and exit the simulation UI, if it exists. 
- """ - if self.sim_ui: - self.sim_ui.exit() - - -class MultiRobotEnv(VectorEnv): - """Representing a Gymnasium environment for training - multiple self-driving robots with reinforcement learning""" - - def __init__( - self, env_config: EnvSettings = EnvSettings(), - reward_func: Callable[[dict], float] = simple_reward, - debug: bool = False, num_robots: int = 1): - - map_def = env_config.map_pool.map_defs[0] # info: only use first map - action_space, observation_space, orig_obs_space = init_spaces(env_config, map_def) - super(MultiRobotEnv, self).__init__(num_robots, observation_space, action_space) - self.action_space = spaces.Box( - low=np.array([self.single_action_space.low for _ in range(num_robots)]), - high=np.array([self.single_action_space.high for _ in range(num_robots)]), - dtype=self.single_action_space.low.dtype) - - self.reward_func, self.debug = reward_func, debug - self.simulators = init_simulators(env_config, map_def, num_robots, random_start_pos=False) - self.states: List[RobotState] = [] - d_t = env_config.sim_config.time_per_step_in_secs - max_ep_time = env_config.sim_config.sim_time_in_secs - - for sim in self.simulators: - occupancies, sensors = init_collision_and_sensors(sim, env_config, orig_obs_space) - states = [RobotState(nav, occ, sen, d_t, max_ep_time) - for nav, occ, sen in zip(sim.robot_navs, occupancies, sensors)] - self.states.extend(states) - - self.sim_worker_pool = ThreadPool(len(self.simulators)) - self.obs_worker_pool = ThreadPool(num_robots) - - def step(self, actions): - actions = [self.simulators[0].robots[0].parse_action(a) for a in actions] - i = 0 - actions_per_simulator = [] - for sim in self.simulators: - num_robots = len(sim.robots) - actions_per_simulator.append(actions[i:i+num_robots]) - i += num_robots - - self.sim_worker_pool.map( - lambda s_a: s_a[0].step_once(s_a[1]), - zip(self.simulators, actions_per_simulator)) - - obs = self.obs_worker_pool.map(lambda s: s.step(), self.states) - - metas = [state.meta_dict() for state in self.states] - masked_metas = [{ "step": meta["step"], "meta": meta } for meta in metas] - masked_metas = (*masked_metas,) - terms = [state.is_terminal for state in self.states] - rewards = [self.reward_func(meta) for meta in metas] - - for i, (sim, state, term) in enumerate(zip(self.simulators, self.states, terms)): - if term: - sim.reset_state() - obs[i] = state.reset() - - obs = { OBS_DRIVE_STATE: np.array([o[OBS_DRIVE_STATE] for o in obs]), - OBS_RAYS: np.array([o[OBS_RAYS] for o in obs])} - - return obs, rewards, terms, masked_metas - - def reset(self): - self.sim_worker_pool.map(lambda sim: sim.reset_state(), self.simulators) - obs = self.obs_worker_pool.map(lambda s: s.reset(), self.states) - - obs = { OBS_DRIVE_STATE: np.array([o[OBS_DRIVE_STATE] for o in obs]), - OBS_RAYS: np.array([o[OBS_RAYS] for o in obs]) } - return obs - - def render(self, robot_id: int=0): - # TODO: add support for PyGame rendering - pass - - def close_extras(self, **kwargs): - self.sim_worker_pool.close() - self.obs_worker_pool.close() diff --git a/robot_sf/sim/simulator.py b/robot_sf/sim/simulator.py index cb75eba..e711674 100644 --- a/robot_sf/sim/simulator.py +++ b/robot_sf/sim/simulator.py @@ -1,3 +1,4 @@ +from math import ceil from dataclasses import dataclass, field from typing import List, Tuple, Union @@ -6,7 +7,7 @@ from pysocialforce.config import SimulatorConfig as PySFSimConfig from pysocialforce.forces import Force as PySFForce, ObstacleForce -from robot_sf.sim_config import SimulationSettings +from 
robot_sf.gym_env.env_config import SimulationSettings, EnvSettings
 from robot_sf.nav.map_config import MapDefinition
 from robot_sf.ped_npc.ped_population import PedSpawnConfig, populate_simulation
 from robot_sf.ped_npc.ped_robot_force import PedRobotForce
@@ -102,3 +103,49 @@ def step_once(self, actions: List[RobotAction]):
         for robot, nav, action in zip(self.robots, self.robot_navs, actions):
             robot.apply_action(action, self.config.time_per_step_in_secs)
             nav.update_position(robot.pos)
+
+
+def init_simulators(
+        env_config: EnvSettings,
+        map_def: MapDefinition,
+        num_robots: int = 1,
+        random_start_pos: bool = True
+    ) -> List[Simulator]:
+    """
+    Initialize simulators for the robot environment.
+
+    Parameters:
+        env_config (EnvSettings): Configuration settings for the environment.
+        map_def (MapDefinition): Definition of the map for the environment.
+        num_robots (int): Number of robots in the environment.
+        random_start_pos (bool): Whether to start the robots at random positions.
+
+    Returns:
+        List[Simulator]: A list of initialized Simulator objects.
+    """
+
+    # Calculate the number of simulators needed based on the number of robots and start positions
+    num_sims = ceil(num_robots / map_def.num_start_pos)
+
+    # Calculate the proximity to the goal based on the robot radius and goal radius
+    goal_proximity = env_config.robot_config.radius + env_config.sim_config.goal_radius
+
+    # Initialize an empty list to hold the simulators
+    sims: List[Simulator] = []
+
+    # Create the required number of simulators
+    for i in range(num_sims):
+        # Determine the robot count per simulator; the last one takes the remainder
+        n = map_def.num_start_pos if i < num_sims - 1 \
+            else num_robots - map_def.num_start_pos * (num_sims - 1)
+
+        # Create the robots for this simulator
+        sim_robots = [env_config.robot_factory() for _ in range(n)]
+
+        # Create the simulator with the robots and add it to the list
+        sim = Simulator(
+            env_config.sim_config, map_def, sim_robots,
+            goal_proximity, random_start_pos)
+        sims.append(sim)
+
+    return sims
diff --git a/scripts/benchmark.py b/scripts/benchmark.py
index 8619dfa..0930d1e 100644
--- a/scripts/benchmark.py
+++ b/scripts/benchmark.py
@@ -1,7 +1,7 @@
 import time
 from stable_baselines3 import PPO
 from scalene import scalene_profiler
-from robot_sf.robot_env import RobotEnv
+from robot_sf.gym_env.robot_env import RobotEnv
 
 
 def benchmark():
diff --git a/scripts/debug_random_policy.py b/scripts/debug_random_policy.py
index 6ff32af..0938fbe 100644
--- a/scripts/debug_random_policy.py
+++ b/scripts/debug_random_policy.py
@@ -1,7 +1,7 @@
 """ Generate a random policy to test the environment """
-from robot_sf.robot_env import RobotEnv
+from robot_sf.gym_env.robot_env import RobotEnv
 
 
 def benchmark():
diff --git a/scripts/debug_trained_policy.py b/scripts/debug_trained_policy.py
index 53aa8e1..2f2d6d3 100644
--- a/scripts/debug_trained_policy.py
+++ b/scripts/debug_trained_policy.py
@@ -1,5 +1,5 @@
 from stable_baselines3 import PPO
-from robot_sf.robot_env import RobotEnv
+from robot_sf.gym_env.robot_env import RobotEnv
 
 
 def training():
diff --git a/scripts/evaluate.py b/scripts/evaluate.py
index ff05479..46d35cc 100644
--- a/scripts/evaluate.py
+++ b/scripts/evaluate.py
@@ -9,7 +9,7 @@
 from stable_baselines3 import PPO, A2C
 
 from robot_sf.sensor.sensor_fusion import OBS_DRIVE_STATE, OBS_RAYS
-from robot_sf.robot_env import RobotEnv, EnvSettings
+from robot_sf.gym_env.robot_env import RobotEnv, EnvSettings
 from robot_sf.eval import EnvMetrics
 from robot_sf.robot.bicycle_drive import BicycleDriveSettings
 from 
robot_sf.robot.differential_drive import DifferentialDriveSettings diff --git a/scripts/hparam_opt.py b/scripts/hparam_opt.py index cc16601..86016b8 100644 --- a/scripts/hparam_opt.py +++ b/scripts/hparam_opt.py @@ -11,8 +11,8 @@ from stable_baselines3.common.vec_env import SubprocVecEnv from stable_baselines3.common.callbacks import CallbackList, BaseCallback -from robot_sf.robot_env import RobotEnv, simple_reward -from robot_sf.sim_config import EnvSettings +from robot_sf.gym_env.robot_env import RobotEnv, simple_reward +from robot_sf.gym_env.env_config import EnvSettings from robot_sf.feature_extractor import DynamicsExtractor from robot_sf.tb_logging import DrivingMetricsCallback, VecEnvMetrics diff --git a/scripts/training_a2c.py b/scripts/training_a2c.py index 214c13b..a100e22 100644 --- a/scripts/training_a2c.py +++ b/scripts/training_a2c.py @@ -1,7 +1,7 @@ from stable_baselines3 import A2C from stable_baselines3.common.env_util import make_vec_env from stable_baselines3.common.vec_env import SubprocVecEnv -from robot_sf.robot_env import RobotEnv +from robot_sf.gym_env.robot_env import RobotEnv def training(): diff --git a/scripts/training_ppo.py b/scripts/training_ppo.py index 5321ce1..66d9b8a 100644 --- a/scripts/training_ppo.py +++ b/scripts/training_ppo.py @@ -3,8 +3,8 @@ from stable_baselines3.common.vec_env import SubprocVecEnv from stable_baselines3.common.callbacks import CheckpointCallback, CallbackList -from robot_sf.robot_env import RobotEnv -from robot_sf.sim_config import EnvSettings +from robot_sf.gym_env.robot_env import RobotEnv +from robot_sf.gym_env.env_config import EnvSettings from robot_sf.feature_extractor import DynamicsExtractor from robot_sf.tb_logging import DrivingMetricsCallback diff --git a/tests/env_test.py b/tests/env_test.py index 4b2b7b4..db926e9 100644 --- a/tests/env_test.py +++ b/tests/env_test.py @@ -1,5 +1,6 @@ from gymnasium import spaces -from robot_sf.robot_env import RobotEnv, OBS_DRIVE_STATE, OBS_RAYS +from robot_sf.gym_env.robot_env import RobotEnv +from robot_sf.sensor.sensor_fusion import OBS_RAYS, OBS_DRIVE_STATE def test_can_create_env(): diff --git a/tests/robot_env_simple_reward_test.py b/tests/robot_env_simple_reward_test.py index 0f0c6de..3dd3def 100644 --- a/tests/robot_env_simple_reward_test.py +++ b/tests/robot_env_simple_reward_test.py @@ -1,4 +1,4 @@ -from robot_sf.robot_env import simple_reward +from robot_sf.gym_env.robot_env import simple_reward def test_simple_reward(): meta = { diff --git a/tests/sb3_test.py b/tests/sb3_test.py index b390f0d..52c8fa7 100644 --- a/tests/sb3_test.py +++ b/tests/sb3_test.py @@ -4,7 +4,7 @@ from stable_baselines3.common.env_util import make_vec_env from stable_baselines3.common.vec_env import SubprocVecEnv -from robot_sf.robot_env import RobotEnv +from robot_sf.gym_env.robot_env import RobotEnv from robot_sf.feature_extractor import DynamicsExtractor diff --git a/tests/sim_config_test.py b/tests/sim_config_test.py index 05a310b..7d729c9 100644 --- a/tests/sim_config_test.py +++ b/tests/sim_config_test.py @@ -1,5 +1,5 @@ import pytest -from robot_sf.sim_config import ( +from robot_sf.gym_env.env_config import ( EnvSettings, SimulationSettings, LidarScannerSettings,