Commit

Permalink
Merge pull request #121 from fgcz/main
Release app_runner 0.0.9
leoschwarz authored Jan 9, 2025
2 parents 18ec371 + d42ac07 commit d16f62c
Showing 25 changed files with 885 additions and 47 deletions.
1 change: 1 addition & 0 deletions .gitignore
@@ -5,4 +5,5 @@ bfabric/scripts/query_result.txt
build/
dist/
site/
feats/
_build/
18 changes: 18 additions & 0 deletions app_runner/docs/_design_notes/_app_definition.md
@@ -0,0 +1,18 @@
# App Definition Design

## Goals

- Centralized configuration for all app versions, so that any necessary adaptations to the system integration can be made in one place.

## Remarks

- The app ID is not part of the app specification, since the same app specification can be used for multiple B-Fabric apps.

## Open question

## Future possibilities

- Version ranges: Maybe we could use semver, but it would need to be a standardized flavor thereof.
- Allow providing a folder (or a set of YAML files) with multiple app definitions, to be pooled together.
  If there are any version conflicts, an error should be raised.
  There should probably be a tool to check the available versions easily.
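
The pooling idea above could look roughly like the following. This is only a sketch under assumptions: `pool_app_definitions` and `available_versions` are hypothetical names, and each file is modelled as a plain `{version: definition}` dict rather than the real spec classes.

```python
from typing import Dict, List


def pool_app_definitions(sources: Dict[str, Dict[str, dict]]) -> Dict[str, dict]:
    """Merge per-file version tables into one table, raising on duplicate versions.

    ``sources`` maps a (hypothetical) file name to that file's ``{version: definition}`` table.
    """
    pooled: Dict[str, dict] = {}
    origin: Dict[str, str] = {}  # remembers which file first defined each version
    for file_name, versions in sources.items():
        for version, definition in versions.items():
            if version in pooled:
                raise ValueError(
                    f"Version {version!r} is defined in both {origin[version]!r} and {file_name!r}"
                )
            pooled[version] = definition
            origin[version] = file_name
    return pooled


def available_versions(pooled: Dict[str, dict]) -> List[str]:
    """List the pooled versions, e.g. as the basis for a small 'which versions exist?' tool."""
    return sorted(pooled)
```

Raising on the first conflict keeps the behaviour predictable; silently preferring one file over another would make the effective version depend on load order.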
14 changes: 14 additions & 0 deletions app_runner/docs/_design_notes/_scheduling.md
@@ -0,0 +1,14 @@
## Open questions

- Inter-chunk dependencies (assuming some chunks are different from others)
- Should chunks be standardized somehow?
- How to transfer a chunk (which is a folder) reasonably across different scheduler systems? -> tar or shared folder (needs config...)
- Ideally: the first version allows relatively flexible SLURM configuration

## Future possibilities

- The initial version is concerned with submitting one job to a (SLURM) scheduler. However, at a later time, multi-node
  jobs could be introduced by having this job submit further jobs to the scheduler. Ideally, it could reuse the same
  scheduling interface code that is used for the single-node jobs.
- Internally, we could prepare by making the app runner code that executes the individual chunks more generic.
- Parallel execution of chunks could be introduced. This is actually very similar to the previous point.
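
For the "tar" option mentioned in the open questions, a minimal sketch using only the standard library might look like this. The helper names `pack_chunk` and `unpack_chunk` are hypothetical and not part of the app runner; how the archive is moved between systems is left out entirely.

```python
import tarfile
from pathlib import Path


def pack_chunk(chunk_dir: Path, archive: Path) -> Path:
    """Write the chunk folder into a gzip-compressed tar archive."""
    with tarfile.open(archive, "w:gz") as tar:
        # arcname stores only the folder name, not the absolute path, inside the archive.
        tar.add(chunk_dir, arcname=chunk_dir.name)
    return archive


def unpack_chunk(archive: Path, target_dir: Path) -> Path:
    """Extract the archive below target_dir and return the restored chunk folder."""
    target_dir.mkdir(parents=True, exist_ok=True)
    with tarfile.open(archive, "r:gz") as tar:
        names = tar.getnames()
        # Newer Python versions support extraction filters; "data" rejects unsafe members.
        kwargs = {"filter": "data"} if hasattr(tarfile, "data_filter") else {}
        tar.extractall(target_dir, **kwargs)
    # The first member is the chunk folder itself, because it was added as a directory.
    return target_dir / names[0]
```

A shared folder would avoid the copy altogether, but, as noted above, it needs configuration on both sides.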
10 changes: 10 additions & 0 deletions app_runner/docs/changelog.md
@@ -4,6 +4,16 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/).

## \[Unreleased\]

## \[0.0.9\] - 2025-01-09

### Added

- App specs can now define multiple versions in one file. (AppSpec = Collection of app versions and other information.)
- To avoid boilerplate, Mako templates can be used inside strings.
- Apps will resolve the version to use based on the `application_version` field.
- Validation functionality for the new app specification has been added.
- App versions can define a submitter; however, this information is not yet used.

## \[0.0.8\] - 2025-01-08

### Added
2 changes: 1 addition & 1 deletion app_runner/pyproject.toml
@@ -5,7 +5,7 @@ build-backend = "setuptools.build_meta"
[project]
name = "app_runner"
description = "Application runner for B-Fabric apps"
version = "0.0.8"
version = "0.0.9"
license = { text = "GPL-3.0" }
authors = [
{name = "Leonardo Schwarz", email = "[email protected]"},
50 changes: 50 additions & 0 deletions app_runner/src/app_runner/app_runner/resolve_app.py
@@ -0,0 +1,50 @@
from __future__ import annotations

from typing import TYPE_CHECKING

from bfabric.experimental.workunit_definition import WorkunitDefinition

from app_runner.specs.app.app_spec import AppSpec

if TYPE_CHECKING:
    from pathlib import Path
    from bfabric import Bfabric
    from app_runner.specs.app.app_version import AppVersion


def resolve_app(versions: AppSpec, workunit_definition: WorkunitDefinition) -> AppVersion:
    """Resolves the app version to use for the provided workunit definition."""
    # TODO this should be more generic in the future about the key for the app version (should be handled in AppSpec)
    # TODO logic to define "latest" version (should also be handled in AppSpec)
    if "application_version" not in workunit_definition.execution.raw_parameters:
        raise ValueError("The workunit definition does not contain an application version.")
    app_version = workunit_definition.execution.raw_parameters["application_version"]
    # TODO graceful handling of invalid versions
    return versions[app_version]


def load_workunit_information(
    app_spec: Path, client: Bfabric, work_dir: Path, workunit_ref: int | Path
) -> tuple[AppVersion, Path]:
    """Loads the app version and workunit definition from the provided app spec and workunit reference.
    :param app_spec: Path to the app spec file.
    :param client: The B-Fabric client to use for resolving the workunit.
    :param work_dir: Path to the work directory.
    :param workunit_ref: Reference to the workunit (ID or YAML file path).
    :return app_version: The app version to use.
    :return workunit_ref: Path to the workunit definition file. Can be used to reference the workunit in further
        steps to avoid unnecessary B-Fabric lookups. (If the workunit_ref was already a path, it will be returned as is,
        otherwise the file will be created in the work directory.)
    """
    workunit_definition_file = work_dir / "workunit_definition.yml"
    workunit_definition = WorkunitDefinition.from_ref(workunit_ref, client, cache_file=workunit_definition_file)
    app_versions = AppSpec.load_yaml(
        app_spec,
        app_id=workunit_definition.registration.application_id,
        app_name=workunit_definition.registration.application_name,
    )
    if isinstance(workunit_ref, int):
        workunit_ref = workunit_definition_file
    app_version = resolve_app(versions=app_versions, workunit_definition=workunit_definition)
    return app_version, workunit_ref
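
Regarding the "graceful handling of invalid versions" TODO in `resolve_app`: since `AppSpec.__getitem__` returns `None` for an unknown version, an explicit check could turn that into a descriptive error. The following is a sketch, not the project's implementation; `AppVersionStub`, `AppSpecStub` and `resolve_app_strict` are hypothetical stand-ins so the example runs without pydantic or B-Fabric.

```python
from dataclasses import dataclass
from typing import Dict, List, Optional, Set


@dataclass
class AppVersionStub:
    """Hypothetical stand-in for ``AppVersion``; only the version string matters here."""

    version: str


@dataclass
class AppSpecStub:
    """Hypothetical stand-in for ``AppSpec`` with the same lookup semantics."""

    versions: List[AppVersionStub]

    @property
    def available_versions(self) -> Set[str]:
        return {v.version for v in self.versions}

    def __getitem__(self, version: str) -> Optional[AppVersionStub]:
        # Mirrors AppSpec.__getitem__: unknown versions yield None rather than raising.
        return next((v for v in self.versions if v.version == version), None)


def resolve_app_strict(spec: AppSpecStub, raw_parameters: Dict[str, str]) -> AppVersionStub:
    """Resolve the requested version, raising a descriptive error if it is missing or unknown."""
    if "application_version" not in raw_parameters:
        raise ValueError("The workunit definition does not contain an application version.")
    requested = raw_parameters["application_version"]
    app_version = spec[requested]
    if app_version is None:
        available = ", ".join(sorted(spec.available_versions))
        raise ValueError(f"Unknown application version {requested!r}; available: {available}")
    return app_version
```

Listing the available versions in the message is a design choice for this sketch; it makes a typo in `application_version` easy to spot from the error alone.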
6 changes: 3 additions & 3 deletions app_runner/src/app_runner/app_runner/runner.py
@@ -14,12 +14,12 @@
from bfabric.experimental.workunit_definition import WorkunitDefinition

if TYPE_CHECKING:
from app_runner.specs.app_spec import AppSpec
from app_runner.specs.app.app_version import AppVersion
from bfabric import Bfabric


class Runner:
def __init__(self, spec: AppSpec, client: Bfabric, ssh_user: str | None = None) -> None:
def __init__(self, spec: AppVersion, client: Bfabric, ssh_user: str | None = None) -> None:
self._app_spec = spec
self._client = client
self._ssh_user = ssh_user
@@ -51,7 +51,7 @@ class ChunksFile(BaseModel):


def run_app(
app_spec: AppSpec,
app_spec: AppVersion,
workunit_ref: int | Path,
work_dir: Path,
client: Bfabric,
14 changes: 9 additions & 5 deletions app_runner/src/app_runner/cli/app.py
@@ -3,9 +3,8 @@
from pathlib import Path

import cyclopts
import yaml

from app_runner.specs.app_spec import AppSpec
from app_runner.app_runner.resolve_app import load_workunit_information
from app_runner.app_runner.runner import run_app, Runner
from bfabric import Bfabric
from bfabric.cli_formatting import setup_script_logging
@@ -27,12 +26,14 @@ def run(
# TODO doc
setup_script_logging()
client = Bfabric.from_config()
app_spec_parsed = AppSpec.model_validate(yaml.safe_load(app_spec.read_text()))

app_version, workunit_ref = load_workunit_information(app_spec, client, work_dir, workunit_ref)

# TODO(#107): usage of entity lookup cache was problematic -> beyond the full solution we could also consider
# to deactivate the cache for the output registration
# with EntityLookupCache.enable():
run_app(
app_spec=app_spec_parsed,
app_spec=app_version,
workunit_ref=workunit_ref,
work_dir=work_dir,
client=client,
@@ -57,6 +58,9 @@ def dispatch(
work_dir = work_dir.resolve()
# TODO set workunit to processing? (i.e. add read-only option here)
client = Bfabric.from_config()

app_version, workunit_ref = load_workunit_information(app_spec, client, work_dir, workunit_ref)

with EntityLookupCache.enable():
runner = Runner(spec=AppSpec.model_validate(yaml.safe_load(app_spec.read_text())), client=client, ssh_user=None)
runner = Runner(spec=app_version, client=client, ssh_user=None)
runner.run_dispatch(workunit_ref=workunit_ref, work_dir=work_dir)
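
Both commands now go through `load_workunit_information`, which turns an integer workunit ID into a path to a definition file in the work directory and returns an existing path unchanged. A rough sketch of just that normalization step follows. `normalize_workunit_ref` and `fetch_definition` are hypothetical names, and reusing an already existing cache file is an assumption of this sketch, not something the diff confirms about `WorkunitDefinition.from_ref`.

```python
from pathlib import Path
from typing import Callable, Union


def normalize_workunit_ref(
    workunit_ref: Union[int, Path],
    work_dir: Path,
    fetch_definition: Callable[[int], str],
) -> Path:
    """Return a file path for the workunit definition, caching ID-based lookups in work_dir.

    ``fetch_definition`` is a hypothetical stand-in for the B-Fabric lookup; it receives the
    workunit ID and returns the definition as YAML text.
    """
    if isinstance(workunit_ref, Path):
        # Already a file reference: hand it back unchanged.
        return workunit_ref
    cache_file = work_dir / "workunit_definition.yml"
    if not cache_file.exists():
        # Assumption: only query the remote system when no cached definition exists yet.
        work_dir.mkdir(parents=True, exist_ok=True)
        cache_file.write_text(fetch_definition(workunit_ref))
    return cache_file
```

Later steps can then pass the returned path around instead of the ID, which is what allows them to avoid further B-Fabric lookups.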
27 changes: 18 additions & 9 deletions app_runner/src/app_runner/cli/chunk.py
@@ -3,11 +3,10 @@
from pathlib import Path

import cyclopts
import yaml

from app_runner.output_registration import register_outputs
from app_runner.specs.app_spec import AppSpec
from app_runner.app_runner.resolve_app import load_workunit_information
from app_runner.app_runner.runner import run_app, Runner
from app_runner.output_registration import register_outputs
from bfabric import Bfabric
from bfabric.cli_formatting import setup_script_logging
from bfabric.experimental.entity_lookup_cache import EntityLookupCache
@@ -35,10 +34,13 @@ def run_all(
"""
setup_script_logging()
client = Bfabric.from_config()
app_spec_parsed = AppSpec.model_validate(yaml.safe_load(app_spec.read_text()))

app_version, workunit_ref = load_workunit_information(
app_spec=app_spec, client=client, work_dir=work_dir, workunit_ref=workunit_ref
)

run_app(
app_spec=app_spec_parsed,
app_spec=app_version,
workunit_ref=workunit_ref,
work_dir=work_dir,
client=client,
@@ -59,10 +61,14 @@ def process(app_spec: Path, chunk_dir: Path) -> None:
setup_script_logging()
client = Bfabric.from_config()
chunk_dir = chunk_dir.resolve()
app_spec_parsed = AppSpec.model_validate(yaml.safe_load(app_spec.read_text()))

# TODO this lookup of workunit_definition is very problematic now! FIX NEEDED
app_version, workunit_ref = load_workunit_information(
app_spec=app_spec, client=client, work_dir=chunk_dir, workunit_ref=chunk_dir / ".." / "workunit_definition.yml"
)

with EntityLookupCache.enable():
runner = Runner(spec=app_spec_parsed, client=client, ssh_user=None)
runner = Runner(spec=app_version, client=client, ssh_user=None)
runner.run_process(chunk_dir=chunk_dir)


@@ -88,9 +94,12 @@ def outputs(
setup_script_logging()
client = Bfabric.from_config()
chunk_dir = chunk_dir.resolve()
app_spec_parsed = AppSpec.model_validate(yaml.safe_load(app_spec.read_text()))

runner = Runner(spec=app_spec_parsed, client=client, ssh_user=ssh_user)
app_version, workunit_ref = load_workunit_information(
app_spec=app_spec, client=client, work_dir=chunk_dir, workunit_ref=workunit_ref
)

runner = Runner(spec=app_version, client=client, ssh_user=ssh_user)
runner.run_collect(workunit_ref=workunit_ref, chunk_dir=chunk_dir)
# TODO specify cache file
workunit_definition = WorkunitDefinition.from_ref(workunit_ref, client=client)
30 changes: 22 additions & 8 deletions app_runner/src/app_runner/cli/validate.py
@@ -3,33 +3,47 @@
from pathlib import Path

import cyclopts
import rich
import rich.pretty
import yaml
from rich.pretty import pprint

from app_runner.specs.app_spec import AppSpec
from app_runner.specs.app.app_spec import AppSpecTemplate, AppSpec
from app_runner.specs.inputs_spec import InputsSpec
from app_runner.specs.outputs_spec import OutputsSpec
from app_runner.specs.submitter_spec import SubmittersSpec

app_validate = cyclopts.App("validate", help="Validate yaml files.")


@app_validate.command()
def app_spec(yaml_file: Path) -> None:
def app_spec_template(yaml_file: Path) -> None:
"""Validate an app spec file."""
app_spec = AppSpec.model_validate(yaml.safe_load(yaml_file.read_text()))
rich.pretty.pprint(app_spec)
app_spec_file = AppSpecTemplate.model_validate(yaml.safe_load(yaml_file.read_text()))
pprint(app_spec_file)


@app_validate.command()
def app_spec(app_yaml: Path, app_id: str = "x", app_name: str = "y") -> None:
"""Validates the app versions by expanding the relevant config info."""
versions = AppSpec.load_yaml(app_yaml, app_id=app_id, app_name=app_name)
pprint(versions)


@app_validate.command()
def inputs_spec(yaml_file: Path) -> None:
"""Validate an inputs spec file."""
inputs_spec = InputsSpec.model_validate(yaml.safe_load(yaml_file.read_text()))
rich.pretty.pprint(inputs_spec)
pprint(inputs_spec)


@app_validate.command()
def outputs_spec(yaml_file: Path) -> None:
"""Validate an outputs spec file."""
outputs_spec = OutputsSpec.model_validate(yaml.safe_load(yaml_file.read_text()))
rich.pretty.pprint(outputs_spec)
pprint(outputs_spec)


@app_validate.command()
def submitters_spec(yaml_file: Path) -> None:
"""Validate a submitters spec file."""
submitters_spec = SubmittersSpec.model_validate(yaml.safe_load(yaml_file.read_text()))
pprint(submitters_spec)
Empty file.
54 changes: 54 additions & 0 deletions app_runner/src/app_runner/specs/app/app_spec.py
@@ -0,0 +1,54 @@
from __future__ import annotations

from typing import TYPE_CHECKING

import yaml
from pydantic import BaseModel

from app_runner.specs.app.app_version import AppVersion, AppVersionMultiTemplate  # noqa: TCH001

if TYPE_CHECKING:
    from pathlib import Path


class BfabricAppSpec(BaseModel):
    """Contains the app specification information that is relevant to bfabric..."""

    # TODO unclear if it should be kept
    app_runner: str


class AppSpecTemplate(BaseModel):
    # TODO consider whether to reintroduce
    # bfabric: BfabricAppSpec
    versions: list[AppVersionMultiTemplate]

    def evaluate(self, app_id: str, app_name: str) -> AppSpec:
        """Evaluates the template to a concrete ``AppSpec`` instance."""
        versions_templates = [expanded for version in self.versions for expanded in version.expand_versions()]
        versions = [template.evaluate(app_id=app_id, app_name=app_name) for template in versions_templates]
        return AppSpec.model_validate({"versions": versions})


class AppSpec(BaseModel):
    """Parsed app versions from the app spec file."""

    versions: list[AppVersion]

    @classmethod
    def load_yaml(cls, app_yaml: Path, app_id: int | str, app_name: str) -> AppSpec:
        """Loads the app versions from the provided YAML file and evaluates the templates."""
        app_spec_file = AppSpecTemplate.model_validate(yaml.safe_load(app_yaml.read_text()))
        return app_spec_file.evaluate(app_id=str(app_id), app_name=str(app_name))

    @property
    def available_versions(self) -> set[str]:
        """The available versions of the app."""
        return {version.version for version in self.versions}

    def __getitem__(self, version: str) -> AppVersion | None:
        """Returns the app version with the provided version number or None if it does not exist."""
        for app_version in self.versions:
            if app_version.version == version:
                return app_version
        return None
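
The two-step flow in `AppSpecTemplate.evaluate` (expand multi-version entries, then evaluate each template with `app_id` and `app_name`) can be illustrated with plain dicts. This is a sketch under loud assumptions: the internals of `expand_versions` and `AppVersion` are not shown in this commit view, so the field layout here is hypothetical, and the standard library's `string.Template` is swapped in for Mako purely to keep the example dependency-free.

```python
from string import Template
from typing import Dict, List


def expand_versions(multi_template: dict) -> List[dict]:
    """Turn one entry listing several versions into one entry per version (hypothetical layout)."""
    return [{**multi_template, "version": version} for version in multi_template["version"]]


def evaluate(template: dict, app_id: str, app_name: str) -> dict:
    """Substitute placeholders in every string field of a single-version template."""
    context = {"app_id": app_id, "app_name": app_name, "version": template["version"]}
    return {
        # string.Template stands in for Mako here; both use the ${name} placeholder form.
        key: Template(value).substitute(context) if isinstance(value, str) else value
        for key, value in template.items()
    }


def evaluate_spec(versions: List[dict], app_id: str, app_name: str) -> Dict[str, dict]:
    """Expand and evaluate all templates, returning a version -> definition table."""
    expanded = [single for multi in versions for single in expand_versions(multi)]
    evaluated = [evaluate(single, app_id=app_id, app_name=app_name) for single in expanded]
    return {entry["version"]: entry for entry in evaluated}
```

Expanding first and evaluating second means each resulting entry sees its own concrete `version` during substitution, which is what lets one template line serve several releases.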