From 78507f5b2de419ab74ba65e0e972441c3dde799f Mon Sep 17 00:00:00 2001 From: vorozhkog Date: Mon, 2 Dec 2024 19:12:50 +0000 Subject: [PATCH] back to batches, timer improvements, ann async download --- dev_requirements.txt | 2 +- src/globals.py | 21 +++++++++++ src/main.py | 87 ++++++++++++++++++++++---------------------- 3 files changed, 65 insertions(+), 45 deletions(-) diff --git a/dev_requirements.txt b/dev_requirements.txt index 677de06..2ec3e86 100644 --- a/dev_requirements.txt +++ b/dev_requirements.txt @@ -1,4 +1,4 @@ -supervisely==6.73.162 +supervisely==6.73.242 lxml numpy>=1.19.4 Pillow>=8.0.1 diff --git a/src/globals.py b/src/globals.py index f01f463..ce7b98c 100644 --- a/src/globals.py +++ b/src/globals.py @@ -3,6 +3,7 @@ import supervisely as sly from dotenv import load_dotenv from distutils.util import strtobool +import time if sly.is_development(): load_dotenv("local.env") @@ -56,3 +57,23 @@ raise ValueError( f"train_val_split_coef should be between 0 and 1, your data is {TRAIN_VAL_SPLIT_COEF}" ) + +class Timer: + def __init__(self, message=None, items_cnt=None): + self.message = message + self.items_cnt = items_cnt + self.elapsed = 0 + + def __enter__(self): + self.start = time.perf_counter() + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + self.end = time.perf_counter() + self.elapsed = self.end - self.start + msg = self.message or "Block execution" + if self.items_cnt: + log_msg = f"{msg} time: {self.elapsed:.3f} seconds per {self.items_cnt} items ({self.elapsed/self.items_cnt:.3f} seconds per item)" + else: + log_msg = f"{msg} time: {self.elapsed:.3f} seconds" + sly.logger.info(log_msg) \ No newline at end of file diff --git a/src/main.py b/src/main.py index 285875a..bbd3334 100644 --- a/src/main.py +++ b/src/main.py @@ -8,7 +8,6 @@ import utils import asyncio -from tinytimer import Timer @sly.handle_exceptions(has_ui=False) def from_sly_to_pascal(api: sly.Api): @@ -54,11 +53,6 @@ def from_sly_to_pascal(api: sly.Api): 
"Preparing images for export", total_images_cnt, sly.logger ) - if api.server_address.startswith("https://"): - semaphore = asyncio.Semaphore(100) - else: - semaphore = None - for dataset in datasets: if dataset.name in dataset_names: is_trainval = 1 @@ -66,46 +60,51 @@ def from_sly_to_pascal(api: sly.Api): is_trainval = 0 images = api.image.get_list(dataset.id) - image_ids = [image_info.id for image_info in images] + for batch in sly.batched(images): + image_ids = [image_info.id for image_info in batch] - if g.ADD_PREFIX_TO_IMAGES: - image_paths = [ - os.path.join(result_images_dir, f"{dataset.id}_{image_info.name}") - for image_info in images - ] - else: - image_paths = [ - os.path.join(result_images_dir, image_info.name) for image_info in images - ] - for idx, path in enumerate(image_paths): - if os.path.exists(path): - img_name = os.path.basename(path) - name, ext = os.path.splitext(img_name) - i = 1 - new_name = f"{name}_{i}{ext}" - while os.path.exists(os.path.join(result_images_dir, new_name)): - i += 1 - new_name = f"{name}_{i}{ext}" - sly.logger.warn( - f"Image {img_name} already exists in the directory. 
New name: {new_name}" - ) - image_paths[idx] = os.path.join(result_images_dir, new_name) - - with Timer() as t: - coro = api.image.download_paths_async(image_ids, image_paths, semaphore) - loop = sly.utils.get_or_create_event_loop() - if loop.is_running(): - future = asyncio.run_coroutine_threadsafe(coro, loop) - future.result() + if g.ADD_PREFIX_TO_IMAGES: + image_paths = [ + os.path.join(result_images_dir, f"{dataset.id}_{image_info.name}") + for image_info in batch + ] else: - loop.run_until_complete(coro) - sly.logger.info( - f"Downloading time: {t.elapsed:.4f} seconds per {len(image_ids)} images ({t.elapsed/len(image_ids):.4f} seconds per image)" - ) - - for batch in sly.batched(images): - # api.image.download_paths(dataset.id, image_ids, image_paths) - ann_infos = api.annotation.download_batch(dataset.id, image_ids) + image_paths = [ + os.path.join(result_images_dir, image_info.name) for image_info in batch + ] + for idx, path in enumerate(image_paths): + if os.path.exists(path): + img_name = os.path.basename(path) + name, ext = os.path.splitext(img_name) + i = 1 + new_name = f"{name}_{i}{ext}" + while os.path.exists(os.path.join(result_images_dir, new_name)): + i += 1 + new_name = f"{name}_{i}{ext}" + sly.logger.warn( + f"Image {img_name} already exists in the directory. 
New name: {new_name}" + ) + image_paths[idx] = os.path.join(result_images_dir, new_name) + + with g.Timer("Image downloading", len(image_ids)): + coro = api.image.download_paths_async(image_ids, image_paths) + loop = sly.utils.get_or_create_event_loop() + if loop.is_running(): + future = asyncio.run_coroutine_threadsafe(coro, loop) + future.result() + else: + loop.run_until_complete(coro) + + ann_infos = [] + with g.Timer("Annotation downloading", len(image_ids)): + coro = api.annotation.download_batch_async(dataset.id, image_ids) + loop = sly.utils.get_or_create_event_loop() + if loop.is_running(): + future = asyncio.run_coroutine_threadsafe(coro, loop) + ann_infos.extend(future.result()) + else: + ann_infos.extend(loop.run_until_complete(coro)) + for image_info, ann_info, img_path in zip(batch, ann_infos, image_paths): cur_img_filename = os.path.basename(img_path) img_title, img_ext = os.path.splitext(cur_img_filename)