diff --git a/child_lab_framework/_cli/cli.py b/child_lab_framework/_cli/cli.py index 04ae877..580a2d6 100644 --- a/child_lab_framework/_cli/cli.py +++ b/child_lab_framework/_cli/cli.py @@ -189,11 +189,18 @@ def estimate_transformations( required=False, help='Torch device to use for tensor computations', ) +@click.option( + '--skip', + type=int, + required=False, + help='Seconds of videos to skip at the beginning', +) # @click_trap() def process( workspace: Path, videos: list[Path], device: str | None, + skip: int | None, ) -> None: video_dir = workspace / 'input' calibration_dir = workspace / 'calibration' @@ -227,7 +234,7 @@ def process( click.echo(f'Processing {"video" if len(videos) == 1 else "videos"}...') - demo_sequential.main(inputs, device_handle, destination) # type: ignore + demo_sequential.main(inputs, device_handle, destination, skip) # type: ignore click.echo('Done!') diff --git a/child_lab_framework/_procedure/demo_sequential.py b/child_lab_framework/_procedure/demo_sequential.py index 2e9b8bb..0553483 100644 --- a/child_lab_framework/_procedure/demo_sequential.py +++ b/child_lab_framework/_procedure/demo_sequential.py @@ -4,10 +4,11 @@ import torch +from ..core import transformation from ..core.video import Format, Input, Reader, Writer from ..logging import Logger from ..task import depth, face, gaze, pose -from ..task.camera import transformation +from ..task.camera.transformation import heuristic as heuristic_transformation from ..task.visualization import Configuration as VisualizationConfiguration from ..task.visualization import Visualizer @@ -15,7 +16,10 @@ def main( - inputs: tuple[Input, Input, Input], device: torch.device, output_directory: Path + inputs: tuple[Input, Input, Input], + device: torch.device, + output_directory: Path, + skip: int | None, ) -> None: # ignore exceeded allocation limit on MPS and CUDA - very important! os.environ['PYTORCH_MPS_HIGH_WATERMARK_RATIO'] = '0.0' @@ -38,6 +42,7 @@ def main( width=ceiling_properties.width, fps=ceiling_properties.fps, ) + window_left_properties = window_left_reader.properties window_right_reader = Reader( window_right, @@ -46,20 +51,32 @@ def main( width=ceiling_properties.width, fps=ceiling_properties.fps, ) + window_right_properties = window_right_reader.properties - depth_estimator = depth.Estimator(executor, device, input=ceiling_reader.properties) + depth_estimator = depth.Estimator(executor, device, input=ceiling_properties) - transformation_estimator = transformation.heuristic.Estimator( + transformation_buffer: transformation.Buffer[str] = transformation.Buffer() + + window_left_to_ceiling_transformation_estimator = heuristic_transformation.Estimator( + executor, + transformation_buffer, + window_left_properties, + ceiling_properties, + keypoint_threshold=0.35, + ) + + window_right_to_ceiling_transformation_estimator = heuristic_transformation.Estimator( executor, - window_left_reader.properties, - ceiling_reader.properties, + transformation_buffer, + window_right_properties, + ceiling_properties, keypoint_threshold=0.35, ) pose_estimator = pose.Estimator( executor, device, - input=ceiling_reader.properties, + input=ceiling_properties, max_detections=2, threshold=0.5, ) @@ -69,26 +86,26 @@ def main( # A workaround to use the model efficiently on both desktop and server. # TODO: remove this as soon as it's possible to specify device per component via CLI/config file. 
device if device == torch.device('cuda') else torch.device('cpu'), - input=ceiling_reader.properties, + input=ceiling_properties, confidence_threshold=0.5, suppression_threshold=0.1, ) window_left_gaze_estimator = gaze.Estimator( executor, - input=window_left_reader.properties, + input=window_left_properties, ) window_right_gaze_estimator = gaze.Estimator( executor, - input=window_right_reader.properties, + input=window_right_properties, ) ceiling_gaze_estimator = gaze.ceiling_projection.Estimator( executor, - ceiling_reader.properties, - window_left_reader.properties, - window_right_reader.properties, + ceiling_properties, + window_left_properties, + window_right_properties, ) # social_distance_estimator = social_distance.Estimator(executor) @@ -96,42 +113,72 @@ def main( ceiling_visualizer = Visualizer( executor, - properties=ceiling_reader.properties, + properties=ceiling_properties, configuration=VisualizationConfiguration(), ) window_left_visualizer = Visualizer( executor, - properties=window_left_reader.properties, + properties=window_left_properties, configuration=VisualizationConfiguration(), ) window_right_visualizer = Visualizer( executor, - properties=window_right_reader.properties, + properties=window_right_properties, configuration=VisualizationConfiguration(), ) ceiling_writer = Writer( output_directory / (ceiling.name + '.mp4'), - ceiling_reader.properties, + ceiling_properties, + output_format=Format.MP4, + ) + + ceiling_projection_writer = Writer( + output_directory / (ceiling.name + '_projections.mp4'), + ceiling_properties, + output_format=Format.MP4, + ) + + ceiling_depth_writer = Writer( + output_directory / (ceiling.name + '_depth.mp4'), + ceiling_properties, + output_format=Format.MP4, + ) + + window_left_depth_writer = Writer( + output_directory / (window_left.name + '_depth.mp4'), + window_left_properties, + output_format=Format.MP4, + ) + + window_right_depth_writer = Writer( + output_directory / (window_right.name + '_depth.mp4'), + window_right_properties, output_format=Format.MP4, ) window_left_writer = Writer( output_directory / (window_left.name + '.mp4'), - window_left_reader.properties, + window_left_properties, output_format=Format.MP4, ) window_right_writer = Writer( output_directory / (window_right.name + '.mp4'), - window_right_reader.properties, + window_right_properties, output_format=Format.MP4, ) Logger.info('Components instantiated') + if skip is not None and skip > 0: + frames_to_skip = skip * ceiling_properties.fps + ceiling_reader.read_skipping(frames_to_skip) + window_left_reader.read_skipping(frames_to_skip) + window_right_reader.read_skipping(frames_to_skip) + while True: ceiling_frames = ceiling_reader.read_batch() if ceiling_frames is None: @@ -163,34 +210,54 @@ def main( Logger.error('window_right_poses == None') Logger.info('Estimating depth...') - ceiling_depth = depth_estimator.predict(ceiling_frames[0]) + ceiling_depth = depth_estimator.predict( + ceiling_frames[0], + ceiling_properties, + ) + window_left_depth = depth_estimator.predict( + window_left_frames[0], + window_left_properties, + ) + window_right_depth = depth_estimator.predict( + window_right_frames[0], + window_right_properties, + ) + ceiling_depths = [ceiling_depth for _ in range(n_frames)] + window_left_depths = [window_left_depth for _ in range(n_frames)] + window_right_depths = [window_right_depth for _ in range(n_frames)] Logger.info('Done!') Logger.info('Estimating transformations...') window_left_to_ceiling = ( - transformation_estimator.predict_batch( + 
window_left_to_ceiling_transformation_estimator.predict_batch( ceiling_poses, window_left_poses, ceiling_depths, - [None for _ in range(n_frames)], # type: ignore # safe to pass + window_left_depths, ) if ceiling_poses is not None and window_left_poses is not None else None ) window_right_to_ceiling = ( - transformation_estimator.predict_batch( + window_right_to_ceiling_transformation_estimator.predict_batch( ceiling_poses, window_right_poses, ceiling_depths, - [None for _ in range(n_frames)], # type: ignore # safe to pass + window_right_depths, ) if ceiling_poses is not None and window_right_poses is not None else None ) Logger.info('Done!') + if window_left_to_ceiling is None: + Logger.error('window_left_to_ceiling == None') + + if window_right_to_ceiling is None: + Logger.error('window_right_to_ceiling == None') + Logger.info('Detecting faces...') window_left_faces = ( face_estimator.predict_batch(window_left_frames, window_left_poses) @@ -241,7 +308,29 @@ def main( ) Logger.info('Done!') + if window_left_gazes is None: + Logger.error('window_left_gazes == None') + + if window_right_gazes is None: + Logger.error('window_right_gazes == None') + Logger.info('Visualizing results...') + ceiling_projection_annotated_frames = ceiling_visualizer.annotate_batch( + ceiling_frames, + [ + p.unproject(window_left_properties.calibration, ceiling_depth) + .transform(t.inverse) + .project(ceiling_properties.calibration) + for p, t in zip(window_left_poses or [], window_left_to_ceiling or []) + ], + [ + p.unproject(window_right_properties.calibration, ceiling_depth) + .transform(t.inverse) + .project(ceiling_properties.calibration) + for p, t in zip(window_right_poses or [], window_right_to_ceiling or []) + ], + ) + ceiling_annotated_frames = ceiling_visualizer.annotate_batch( ceiling_frames, ceiling_poses, @@ -264,6 +353,16 @@ def main( Logger.info('Done!') Logger.info('Saving results...') + ceiling_projection_writer.write_batch(ceiling_projection_annotated_frames) + + ceiling_depth_writer.write_batch([depth.to_frame(d) for d in ceiling_depths]) + window_left_depth_writer.write_batch( + [depth.to_frame(d) for d in window_left_depths] + ) + window_right_depth_writer.write_batch( + [depth.to_frame(d) for d in window_right_depths] + ) + ceiling_writer.write_batch(ceiling_annotated_frames) window_left_writer.write_batch(window_left_annotated_frames) window_right_writer.write_batch(window_right_annotated_frames) diff --git a/child_lab_framework/core/algebra.py b/child_lab_framework/core/algebra.py index 90961d9..e0e93a1 100644 --- a/child_lab_framework/core/algebra.py +++ b/child_lab_framework/core/algebra.py @@ -1,6 +1,8 @@ from enum import IntEnum +from typing import Literal import numpy as np +from scipy.spatial.transform import Rotation from ..typing.array import FloatArray1, FloatArray2, FloatArray3, FloatArray6 from .calibration import Calibration @@ -31,6 +33,20 @@ def rotation_matrix(angle: float, axis: Axis) -> FloatArray2: ) +def euler_angles_from_rotation_matrix( + rotation: FloatArray2, +) -> np.ndarray[tuple[Literal[3]], np.dtype[np.float32]]: + return ( + Rotation.from_matrix(rotation).as_euler('xyz', degrees=False).astype(np.float32) + ) + + +def rotation_matrix_from_euler_angles( + angles: np.ndarray[tuple[Literal[3]], np.dtype[np.float32]], +) -> FloatArray2: + return Rotation.from_euler('xyz', angles, degrees=False).as_matrix() + + def normalized(vecs: FloatArray2) -> FloatArray2: norm = np.linalg.norm(vecs, ord=2.0, axis=1) return vecs / norm diff --git 
a/child_lab_framework/core/transformation/__init__.py b/child_lab_framework/core/transformation/__init__.py index fa89b05..eeb169f 100644 --- a/child_lab_framework/core/transformation/__init__.py +++ b/child_lab_framework/core/transformation/__init__.py @@ -1,3 +1,7 @@ +from .interface import Projectable, Transformable, Unprojectable + +from .error import reprojection_error # isort: skip + from .transformation import ( EuclideanTransformation, ProjectiveTransformation, @@ -7,8 +11,12 @@ from .buffer import Buffer # isort: skip __all__ = [ + 'Projectable', + 'Transformable', + 'Unprojectable', 'Buffer', 'Transformation', 'EuclideanTransformation', 'ProjectiveTransformation', + 'reprojection_error', ] diff --git a/child_lab_framework/core/transformation/buffer.py b/child_lab_framework/core/transformation/buffer.py index dc22611..384d6b7 100644 --- a/child_lab_framework/core/transformation/buffer.py +++ b/child_lab_framework/core/transformation/buffer.py @@ -7,6 +7,8 @@ from .. import serialization from . import Transformation +from .error import reprojection_error as _reprojection_error +from .interface import ProjectableAndTransformable # T appears both as a method argument and return type, therefore it must be invariant. @@ -74,9 +76,14 @@ def __setitem__( def __getitem__(self, from_to: tuple[T, T]) -> Transformation | None: connections = self.__connections - maybe_result: Transformation | None = connections.get_edge_data(*from_to).get( - 'transformation' - ) + match connections.get_edge_data(*from_to): + case {'transformation': transformation} if isinstance( + transformation, Transformation + ): + maybe_result = transformation + + case _: + maybe_result = None if maybe_result is not None: return maybe_result @@ -100,6 +107,51 @@ def __getitem__(self, from_to: tuple[T, T]) -> Transformation | None: return transformation + def reprojection_error[U: ProjectableAndTransformable]( + self, + evaluated_frame: T, + referential_frame: T, + evaluated_object: U, + referential_object: U, + ) -> float: + transformation = self.__getitem__((evaluated_frame, referential_frame)) + if transformation is None: + return float('inf') + + return _reprojection_error(evaluated_object, referential_object, transformation) + + def update_transformation_if_better[U: ProjectableAndTransformable]( + self, + from_frame: T, + to_frame: T, + from_object: U, + to_object: U, + transformation: Transformation, + ) -> None: + new_error = _reprojection_error(from_object, to_object, transformation) + current_error = self.reprojection_error( + from_frame, + to_frame, + from_object, + to_object, + ) + + if new_error < current_error: + self.__setitem__((from_frame, to_frame), transformation) + + inverse_transformation = transformation.inverse + + new_error = _reprojection_error(to_object, from_object, inverse_transformation) + current_error = self.reprojection_error( + to_frame, + from_frame, + to_object, + from_object, + ) + + if new_error < current_error: + self.__setitem__((to_frame, from_frame), inverse_transformation) + def serialize(self) -> dict[str, serialization.Value]: return { 'frames_of_reference': list(map(str, self.__frames_of_reference)), diff --git a/child_lab_framework/core/transformation/error.py b/child_lab_framework/core/transformation/error.py new file mode 100644 index 0000000..c68e47e --- /dev/null +++ b/child_lab_framework/core/transformation/error.py @@ -0,0 +1,15 @@ +import numpy as np + +from .interface import ProjectableAndTransformable +from .transformation import Transformation + + +def 
reprojection_error[T: ProjectableAndTransformable]( + object_in_a: T, + object_in_b: T, + a_to_b: Transformation, +) -> float: + true_points_in_b = object_in_b.flat_points + projected_points_in_b = object_in_a.transform(a_to_b).flat_points + + return float(np.linalg.norm(true_points_in_b - projected_points_in_b)) diff --git a/child_lab_framework/core/transformation/interface.py b/child_lab_framework/core/transformation/interface.py new file mode 100644 index 0000000..3473994 --- /dev/null +++ b/child_lab_framework/core/transformation/interface.py @@ -0,0 +1,25 @@ +from typing import Protocol, Self + +from ...typing.array import FloatArray2 +from ..calibration import Calibration +from .transformation import Transformation + + +class Transformable(Protocol): + def transform(self, transformation: Transformation) -> Self: ... + + +class Projectable[T: 'Unprojectable'](Protocol): + @property + def flat_points(self) -> FloatArray2: ... + def project(self, calibration: Calibration) -> T: ... + + +class Unprojectable[T](Protocol): + def unproject(self, calibration: Calibration, depth: FloatArray2) -> T: ... + + +# A workaround - Python doesn't have a notion of type intersection. +# Cannot just write `T: Projectable + Transformable`. +# Remove this protocol as soon as it's possible. Hopefully... +class ProjectableAndTransformable(Projectable, Transformable, Protocol): ... diff --git a/child_lab_framework/core/transformation/transformation.py b/child_lab_framework/core/transformation/transformation.py index 80df18f..5a2a124 100644 --- a/child_lab_framework/core/transformation/transformation.py +++ b/child_lab_framework/core/transformation/transformation.py @@ -24,7 +24,8 @@ def transform[Shape, DataType: Any]( if len(input.shape) == 1: return self.rotation @ input + self.translation # type: ignore - return np.einsum('...j,ij->...i', input, self.rotation) + self.translation + translation = np.squeeze(self.translation) # TODO: get rid of this operation + return np.einsum('...j,ij->...i', input, self.rotation) + translation @property @abstractmethod diff --git a/child_lab_framework/core/video.py b/child_lab_framework/core/video.py index 6b71a25..d965d43 100644 --- a/child_lab_framework/core/video.py +++ b/child_lab_framework/core/video.py @@ -19,6 +19,7 @@ class Format(Enum): @dataclass(frozen=True, repr=False) class Properties: + name: str length: int height: int width: int @@ -93,6 +94,7 @@ def __init__( ) self.__input_properties = Properties( + input.name, input_length, input_height, input_width, @@ -102,6 +104,7 @@ def __init__( # Output properties with maybe mimicked parameters self.properties = Properties( + input.name, input_length * self.__frame_repetitions, mimicked_height, mimicked_width, diff --git a/child_lab_framework/task/camera/transformation/heuristic/box_kabsch.py b/child_lab_framework/task/camera/transformation/heuristic/box_kabsch.py new file mode 100644 index 0000000..d5999d9 --- /dev/null +++ b/child_lab_framework/task/camera/transformation/heuristic/box_kabsch.py @@ -0,0 +1,108 @@ +from math import ceil, floor + +import numpy as np + +from .....core.algebra import kabsch +from .....core.calibration import Calibration +from .....core.transformation import EuclideanTransformation +from .....typing.array import FloatArray2, IntArray1 +from .... 
import pose + + +def estimate( + from_pose: pose.Result, + to_pose: pose.Result, + from_depth: FloatArray2, + to_depth: FloatArray2, + from_calibration: Calibration, + to_calibration: Calibration, + confidence_threshold: float, +) -> EuclideanTransformation | None: + from_cloud = __cloud_from_bounding_boxes( + from_pose, from_calibration, from_depth, confidence_threshold + ) + + if from_cloud is None: + return None + + to_cloud = __cloud_from_bounding_boxes( + to_pose, to_calibration, to_depth, confidence_threshold + ) + + if to_cloud is None: + return None + + from_cloud, to_cloud = __truncate_to_equal_size(from_cloud, to_cloud) + + return EuclideanTransformation(*kabsch(from_cloud, to_cloud)) + + +def __cloud_from_bounding_boxes( + poses: pose.Result, + calibration: Calibration, + depth: FloatArray2, + confidence_threshold: float, +) -> FloatArray2 | None: + height, width = depth.shape + cx, cy = calibration.optical_center + fx, fy = calibration.focal_length + + space_chunks: list[FloatArray2] = [] + + box: IntArray1 + for box in poses.boxes: + if box[4] < confidence_threshold: + continue + + x_start = max(int(floor(box[0])), 0) + y_start = max(int(floor(box[1])), 0) + x_end = min(int(ceil(box[2])), width) + y_end = min(int(ceil(box[3])), height) + + x_indices, y_indices = np.meshgrid( + np.arange(x_start, x_end, step=1.0, dtype=np.float32), + np.arange(y_start, y_end, step=1.0, dtype=np.float32), + indexing='xy', + ) + + z = depth[y_start:y_end, x_start:x_end] + + x = (x_indices - cx) * z / fx + y = (y_indices - cy) * z / fy + + points = np.concatenate( + (x.reshape(-1, 1), y.reshape(-1, 1), z.reshape(-1, 1)), + axis=1, + ) + + space_chunks.append(points) + + if len(space_chunks) == 0: + return None + + return np.concatenate(space_chunks, axis=0, dtype=np.float32, casting='unsafe') + + +def __truncate_to_equal_size( + points1: FloatArray2, + points2: FloatArray2, +) -> tuple[FloatArray2, FloatArray2]: + n_points1, _ = points1.shape + n_points2, _ = points2.shape + + if n_points1 == n_points2: + return points1, points2 + + elif n_points1 < n_points2: + mask = np.ones(n_points2, dtype=bool) + mask[n_points1:] = False + np.random.shuffle(mask) + + return points1, points2[mask] + + else: + mask = np.ones(n_points1, dtype=bool) + mask[n_points2:] = False + np.random.shuffle(mask) + + return points1[mask], points2 diff --git a/child_lab_framework/task/camera/transformation/heuristic/heuristic.py b/child_lab_framework/task/camera/transformation/heuristic/heuristic.py index e943c3b..552221e 100644 --- a/child_lab_framework/task/camera/transformation/heuristic/heuristic.py +++ b/child_lab_framework/task/camera/transformation/heuristic/heuristic.py @@ -2,15 +2,14 @@ from concurrent.futures.thread import ThreadPoolExecutor from itertools import starmap -import cv2 - from .....core.sequence import imputed_with_reference_inplace -from .....core.transformation import ProjectiveTransformation +from .....core.transformation import Buffer, Transformation from .....core.video import Properties -from .....typing.array import FloatArray1, FloatArray2 +from .....logging import Logger +from .....typing.array import FloatArray2 from .....typing.stream import Fiber from .... import pose -from . import projection +from . 
import box_kabsch, kabsch, projection type Input = tuple[ list[pose.Result | None] | None, @@ -21,58 +20,101 @@ class Estimator: - from_view: Properties + executor: ThreadPoolExecutor + transformation_buffer: Buffer[str] + from_view: Properties to_view: Properties - to_view_intrinsics: FloatArray2 - to_view_distortion: FloatArray1 keypoint_threshold: float - executor: ThreadPoolExecutor - def __init__( self, executor: ThreadPoolExecutor, + transformation_buffer: Buffer[str], from_view: Properties, to_view: Properties, *, keypoint_threshold: float = 0.25, ) -> None: + self.executor = executor + + self.transformation_buffer = transformation_buffer + transformation_buffer.add_frame_of_reference(from_view.name) + transformation_buffer.add_frame_of_reference(to_view.name) + self.from_view = from_view self.to_view = to_view - self.to_view_intrinsics = to_view.calibration.intrinsics[:3, :3] - self.to_view_distortion = to_view.calibration.distortion self.keypoint_threshold = keypoint_threshold - self.executor = executor - + # TODO: rebuild transformation estimation API - get rid of `heuristic.Estimator` facade and implement appropriate estimators for concrete methods. def predict( self, from_pose: pose.Result, to_pose: pose.Result, from_depth: FloatArray2, to_depth: FloatArray2, - ) -> ProjectiveTransformation | None: - match projection.estimate( - from_pose, - to_pose, - from_depth, - to_depth, - self.to_view_intrinsics, - self.to_view_distortion, - self.keypoint_threshold, - ): - case rotation, translation: - return ProjectiveTransformation( - cv2.Rodrigues(rotation)[0], # type: ignore - translation, - self.to_view.calibration, - ) + ) -> Transformation | None: + # TODO: remove this workaround when actors are recognized and matched between cameras + if len(from_pose.actors) != len(to_pose.actors): + return None + + from_calibration = self.from_view.calibration + to_calibration = self.to_view.calibration + + from_pose_3d = from_pose.unproject(from_calibration, from_depth) + to_pose_3d = to_pose.unproject(to_calibration, to_depth) + + from_name = self.from_view.name + to_name = self.to_view.name + + buffer = self.transformation_buffer + + transformation: Transformation | None = None + for transformation in [ + projection.estimate( + from_pose, + from_pose_3d, + to_pose, + to_pose_3d, + from_calibration, + to_calibration, + self.keypoint_threshold, + ), + kabsch.estimate( + from_pose, + from_pose_3d, + to_pose, + to_pose_3d, + self.keypoint_threshold, + ), + box_kabsch.estimate( + from_pose, + to_pose, + from_depth, + to_depth, + from_calibration, + to_calibration, + self.keypoint_threshold, + ), + ]: + if transformation is None: + continue + + buffer.update_transformation_if_better( + from_name, + to_name, + from_pose_3d, + to_pose_3d, + transformation, + ) + + error = buffer.reprojection_error(from_name, to_name, from_pose_3d, to_pose_3d) + + Logger.info(f'Reprojection error from {from_name} to {to_name}: {error:.2e}') - case None: - return None + return buffer[from_name, to_name] def predict_batch( self, @@ -80,7 +122,7 @@ def predict_batch( to_poses: list[pose.Result], from_depths: list[FloatArray2], to_depths: list[FloatArray2], - ) -> list[ProjectiveTransformation] | None: + ) -> list[Transformation] | None: return imputed_with_reference_inplace( list( starmap( @@ -101,7 +143,7 @@ def __predict_safe( to_pose: pose.Result | None, from_depth: FloatArray2, to_depth: FloatArray2, - ) -> ProjectiveTransformation | None: + ) -> Transformation | None: if from_pose is None or to_pose is None: return 
None @@ -109,11 +151,11 @@ def __predict_safe( async def stream( self, - ) -> Fiber[Input | None, list[ProjectiveTransformation | None] | None]: + ) -> Fiber[Input | None, list[Transformation | None] | None]: loop = asyncio.get_running_loop() executor = self.executor - results: list[ProjectiveTransformation | None] | None = None + results: list[Transformation | None] | None = None while True: match (yield results): diff --git a/child_lab_framework/task/camera/transformation/heuristic/kabsch.py b/child_lab_framework/task/camera/transformation/heuristic/kabsch.py index f72d11c..05b3e2c 100644 --- a/child_lab_framework/task/camera/transformation/heuristic/kabsch.py +++ b/child_lab_framework/task/camera/transformation/heuristic/kabsch.py @@ -2,63 +2,45 @@ import numpy as np -from .....typing.array import FloatArray1, FloatArray2, FloatArray3, IntArray1 +from .....core.algebra import kabsch +from .....core.transformation import EuclideanTransformation +from .....typing.array import FloatArray2, IntArray1 from .... import pose -def common_points_indicator( - points1: FloatArray2, points2: FloatArray2, confidence_threshold: float -) -> IntArray1: - probabilities = np.minimum(points1.view()[:, -1], points2.view()[:, -1]) - - indicator = np.squeeze(np.where(probabilities >= confidence_threshold)) - - return typing.cast(IntArray1, indicator) - - def estimate( from_pose: pose.Result, + from_pose_3d: pose.Result3d, to_pose: pose.Result, - from_depth: FloatArray2, - to_depth: FloatArray2, + to_pose_3d: pose.Result3d, confidence_threshold: float, -) -> tuple[FloatArray2, FloatArray1] | None: - # NOTE: A quick workaround - if len(from_pose.actors) != len(to_pose.actors): - return None +) -> EuclideanTransformation | None: + from_keypoints = from_pose.flat_points_with_confidence + to_keypoints = to_pose.flat_points_with_confidence - from_keypoints = from_pose.depersonificated_keypoints - to_keypoints = to_pose.depersonificated_keypoints - common = common_points_indicator(from_keypoints, to_keypoints, confidence_threshold) + common = __common_points_indicator(from_keypoints, to_keypoints, confidence_threshold) - from_points_2d: FloatArray2 = from_keypoints.view()[common][:, [0, 1]] - to_points_2d: FloatArray2 = to_keypoints.view()[common][:, [0, 1]] - - from_xs, from_ys = from_points_2d.astype(np.int32).T - to_xs, to_ys = to_points_2d.astype(np.int32).T - - from_depths: FloatArray2 = from_depth[from_ys, from_xs, np.newaxis] - to_depths: FloatArray2 = to_depth[to_ys, to_xs, np.newaxis] + if common.size < 6: + return None - from_points_3d: FloatArray3 = np.concatenate((from_points_2d, from_depths), axis=1) - to_points_3d: FloatArray3 = np.concatenate((to_points_2d, to_depths), axis=1) + from_points_3d = from_pose_3d.flat_points[common] + to_points_3d = to_pose_3d.flat_points[common] - from_center: FloatArray1 = typing.cast(FloatArray1, np.mean(from_points_3d, axis=0)) - to_center: FloatArray1 = typing.cast(FloatArray1, np.mean(to_points_3d, axis=0)) + return EuclideanTransformation(*kabsch(from_points_3d, to_points_3d)) - translation: FloatArray1 = to_center - from_center - cross_covariance: FloatArray2 = np.dot( - (from_points_3d - from_center).T, to_points_3d - to_center +def __common_points_indicator( + points1: FloatArray2, + points2: FloatArray2, + confidence_threshold: float, +) -> IntArray1: + sufficient_confidence = ( + np.minimum(points1.view()[:, 2], points2.view()[:, 2]) >= confidence_threshold ) - u, _, vt = np.linalg.svd(cross_covariance) - v = vt.T - ut = u.T - - if np.linalg.det(np.dot(v, ut)) 
< 0.0: - v[-1, :] *= -1.0 + x_non_zero = (points1[:, 0] * points2[:, 0]) > 0.0 + y_non_zero = (points1[:, 1] * points2[:, 1]) > 0.0 - rotation: FloatArray2 = np.dot(v, ut) + indicator = np.squeeze(np.where(sufficient_confidence & x_non_zero & y_non_zero)) - return rotation, translation + return typing.cast(IntArray1, indicator) diff --git a/child_lab_framework/task/camera/transformation/heuristic/projection.py b/child_lab_framework/task/camera/transformation/heuristic/projection.py index d53ff72..4f6aaf5 100644 --- a/child_lab_framework/task/camera/transformation/heuristic/projection.py +++ b/child_lab_framework/task/camera/transformation/heuristic/projection.py @@ -3,69 +3,56 @@ import cv2 import numpy as np -from .....task import pose -from .....typing.array import ( - FloatArray1, - FloatArray2, - FloatArray3, - IntArray1, +from .....core.algebra import ( + euler_angles_from_rotation_matrix, + rotation_matrix_from_euler_angles, ) - - -def common_points_indicator( - points1: FloatArray2, - points2: FloatArray2, - confidence_threshold: float, -) -> IntArray1: - sufficient_confidence = ( - np.minimum(points1.view()[:, -1], points2.view()[:, -1]) >= confidence_threshold - ) - - x_non_zero = (points1[:, 0] * points2[:, 0]) >= 0.0 - y_non_zero = (points1[:, 1] * points2[:, 1]) >= 0.0 - - indicator = np.squeeze(np.where(sufficient_confidence & x_non_zero & y_non_zero)) - - return typing.cast(IntArray1, indicator) +from .....core.calibration import Calibration +from .....core.transformation import ProjectiveTransformation +from .....task import pose +from .....typing.array import FloatArray1, FloatArray2, IntArray1 def estimate( from_pose: pose.Result, + from_pose_3d: pose.Result3d, to_pose: pose.Result, - from_depth: FloatArray2, - to_depth: FloatArray2, - intrinsics_matrix: FloatArray2, - distortion: FloatArray1, + to_pose_3d: pose.Result3d, + from_calibration: Calibration, + to_calibration: Calibration, confidence_threshold: float, -) -> tuple[FloatArray1, FloatArray1] | None: - # NOTE: Workaround; fix when inter-camera actor recognition is introduced - if len(from_pose.actors) != len(to_pose.actors): - return None +) -> ProjectiveTransformation | None: + from_points_2d = from_pose.flat_points_with_confidence + to_points_2d = to_pose.flat_points_with_confidence - from_keypoints = from_pose.depersonificated_keypoints - to_keypoints = to_pose.depersonificated_keypoints - common = common_points_indicator(from_keypoints, to_keypoints, confidence_threshold) + common = __common_points_indicator(from_points_2d, to_points_2d, confidence_threshold) if common.size < 6: return None - from_points_2d: FloatArray2 = from_keypoints.view()[common][:, [0, 1]] - to_points_2d: FloatArray2 = to_keypoints.view()[common][:, [0, 1]] - - from_xs, from_ys = from_points_2d.astype(np.int32).T + from_points_2d = from_points_2d[common][:, [0, 1]] + to_points_2d = to_points_2d[common][:, [0, 1]] - from_xs[from_xs >= 1920] = 1919 - from_ys[from_ys >= 1080] = 1079 - - from_depths: FloatArray2 = from_depth[from_ys, from_xs].reshape(-1, 1) - - from_points_3d: FloatArray3 = np.concatenate((from_points_2d, from_depths), axis=1) + to_points_3d = to_pose_3d.flat_points[common] + from_points_3d = from_pose_3d.flat_points[common] success, rotation, translation = cv2.solvePnP( from_points_3d, to_points_2d, - intrinsics_matrix, - distortion, + to_calibration.intrinsics, + to_calibration.distortion, + useExtrinsicGuess=True, + flags=cv2.SOLVEPNP_SQPNP, + ) + + if not success: + return None + + success, inverse_rotation, 
inverse_translation = cv2.solvePnP( + to_points_3d, + from_points_2d, + from_calibration.intrinsics, + from_calibration.distortion, useExtrinsicGuess=True, flags=cv2.SOLVEPNP_SQPNP, ) @@ -73,4 +60,39 @@ def estimate( if not success: return None - return (typing.cast(FloatArray1, rotation), typing.cast(FloatArray1, translation)) + rotation = typing.cast(FloatArray2, cv2.Rodrigues(rotation)[0]) + inverse_rotation = typing.cast(FloatArray2, cv2.Rodrigues(inverse_rotation)[0]) + inverse_inverse_rotation = np.linalg.inv(inverse_rotation) + + average_rotation = rotation_matrix_from_euler_angles( + ( + euler_angles_from_rotation_matrix(rotation) + + euler_angles_from_rotation_matrix(inverse_inverse_rotation) + ) + / 2.0 + ) + + average_translation: FloatArray1 = (translation - inverse_translation) / 2.0 + + return ProjectiveTransformation( + average_rotation, + average_translation, + to_calibration, + ) + + +def __common_points_indicator( + points1: FloatArray2, + points2: FloatArray2, + confidence_threshold: float, +) -> IntArray1: + sufficient_confidence = ( + np.minimum(points1.view()[:, 2], points2.view()[:, 2]) >= confidence_threshold + ) + + x_non_zero = (points1[:, 0] * points2[:, 0]) > 0.0 + y_non_zero = (points1[:, 1] * points2[:, 1]) > 0.0 + + indicator = np.squeeze(np.where(sufficient_confidence & x_non_zero & y_non_zero)) + + return typing.cast(IntArray1, indicator) diff --git a/child_lab_framework/task/camera/transformation/heuristic/shoulders.py b/child_lab_framework/task/camera/transformation/heuristic/shoulders.py deleted file mode 100644 index 1e3d5fb..0000000 --- a/child_lab_framework/task/camera/transformation/heuristic/shoulders.py +++ /dev/null @@ -1,67 +0,0 @@ -import numpy as np - -from .....core.algebra import Axis, rotation_matrix -from .....core.video import Perspective -from .....typing.array import FloatArray1, FloatArray2, FloatArray3 - - -# Trivial at the moment -def anchor_person_index(n_people: int, perspective: Perspective) -> int: - match perspective: - case Perspective.WINDOW_LEFT: - return n_people - 1 - - case Perspective.WINDOW_RIGHT: - return 0 - - case _: - assert False, 'Unreachable' - - -# TODO: Add depth -# NOTE: assumes matched person indices in both views -def estimate( - ceiling_shoulders: FloatArray3, side_shoulders: FloatArray3, perspective: Perspective -) -> tuple[FloatArray2, FloatArray1]: - anchor = anchor_person_index( - min(len(ceiling_shoulders), len(side_shoulders)), perspective - ) - - anchor_ceiling_shoulders: FloatArray2 = ceiling_shoulders[anchor, ...] - anchor_side_shoulders: FloatArray2 = side_shoulders[anchor, ...] 
- - anchor_ceiling_shoulder_components: FloatArray1 = ( - anchor_ceiling_shoulders[1, :-1] - anchor_ceiling_shoulders[0, :-1] - ) - anchor_side_shoulder_components: FloatArray1 = ( - anchor_side_shoulders[1, :-1] - anchor_side_shoulders[0, :-1] - ) - - print(f'\n{anchor_ceiling_shoulder_components = }') - print(f'\n{anchor_side_shoulder_components = }') - - alpha, gamma = np.arccos( - anchor_side_shoulder_components / anchor_ceiling_shoulder_components - ) - print(f'\n{alpha = }, {gamma = }\n') - - rotation: FloatArray2 = rotation_matrix(alpha, Axis.X) @ rotation_matrix( - gamma, Axis.Z - ) - - anchor_side_shoulders[:, 2] = 1.0 - - anchor_ceiling_shoulder_components_reconstructed: FloatArray2 = ( - anchor_side_shoulders @ rotation - ) - translation_components: FloatArray2 = ( - anchor_ceiling_shoulder_components_reconstructed - - anchor_ceiling_shoulder_components.reshape(-1, 1) - ) - - translation: FloatArray1 = ( - translation_components[0, :] + translation_components[1, :] - ) / 2.0 - translation = translation.reshape(-1, 1) - - return rotation, translation diff --git a/child_lab_framework/task/depth/__init__.py b/child_lab_framework/task/depth/__init__.py index b6102c0..fed65c4 100644 --- a/child_lab_framework/task/depth/__init__.py +++ b/child_lab_framework/task/depth/__init__.py @@ -1,3 +1,3 @@ -from .depth import Estimator +from .depth import Estimator, to_frame -__all__ = ['Estimator'] +__all__ = ['Estimator', 'to_frame'] diff --git a/child_lab_framework/task/depth/depth.py b/child_lab_framework/task/depth/depth.py index c64d5d1..ebd00ce 100644 --- a/child_lab_framework/task/depth/depth.py +++ b/child_lab_framework/task/depth/depth.py @@ -1,6 +1,5 @@ import asyncio from concurrent.futures import ThreadPoolExecutor -from typing import Literal import numpy import torch @@ -14,23 +13,14 @@ from ...typing.stream import Fiber from ...util import MODELS_DIR -type DepthProResult = dict[Literal['depth', 'focallength_px'], torch.Tensor] - def to_frame(depth_map: FloatArray2) -> Frame: - green = (depth_map / depth_map.max() * 255).astype(numpy.uint8) - other = numpy.zeros_like(green) - frame = numpy.transpose(numpy.stack((other, green, other)), (1, 2, 0)) - return frame - - -# DEVICE_LOCK: threading.Lock = threading.Lock() + normalized = (depth_map / depth_map.max() * 255).astype(numpy.uint8) + return numpy.transpose(numpy.stack((normalized, normalized, normalized)), (1, 2, 0)) class Estimator: MODEL_PATH = MODELS_DIR / 'depth_pro.pt' - MODEL_INPUT_SIZE = (616, 1064) - PADDING_BORDER_COLOR = (123.675, 116.28, 103.53) executor: ThreadPoolExecutor device: torch.device @@ -70,9 +60,7 @@ def __init__( ] ) - def predict(self, frame: Frame) -> FloatArray2: - # frame = opencv.cvtColor(frame, opencv.COLOR_BGR2RGB) # type: ignore - + def predict(self, frame: Frame, properties: Properties) -> FloatArray2: input = torch.from_numpy(frame.copy()).to(self.device) input = torch.permute(input, (2, 0, 1)) input = torch.unsqueeze(input, 0) @@ -80,8 +68,14 @@ def predict(self, frame: Frame) -> FloatArray2: # shape of the input after transposition: 1 x n_channels x height x width + focal_length = ( + properties.calibration.focal_length[0] + * self.model.input_image_size + / properties.width + ) + # TODO: return the tensor itself without transferring (Issue #6) - result = self.model.predict(input) + result = self.model.predict(input, focal_length) depth = self.from_model(result.depth).cpu().numpy() del result @@ -101,7 +95,7 @@ async def stream(self) -> Fiber[list[Frame] | None, list[FloatArray2] | None]: 
results = await loop.run_in_executor( executor, lambda: imputed_with_reference_inplace( - [self.predict(frames[n_frames // 2])] + [self.predict(frames[n_frames // 2], self.input)] + [None for _ in range(n_frames - 1)] ), ) diff --git a/child_lab_framework/task/gaze/__init__.py b/child_lab_framework/task/gaze/__init__.py index c4eb6a7..55f6dab 100644 --- a/child_lab_framework/task/gaze/__init__.py +++ b/child_lab_framework/task/gaze/__init__.py @@ -1,4 +1,4 @@ from . import ceiling_projection -from .gaze import Estimator, Result +from .gaze import Estimator, Result3d -__all__ = ['ceiling_projection', 'Estimator', 'Result'] +__all__ = ['ceiling_projection', 'Estimator', 'Result3d'] diff --git a/child_lab_framework/task/gaze/ceiling_projection.py b/child_lab_framework/task/gaze/ceiling_projection.py index 3849854..f7b51ac 100644 --- a/child_lab_framework/task/gaze/ceiling_projection.py +++ b/child_lab_framework/task/gaze/ceiling_projection.py @@ -9,7 +9,7 @@ from ...core.sequence import imputed_with_reference_inplace from ...core.stream import InvalidArgumentException -from ...core.transformation import ProjectiveTransformation +from ...core.transformation import Transformation from ...core.video import Properties from ...logging import Logger from ...typing.array import BoolArray1, FloatArray1, FloatArray2 @@ -24,8 +24,8 @@ list[pose.Result | None] | None, list[face.Result | None] | None, list[face.Result | None] | None, - list[ProjectiveTransformation | None] | None, - list[ProjectiveTransformation | None] | None, + list[Transformation | None] | None, + list[Transformation | None] | None, ] @@ -42,7 +42,7 @@ def visualize( configuration: visualization.Configuration, ) -> Frame: starts = self.centres - ends = starts + 100.0 * self.directions + ends = starts + float(configuration.gaze_line_length) * self.directions start: FloatArray1 end: FloatArray1 @@ -66,16 +66,12 @@ def visualize( class Estimator: - BASELINE_WEIGHT: float = 0.0 - COLLECTIVE_CORRECTION_WEIGHT: float = 1.0 - BASELINE_WEIGHT - TEMPORARY_RESCALE: float = 1000.0 # for numerical stability during projection + executor: ThreadPoolExecutor ceiling_properties: Properties window_left_properties: Properties window_right_properties: Properties - executor: ThreadPoolExecutor - def __init__( self, executor: ThreadPoolExecutor, @@ -91,10 +87,10 @@ def __init__( def predict( self, ceiling_pose: pose.Result, - window_left_gaze: gaze.Result | None, - window_right_gaze: gaze.Result | None, - window_left_to_ceiling: ProjectiveTransformation | None, - window_right_to_ceiling: ProjectiveTransformation | None, + window_left_gaze: gaze.Result3d | None, + window_right_gaze: gaze.Result3d | None, + window_left_to_ceiling: Transformation | None, + window_right_to_ceiling: Transformation | None, ) -> Result: centres, directions = ceiling_baseline.estimate( ceiling_pose, @@ -119,43 +115,28 @@ def predict( np.array([False for _ in range(len(centres))]), ) - correction_weight = self.COLLECTIVE_CORRECTION_WEIGHT / float(correction_count) - baseline_weight = self.BASELINE_WEIGHT - temporary_rescale = self.TEMPORARY_RESCALE - # slicing gaze direction arrays is a heuristic workaround for a lack of exact actor matching between cameras # assuming there are only two actors in the world. 
# gaze detected in the left camera => it belongs the actor on the right # cannot reuse `correct_from_left` value because the type checker gets confused about not-None values if window_left_gaze is not None and window_left_to_ceiling is not None: - directions_simplified = ( - np.squeeze(np.mean(window_left_gaze.directions, axis=1)) - * temporary_rescale - ) + calibration = self.window_left_properties.calibration + projected_gaze = window_left_gaze.transform( + window_left_to_ceiling.inverse + ).project(calibration) - directions_projected = ( - window_left_to_ceiling.project(directions_simplified) - / temporary_rescale - * correction_weight - ) - - directions[-1, ...] *= baseline_weight - directions[-1, ...] += directions_projected[-1, ...] + projected_directions = np.squeeze(np.mean(projected_gaze.directions, axis=1)) + directions[-1, ...] = projected_directions[-1, ...] if window_right_gaze is not None and window_right_to_ceiling is not None: - directions_simplified = np.squeeze( - np.mean(window_right_gaze.directions, axis=1) * temporary_rescale - ) - - directions_projected = ( - window_right_to_ceiling.project(directions_simplified) - / temporary_rescale - * correction_weight - ) + calibration = self.window_right_properties.calibration + projected_gaze = window_right_gaze.transform( + window_right_to_ceiling.inverse + ).project(calibration) - directions[0, ...] *= baseline_weight - directions[0, ...] += directions_projected[0, ...] + projected_directions = np.squeeze(np.mean(projected_gaze.directions, axis=1)) + directions[0, ...] = projected_directions[0, ...] return Result( centres, @@ -168,10 +149,10 @@ def predict( def predict_batch( self, ceiling_poses: list[pose.Result], - window_left_gazes: list[gaze.Result] | None, - window_right_gazes: list[gaze.Result] | None, - window_left_to_ceiling: list[ProjectiveTransformation] | None, - window_right_to_ceiling: list[ProjectiveTransformation] | None, + window_left_gazes: list[gaze.Result3d] | None, + window_right_gazes: list[gaze.Result3d] | None, + window_left_to_ceiling: list[Transformation] | None, + window_right_to_ceiling: list[Transformation] | None, ) -> list[Result] | None: return imputed_with_reference_inplace( list( @@ -191,10 +172,10 @@ def predict_batch( def __predict_safe( self, ceiling_pose: pose.Result | None, - window_left_gaze: gaze.Result | None, - window_right_gaze: gaze.Result | None, - window_left_to_ceiling: ProjectiveTransformation | None, - window_right_to_ceiling: ProjectiveTransformation | None, + window_left_gaze: gaze.Result3d | None, + window_right_gaze: gaze.Result3d | None, + window_left_to_ceiling: Transformation | None, + window_right_to_ceiling: Transformation | None, ) -> Result | None: if ceiling_pose is None: return None diff --git a/child_lab_framework/task/gaze/gaze.py b/child_lab_framework/task/gaze/gaze.py index 0a4b6bc..b41871b 100644 --- a/child_lab_framework/task/gaze/gaze.py +++ b/child_lab_framework/task/gaze/gaze.py @@ -1,18 +1,22 @@ import asyncio import typing from concurrent.futures import ThreadPoolExecutor -from dataclasses import dataclass, field +from dataclasses import dataclass +from functools import cached_property from itertools import repeat, starmap import cv2 import mini_face as mf import numpy as np +from ...core.algebra import normalized_3d +from ...core.calibration import Calibration from ...core.sequence import ( imputed_with_reference_inplace, imputed_with_zeros_reference_inplace, ) from ...core.stream import InvalidArgumentException +from ...core.transformation import 
Transformation from ...core.video import Frame, Properties from ...typing.array import FloatArray2, FloatArray3, IntArray1 from ...typing.stream import Fiber @@ -20,18 +24,66 @@ from .. import face, visualization -@dataclass -class Result2d: +@dataclass(frozen=True) +class Result: eyes: FloatArray3 directions: FloatArray3 -@dataclass -class Result: +@dataclass(frozen=True) +class Result3d: eyes: FloatArray3 directions: FloatArray3 - was_projected: bool = field(default=False) + # For numerical stability during transformation of a normalized `directions` + __STABILIZING_MULTIPLIER: typing.ClassVar[float] = 100.0 + + @property + def flat_points(self) -> FloatArray2: + return self.__flat_points + + @cached_property + def __flat_points(self) -> FloatArray2: + return np.concatenate(self.eyes) + + def project(self, calibration: Calibration) -> Result: + cx, cy = calibration.optical_center + fx, fy = calibration.focal_length + + # TODO: remove the rescale (Issue #60) + eyes = self.eyes.copy() * 8.0 / 28.0 + z = eyes[..., -1] + eyes[..., 0] *= fx + eyes[..., 1] *= fy + eyes[..., 0] /= z + eyes[..., 1] /= z + eyes[..., 0] += cx + eyes[..., 1] += cy + + ends = self.eyes + self.__STABILIZING_MULTIPLIER * self.directions + z = ends[..., -1] + ends[..., 0] *= fx + ends[..., 1] *= fy + ends[..., 0] /= z + ends[..., 1] /= z + ends[..., 0] += cx + ends[..., 1] += cy + + directions = ends - eyes + directions_2d = directions[..., :-1] + directions_2d = normalized_3d(directions_2d) + + return Result(eyes[..., :-1], directions_2d) + + def transform(self, transformation: Transformation) -> 'Result3d': + multiplier = self.__STABILIZING_MULTIPLIER + + return Result3d( + transformation.transform(self.eyes), + transformation.transform(multiplier * self.directions) / multiplier, + ) + + # TODO: Move to `Result` for consistency with other models def visualize( self, frame: Frame, @@ -45,31 +97,9 @@ def visualize( thickness = configuration.gaze_line_thickness line_length = configuration.gaze_line_length - calibration = frame_properties.calibration - cx, cy = calibration.optical_center - fx, fy = calibration.focal_length - - # TODO: Project using `Projectable` (Issue #59) - - # TODO: remove the rescale (Issue #60) - starts = self.eyes.copy() * 8.0 / 28.0 - ends = starts + float(line_length) * self.directions - - starts_z = starts[..., -1] - starts[..., 0] *= fx - starts[..., 1] *= fy - starts[..., 0] /= starts_z - starts[..., 1] /= starts_z - starts[..., 0] += cx - starts[..., 1] += cy - - ends_z = ends[..., -1] - ends[..., 0] *= fx - ends[..., 1] *= fy - ends[..., 0] /= ends_z - ends[..., 1] /= ends_z - ends[..., 0] += cx - ends[..., 1] += cy + projected = self.project(frame_properties.calibration) + starts = projected.eyes + ends = starts + float(line_length) * projected.directions actor_starts: FloatArray2 actor_ends: FloatArray2 @@ -133,7 +163,7 @@ def __init__( models_directory=self.MODELS_DIR, ) - def predict(self, frame: Frame, faces: face.Result) -> Result | None: + def predict(self, frame: Frame, faces: face.Result) -> Result3d | None: extractor = self.extractor eyes: list[FloatArray3 | None] = [] @@ -161,7 +191,7 @@ def predict(self, frame: Frame, faces: face.Result) -> Result | None: imputed_with_zeros_reference_inplace(directions), ): case list(eyes_imputed), list(directions_imputed): - return Result( + return Result3d( np.concatenate(eyes_imputed, axis=0), np.concatenate(directions_imputed, axis=0), ) @@ -173,22 +203,22 @@ def predict_batch( self, frames: list[Frame], faces: list[face.Result], - ) -> 
list[Result] | None: + ) -> list[Result3d] | None: return imputed_with_reference_inplace( list(starmap(self.predict, zip(frames, faces))) ) - def __predict_safe(self, frame: Frame, faces: face.Result | None) -> Result | None: + def __predict_safe(self, frame: Frame, faces: face.Result | None) -> Result3d | None: if faces is None: return None return self.predict(frame, faces) - async def stream(self) -> Fiber[Input | None, list[Result | None] | None]: + async def stream(self) -> Fiber[Input | None, list[Result3d | None] | None]: executor = self.executor loop = asyncio.get_running_loop() - results: list[Result | None] | None = None + results: list[Result3d | None] | None = None while True: match (yield results): diff --git a/child_lab_framework/task/pose/__init__.py b/child_lab_framework/task/pose/__init__.py index 527a21a..615e8e6 100644 --- a/child_lab_framework/task/pose/__init__.py +++ b/child_lab_framework/task/pose/__init__.py @@ -6,6 +6,7 @@ Estimator, Keypoints, Result, + Result3d, ) __all__ = [ @@ -16,4 +17,5 @@ 'Estimator', 'Keypoints', 'Result', + 'Result3d', ] diff --git a/child_lab_framework/task/pose/pose.py b/child_lab_framework/task/pose/pose.py index c1c5d3d..dfda95c 100644 --- a/child_lab_framework/task/pose/pose.py +++ b/child_lab_framework/task/pose/pose.py @@ -2,7 +2,9 @@ import typing from collections.abc import Iterable from concurrent.futures import ThreadPoolExecutor +from dataclasses import dataclass from enum import IntEnum, auto +from functools import cached_property import cv2 import numpy as np @@ -12,8 +14,10 @@ from torchvision.transforms.transforms import InterpolationMode from ultralytics.engine import results as yolo +from ...core.calibration import Calibration from ...core.geometry import area_broadcast from ...core.sequence import imputed_with_reference_inplace +from ...core.transformation import Transformation from ...core.video import Frame, Properties from ...typing.array import FloatArray1, FloatArray2, FloatArray3, IntArray1 from ...typing.stream import Fiber @@ -34,70 +38,94 @@ class Actor(IntEnum): CHILD = auto() +@dataclass(frozen=True) class Result: n_detections: int boxes: BatchedBoxes - keypoints: BatchedKeypoints + keypoints: BatchedKeypoints # TODO: separate confidences from spatial coordinates actors: list[Actor] - __centres: FloatArray2 | None = None - __depersonificated_keypoints: FloatArray2 | None = None - # NOTE heuristic: sitting actors preserve lexicographic order of bounding boxes - @staticmethod - def __ordered( - boxes: BatchedBoxes, keypoints: BatchedKeypoints, actors: list[Actor] - ) -> tuple[BatchedBoxes, BatchedKeypoints, list[Actor]]: + @classmethod + def ordered( + cls, + n_detections: int, + boxes: BatchedBoxes, + keypoints: BatchedKeypoints, + actors: list[Actor], + ) -> typing.Self: order = np.lexsort(np.flipud(boxes.T)) boxes = boxes[order] keypoints = keypoints[order] actors = [actors[i] for i in order] - return boxes, keypoints, actors - - def __init__( - self, - n_detections: int, - boxes: BatchedBoxes, - keypoints: BatchedKeypoints, - actors: list[Actor], - ) -> None: - self.n_detections = n_detections - self.boxes, self.keypoints, self.actors = Result.__ordered( - boxes, - keypoints, - actors, - ) + return cls(n_detections, boxes, keypoints, actors) def iter(self) -> Iterable[tuple[Actor, Box, Keypoints]]: return zip(self.actors, self.boxes, self.keypoints) - @property + @cached_property def centres(self) -> FloatArray2: - if self.__centres is not None: - return self.__centres - keypoints = self.keypoints.view() - 
centres = ( + return ( keypoints[:, YoloKeypoint.RIGHT_SHOULDER, :2] + keypoints[:, YoloKeypoint.LEFT_SHOULDER, :2] ) / 2.0 - self.__centres = centres - - return centres - - @property - def depersonificated_keypoints(self) -> FloatArray2: - if self.__depersonificated_keypoints is not None: - return self.__depersonificated_keypoints - - stacked_keypoints = np.concatenate(self.keypoints) - self.__depersonificated_keypoints = stacked_keypoints - - return stacked_keypoints + @cached_property + def flat_points_with_confidence(self) -> FloatArray2: + return np.concatenate(self.keypoints) + + # TODO: unproject bounding boxes properly (Issue #62) + def unproject(self, calibration: Calibration, depth: FloatArray2) -> 'Result3d': + height, width = depth.shape + n_actors, n_keypoints, _ = self.keypoints.shape + + keypoints = self.flat_points_with_confidence[:, :2].copy() + confidence = self.flat_points_with_confidence[:, 2, np.newaxis].copy() + keypoints_truncated = keypoints.astype(np.int32) + + keypoints_depth = depth[ + np.clip( + keypoints_truncated[:, 1], + a_min=0, + a_max=height - 1, + dtype=np.int32, + ), + np.clip( + keypoints_truncated[:, 0], + a_min=0, + a_max=width - 1, + dtype=np.int32, + ), + ].reshape(-1, 1) + + cx, cy = calibration.optical_center + fx, fy = calibration.focal_length + + keypoints[:, 0] -= cx + keypoints[:, 1] -= cy + keypoints *= keypoints_depth + keypoints[:, 0] /= fx + keypoints[:, 1] /= fy + + unprojected_keypoints = np.concatenate( + ( + keypoints, + keypoints_depth, + confidence, + ), + axis=1, + ).reshape(n_actors, n_keypoints, -1) + + return Result3d( + self.n_detections, + self.boxes, + unprojected_keypoints, + self.actors, + ) def visualize( self, @@ -161,6 +189,59 @@ def visualize( return frame +@dataclass(frozen=True) +class Result3d: + n_detections: int + boxes: FloatArray2 + keypoints: FloatArray3 + actors: list[Actor] + + # Boilerplate to match the `Projectable` protocol and cache the value at the same time. 
+ @property + def flat_points(self) -> FloatArray2: + return self.__flat_points + + @cached_property + def __flat_points(self) -> FloatArray2: + return np.concatenate(self.keypoints[..., :-1]) + + # TODO: project bounding boxes properly (Issue #62) + def project(self, calibration: Calibration) -> Result: + cx, cy = calibration.optical_center + fx, fy = calibration.focal_length + + projected_keypoints = self.keypoints.copy() + projected_keypoints[..., 0] *= fx + projected_keypoints[..., 1] *= fy + projected_keypoints[..., :2] /= projected_keypoints[..., 2, np.newaxis] + projected_keypoints[..., 0] += cx + projected_keypoints[..., 1] += cy + + return Result( + self.n_detections, + self.boxes, + projected_keypoints[..., [0, 1, 3]], + self.actors, + ) + + # TODO: reorder actors if needed (Issue #63), + # transform bounding boxes properly (Issue #62) + def transform(self, transformation: Transformation) -> 'Result3d': + confidence = self.keypoints[..., -1, np.newaxis] + transformed_keypoints = transformation.transform(self.keypoints[..., :-1]) + transformed_keypoint_with_confidence = np.concatenate( + (transformed_keypoints, confidence), + axis=2, + ) + + return Result3d( + self.n_detections, + self.boxes, + transformed_keypoint_with_confidence, + self.actors, + ) + + class Estimator: MODEL_PATH: str = str(MODELS_DIR / 'yolov11x-pose.pt') @@ -270,7 +351,7 @@ def __interpret(self, detections: yolo.Results) -> Result | None: del boxes, keypoints - return Result(n_detections, boxes_cpu, keypoints_cpu, actors) + return Result.ordered(n_detections, boxes_cpu, keypoints_cpu, actors) async def stream(self) -> Fiber[list[Frame] | None, list[Result] | None]: executor = self.executor
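The updated Buffer.__getitem__ guards the edge lookup with structural pattern matching instead of chaining .get calls, which also covers the case where the edge is missing and get_edge_data returns None. A minimal sketch of that lookup pattern, assuming the buffer keeps transformations as edge attributes on a networkx.DiGraph (the get_edge_data call suggests networkx, but that is an assumption — only the lookup itself is shown in this diff, and RigidTransformation below is a stand-in, not the framework's class):

import networkx as nx
import numpy as np

class RigidTransformation:
    # Hypothetical stand-in for core.transformation.Transformation.
    def __init__(self, rotation: np.ndarray, translation: np.ndarray) -> None:
        self.rotation = rotation
        self.translation = translation

connections = nx.DiGraph()
connections.add_edge(
    'window_left', 'ceiling',
    transformation=RigidTransformation(np.eye(3), np.zeros(3)),
)

def lookup(from_frame: str, to_frame: str) -> RigidTransformation | None:
    # get_edge_data returns None for a missing edge, so matching on the mapping
    # shape handles both "no edge" and "edge without a transformation attribute".
    match connections.get_edge_data(from_frame, to_frame):
        case {'transformation': RigidTransformation() as transformation}:
            return transformation
        case _:
            return None

assert lookup('window_left', 'ceiling') is not None
assert lookup('ceiling', 'window_left') is None  # only the forward edge was stored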
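reprojection_error in core/transformation/error.py reduces to the L2 norm between the reference points in the target frame and the points carried over by a candidate transformation, and Buffer.update_transformation_if_better keeps whichever candidate scores lower. A self-contained sketch of that selection with plain numpy arrays standing in for the Projectable-and-Transformable objects (the names are illustrative, not the framework's API):

import numpy as np

def apply_rigid(points: np.ndarray, rotation: np.ndarray, translation: np.ndarray) -> np.ndarray:
    # x -> R x + t applied to every row.
    return points @ rotation.T + translation

def reprojection_error(points_in_a: np.ndarray, points_in_b: np.ndarray,
                       rotation: np.ndarray, translation: np.ndarray) -> float:
    # Distance between the reference points in B and the points mapped over from A,
    # mirroring the norm used in error.py.
    return float(np.linalg.norm(points_in_b - apply_rigid(points_in_a, rotation, translation)))

rng = np.random.default_rng(0)
points_in_a = rng.random((17, 3))
true_rotation, true_translation = np.eye(3), np.array([0.1, -0.2, 0.05])
points_in_b = apply_rigid(points_in_a, true_rotation, true_translation)

# Two candidate A -> B estimates, e.g. one projective and one Kabsch-based.
candidates = [
    (np.eye(3), np.zeros(3)),
    (true_rotation, true_translation + 1e-3),
]

best = min(candidates, key=lambda c: reprojection_error(points_in_a, points_in_b, *c))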
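Both kabsch.estimate and box_kabsch.estimate now delegate the rigid alignment itself to core.algebra.kabsch and wrap the result in EuclideanTransformation(*kabsch(...)). That helper is not part of this diff; the textbook formulation it presumably implements (and which the removed inline code in kabsch.py approximated) looks like this:

import numpy as np

def kabsch(from_points: np.ndarray, to_points: np.ndarray) -> tuple[np.ndarray, np.ndarray]:
    # Rigid alignment of two equally sized point clouds: returns (rotation, translation)
    # such that to_points ≈ from_points @ rotation.T + translation.
    from_centre = from_points.mean(axis=0)
    to_centre = to_points.mean(axis=0)

    cross_covariance = (from_points - from_centre).T @ (to_points - to_centre)
    u, _, vt = np.linalg.svd(cross_covariance)

    # Flip the last singular direction if the result would be a reflection.
    sign = 1.0 if np.linalg.det(vt.T @ u.T) > 0.0 else -1.0
    correction = np.diag([1.0, 1.0, sign])

    rotation = vt.T @ correction @ u.T
    translation = to_centre - rotation @ from_centre
    return rotation, translation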
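projection.py now solves PnP in both directions and reconciles the two estimates by converting the rotation matrices to 'xyz' Euler angles with the new helpers in core/algebra.py, averaging the angles, and averaging the translations. A small sketch of that reconciliation with scipy, plus a chordal-mean alternative that avoids the angle-wrapping problems Euler averaging can run into when the two estimates are not close to each other:

import numpy as np
from scipy.spatial.transform import Rotation

def average_rotations_euler(first: np.ndarray, second: np.ndarray) -> np.ndarray:
    # The approach used in projection.py: average the 'xyz' Euler angles.
    angles = (
        Rotation.from_matrix(first).as_euler('xyz')
        + Rotation.from_matrix(second).as_euler('xyz')
    ) / 2.0
    return Rotation.from_euler('xyz', angles).as_matrix()

def average_rotations_chordal(first: np.ndarray, second: np.ndarray) -> np.ndarray:
    # Alternative: scipy's built-in rotation mean, which is wrap-around safe.
    return Rotation.from_matrix(np.stack([first, second])).mean().as_matrix()

a = Rotation.from_euler('xyz', [0.10, 0.02, -0.05]).as_matrix()
b = Rotation.from_euler('xyz', [0.12, 0.00, -0.03]).as_matrix()
assert np.allclose(average_rotations_euler(a, b), average_rotations_chordal(a, b), atol=1e-3)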
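depth.Estimator.predict now passes an explicit focal length to the model, derived from the camera calibration and scaled by the ratio of the model's input width to the frame width. The scaling is linear because resizing a frame by a factor s multiplies a focal length expressed in pixels by the same factor; with made-up numbers (the real value of the input width comes from model.input_image_size):

fx = 1500.0               # focal length of the source camera, in pixels (placeholder)
frame_width = 1920        # width of the recorded frame
model_input_width = 1064  # hypothetical model input width

fx_for_model = fx * model_input_width / frame_width
assert round(fx_for_model, 2) == 831.25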
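pose.Result.unproject and pose.Result3d.project are each other's inverses under the pinhole model: x = (u - cx) * z / fx on the way into 3D and u = x * fx / z + cx on the way back. A round-trip check with placeholder intrinsics (fx, fy, cx, cy below are not calibration values from the framework):

import numpy as np

fx, fy = 1500.0, 1500.0
cx, cy = 960.0, 540.0

def unproject(points_2d: np.ndarray, depth: np.ndarray) -> np.ndarray:
    # (u, v) + z -> (x, y, z), as in pose.Result.unproject.
    x = (points_2d[:, 0] - cx) * depth / fx
    y = (points_2d[:, 1] - cy) * depth / fy
    return np.stack((x, y, depth), axis=1)

def project(points_3d: np.ndarray) -> np.ndarray:
    # (x, y, z) -> (u, v), as in pose.Result3d.project.
    u = points_3d[:, 0] * fx / points_3d[:, 2] + cx
    v = points_3d[:, 1] * fy / points_3d[:, 2] + cy
    return np.stack((u, v), axis=1)

keypoints = np.array([[100.0, 200.0], [1250.0, 700.0]])
depth_at_keypoints = np.array([2.5, 3.1])

assert np.allclose(project(unproject(keypoints, depth_at_keypoints)), keypoints)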
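Result.ordered keeps the "sitting actors preserve lexicographic order of bounding boxes" heuristic, now as a classmethod on the frozen dataclass. The ordering key is np.lexsort(np.flipud(boxes.T)): lexsort treats the last row as the primary key, so flipping the transposed box matrix makes x_min the primary sort key, y_min the secondary one, and so on. For example:

import numpy as np

# Two detections as (x_min, y_min, x_max, y_max, confidence) rows.
boxes = np.array([
    [620.0,  40.0, 900.0, 400.0, 0.93],  # actor further to the right
    [ 15.0, 120.0, 310.0, 480.0, 0.88],  # actor further to the left
])

order = np.lexsort(np.flipud(boxes.T))
assert list(order) == [1, 0]  # the left-most bounding box comes first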