Skip to content

Commit

Permalink
Integrated Buffer with heuristic camera transformation estimation
Browse files Browse the repository at this point in the history
Improved heuristic transformation estimation algorithms

Enhanced `demo_sequential` procedure with depth and reprojection visualizations
  • Loading branch information
integraledelebesgue committed Nov 17, 2024
1 parent 24c21ef commit 4f63934
Show file tree
Hide file tree
Showing 10 changed files with 369 additions and 204 deletions.
95 changes: 88 additions & 7 deletions child_lab_framework/_procedure/demo_sequential.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,21 @@

import torch

from ..core import transformation
from ..core.video import Format, Input, Reader, Writer
from ..logging import Logger
from ..task import depth, face, gaze, pose
from ..task.camera import transformation
from ..task.camera.transformation import heuristic as heuristic_transformation
from ..task.visualization import Configuration as VisualizationConfiguration
from ..task.visualization import Visualizer

BATCH_SIZE = 32


def main(
inputs: tuple[Input, Input, Input], device: torch.device, output_directory: Path
inputs: tuple[Input, Input, Input],
device: torch.device,
output_directory: Path,
) -> None:
# ignore exceeded allocation limit on MPS and CUDA - very important!
os.environ['PYTORCH_MPS_HIGH_WATERMARK_RATIO'] = '0.0'
Expand Down Expand Up @@ -49,13 +52,24 @@ def main(

depth_estimator = depth.Estimator(executor, device, input=ceiling_reader.properties)

transformation_estimator = transformation.heuristic.Estimator(
transformation_buffer: transformation.Buffer[str] = transformation.Buffer()

window_left_to_ceiling_transformation_estimator = heuristic_transformation.Estimator(
executor,
transformation_buffer,
window_left_reader.properties,
ceiling_reader.properties,
keypoint_threshold=0.35,
)

window_right_to_ceiling_transformation_estimator = heuristic_transformation.Estimator(
executor,
transformation_buffer,
window_right_reader.properties,
ceiling_reader.properties,
keypoint_threshold=0.35,
)

pose_estimator = pose.Estimator(
executor,
device,
Expand Down Expand Up @@ -114,6 +128,30 @@ def main(
output_format=Format.MP4,
)

ceiling_projection_writer = Writer(
output_directory / (ceiling.name + '_projections.mp4'),
ceiling_reader.properties,
output_format=Format.MP4,
)

ceiling_depth_writer = Writer(
output_directory / (ceiling.name + '_depth.mp4'),
ceiling_reader.properties,
output_format=Format.MP4,
)

window_left_depth_writer = Writer(
output_directory / (window_left.name + '_depth.mp4'),
window_left_reader.properties,
output_format=Format.MP4,
)

window_right_depth_writer = Writer(
output_directory / (window_right.name + '_depth.mp4'),
window_right_reader.properties,
output_format=Format.MP4,
)

window_left_writer = Writer(
output_directory / (window_left.name + '.mp4'),
window_left_reader.properties,
Expand Down Expand Up @@ -160,33 +198,44 @@ def main(

Logger.info('Estimating depth...')
ceiling_depth = depth_estimator.predict(ceiling_frames[0])
window_left_depth = depth_estimator.predict(window_left_frames[0])
window_right_depth = depth_estimator.predict(window_right_frames[0])

ceiling_depths = [ceiling_depth for _ in range(n_frames)]
window_left_depths = [window_left_depth for _ in range(n_frames)]
window_right_depths = [window_right_depth for _ in range(n_frames)]
Logger.info('Done!')

Logger.info('Estimating transformations...')
window_left_to_ceiling = (
transformation_estimator.predict_batch(
window_left_to_ceiling_transformation_estimator.predict_batch(
ceiling_poses,
window_left_poses,
ceiling_depths,
[None for _ in range(n_frames)], # type: ignore # safe to pass
window_left_depths,
)
if ceiling_poses is not None and window_left_poses is not None
else None
)

window_right_to_ceiling = (
transformation_estimator.predict_batch(
window_right_to_ceiling_transformation_estimator.predict_batch(
ceiling_poses,
window_right_poses,
ceiling_depths,
[None for _ in range(n_frames)], # type: ignore # safe to pass
window_right_depths,
)
if ceiling_poses is not None and window_right_poses is not None
else None
)
Logger.info('Done!')

if window_left_to_ceiling is None:
Logger.error('window_left_to_ceiling == None')

if window_right_to_ceiling is None:
Logger.error('window_right_to_ceiling == None')

Logger.info('Detecting faces...')
window_left_faces = (
face_estimator.predict_batch(window_left_frames, window_left_poses)
Expand Down Expand Up @@ -237,7 +286,29 @@ def main(
)
Logger.info('Done!')

if window_left_gazes is None:
Logger.error('window_left_gazes == None')

if window_right_gazes is None:
Logger.error('window_right_gazes == None')

Logger.info('Visualizing results...')
ceiling_projection_annotated_frames = ceiling_visualizer.annotate_batch(
ceiling_frames,
[
p.unproject(window_left_reader.properties.calibration, ceiling_depth)
.transform(t)
.project(ceiling_reader.properties.calibration)
for p, t in zip(window_left_poses or [], window_left_to_ceiling or [])
],
[
p.unproject(window_right_reader.properties.calibration, ceiling_depth)
.transform(t)
.project(ceiling_reader.properties.calibration)
for p, t in zip(window_right_poses or [], window_right_to_ceiling or [])
],
)

ceiling_annotated_frames = ceiling_visualizer.annotate_batch(
ceiling_frames,
ceiling_poses,
Expand All @@ -260,6 +331,16 @@ def main(
Logger.info('Done!')

Logger.info('Saving results...')
ceiling_projection_writer.write_batch(ceiling_projection_annotated_frames)

ceiling_depth_writer.write_batch([depth.to_frame(d) for d in ceiling_depths])
window_left_depth_writer.write_batch(
[depth.to_frame(d) for d in window_left_depths]
)
window_right_depth_writer.write_batch(
[depth.to_frame(d) for d in window_right_depths]
)

ceiling_writer.write_batch(ceiling_annotated_frames)
window_left_writer.write_batch(window_left_annotated_frames)
window_right_writer.write_batch(window_right_annotated_frames)
Expand Down
16 changes: 16 additions & 0 deletions child_lab_framework/core/algebra.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
from enum import IntEnum
from typing import Literal

import numpy as np
from scipy.spatial.transform import Rotation

from ..typing.array import FloatArray1, FloatArray2, FloatArray3, FloatArray6
from .calibration import Calibration
Expand Down Expand Up @@ -31,6 +33,20 @@ def rotation_matrix(angle: float, axis: Axis) -> FloatArray2:
)


def euler_angles_from_rotation_matrix(
    rotation: FloatArray2,
) -> np.ndarray[tuple[Literal[3]], np.dtype[np.float32]]:
    """Decompose a rotation matrix into 'xyz' Euler angles.

    The matrix is handed to SciPy's `Rotation.from_matrix`; the resulting
    angles are expressed in radians and cast to `float32`.
    """
    as_rotation = Rotation.from_matrix(rotation)
    angles = as_rotation.as_euler('xyz', degrees=False)

    return angles.astype(np.float32)


def rotation_matrix_from_euler_angles(
    angles: np.ndarray[tuple[Literal[3]], np.dtype[np.float32]],
) -> FloatArray2:
    """Build a rotation matrix from 'xyz' Euler angles given in radians.

    The conversion is delegated to SciPy's `Rotation.from_euler`; the matrix
    is returned exactly as `Rotation.as_matrix` produces it.
    """
    as_rotation = Rotation.from_euler('xyz', angles, degrees=False)

    return as_rotation.as_matrix()


def normalized(vecs: FloatArray2) -> FloatArray2:
    """Scale every row of `vecs` to unit Euclidean (L2) length.

    Parameters:
        vecs: a two-dimensional array whose rows are the vectors to normalize.

    Returns:
        An array of the same shape as `vecs` with each row divided by its own
        L2 norm. Rows of zero length are not special-cased and produce
        non-finite values, as dictated by NumPy's division semantics.
    """
    # keepdims=True keeps the norms as an (n, 1) column so the division is
    # broadcast per row; a flat (n,) vector would be aligned with the columns
    # instead and either fail or divide by the wrong row's norm.
    norm = np.linalg.norm(vecs, ord=2.0, axis=1, keepdims=True)
    return vecs / norm
Expand Down
3 changes: 3 additions & 0 deletions child_lab_framework/core/video.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ class Format(Enum):

@dataclass(frozen=True, repr=False)
class Properties:
name: str
length: int
height: int
width: int
Expand Down Expand Up @@ -93,6 +94,7 @@ def __init__(
)

self.__input_properties = Properties(
input.name,
input_length,
input_height,
input_width,
Expand All @@ -102,6 +104,7 @@ def __init__(

# Output properties with maybe mimicked parameters
self.properties = Properties(
input.name,
input_length * self.__frame_repetitions,
mimicked_height,
mimicked_width,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
import numpy as np

from .....core.algebra import kabsch
from .....core.calibration import Calibration
from .....core.transformation import EuclideanTransformation
from .....typing.array import FloatArray2, IntArray1, IntArray2
from .... import pose


def estimate(
    from_pose: pose.Result,
    to_pose: pose.Result,
    from_depth: FloatArray2,
    to_depth: FloatArray2,
    from_calibration: Calibration,
    to_calibration: Calibration,
    confidence_threshold: float,
) -> EuclideanTransformation | None:
    """Estimate the rigid transformation between two cameras from box point clouds.

    A point cloud is built from the bounding boxes of each view with
    `__cloud_from_bounding_boxes`, both clouds are cut to the same number of
    points with `__truncate_to_equal_size`, and the result of `kabsch` on the
    two clouds is wrapped in an `EuclideanTransformation`.

    Parameters:
        from_pose, to_pose: pose results of the source and the target view.
        from_depth, to_depth: depth maps of the source and the target view.
        from_calibration, to_calibration: calibrations of the two cameras.
        confidence_threshold: forwarded to the point cloud construction.

    Returns:
        The estimated transformation, or `None` when either view yields
        an empty point cloud and there is nothing to align.
    """
    from_cloud = __cloud_from_bounding_boxes(
        from_pose, from_calibration, from_depth, confidence_threshold
    )
    to_cloud = __cloud_from_bounding_boxes(
        to_pose, to_calibration, to_depth, confidence_threshold
    )

    # Aligning against an empty cloud is meaningless; report the failure
    # through the declared `None` result instead of passing it to `kabsch`.
    if len(from_cloud) == 0 or len(to_cloud) == 0:
        return None

    # NOTE(review): the larger cloud is subsampled at random, so rows of the
    # two clouds are not guaranteed to be corresponding points - confirm that
    # this is acceptable for `kabsch`.
    from_cloud, to_cloud = __truncate_to_equal_size(from_cloud, to_cloud)

    return EuclideanTransformation(*kabsch(from_cloud, to_cloud))


def __cloud_from_bounding_boxes(
    poses: pose.Result,
    calibration: Calibration,
    depth: FloatArray2,
    confidence_threshold: float,
) -> FloatArray2:
    """Back-project the depth pixels covered by detection boxes into 3D points.

    For every accepted box, the pixels inside it (clipped to the depth map)
    are converted to camera-space coordinates with the pinhole model:
    `x = (u - cx) * z / fx`, `y = (v - cy) * z / fy`, `z = depth[v, u]`.

    Parameters:
        poses: detection result; only `poses.boxes` is read. The first four
            columns of a box are used as `x_min, y_min, x_max, y_max`.
            NOTE(review): a fifth column, when present, is treated as the
            detection confidence - confirm against `pose.Result`.
        calibration: provides `optical_center` and `focal_length`.
        depth: depth map indexed as `depth[row, column]`.
        confidence_threshold: boxes whose confidence is below this value are
            skipped; ignored when the boxes carry no confidence column.

    Returns:
        A `float32` array of shape `(n_points, 3)`; it has zero rows when no
        box contributes any pixel.
    """
    height, width = depth.shape
    cx, cy = calibration.optical_center
    fx, fy = calibration.focal_length

    # Boxes stay in floating point until the confidence has been read:
    # casting the whole array to integers first would truncate it to zero.
    raw_boxes = np.asarray(poses.boxes, dtype=np.float32)

    if raw_boxes.ndim != 2:
        raw_boxes = raw_boxes.reshape(-1, 4)

    has_confidence = raw_boxes.shape[1] > 4

    space_chunks: list[FloatArray2] = []

    for raw_box in raw_boxes:
        if has_confidence and raw_box[4] < confidence_threshold:
            continue

        x_min, y_min, x_max, y_max = (int(c) for c in raw_box[:4].astype(np.int32))

        # Clip the box to the depth map; y is bounded by the box's own
        # y_max (column 3) and the image height.
        x_start = max(x_min, 0)
        y_start = max(y_min, 0)
        x_end = min(x_max, width)
        y_end = min(y_max, height)

        # A box lying outside of the image or with no area has no pixels.
        if x_end <= x_start or y_end <= y_start:
            continue

        x_indices, y_indices = np.meshgrid(
            np.arange(x_start, x_end, step=1.0, dtype=np.float32),
            np.arange(y_start, y_end, step=1.0, dtype=np.float32),
            indexing='xy',
        )

        z = depth[y_start:y_end, x_start:x_end]

        x = (x_indices - cx) * z / fx
        y = (y_indices - cy) * z / fy

        points = np.concatenate(
            (x.reshape(-1, 1), y.reshape(-1, 1), z.reshape(-1, 1)),
            axis=1,
        )

        space_chunks.append(points)

    # np.concatenate refuses an empty list, so the "nothing detected" case
    # is answered with an explicit empty cloud.
    if not space_chunks:
        return np.empty((0, 3), dtype=np.float32)

    return np.concatenate(space_chunks, axis=0, dtype=np.float32, casting='unsafe')


def __truncate_to_equal_size(
    points1: FloatArray2,
    points2: FloatArray2,
) -> tuple[FloatArray2, FloatArray2]:
    """Cut the larger of two point sets down to the size of the smaller one.

    When the sets differ in length, a boolean mask with as many `True`
    entries as the smaller set has points is shuffled with `np.random.shuffle`
    and applied to the larger set, so the kept points are a random subset in
    their original order. The smaller set (and both sets, when the sizes
    already match) is returned untouched.
    """
    size1, _ = points1.shape
    size2, _ = points2.shape

    if size1 == size2:
        return points1, points2

    first_is_smaller = size1 < size2
    n_kept = size1 if first_is_smaller else size2
    n_total = size2 if first_is_smaller else size1

    # Exactly `n_kept` leading entries are set before shuffling.
    keep = np.zeros(n_total, dtype=bool)
    keep[:n_kept] = True
    np.random.shuffle(keep)

    if first_is_smaller:
        return points1, points2[keep]

    return points1[keep], points2
Loading

0 comments on commit 4f63934

Please sign in to comment.