From c94a73ac371224859d65c1802399ad2dfe91f18f Mon Sep 17 00:00:00 2001 From: Leonardo Schwarz Date: Mon, 16 Dec 2024 16:25:12 +0100 Subject: [PATCH 01/11] update outputs.py to pass workunit_definition --- app_runner/src/app_runner/cli/outputs.py | 16 ++++++---------- 1 file changed, 6 insertions(+), 10 deletions(-) diff --git a/app_runner/src/app_runner/cli/outputs.py b/app_runner/src/app_runner/cli/outputs.py index 5e9e42a..f60e853 100644 --- a/app_runner/src/app_runner/cli/outputs.py +++ b/app_runner/src/app_runner/cli/outputs.py @@ -8,7 +8,7 @@ from app_runner.specs.outputs_spec import OutputsSpec from bfabric import Bfabric from bfabric.cli_formatting import setup_script_logging -from bfabric.entities import Workunit +from bfabric.experimental.workunit_definition import WorkunitDefinition app_outputs = cyclopts.App("outputs", help="Register output files for an app.") @@ -16,8 +16,7 @@ @app_outputs.command() def register( outputs_yaml: Path, - # TODO we should use the workunit definition instead - workunit_id: int, + workunit_ref: int | Path, *, ssh_user: str | None = None, # TODO @@ -26,16 +25,13 @@ def register( """Register the output files of a workunit.""" setup_script_logging() client = Bfabric.from_config() - + workunit_ref = workunit_ref.resolve() if isinstance(workunit_ref, Path) else workunit_ref + # TODO can we do better and provide a cache_file even here? + workunit_definition = WorkunitDefinition.from_ref(workunit=workunit_ref, client=client, cache_file=None) specs_list = OutputsSpec.read_yaml(outputs_yaml) - workunit = Workunit.find(id=workunit_id, client=client) - if workunit is None: - msg = f"Workunit with id {workunit_id} not found" - raise ValueError(msg) - register_all( client=client, - workunit=workunit, + workunit_definition=workunit_definition, specs_list=specs_list, ssh_user=ssh_user, reuse_default_resource=reuse_default_resource, From be61836365f2d00b5c7ef3e2e25001a97510b479 Mon Sep 17 00:00:00 2001 From: Leonardo Schwarz Date: Mon, 16 Dec 2024 16:44:31 +0100 Subject: [PATCH 02/11] partially refactor code to use workunit_definition in output registration --- .../output_registration/register.py | 62 +++++++++++-------- .../experimental/workunit_definition.py | 4 ++ 2 files changed, 39 insertions(+), 27 deletions(-) diff --git a/app_runner/src/app_runner/output_registration/register.py b/app_runner/src/app_runner/output_registration/register.py index a14a8bb..83288d0 100644 --- a/app_runner/src/app_runner/output_registration/register.py +++ b/app_runner/src/app_runner/output_registration/register.py @@ -1,9 +1,9 @@ from __future__ import annotations +from typing import TYPE_CHECKING from loguru import logger -from bfabric.entities import Storage, Workunit from app_runner.specs.outputs_spec import ( CopyResourceSpec, UpdateExisting, @@ -13,18 +13,18 @@ ) from app_runner.util.checksums import md5sum from app_runner.util.scp import scp +from bfabric.entities import Storage, Workunit from bfabric_scripts.bfabric_save_csv2dataset import bfabric_save_csv2dataset -from glom import glom -from typing import TYPE_CHECKING if TYPE_CHECKING: from pathlib import Path from bfabric import Bfabric + from bfabric.experimental.workunit_definition import WorkunitDefinition -def _get_output_folder(spec: CopyResourceSpec, workunit: Workunit) -> Path: +def _get_output_folder(spec: CopyResourceSpec, workunit_definition: WorkunitDefinition) -> Path: if not spec.store_folder_path: - return workunit.store_output_folder + return workunit_definition.storage_output_folder else: return spec.store_folder_path @@ -32,8 +32,7 @@ def _get_output_folder(spec: CopyResourceSpec, workunit: Workunit) -> Path: def register_file_in_workunit( spec: CopyResourceSpec, client: Bfabric, - workunit: Workunit, - storage: Storage, + workunit_definition: WorkunitDefinition, resource_id: int | None = None, ) -> None: """Registers a file in the workunit.""" @@ -41,11 +40,11 @@ def register_file_in_workunit( # TODO implement this functionality raise NotImplementedError("Update existing not implemented") checksum = md5sum(spec.local_path) - output_folder = _get_output_folder(spec, workunit=workunit) + output_folder = _get_output_folder(spec, workunit_definition=workunit_definition) resource_data = { "name": spec.store_entry_path.name, - "workunitid": workunit.id, - "storageid": storage.id, + "workunitid": workunit_definition.registration.workunit_id, + "storageid": workunit_definition.registration.storage_id, "relativepath": output_folder / spec.store_entry_path, "filechecksum": checksum, "status": "available", @@ -56,14 +55,17 @@ def register_file_in_workunit( client.save("resource", resource_data) -def copy_file_to_storage(spec: CopyResourceSpec, workunit: Workunit, storage: Storage, ssh_user: str | None) -> None: +def copy_file_to_storage( + spec: CopyResourceSpec, workunit_definition: WorkunitDefinition, storage: Storage, ssh_user: str | None +) -> None: """Copies a file to the storage, according to the spec.""" - output_folder = _get_output_folder(spec, workunit=workunit) + # TODO here some direct uses of storage could still be optimized away + output_folder = _get_output_folder(spec, workunit_definition=workunit_definition) output_uri = f"{storage.scp_prefix}{output_folder / spec.store_entry_path}" scp(spec.local_path, output_uri, user=ssh_user) -def _save_dataset(spec: SaveDatasetSpec, client: Bfabric, workunit: Workunit) -> None: +def _save_dataset(spec: SaveDatasetSpec, client: Bfabric, workunit_definition: WorkunitDefinition) -> None: """Saves a dataset to the bfabric.""" # TODO should not print to stdout in the future # TODO also it should not be imported from bfabric_scripts, but rather the generic functionality should be available @@ -72,8 +74,8 @@ def _save_dataset(spec: SaveDatasetSpec, client: Bfabric, workunit: Workunit) -> client=client, csv_file=spec.local_path, dataset_name=spec.name or spec.local_path.stem, - container_id=workunit.container.id, - workunit_id=workunit.id, + container_id=workunit_definition.registration.container_id, + workunit_id=workunit_definition.registration.workunit_id, sep=spec.separator, has_header=spec.has_header, invalid_characters=spec.invalid_characters, @@ -92,30 +94,40 @@ def find_default_resource_id(workunit: Workunit) -> int | None: def register_all( - client: Bfabric, workunit: Workunit, specs_list: list[SpecType], ssh_user: str | None, reuse_default_resource: bool + client: Bfabric, + workunit_definition: WorkunitDefinition, + specs_list: list[SpecType], + ssh_user: str | None, + reuse_default_resource: bool, ) -> None: """Registers all the output specs to the workunit.""" default_resource_was_reused = not reuse_default_resource for spec in specs_list: logger.debug(f"Registering {spec}") if isinstance(spec, CopyResourceSpec): - storage = glom(workunit, "application.storage") - copy_file_to_storage(spec, workunit=workunit, storage=storage, ssh_user=ssh_user) + storage = Storage.find(workunit_definition.registration.storage_id, client=client) + copy_file_to_storage(spec, workunit_definition=workunit_definition, storage=storage, ssh_user=ssh_user) if not default_resource_was_reused: - resource_id = find_default_resource_id(workunit=workunit) + resource_id = find_default_resource_id( + # TODO maybe this could be made a bit more cleanly (or i actually wonder if it should be a method + # in the WorkunitDefinition class) + workunit=Workunit.find(id=workunit_definition.registration.workunit_id, client=client) + ) default_resource_was_reused = True else: resource_id = None - register_file_in_workunit(spec, client=client, workunit=workunit, storage=storage, resource_id=resource_id) + register_file_in_workunit( + spec, client=client, workunit_definition=workunit_definition, resource_id=resource_id + ) elif isinstance(spec, SaveDatasetSpec): - _save_dataset(spec, client, workunit=workunit) + _save_dataset(spec, client, workunit_definition=workunit_definition) else: raise ValueError(f"Unknown spec type: {type(spec)}") def register_outputs( outputs_yaml: Path, - workunit_id: int, + workunit_definition: WorkunitDefinition, client: Bfabric, ssh_user: str | None, reuse_default_resource: bool, @@ -126,13 +138,9 @@ def register_outputs( specs_list = OutputsSpec.read_yaml(outputs_yaml) # register all specs - workunit = Workunit.find(id=workunit_id, client=client) - if workunit is None: - msg = f"Workunit with id {workunit_id} not found" - raise ValueError(msg) register_all( client=client, - workunit=workunit, + workunit_definition=workunit_definition, specs_list=specs_list, ssh_user=ssh_user, reuse_default_resource=reuse_default_resource, diff --git a/src/bfabric/experimental/workunit_definition.py b/src/bfabric/experimental/workunit_definition.py index 55719f7..0da7c2f 100644 --- a/src/bfabric/experimental/workunit_definition.py +++ b/src/bfabric/experimental/workunit_definition.py @@ -55,6 +55,8 @@ class WorkunitRegistrationDefinition(BaseModel): workunit_id: int container_id: int + storage_id: int + storage_output_folder: Path container_type: Literal["project", "order"] @classmethod @@ -64,6 +66,8 @@ def from_workunit(cls, workunit: Workunit) -> WorkunitRegistrationDefinition: "workunit_id": workunit.id, "container_id": workunit.container.id, "container_type": workunit.container.ENDPOINT, + "storage_id": workunit.application.storage.id, + "storage_output_folder": workunit.store_output_folder, } return cls.model_validate(data) From d13b937f8a6472c88b9611f67cebf8d49a4e40eb Mon Sep 17 00:00:00 2001 From: Leonardo Schwarz Date: Mon, 16 Dec 2024 16:46:22 +0100 Subject: [PATCH 03/11] adapt to changes --- app_runner/src/app_runner/app_runner/runner.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/app_runner/src/app_runner/app_runner/runner.py b/app_runner/src/app_runner/app_runner/runner.py index abfc938..1594de0 100644 --- a/app_runner/src/app_runner/app_runner/runner.py +++ b/app_runner/src/app_runner/app_runner/runner.py @@ -6,12 +6,12 @@ from typing import TYPE_CHECKING import yaml -from bfabric.experimental.workunit_definition import WorkunitDefinition from loguru import logger from pydantic import BaseModel from app_runner.input_preparation import prepare_folder from app_runner.output_registration import register_outputs +from bfabric.experimental.workunit_definition import WorkunitDefinition if TYPE_CHECKING: from app_runner.specs.app_spec import AppSpec @@ -52,7 +52,7 @@ def run_register_outputs(self, chunk_dir: Path, workunit_ref: int | Path, reuse_ raise ValueError(msg) register_outputs( outputs_yaml=chunk_dir / "outputs.yml", - workunit_id=registration.workunit_id, + workunit_definition=workunit_definition, client=self._client, ssh_user=self._ssh_user, reuse_default_resource=reuse_default_resource, From e539d98cddefad1589fe193447f66272ea318108 Mon Sep 17 00:00:00 2001 From: Leonardo Schwarz Date: Tue, 7 Jan 2025 16:41:44 +0100 Subject: [PATCH 04/11] merge --- app_runner/src/app_runner/output_registration/register.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/app_runner/src/app_runner/output_registration/register.py b/app_runner/src/app_runner/output_registration/register.py index ea3bc1c..486ad31 100644 --- a/app_runner/src/app_runner/output_registration/register.py +++ b/app_runner/src/app_runner/output_registration/register.py @@ -24,9 +24,8 @@ def _get_output_folder(spec: CopyResourceSpec, workunit_definition: WorkunitDefinition) -> Path: - # TODO check if it needs further fixes if not spec.store_folder_path: - return workunit_definition.storage_output_folder + return workunit_definition.registration.storage_output_folder else: return spec.store_folder_path From 2a1e01419622bfff1a672a6b55c70fdc96e217f9 Mon Sep 17 00:00:00 2001 From: Leonardo Schwarz Date: Tue, 7 Jan 2025 16:43:53 +0100 Subject: [PATCH 05/11] simplify --- app_runner/src/app_runner/app_runner/runner.py | 4 ---- 1 file changed, 4 deletions(-) diff --git a/app_runner/src/app_runner/app_runner/runner.py b/app_runner/src/app_runner/app_runner/runner.py index 1594de0..0d688fb 100644 --- a/app_runner/src/app_runner/app_runner/runner.py +++ b/app_runner/src/app_runner/app_runner/runner.py @@ -46,10 +46,6 @@ def run_process(self, chunk_dir: Path) -> None: def run_register_outputs(self, chunk_dir: Path, workunit_ref: int | Path, reuse_default_resource: bool) -> None: workunit_definition = WorkunitDefinition.from_ref(workunit_ref, client=self._client) - registration = workunit_definition.registration - if registration is None: - msg = "Workunit definition does not provide registration information" - raise ValueError(msg) register_outputs( outputs_yaml=chunk_dir / "outputs.yml", workunit_definition=workunit_definition, From 07e7f9ea95484dc051a3970049401d66113a70c0 Mon Sep 17 00:00:00 2001 From: Leonardo Schwarz Date: Tue, 7 Jan 2025 16:47:40 +0100 Subject: [PATCH 06/11] remove redundancy --- app_runner/src/app_runner/app_runner/runner.py | 18 +++++------------- app_runner/src/app_runner/cli/chunk.py | 12 ++++++++++-- .../app_runner/output_registration/register.py | 4 ---- 3 files changed, 15 insertions(+), 19 deletions(-) diff --git a/app_runner/src/app_runner/app_runner/runner.py b/app_runner/src/app_runner/app_runner/runner.py index 0d688fb..696aeac 100644 --- a/app_runner/src/app_runner/app_runner/runner.py +++ b/app_runner/src/app_runner/app_runner/runner.py @@ -44,16 +44,6 @@ def run_process(self, chunk_dir: Path) -> None: logger.info(f"Running process command: {shlex.join(command)}") subprocess.run(command, check=True) - def run_register_outputs(self, chunk_dir: Path, workunit_ref: int | Path, reuse_default_resource: bool) -> None: - workunit_definition = WorkunitDefinition.from_ref(workunit_ref, client=self._client) - register_outputs( - outputs_yaml=chunk_dir / "outputs.yml", - workunit_definition=workunit_definition, - client=self._client, - ssh_user=self._ssh_user, - reuse_default_resource=reuse_default_resource, - ) - class ChunksFile(BaseModel): # TODO move to better location @@ -92,9 +82,11 @@ def run_app( runner.run_process(chunk_dir=chunk) runner.run_collect(workunit_ref=workunit_definition_file, chunk_dir=chunk) if not read_only: - runner.run_register_outputs( - chunk_dir=chunk, - workunit_ref=workunit_definition_file, + register_outputs( + outputs_yaml=chunk / "outputs.yml", + workunit_definition=workunit_definition, + client=client, + ssh_user=ssh_user, reuse_default_resource=app_spec.reuse_default_resource, ) diff --git a/app_runner/src/app_runner/cli/chunk.py b/app_runner/src/app_runner/cli/chunk.py index fc6a606..ffd0587 100644 --- a/app_runner/src/app_runner/cli/chunk.py +++ b/app_runner/src/app_runner/cli/chunk.py @@ -5,11 +5,13 @@ import cyclopts import yaml +from app_runner.output_registration import register_outputs from app_runner.specs.app_spec import AppSpec from app_runner.app_runner.runner import run_app, Runner from bfabric import Bfabric from bfabric.cli_formatting import setup_script_logging from bfabric.experimental.entity_lookup_cache import EntityLookupCache +from bfabric.experimental.workunit_definition import WorkunitDefinition app_chunk = cyclopts.App("chunk", help="Run an app on a chunk. You can create the chunks with `app dispatch`.") @@ -90,7 +92,13 @@ def outputs( runner = Runner(spec=app_spec_parsed, client=client, ssh_user=ssh_user) runner.run_collect(workunit_ref=workunit_ref, chunk_dir=chunk_dir) + # TODO specify cache file + workunit_definition = WorkunitDefinition.from_ref(workunit_ref, client=client) if not read_only: - runner.run_register_outputs( - chunk_dir=chunk_dir, workunit_ref=workunit_ref, reuse_default_resource=reuse_default_resource + register_outputs( + outputs_yaml=chunk_dir / "outputs.yml", + workunit_definition=workunit_definition, + client=client, + ssh_user=ssh_user, + reuse_default_resource=reuse_default_resource, ) diff --git a/app_runner/src/app_runner/output_registration/register.py b/app_runner/src/app_runner/output_registration/register.py index 486ad31..2ac2391 100644 --- a/app_runner/src/app_runner/output_registration/register.py +++ b/app_runner/src/app_runner/output_registration/register.py @@ -154,11 +154,7 @@ def register_outputs( reuse_default_resource: bool, ) -> None: """Registers outputs to the workunit.""" - # TODO it seems there is some redundancy here (i.e. there is also the implementation in runner) - # parse the specs specs_list = OutputsSpec.read_yaml(outputs_yaml) - - # register all specs register_all( client=client, workunit_definition=workunit_definition, From fa1d568ff64686a373335b10301965dbc161ceba Mon Sep 17 00:00:00 2001 From: Leonardo Schwarz Date: Tue, 7 Jan 2025 16:59:43 +0100 Subject: [PATCH 07/11] fix --- app_runner/src/app_runner/output_registration/register.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/app_runner/src/app_runner/output_registration/register.py b/app_runner/src/app_runner/output_registration/register.py index 2ac2391..77b3d6d 100644 --- a/app_runner/src/app_runner/output_registration/register.py +++ b/app_runner/src/app_runner/output_registration/register.py @@ -67,7 +67,8 @@ def _identify_existing_resource_id( if spec.update_existing in (UpdateExisting.IF_EXISTS, UpdateExisting.REQUIRED): # TODO which are actually the unique fields that should be checked? resources = Resource.find_by( - {"name": spec.store_entry_path.name, "workunitid": workunit_definition.id}, client=client + {"name": spec.store_entry_path.name, "workunitid": workunit_definition.registration.workunit_id}, + client=client, ).values() if resources: return list(resources)[0].id From 205211d2e7f37a6b8633e3d739565f4bd6eb735b Mon Sep 17 00:00:00 2001 From: Leonardo Schwarz Date: Tue, 7 Jan 2025 17:04:25 +0100 Subject: [PATCH 08/11] refactor --- .../src/app_runner/output_registration/register.py | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/app_runner/src/app_runner/output_registration/register.py b/app_runner/src/app_runner/output_registration/register.py index 77b3d6d..bff8b65 100644 --- a/app_runner/src/app_runner/output_registration/register.py +++ b/app_runner/src/app_runner/output_registration/register.py @@ -104,8 +104,9 @@ def _save_dataset(spec: SaveDatasetSpec, client: Bfabric, workunit_definition: W ) -def find_default_resource_id(workunit: Workunit) -> int | None: +def find_default_resource_id(workunit_definition: WorkunitDefinition, client: Bfabric) -> int | None: """Finds the default resource's id for the workunit. Maybe in the future, this will be always `None`.""" + workunit = Workunit.find(id=workunit_definition.registration.workunit_id, client=client) candidate_resources = [ resource for resource in workunit.resources if resource["name"] not in ["slurm_stdout", "slurm_stderr"] ] @@ -130,11 +131,7 @@ def register_all( storage = Storage.find(workunit_definition.registration.storage_id, client=client) copy_file_to_storage(spec, workunit_definition=workunit_definition, storage=storage, ssh_user=ssh_user) if not default_resource_was_reused: - resource_id = find_default_resource_id( - # TODO maybe this could be made a bit more cleanly (or i actually wonder if it should be a method - # in the WorkunitDefinition class) - workunit=Workunit.find(id=workunit_definition.registration.workunit_id, client=client) - ) + resource_id = find_default_resource_id(workunit_definition=workunit_definition, client=client) default_resource_was_reused = True else: resource_id = None From 83016e9666431bc64acdacfbd9d1938e01a088c3 Mon Sep 17 00:00:00 2001 From: Leonardo Schwarz Date: Tue, 7 Jan 2025 17:07:36 +0100 Subject: [PATCH 09/11] update todo --- app_runner/src/app_runner/output_registration/register.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/app_runner/src/app_runner/output_registration/register.py b/app_runner/src/app_runner/output_registration/register.py index bff8b65..a8b57e2 100644 --- a/app_runner/src/app_runner/output_registration/register.py +++ b/app_runner/src/app_runner/output_registration/register.py @@ -65,7 +65,8 @@ def _identify_existing_resource_id( ) -> int | None: """Returns the id of the existing resource if it exists.""" if spec.update_existing in (UpdateExisting.IF_EXISTS, UpdateExisting.REQUIRED): - # TODO which are actually the unique fields that should be checked? + # TODO maybe it would be more accurate to use relativepath here, however historically it would often start + # with `/` which can be confusing. resources = Resource.find_by( {"name": spec.store_entry_path.name, "workunitid": workunit_definition.registration.workunit_id}, client=client, From 22a47eea15b581e56882e082868db3625d20a2cf Mon Sep 17 00:00:00 2001 From: Leonardo Schwarz Date: Tue, 7 Jan 2025 17:09:06 +0100 Subject: [PATCH 10/11] changelog --- app_runner/docs/changelog.md | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/app_runner/docs/changelog.md b/app_runner/docs/changelog.md index ab97453..704034a 100644 --- a/app_runner/docs/changelog.md +++ b/app_runner/docs/changelog.md @@ -9,6 +9,10 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/). - Register single file command: `bfabric-app-runner outputs register-single-file` - Implement copy resource `UpdateExisting.IF_EXISTS` and `UpdateExisting.REQUIRED` support. +### Changed + +- App-runner code related to output staging accepts workunit-definition file like the other steps. + ## \[0.0.7\] - 2024-11-22 ### Fixed From bb765473d79a99a8831843d501310481477a42a9 Mon Sep 17 00:00:00 2001 From: Leonardo Schwarz Date: Tue, 7 Jan 2025 17:09:52 +0100 Subject: [PATCH 11/11] changelog --- app_runner/docs/changelog.md | 1 + 1 file changed, 1 insertion(+) diff --git a/app_runner/docs/changelog.md b/app_runner/docs/changelog.md index 704034a..408a2b0 100644 --- a/app_runner/docs/changelog.md +++ b/app_runner/docs/changelog.md @@ -8,6 +8,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/). - Register single file command: `bfabric-app-runner outputs register-single-file` - Implement copy resource `UpdateExisting.IF_EXISTS` and `UpdateExisting.REQUIRED` support. +- `storage_id` and `storage_output_folder` have been added to `WorkunitRegistrationDefinition` ### Changed