report progress of delegated operations
brimoor committed Jan 29, 2024
1 parent 6d1de93 commit cfc3a17
Showing 6 changed files with 195 additions and 6 deletions.
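
The same pattern is applied in every file below: when an operation is delegated, check whether the installed FiftyOne release accepts a `progress` keyword and, if so, forward progress from the underlying method back to the operator's execution context via `ctx.set_progress()`. A minimal standalone sketch of that pattern (the helper name is hypothetical; `fo.report_progress()` and `ctx.set_progress()` are used exactly as in the hunks below):

import inspect

import fiftyone as fo


def _maybe_report_progress(ctx, fcn, kwargs, dt=5.0):
    """Hypothetical helper mirroring the pattern applied in each hunk below."""
    # Older FiftyOne releases do not accept a `progress` kwarg, so only wire
    # up reporting when the method's signature advertises it (the check can
    # be dropped once `fiftyone>=0.24` is required)
    if "progress" in inspect.signature(fcn).parameters:
        # Forward the progress bar's completion fraction to the operator's
        # execution context, at most once every `dt` seconds
        set_progress = lambda pb: ctx.set_progress(progress=pb.progress)
        kwargs["progress"] = fo.report_progress(set_progress, dt=dt)

A call site then mirrors the `load_annotations()` hunk below:

kwargs = {}
if delegate:
    _maybe_report_progress(ctx, ctx.dataset.load_annotations, kwargs)

ctx.dataset.load_annotations(anno_key, unexpected=unexpected, cleanup=cleanup, **kwargs)

The `dt=5.0` throttle caps reporting at one update every five seconds, so long-running delegated jobs surface steady progress without emitting an update per sample.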
19 changes: 18 additions & 1 deletion plugins/annotation/__init__.py
@@ -6,6 +6,7 @@
|
"""
import contextlib
import inspect
import json
import threading

@@ -912,12 +913,28 @@ def execute(self, ctx):
anno_key = ctx.params["anno_key"]
unexpected = ctx.params["unexpected"]
cleanup = ctx.params["cleanup"]
delegate = ctx.params.get("delegate", False)

_inject_annotation_secrets(ctx)

kwargs = {}

if delegate:
# can remove check if we require `fiftyone>=0.24`
if (
"progress"
in inspect.signature(ctx.dataset.load_annotations).parameters
):
progress = lambda pb: ctx.set_progress(progress=pb.progress)
kwargs["progress"] = fo.report_progress(progress, dt=5.0)

ctx.dataset.load_annotations(
anno_key, unexpected=unexpected, cleanup=cleanup
anno_key,
unexpected=unexpected,
cleanup=cleanup,
**kwargs,
)

ctx.trigger("reload_dataset")


65 changes: 64 additions & 1 deletion plugins/brain/__init__.py
@@ -7,6 +7,7 @@
"""
from collections import defaultdict
from datetime import datetime
import inspect
import json

from bson import json_util
@@ -71,6 +72,18 @@ def execute(self, ctx):
num_workers = 0

target_view = _get_target_view(ctx, target)

kwargs = {}

if delegate:
# can remove check if we require `fiftyone>=0.24`
if (
"progress"
in inspect.signature(fob.compute_visualization).parameters
):
progress = lambda pb: ctx.set_progress(progress=pb.progress)
kwargs["progress"] = fo.report_progress(progress, dt=5.0)

fob.compute_visualization(
target_view,
patches_field=patches_field,
@@ -81,6 +94,7 @@
batch_size=batch_size,
num_workers=num_workers,
skip_failures=skip_failures,
**kwargs,
)

def resolve_output(self, ctx):
@@ -183,6 +197,16 @@ def execute(self, ctx):
num_workers = 0

target_view = _get_target_view(ctx, target)

if delegate:
# can remove check if we require `fiftyone>=0.24`
if (
"progress"
in inspect.signature(fob.compute_similarity).parameters
):
progress = lambda pb: ctx.set_progress(progress=pb.progress)
kwargs["progress"] = fo.report_progress(progress, dt=5.0)

fob.compute_similarity(
target_view,
patches_field=patches_field,
@@ -975,6 +999,18 @@ def execute(self, ctx):
num_workers = 0

target_view = _get_target_view(ctx, target)

kwargs = {}

if delegate:
# can remove check if we require `fiftyone>=0.24`
if (
"progress"
in inspect.signature(fob.compute_uniqueness).parameters
):
progress = lambda pb: ctx.set_progress(progress=pb.progress)
kwargs["progress"] = fo.report_progress(progress, dt=5.0)

fob.compute_uniqueness(
target_view,
uniqueness_field=uniqueness_field,
@@ -984,6 +1020,7 @@
batch_size=batch_size,
num_workers=num_workers,
skip_failures=skip_failures,
**kwargs,
)

ctx.trigger("reload_dataset")
@@ -1061,16 +1098,27 @@ def execute(self, ctx):
pred_field = kwargs.pop("pred_field")
label_field = kwargs.pop("label_field")
mistakenness_field = kwargs.pop("mistakenness_field")
kwargs.pop("delegate")
delegate = kwargs.pop("delegate")

target_view = _get_target_view(ctx, target)

if delegate:
# can remove check if we require `fiftyone>=0.24`
if (
"progress"
in inspect.signature(fob.compute_mistakenness).parameters
):
progress = lambda pb: ctx.set_progress(progress=pb.progress)
kwargs["progress"] = fo.report_progress(progress, dt=5.0)

fob.compute_mistakenness(
target_view,
pred_field,
label_field,
mistakenness_field=mistakenness_field,
**kwargs,
)

ctx.trigger("reload_dataset")


@@ -1244,13 +1292,28 @@ def execute(self, ctx):
target = ctx.params.get("target", None)
label_field = ctx.params.get("label_field")
hardness_field = ctx.params.get("hardness_field")
delegate = ctx.params.get("delegate", False)

target_view = _get_target_view(ctx, target)

kwargs = {}

if delegate:
# can remove check if we require `fiftyone>=0.24`
if (
"progress"
in inspect.signature(fob.compute_hardness).parameters
):
progress = lambda pb: ctx.set_progress(progress=pb.progress)
kwargs["progress"] = fo.report_progress(progress, dt=5.0)

fob.compute_hardness(
target_view,
label_field,
hardness_field=hardness_field,
**kwargs,
)

ctx.trigger("reload_dataset")


9 changes: 8 additions & 1 deletion plugins/evaluation/__init__.py
@@ -5,6 +5,7 @@
| `voxel51.com <https://voxel51.com/>`_
|
"""
import inspect
import json

from bson import json_util
@@ -45,7 +46,7 @@ def execute(self, ctx):
gt_field = kwargs.pop("gt_field")
eval_key = kwargs.pop("eval_key")
method = kwargs.pop("method")
kwargs.pop("delegate")
delegate = kwargs.pop("delegate")

target_view = _get_target_view(ctx, target)
_, eval_type, _ = _get_evaluation_type(target_view, pred_field)
@@ -64,6 +65,12 @@ def execute(self, ctx):
elif eval_type == "segmentation":
eval_fcn = target_view.evaluate_segmentations

if delegate:
# can remove check if we require `fiftyone>=0.24`
if "progress" in inspect.signature(eval_fcn).parameters:
progress = lambda pb: ctx.set_progress(progress=pb.progress)
kwargs["progress"] = fo.report_progress(progress, dt=5.0)

eval_fcn(
pred_field,
gt_field=gt_field,
44 changes: 43 additions & 1 deletion plugins/io/__init__.py
@@ -7,6 +7,7 @@
"""
import base64
import contextlib
import inspect
import multiprocessing.dummy
import os

@@ -818,8 +819,15 @@ def _import_media_only(ctx):
delegate = ctx.params.get("delegate", False)

if delegate:
kwargs = {}

# can remove check if we require `fiftyone>=0.24`
if "progress" in inspect.signature(ctx.dataset.add_samples).parameters:
progress = lambda pb: ctx.set_progress(progress=pb.progress)
kwargs["progress"] = fo.report_progress(progress, dt=5.0)

samples = map(make_sample, filepaths)
ctx.dataset.add_samples(samples, num_samples=len(filepaths))
ctx.dataset.add_samples(samples, num_samples=len(filepaths), **kwargs)
return

batcher = fou.DynamicBatcher(
@@ -852,11 +860,18 @@ def _import_media_and_labels(ctx):
label_types = ctx.params.get("label_types", None)
tags = ctx.params.get("tags", None)
dynamic = ctx.params.get("dynamic", False)
delegate = ctx.params.get("delegate", False)
kwargs = ctx.params.get("kwargs", {})

if label_types is not None:
kwargs["label_types"] = label_types

if delegate:
# can remove check if we require `fiftyone>=0.24`
if "progress" in inspect.signature(ctx.dataset.add_dir).parameters:
progress = lambda pb: ctx.set_progress(progress=pb.progress)
kwargs["progress"] = fo.report_progress(progress, dt=5.0)

ctx.dataset.add_dir(
dataset_dir=dataset_dir,
dataset_type=dataset_type,
@@ -891,12 +906,19 @@ def _import_labels_only(ctx):
dataset_dir = _parse_path(ctx, "dataset_dir")
label_field = ctx.params.get("label_field", None)
dynamic = ctx.params.get("dynamic", False)
delegate = ctx.params.get("delegate", False)
kwargs = ctx.params.get("kwargs", {})

label_types = ctx.params.get("label_types", None)
if label_types is not None:
kwargs["label_types"] = label_types

if delegate:
# can remove check if we require `fiftyone>=0.24`
if "progress" in inspect.signature(ctx.dataset.merge_dir).parameters:
progress = lambda pb: ctx.set_progress(progress=pb.progress)
kwargs["progress"] = fo.report_progress(progress, dt=5.0)

with contextlib.ExitStack() as exit_context:
if labels_file is not None:
tmp_dir = exit_context.enter_context(fos.TempDir())
@@ -2014,6 +2036,7 @@ def _export_samples(ctx):
csv_fields = ctx.params.get("csv_fields", None)
abs_paths = ctx.params.get("abs_paths", None)
manual = ctx.params.get("manual", False)
delegate = ctx.params.get("delegate", False)
kwargs = ctx.params.get("kwargs", {})

if _can_export_multiple_fields(dataset_type):
@@ -2065,6 +2088,12 @@ def _export_samples(ctx):
if "abs_paths" not in kwargs:
kwargs["abs_paths"] = abs_paths

if delegate:
# can remove check if we require `fiftyone>=0.24`
if "progress" in inspect.signature(target_view.export).parameters:
progress = lambda pb: ctx.set_progress(progress=pb.progress)
kwargs["progress"] = fo.report_progress(progress, dt=5.0)

target_view.export(
export_dir=export_dir,
dataset_type=dataset_type,
@@ -2524,13 +2553,26 @@ def execute(self, ctx):
output_dir = _parse_path(ctx, "output_dir")
label_fields = ctx.params.get("label_fields", None)
overwrite = ctx.params.get("overwrite", False)
delegate = ctx.params.get("delegate", False)

target_view = _get_target_view(ctx, target)

kwargs = {}

if delegate:
# can remove check if we require `fiftyone>=0.24`
if (
"progress"
in inspect.signature(target_view.draw_labels).parameters
):
progress = lambda pb: ctx.set_progress(progress=pb.progress)
kwargs["progress"] = fo.report_progress(progress, dt=5.0)

target_view.draw_labels(
output_dir,
label_fields=label_fields,
overwrite=overwrite,
**kwargs,
)


38 changes: 37 additions & 1 deletion plugins/utils/__init__.py
@@ -6,6 +6,7 @@
|
"""
import contextlib
import inspect
import json
import multiprocessing.dummy

@@ -1199,7 +1200,19 @@ def execute(self, ctx):
view = _get_target_view(ctx, target)

if delegate:
view.compute_metadata(overwrite=overwrite, num_workers=num_workers)
kwargs = {}

# can remove check if we require `fiftyone>=0.24`
if (
"progress"
in inspect.signature(view.compute_metadata).parameters
):
progress = lambda pb: ctx.set_progress(progress=pb.progress)
kwargs["progress"] = fo.report_progress(progress, dt=5.0)

view.compute_metadata(
overwrite=overwrite, num_workers=num_workers, **kwargs
)
else:
for update in _compute_metadata_generator(
ctx, view, overwrite=overwrite, num_workers=num_workers
@@ -1502,13 +1515,25 @@ def execute(self, ctx):
if not delegate:
num_workers = 0

kwargs = {}

if delegate:
# can remove check if we require `fiftyone>=0.24`
if (
"progress"
in inspect.signature(foui.transform_images).parameters
):
progress = lambda pb: ctx.set_progress(progress=pb.progress)
kwargs["progress"] = fo.report_progress(progress, dt=5.0)

foui.transform_images(
view,
size=size,
output_field=thumbnail_path,
output_dir=output_dir,
num_workers=num_workers,
skip_failures=True,
**kwargs,
)

if thumbnail_path not in ctx.dataset.app_config.media_fields:
@@ -1858,6 +1883,17 @@ def execute(self, ctx):
args = ctx.params["args"]
kwargs = ctx.params["kwargs"]

# Special handling if we find a `progress` kwarg that is float/int
progress = kwargs.get("progress", None)
if isinstance(progress, float):
# Report progress every `progress` seconds
set_progress = lambda pb: ctx.set_progress(progress=pb.progress)
kwargs["progress"] = fo.report_progress(set_progress, dt=progress)
elif isinstance(progress, int):
# Report progress in `progress` equally-spaced increments
set_progress = lambda pb: ctx.set_progress(progress=pb.progress)
kwargs["progress"] = fo.report_progress(set_progress, n=progress)

if has_view:
sample_collection = ctx.view
elif has_dataset: