diff --git a/argoverse/data_loading/argoverse_forecasting_loader.py b/argoverse/data_loading/argoverse_forecasting_loader.py index fb8da5dc..655a6a6d 100644 --- a/argoverse/data_loading/argoverse_forecasting_loader.py +++ b/argoverse/data_loading/argoverse_forecasting_loader.py @@ -77,7 +77,7 @@ def seq_df(self) -> pd.DataFrame: return _read_csv(self.current_seq) @property - def agent_traj(self) -> np.ndarray: + def agent_traj(self) -> Any: """Get the trajectory for the track of type 'AGENT' in the current sequence. Returns: diff --git a/argoverse/data_loading/argoverse_tracking_loader.py b/argoverse/data_loading/argoverse_tracking_loader.py index 7f834f9c..f4e0a158 100644 --- a/argoverse/data_loading/argoverse_tracking_loader.py +++ b/argoverse/data_loading/argoverse_tracking_loader.py @@ -6,6 +6,7 @@ from typing import Dict, Iterator, List, Optional, Union, cast import numpy as np +from numpy.typing import NDArray import argoverse.data_loading.object_label_record as object_label from argoverse.data_loading.object_label_record import ObjectLabelRecord @@ -25,7 +26,7 @@ def __init__(self, root_dir: str) -> None: self.CAMERA_LIST = CAMERA_LIST self._log_list: Optional[List[str]] = None self._image_list: Optional[Dict[str, Dict[str, List[str]]]] = None - self._image_list_sync: Optional[Dict[str, Dict[str, List[np.ndarray]]]] = None + self._image_list_sync: Optional[Dict[str, Dict[str, List[NDArray[np.float64]]]]] = None self._lidar_list: Optional[Dict[str, List[str]]] = None self._image_timestamp_list: Optional[Dict[str, Dict[str, List[int]]]] = None self._timestamp_image_dict: Optional[Dict[str, Dict[str, Dict[int, str]]]] = None @@ -128,7 +129,7 @@ def image_list(self) -> Dict[str, List[str]]: return self._image_list[self.current_log] @property - def image_list_sync(self) -> Dict[str, List[np.ndarray]]: + def image_list_sync(self) -> Dict[str, List[NDArray[np.float64]]]: """return list of image path (str) for all cameras for the current log. 
The different between image_list and image_list_sync is that image_list_sync @@ -429,7 +430,7 @@ def get_image_at_timestamp( camera: str, log_id: Optional[str] = None, load: bool = True, - ) -> Optional[Union[str, np.ndarray]]: + ) -> Optional[Union[str, NDArray[np.float64]]]: """get image or image path at a specific timestamp Args: @@ -459,7 +460,7 @@ def get_image_at_timestamp( def get_image( self, idx: int, camera: str, log_id: Optional[str] = None, load: bool = True - ) -> Union[str, np.ndarray]: + ) -> Union[str, NDArray[np.float64]]: """get image or image path at a specific index (in image index) Args: @@ -488,7 +489,7 @@ def get_image( def get_image_sync( self, idx: int, camera: str, log_id: Optional[str] = None, load: bool = True - ) -> Union[str, np.ndarray]: + ) -> Union[str, NDArray[np.float64]]: """get image or image path at a specific index (in lidar index) Args: @@ -515,7 +516,7 @@ def get_image_sync( return load_image(image_path) return image_path - def get_lidar(self, idx: int, log_id: Optional[str] = None, load: bool = True) -> Union[str, np.ndarray]: + def get_lidar(self, idx: int, log_id: Optional[str] = None, load: bool = True) -> Union[str, NDArray[np.float64]]: """Get lidar corresponding to frame index idx (in lidar frame). 
Args: diff --git a/argoverse/data_loading/frame_label_accumulator.py b/argoverse/data_loading/frame_label_accumulator.py index 08b2fb35..4ee5abfc 100644 --- a/argoverse/data_loading/frame_label_accumulator.py +++ b/argoverse/data_loading/frame_label_accumulator.py @@ -10,6 +10,7 @@ from typing import Dict, List, Optional, Tuple import numpy as np +from numpy.typing import NDArray from argoverse.data_loading.frame_record import FrameRecord from argoverse.data_loading.object_label_record import ObjectLabelRecord @@ -64,11 +65,11 @@ def __init__( # coordinate system is the map world frame - self.per_city_traj_dict: Dict[str, List[Tuple[np.ndarray, str]]] = { + self.per_city_traj_dict: Dict[str, List[Tuple[NDArray[np.float64], str]]] = { "MIA": [], "PIT": [], } # all the trajectories for these 2 cities - self.log_egopose_dict: Dict[str, Dict[int, Dict[str, np.ndarray]]] = {} + self.log_egopose_dict: Dict[str, Dict[int, Dict[str, NDArray[np.float64]]]] = {} self.log_timestamp_dict: Dict[str, Dict[int, List[FrameRecord]]] = {} self.sdb = SynchronizationDB(self.dataset_dir) @@ -146,7 +147,7 @@ def get_log_trajectory_labels(self, log_id: str) -> Optional[List[TrajectoryLabe else: return None - def place_trajectory_in_city_frame(self, traj_label: TrajectoryLabel, log_id: str) -> np.ndarray: + def place_trajectory_in_city_frame(self, traj_label: TrajectoryLabel, log_id: str) -> NDArray[np.float64]: """Place trajectory in the city frame Args: traj_label (TrajectoryLabel): instance of the TrajectoryLabel class. @@ -218,8 +219,8 @@ def convert_bbox_to_city_frame( lidar_timestamp_ns: int, dataset_dir: str, log_id: str, - bbox_ego_frame: np.ndarray, - ) -> Tuple[np.ndarray, Dict[str, np.ndarray]]: + bbox_ego_frame: NDArray[np.float64], + ) -> Tuple[NDArray[np.float64], Dict[str, NDArray[np.float64]]]: """Convert bounding box to city frame. Args: lidar_timestamp_ns (int): Lidar timestamp. 
diff --git a/argoverse/data_loading/frame_record.py b/argoverse/data_loading/frame_record.py index 1a3b1a59..c6aeee2a 100644 --- a/argoverse/data_loading/frame_record.py +++ b/argoverse/data_loading/frame_record.py @@ -5,6 +5,7 @@ from typing import Tuple import numpy as np +from numpy.typing import NDArray class FrameRecord: @@ -15,8 +16,8 @@ class FrameRecord: def __init__( self, - bbox_city_fr: np.ndarray, - bbox_ego_frame: np.ndarray, + bbox_city_fr: NDArray[np.float64], + bbox_ego_frame: NDArray[np.float64], occlusion_val: int, color: Tuple[float, float, float], track_uuid: str, diff --git a/argoverse/data_loading/object_label_record.py b/argoverse/data_loading/object_label_record.py index c11df8d5..9f83323b 100644 --- a/argoverse/data_loading/object_label_record.py +++ b/argoverse/data_loading/object_label_record.py @@ -7,6 +7,7 @@ from typing import Any, Dict, List, Optional, Tuple import numpy as np +from numpy.typing import NDArray from argoverse.utils.calibration import CameraConfig, proj_cam_to_uv from argoverse.utils.cv2_plotting_utils import add_text_cv2, draw_clipped_line_segment @@ -45,8 +46,8 @@ class ObjectLabelRecord: def __init__( self, - quaternion: np.ndarray, - translation: np.ndarray, + quaternion: NDArray[np.float64], + translation: NDArray[np.float64], length: float, width: float, height: float, @@ -77,7 +78,7 @@ def __init__( self.track_id = track_id self.score = score - def as_2d_bbox(self) -> np.ndarray: + def as_2d_bbox(self) -> Any: """Convert the object cuboid to a 2D bounding box, with vertices provided in the egovehicle's reference frame. Length is x, width is y, and z is height @@ -102,7 +103,7 @@ def as_2d_bbox(self) -> np.ndarray: bbox_in_egovehicle_frame = egovehicle_SE3_object.transform_point_cloud(bbox_object_frame) return bbox_in_egovehicle_frame - def as_3d_bbox(self) -> np.ndarray: + def as_3d_bbox(self) -> Any: r"""Calculate the 8 bounding box corners (returned as points inside the egovehicle's frame). 
Returns: @@ -126,9 +127,9 @@ def as_3d_bbox(self) -> np.ndarray: The last four are the ones facing backwards. """ # 3D bounding box corners. (Convention: x points forward, y to the left, z up.) - x_corners = self.length / 2 * np.array([1, 1, 1, 1, -1, -1, -1, -1]) - y_corners = self.width / 2 * np.array([1, -1, -1, 1, 1, -1, -1, 1]) - z_corners = self.height / 2 * np.array([1, 1, -1, -1, 1, 1, -1, -1]) + x_corners: NDArray[np.float64] = self.length / 2 * np.array([1, 1, 1, 1, -1, -1, -1, -1]) + y_corners: NDArray[np.float64] = self.width / 2 * np.array([1, -1, -1, 1, 1, -1, -1, 1]) + z_corners: NDArray[np.float64] = self.height / 2 * np.array([1, 1, -1, -1, 1, 1, -1, -1]) corners_object_frame = np.vstack((x_corners, y_corners, z_corners)).T egovehicle_SE3_object = SE3(rotation=quat2rotmat(self.quaternion), translation=self.translation) @@ -137,9 +138,13 @@ def as_3d_bbox(self) -> np.ndarray: def render_clip_frustum_cv2( self, - img: np.ndarray, - corners: np.ndarray, - planes: List[Tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray, np.ndarray]], + img: NDArray[np.float64], + corners: NDArray[np.float64], + planes: List[ + Tuple[ + NDArray[np.float64], NDArray[np.float64], NDArray[np.float64], NDArray[np.float64], NDArray[np.float64] + ] + ], camera_config: CameraConfig, colors: Tuple[Tuple[int, int, int], Tuple[int, int, int], Tuple[int, int, int]] = ( BLUE_RGB, @@ -147,7 +152,7 @@ def render_clip_frustum_cv2( GREEN_RGB, ), linewidth: int = 2, - ) -> np.ndarray: + ) -> NDArray[np.float64]: r"""We bring the 3D points into each camera, and do the clipping there. Renders box using OpenCV2. 
Edge coloring and vertex ordering is roughly based on @@ -182,7 +187,7 @@ def render_clip_frustum_cv2( img: Numpy array of shape (M,N,3), representing updated image """ - def draw_rect(selected_corners: np.ndarray, color: Tuple[int, int, int]) -> None: + def draw_rect(selected_corners: NDArray[np.float64], color: Tuple[int, int, int]) -> None: prev = selected_corners[-1] for corner in selected_corners: draw_clipped_line_segment( @@ -241,13 +246,13 @@ def draw_rect(selected_corners: np.ndarray, color: Tuple[int, int, int]) -> None return img -def uv_coord_is_valid(uv: np.ndarray, img: np.ndarray) -> bool: +def uv_coord_is_valid(uv: NDArray[np.float64], img: NDArray[np.float64]) -> bool: """Check if 2d-point lies within 3-channel color image boundaries""" h, w, _ = img.shape return bool(uv[0] >= 0 and uv[1] >= 0 and uv[0] < w and uv[1] < h) -def label_is_closeby(box_point: np.ndarray) -> bool: +def label_is_closeby(box_point: NDArray[np.float64]) -> bool: """Check if 3d cuboid pt (in egovehicle frame) is within range from egovehicle to prevent plot overcrowding. """ @@ -255,12 +260,12 @@ def label_is_closeby(box_point: np.ndarray) -> bool: def draw_alpha_rectangle( - img: np.ndarray, + img: NDArray[np.float64], top_left: Tuple[int, int], bottom_right: Tuple[int, int], color_rgb: Tuple[int, int, int], alpha: float, -) -> np.ndarray: +) -> NDArray[np.float64]: """Alpha blend colored rectangle into image. Corner coords given as (x,y) tuples""" img_h, img_w, _ = img.shape mask = np.zeros((img_h, img_w), dtype=np.uint8) @@ -268,7 +273,7 @@ def draw_alpha_rectangle( return vis_mask(img, mask, np.array(list(color_rgb[::-1])), alpha) -def form_obj_label_from_json(label: Dict[str, Any]) -> Tuple[np.ndarray, str]: +def form_obj_label_from_json(label: Dict[str, Any]) -> Tuple[NDArray[np.float64], str]: """Construct object from loaded json. 
The dictionary loaded from saved json file is expected to have the diff --git a/argoverse/data_loading/stereo_dataloader.py b/argoverse/data_loading/stereo_dataloader.py index 09482864..b8fce3c1 100644 --- a/argoverse/data_loading/stereo_dataloader.py +++ b/argoverse/data_loading/stereo_dataloader.py @@ -5,6 +5,7 @@ import cv2 import numpy as np +from numpy.typing import NDArray from argoverse.utils.json_utils import read_json_file @@ -76,7 +77,7 @@ def get_ordered_log_disparity_map_fpaths(self, log_id: str, disparity_name: str) return disparity_map_fpaths - def get_rectified_stereo_image(self, stereo_img_path: str) -> np.ndarray: + def get_rectified_stereo_image(self, stereo_img_path: str) -> Any: """Get the rectified stereo image. Args: @@ -87,7 +88,7 @@ def get_rectified_stereo_image(self, stereo_img_path: str) -> np.ndarray: """ return cv2.cvtColor(cv2.imread(stereo_img_path), cv2.COLOR_BGR2RGB) - def get_disparity_map(self, disparity_map_path: str) -> np.ndarray: + def get_disparity_map(self, disparity_map_path: str) -> NDArray[np.float64]: """Get the disparity map. The disparity maps are saved as uint16 PNG images. 
A zero-value ("0") indicates that no ground truth exists diff --git a/argoverse/data_loading/synchronization_database.py b/argoverse/data_loading/synchronization_database.py index 34b5889b..a081fe30 100644 --- a/argoverse/data_loading/synchronization_database.py +++ b/argoverse/data_loading/synchronization_database.py @@ -8,6 +8,7 @@ from typing import Dict, Iterable, Optional, Tuple, cast import numpy as np +from numpy.typing import NDArray from typing_extensions import Final from argoverse.sensor_dataset_config import ArgoverseConfig @@ -38,7 +39,7 @@ LIDAR_SWEEP_INTERVAL_W_BUFFER_MS = LIDAR_SWEEP_INTERVAL_MS + ALLOWED_TIMESTAMP_BUFFER_MS -def get_timestamps_from_sensor_folder(sensor_folder_wildcard: str) -> np.ndarray: +def get_timestamps_from_sensor_folder(sensor_folder_wildcard: str) -> NDArray[np.float64]: """Timestamp always lies at end of filename Args: @@ -55,7 +56,7 @@ def get_timestamps_from_sensor_folder(sensor_folder_wildcard: str) -> np.ndarray return np.array([int(Path(jpg_fpath).stem.split("_")[-1]) for jpg_fpath in path_generator]) -def find_closest_integer_in_ref_arr(query_int: int, ref_arr: np.ndarray) -> Tuple[int, int]: +def find_closest_integer_in_ref_arr(query_int: int, ref_arr: NDArray[np.float64]) -> Tuple[int, int]: """ Find the closest integer to any integer inside a reference array, and the corresponding difference. 
@@ -121,8 +122,8 @@ def __init__(self, dataset_dir: str, collect_single_log_id: Optional[str] = None else: log_fpaths = [f"{dataset_dir}/{collect_single_log_id}"] - self.per_log_camtimestamps_index: Dict[str, Dict[str, np.ndarray]] = {} - self.per_log_lidartimestamps_index: Dict[str, np.ndarray] = {} + self.per_log_camtimestamps_index: Dict[str, Dict[str, NDArray[np.float64]]] = {} + self.per_log_lidartimestamps_index: Dict[str, NDArray[np.float64]] = {} for log_fpath in log_fpaths: log_id = Path(log_fpath).name diff --git a/argoverse/data_loading/trajectory_loader.py b/argoverse/data_loading/trajectory_loader.py index cc156644..451133e7 100644 --- a/argoverse/data_loading/trajectory_loader.py +++ b/argoverse/data_loading/trajectory_loader.py @@ -10,6 +10,7 @@ from typing import List, NamedTuple import numpy as np +from numpy.typing import NDArray from argoverse.data_loading.object_classes import OBJ_CLASS_MAPPING_DICT @@ -36,20 +37,20 @@ class TrajectoryLabel(NamedTuple): heights (np.array): Array of heights for trajectory. 
""" - timestamps: np.ndarray - quaternions: np.ndarray - translations: np.ndarray + timestamps: NDArray[np.float64] + quaternions: NDArray[np.float64] + translations: NDArray[np.float64] obj_class: int obj_class_str: str - occlusion: np.ndarray + occlusion: NDArray[np.float64] track_uuid: str log_id: str max_length: float max_width: float max_height: float - lengths: np.ndarray - widths: np.ndarray - heights: np.ndarray + lengths: NDArray[np.float64] + widths: NDArray[np.float64] + heights: NDArray[np.float64] def load_json_track_labels(log_track_labels_dir: str) -> List[TrajectoryLabel]: diff --git a/argoverse/data_loading/vector_map_loader.py b/argoverse/data_loading/vector_map_loader.py index f25f8750..39cc46cb 100644 --- a/argoverse/data_loading/vector_map_loader.py +++ b/argoverse/data_loading/vector_map_loader.py @@ -45,6 +45,7 @@ from typing import Any, Dict, List, Mapping, MutableMapping, Optional, Tuple, Union, cast import numpy as np +from numpy.typing import NDArray from argoverse.map_representation.lane_segment import LaneSegment @@ -189,7 +190,7 @@ def get_lane_identifier(child: ET.Element) -> int: return int(child.attrib["lane_id"]) -def convert_node_id_list_to_xy(node_id_list: List[int], all_graph_nodes: Mapping[int, Node]) -> np.ndarray: +def convert_node_id_list_to_xy(node_id_list: List[int], all_graph_nodes: Mapping[int, Node]) -> NDArray[np.float64]: """ convert node id list to centerline xy coordinate diff --git a/argoverse/evaluation/competition_util.py b/argoverse/evaluation/competition_util.py index f88d47a1..76cca230 100644 --- a/argoverse/evaluation/competition_util.py +++ b/argoverse/evaluation/competition_util.py @@ -6,10 +6,11 @@ import shutil import tempfile import uuid -from typing import Dict, List, Optional, Tuple, Union +from typing import Any, Dict, List, Optional, Tuple, Union import h5py import numpy as np +from numpy.typing import NDArray from scipy.spatial import ConvexHull from shapely.geometry import Polygon from 
sklearn.cluster import DBSCAN @@ -19,7 +20,7 @@ from argoverse.utils.se3 import SE3 from argoverse.utils.transform import yaw_to_quaternion3d -TYPE_LIST = Union[List[np.ndarray], np.ndarray] +TYPE_LIST = Union[List[NDArray[np.float64]], NDArray[np.float64]] def generate_forecasting_h5( @@ -47,7 +48,7 @@ def generate_forecasting_h5( os.makedirs(output_path) hf = h5py.File(os.path.join(output_path, filename + ".h5"), "w") future_frames = 30 - d_all: List[np.ndarray] = [] + d_all: List[NDArray[np.float64]] = [] counter = 0 for key, value in data.items(): print("\r" + str(counter + 1) + "/" + str(len(data)), end="") @@ -121,7 +122,7 @@ def generate_tracking_zip(input_path: str, output_path: str, filename: str = "ar shutil.rmtree(dirpath) -def get_polygon_from_points(points: np.ndarray) -> Polygon: +def get_polygon_from_points(points: NDArray[np.float64]) -> Polygon: """ function to generate (convex hull) shapely polygon from set of points @@ -145,7 +146,7 @@ def get_polygon_from_points(points: np.ndarray) -> Polygon: return Polygon(poly) -def get_rotated_bbox_from_points(points: np.ndarray) -> Polygon: +def get_rotated_bbox_from_points(points: NDArray[np.float64]) -> Polygon: """ function to generate mininum_rotated_rectangle from list of point coordinate @@ -229,7 +230,9 @@ def poly_to_label(poly: Polygon, category: str = "VEHICLE", track_id: str = "") ) -def get_objects(clustering: DBSCAN, pts: np.ndarray, category: str = "VEHICLE") -> List[Tuple[np.ndarray, uuid.UUID]]: +def get_objects( + clustering: DBSCAN, pts: NDArray[np.float64], category: str = "VEHICLE" +) -> List[Tuple[NDArray[np.float64], uuid.UUID]]: core_samples_mask = np.zeros_like(clustering.labels_, dtype=bool) core_samples_mask[clustering.core_sample_indices_] = True @@ -300,7 +303,7 @@ def save_label(argoverse_data: ArgoverseTrackingLoader, labels: List[ObjectLabel json.dump(labels_json_data, json_file) -def transform_xyz(xyz: np.ndarray, pose1: SE3, pose2: SE3) -> np.ndarray: +def transform_xyz(xyz: 
NDArray[np.float64], pose1: SE3, pose2: SE3) -> Any: # transform xyz from pose1 to pose2 # convert to city coordinate diff --git a/argoverse/evaluation/detection/eval.py b/argoverse/evaluation/detection/eval.py index 50c1bc8c..c4e51a72 100644 --- a/argoverse/evaluation/detection/eval.py +++ b/argoverse/evaluation/detection/eval.py @@ -52,7 +52,7 @@ Results: The results are represented as a (C + 1, P) table, where C + 1 represents the number of evaluation classes - in addition to the mean statistics average across all classes, and P refers to the number of included statistics, + in addition to the mean statistics average across all classes, and P refers to the number of included statistics, e.g. AP, ATE, ASE, AOE, CDS by default. Note: The `evaluate` function will use all available logical cores on the machine. @@ -68,6 +68,7 @@ import numpy as np import pandas as pd +from numpy.typing import NDArray from tqdm.contrib.concurrent import process_map from argoverse.evaluation.detection.constants import N_TP_ERRORS, SIGNIFICANT_DIGITS, STATISTIC_NAMES @@ -117,7 +118,7 @@ def evaluate(self) -> pd.DataFrame: gt_fpaths = list(self.gt_root_fpath.glob("*/per_sweep_annotations_amodal/*.json")) assert len(dt_fpaths) == len(gt_fpaths) - data: DefaultDict[str, List[np.ndarray]] = defaultdict(list) + data: DefaultDict[str, List[NDArray[np.float64]]] = defaultdict(list) cls_to_ninst: DefaultDict[str, int] = defaultdict(int) jobs = [AccumulateJob(self.dt_root_fpath, gt_fpath, self.cfg, self.avm) for gt_fpath in gt_fpaths] @@ -149,7 +150,7 @@ def evaluate(self) -> pd.DataFrame: return summary def summarize( - self, data: Dict[str, np.ndarray], cls_to_ninst: DefaultDict[str, int] + self, data: Dict[str, NDArray[np.float64]], cls_to_ninst: DefaultDict[str, int] ) -> DefaultDict[str, List[float]]: """Calculate and print the detection metrics. 
diff --git a/argoverse/evaluation/detection/utils.py b/argoverse/evaluation/detection/utils.py index 494a9332..a39b3fc2 100644 --- a/argoverse/evaluation/detection/utils.py +++ b/argoverse/evaluation/detection/utils.py @@ -16,11 +16,12 @@ from dataclasses import dataclass from enum import Enum, auto from pathlib import Path -from typing import DefaultDict, List, NamedTuple, Optional, Tuple, Union +from typing import Any, DefaultDict, List, NamedTuple, Optional, Tuple, Union import matplotlib import numpy as np import pandas as pd +from numpy.typing import NDArray from scipy.spatial.distance import cdist from scipy.spatial.transform import Rotation as R @@ -94,8 +95,10 @@ class DetectionCfg(NamedTuple): dt_metric: FilterMetric = FilterMetric.EUCLIDEAN max_dt_range: float = 100.0 # Meters save_figs: bool = False - tp_normalization_terms: np.ndarray = np.array([tp_thresh, MAX_SCALE_ERROR, MAX_YAW_ERROR]) - summary_default_vals: np.ndarray = np.array([MIN_AP, tp_thresh, MAX_NORMALIZED_ASE, MAX_NORMALIZED_AOE, MIN_CDS]) + tp_normalization_terms: NDArray[np.float64] = np.array([tp_thresh, MAX_SCALE_ERROR, MAX_YAW_ERROR]) + summary_default_vals: NDArray[np.float64] = np.array( + [MIN_AP, tp_thresh, MAX_NORMALIZED_ASE, MAX_NORMALIZED_AOE, MIN_CDS] + ) eval_only_roi_instances: bool = True map_root: _PathLike = Path(__file__).parent.parent.parent.parent / "map_files" # argoverse-api/map_files @@ -117,7 +120,7 @@ class AccumulateJob: avm: Optional[ArgoverseMap] -def accumulate(job: AccumulateJob) -> Tuple[DefaultDict[str, np.ndarray], DefaultDict[str, int]]: +def accumulate(job: AccumulateJob) -> Tuple[DefaultDict[str, NDArray[np.float64]], DefaultDict[str, int]]: """Accumulate the true/false positives (boolean flags) and true positive errors for each class. 
Args: @@ -182,7 +185,7 @@ def accumulate(job: AccumulateJob) -> Tuple[DefaultDict[str, np.ndarray], Defaul return cls_to_accum, cls_to_ninst -def remove_duplicate_instances(instances: np.ndarray, cfg: DetectionCfg) -> np.ndarray: +def remove_duplicate_instances(instances: NDArray[np.float64], cfg: DetectionCfg) -> Any: """Remove any duplicate cuboids in ground truth. Any ground truth cuboid of the same object class that shares the same centroid @@ -216,7 +219,7 @@ def remove_duplicate_instances(instances: np.ndarray, cfg: DetectionCfg) -> np.n return instances[unique_ids] -def assign(dts: np.ndarray, gts: np.ndarray, cfg: DetectionCfg) -> np.ndarray: +def assign(dts: NDArray[np.float64], gts: NDArray[np.float64], cfg: DetectionCfg) -> NDArray[np.float64]: """Attempt assignment of each detection to a ground truth label. Args: @@ -279,8 +282,8 @@ def assign(dts: np.ndarray, gts: np.ndarray, cfg: DetectionCfg) -> np.ndarray: def filter_objs_to_roi( - instances: np.ndarray, avm: ArgoverseMap, city_SE3_egovehicle: SE3, city_name: str -) -> np.ndarray: + instances: NDArray[np.float64], avm: ArgoverseMap, city_SE3_egovehicle: SE3, city_name: str +) -> Any: """Filter objects to the region of interest (5 meter dilation of driveable area). We ignore instances outside of region of interest (ROI) during evaluation. @@ -309,7 +312,7 @@ def filter_instances( target_class_name: str, filter_metric: FilterMetric, max_detection_range: float, -) -> np.ndarray: +) -> NDArray[np.float64]: """Filter object instances based on a set of conditions (class name and distance from egovehicle). Args: @@ -336,7 +339,7 @@ def filter_instances( return filtered_instances -def rank(dts: np.ndarray) -> Tuple[np.ndarray, np.ndarray]: +def rank(dts: NDArray[np.float64]) -> Tuple[NDArray[np.float64], NDArray[np.float64]]: """Rank the detections in descending order according to score (detector confidence). 
@@ -357,7 +360,7 @@ def rank(dts: np.ndarray) -> Tuple[np.ndarray, np.ndarray]: return ranked_dts[:MAX_NUM_BOXES], ranked_scores[:MAX_NUM_BOXES] -def interp(prec: np.ndarray, method: InterpType = InterpType.ALL) -> np.ndarray: +def interp(prec: NDArray[np.float64], method: InterpType = InterpType.ALL) -> Any: """Interpolate the precision over all recall levels. See equation 2 in http://citeseerx.ist.psu.edu/viewdoc/download?doi=10.1.1.167.6629&rep=rep1&type=pdf for more information. @@ -376,9 +379,7 @@ def interp(prec: np.ndarray, method: InterpType = InterpType.ALL) -> np.ndarray: return prec_interp -def compute_affinity_matrix( - dts: List[ObjectLabelRecord], gts: List[ObjectLabelRecord], metric: AffFnType -) -> np.ndarray: +def compute_affinity_matrix(dts: List[ObjectLabelRecord], gts: List[ObjectLabelRecord], metric: AffFnType) -> Any: """Calculate the affinity matrix between detections and ground truth labels, using a specified affinity function type. @@ -399,7 +400,9 @@ def compute_affinity_matrix( return sims -def calc_ap(gt_ranked: np.ndarray, recalls_interp: np.ndarray, ninst: int) -> Tuple[float, np.ndarray]: +def calc_ap( + gt_ranked: NDArray[np.float64], recalls_interp: NDArray[np.float64], ninst: int +) -> Tuple[float, NDArray[np.float64]]: """Compute precision and recall, interpolated over n fixed recall points. 
Args: @@ -414,17 +417,17 @@ def calc_ap(gt_ranked: np.ndarray, recalls_interp: np.ndarray, ninst: int) -> Tu cumulative_tp = np.cumsum(tp, dtype=int) cumulative_fp = np.cumsum(~tp, dtype=int) - cumulative_fn = ninst - cumulative_tp + cumulative_fn: int = ninst - cumulative_tp precisions = cumulative_tp / (cumulative_tp + cumulative_fp + np.finfo(float).eps) - recalls = cumulative_tp / (cumulative_tp + cumulative_fn) + recalls: int = cumulative_tp / (cumulative_tp + cumulative_fn) precisions = interp(precisions) precisions_interp = np.interp(recalls_interp, recalls, precisions, right=0) avg_precision = precisions_interp.mean() return avg_precision, precisions_interp -def dist_fn(dts: pd.DataFrame, gts: pd.DataFrame, metric: DistFnType) -> np.ndarray: +def dist_fn(dts: pd.DataFrame, gts: pd.DataFrame, metric: DistFnType) -> Any: """Distance functions between detections and ground truth. Args: @@ -459,7 +462,7 @@ def dist_fn(dts: pd.DataFrame, gts: pd.DataFrame, metric: DistFnType) -> np.ndar raise NotImplementedError("This distance metric is not implemented!") -def iou_aligned_3d(dt_dims: pd.DataFrame, gt_dims: pd.DataFrame) -> np.ndarray: +def iou_aligned_3d(dt_dims: pd.DataFrame, gt_dims: pd.DataFrame) -> Any: """Calculate the 3d, axis-aligned (vertical axis alignment) intersection-over-union (IoU) between the detections and the ground truth labels. Both objects are aligned to their +x axis and their centroids are placed at the origin before computation of the IoU. @@ -478,7 +481,7 @@ def iou_aligned_3d(dt_dims: pd.DataFrame, gt_dims: pd.DataFrame) -> np.ndarray: return (inter / union).values -def wrap_angle(angles: np.ndarray, period: float = np.pi) -> np.ndarray: +def wrap_angle(angles: NDArray[np.float64], period: float = np.pi) -> NDArray[np.float64]: """Map angles (in radians) from domain [-∞, ∞] to [0, π). This function is the inverse of `np.unwrap`. 
@@ -501,7 +504,7 @@ def wrap_angle(angles: np.ndarray, period: float = np.pi) -> np.ndarray: return angles -def plot(rec_interp: np.ndarray, prec_interp: np.ndarray, cls_name: str, figs_fpath: Path) -> Path: +def plot(rec_interp: NDArray[np.float64], prec_interp: NDArray[np.float64], cls_name: str, figs_fpath: Path) -> Path: """Plot and save the precision recall curve. Args: diff --git a/argoverse/evaluation/eval_forecasting.py b/argoverse/evaluation/eval_forecasting.py index 4aed8ff1..1292139b 100644 --- a/argoverse/evaluation/eval_forecasting.py +++ b/argoverse/evaluation/eval_forecasting.py @@ -6,13 +6,14 @@ from typing import Dict, List, Optional import numpy as np +from numpy.typing import NDArray from argoverse.map_representation.map_api import ArgoverseMap LOW_PROB_THRESHOLD_FOR_METRICS = 0.05 -def get_ade(forecasted_trajectory: np.ndarray, gt_trajectory: np.ndarray) -> float: +def get_ade(forecasted_trajectory: NDArray[np.float64], gt_trajectory: NDArray[np.float64]) -> float: """Compute Average Displacement Error. Args: @@ -37,7 +38,7 @@ def get_ade(forecasted_trajectory: np.ndarray, gt_trajectory: np.ndarray) -> flo return ade -def get_fde(forecasted_trajectory: np.ndarray, gt_trajectory: np.ndarray) -> float: +def get_fde(forecasted_trajectory: NDArray[np.float64], gt_trajectory: NDArray[np.float64]) -> float: """Compute Final Displacement Error. 
Args: @@ -56,8 +57,8 @@ def get_fde(forecasted_trajectory: np.ndarray, gt_trajectory: np.ndarray) -> flo def get_displacement_errors_and_miss_rate( - forecasted_trajectories: Dict[int, List[np.ndarray]], - gt_trajectories: Dict[int, np.ndarray], + forecasted_trajectories: Dict[int, List[NDArray[np.float64]]], + gt_trajectories: Dict[int, NDArray[np.float64]], max_guesses: int, horizon: int, miss_threshold: float, @@ -147,7 +148,7 @@ def get_displacement_errors_and_miss_rate( def get_drivable_area_compliance( - forecasted_trajectories: Dict[int, List[np.ndarray]], + forecasted_trajectories: Dict[int, List[NDArray[np.float64]]], city_names: Dict[int, str], max_n_guesses: int, ) -> float: @@ -182,8 +183,8 @@ def get_drivable_area_compliance( def compute_forecasting_metrics( - forecasted_trajectories: Dict[int, List[np.ndarray]], - gt_trajectories: Dict[int, np.ndarray], + forecasted_trajectories: Dict[int, List[NDArray[np.float64]]], + gt_trajectories: Dict[int, NDArray[np.float64]], city_names: Dict[int, str], max_n_guesses: int, horizon: int, diff --git a/argoverse/evaluation/eval_tracking.py b/argoverse/evaluation/eval_tracking.py index 53911cf3..8b6c14f7 100644 --- a/argoverse/evaluation/eval_tracking.py +++ b/argoverse/evaluation/eval_tracking.py @@ -10,6 +10,7 @@ import motmetrics as mm import numpy as np +from numpy.typing import NDArray from shapely.geometry.polygon import Polygon from argoverse.evaluation.detection.utils import wrap_angle @@ -31,7 +32,9 @@ """ -def in_distance_range_pose(ego_center: np.ndarray, pose: np.ndarray, d_min: float, d_max: float) -> bool: +def in_distance_range_pose( + ego_center: NDArray[np.float64], pose: NDArray[np.float64], d_min: float, d_max: float +) -> bool: """Determine whether a pose in the ego-vehicle frame falls within a specified distance range of the egovehicle's origin. 
@@ -55,7 +58,9 @@ def iou_polygon(poly1: Polygon, poly2: Polygon) -> float: return float(1 - inter / union) -def get_distance_iou_3d(x1: Dict[str, np.ndarray], x2: Dict[str, np.ndarray], name: str = "bbox") -> float: +def get_distance_iou_3d( + x1: Dict[str, NDArray[np.float64]], x2: Dict[str, NDArray[np.float64]], name: str = "bbox" +) -> float: """ Note this is not traditional 2d or 3d iou, but rather we align two cuboids along their x-axes, and compare 3d volume differences. @@ -78,7 +83,7 @@ def get_distance_iou_3d(x1: Dict[str, np.ndarray], x2: Dict[str, np.ndarray], na return float(score) -def get_distance(x1: Dict[str, np.ndarray], x2: Dict[str, np.ndarray], name: str) -> float: +def get_distance(x1: Dict[str, NDArray[np.float64]], x2: Dict[str, NDArray[np.float64]], name: str) -> float: """Get the distance between two poses, returns nan if distance is larger than detection threshold. Args: diff --git a/argoverse/evaluation/eval_utils.py b/argoverse/evaluation/eval_utils.py index 2bd06ed4..f2610a6a 100644 --- a/argoverse/evaluation/eval_utils.py +++ b/argoverse/evaluation/eval_utils.py @@ -4,6 +4,7 @@ from typing import Any, Dict, List, Tuple import numpy as np +from numpy.typing import NDArray from argoverse.utils.transform import quat2rotmat @@ -13,7 +14,7 @@ _LabelType = Dict[str, Any] -def get_pc_inside_bbox(pc_raw: np.ndarray, bbox: np.ndarray) -> np.ndarray: +def get_pc_inside_bbox(pc_raw: NDArray[np.float64], bbox: NDArray[np.float64]) -> Any: """Get part of raw point cloud inside a given bounding box. Args: @@ -77,7 +78,7 @@ def get_pc_inside_bbox(pc_raw: np.ndarray, bbox: np.ndarray) -> np.ndarray: return pc_raw[flag[0, :]] -def label_to_bbox(label: _LabelType) -> Tuple[np.ndarray, float]: +def label_to_bbox(label: _LabelType) -> Tuple[NDArray[np.float64], float]: """Convert a label into a parameterized bounding box that lives in the ego-vehicle coordinate frame. 
@@ -116,7 +117,9 @@ def label_to_bbox(label: _LabelType) -> Tuple[np.ndarray, float]: return transform_bounding_box_3d(bbox, R, t), orientation -def transform_bounding_box_3d(bbox: np.ndarray, R: np.ndarray, t: np.ndarray) -> List[np.ndarray]: +def transform_bounding_box_3d( + bbox: NDArray[np.float64], R: NDArray[np.float64], t: NDArray[np.float64] +) -> List[NDArray[np.float64]]: """Transform bounding box with rotation and translation. Args: @@ -136,7 +139,7 @@ def transform_bounding_box_3d(bbox: np.ndarray, R: np.ndarray, t: np.ndarray) -> return [p0, p1, p2, bbox[3]] -def in_between_matrix(x: np.ndarray, v1: np.ndarray, v2: np.ndarray) -> np.ndarray: +def in_between_matrix(x: NDArray[np.float64], v1: NDArray[np.float64], v2: NDArray[np.float64]) -> Any: """Element-by-element check to see if x_ij is between v1_ij and v2_ij, without knowing v1 > v2 order. Args: diff --git a/argoverse/evaluation/stereo/utils.py b/argoverse/evaluation/stereo/utils.py index b6d1b39d..8e45c04e 100644 --- a/argoverse/evaluation/stereo/utils.py +++ b/argoverse/evaluation/stereo/utils.py @@ -9,6 +9,7 @@ import numpy as np import pandas as pd from numba import njit +from numpy.typing import NDArray from argoverse.evaluation.stereo.constants import ( DEFAULT_ABS_ERROR_THRESHOLDS, @@ -129,7 +130,7 @@ def accumulate_stereo_metrics(abs_error_thresholds: List[int]) -> pd.DataFrame: @njit(nogil=True) # type: ignore -def interpolate_disparity(disparity: np.ndarray) -> np.ndarray: +def interpolate_disparity(disparity: NDArray[np.float64]) -> NDArray[np.float64]: """Interpolate disparity image to inpaint holes. The predicted disparity map might contain holes which need to be interpolated for a dense result. 
@@ -202,8 +203,8 @@ def interpolate_disparity(disparity: np.ndarray) -> np.ndarray: def write_disparity_error_image( - pred_disparity: np.ndarray, - gt_disparity: np.ndarray, + pred_disparity: NDArray[np.float64], + gt_disparity: NDArray[np.float64], timestamp: int, write_dir_path: Path, abs_error_thresholds: List[int] = DEFAULT_ABS_ERROR_THRESHOLDS, @@ -233,11 +234,11 @@ def write_disparity_error_image( def compute_disparity_error_image( - pred_disparity: np.ndarray, - gt_disparity: np.ndarray, + pred_disparity: NDArray[np.float64], + gt_disparity: NDArray[np.float64], abs_error_thresholds: List[int] = DEFAULT_ABS_ERROR_THRESHOLDS, rel_error_thresholds: List[float] = DEFAULT_REL_ERROR_THRESHOLDS, -) -> np.ndarray: +) -> NDArray[np.float64]: """Compute the disparity error image as in the KITTI Stereo 2015 benchmark. The disparity error map uses a log colormap depicting correct estimates in blue and wrong estimates in red color tones. We define correct disparity estimates when the absolute disparity error is less than 10 pixels and the diff --git a/argoverse/map_representation/lane_segment.py b/argoverse/map_representation/lane_segment.py index ea0c652c..7d9ce013 100644 --- a/argoverse/map_representation/lane_segment.py +++ b/argoverse/map_representation/lane_segment.py @@ -2,6 +2,7 @@ from typing import List, Optional import numpy as np +from numpy.typing import NDArray class LaneSegment: @@ -15,7 +16,7 @@ def __init__( r_neighbor_id: Optional[int], predecessors: List[int], successors: Optional[List[int]], - centerline: np.ndarray, + centerline: NDArray[np.float64], ) -> None: """Initialize the lane segment. 
diff --git a/argoverse/map_representation/map_api.py b/argoverse/map_representation/map_api.py index 3524ec5b..823d6407 100644 --- a/argoverse/map_representation/map_api.py +++ b/argoverse/map_representation/map_api.py @@ -7,6 +7,7 @@ import matplotlib.pyplot as plt import numpy as np +from numpy.typing import NDArray from shapely.geometry import LineString from argoverse.data_loading.vector_map_loader import load_lane_segments_from_xml @@ -72,10 +73,10 @@ def __init__(self, root: _PathLike = ROOT) -> None: self.city_rasterized_ground_height_dict = self.build_city_ground_height_index() # get hallucinated lane extends and driveable area from binary img - self.city_to_lane_polygons_dict: Mapping[str, np.ndarray] = {} - self.city_to_driveable_areas_dict: Mapping[str, np.ndarray] = {} - self.city_to_lane_bboxes_dict: Mapping[str, np.ndarray] = {} - self.city_to_da_bboxes_dict: Mapping[str, np.ndarray] = {} + self.city_to_lane_polygons_dict: Mapping[str, NDArray[np.float64]] = {} + self.city_to_driveable_areas_dict: Mapping[str, NDArray[np.float64]] = {} + self.city_to_lane_bboxes_dict: Mapping[str, NDArray[np.float64]] = {} + self.city_to_da_bboxes_dict: Mapping[str, NDArray[np.float64]] = {} for city_name in self.city_name_to_city_id_dict.keys(): lane_polygons = np.array(self.get_vector_map_lane_polygons(city_name), dtype=object) @@ -94,7 +95,7 @@ def map_files_root(self) -> Path: raise ValueError("Map root directory cannot be None!") return Path(self.root).resolve() - def get_vector_map_lane_polygons(self, city_name: str) -> List[np.ndarray]: + def get_vector_map_lane_polygons(self, city_name: str) -> List[NDArray[np.float64]]: """ Get list of lane polygons for a specified city @@ -112,7 +113,7 @@ def get_vector_map_lane_polygons(self, city_name: str) -> List[np.ndarray]: return lane_polygons - def get_vector_map_driveable_areas(self, city_name: str) -> List[np.ndarray]: + def get_vector_map_driveable_areas(self, city_name: str) -> List[NDArray[np.float64]]: """ Get 
driveable area for a specified city @@ -127,7 +128,7 @@ def get_vector_map_driveable_areas(self, city_name: str) -> List[np.ndarray]: """ return self.get_da_contours(city_name) - def get_da_contours(self, city_name: str) -> List[np.ndarray]: + def get_da_contours(self, city_name: str) -> List[NDArray[np.float64]]: """ We threshold the binary driveable area or ROI image and obtain contour lines. These contour lines represent the boundary. @@ -148,7 +149,7 @@ def get_da_contours(self, city_name: str) -> List[np.ndarray]: npyimage_SE2_city = SE2(rotation=R, translation=t) city_SE2_npyimage = npyimage_SE2_city.inverse() - city_contours: List[np.ndarray] = [] + city_contours: List[NDArray[np.float64]] = [] for i, contour_im_coords in enumerate(contours): contour_im_coords = contour_im_coords.squeeze() contour_im_coords = contour_im_coords.astype(np.float64) @@ -175,7 +176,7 @@ def build_centerline_index(self) -> Mapping[str, Mapping[int, LaneSegment]]: def build_city_driveable_area_roi_index( self, - ) -> Mapping[str, Mapping[str, np.ndarray]]: + ) -> Mapping[str, Mapping[str, NDArray[np.float64]]]: """ Load driveable area files from disk. Dilate driveable area to get ROI (takes about 1/2 second). @@ -186,7 +187,7 @@ def build_city_driveable_area_roi_index( city_to_pkl_image_se2: SE(2) that produces takes point in pkl image to city coordinates, e.g. 
p_city = city_Transformation_pklimage * p_pklimage """ - city_rasterized_da_roi_dict: Dict[str, Dict[str, np.ndarray]] = {} + city_rasterized_da_roi_dict: Dict[str, Dict[str, NDArray[np.float64]]] = {} for city_name, city_id in self.city_name_to_city_id_dict.items(): city_id = self.city_name_to_city_id_dict[city_name] city_rasterized_da_roi_dict[city_name] = {} @@ -200,7 +201,7 @@ def build_city_driveable_area_roi_index( return city_rasterized_da_roi_dict - def build_city_ground_height_index(self) -> Mapping[str, Mapping[str, np.ndarray]]: + def build_city_ground_height_index(self) -> Mapping[str, Mapping[str, NDArray[np.float64]]]: """ Build index of rasterized ground height. @@ -210,7 +211,7 @@ def build_city_ground_height_index(self) -> Mapping[str, Mapping[str, np.ndarray city_to_pkl_image_se2: SE(2) that produces takes point in pkl image to city coordinates, e.g. p_city = city_Transformation_pklimage * p_pklimage """ - city_rasterized_ground_height_dict: Dict[str, Dict[str, np.ndarray]] = {} + city_rasterized_ground_height_dict: Dict[str, Dict[str, NDArray[np.float64]]] = {} for city_name, city_id in self.city_name_to_city_id_dict.items(): city_rasterized_ground_height_dict[city_name] = {} npy_fpath = self.map_files_root / f"{city_name}_{city_id}_ground_height_mat_2019_05_28.npy" @@ -223,7 +224,7 @@ def build_city_ground_height_index(self) -> Mapping[str, Mapping[str, np.ndarray return city_rasterized_ground_height_dict - def get_rasterized_driveable_area(self, city_name: str) -> Tuple[np.ndarray, np.ndarray]: + def get_rasterized_driveable_area(self, city_name: str) -> Tuple[NDArray[np.float64], NDArray[np.float64]]: """ Get the driveable area. @@ -241,7 +242,7 @@ def get_rasterized_driveable_area(self, city_name: str) -> Tuple[np.ndarray, np. 
self.city_rasterized_da_roi_dict[city_name]["npyimage_to_city_se2"], ) - def get_rasterized_roi(self, city_name: str) -> Tuple[np.ndarray, np.ndarray]: + def get_rasterized_roi(self, city_name: str) -> Tuple[NDArray[np.float64], NDArray[np.float64]]: """ Get the region of interest (5 meter dilation of driveable area). @@ -294,7 +295,7 @@ def render_city_centerlines(self, city_name: str) -> None: das = self.city_to_driveable_areas_dict[city_name] render_global_city_map_bev(lane_centerlines, das, city_name, self, centerline_color_scheme="indegree") - def get_rasterized_ground_height(self, city_name: str) -> Tuple[np.ndarray, np.ndarray]: + def get_rasterized_ground_height(self, city_name: str) -> Tuple[NDArray[np.float64], NDArray[np.float64]]: """ Get ground height matrix along with se2 that convert to city coordinate @@ -313,8 +314,8 @@ def get_rasterized_ground_height(self, city_name: str) -> Tuple[np.ndarray, np.n ) def remove_ground_surface( - self, point_cloud: np.ndarray, city_name: str, return_logicals: bool = False - ) -> Union[np.ndarray, Tuple[np.ndarray, np.ndarray]]: + self, point_cloud: NDArray[np.float64], city_name: str, return_logicals: bool = False + ) -> Any: """ Get a lidar point, snap it to the grid, perform the O(1) raster map query. If our z-height is within THRESHOLD of that grid's z-height, then we keep it; otherwise, discard it @@ -336,7 +337,7 @@ def remove_ground_surface( return point_cloud[not_ground_indxs] - def remove_non_driveable_area_points(self, point_cloud: np.ndarray, city_name: str) -> np.ndarray: + def remove_non_driveable_area_points(self, point_cloud: NDArray[np.float64], city_name: str) -> Any: """ Get a lidar point, snap it to the grid, perform the O(1) raster map query. 
If our z-height is within THRESHOLD of that grid's z-height, then we keep it; otherwise, discard it @@ -352,7 +353,7 @@ def remove_non_driveable_area_points(self, point_cloud: np.ndarray, city_name: s is_da_boolean_arr = self.get_raster_layer_points_boolean(point_cloud, city_name, "driveable_area") return point_cloud[is_da_boolean_arr] - def remove_non_roi_points(self, point_cloud: np.ndarray, city_name: str) -> np.ndarray: + def remove_non_roi_points(self, point_cloud: NDArray[np.float64], city_name: str) -> NDArray[np.float64]: """ Remove any points that does't fall within the region of interest (ROI) @@ -366,7 +367,7 @@ def remove_non_roi_points(self, point_cloud: np.ndarray, city_name: str) -> np.n is_roi_boolean_arr = self.get_raster_layer_points_boolean(point_cloud, city_name, "roi") return point_cloud[is_roi_boolean_arr] - def get_ground_points_boolean(self, point_cloud: np.ndarray, city_name: str) -> np.ndarray: + def get_ground_points_boolean(self, point_cloud: NDArray[np.float64], city_name: str) -> Any: """ Check whether each point is likely to be from the ground surface @@ -384,7 +385,7 @@ def get_ground_points_boolean(self, point_cloud: np.ndarray, city_name: str) -> ) return is_ground_boolean_arr - def get_ground_height_at_xy(self, point_cloud: np.ndarray, city_name: str) -> np.ndarray: + def get_ground_height_at_xy(self, point_cloud: NDArray[np.float64], city_name: str) -> NDArray[np.float64]: """ Get ground height for each of the xy location in point_cloud @@ -416,7 +417,9 @@ def get_ground_height_at_xy(self, point_cloud: np.ndarray, city_name: str) -> np return ground_height_values - def append_height_to_2d_city_pt_cloud(self, pt_cloud_xy: np.ndarray, city_name: str) -> np.ndarray: + def append_height_to_2d_city_pt_cloud( + self, pt_cloud_xy: NDArray[np.float64], city_name: str + ) -> NDArray[np.float64]: """ Accept 2d point cloud in xy plane and return 3d point cloud (xyz) @@ -430,7 +433,7 @@ def append_height_to_2d_city_pt_cloud(self, 
pt_cloud_xy: np.ndarray, city_name: pts_z = self.get_ground_height_at_xy(pt_cloud_xy, city_name) return np.hstack([pt_cloud_xy, pts_z[:, np.newaxis]]) - def get_raster_layer_points_boolean(self, point_cloud: np.ndarray, city_name: str, layer_name: str) -> np.ndarray: + def get_raster_layer_points_boolean(self, point_cloud: NDArray[np.float64], city_name: str, layer_name: str) -> Any: """ driveable area is "da" @@ -477,8 +480,8 @@ def get_raster_layer_points_boolean(self, point_cloud: np.ndarray, city_name: st return is_layer_boolean_arr def get_nearest_centerline( - self, query_xy_city_coords: np.ndarray, city_name: str, visualize: bool = False - ) -> Tuple[LaneSegment, float, np.ndarray]: + self, query_xy_city_coords: NDArray[np.float64], city_name: str, visualize: bool = False + ) -> Tuple[LaneSegment, float, NDArray[np.float64]]: """ KD Tree with k-closest neighbors or a fixed radius search on the lane centroids is unreliable since (1) there is highly variable density throughout the map and (2) @@ -555,8 +558,8 @@ def get_nearest_centerline( return closest_lane_obj, conf, dense_centerline def get_lane_direction( - self, query_xy_city_coords: np.ndarray, city_name: str, visualize: bool = False - ) -> Tuple[np.ndarray, float]: + self, query_xy_city_coords: NDArray[np.float64], city_name: str, visualize: bool = False + ) -> Tuple[NDArray[np.float64], float]: """ Get vector direction of the lane you're in. We ignore the sparse version of the centerline that we could @@ -712,7 +715,7 @@ def get_lane_segment_adjacent_ids(self, lane_segment_id: int, city_name: str) -> adjacent_ids = [r_neighbor, l_neighbor] return adjacent_ids - def get_lane_segment_centerline(self, lane_segment_id: int, city_name: str) -> np.ndarray: + def get_lane_segment_centerline(self, lane_segment_id: int, city_name: str) -> NDArray[np.float64]: """ We return a 3D centerline for any particular lane segment. 
@@ -729,7 +732,7 @@ def get_lane_segment_centerline(self, lane_segment_id: int, city_name: str) -> n return lane_centerline - def get_lane_segment_polygon(self, lane_segment_id: int, city_name: str) -> np.ndarray: + def get_lane_segment_polygon(self, lane_segment_id: int, city_name: str) -> NDArray[np.float64]: """ Hallucinate a 3d lane polygon based around the centerline. We rely on the average lane width within our cities to hallucinate the boundaries. We rely upon the @@ -788,7 +791,7 @@ def lane_has_traffic_control_measure(self, lane_segment_id: int, city_name: str) return self.city_lane_centerlines_dict[city_name][lane_segment_id].has_traffic_control def remove_extended_predecessors( - self, lane_seqs: List[List[int]], xy: np.ndarray, city_name: str + self, lane_seqs: List[List[int]], xy: NDArray[np.float64], city_name: str ) -> List[List[int]]: """ Remove lane_ids which are obtained by finding way too many predecessors from lane sequences. @@ -814,7 +817,7 @@ def remove_extended_predecessors( filtered_lane_seq.append(new_lane_seq) return filtered_lane_seq - def get_cl_from_lane_seq(self, lane_seqs: Iterable[List[int]], city_name: str) -> List[np.ndarray]: + def get_cl_from_lane_seq(self, lane_seqs: Iterable[List[int]], city_name: str) -> List[NDArray[np.float64]]: """Get centerlines corresponding to each lane sequence in lane_sequences Args: @@ -836,11 +839,11 @@ def get_cl_from_lane_seq(self, lane_seqs: Iterable[List[int]], city_name: str) - def get_candidate_centerlines_for_traj( self, - xy: np.ndarray, + xy: NDArray[np.float64], city_name: str, viz: bool = False, max_search_radius: float = 50.0, - ) -> List[np.ndarray]: + ) -> List[NDArray[np.float64]]: """Get centerline candidates upto a threshold. . 
Algorithm: @@ -1057,7 +1060,9 @@ def plot_nearby_halluc_lanes( halluc_lane_polygon = self.get_lane_segment_polygon(nearby_lane_id, city_name) plot_lane_segment_patch(halluc_lane_polygon, ax, color=patch_color, alpha=0.3) - def find_local_lane_polygons(self, query_bbox: Tuple[float, float, float, float], city_name: str) -> np.ndarray: + def find_local_lane_polygons( + self, query_bbox: Tuple[float, float, float, float], city_name: str + ) -> NDArray[np.float64]: """ Find land polygons within specified area @@ -1082,7 +1087,9 @@ def find_local_lane_polygons(self, query_bbox: Tuple[float, float, float, float] ) return local_lane_polygons - def find_local_driveable_areas(self, query_bbox: Tuple[float, float, float, float], city_name: str) -> np.ndarray: + def find_local_driveable_areas( + self, query_bbox: Tuple[float, float, float, float], city_name: str + ) -> NDArray[np.float64]: """ Find local driveable areas within specified area @@ -1113,7 +1120,7 @@ def find_local_lane_centerlines( query_y: float, city_name: str, query_search_range_manhattan: float = 80.0, - ) -> np.ndarray: + ) -> NDArray[np.float64]: """ Find local lane centerline to the specified x,y location diff --git a/argoverse/map_representation/map_viz_helper.py b/argoverse/map_representation/map_viz_helper.py index f568afcc..89f53047 100644 --- a/argoverse/map_representation/map_viz_helper.py +++ b/argoverse/map_representation/map_viz_helper.py @@ -3,11 +3,12 @@ import copy import math import warnings -from typing import Mapping, Tuple +from typing import Any, Mapping, Tuple import cv2 import numpy as np from colour import Color +from numpy.typing import NDArray from typing_extensions import Protocol from argoverse.utils.cv2_plotting_utils import draw_polygon_cv2, draw_polyline_cv2 @@ -23,7 +24,7 @@ def _find_min_coords_das( - driveable_areas: np.ndarray, xmin: float, ymin: float, xmax: float, ymax: float + driveable_areas: NDArray[np.float64], xmin: float, ymin: float, xmax: float, ymax: float ) -> 
Tuple[float, float, float, float]: for da in driveable_areas: xmin = min(da[:, 0].min(), xmin) @@ -47,7 +48,7 @@ def _find_min_coords_centerlines( return xmin, ymin, xmax, ymax -def _get_opencv_green_to_red_colormap(num_color_bins: int) -> np.ndarray: +def _get_opencv_green_to_red_colormap(num_color_bins: int) -> Any: """Create a red to green BGR colormap with a finite number of steps. Args: @@ -63,7 +64,7 @@ def _get_opencv_green_to_red_colormap(num_color_bins: int) -> np.ndarray: def _get_int_image_bounds_from_city_coords( - driveable_areas: np.ndarray, lane_centerlines: LaneCenterline + driveable_areas: NDArray[np.float64], lane_centerlines: LaneCenterline ) -> Tuple[int, int, int, int, int, int]: """Get the internal iamge bounds based on the city coordinates @@ -93,13 +94,13 @@ def _get_int_image_bounds_from_city_coords( class MapProtocol(Protocol): - def remove_non_driveable_area_points(self, point_cloud: np.ndarray, city_name: str) -> np.ndarray: + def remove_non_driveable_area_points(self, point_cloud: NDArray[np.float64], city_name: str) -> NDArray[np.float64]: ... 
def render_global_city_map_bev( lane_centerlines: LaneCenterline, - driveable_areas: np.ndarray, + driveable_areas: NDArray[np.float64], city_name: str, avm: MapProtocol, plot_rasterized_roi: bool = True, diff --git a/argoverse/utils/calibration.py b/argoverse/utils/calibration.py index bcd4924c..8a08d524 100644 --- a/argoverse/utils/calibration.py +++ b/argoverse/utils/calibration.py @@ -8,6 +8,7 @@ import imageio import numpy as np +from numpy.typing import NDArray from typing_extensions import Literal from argoverse.data_loading.pose_loader import get_city_SE3_egovehicle_at_sensor_t @@ -27,11 +28,11 @@ class CameraConfig(NamedTuple): img_height: image height """ - extrinsic: np.ndarray - intrinsic: np.ndarray + extrinsic: NDArray[np.float64] + intrinsic: NDArray[np.float64] img_width: int img_height: int - distortion_coeffs: np.ndarray + distortion_coeffs: NDArray[np.float64] class Calibration: @@ -86,7 +87,7 @@ def __init__(self, camera_config: CameraConfig, calib: Dict[str, Any]) -> None: self.camera = calib["key"][10:] - def cart2hom(self, pts_3d: np.ndarray) -> np.ndarray: + def cart2hom(self, pts_3d: NDArray[np.float64]) -> NDArray[np.float64]: """Convert Cartesian coordinates to Homogeneous. Args: @@ -99,7 +100,7 @@ def cart2hom(self, pts_3d: np.ndarray) -> np.ndarray: pts_3d_hom = np.hstack((pts_3d, np.ones((n, 1)))) return pts_3d_hom - def project_ego_to_image(self, pts_3d_ego: np.ndarray) -> np.ndarray: + def project_ego_to_image(self, pts_3d_ego: NDArray[np.float64]) -> NDArray[np.float64]: """Project egovehicle coordinate to image. Args: @@ -112,7 +113,7 @@ def project_ego_to_image(self, pts_3d_ego: np.ndarray) -> np.ndarray: uv_cam = self.project_ego_to_cam(pts_3d_ego) return self.project_cam_to_image(uv_cam) - def project_ego_to_cam(self, pts_3d_ego: np.ndarray) -> np.ndarray: + def project_ego_to_cam(self, pts_3d_ego: NDArray[np.float64]) -> Any: """Project egovehicle point onto camera frame. 
Args: @@ -126,7 +127,7 @@ def project_ego_to_cam(self, pts_3d_ego: np.ndarray) -> np.ndarray: return uv_cam.transpose()[:, 0:3] - def project_cam_to_ego(self, pts_3d_rect: np.ndarray) -> np.ndarray: + def project_cam_to_ego(self, pts_3d_rect: NDArray[np.float64]) -> Any: """Project point in camera frame to egovehicle frame. Args: @@ -137,7 +138,7 @@ def project_cam_to_ego(self, pts_3d_rect: np.ndarray) -> np.ndarray: """ return np.linalg.inv((self.extrinsic)).dot(self.cart2hom(pts_3d_rect).transpose()).transpose()[:, 0:3] - def project_image_to_ego(self, uv_depth: np.ndarray) -> np.ndarray: + def project_image_to_ego(self, uv_depth: NDArray[np.float64]) -> Any: """Project 2D image with depth to egovehicle coordinate. Args: @@ -150,7 +151,7 @@ def project_image_to_ego(self, uv_depth: np.ndarray) -> np.ndarray: uv_cam = self.project_image_to_cam(uv_depth) return self.project_cam_to_ego(uv_cam) - def project_image_to_cam(self, uv_depth: np.ndarray) -> np.ndarray: + def project_image_to_cam(self, uv_depth: NDArray[np.float64]) -> NDArray[np.float64]: """Project 2D image with depth to camera coordinate. Args: @@ -172,7 +173,7 @@ def project_image_to_cam(self, uv_depth: np.ndarray) -> np.ndarray: pts_3d_cam[:, 2] = uv_depth[:, 2] return pts_3d_cam - def project_cam_to_image(self, pts_3d_rect: np.ndarray) -> np.ndarray: + def project_cam_to_image(self, pts_3d_rect: NDArray[np.float64]) -> NDArray[np.float64]: """Project camera coordinate to image. Args: @@ -187,7 +188,7 @@ def project_cam_to_image(self, pts_3d_rect: np.ndarray) -> np.ndarray: return uv.transpose() -def load_image(img_filename: Union[str, Path]) -> np.ndarray: +def load_image(img_filename: Union[str, Path]) -> Any: """Load image. 
Args: @@ -255,7 +256,7 @@ def load_stereo_calib(calib_filepath: Union[str, Path]) -> Dict[Any, Calibration return calib_list -def get_camera_extrinsic_matrix(config: Dict[str, Any]) -> np.ndarray: +def get_camera_extrinsic_matrix(config: Dict[str, Any]) -> NDArray[np.float64]: """Load camera calibration rotation and translation. Note that the camera calibration file contains the SE3 for sensor frame to the vehicle frame, i.e. @@ -279,7 +280,7 @@ def get_camera_extrinsic_matrix(config: Dict[str, Any]) -> np.ndarray: return egovehicle_T_camera.inverse().transform_matrix -def get_camera_intrinsic_matrix(camera_config: Dict[str, Any]) -> np.ndarray: +def get_camera_intrinsic_matrix(camera_config: Dict[str, Any]) -> NDArray[np.float64]: """Load camera calibration data and constructs intrinsic matrix. Args: @@ -333,7 +334,7 @@ def get_calibration_config(calibration: Dict[str, Any], camera_name: str) -> Cam ) -def point_cloud_to_homogeneous(points: np.ndarray) -> np.ndarray: +def point_cloud_to_homogeneous(points: NDArray[np.float64]) -> NDArray[np.float64]: """ Args: points: Numpy array of shape (N,3) @@ -345,7 +346,9 @@ def point_cloud_to_homogeneous(points: np.ndarray) -> np.ndarray: return np.hstack([points, np.ones((num_pts, 1))]) -def remove_nan_values(uv: np.ndarray, uv_cam: np.ndarray) -> Tuple[np.ndarray, np.ndarray]: +def remove_nan_values( + uv: NDArray[np.float64], uv_cam: NDArray[np.float64] +) -> Tuple[NDArray[np.float64], NDArray[np.float64]]: """Accept corrupt array""" uv = uv.T @@ -359,7 +362,9 @@ def remove_nan_values(uv: np.ndarray, uv_cam: np.ndarray) -> Tuple[np.ndarray, n return uv.T, uv_cam.T -def determine_valid_cam_coords(uv: np.ndarray, uv_cam: np.ndarray, camera_config: CameraConfig) -> np.ndarray: +def determine_valid_cam_coords( + uv: NDArray[np.float64], uv_cam: NDArray[np.float64], camera_config: CameraConfig +) -> Any: """ Given a set of coordinates in the image plane and corresponding points in the camera coordinate reference frame, 
determine those points @@ -392,13 +397,13 @@ def determine_valid_cam_coords(uv: np.ndarray, uv_cam: np.ndarray, camera_config # uv_cam: Numpy array of shape (N,3) with dtype np.float32 # valid_pts_bool: Numpy array of shape (N,) with dtype bool # camera configuration : (only if you asked for it). -_ReturnWithConfig = Tuple[np.ndarray, np.ndarray, np.ndarray, CameraConfig] -_ReturnWithoutConfig = Tuple[np.ndarray, np.ndarray, np.ndarray] +_ReturnWithConfig = Tuple[NDArray[np.float64], NDArray[np.float64], NDArray[np.float64], CameraConfig] +_ReturnWithoutConfig = Tuple[NDArray[np.float64], NDArray[np.float64], NDArray[np.float64]] @overload def project_lidar_to_img( - lidar_points_h: np.ndarray, + lidar_points_h: NDArray[np.float64], calib_data: Dict[str, Any], camera_name: str, return_camera_config: Literal[True], @@ -409,7 +414,7 @@ def project_lidar_to_img( @overload def project_lidar_to_img( - lidar_points_h: np.ndarray, + lidar_points_h: NDArray[np.float64], calib_data: Dict[str, Any], camera_name: str, return_camera_config: Literal[False], @@ -420,7 +425,7 @@ def project_lidar_to_img( @overload def project_lidar_to_img( - lidar_points_h: np.ndarray, + lidar_points_h: NDArray[np.float64], calib_data: Dict[str, Any], camera_name: str, return_camera_config: bool = False, @@ -430,7 +435,7 @@ def project_lidar_to_img( def project_lidar_to_img( - lidar_points_h: np.ndarray, + lidar_points_h: NDArray[np.float64], calib_data: Dict[str, Any], camera_name: str, return_camera_config: bool = False, @@ -471,8 +476,8 @@ def project_lidar_to_img( def proj_cam_to_uv( - uv_cam: np.ndarray, camera_config: CameraConfig -) -> Tuple[np.ndarray, np.ndarray, np.ndarray, CameraConfig]: + uv_cam: NDArray[np.float64], camera_config: CameraConfig +) -> Tuple[NDArray[np.float64], NDArray[np.float64], NDArray[np.float64], CameraConfig]: num_points = uv_cam.shape[0] uvh = np.zeros((num_points, 3)) # (x_transformed_m, y_transformed_m, z_transformed_m) @@ -529,11 +534,11 @@ def 
distort_single(radius_undist: float, distort_coeffs: List[float]) -> float: def project_lidar_to_undistorted_img( - lidar_points_h: np.ndarray, + lidar_points_h: NDArray[np.float64], calib_data: Dict[str, Any], camera_name: str, remove_nan: bool = False, -) -> Tuple[np.ndarray, np.ndarray, np.ndarray, CameraConfig]: +) -> Tuple[NDArray[np.float64], NDArray[np.float64], NDArray[np.float64], CameraConfig]: camera_config = get_calibration_config(calib_data, camera_name) R = camera_config.extrinsic[:3, :3] @@ -556,17 +561,19 @@ def project_lidar_to_undistorted_img( # valid_pts_bool: Numpy array of shape (N,) with dtype bool # camera configuration : (only if you asked for it). _ReturnWithOptConfig = Tuple[ - Optional[np.ndarray], - Optional[np.ndarray], - Optional[np.ndarray], + Optional[NDArray[np.float64]], + Optional[NDArray[np.float64]], + Optional[NDArray[np.float64]], Optional[CameraConfig], ] -_ReturnWithoutOptConfig = Tuple[Optional[np.ndarray], Optional[np.ndarray], Optional[np.ndarray]] +_ReturnWithoutOptConfig = Tuple[ + Optional[NDArray[np.float64]], Optional[NDArray[np.float64]], Optional[NDArray[np.float64]] +] @overload def project_lidar_to_img_motion_compensated( - pts_h_lidar_time: np.ndarray, + pts_h_lidar_time: NDArray[np.float64], calib_data: Dict[str, Any], camera_name: str, cam_timestamp: int, @@ -580,7 +587,7 @@ def project_lidar_to_img_motion_compensated( @overload def project_lidar_to_img_motion_compensated( - pts_h_lidar_time: np.ndarray, + pts_h_lidar_time: NDArray[np.float64], calib_data: Dict[str, Any], camera_name: str, cam_timestamp: int, @@ -594,7 +601,7 @@ def project_lidar_to_img_motion_compensated( @overload def project_lidar_to_img_motion_compensated( - pts_h_lidar_time: np.ndarray, + pts_h_lidar_time: NDArray[np.float64], calib_data: Dict[str, Any], camera_name: str, cam_timestamp: int, @@ -607,7 +614,7 @@ def project_lidar_to_img_motion_compensated( def project_lidar_to_img_motion_compensated( - pts_h_lidar_time: np.ndarray, + 
pts_h_lidar_time: NDArray[np.float64], calib_data: Dict[str, Any], camera_name: str, cam_timestamp: int, diff --git a/argoverse/utils/centerline_utils.py b/argoverse/utils/centerline_utils.py index 94cbfa62..f77a4ecd 100644 --- a/argoverse/utils/centerline_utils.py +++ b/argoverse/utils/centerline_utils.py @@ -6,6 +6,7 @@ import matplotlib.pyplot as plt import numpy as np +from numpy.typing import NDArray from shapely.geometry import LinearRing, LineString, Point from argoverse.map_representation.lane_segment import LaneSegment @@ -15,8 +16,8 @@ def swap_left_and_right( - condition: np.ndarray, left_centerline: np.ndarray, right_centerline: np.ndarray -) -> Iterable[np.ndarray]: + condition: NDArray[np.float64], left_centerline: NDArray[np.float64], right_centerline: NDArray[np.float64] +) -> Iterable[NDArray[np.float64]]: """ Swap points in left and right centerline according to condition. @@ -40,8 +41,8 @@ def swap_left_and_right( def centerline_to_polygon( - centerline: np.ndarray, width_scaling_factor: float = 1.0, visualize: bool = False -) -> np.ndarray: + centerline: NDArray[np.float64], width_scaling_factor: float = 1.0, visualize: bool = False +) -> NDArray[np.float64]: """ Convert a lane centerline polyline into a rough polygon of the lane's area. 
@@ -75,8 +76,8 @@ def centerline_to_polygon( y_disp = 3.8 * width_scaling_factor / 2.0 * np.sin(thetas) displacement = np.hstack([x_disp[:, np.newaxis], y_disp[:, np.newaxis]]) - right_centerline = centerline + displacement - left_centerline = centerline - displacement + right_centerline: NDArray[np.float64] = centerline + displacement + left_centerline: NDArray[np.float64] = centerline - displacement # right centerline position depends on sign of dx and dy subtract_cond1 = np.logical_and(dx > 0, dy < 0) @@ -100,7 +101,9 @@ def centerline_to_polygon( return convert_lane_boundaries_to_polygon(right_centerline, left_centerline) -def convert_lane_boundaries_to_polygon(right_lane_bounds: np.ndarray, left_lane_bounds: np.ndarray) -> np.ndarray: +def convert_lane_boundaries_to_polygon( + right_lane_bounds: NDArray[np.float64], left_lane_bounds: NDArray[np.float64] +) -> NDArray[np.float64]: """ Take a left and right lane boundary and make a polygon of the lane segment, closing both ends of the segment. @@ -120,11 +123,11 @@ def convert_lane_boundaries_to_polygon(right_lane_bounds: np.ndarray, left_lane_ def filter_candidate_centerlines( - xy: np.ndarray, - candidate_cl: List[np.ndarray], + xy: NDArray[np.float64], + candidate_cl: List[NDArray[np.float64]], stationary_threshold: float = 2.0, max_dist_margin: float = 2.0, -) -> List[np.ndarray]: +) -> List[NDArray[np.float64]]: """Filter candidate centerlines based on the distance travelled along the centerline. 
Args: @@ -197,7 +200,7 @@ def is_overlapping_lane_seq(lane_seq1: Sequence[int], lane_seq2: Sequence[int]) def get_normal_and_tangential_distance_point( - x: float, y: float, centerline: np.ndarray, delta: float = 0.01, last: bool = False + x: float, y: float, centerline: NDArray[np.float64], delta: float = 0.01, last: bool = False ) -> Tuple[float, float]: """Get normal (offset from centerline) and tangential (distance along centerline) for the given point, along the given centerline @@ -240,7 +243,7 @@ def get_normal_and_tangential_distance_point( return (tang_dist, -norm_dist) -def get_nt_distance(xy: np.ndarray, centerline: np.ndarray, viz: bool = False) -> np.ndarray: +def get_nt_distance(xy: NDArray[np.float64], centerline: NDArray[np.float64], viz: bool = False) -> NDArray[np.float64]: """Get normal (offset from centerline) and tangential (distance along centerline) distances for the given xy trajectory, along the given centerline. @@ -278,7 +281,9 @@ def get_nt_distance(xy: np.ndarray, centerline: np.ndarray, viz: bool = False) - return nt_distance -def get_oracle_from_candidate_centerlines(candidate_centerlines: List[np.ndarray], xy: np.ndarray) -> LineString: +def get_oracle_from_candidate_centerlines( + candidate_centerlines: List[NDArray[np.float64]], xy: NDArray[np.float64] +) -> LineString: """Get oracle centerline from candidate centerlines. Chose based on direction of travel and maximum offset. First find the centerlines along which the distance travelled is close to maximum. 
If there are multiple candidates, then chose the one which has minimum max offset @@ -320,7 +325,9 @@ def get_oracle_from_candidate_centerlines(candidate_centerlines: List[np.ndarray return oracle_centerline -def get_centerlines_most_aligned_with_trajectory(xy: np.ndarray, candidate_cl: List[np.ndarray]) -> List[np.ndarray]: +def get_centerlines_most_aligned_with_trajectory( + xy: NDArray[np.float64], candidate_cl: List[NDArray[np.float64]] +) -> List[NDArray[np.float64]]: """Get the centerline from candidate_cl along which the trajectory travelled maximum distance Args: @@ -371,8 +378,8 @@ def remove_overlapping_lane_seq(lane_seqs: List[List[int]]) -> List[List[int]]: def lane_waypt_to_query_dist( - query_xy_city_coords: np.ndarray, nearby_lane_objs: List[LaneSegment] -) -> Tuple[np.ndarray, np.ndarray, List[np.ndarray]]: + query_xy_city_coords: NDArray[np.float64], nearby_lane_objs: List[LaneSegment] +) -> Tuple[NDArray[np.float64], NDArray[np.float64], List[NDArray[np.float64]]]: """ Compute the distance from a query to the closest waypoint in nearby lanes. 
@@ -386,7 +393,7 @@ def lane_waypt_to_query_dist( dense_centerlines: list of arrays, each representing (N,2) centerline """ per_lane_dists: List[float] = [] - dense_centerlines: List[np.ndarray] = [] + dense_centerlines: List[NDArray[np.float64]] = [] for nn_idx, lane_obj in enumerate(nearby_lane_objs): centerline = lane_obj.centerline # densely sample more points diff --git a/argoverse/utils/city_visibility_utils.py b/argoverse/utils/city_visibility_utils.py index 5b6975c9..026fc0ef 100644 --- a/argoverse/utils/city_visibility_utils.py +++ b/argoverse/utils/city_visibility_utils.py @@ -2,11 +2,12 @@ import matplotlib.pyplot as plt import numpy as np +from numpy.typing import NDArray def clip_point_cloud_to_visible_region( - egovehicle_pts: np.ndarray, lidar_pts: np.ndarray, n_polar_bins: int = 100 -) -> np.ndarray: + egovehicle_pts: NDArray[np.float64], lidar_pts: NDArray[np.float64], n_polar_bins: int = 100 +) -> NDArray[np.float64]: """ LiDAR points provide visibility information for a map point cloud in city coordinates. We bin the world into polar bins, and then if you've gone past the farthest LiDAR point @@ -65,7 +66,9 @@ def clip_point_cloud_to_visible_region( return egovehicle_pts -def viz_polar_bin_contents(bin_lidar_pts: np.ndarray, invalid_egovehicle_pts: np.ndarray, filename: str) -> None: +def viz_polar_bin_contents( + bin_lidar_pts: NDArray[np.float64], invalid_egovehicle_pts: NDArray[np.float64], filename: str +) -> None: """ Visualize what the utility is doing within each polar bin. 
diff --git a/argoverse/utils/cuboid_interior.py b/argoverse/utils/cuboid_interior.py index 4669be6d..6371eb57 100644 --- a/argoverse/utils/cuboid_interior.py +++ b/argoverse/utils/cuboid_interior.py @@ -15,13 +15,14 @@ # import copy -from typing import Optional, Tuple +from typing import Any, Optional, Tuple import numpy as np +from numpy.typing import NDArray from scipy.spatial import Delaunay -def filter_point_cloud_to_bbox(bbox: np.ndarray, velodyne_pts: np.ndarray) -> Optional[np.ndarray]: +def filter_point_cloud_to_bbox(bbox: NDArray[np.float64], velodyne_pts: NDArray[np.float64]) -> Any: """ Given 2 orthogonal directions "u", "v" defined by 3 bbox vertices, s.t.:: @@ -74,7 +75,9 @@ def filter_point_cloud_to_bbox(bbox: np.ndarray, velodyne_pts: np.ndarray) -> Op return interior_pts -def filter_point_cloud_to_bbox_2D_vectorized(bbox: np.ndarray, pc_raw: np.ndarray) -> Tuple[np.ndarray, np.ndarray]: +def filter_point_cloud_to_bbox_2D_vectorized( + bbox: NDArray[np.float64], pc_raw: NDArray[np.float64] +) -> Tuple[NDArray[np.float64], NDArray[np.float64]]: """ Args: bbox: NumPy array of shape (4,2) representing 2D bbox @@ -98,6 +101,7 @@ def filter_point_cloud_to_bbox_2D_vectorized(bbox: np.ndarray, pc_raw: np.ndarra P1 = np.array([bbox[0][0:2]]) P2 = np.array([bbox[1][0:2]]) P4 = np.array([bbox[2][0:2]]) + P5 = np.array([]) dot1 = np.matmul(U, pc_2d.transpose(1, 0)) dot2 = np.matmul(V, pc_2d.transpose(1, 0)) @@ -112,7 +116,7 @@ def filter_point_cloud_to_bbox_2D_vectorized(bbox: np.ndarray, pc_raw: np.ndarra return pc_seg, flag -def filter_point_cloud_to_bbox_3D(bbox: np.ndarray, pc_raw: np.ndarray) -> np.ndarray: +def filter_point_cloud_to_bbox_3D(bbox: NDArray[np.float64], pc_raw: NDArray[np.float64]) -> Any: """ Args: bbox has shape object array: [(3,), (3,), (3,), height] @@ -152,11 +156,11 @@ def filter_point_cloud_to_bbox_3D(bbox: np.ndarray, pc_raw: np.ndarray) -> np.nd return pc_seg -def in_between_matrix(x: np.ndarray, v1: np.ndarray, v2: np.ndarray) -> 
np.ndarray: +def in_between_matrix(x: NDArray[np.float64], v1: NDArray[np.float64], v2: NDArray[np.float64]) -> Any: return np.logical_or(np.logical_and(x <= v1, x >= v2), np.logical_and(x <= v2, x >= v1)) -def filter_point_cloud_to_bbox_3D_single_pt(bbox: np.ndarray, x: np.ndarray) -> np.ndarray: # pc_raw): +def filter_point_cloud_to_bbox_3D_single_pt(bbox: NDArray[np.float64], x: NDArray[np.float64]) -> Any: # pc_raw): r""" Args: @@ -208,7 +212,9 @@ def filter_point_cloud_to_bbox_3D_single_pt(bbox: np.ndarray, x: np.ndarray) -> return valid -def filter_point_cloud_to_bbox_3D_vectorized(bbox: np.ndarray, pc_raw: np.ndarray) -> Tuple[np.ndarray, np.ndarray]: +def filter_point_cloud_to_bbox_3D_vectorized( + bbox: NDArray[np.float64], pc_raw: NDArray[np.float64] +) -> Tuple[NDArray[np.float64], NDArray[np.float64]]: r""" Args: @@ -266,7 +272,9 @@ def filter_point_cloud_to_bbox_3D_vectorized(bbox: np.ndarray, pc_raw: np.ndarra return segment_pc, is_valid -def extract_pc_in_box3d_hull(pc: np.ndarray, bbox_3d: np.ndarray) -> Tuple[np.ndarray, np.ndarray]: +def extract_pc_in_box3d_hull( + pc: NDArray[np.float64], bbox_3d: NDArray[np.float64] +) -> Tuple[NDArray[np.float64], NDArray[np.float64]]: """ Find points that fall within a 3d cuboid, by treating the 3d cuboid as a hull. Scipy.spatial's Delaunay class performs tesselation in N dimensions. 
By finding diff --git a/argoverse/utils/cv2_plotting_utils.py b/argoverse/utils/cv2_plotting_utils.py index 3a95706a..9ced2387 100644 --- a/argoverse/utils/cv2_plotting_utils.py +++ b/argoverse/utils/cv2_plotting_utils.py @@ -1,16 +1,19 @@ # """OpenCV plotting utility functions.""" -from typing import List, Tuple +from typing import Any, List, Tuple import cv2 import numpy as np +from numpy.typing import NDArray from .calibration import CameraConfig, proj_cam_to_uv from .frustum_clipping import clip_segment_v3_plane_n -def add_text_cv2(img: np.ndarray, text: str, x: int, y: int, color: Tuple[int, int, int], thickness: int = 3) -> None: +def add_text_cv2( + img: NDArray[np.float64], text: str, x: int, y: int, color: Tuple[int, int, int], thickness: int = 3 +) -> None: """Add text to image using OpenCV. Color should be BGR order""" img = cv2.putText( img, @@ -25,12 +28,14 @@ def add_text_cv2(img: np.ndarray, text: str, x: int, y: int, color: Tuple[int, i def draw_clipped_line_segment( - img: np.ndarray, - vert_a: np.ndarray, - vert_b: np.ndarray, + img: NDArray[np.float64], + vert_a: NDArray[np.float64], + vert_b: NDArray[np.float64], camera_config: CameraConfig, linewidth: int, - planes: List[Tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray, np.ndarray]], + planes: List[ + Tuple[NDArray[np.float64], NDArray[np.float64], NDArray[np.float64], NDArray[np.float64], NDArray[np.float64]] + ], color: Tuple[int, int, int], ) -> None: """Plot the portion of a line segment that lives within a parameterized 3D camera frustum. @@ -62,7 +67,9 @@ def draw_clipped_line_segment( ) -def draw_point_cloud_in_img_cv2(img: np.ndarray, xy: np.ndarray, colors: np.ndarray, radius: int = 5) -> np.ndarray: +def draw_point_cloud_in_img_cv2( + img: NDArray[np.float64], xy: NDArray[np.float64], colors: NDArray[np.float64], radius: int = 5 +) -> NDArray[np.float64]: """Plot a point cloud in an image by drawing small circles centered at (x,y) locations. 
Note these are not (u,v) but rather (v,u) coordinate pairs. @@ -84,8 +91,8 @@ def draw_point_cloud_in_img_cv2(img: np.ndarray, xy: np.ndarray, colors: np.ndar def draw_polyline_cv2( - line_segments_arr: np.ndarray, - image: np.ndarray, + line_segments_arr: NDArray[np.float64], + image: NDArray[np.float64], color: Tuple[int, int, int], im_h: int, im_w: int, @@ -113,7 +120,9 @@ def draw_polyline_cv2( image = cv2.line(image, (x1, y1), (x2, y2), color, 2, cv2.LINE_AA) -def draw_polygon_cv2(points: np.ndarray, image: np.ndarray, color: Tuple[int, int, int]) -> np.ndarray: +def draw_polygon_cv2( + points: NDArray[np.float64], image: NDArray[np.float64], color: Tuple[int, int, int] +) -> NDArray[np.float64]: """Draw a polygon onto an image using the given points and fill color. These polygons are often non-convex, so we cannot use cv2.fillConvexPoly(). @@ -135,7 +144,9 @@ def draw_polygon_cv2(points: np.ndarray, image: np.ndarray, color: Tuple[int, in return image -def plot_bbox_polygon_cv2(img: np.ndarray, track_id: str, color: np.ndarray, bbox: np.ndarray) -> np.ndarray: +def plot_bbox_polygon_cv2( + img: NDArray[np.float64], track_id: str, color: NDArray[np.float64], bbox: NDArray[np.float64] +) -> NDArray[np.float64]: """Draw a colored bounding box with a red border. We use OpenCV's rectangle rendering to draw the thin red border. @@ -176,7 +187,7 @@ def plot_bbox_polygon_cv2(img: np.ndarray, track_id: str, color: np.ndarray, bbo return img -def get_img_contours(img: np.ndarray) -> np.ndarray: +def get_img_contours(img: NDArray[np.float64]) -> Any: """ Uses diff --git a/argoverse/utils/cv2_video_utils.py b/argoverse/utils/cv2_video_utils.py index f2c414c5..1a238374 100644 --- a/argoverse/utils/cv2_video_utils.py +++ b/argoverse/utils/cv2_video_utils.py @@ -4,6 +4,7 @@ import cv2 import numpy as np +from numpy.typing import NDArray """ Python-based utilities to avoid blowing up the disk with images, as FFMPEG requires. 
@@ -42,7 +43,7 @@ def init_outf(self, height: int, width: int) -> None: isColor=True, ) - def add_frame(self, rgb_frame: np.ndarray) -> None: + def add_frame(self, rgb_frame: NDArray[np.float64]) -> None: """Append a frame of shape (h,w,3) to the end of the video file.""" h, w, _ = rgb_frame.shape if self.writer is None: diff --git a/argoverse/utils/dilation_utils.py b/argoverse/utils/dilation_utils.py index fed74c89..6773ef6d 100644 --- a/argoverse/utils/dilation_utils.py +++ b/argoverse/utils/dilation_utils.py @@ -1,11 +1,14 @@ # """Utility functions for dilation.""" +from typing import Any + import cv2 import numpy as np +from numpy.typing import NDArray -def dilate_by_l2(img: np.ndarray, dilation_thresh: float = 5.0) -> np.ndarray: +def dilate_by_l2(img: NDArray[np.float64], dilation_thresh: float = 5.0) -> Any: """Dilate a mask using the L2 distance from a zero pixel. OpenCV's distance transform calculates the DISTANCE TO THE CLOSEST ZERO PIXEL for each @@ -23,7 +26,7 @@ def dilate_by_l2(img: np.ndarray, dilation_thresh: float = 5.0) -> np.ndarray: Returns: An image with the same size with the dilated mask """ - mask_diff = np.ones_like(img).astype(np.uint8) - img + mask_diff: int = np.ones_like(img).astype(np.uint8) - img distance_mask = cv2.distanceTransform(mask_diff, distanceType=cv2.DIST_L2, maskSize=cv2.DIST_MASK_PRECISE) distance_mask = distance_mask.astype(np.float32) return (distance_mask <= dilation_thresh).astype(np.uint8) diff --git a/argoverse/utils/forecasting_evaluation.py b/argoverse/utils/forecasting_evaluation.py index 9e40bf83..33d3e882 100644 --- a/argoverse/utils/forecasting_evaluation.py +++ b/argoverse/utils/forecasting_evaluation.py @@ -1,9 +1,12 @@ # +from typing import Any, Optional + import numpy as np +from numpy.typing import NDArray -def compute_summed_distance_point_cloud2D(points_a: np.ndarray, points_b: np.ndarray) -> np.ndarray: +def compute_summed_distance_point_cloud2D(points_a: NDArray[np.float64], points_b: 
NDArray[np.float64]) -> Any: """ Args: points_a: numpy n-d array with dims (N x 2) @@ -16,10 +19,10 @@ def compute_summed_distance_point_cloud2D(points_a: np.ndarray, points_b: np.nda def evaluate_prediction( - pred_traj: np.ndarray, - ground_truth_traj: np.ndarray, + pred_traj: NDArray[np.float64], + ground_truth_traj: NDArray[np.float64], eval_method: str = "EVAL_DESTINATION_ONLY", -) -> np.ndarray: +) -> Optional[Any]: """Compute the error as L2 norm in trajectories Args: diff --git a/argoverse/utils/frustum_clipping.py b/argoverse/utils/frustum_clipping.py index 1f200b1f..b6b34fe7 100644 --- a/argoverse/utils/frustum_clipping.py +++ b/argoverse/utils/frustum_clipping.py @@ -9,12 +9,13 @@ from typing import Any, List, Optional, Tuple import numpy as np +from numpy.typing import NDArray from argoverse.utils.camera_stats import get_image_dims_for_camera from argoverse.utils.manhattan_search import compute_point_cloud_bbox -def fit_plane_to_point_cloud(pc: np.ndarray) -> Tuple[Any, Any, Any, Any]: +def fit_plane_to_point_cloud(pc: NDArray[np.float64]) -> Tuple[Any, Any, Any, Any]: """Use SVD with at least 3 points to fit a plane. Args: @@ -33,7 +34,7 @@ def fit_plane_to_point_cloud(pc: np.ndarray) -> Tuple[Any, Any, Any, Any]: return a, b, c, d -def form_right_clipping_plane(fx: float, img_width: int) -> np.ndarray: +def form_right_clipping_plane(fx: float, img_width: int) -> NDArray[np.float64]: """Form the right clipping plane for a camera view frustum. In the camera coordinate frame, y is down the imager, x is across the imager, @@ -71,7 +72,7 @@ def form_right_clipping_plane(fx: float, img_width: int) -> np.ndarray: return right_plane -def form_left_clipping_plane(fx: float, img_width: int) -> np.ndarray: +def form_left_clipping_plane(fx: float, img_width: int) -> NDArray[np.float64]: r"""Form the left clipping plane for a camera view frustum. 
In the camera coordinate frame, y is down the imager, x is across the imager, @@ -107,7 +108,7 @@ def form_left_clipping_plane(fx: float, img_width: int) -> np.ndarray: return left_plane -def form_top_clipping_plane(fx: float, img_height: int) -> np.ndarray: +def form_top_clipping_plane(fx: float, img_height: int) -> NDArray[np.float64]: r"""Form the top clipping plane for a camera view frustum. In the camera coordinate frame, y is down the imager, x is across the imager, @@ -143,7 +144,7 @@ def form_top_clipping_plane(fx: float, img_height: int) -> np.ndarray: return top_plane -def form_low_clipping_plane(fx: float, img_height: int) -> np.ndarray: +def form_low_clipping_plane(fx: float, img_height: int) -> NDArray[np.float64]: r"""Form the low clipping plane for a camera view frustum. Use 3 points to fit the low clipping plane. In the camera coordinate frame, @@ -183,7 +184,7 @@ def form_low_clipping_plane(fx: float, img_height: int) -> np.ndarray: return low_plane -def form_near_clipping_plane(near_clip_dist: float) -> np.ndarray: +def form_near_clipping_plane(near_clip_dist: float) -> NDArray[np.float64]: """Form the near clipping plane for a camera view frustum. In the camera coordinate frame, y is down the imager, x is across the imager, @@ -201,7 +202,9 @@ def form_near_clipping_plane(near_clip_dist: float) -> np.ndarray: return np.array([0.0, 0.0, 1.0, -near_clip_dist]) -def generate_frustum_planes(K: np.ndarray, camera_name: str, near_clip_dist: float = 0.5) -> Optional[List[np.ndarray]]: +def generate_frustum_planes( + K: NDArray[np.float64], camera_name: str, near_clip_dist: float = 0.5 +) -> Optional[List[NDArray[np.float64]]]: """Compute the planes enclosing the field of view (viewing frustum) for a single camera. We do this using similar triangles. 
@@ -251,8 +254,8 @@ def generate_frustum_planes(K: np.ndarray, camera_name: str, near_clip_dist: flo def clip_segment_v3_plane_n( - p1: np.ndarray, p2: np.ndarray, planes: List[np.ndarray] -) -> Tuple[Optional[np.ndarray], Optional[np.ndarray]]: + p1: NDArray[np.float64], p2: NDArray[np.float64], planes: List[NDArray[np.float64]] +) -> Tuple[Optional[NDArray[np.float64]], Optional[NDArray[np.float64]]]: """Iterate over the frustum planes and intersect them with the segment. This updating the min/max, bailing out early if the min > max. @@ -275,7 +278,7 @@ def clip_segment_v3_plane_n( Returns: 2 vector triplets (the clipped segment) or (None, None) meaning the segment is entirely outside the frustum. """ - dp = p2 - p1 + dp: NDArray[np.float64] = p2 - p1 p1_fac = 0.0 p2_fac = 1.0 @@ -308,12 +311,12 @@ def clip_segment_v3_plane_n( if p1_fac > p2_fac: return None, None - p1_clip = p1 + (dp * p1_fac) - p2_clip = p1 + (dp * p2_fac) + p1_clip: NDArray[np.float64] = p1 + (dp * p1_fac) + p2_clip: NDArray[np.float64] = p1 + (dp * p2_fac) return p1_clip, p2_clip -def plane_point_side_v3(p: np.ndarray, v: np.ndarray) -> Any: +def plane_point_side_v3(p: NDArray[np.float64], v: NDArray[np.float64]) -> Any: """Get sign of point to plane distance. This function does not compute the actual distance. @@ -331,7 +334,9 @@ def plane_point_side_v3(p: np.ndarray, v: np.ndarray) -> Any: return p[:3].dot(v) + p[3] -def cuboid_to_2d_frustum_bbox(corners: np.ndarray, planes: List[np.ndarray], K: np.ndarray) -> Optional[np.ndarray]: +def cuboid_to_2d_frustum_bbox( + corners: NDArray[np.float64], planes: List[NDArray[np.float64]], K: NDArray[np.float64] +) -> Optional[NDArray[np.float64]]: """Convert a 3D cuboid to a 2D frustum bounding box. We bring the 3D points into each camera, and do the clipping there. 
@@ -345,7 +350,9 @@ def cuboid_to_2d_frustum_bbox(corners: np.ndarray, planes: List[np.ndarray], K: bbox_2d: Numpy array of shape (4,) with entries [x_min,y_min,x_max,y_max] """ - def clip_line_segment(pt_a: np.ndarray, pt_b: np.ndarray, K: np.ndarray) -> Tuple[np.ndarray, np.ndarray]: + def clip_line_segment( + pt_a: NDArray[np.float64], pt_b: NDArray[np.float64], K: NDArray[np.float64] + ) -> Tuple[NDArray[np.float64], NDArray[np.float64]]: """Clip a line segment based on two points and the camera instrinc matrix. Args: @@ -364,7 +371,7 @@ def clip_line_segment(pt_a: np.ndarray, pt_b: np.ndarray, K: np.ndarray) -> Tupl return np.round(pt_a).astype(np.int32), np.round(pt_b).astype(np.int32) - def clip_rect(selected_corners: np.ndarray, clipped_uv_verts: np.ndarray) -> np.ndarray: + def clip_rect(selected_corners: NDArray[np.float64], clipped_uv_verts: NDArray[np.float64]) -> NDArray[np.float64]: """Clip a rectangle based on the selected corners and clipped vertices coordinates. Args: diff --git a/argoverse/utils/geometry.py b/argoverse/utils/geometry.py index c71a0a60..0b0ade69 100644 --- a/argoverse/utils/geometry.py +++ b/argoverse/utils/geometry.py @@ -4,13 +4,16 @@ For filtering to 3D polygons, please see cuboid_interior.py or iou_3d.py instead. """ -from typing import Optional, cast +from typing import Any, Optional, cast import numpy as np +from numpy.typing import NDArray from shapely.geometry import Point, Polygon -def rotate_polygon_about_pt(pts: np.ndarray, rotmat: np.ndarray, center_pt: np.ndarray) -> np.ndarray: +def rotate_polygon_about_pt( + pts: NDArray[np.float64], rotmat: NDArray[np.float64], center_pt: NDArray[np.float64] +) -> Any: """Rotate a polygon about a point with a given rotation matrix. 
Args: @@ -27,7 +30,7 @@ def rotate_polygon_about_pt(pts: np.ndarray, rotmat: np.ndarray, center_pt: np.n return rot_pts -def filter_point_cloud_to_polygon(polygon: np.ndarray, point_cloud: np.ndarray) -> Optional[np.ndarray]: +def filter_point_cloud_to_polygon(polygon: NDArray[np.float64], point_cloud: NDArray[np.float64]) -> Any: """Filter a point cloud to the points within a polygon. Args: @@ -53,8 +56,8 @@ def filter_point_cloud_to_polygon(polygon: np.ndarray, point_cloud: np.ndarray) def point_inside_polygon( n_vertices: int, - poly_x_pts: np.ndarray, - poly_y_pts: np.ndarray, + poly_x_pts: NDArray[np.float64], + poly_y_pts: NDArray[np.float64], test_x: float, test_y: float, ) -> bool: diff --git a/argoverse/utils/grid_interpolation.py b/argoverse/utils/grid_interpolation.py index 0539b5ba..fdf42796 100644 --- a/argoverse/utils/grid_interpolation.py +++ b/argoverse/utils/grid_interpolation.py @@ -1,15 +1,18 @@ # +from typing import Any + import numpy as np +from numpy.typing import NDArray from scipy.interpolate import RegularGridInterpolator def interp_square_grid( - grid_data: np.ndarray, + grid_data: NDArray[np.float64], in_dim: int = 200, out_dim: int = 30, interp_type: str = "linear", -) -> np.ndarray: +) -> Any: """ Interpolate a square grid Thousands of times faster than scipy.interpolate.interp2d. diff --git a/argoverse/utils/helpers.py b/argoverse/utils/helpers.py index 002ccf77..e0eeff83 100644 --- a/argoverse/utils/helpers.py +++ b/argoverse/utils/helpers.py @@ -3,9 +3,10 @@ from typing import Optional, Sequence import numpy as np +from numpy.typing import NDArray -def assert_np_array_shape(array: np.ndarray, target_shape: Sequence[Optional[int]]) -> None: +def assert_np_array_shape(array: NDArray[np.float64], target_shape: Sequence[Optional[int]]) -> None: """Check for shape correctness. 
Args: diff --git a/argoverse/utils/heuristic_ground_removal.py b/argoverse/utils/heuristic_ground_removal.py index 7c53bf1c..9b395387 100644 --- a/argoverse/utils/heuristic_ground_removal.py +++ b/argoverse/utils/heuristic_ground_removal.py @@ -3,6 +3,7 @@ from typing import List import numpy as np +from numpy.typing import NDArray LIDAR_RANGE = 250 GRID_DIST = 0.4 @@ -11,7 +12,7 @@ NUM_ANGLE_BINS = 2000 -def filter_ground_pts_polar_grid_mean_var(lidar_pts: np.ndarray) -> np.ndarray: +def filter_ground_pts_polar_grid_mean_var(lidar_pts: NDArray[np.float64]) -> NDArray[np.float64]: """ We divide the world into polar voxels. We aggregate the height statistics of all of the points that fall into each polar voxel. @@ -25,7 +26,7 @@ def filter_ground_pts_polar_grid_mean_var(lidar_pts: np.ndarray) -> np.ndarray: non_ground_lidar_pts: NumPy n-d array of shape (n,3) """ print("Total number of points (before filtering): ", lidar_pts.shape) - non_ground_lidar_pts: List[List[List[np.ndarray]]] = [] + non_ground_lidar_pts: List[List[List[NDArray[np.float64]]]] = [] xyz_mean = np.mean(lidar_pts, axis=0) @@ -38,7 +39,9 @@ def filter_ground_pts_polar_grid_mean_var(lidar_pts: np.ndarray) -> np.ndarray: ang_voxel_mean = np.zeros((NUM_ANGLE_BINS, num_radial_bins)) ang_voxel_variance = np.zeros((NUM_ANGLE_BINS, num_radial_bins)) num_elements_per_bin = np.zeros((NUM_ANGLE_BINS, num_radial_bins)) - pts_per_bin: List[List[List[np.ndarray]]] = [[[] for _ in range(num_radial_bins)] for _ in range(NUM_ANGLE_BINS)] + pts_per_bin: List[List[List[NDArray[np.float64]]]] = [ + [[] for _ in range(num_radial_bins)] for _ in range(NUM_ANGLE_BINS) + ] for i in range(lidar_pts.shape[0]): x = lidar_pts[i, 0] diff --git a/argoverse/utils/interpolate.py b/argoverse/utils/interpolate.py index 3024ec5b..421f548a 100644 --- a/argoverse/utils/interpolate.py +++ b/argoverse/utils/interpolate.py @@ -1,14 +1,15 @@ # -from typing import Tuple, cast +from typing import Any, Tuple, cast import numpy as np +from 
numpy.typing import NDArray # For a single line segment NUM_CENTERLINE_INTERP_PTS = 10 -def compute_lane_width(left_even_pts: np.ndarray, right_even_pts: np.ndarray) -> float: +def compute_lane_width(left_even_pts: NDArray[np.float64], right_even_pts: NDArray[np.float64]) -> float: """ Compute the width of a lane, given an explicit left and right boundary. Requires an equal number of waypoints on each boundary. @@ -25,7 +26,9 @@ def compute_lane_width(left_even_pts: np.ndarray, right_even_pts: np.ndarray) -> return lane_width -def compute_mid_pivot_arc(single_pt: np.ndarray, arc_pts: np.ndarray) -> Tuple[np.ndarray, float]: +def compute_mid_pivot_arc( + single_pt: NDArray[np.float64], arc_pts: NDArray[np.float64] +) -> Tuple[NDArray[np.float64], float]: """ Given a line of points on one boundary, and a single point on the other side, produce the middle arc we get by pivoting around the single point. @@ -47,10 +50,10 @@ def compute_mid_pivot_arc(single_pt: np.ndarray, arc_pts: np.ndarray) -> Tuple[n def compute_midpoint_line( - left_ln_bnds: np.ndarray, - right_ln_bnds: np.ndarray, + left_ln_bnds: NDArray[np.float64], + right_ln_bnds: NDArray[np.float64], num_interp_pts: int = NUM_CENTERLINE_INTERP_PTS, -) -> Tuple[np.ndarray, float]: +) -> Tuple[NDArray[np.float64], float]: """ Compute the lane segment centerline by interpolating n points along each boundary, and then averaging left and right waypoints. @@ -91,7 +94,7 @@ def compute_midpoint_line( return centerline_pts, lane_width -def get_duplicate_indices_1d(coords_1d: np.ndarray) -> np.ndarray: +def get_duplicate_indices_1d(coords_1d: NDArray[np.float64]) -> Any: """ Given a 1D polyline, remove consecutive duplicate coordinates. 
@@ -111,7 +114,7 @@ def get_duplicate_indices_1d(coords_1d: np.ndarray) -> np.ndarray: return dup_inds -def assert_consecutive(shared_dup_inds: int, num_pts: int, coords_1d: np.ndarray) -> None: +def assert_consecutive(shared_dup_inds: int, num_pts: int, coords_1d: NDArray[np.float64]) -> None: """ Args: shared_dup_inds @@ -131,7 +134,9 @@ def assert_consecutive(shared_dup_inds: int, num_pts: int, coords_1d: np.ndarray assert left_dup or right_dup -def eliminate_duplicates_2d(px: np.ndarray, py: np.ndarray) -> Tuple[np.ndarray, np.ndarray]: +def eliminate_duplicates_2d( + px: NDArray[np.float64], py: NDArray[np.float64] +) -> Tuple[NDArray[np.float64], NDArray[np.float64]]: """ We compare indices to ensure that deleted values are exactly adjacent to each other in the polyline sequence. @@ -155,7 +160,7 @@ def eliminate_duplicates_2d(px: np.ndarray, py: np.ndarray) -> Tuple[np.ndarray, return px, py -def interp_arc(t: int, px: np.ndarray, py: np.ndarray) -> np.ndarray: +def interp_arc(t: int, px: NDArray[np.float64], py: NDArray[np.float64]) -> Any: """Linearly interpolate equally-spaced points along a polyline. We use a chordal parameterization so that interpolated arc-lengths diff --git a/argoverse/utils/line_projection.py b/argoverse/utils/line_projection.py index 5168e084..42d03680 100644 --- a/argoverse/utils/line_projection.py +++ b/argoverse/utils/line_projection.py @@ -3,14 +3,15 @@ from typing import List, Sequence, Tuple import numpy as np +from numpy.typing import NDArray from argoverse.utils.interpolate import interp_arc from argoverse.utils.polyline_density import interpolate_polyline_to_ref_density def project_to_line_seq( - trajectory: np.ndarray, lines: Sequence[np.ndarray], interpolate_more: bool = True -) -> Tuple[float, np.ndarray]: + trajectory: NDArray[np.float64], lines: Sequence[NDArray[np.float64]], interpolate_more: bool = True +) -> Tuple[float, NDArray[np.float64]]: """Project a trajectory onto a line sequence. 
Args: @@ -21,7 +22,7 @@ def project_to_line_seq( Returns: Projected distance along the centerline, Polyline centerline_trajectory. """ - linestrings: List[np.ndarray] = [] + linestrings: List[NDArray[np.float64]] = [] for line in lines: if interpolate_more: linestrings += [interp_arc(100, line[:, 0], line[:, 1])] @@ -33,10 +34,10 @@ def project_to_line_seq( def project_to_line( - trajectory: np.ndarray, - center_polyline: np.ndarray, + trajectory: NDArray[np.float64], + center_polyline: NDArray[np.float64], enforce_same_density: bool = False, -) -> Tuple[float, np.ndarray]: +) -> Tuple[float, NDArray[np.float64]]: """Project a trajectory onto a polyline. Args: @@ -51,7 +52,7 @@ def project_to_line( if enforce_same_density: center_polyline = interpolate_polyline_to_ref_density(center_polyline, trajectory) - centerline_trajectory: List[np.ndarray] = [] + centerline_trajectory: List[NDArray[np.float64]] = [] for i, pt in enumerate(trajectory): closest_idx = np.linalg.norm(center_polyline - pt, axis=1).argmin() closest_pt = center_polyline[closest_idx] diff --git a/argoverse/utils/make_att_files.py b/argoverse/utils/make_att_files.py index 65236d44..7c894fd8 100644 --- a/argoverse/utils/make_att_files.py +++ b/argoverse/utils/make_att_files.py @@ -9,6 +9,7 @@ import scipy.interpolate as interpolate import torch import torch.nn.functional as F +from numpy.typing import NDArray from argoverse.data_loading.argoverse_tracking_loader import ArgoverseTrackingLoader from argoverse.utils.json_utils import read_json_file @@ -45,7 +46,7 @@ def save_bev_img( dataset_name: str, log_id: str, lidar_timestamp: int, - pc: np.ndarray, + pc: NDArray[np.float64], ) -> None: """ Plot results on bev images and save @@ -147,7 +148,7 @@ def save_bev_img( ) -def bspline_1d(x: np.ndarray, y: np.ndarray, s: float = 20.0, k: int = 3) -> np.ndarray: +def bspline_1d(x: NDArray[np.float64], y: NDArray[np.float64], s: float = 20.0, k: int = 3) -> Any: """Perform B-Spline smoothing of trajectories 
for temporal noise reduction Args: @@ -168,7 +169,7 @@ def bspline_1d(x: np.ndarray, y: np.ndarray, s: float = 20.0, k: int = 3) -> np. return interpolate.splev(np.arange(y.shape[0]), tck) -def derivative(x: np.ndarray) -> np.ndarray: +def derivative(x: NDArray[np.float64]) -> Any: """Compute time derivatives for velocity and acceleration Args: @@ -190,7 +191,7 @@ def derivative(x: np.ndarray) -> np.ndarray: return F.conv1d(x_padded, filters)[0, 0].numpy() -def compute_v_a(traj: np.ndarray) -> Tuple[np.ndarray, np.ndarray]: +def compute_v_a(traj: NDArray[np.float64]) -> Tuple[NDArray[np.float64], NDArray[np.float64]]: """ Compute velocity and acceleration diff --git a/argoverse/utils/manhattan_search.py b/argoverse/utils/manhattan_search.py index 53e40e34..ff69c189 100644 --- a/argoverse/utils/manhattan_search.py +++ b/argoverse/utils/manhattan_search.py @@ -3,14 +3,15 @@ """Fast search functions of nearest neighbor based on Manhattan distance.""" import logging -from typing import List, Tuple +from typing import Any, List, Tuple import numpy as np +from numpy.typing import NDArray logger = logging.getLogger(__name__) -def compute_polygon_bboxes(polygons: np.ndarray) -> np.ndarray: +def compute_polygon_bboxes(polygons: NDArray[np.float64]) -> NDArray[np.float64]: """Compute the minimum size enclosing xy bounding box for each polygon that is provided as input. Args: polygons: an array of type 'O' (object) with shape (n,). Each object has shape (m, 3+). @@ -18,7 +19,7 @@ def compute_polygon_bboxes(polygons: np.ndarray) -> np.ndarray: Returns: polygon_bboxes: a float array with shape (n, 4). 
""" - bboxes: List[np.ndarray] = [] + bboxes: List[NDArray[np.float64]] = [] for polygon in polygons: bbox = compute_point_cloud_bbox(polygon) @@ -28,7 +29,7 @@ def compute_polygon_bboxes(polygons: np.ndarray) -> np.ndarray: return polygon_bboxes -def compute_point_cloud_bbox(point_cloud: np.ndarray, verbose: bool = False) -> np.ndarray: +def compute_point_cloud_bbox(point_cloud: NDArray[np.float64], verbose: bool = False) -> NDArray[np.float64]: """Given a set of 2D or 3D points, find the minimum size axis-aligned bounding box in the xy plane (ground plane). Args: @@ -54,7 +55,9 @@ def compute_point_cloud_bbox(point_cloud: np.ndarray, verbose: bool = False) -> return bbox -def find_all_polygon_bboxes_overlapping_query_bbox(polygon_bboxes: np.ndarray, query_bbox: np.ndarray) -> np.ndarray: +def find_all_polygon_bboxes_overlapping_query_bbox( + polygon_bboxes: NDArray[np.float64], query_bbox: NDArray[np.float64] +) -> Any: """Find all the overlapping polygon bounding boxes. Each bounding box has the following structure: @@ -111,13 +114,13 @@ def find_all_polygon_bboxes_overlapping_query_bbox(polygon_bboxes: np.ndarray, q def find_local_polygons( - lane_polygons: np.ndarray, - lane_bboxes: np.ndarray, + lane_polygons: NDArray[np.float64], + lane_bboxes: NDArray[NDArray[np.float64]], query_min_x: float, query_max_x: float, query_min_y: float, query_max_y: float, -) -> Tuple[np.ndarray, np.ndarray]: +) -> Tuple[NDArray[np.float64], NDArray[NDArray[np.float64]]]: """Find local polygons. We always also return indices. 
Take a collection of precomputed polygon bounding boxes, and compare with a query bounding box then returns the @@ -146,10 +149,10 @@ def find_local_polygons( def prune_polygons_manhattan_dist( - query_pt: np.ndarray, - points_xyz: np.ndarray, + query_pt: NDArray[np.float64], + points_xyz: NDArray[np.float64], query_search_range_manhattan: float = 200.0, -) -> np.ndarray: +) -> Any: """Prune polygon points based on a search area defined by the manhattan distance. Take a collection of small point clouds and return only point clouds that fall within a manhattan search radius of diff --git a/argoverse/utils/mesh_grid.py b/argoverse/utils/mesh_grid.py index a875dc52..2e733d75 100644 --- a/argoverse/utils/mesh_grid.py +++ b/argoverse/utils/mesh_grid.py @@ -2,11 +2,12 @@ """Mesh grid utility functions.""" import numpy as np +from numpy.typing import NDArray def get_mesh_grid_as_point_cloud( min_x: int, max_x: int, min_y: int, max_y: int, downsample_factor: float = 1.0 -) -> np.ndarray: +) -> NDArray[np.float64]: """Sample regular grid and return the (x, y) coordinates. 
Args: diff --git a/argoverse/utils/mpl_plotting_utils.py b/argoverse/utils/mpl_plotting_utils.py index 61f622b8..6e517f3b 100644 --- a/argoverse/utils/mpl_plotting_utils.py +++ b/argoverse/utils/mpl_plotting_utils.py @@ -8,12 +8,13 @@ from descartes.patch import PolygonPatch from matplotlib.animation import FuncAnimation from matplotlib.lines import Line2D +from numpy.typing import NDArray from shapely.geometry import LineString, Polygon def draw_polygon_mpl( ax: plt.Axes, - polygon: np.ndarray, + polygon: NDArray[np.float64], color: Union[Tuple[float, float, float], str], linewidth: Optional[float] = None, ) -> None: @@ -51,7 +52,7 @@ def draw_polygonpatch_matplotlib(points: Any, color: Union[Tuple[float, float, f def draw_lane_polygons( ax: plt.Axes, - lane_polygons: np.ndarray, + lane_polygons: NDArray[np.float64], color: Union[Tuple[float, float, float], str] = "y", ) -> None: """Draw a lane using polygons. @@ -67,7 +68,7 @@ def draw_lane_polygons( def plot_bbox_2D( ax: plt.Axes, - pts: np.ndarray, + pts: NDArray[np.float64], color: Union[Tuple[float, float, float], str], linestyle: str = "-", ) -> None: @@ -93,7 +94,7 @@ def plot_bbox_2D( ax.plot(pts[np.array([0, 2]), 0], pts[np.array([0, 2]), 1], c=color, linestyle=linestyle) -def animate_polyline(polyline: np.ndarray, axes_margin: int = 5, show_plot: bool = True) -> None: +def animate_polyline(polyline: NDArray[np.float64], axes_margin: int = 5, show_plot: bool = True) -> None: """Draw and animate a polyline on a plot. 
Args: @@ -128,7 +129,7 @@ def update(frame: List[Any]) -> Tuple[Line2D]: def plot_lane_segment_patch( - polygon_pts: np.ndarray, + polygon_pts: NDArray[np.float64], ax: plt.Axes, color: Union[Tuple[float, float, float], str] = "y", alpha: float = 0.3, diff --git a/argoverse/utils/plane_visualization_utils.py b/argoverse/utils/plane_visualization_utils.py index 3f9de265..a7893ee5 100644 --- a/argoverse/utils/plane_visualization_utils.py +++ b/argoverse/utils/plane_visualization_utils.py @@ -3,6 +3,7 @@ from typing import List, Optional import numpy as np +from numpy.typing import NDArray from argoverse.utils import mayavi_wrapper from argoverse.utils.mesh_grid import get_mesh_grid_as_point_cloud @@ -14,7 +15,7 @@ ) -def populate_frustum_voxels(planes: List[np.ndarray], fig: Figure, axis_pair: str) -> Figure: +def populate_frustum_voxels(planes: List[NDArray[np.float64]], fig: Figure, axis_pair: str) -> Figure: """ Generate grid in xy plane, and then treat it as grid in xz (ground) plane in camera coordinate system. 
@@ -50,8 +51,8 @@ def populate_frustum_voxels(planes: List[np.ndarray], fig: Figure, axis_pair: st def plot_frustum_planes_and_normals( - planes: List[np.ndarray], - cuboid_verts: Optional[np.ndarray] = None, + planes: List[NDArray[np.float64]], + cuboid_verts: Optional[NDArray[np.float64]] = None, near_clip_dist: float = 0.5, ) -> None: """ @@ -127,7 +128,7 @@ def plot_frustum_planes_and_normals( mayavi_wrapper.mlab.show() # type: ignore -def get_perpendicular(n: np.ndarray) -> np.ndarray: +def get_perpendicular(n: NDArray[np.float64]) -> NDArray[np.float64]: """ n guarantees that dot(n, getPerpendicular(n)) is zero, which is the orthogonality condition, while also keeping the magnitude of the vector @@ -155,7 +156,9 @@ def get_perpendicular(n: np.ndarray) -> np.ndarray: return result -def generate_grid_on_plane(a: float, b: float, c: float, d: float, P: np.ndarray, radius: float = 15) -> np.ndarray: +def generate_grid_on_plane( + a: float, b: float, c: float, d: float, P: NDArray[np.float64], radius: float = 15 +) -> NDArray[np.float64]: """ Args: a,b,c,d: Coefficients of ``ax + by + cz = d`` defining plane diff --git a/argoverse/utils/ply_loader.py b/argoverse/utils/ply_loader.py index 6f74da36..a85d51a9 100644 --- a/argoverse/utils/ply_loader.py +++ b/argoverse/utils/ply_loader.py @@ -2,15 +2,16 @@ """Point cloud loading utility functions.""" import os -from typing import Optional, Union +from typing import Any, Optional, Union import numpy as np import pyntcloud +from numpy.typing import NDArray _PathLike = Union[str, "os.PathLike[str]"] -def load_ply(ply_fpath: _PathLike) -> np.ndarray: +def load_ply(ply_fpath: _PathLike) -> Any: """Load a point cloud file from a filepath. 
Args: @@ -28,7 +29,7 @@ def load_ply(ply_fpath: _PathLike) -> np.ndarray: return np.concatenate((x, y, z), axis=1) -def load_ply_by_attrib(ply_fpath: _PathLike, attrib_spec: str = "xyzil") -> Optional[np.ndarray]: +def load_ply_by_attrib(ply_fpath: _PathLike, attrib_spec: str = "xyzil") -> Optional[NDArray[np.float64]]: """Load a point cloud file from a filepath. Args: diff --git a/argoverse/utils/polyline_density.py b/argoverse/utils/polyline_density.py index 50f1e4d9..5a50817b 100644 --- a/argoverse/utils/polyline_density.py +++ b/argoverse/utils/polyline_density.py @@ -1,15 +1,16 @@ # -from typing import Tuple +from typing import Any, Tuple import numpy as np +from numpy.typing import NDArray from argoverse.utils.interpolate import interp_arc NUM_PTS_PER_TRAJ = 50 -def get_polyline_length(polyline: np.ndarray) -> float: +def get_polyline_length(polyline: NDArray[np.float64]) -> float: """Calculate the length of a polyline. Args: @@ -22,7 +23,9 @@ def get_polyline_length(polyline: np.ndarray) -> float: return float(np.linalg.norm(np.diff(polyline, axis=0), axis=1).sum()) -def interpolate_polyline_to_ref_density(polyline_to_interp: np.ndarray, ref_polyline: np.ndarray) -> np.ndarray: +def interpolate_polyline_to_ref_density( + polyline_to_interp: NDArray[np.float64], ref_polyline: NDArray[np.float64] +) -> Any: """ Interpolate a polyline so that its density matches the density of a reference polyline. @@ -46,7 +49,9 @@ def interpolate_polyline_to_ref_density(polyline_to_interp: np.ndarray, ref_poly return dense_interp_polyline -def traverse_polyline_by_specific_dist(polyline_to_walk: np.ndarray, l2_dist_to_walk: float) -> Tuple[np.ndarray, bool]: +def traverse_polyline_by_specific_dist( + polyline_to_walk: NDArray[np.float64], l2_dist_to_walk: float +) -> Tuple[NDArray[np.float64], bool]: """ Walk a distance along a polyline, and return the points along which you walked. 
diff --git a/argoverse/utils/se2.py b/argoverse/utils/se2.py index 61704537..6bb404af 100644 --- a/argoverse/utils/se2.py +++ b/argoverse/utils/se2.py @@ -1,14 +1,16 @@ # """Module for `SE2`.""" +from typing import Any import numpy as np +from numpy.typing import NDArray from argoverse.utils.helpers import assert_np_array_shape class SE2: - def __init__(self, rotation: np.ndarray, translation: np.ndarray) -> None: + def __init__(self, rotation: NDArray[np.float64], translation: NDArray[np.float64]) -> None: """Initialize. Args: @@ -26,7 +28,7 @@ def __init__(self, rotation: np.ndarray, translation: np.ndarray) -> None: self.transform_matrix[:2, :2] = self.rotation self.transform_matrix[:2, 2] = self.translation - def transform_point_cloud(self, point_cloud: np.ndarray) -> np.ndarray: + def transform_point_cloud(self, point_cloud: NDArray[np.float64]) -> Any: """Apply the SE(2) transformation to point_cloud. Args: @@ -54,7 +56,7 @@ def inverse(self) -> "SE2": """ return SE2(rotation=self.rotation.T, translation=self.rotation.T.dot(-self.translation)) - def inverse_transform_point_cloud(self, point_cloud: np.ndarray) -> np.ndarray: + def inverse_transform_point_cloud(self, point_cloud: NDArray[np.float64]) -> Any: """Transform the point_cloud by the inverse of this SE2. Args: diff --git a/argoverse/utils/se3.py b/argoverse/utils/se3.py index dc78d40a..915ed3a5 100644 --- a/argoverse/utils/se3.py +++ b/argoverse/utils/se3.py @@ -1,13 +1,16 @@ # """SE3 class for point cloud rotation and translation.""" +from typing import Any + import numpy as np +from numpy.typing import NDArray class SE3: """An SE3 class allows point cloud rotation and translation operations.""" - def __init__(self, rotation: np.ndarray, translation: np.ndarray) -> None: + def __init__(self, rotation: NDArray[np.float64], translation: NDArray[np.float64]) -> None: """Initialize an SE3 instance with its rotation and translation matrices. 
Args: @@ -23,7 +26,7 @@ def __init__(self, rotation: np.ndarray, translation: np.ndarray) -> None: self.transform_matrix[:3, :3] = self.rotation self.transform_matrix[:3, 3] = self.translation - def transform_point_cloud(self, point_cloud: np.ndarray) -> np.ndarray: + def transform_point_cloud(self, point_cloud: NDArray[np.float64]) -> Any: """Apply the SE(3) transformation to this point cloud. Args: @@ -35,7 +38,7 @@ def transform_point_cloud(self, point_cloud: np.ndarray) -> np.ndarray: """ return point_cloud @ self.rotation.T + self.translation - def inverse_transform_point_cloud(self, point_cloud: np.ndarray) -> np.ndarray: + def inverse_transform_point_cloud(self, point_cloud: NDArray[np.float64]) -> Any: """Undo the translation and then the rotation (Inverse SE(3) transformation).""" return (point_cloud.copy() - self.translation) @ self.rotation @@ -61,7 +64,7 @@ def compose(self, right_se3: "SE3") -> "SE3": Returns: chained_se3: new instance of SE3 class """ - chained_transform_matrix = self.transform_matrix @ right_se3.transform_matrix + chained_transform_matrix: NDArray[np.float64] = self.transform_matrix @ right_se3.transform_matrix chained_se3 = SE3( rotation=chained_transform_matrix[:3, :3], translation=chained_transform_matrix[:3, 3], diff --git a/argoverse/utils/sim2.py b/argoverse/utils/sim2.py index 2cad7bc5..f69fa481 100644 --- a/argoverse/utils/sim2.py +++ b/argoverse/utils/sim2.py @@ -11,6 +11,7 @@ from typing import Union import numpy as np +from numpy.typing import NDArray from argoverse.utils.helpers import assert_np_array_shape from argoverse.utils.json_utils import save_json_dict @@ -21,7 +22,7 @@ class Sim2: """Implements the Similarity(2) class.""" - def __init__(self, R: np.ndarray, t: np.ndarray, s: Union[int, float]) -> None: + def __init__(self, R: NDArray[np.float64], t: NDArray[np.float64], s: Union[int, float]) -> None: """Initialize from rotation R, translation t, and scale s. 
Args: @@ -75,12 +76,12 @@ def __eq__(self, other: object) -> bool: return True @property - def rotation(self) -> np.ndarray: + def rotation(self) -> NDArray[np.float64]: """Return the 2x2 rotation matrix.""" return self.R_ @property - def translation(self) -> np.ndarray: + def translation(self) -> NDArray[np.float64]: """Return the (2,) translation vector.""" return self.t_ @@ -90,7 +91,7 @@ def scale(self) -> float: return self.s_ @property - def matrix(self) -> np.ndarray: + def matrix(self) -> NDArray[np.float64]: """Calculate 3*3 matrix group equivalent.""" T = np.zeros((3, 3)) T[:2, :2] = self.R_ @@ -118,10 +119,10 @@ def compose(self, S: "Sim2") -> "Sim2": def inverse(self) -> "Sim2": """Return the inverse.""" Rt = self.R_.T - sRt = -Rt @ (self.s_ * self.t_) + sRt: NDArray[np.float64] = -Rt @ (self.s_ * self.t_) return Sim2(Rt, sRt, 1.0 / self.s_) - def transform_from(self, point_cloud: np.ndarray) -> np.ndarray: + def transform_from(self, point_cloud: NDArray[np.float64]) -> NDArray[np.float64]: """Transform point cloud such that if they are in frame A, and our Sim(3) transform is defines as bSa, then we get points back in frame B: @@ -138,12 +139,12 @@ raise ValueError("Input point cloud is not 2-dimensional.") assert_np_array_shape(point_cloud, (None, 2)) # (2,2) x (2,N) + (2,1) = (2,N) -> transpose - transformed_point_cloud = (point_cloud @ self.R_.T) + self.t_ + transformed_point_cloud: NDArray[np.float64] = (point_cloud @ self.R_.T) + self.t_ # now scale points return transformed_point_cloud * self.s_ - def transform_point_cloud(self, point_cloud: np.ndarray) -> np.ndarray: + def transform_point_cloud(self, point_cloud: NDArray[np.float64]) -> NDArray[np.float64]: """Alias for `transform_from()`, for synchrony w/ API provided by SE(2) and SE(3) classes.""" return self.transform_from(point_cloud) @@ -172,7 +173,7 @@ def from_json(cls, json_fpath: _PathLike) -> "Sim2": return cls(R, t, s) @classmethod - def from_matrix(cls, T: 
np.ndarray) -> "Sim2": + def from_matrix(cls, T: NDArray[np.float64]) -> "Sim2": """Generate class instance from a 3x3 Numpy matrix.""" if np.isclose(T[2, 2], 0.0): raise ZeroDivisionError("Sim(2) scale calculation would lead to division by zero.") diff --git a/argoverse/utils/transform.py b/argoverse/utils/transform.py index d2d22c35..1440f03b 100644 --- a/argoverse/utils/transform.py +++ b/argoverse/utils/transform.py @@ -9,14 +9,16 @@ """ import logging +from typing import Any import numpy as np +from numpy.typing import NDArray from scipy.spatial.transform import Rotation logger = logging.getLogger(__name__) -def yaw_to_quaternion3d(yaw: float) -> np.ndarray: +def yaw_to_quaternion3d(yaw: float) -> NDArray[np.float64]: """Convert a rotation angle in the xy plane (i.e. about the z axis) to a quaternion. Args: @@ -29,14 +31,14 @@ def yaw_to_quaternion3d(yaw: float) -> np.ndarray: return np.array([qw, qx, qy, qz]) -def rotmat2quat(R: np.ndarray) -> np.ndarray: +def rotmat2quat(R: NDArray[np.float64]) -> NDArray[np.float64]: """Convert a rotation-matrix to a quaternion in Argo's scalar-first notation (w, x, y, z).""" quat_xyzw = Rotation.from_matrix(R).as_quat() quat_wxyz = quat_scipy2argo(quat_xyzw) return quat_wxyz -def quat2rotmat(q: np.ndarray) -> np.ndarray: +def quat2rotmat(q: NDArray[np.float64]) -> Any: """Normalizes a quaternion to unit-length, then converts it into a rotation matrix. 
Note that libraries such as Scipy expect a quaternion in scalar-last [x, y, z, w] format, @@ -61,25 +63,25 @@ return Rotation.from_quat(quat_xyzw).as_matrix() -def quat_argo2scipy(q: np.ndarray) -> np.ndarray: +def quat_argo2scipy(q: NDArray[np.float64]) -> NDArray[np.float64]: """Re-order Argoverse's scalar-first [w,x,y,z] quaternion order to Scipy's scalar-last [x,y,z,w]""" w, x, y, z = q q_scipy = np.array([x, y, z, w]) return q_scipy -def quat_scipy2argo(q: np.ndarray) -> np.ndarray: +def quat_scipy2argo(q: NDArray[np.float64]) -> NDArray[np.float64]: """Re-order Scipy's scalar-last [x,y,z,w] quaternion order to Argoverse's scalar-first [w,x,y,z].""" x, y, z, w = q q_argo = np.array([w, x, y, z]) return q_argo -def quat_argo2scipy_vectorized(q: np.ndarray) -> np.ndarray: +def quat_argo2scipy_vectorized(q: NDArray[np.float64]) -> Any: """Re-order Argoverse's scalar-first [w,x,y,z] quaternion order to Scipy's scalar-last [x,y,z,w]""" return q[..., [1, 2, 3, 0]] -def quat_scipy2argo_vectorized(q: np.ndarray) -> np.ndarray: +def quat_scipy2argo_vectorized(q: NDArray[np.float64]) -> Any: """Re-order Scipy's scalar-last [x,y,z,w] quaternion order to Argoverse's scalar-first [w,x,y,z].""" return q[..., [3, 0, 1, 2]] diff --git a/argoverse/visualization/colormap.py b/argoverse/visualization/colormap.py index 63563dea..f311ce08 100644 --- a/argoverse/visualization/colormap.py +++ b/argoverse/visualization/colormap.py @@ -18,10 +18,11 @@ from __future__ import absolute_import, division, print_function, unicode_literals import numpy as np +from numpy.typing import NDArray -def colormap(rgb: bool = False) -> np.ndarray: - color_list = np.array( +def colormap(rgb: bool = False) -> NDArray[np.float32]: + color_list = np.array( [ 0.000, 0.447, diff --git a/argoverse/visualization/ground_visualization.py b/argoverse/visualization/ground_visualization.py index 74418f22..57579525 100644 --- 
a/argoverse/visualization/ground_visualization.py +++ b/argoverse/visualization/ground_visualization.py @@ -10,6 +10,7 @@ import imageio import numpy as np from colour import Color +from numpy.typing import NDArray from argoverse.data_loading.synchronization_database import SynchronizationDB from argoverse.map_representation.map_api import ArgoverseMap @@ -30,7 +31,7 @@ def draw_ground_pts_in_image( sdb: SynchronizationDB, - lidar_points: np.ndarray, + lidar_points: NDArray[np.float64], city_SE3_egovehicle: SE3, dataset_map: ArgoverseMap, log_id: str, @@ -41,7 +42,7 @@ def draw_ground_pts_in_image( plot_ground: bool = True, motion_compensate: bool = False, camera: Optional[str] = None, -) -> Union[None, np.ndarray]: +) -> Union[None, NDArray[np.float64]]: """Write an image to disk with rendered ground points for every camera. Args: diff --git a/argoverse/visualization/mayavi_utils.py b/argoverse/visualization/mayavi_utils.py index 10fa42e2..2c887924 100644 --- a/argoverse/visualization/mayavi_utils.py +++ b/argoverse/visualization/mayavi_utils.py @@ -18,6 +18,7 @@ from typing import Any, Iterable, List, Optional, Tuple, Union, cast import numpy as np +from numpy.typing import NDArray from argoverse.utils import mayavi_wrapper from argoverse.utils.frustum_clipping import clip_segment_v3_plane_n @@ -41,10 +42,10 @@ Figure = Any #: A 3D Point -Point = np.ndarray +Point = NDArray[np.float64] #: An array of 3D points -PointCloud = np.ndarray +PointCloud = NDArray[np.float64] #: Any numeric type Number = Union[int, float] @@ -55,7 +56,7 @@ def plot_bbox_3d_mayavi( fig: Figure, - corners: np.ndarray, + corners: NDArray[np.float64], colors: Tuple[Color, Color, Color] = ((0, 0, 1), (1, 0, 0), (0, 1, 0)), line_width: Number = 2, draw_text: Optional[str] = None, @@ -79,7 +80,7 @@ def plot_bbox_3d_mayavi( Updated Mayavi figure """ - def draw_rect(fig: Figure, selected_corners: np.ndarray, color: Color) -> None: + def draw_rect(fig: Figure, selected_corners: 
NDArray[np.float64], color: Color) -> None: prev = selected_corners[-1] for corner in selected_corners: fig = draw_mayavi_line_segment(fig, [prev, corner], color, line_width) @@ -115,9 +116,9 @@ def draw_rect(fig: Figure, selected_corners: np.ndarray, color: Color) -> None: def plot_points_3D_mayavi( - points: np.ndarray, + points: NDArray[np.float64], fig: Figure, - per_pt_color_strengths: np.ndarray = None, + per_pt_color_strengths: NDArray[np.float64] = None, fixed_color: Optional[Color] = (1, 0, 0), colormap: str = "spectral", ) -> Figure: @@ -158,8 +159,8 @@ def plot_points_3D_mayavi( def plot_3d_clipped_bbox_mayavi( fig: Figure, - planes: np.ndarray, - uv_cam: np.ndarray, + planes: NDArray[np.float64], + uv_cam: NDArray[np.float64], colors: Tuple[Color, Color, Color] = ((0, 0, 1), (1, 0, 0), (0, 1, 0)), ) -> Figure: """ @@ -173,7 +174,7 @@ def plot_3d_clipped_bbox_mayavi( Updated Mayavi figure """ - def draw_rect(fig: Figure, selected_corners: np.ndarray, color: Color) -> None: + def draw_rect(fig: Figure, selected_corners: NDArray[np.float64], color: Color) -> None: prev = selected_corners[-1] for corner in selected_corners: clip_prev, clip_corner = clip_segment_v3_plane_n(prev, corner, planes) @@ -254,7 +255,7 @@ def draw_coordinate_frame_at_origin(fig: Figure) -> Figure: def draw_lidar( - point_cloud: np.ndarray, + point_cloud: NDArray[np.float64], colormap: str = "spectral", fig: Optional[Figure] = None, bgcolor: Color = (0, 0, 0), @@ -306,7 +307,7 @@ def draw_lidar( return fig -def mayavi_compare_point_clouds(point_cloud_list: Iterable[np.ndarray]) -> None: +def mayavi_compare_point_clouds(point_cloud_list: Iterable[NDArray[np.float64]]) -> None: """ Useful for visualizing the segmentation of a scene has separate objects, each colored differently. 
@@ -331,7 +332,7 @@ def mayavi_compare_point_clouds(point_cloud_list: Iterable[np.ndarray]) -> None: def draw_mayavi_line_segment( fig: Figure, - points: Iterable[np.ndarray], + points: Iterable[NDArray[np.float64]], color: Color = (1, 0, 0), line_width: Optional[Number] = 1, tube_radius: Optional[Number] = None, diff --git a/argoverse/visualization/mpl_point_cloud_vis.py b/argoverse/visualization/mpl_point_cloud_vis.py index 729df7f4..07bb093b 100644 --- a/argoverse/visualization/mpl_point_cloud_vis.py +++ b/argoverse/visualization/mpl_point_cloud_vis.py @@ -5,6 +5,7 @@ import numpy as np from matplotlib.axes import Axes +from numpy.typing import NDArray __all__ = ["draw_point_cloud_bev"] @@ -19,7 +20,7 @@ def draw_point_cloud_bev( axes: Axes, - pointcloud: np.ndarray, + pointcloud: NDArray[np.float64], color: str = "w", x_lim_3d: Optional[float] = None, y_lim_3d: Optional[float] = None, diff --git a/argoverse/visualization/vis_mask.py b/argoverse/visualization/vis_mask.py index 442ecbb0..80f67bf5 100644 --- a/argoverse/visualization/vis_mask.py +++ b/argoverse/visualization/vis_mask.py @@ -25,6 +25,7 @@ import matplotlib.pyplot as plt import numpy as np from matplotlib.patches import Polygon +from numpy.typing import NDArray from argoverse.visualization.colormap import colormap @@ -37,7 +38,9 @@ Segment = Tuple[float, float, float, float] -def vis_mask(image: np.ndarray, mask: np.ndarray, color: Union[float, np.ndarray], alpha: float = 0.4) -> np.ndarray: +def vis_mask( + image: NDArray[np.float64], mask: NDArray[np.float64], color: Union[float, NDArray[np.float64]], alpha: float = 0.4 +) -> NDArray[np.float64]: """Visualize a single binary mask by blending a colored mask with image. 
Args: @@ -63,11 +66,11 @@ def vis_mask(image: np.ndarray, mask: np.ndarray, color: Union[float, np.ndarray def vis_class( - image: np.ndarray, + image: NDArray[np.float64], pos: Tuple[float, float], class_str: str, font_scale: float = 50.0, -) -> np.ndarray: +) -> NDArray[np.float64]: """Visualizes a class. Args: @@ -93,7 +96,7 @@ def vis_class( return image -def vis_bbox(image: np.ndarray, bbox: Tuple[int, int, int, int], thickness: int = 1) -> np.ndarray: +def vis_bbox(image: NDArray[np.float64], bbox: Tuple[int, int, int, int], thickness: int = 1) -> NDArray[np.float64]: """Visualize a bounding box. Args: image: The input image @@ -112,7 +115,7 @@ def vis_bbox(image: np.ndarray, bbox: Tuple[int, int, int, int], thickness: int return image -def decode_segment_to_mask(segm: Segment, image: np.ndarray) -> np.ndarray: +def decode_segment_to_mask(segm: Segment, image: NDArray[np.float64]) -> NDArray[np.float64]: """Create a mask from a segment Args: @@ -130,12 +133,12 @@ def decode_segment_to_mask(segm: Segment, image: np.ndarray) -> np.ndarray: def vis_one_image_opencv( - image: np.ndarray, - boxes: np.ndarray, + image: NDArray[np.float64], + boxes: NDArray[np.float64], segms: Optional[Sequence[Segment]] = None, show_box: bool = False, show_class: bool = True, -) -> np.ndarray: +) -> NDArray[np.float64]: """Constructs a numpy array with the detections visualized. 
Args: @@ -185,10 +188,10 @@ def vis_one_image_opencv( def vis_one_image( - image: np.ndarray, + image: NDArray[np.float64], image_name: str, output_dir: str, - boxes: np.ndarray, + boxes: NDArray[np.float64], segms: Optional[Sequence[Segment]] = None, dpi: int = 200, box_alpha: float = 0.0, @@ -214,7 +217,7 @@ def vis_one_image( if boxes is None or boxes.shape[0] == 0: return - color_list = colormap(rgb=True) / 255 + color_list: NDArray[np.float64] = colormap(rgb=True) / 255 plt.get_cmap("rainbow") fig = plt.figure(frameon=False) @@ -224,7 +227,7 @@ def vis_one_image( fig.add_axes(ax) ax.imshow(image) - sorted_inds: Union[List[Any], np.ndarray] + sorted_inds: Union[List[Any], NDArray[np.float64]] if boxes is None: sorted_inds = [] # avoid crash when 'boxes' is None else: diff --git a/argoverse/visualization/visualization_utils.py b/argoverse/visualization/visualization_utils.py index 28fa83ef..d4aa29df 100644 --- a/argoverse/visualization/visualization_utils.py +++ b/argoverse/visualization/visualization_utils.py @@ -6,6 +6,7 @@ import matplotlib.pyplot as plt import numpy as np +from numpy.typing import NDArray from PIL import Image from argoverse.data_loading.argoverse_tracking_loader import ArgoverseTrackingLoader @@ -180,7 +181,7 @@ def draw_point_cloud_trajectory( def draw_box( pyplot_axis: plt.Axes, - vertices: np.ndarray, + vertices: NDArray[np.float64], axes: Optional[Any] = None, color: Union[str, Tuple[float, float, float]] = "red", ) -> None: @@ -204,7 +205,7 @@ def draw_box( pyplot_axis.plot(*vertices[:, connection], c=color, lw=0.5) -def show_image_with_boxes(img: np.ndarray, objects: Iterable[ObjectLabelRecord], calib: Calibration) -> np.ndarray: +def show_image_with_boxes(img: NDArray[np.float64], objects: Iterable[ObjectLabelRecord], calib: Calibration) -> Any: """Show image with 2D bounding boxes.""" img1 = np.copy(img) diff --git a/argoverse/visualization/visualize_sequences.py b/argoverse/visualization/visualize_sequences.py index 
1acbca28..123d0f35 100644 --- a/argoverse/visualization/visualize_sequences.py +++ b/argoverse/visualization/visualize_sequences.py @@ -2,20 +2,21 @@ """A simple python script template.""" from collections import defaultdict -from typing import Dict, List, Optional +from typing import Any, Dict, List, Optional import matplotlib.lines as mlines import matplotlib.pyplot as plt import numpy as np import pandas as pd import scipy.interpolate as interp +from numpy.typing import NDArray from argoverse.map_representation.map_api import ArgoverseMap _ZORDER = {"AGENT": 15, "AV": 10, "OTHERS": 5} -def interpolate_polyline(polyline: np.ndarray, num_points: int) -> np.ndarray: +def interpolate_polyline(polyline: NDArray[np.float64], num_points: int) -> Any: duplicates = [] for i in range(1, len(polyline)): if np.allclose(polyline[i], polyline[i - 1]): @@ -31,7 +32,7 @@ def interpolate_polyline(polyline: np.ndarray, num_points: int) -> np.ndarray: def viz_sequence( df: pd.DataFrame, - lane_centerlines: Optional[List[np.ndarray]] = None, + lane_centerlines: Optional[List[NDArray[np.float64]]] = None, show: bool = True, smoothen: bool = False, ) -> None: diff --git a/setup.py b/setup.py index 1c706ed5..bb6a1906 100755 --- a/setup.py +++ b/setup.py @@ -58,7 +58,7 @@ "matplotlib", "motmetrics==1.1.3", "numba", - "numpy==1.19", + "numpy==1.21.2", "omegaconf==2.1.0", "opencv-python>=4.1.0.25", "pandas>=0.23.1", diff --git a/tests/test_data/b_Sim2_c.json b/tests/test_data/b_Sim2_c.json new file mode 100644 index 00000000..afa60fbd --- /dev/null +++ b/tests/test_data/b_Sim2_c.json @@ -0,0 +1 @@ +{"R": [0.0, 1.0, 1.0, 0.0], "t": [-5.0, 5.0], "s": 0.1} diff --git a/tests/test_data/sample_argoverse_sweep.txt b/tests/test_data/sample_argoverse_sweep.txt old mode 100644 new mode 100755 diff --git a/tox.ini b/tox.ini index 2a24b96e..f308749f 100644 --- a/tox.ini +++ b/tox.ini @@ -14,7 +14,7 @@ deps = commands = pytest tests --cov argoverse --cov-append --cov-branch --cov-report=term-missing 
flake8 --max-line-length 120 --ignore E203,E704,E711,E722,E741,W291,W293,W391,W503,F821,F401,F811,F841,P101,G004,G002,I201,I100,I101 --enable-extensions G argoverse - mypy --ignore-missing --strict argoverse + mypy --ignore-missing --strict argoverse --allow-untyped-calls depends = py3{7,8}: clean