From cfc3a171852e69002048b08cce9e81283a33a69c Mon Sep 17 00:00:00 2001 From: brimoor Date: Tue, 2 Jan 2024 18:43:39 -0500 Subject: [PATCH] report progress of delegated operations --- plugins/annotation/__init__.py | 19 +++++++++- plugins/brain/__init__.py | 65 +++++++++++++++++++++++++++++++++- plugins/evaluation/__init__.py | 9 ++++- plugins/io/__init__.py | 44 ++++++++++++++++++++++- plugins/utils/__init__.py | 38 +++++++++++++++++++- plugins/zoo/__init__.py | 26 +++++++++++++- 6 files changed, 195 insertions(+), 6 deletions(-) diff --git a/plugins/annotation/__init__.py b/plugins/annotation/__init__.py index d21807d7..a3b1597c 100644 --- a/plugins/annotation/__init__.py +++ b/plugins/annotation/__init__.py @@ -6,6 +6,7 @@ | """ import contextlib +import inspect import json import threading @@ -912,12 +913,28 @@ def execute(self, ctx): anno_key = ctx.params["anno_key"] unexpected = ctx.params["unexpected"] cleanup = ctx.params["cleanup"] + delegate = ctx.params.get("delegate", False) _inject_annotation_secrets(ctx) + kwargs = {} + + if delegate: + # can remove check if we require `fiftyone>=0.24` + if ( + "progress" + in inspect.signature(ctx.dataset.load_annotations).parameters + ): + progress = lambda pb: ctx.set_progress(progress=pb.progress) + kwargs["progress"] = fo.report_progress(progress, dt=5.0) + ctx.dataset.load_annotations( - anno_key, unexpected=unexpected, cleanup=cleanup + anno_key, + unexpected=unexpected, + cleanup=cleanup, + **kwargs, ) + ctx.trigger("reload_dataset") diff --git a/plugins/brain/__init__.py b/plugins/brain/__init__.py index 1990215a..c75d76fa 100644 --- a/plugins/brain/__init__.py +++ b/plugins/brain/__init__.py @@ -7,6 +7,7 @@ """ from collections import defaultdict from datetime import datetime +import inspect import json from bson import json_util @@ -71,6 +72,18 @@ def execute(self, ctx): num_workers = 0 target_view = _get_target_view(ctx, target) + + kwargs = {} + + if delegate: + # can remove check if we require 
`fiftyone>=0.24` + if ( + "progress" + in inspect.signature(fob.compute_visualization).parameters + ): + progress = lambda pb: ctx.set_progress(progress=pb.progress) + kwargs["progress"] = fo.report_progress(progress, dt=5.0) + fob.compute_visualization( target_view, patches_field=patches_field, @@ -81,6 +94,7 @@ def execute(self, ctx): batch_size=batch_size, num_workers=num_workers, skip_failures=skip_failures, + **kwargs, ) def resolve_output(self, ctx): @@ -183,6 +197,16 @@ def execute(self, ctx): num_workers = 0 target_view = _get_target_view(ctx, target) + + if delegate: + # can remove check if we require `fiftyone>=0.24` + if ( + "progress" + in inspect.signature(fob.compute_similarity).parameters + ): + progress = lambda pb: ctx.set_progress(progress=pb.progress) + kwargs["progress"] = fo.report_progress(progress, dt=5.0) + fob.compute_similarity( target_view, patches_field=patches_field, @@ -975,6 +999,18 @@ def execute(self, ctx): num_workers = 0 target_view = _get_target_view(ctx, target) + + kwargs = {} + + if delegate: + # can remove check if we require `fiftyone>=0.24` + if ( + "progress" + in inspect.signature(fob.compute_uniqueness).parameters + ): + progress = lambda pb: ctx.set_progress(progress=pb.progress) + kwargs["progress"] = fo.report_progress(progress, dt=5.0) + fob.compute_uniqueness( target_view, uniqueness_field=uniqueness_field, @@ -984,6 +1020,7 @@ def execute(self, ctx): batch_size=batch_size, num_workers=num_workers, skip_failures=skip_failures, + **kwargs, ) ctx.trigger("reload_dataset") @@ -1061,9 +1098,19 @@ def execute(self, ctx): pred_field = kwargs.pop("pred_field") label_field = kwargs.pop("label_field") mistakenness_field = kwargs.pop("mistakenness_field") - kwargs.pop("delegate") + delegate = kwargs.pop("delegate") target_view = _get_target_view(ctx, target) + + if delegate: + # can remove check if we require `fiftyone>=0.24` + if ( + "progress" + in inspect.signature(fob.compute_mistakenness).parameters + ): + progress = 
lambda pb: ctx.set_progress(progress=pb.progress) + kwargs["progress"] = fo.report_progress(progress, dt=5.0) + fob.compute_mistakenness( target_view, pred_field, @@ -1071,6 +1118,7 @@ def execute(self, ctx): mistakenness_field=mistakenness_field, **kwargs, ) + ctx.trigger("reload_dataset") @@ -1244,13 +1292,28 @@ def execute(self, ctx): target = ctx.params.get("target", None) label_field = ctx.params.get("label_field") hardness_field = ctx.params.get("hardness_field") + delegate = ctx.params.get("delegate", False) target_view = _get_target_view(ctx, target) + + kwargs = {} + + if delegate: + # can remove check if we require `fiftyone>=0.24` + if ( + "progress" + in inspect.signature(fob.compute_hardness).parameters + ): + progress = lambda pb: ctx.set_progress(progress=pb.progress) + kwargs["progress"] = fo.report_progress(progress, dt=5.0) + fob.compute_hardness( target_view, label_field, hardness_field=hardness_field, + **kwargs, ) + ctx.trigger("reload_dataset") diff --git a/plugins/evaluation/__init__.py b/plugins/evaluation/__init__.py index 101c7919..f61333f4 100644 --- a/plugins/evaluation/__init__.py +++ b/plugins/evaluation/__init__.py @@ -5,6 +5,7 @@ | `voxel51.com `_ | """ +import inspect import json from bson import json_util @@ -45,7 +46,7 @@ def execute(self, ctx): gt_field = kwargs.pop("gt_field") eval_key = kwargs.pop("eval_key") method = kwargs.pop("method") - kwargs.pop("delegate") + delegate = kwargs.pop("delegate") target_view = _get_target_view(ctx, target) _, eval_type, _ = _get_evaluation_type(target_view, pred_field) @@ -64,6 +65,12 @@ def execute(self, ctx): elif eval_type == "segmentation": eval_fcn = target_view.evaluate_segmentations + if delegate: + # can remove check if we require `fiftyone>=0.24` + if "progress" in inspect.signature(eval_fcn).parameters: + progress = lambda pb: ctx.set_progress(progress=pb.progress) + kwargs["progress"] = fo.report_progress(progress, dt=5.0) + eval_fcn( pred_field, gt_field=gt_field, diff --git 
a/plugins/io/__init__.py b/plugins/io/__init__.py index 4eaf4dde..d8a38095 100644 --- a/plugins/io/__init__.py +++ b/plugins/io/__init__.py @@ -7,6 +7,7 @@ """ import base64 import contextlib +import inspect import multiprocessing.dummy import os @@ -818,8 +819,15 @@ def _import_media_only(ctx): delegate = ctx.params.get("delegate", False) if delegate: + kwargs = {} + + # can remove check if we require `fiftyone>=0.24` + if "progress" in inspect.signature(ctx.dataset.add_samples).parameters: + progress = lambda pb: ctx.set_progress(progress=pb.progress) + kwargs["progress"] = fo.report_progress(progress, dt=5.0) + samples = map(make_sample, filepaths) - ctx.dataset.add_samples(samples, num_samples=len(filepaths)) + ctx.dataset.add_samples(samples, num_samples=len(filepaths), **kwargs) return batcher = fou.DynamicBatcher( @@ -852,11 +860,18 @@ def _import_media_and_labels(ctx): label_types = ctx.params.get("label_types", None) tags = ctx.params.get("tags", None) dynamic = ctx.params.get("dynamic", False) + delegate = ctx.params.get("delegate", False) kwargs = ctx.params.get("kwargs", {}) if label_types is not None: kwargs["label_types"] = label_types + if delegate: + # can remove check if we require `fiftyone>=0.24` + if "progress" in inspect.signature(ctx.dataset.add_dir).parameters: + progress = lambda pb: ctx.set_progress(progress=pb.progress) + kwargs["progress"] = fo.report_progress(progress, dt=5.0) + ctx.dataset.add_dir( dataset_dir=dataset_dir, dataset_type=dataset_type, @@ -891,12 +906,19 @@ def _import_labels_only(ctx): dataset_dir = _parse_path(ctx, "dataset_dir") label_field = ctx.params.get("label_field", None) dynamic = ctx.params.get("dynamic", False) + delegate = ctx.params.get("delegate", False) kwargs = ctx.params.get("kwargs", {}) label_types = ctx.params.get("label_types", None) if label_types is not None: kwargs["label_types"] = label_types + if delegate: + # can remove check if we require `fiftyone>=0.24` + if "progress" in 
inspect.signature(ctx.dataset.merge_dir).parameters: + progress = lambda pb: ctx.set_progress(progress=pb.progress) + kwargs["progress"] = fo.report_progress(progress, dt=5.0) + with contextlib.ExitStack() as exit_context: if labels_file is not None: tmp_dir = exit_context.enter_context(fos.TempDir()) @@ -2014,6 +2036,7 @@ def _export_samples(ctx): csv_fields = ctx.params.get("csv_fields", None) abs_paths = ctx.params.get("abs_paths", None) manual = ctx.params.get("manual", False) + delegate = ctx.params.get("delegate", False) kwargs = ctx.params.get("kwargs", {}) if _can_export_multiple_fields(dataset_type): @@ -2065,6 +2088,12 @@ def _export_samples(ctx): if "abs_paths" not in kwargs: kwargs["abs_paths"] = abs_paths + if delegate: + # can remove check if we require `fiftyone>=0.24` + if "progress" in inspect.signature(target_view.export).parameters: + progress = lambda pb: ctx.set_progress(progress=pb.progress) + kwargs["progress"] = fo.report_progress(progress, dt=5.0) + target_view.export( export_dir=export_dir, dataset_type=dataset_type, @@ -2524,13 +2553,26 @@ def execute(self, ctx): output_dir = _parse_path(ctx, "output_dir") label_fields = ctx.params.get("label_fields", None) overwrite = ctx.params.get("overwrite", False) + delegate = ctx.params.get("delegate", False) target_view = _get_target_view(ctx, target) + kwargs = {} + + if delegate: + # can remove check if we require `fiftyone>=0.24` + if ( + "progress" + in inspect.signature(target_view.draw_labels).parameters + ): + progress = lambda pb: ctx.set_progress(progress=pb.progress) + kwargs["progress"] = fo.report_progress(progress, dt=5.0) + target_view.draw_labels( output_dir, label_fields=label_fields, overwrite=overwrite, + **kwargs, ) diff --git a/plugins/utils/__init__.py b/plugins/utils/__init__.py index 2c096750..dc061307 100644 --- a/plugins/utils/__init__.py +++ b/plugins/utils/__init__.py @@ -6,6 +6,7 @@ | """ import contextlib +import inspect import json import multiprocessing.dummy @@ 
-1199,7 +1200,19 @@ def execute(self, ctx): view = _get_target_view(ctx, target) if delegate: - view.compute_metadata(overwrite=overwrite, num_workers=num_workers) + kwargs = {} + + # can remove check if we require `fiftyone>=0.24` + if ( + "progress" + in inspect.signature(view.compute_metadata).parameters + ): + progress = lambda pb: ctx.set_progress(progress=pb.progress) + kwargs["progress"] = fo.report_progress(progress, dt=5.0) + + view.compute_metadata( + overwrite=overwrite, num_workers=num_workers, **kwargs + ) else: for update in _compute_metadata_generator( ctx, view, overwrite=overwrite, num_workers=num_workers @@ -1502,6 +1515,17 @@ def execute(self, ctx): if not delegate: num_workers = 0 + kwargs = {} + + if delegate: + # can remove check if we require `fiftyone>=0.24` + if ( + "progress" + in inspect.signature(foui.transform_images).parameters + ): + progress = lambda pb: ctx.set_progress(progress=pb.progress) + kwargs["progress"] = fo.report_progress(progress, dt=5.0) + foui.transform_images( view, size=size, @@ -1509,6 +1533,7 @@ output_dir=output_dir, num_workers=num_workers, skip_failures=True, + **kwargs, ) if thumbnail_path not in ctx.dataset.app_config.media_fields: @@ -1858,6 +1883,17 @@ def execute(self, ctx): args = ctx.params["args"] kwargs = ctx.params["kwargs"] + # Special handling if we find a `progress` kwarg that is float/int + progress = kwargs.get("progress", None) + if isinstance(progress, float): + # Report progress every `progress` seconds + set_progress = lambda pb: ctx.set_progress(progress=pb.progress) + kwargs["progress"] = fo.report_progress(set_progress, dt=progress) + elif isinstance(progress, int): + # Report progress in `progress` equally-spaced increments + set_progress = lambda pb: ctx.set_progress(progress=pb.progress) + kwargs["progress"] = fo.report_progress(set_progress, n=progress) + if has_view: sample_collection = ctx.view elif has_dataset: diff --git a/plugins/zoo/__init__.py
b/plugins/zoo/__init__.py index a6b5741c..c39fff4e 100644 --- a/plugins/zoo/__init__.py +++ b/plugins/zoo/__init__.py @@ -6,6 +6,7 @@ | """ from collections import defaultdict +import inspect import fiftyone as fo import fiftyone.operators as foo @@ -46,10 +47,19 @@ def execute(self, ctx): splits = kwargs.pop("splits", None) label_field = kwargs.pop("label_field", None) kwargs.pop("dataset_name", None) - kwargs.pop("delegate", None) + delegate = kwargs.pop("delegate", None) dataset_name = _get_zoo_dataset_name(ctx) + if delegate: + # can remove check if we require `fiftyone>=0.24` + if ( + "progress" + in inspect.signature(foz.load_zoo_dataset).parameters + ): + progress = lambda pb: ctx.set_progress(progress=pb.progress) + kwargs["progress"] = fo.report_progress(progress, dt=5.0) + dataset = foz.load_zoo_dataset( name, splits=splits, @@ -445,6 +455,17 @@ def execute(self, ctx): if not delegate: num_workers = 0 + kwargs = {} + + if delegate: + # can remove check if we require `fiftyone>=0.24` + if ( + "progress" + in inspect.signature(target_view.apply_model).parameters + ): + progress = lambda pb: ctx.set_progress(progress=pb.progress) + kwargs["progress"] = fo.report_progress(progress, dt=5.0) + if embeddings and patches_field is not None: target_view.compute_patch_embeddings( model, @@ -453,6 +474,7 @@ def execute(self, ctx): batch_size=batch_size, num_workers=num_workers, skip_failures=skip_failures, + **kwargs, ) elif embeddings: target_view.compute_embeddings( @@ -461,6 +483,7 @@ def execute(self, ctx): batch_size=batch_size, num_workers=num_workers, skip_failures=skip_failures, + **kwargs, ) else: target_view.apply_model( @@ -473,6 +496,7 @@ def execute(self, ctx): skip_failures=skip_failures, output_dir=output_dir, rel_dir=rel_dir, + **kwargs, )