Register output code uses WorkunitDefinition #115

Merged: 12 commits, Jan 7, 2025
5 changes: 5 additions & 0 deletions app_runner/docs/changelog.md
@@ -8,6 +8,11 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/).

- Register single file command: `bfabric-app-runner outputs register-single-file`
- Implement copy resource `UpdateExisting.IF_EXISTS` and `UpdateExisting.REQUIRED` support.
- `storage_id` and `storage_output_folder` have been added to `WorkunitRegistrationDefinition`

### Changed

- App-runner code related to output staging accepts a workunit-definition file, like the other steps.

## \[0.0.7\] - 2024-11-22

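The "Changed" entry above corresponds to the reworked `register_outputs` entry point further down in this diff, which now takes a parsed `WorkunitDefinition` instead of a bare workunit id. A minimal sketch of the new call shape (client setup and file paths are illustrative assumptions, not part of this PR):

```python
from pathlib import Path

from bfabric import Bfabric
from bfabric.experimental.workunit_definition import WorkunitDefinition

from app_runner.output_registration import register_outputs

# Assumed setup: a configured B-Fabric client and a previously written
# workunit definition YAML (e.g. produced by the dispatch step).
client = Bfabric.from_config()
workunit_definition = WorkunitDefinition.from_ref(Path("work/workunit_definition.yml"), client=client)

# Output registration now receives the WorkunitDefinition directly; the
# storage id and output folder are read from its registration section.
register_outputs(
    outputs_yaml=Path("work/chunk-01/outputs.yml"),
    workunit_definition=workunit_definition,
    client=client,
    ssh_user=None,
    reuse_default_resource=True,
)
```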
24 changes: 6 additions & 18 deletions app_runner/src/app_runner/app_runner/runner.py
@@ -6,12 +6,12 @@
from typing import TYPE_CHECKING

import yaml
from bfabric.experimental.workunit_definition import WorkunitDefinition
from loguru import logger
from pydantic import BaseModel

from app_runner.input_preparation import prepare_folder
from app_runner.output_registration import register_outputs
from bfabric.experimental.workunit_definition import WorkunitDefinition

if TYPE_CHECKING:
from app_runner.specs.app_spec import AppSpec
@@ -44,20 +44,6 @@ def run_process(self, chunk_dir: Path) -> None:
logger.info(f"Running process command: {shlex.join(command)}")
subprocess.run(command, check=True)

def run_register_outputs(self, chunk_dir: Path, workunit_ref: int | Path, reuse_default_resource: bool) -> None:
workunit_definition = WorkunitDefinition.from_ref(workunit_ref, client=self._client)
registration = workunit_definition.registration
if registration is None:
msg = "Workunit definition does not provide registration information"
raise ValueError(msg)
register_outputs(
outputs_yaml=chunk_dir / "outputs.yml",
workunit_id=registration.workunit_id,
client=self._client,
ssh_user=self._ssh_user,
reuse_default_resource=reuse_default_resource,
)


class ChunksFile(BaseModel):
# TODO move to better location
@@ -96,9 +82,11 @@ def run_app(
runner.run_process(chunk_dir=chunk)
runner.run_collect(workunit_ref=workunit_definition_file, chunk_dir=chunk)
if not read_only:
runner.run_register_outputs(
chunk_dir=chunk,
workunit_ref=workunit_definition_file,
register_outputs(
outputs_yaml=chunk / "outputs.yml",
workunit_definition=workunit_definition,
client=client,
ssh_user=ssh_user,
reuse_default_resource=app_spec.reuse_default_resource,
)

12 changes: 10 additions & 2 deletions app_runner/src/app_runner/cli/chunk.py
@@ -5,11 +5,13 @@
import cyclopts
import yaml

from app_runner.output_registration import register_outputs
from app_runner.specs.app_spec import AppSpec
from app_runner.app_runner.runner import run_app, Runner
from bfabric import Bfabric
from bfabric.cli_formatting import setup_script_logging
from bfabric.experimental.entity_lookup_cache import EntityLookupCache
from bfabric.experimental.workunit_definition import WorkunitDefinition

app_chunk = cyclopts.App("chunk", help="Run an app on a chunk. You can create the chunks with `app dispatch`.")

@@ -90,7 +92,13 @@ def outputs(

runner = Runner(spec=app_spec_parsed, client=client, ssh_user=ssh_user)
runner.run_collect(workunit_ref=workunit_ref, chunk_dir=chunk_dir)
# TODO specify cache file
workunit_definition = WorkunitDefinition.from_ref(workunit_ref, client=client)
if not read_only:
runner.run_register_outputs(
chunk_dir=chunk_dir, workunit_ref=workunit_ref, reuse_default_resource=reuse_default_resource
register_outputs(
outputs_yaml=chunk_dir / "outputs.yml",
workunit_definition=workunit_definition,
client=client,
ssh_user=ssh_user,
reuse_default_resource=reuse_default_resource,
)
21 changes: 9 additions & 12 deletions app_runner/src/app_runner/cli/outputs.py
@@ -9,25 +9,22 @@
from app_runner.specs.outputs_spec import OutputsSpec, CopyResourceSpec, UpdateExisting
from bfabric import Bfabric
from bfabric.cli_formatting import setup_script_logging
from bfabric.entities import Workunit
from bfabric.experimental.workunit_definition import WorkunitDefinition

app_outputs = cyclopts.App("outputs", help="Register output files for an app.")


def _get_workunit(client: Bfabric, workunit_id: int) -> Workunit:
def _get_workunit_definition(client: Bfabric, workunit_ref: int | Path) -> WorkunitDefinition:
"""Get the workunit with the given id and raises an error if it is not found."""
workunit = Workunit.find(id=workunit_id, client=client)
if workunit is None:
msg = f"Workunit with id {workunit_id} not found"
raise ValueError(msg)
return workunit
workunit_ref = workunit_ref.resolve() if isinstance(workunit_ref, Path) else workunit_ref
# TODO can we do better and provide a cache_file even here?
return WorkunitDefinition.from_ref(workunit=workunit_ref, client=client, cache_file=None)


@app_outputs.command()
def register(
outputs_yaml: Path,
# TODO we should use the workunit definition instead
workunit_id: int,
workunit_ref: int | Path,
*,
ssh_user: str | None = None,
# TODO
@@ -39,7 +36,7 @@ def register(
specs_list = OutputsSpec.read_yaml(outputs_yaml)
register_all(
client=client,
workunit=_get_workunit(client, workunit_id),
workunit_definition=_get_workunit_definition(client, workunit_ref),
specs_list=specs_list,
ssh_user=ssh_user,
reuse_default_resource=reuse_default_resource,
@@ -50,7 +47,7 @@
def register_single_file(
local_path: Path,
*,
workunit_id: int,
workunit_ref: int | Path,
store_entry_path: Path | None = None,
store_folder_path: Path | None = None,
update_existing: UpdateExisting = UpdateExisting.NO,
@@ -76,7 +73,7 @@ def register_single_file(
pprint(spec)
register_all(
client=client,
workunit=_get_workunit(client, workunit_id),
workunit_definition=_get_workunit_definition(client, workunit_ref),
specs_list=[spec],
ssh_user=ssh_user,
reuse_default_resource=reuse_default_resource,
80 changes: 43 additions & 37 deletions app_runner/src/app_runner/output_registration/register.py
@@ -1,9 +1,9 @@
from __future__ import annotations

from typing import TYPE_CHECKING

from loguru import logger

from bfabric.entities import Storage, Workunit, Resource
from app_runner.specs.outputs_spec import (
CopyResourceSpec,
UpdateExisting,
@@ -13,40 +13,40 @@
)
from app_runner.util.checksums import md5sum
from app_runner.util.scp import scp
from bfabric.entities import Resource
from bfabric.entities import Storage, Workunit
from bfabric_scripts.bfabric_save_csv2dataset import bfabric_save_csv2dataset
from glom import glom
from typing import TYPE_CHECKING

if TYPE_CHECKING:
from pathlib import Path
from bfabric import Bfabric
from bfabric.experimental.workunit_definition import WorkunitDefinition


def _get_output_folder(spec: CopyResourceSpec, workunit: Workunit) -> Path:
def _get_output_folder(spec: CopyResourceSpec, workunit_definition: WorkunitDefinition) -> Path:
if not spec.store_folder_path:
return workunit.store_output_folder
return workunit_definition.registration.storage_output_folder
else:
return spec.store_folder_path


def register_file_in_workunit(
spec: CopyResourceSpec,
client: Bfabric,
workunit: Workunit,
storage: Storage,
workunit_definition: WorkunitDefinition,
resource_id: int | None = None,
) -> None:
"""Registers a file in the workunit."""
existing_id = _identify_existing_resource_id(client, spec, workunit)
existing_id = _identify_existing_resource_id(client, spec, workunit_definition)
if resource_id is not None and existing_id is not None and resource_id != existing_id:
raise ValueError(f"Resource id {resource_id} does not match existing resource id {existing_id}")

checksum = md5sum(spec.local_path)
output_folder = _get_output_folder(spec, workunit=workunit)
output_folder = _get_output_folder(spec, workunit_definition=workunit_definition)
resource_data = {
"name": spec.store_entry_path.name,
"workunitid": workunit.id,
"storageid": storage.id,
"workunitid": workunit_definition.registration.workunit_id,
"storageid": workunit_definition.registration.storage_id,
"relativepath": output_folder / spec.store_entry_path,
"filechecksum": checksum,
"status": "available",
@@ -60,28 +60,35 @@ def register_file_in_workunit(
client.save("resource", resource_data)


def _identify_existing_resource_id(client: Bfabric, spec: CopyResourceSpec, workunit: Workunit) -> int | None:
def _identify_existing_resource_id(
client: Bfabric, spec: CopyResourceSpec, workunit_definition: WorkunitDefinition
) -> int | None:
"""Returns the id of the existing resource if it exists."""
if spec.update_existing in (UpdateExisting.IF_EXISTS, UpdateExisting.REQUIRED):
# TODO which are actually the unique fields that should be checked?
# TODO maybe it would be more accurate to use relativepath here, however historically it would often start
# with `/` which can be confusing.
resources = Resource.find_by(
{"name": spec.store_entry_path.name, "workunitid": workunit.id}, client=client
{"name": spec.store_entry_path.name, "workunitid": workunit_definition.registration.workunit_id},
client=client,
).values()
if resources:
return list(resources)[0].id
elif spec.update_existing == UpdateExisting.REQUIRED:
raise ValueError(f"Resource {spec.store_entry_path.name} not found in workunit {workunit.id}")
raise ValueError(f"Resource {spec.store_entry_path.name} not found in workunit {workunit_definition.id}")
return None


def copy_file_to_storage(spec: CopyResourceSpec, workunit: Workunit, storage: Storage, ssh_user: str | None) -> None:
def copy_file_to_storage(
spec: CopyResourceSpec, workunit_definition: WorkunitDefinition, storage: Storage, ssh_user: str | None
) -> None:
"""Copies a file to the storage, according to the spec."""
output_folder = _get_output_folder(spec, workunit=workunit)
# TODO here some direct uses of storage could still be optimized away
output_folder = _get_output_folder(spec, workunit_definition=workunit_definition)
output_uri = f"{storage.scp_prefix}{output_folder / spec.store_entry_path}"
scp(spec.local_path, output_uri, user=ssh_user)


def _save_dataset(spec: SaveDatasetSpec, client: Bfabric, workunit: Workunit) -> None:
def _save_dataset(spec: SaveDatasetSpec, client: Bfabric, workunit_definition: WorkunitDefinition) -> None:
"""Saves a dataset to the bfabric."""
# TODO should not print to stdout in the future
# TODO also it should not be imported from bfabric_scripts, but rather the generic functionality should be available
Expand All @@ -90,16 +97,17 @@ def _save_dataset(spec: SaveDatasetSpec, client: Bfabric, workunit: Workunit) ->
client=client,
csv_file=spec.local_path,
dataset_name=spec.name or spec.local_path.stem,
container_id=workunit.container.id,
workunit_id=workunit.id,
container_id=workunit_definition.registration.container_id,
workunit_id=workunit_definition.registration.workunit_id,
sep=spec.separator,
has_header=spec.has_header,
invalid_characters=spec.invalid_characters,
)


def find_default_resource_id(workunit: Workunit) -> int | None:
def find_default_resource_id(workunit_definition: WorkunitDefinition, client: Bfabric) -> int | None:
"""Finds the default resource's id for the workunit. Maybe in the future, this will be always `None`."""
workunit = Workunit.find(id=workunit_definition.registration.workunit_id, client=client)
candidate_resources = [
resource for resource in workunit.resources if resource["name"] not in ["slurm_stdout", "slurm_stderr"]
]
@@ -110,47 +118,45 @@ def find_default_resource_id(workunit: Workunit) -> int | None:


def register_all(
client: Bfabric, workunit: Workunit, specs_list: list[SpecType], ssh_user: str | None, reuse_default_resource: bool
client: Bfabric,
workunit_definition: WorkunitDefinition,
specs_list: list[SpecType],
ssh_user: str | None,
reuse_default_resource: bool,
) -> None:
"""Registers all the output specs to the workunit."""
default_resource_was_reused = not reuse_default_resource
for spec in specs_list:
logger.debug(f"Registering {spec}")
if isinstance(spec, CopyResourceSpec):
storage = glom(workunit, "application.storage")
copy_file_to_storage(spec, workunit=workunit, storage=storage, ssh_user=ssh_user)
storage = Storage.find(workunit_definition.registration.storage_id, client=client)
copy_file_to_storage(spec, workunit_definition=workunit_definition, storage=storage, ssh_user=ssh_user)
if not default_resource_was_reused:
resource_id = find_default_resource_id(workunit=workunit)
resource_id = find_default_resource_id(workunit_definition=workunit_definition, client=client)
default_resource_was_reused = True
else:
resource_id = None
register_file_in_workunit(spec, client=client, workunit=workunit, storage=storage, resource_id=resource_id)
register_file_in_workunit(
spec, client=client, workunit_definition=workunit_definition, resource_id=resource_id
)
elif isinstance(spec, SaveDatasetSpec):
_save_dataset(spec, client, workunit=workunit)
_save_dataset(spec, client, workunit_definition=workunit_definition)
else:
raise ValueError(f"Unknown spec type: {type(spec)}")


def register_outputs(
outputs_yaml: Path,
workunit_id: int,
workunit_definition: WorkunitDefinition,
client: Bfabric,
ssh_user: str | None,
reuse_default_resource: bool,
) -> None:
"""Registers outputs to the workunit."""
# TODO it seems there is some redundancy here (i.e. there is also the implementation in runner)
# parse the specs
specs_list = OutputsSpec.read_yaml(outputs_yaml)

# register all specs
workunit = Workunit.find(id=workunit_id, client=client)
if workunit is None:
msg = f"Workunit with id {workunit_id} not found"
raise ValueError(msg)
register_all(
client=client,
workunit=workunit,
workunit_definition=workunit_definition,
specs_list=specs_list,
ssh_user=ssh_user,
reuse_default_resource=reuse_default_resource,
4 changes: 4 additions & 0 deletions src/bfabric/experimental/workunit_definition.py
@@ -50,6 +50,8 @@ class WorkunitRegistrationDefinition(BaseModel):

workunit_id: int
container_id: int
storage_id: int
storage_output_folder: Path
container_type: Literal["project", "order"]

@classmethod
@@ -59,6 +61,8 @@ def from_workunit(cls, workunit: Workunit) -> WorkunitRegistrationDefinition:
"workunit_id": workunit.id,
"container_id": workunit.container.id,
"container_type": workunit.container.ENDPOINT,
"storage_id": workunit.application.storage.id,
"storage_output_folder": workunit.store_output_folder,
}
return cls.model_validate(data)

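With `storage_id` and `storage_output_folder` on the registration model, output registration can read the target storage and output folder directly from the definition. A minimal sketch of the extended model (all values below are made-up examples; in practice the object is built by `from_workunit` or read from a cached YAML file):

```python
from pathlib import Path

from bfabric.experimental.workunit_definition import WorkunitRegistrationDefinition

# Illustrative values only; real definitions come from from_workunit() or a cache file.
registration = WorkunitRegistrationDefinition(
    workunit_id=311000,
    container_id=3500,
    container_type="project",
    storage_id=12,  # new in this PR
    storage_output_folder=Path("p3500/bfabric/2025/2025-01/workunit_311000"),  # new in this PR
)

print(registration.storage_output_folder)
```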