diff --git a/carla_birdeye_view/__init__.py b/carla_birdeye_view/__init__.py index 765dfa3..8df17f9 100644 --- a/carla_birdeye_view/__init__.py +++ b/carla_birdeye_view/__init__.py @@ -1,6 +1,7 @@ import carla import logging import numpy as np +import cv2.cv2 as cv from enum import IntEnum, auto, Enum from pathlib import Path @@ -21,15 +22,11 @@ Dimensions, ) -__all__ = ["BirdViewProducer", "DEFAULT_HEIGHT", "DEFAULT_WIDTH"] - LOGGER = logging.getLogger(__name__) -DEFAULT_HEIGHT = 336 # its 84m when density is 4px/m -DEFAULT_WIDTH = 150 # its 37.5m when density is 4px/m -BirdView = np.ndarray # [np.uint8] with shape (level, y, x) -RgbCanvas = np.ndarray # [np.uint8] with shape (y, x, 3) +BirdView = np.ndarray # [np.uint8] with shape (height, width, channel) +RgbCanvas = np.ndarray # [np.uint8] with shape (height, width, 3) class BirdViewCropType(Enum): @@ -37,6 +34,11 @@ class BirdViewCropType(Enum): FRONT_AREA_ONLY = auto() # Like in "Learning by Cheating" +DEFAULT_HEIGHT = 336 # its 84m when density is 4px/m +DEFAULT_WIDTH = 150 # its 37.5m when density is 4px/m +DEFAULT_CROP_TYPE = BirdViewCropType.FRONT_AND_REAR_AREA + + class BirdViewMasks(IntEnum): PEDESTRIANS = 8 RED_LIGHTS = 7 @@ -69,11 +71,6 @@ def bottom_to_top() -> List[int]: BirdViewMasks.ROAD: RGB.DIM_GRAY, } -BIRDVIEW_SHAPE_CHW = (len(RGB_BY_MASK), DEFAULT_HEIGHT, DEFAULT_WIDTH) -BIRDVIEW_SHAPE_HWC = (DEFAULT_HEIGHT, DEFAULT_WIDTH, len(RGB_BY_MASK)) - -import cv2.cv2 as cv2 - def rotate(image, angle, center=None, scale=1.0): assert image.dtype == np.uint8 @@ -88,13 +85,13 @@ def rotate(image, angle, center=None, scale=1.0): center = (w // 2, h // 2) # perform the rotation - M = cv2.getRotationMatrix2D(center, angle, scale) - rotated = cv2.warpAffine( + M = cv.getRotationMatrix2D(center, angle, scale) + rotated = cv.warpAffine( image, M, (w, h), - flags=cv2.INTER_NEAREST, - borderMode=cv2.BORDER_CONSTANT, + flags=cv.INTER_NEAREST, + borderMode=cv.BORDER_CONSTANT, borderValue=0, ) @@ -131,20 +128,27 @@ def __init__( self, client: carla.Client, target_size: PixelDimensions, + render_lanes_on_junctions: bool, pixels_per_meter: int = 4, - crop_type: BirdViewCropType=BirdViewCropType.FRONT_AND_REAR_AREA + crop_type: BirdViewCropType = BirdViewCropType.FRONT_AND_REAR_AREA, ) -> None: self.client = client self.target_size = target_size - self._pixels_per_meter = pixels_per_meter + self.pixels_per_meter = pixels_per_meter self._crop_type = crop_type if crop_type is BirdViewCropType.FRONT_AND_REAR_AREA: - rendering_square_size = round(square_fitting_rect_at_any_rotation(self.target_size)) + rendering_square_size = round( + square_fitting_rect_at_any_rotation(self.target_size) + ) elif crop_type is BirdViewCropType.FRONT_AREA_ONLY: # We must keep rendering size from FRONT_AND_REAR_AREA (in order to avoid rotation issues) - enlarged_size = PixelDimensions(width=target_size.width, height=target_size.height * 2) - rendering_square_size = round(square_fitting_rect_at_any_rotation(enlarged_size)) + enlarged_size = PixelDimensions( + width=target_size.width, height=target_size.height * 2 + ) + rendering_square_size = round( + square_fitting_rect_at_any_rotation(enlarged_size) + ) else: raise NotImplementedError self.rendering_area = PixelDimensions( @@ -153,46 +157,50 @@ def __init__( self._world = client.get_world() self._map = self._world.get_map() self.masks_generator = MapMaskGenerator( - client, pixels_per_meter=pixels_per_meter + client, + pixels_per_meter=pixels_per_meter, + render_lanes_on_junctions=render_lanes_on_junctions, ) cache_path = self.parametrized_cache_path() - if Path(cache_path).is_file(): - LOGGER.info(f"Loading cache from {cache_path}") - with FileLock(f"{cache_path}.lock"): + with FileLock(f"{cache_path}.lock"): + if Path(cache_path).is_file(): + LOGGER.info(f"Loading cache from {cache_path}") static_cache = np.load(cache_path) self.full_road_cache = static_cache[0] self.full_lanes_cache = static_cache[1] self.full_centerlines_cache = static_cache[2] - LOGGER.info(f"Loaded static layers from cache file: {cache_path}") - else: - LOGGER.warning( - f"Cache file does not exist, generating cache at {cache_path}" - ) - self.full_road_cache = self.masks_generator.road_mask() - self.full_lanes_cache = self.masks_generator.lanes_mask() - self.full_centerlines_cache = self.masks_generator.centerlines_mask() - static_cache = np.stack([self.full_road_cache, self.full_lanes_cache, self.full_centerlines_cache]) - with FileLock(f"{cache_path}.lock"): + LOGGER.info(f"Loaded static layers from cache file: {cache_path}") + else: + LOGGER.warning( + f"Cache file does not exist, generating cache at {cache_path}" + ) + self.full_road_cache = self.masks_generator.road_mask() + self.full_lanes_cache = self.masks_generator.lanes_mask() + self.full_centerlines_cache = self.masks_generator.centerlines_mask() + static_cache = np.stack( + [ + self.full_road_cache, + self.full_lanes_cache, + self.full_centerlines_cache, + ] + ) np.save(cache_path, static_cache, allow_pickle=False) - LOGGER.info(f"Saved static layers to cache file: {cache_path}") + LOGGER.info(f"Saved static layers to cache file: {cache_path}") def parametrized_cache_path(self) -> str: - cache_dir = Path("birdview_v2_cache") + cache_dir = Path("birdview_v3_cache") cache_dir.mkdir(parents=True, exist_ok=True) opendrive_content_hash = cache.generate_opendrive_content_hash(self._map) cache_filename = ( f"{self._map.name}__" - f"px_per_meter={self._pixels_per_meter}__" + f"px_per_meter={self.pixels_per_meter}__" f"opendrive_hash={opendrive_content_hash}__" f"margin={mask.MAP_BOUNDARY_MARGIN}.npy" ) return str(cache_dir / cache_filename) - def produce( - self, - agent_vehicle: carla.Actor, - ) -> BirdView: + def produce(self, agent_vehicle: carla.Actor) -> BirdView: all_actors = actors.query_all(world=self._world) segregated_actors = actors.segregate_by_type(actors=all_actors) agent_vehicle_loc = agent_vehicle.get_location() @@ -233,20 +241,21 @@ def produce( self.masks_generator.enable_local_rendering_mode(rendering_window) masks = self._render_actors_masks(agent_vehicle, segregated_actors, masks) cropped_masks = self.apply_agent_following_transformation_to_masks( - agent_vehicle, masks, + agent_vehicle, masks ) ordered_indices = [mask.value for mask in BirdViewMasks.bottom_to_top()] - return cropped_masks[ordered_indices] + return cropped_masks[:, :, ordered_indices] @staticmethod def as_rgb(birdview: BirdView) -> RgbCanvas: - _, h, w = birdview.shape + h, w, d = birdview.shape + assert d == len(BirdViewMasks) rgb_canvas = np.zeros(shape=(h, w, 3), dtype=np.uint8) nonzero_indices = lambda arr: arr == COLOR_ON for mask_type in BirdViewMasks.bottom_to_top(): rgb_color = RGB_BY_MASK[mask_type] - mask = birdview[mask_type] + mask = birdview[:, :, mask_type] # If mask above contains 0, don't overwrite content of canvas (0 indicates transparency) rgb_canvas[nonzero_indices(mask)] = rgb_color return rgb_canvas @@ -279,8 +288,9 @@ def _render_actors_masks( return masks def apply_agent_following_transformation_to_masks( - self, agent_vehicle: carla.Actor, masks: np.ndarray, + self, agent_vehicle: carla.Actor, masks: np.ndarray ) -> np.ndarray: + """Returns image of shape: height, width, channels""" agent_transform = agent_vehicle.get_transform() angle = ( agent_transform.rotation.yaw + 90 @@ -296,13 +306,14 @@ def apply_agent_following_transformation_to_masks( crop_with_car_in_the_center, axes=(1, 2, 0) ) rotated = rotate(crop_with_centered_car, angle, center=rotation_center) - rotated = np.transpose(rotated, axes=(2, 0, 1)) half_width = self.target_size.width // 2 hslice = slice(rotation_center.x - half_width, rotation_center.x + half_width) if self._crop_type is BirdViewCropType.FRONT_AREA_ONLY: - vslice = slice(rotation_center.y - self.target_size.height, rotation_center.y) + vslice = slice( + rotation_center.y - self.target_size.height, rotation_center.y + ) elif self._crop_type is BirdViewCropType.FRONT_AND_REAR_AREA: half_height = self.target_size.height // 2 vslice = slice( @@ -313,5 +324,5 @@ def apply_agent_following_transformation_to_masks( assert ( vslice.start > 0 and hslice.start > 0 ), "Trying to access negative indexes is not allowed, check for calculation errors!" - car_on_the_bottom = rotated[:, vslice, hslice] + car_on_the_bottom = rotated[vslice, hslice] return car_on_the_bottom diff --git a/carla_birdeye_view/__main__.py b/carla_birdeye_view/__main__.py index 767a8a4..4701fdf 100644 --- a/carla_birdeye_view/__main__.py +++ b/carla_birdeye_view/__main__.py @@ -48,6 +48,7 @@ def main(): PixelDimensions(width=DEFAULT_WIDTH, height=DEFAULT_HEIGHT), pixels_per_meter=4, crop_type=BirdViewCropType.FRONT_AND_REAR_AREA, + render_lanes_on_junctions=False, ) stuck_frames_count = 0 diff --git a/carla_birdeye_view/mask.py b/carla_birdeye_view/mask.py index 163bcca..9018325 100644 --- a/carla_birdeye_view/mask.py +++ b/carla_birdeye_view/mask.py @@ -76,7 +76,9 @@ class MapMaskGenerator: to become a regular RGB renderer (just change all `color` arguments to 3-element tuples) """ - def __init__(self, client, pixels_per_meter: int) -> None: + def __init__( + self, client, pixels_per_meter: int, render_lanes_on_junctions: bool + ) -> None: self.client = client self.pixels_per_meter = pixels_per_meter self.rendering_window: Optional[RenderingWindow] = None @@ -88,6 +90,7 @@ def __init__(self, client, pixels_per_meter: int) -> None: self._map_boundaries = self._find_map_boundaries() self._each_road_waypoints = self._generate_road_waypoints() self._mask_size: PixelDimensions = self.calculate_mask_size() + self._render_lanes_on_junctions = render_lanes_on_junctions def _find_map_boundaries(self) -> MapBoundaries: """Find extreme locations on a map. @@ -226,31 +229,32 @@ def road_mask(self) -> Mask: def lanes_mask(self) -> Mask: canvas = self.make_empty_mask() for road_waypoints in self._each_road_waypoints: - # if not road_waypoints[0].is_junction: - # NOTE This block was inside if statement - some junctions may not have proper lane markings drawn - # Left Side - lanes.draw_lane_marking_single_side( - canvas, - road_waypoints, - side=LaneSide.LEFT, - location_to_pixel_func=self.location_to_pixel, - color=COLOR_ON, - ) + if self._render_lanes_on_junctions or not road_waypoints[0].is_junction: + # Left Side + lanes.draw_lane_marking_single_side( + canvas, + road_waypoints, + side=LaneSide.LEFT, + location_to_pixel_func=self.location_to_pixel, + color=COLOR_ON, + ) - # Right Side - lanes.draw_lane_marking_single_side( - canvas, - road_waypoints, - side=LaneSide.RIGHT, - location_to_pixel_func=self.location_to_pixel, - color=COLOR_ON, - ) + # Right Side + lanes.draw_lane_marking_single_side( + canvas, + road_waypoints, + side=LaneSide.RIGHT, + location_to_pixel_func=self.location_to_pixel, + color=COLOR_ON, + ) return canvas def centerlines_mask(self) -> Mask: canvas = self.make_empty_mask() for road_waypoints in self._each_road_waypoints: - polygon = [self.location_to_pixel(wp.transform.location) for wp in road_waypoints] + polygon = [ + self.location_to_pixel(wp.transform.location) for wp in road_waypoints + ] if len(polygon) > 2: polygon = np.array([polygon], dtype=np.int32) cv.polylines(