Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Redesigned visualization #58

Merged
merged 4 commits into from
Nov 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
102 changes: 69 additions & 33 deletions child_lab_framework/_cli/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,8 @@ def __exit__(


@click.command('calibrate')
@click.argument('source', type=Path)
@click.argument('destination', type=Path)
@click.argument('workspace', type=Path)
@click.argument('videos', type=Path, nargs=-1)
@click.option('--square-size', type=float, help='Board square size in centimeters')
@click.option(
'--inner-board-corners',
Expand All @@ -54,24 +54,49 @@ def __exit__(
)
@click_trap()
def calibrate(
source: Path,
destination: Path,
workspace: Path,
videos: list[Path],
square_size: float,
inner_board_corners: tuple[int, int],
skip: int,
) -> None:
click.echo(f'Calibrating camera from {source}...')
video_input_dir = workspace / 'input'
calibration_output_dir = workspace / 'calibration'
video_output_dir = workspace / 'output'

result = calibration_procedure.run(
Input('calibration', source, None),
chessboard.Properties(square_size, *inner_board_corners),
skip,
)
if not workspace.is_dir():
raise ValueError(f'{workspace} is not valid workspace directory')

if not video_input_dir.is_dir():
raise ValueError(f'{video_input_dir} is not valid video input directory')

click.echo(f'Calibration complete! Estimated parameters:\n{result}')
click.echo(f'Saving to {destination}...')
if not calibration_output_dir.is_dir():
raise ValueError(
f'{calibration_output_dir} is not valid calibration output directory'
)

save(result, destination)
if not video_output_dir.is_dir():
raise ValueError(f'{video_output_dir} is not a valid video output directory')

for video in videos:
click.echo(f'Calibrating camera from {video}...')

video_input = video_input_dir / video
video_output = video_output_dir / video
calibration_output = calibration_output_dir / f'{video.stem}.yml'

calibration = calibration_procedure.run(
video_input,
video_output,
chessboard.Properties(square_size, *inner_board_corners),
skip,
)

click.echo(f'Calibration complete! Estimated parameters:\n{calibration}')
click.echo(f'Saving results to {calibration_output}...')
click.echo('')

save(calibration, calibration_output)


@click.command('estimate-transformations')
Expand Down Expand Up @@ -100,18 +125,30 @@ def estimate_transformations(
device: str | None,
checkpoint: Path | None,
) -> None:
video_dir = workspace / 'input'
calibration_dir = workspace / 'calibration'
destination = workspace / 'buffer.json'
video_input_dir = workspace / 'input'
video_output_dir = workspace / 'output'
calibration_input_dir = workspace / 'calibration'
transformation_output_dir = workspace / 'transformation'
transformation_output = transformation_output_dir / 'buffer.json'

if not workspace.is_dir():
raise ValueError(f'{workspace} is not valid workspace directory')

if not video_dir.is_dir():
raise ValueError(f'{video_dir} is not valid video directory')
if not video_input_dir.is_dir():
raise ValueError(f'{video_input_dir} is not valid video input directory')

if not calibration_dir.is_dir():
raise ValueError(f'{calibration_dir} is not valid calibration directory')
if not video_output_dir.is_dir():
raise ValueError(f'{video_output_dir} is not valid video output directory')

if not calibration_input_dir.is_dir():
raise ValueError(
f'{calibration_input_dir} is not valid calibration input directory'
)

if not transformation_output_dir.is_dir():
raise ValueError(
f'{transformation_output_dir} is not a valid transformation output directory'
)

device_handle = torch.device(device or 'cpu')
model = marker.RigidModel(marker_size, 0.0)
Expand All @@ -122,26 +159,25 @@ def estimate_transformations(

config = transformation_procedure.Config(model, dictionary)

video_names = [path.name for path in videos]
video_full_paths = [video_dir / video for video in videos]

calibrations = [
load(Calibration, calibration_dir / (name + '.yml')) for name in video_names
]

inputs = [
transformation_procedure.Input(name, video, calibration)
for name, video, calibration in zip(video_names, video_full_paths, calibrations)
load(Calibration, calibration_input_dir / (video.stem + '.yml'))
for video in videos
]

click.echo('Estimating transformations...')

result = transformation_procedure.run(inputs, config, device_handle)
buffer = transformation_procedure.run(
[video_input_dir / video for video in videos],
[video_output_dir / video for video in videos],
calibrations,
config,
device_handle,
)

click.echo(f'Estimation complete! Estimated transformations:\n{result}')
click.echo(f'Saving results to {destination}...')
click.echo(f'Estimation complete! Estimated transformations:\n{buffer}')
click.echo(f'Saving results to {transformation_output}...')

save(result, destination)
save(buffer, transformation_output)


@click.command('process')
Expand Down
28 changes: 23 additions & 5 deletions child_lab_framework/_procedure/calibrate.py
Original file line number Diff line number Diff line change
@@ -1,29 +1,45 @@
from pathlib import Path

import cv2
import numpy as np
from tqdm import trange

from ..core.calibration import Calibration
from ..core.detection import chessboard
from ..core.video import Input, Reader
from ..core.video import Format, Input, Reader, Writer
from ..task.visualization import Configuration, Visualizer
from ..typing.array import FloatArray2, FloatArray3


# TODO: Implement procedures as classes with `Iterable` protocol
# to make them both usable with tqdm and exportable as purely programmatic library elements
def run(
input: Input,
video_source: Path,
annotated_video_destination: Path,
board_properties: chessboard.Properties,
skip: int,
) -> Calibration:
reader = Reader(
input,
Input(video_source.name, video_source, None),
batch_size=1,
)

detector = chessboard.Detector(board_properties)

video_properties = reader.properties

writer = Writer(
annotated_video_destination,
video_properties,
output_format=Format.MP4,
)

visualizer = Visualizer(
None, # type: ignore
properties=video_properties,
configuration=Configuration(),
)

detector = chessboard.Detector(board_properties)

inner_corners_per_row = board_properties.inner_corners_per_row
inner_corners_per_column = board_properties.inner_corners_per_column
square_size = board_properties.square_size
Expand Down Expand Up @@ -53,6 +69,8 @@ def run(
if result is None:
continue

writer.write(visualizer.annotate(frame, result))

object_points.append(chessboard_3d_model)
image_points.append(result.corners)

Expand Down
30 changes: 21 additions & 9 deletions child_lab_framework/_procedure/demo_sequential.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from ..logging import Logger
from ..task import depth, face, gaze, pose
from ..task.camera import transformation
from ..task.visualization import Configuration as VisualizationConfiguration
from ..task.visualization import Visualizer

BATCH_SIZE = 32
Expand Down Expand Up @@ -89,26 +90,38 @@ def main(
# social_distance_estimator = social_distance.Estimator(executor)
# social_distance_logger = social_distance.FileLogger('dev/output/distance.csv')

visualizer = Visualizer(
ceiling_visualizer = Visualizer(
executor,
properties=ceiling_reader.properties,
configuration=VisualizationConfiguration(),
)

window_left_visualizer = Visualizer(
executor,
properties=window_left_reader.properties,
confidence_threshold=0.5,
configuration=VisualizationConfiguration(),
)

window_right_visualizer = Visualizer(
executor,
properties=window_right_reader.properties,
configuration=VisualizationConfiguration(),
)

ceiling_writer = Writer(
str(output_directory / (ceiling.name + '.mp4')),
output_directory / (ceiling.name + '.mp4'),
ceiling_reader.properties,
output_format=Format.MP4,
)

window_left_writer = Writer(
str(output_directory / (window_left.name + '.mp4')),
output_directory / (window_left.name + '.mp4'),
window_left_reader.properties,
output_format=Format.MP4,
)

window_right_writer = Writer(
str(output_directory / (window_right.name + '.mp4')),
output_directory / (window_right.name + '.mp4'),
window_right_reader.properties,
output_format=Format.MP4,
)
Expand Down Expand Up @@ -225,21 +238,20 @@ def main(
Logger.info('Done!')

Logger.info('Visualizing results...')
ceiling_annotated_frames = visualizer.annotate_batch(
ceiling_annotated_frames = ceiling_visualizer.annotate_batch(
ceiling_frames,
ceiling_poses,
None,
ceiling_gazes,
)

window_left_annotated_frames = visualizer.annotate_batch(
window_left_annotated_frames = window_left_visualizer.annotate_batch(
window_left_frames,
window_left_poses,
window_left_faces,
window_left_gazes,
)

window_right_annotated_frames = visualizer.annotate_batch(
window_right_annotated_frames = window_right_visualizer.annotate_batch(
window_right_frames,
window_right_poses,
window_right_faces,
Expand Down
51 changes: 37 additions & 14 deletions child_lab_framework/_procedure/estimate_transformations.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,15 @@
from dataclasses import dataclass
from pathlib import Path

import cv2
import torch
from tqdm import tqdm

from ..core import transformation
from ..core.calibration import Calibration
from ..core.detection import marker
from ..core.video import Input, Reader
from ..core.video import Format, Input, Reader, Writer
from ..task import visualization

MARKER_PREFIX = 'marker'

Expand All @@ -21,39 +24,59 @@ class Config:
# TODO: Implement procedures as classes with `Iterable` protocol
# to make them both usable with tqdm and exportable as purely programmatic library elements
def run(
inputs: list[Input], config: Config, device: torch.device
video_sources: list[Path],
video_destinations: list[Path],
calibrations: list[Calibration],
config: Config,
device: torch.device,
) -> transformation.Buffer[str]:
if len(inputs) < 2:
if len(video_sources) < 2:
raise ValueError('At least two inputs are required to estimate transformations')

# TODO: Dump AruDice to config
buffer = transformation.Buffer({input.name for input in inputs})
buffer = transformation.Buffer({input.stem for input in video_sources})

readers = [Reader(input, batch_size=1) for input in inputs]
readers = [
Reader(Input(input.stem, input, calibration), batch_size=1)
for input, calibration in zip(video_sources, calibrations)
]

marker_model = config.model
writers = [
Writer(destination, reader.properties, output_format=Format.MP4)
for destination, reader in zip(video_destinations, readers)
]

marker_detector = marker.Detector(
model=marker_model,
detector = marker.Detector(
model=config.model,
dictionary=config.dictionary,
detector_parameters=config.detector_parameters,
)

visualizer = visualization.Visualizer(
None, # type: ignore
properties=readers[0].properties,
configuration=visualization.Configuration(),
)

while not buffer.connected:
frames = [
(reader, frame) for reader in readers if (frame := reader.read()) is not None
views = [
(reader, writer, frame)
for reader, writer in zip(readers, writers)
if (frame := reader.read()) is not None
]

if len(frames) == 0:
if len(views) == 0:
break

for reader, frame in tqdm(frames, 'Processing frames'):
marker_detector.calibration = reader.properties.calibration
markers = marker_detector.predict(frame)
for reader, writer, frame in tqdm(views, 'Processing frames'):
detector.calibration = reader.properties.calibration
markers = detector.predict(frame)

if markers is None:
continue

writer.write(visualizer.annotate(frame, markers))

id: int
for id, marker_transformation in zip(markers.ids, markers.transformations):
if marker_transformation is None:
Expand Down
Loading
Loading