Skip to content

Commit

Permalink
Merge GH-365/emcee-submit into test/willard
Browse files Browse the repository at this point in the history
  • Loading branch information
TimothyWillard committed Nov 14, 2024
2 parents 6b7a1e8 + 66ab33d commit d2b2f69
Show file tree
Hide file tree
Showing 3 changed files with 179 additions and 16 deletions.
92 changes: 76 additions & 16 deletions flepimop/gempyor_pkg/src/gempyor/batch.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
from .logging import get_script_logger
from .utils import _format_cli_options, _git_checkout, _git_head, _shutil_which, config
from .shared_cli import (
MEMORY_MB,
NONNEGATIVE_DURATION,
cli,
config_files_argument,
Expand Down Expand Up @@ -970,7 +971,7 @@ def _submit_scenario_job(
),
click.Option(
param_decls=["--memory", "memory"],
type=click.IntRange(min=1),
type=MEMORY_MB,
default=None,
help="Override for the amount of memory per node to use in MB.",
),
Expand Down Expand Up @@ -1002,7 +1003,59 @@ def _submit_scenario_job(
)
@click.pass_context
def _click_submit(ctx: click.Context = mock_context, **kwargs: Any) -> None:
"""Submit batch jobs"""
"""
Submit batch inference jobs.
This CLI tool makes it a breeze to submit batch inference jobs. In general, this
tool does:
1. Determine information about the job such as the inference method, batch system,
job size/time limit, and resources.
2. Write metadata about the batch job to a 'manifest.json' file.
3. Loop over the outcome/seir scenario modifiers and generate/submit batch jobs for
each of those.
4. Checkout a new branch in the project directory to preserve the outputs.
To get a better understanding of this tool you can use the `--dry-run` flag which
will complete all of steps described above except for submitting the jobs. Or if you
would like to test run the batch scripts without submitting to slurm or AWS you can
use the `--local` flag which will run the "batch" job locally (only use for small
test jobs).
Here are some common examples to get started:
1. Assuming you are in your clone of the `HopkinsIDD/flepimop_sample` repository
this command will do a dry run of the sample inference configuration with two
populations.
```bash
$ flepimop submit --jobs 4 \ # With legacy inference jobs refers to chains
--simulations 20 \ # The number of iterations per chain
--blocks 1 \ # The number of consecutive blocks to run
--slurm \ # Use slurm, shorthand for `--batch-system slurm`
--project-path $(pwd) \ # Manually specify the project path
--email [email protected] \ # Use slurm's built-in email alerts for the job
--skip-checkout \ # Do not create a new branch in the project repo
-vvv \ # Use the max verbosity
--dry-run \ # Do not actually submit the job via `sbatch`.
config_sample_2pop_inference.yml
```
2. Extending on example (1) let's make some small modifications to submit a "real"
job.
```bash
$ flepimop submit --jobs 10 \ # See before
--simulations 400 \ # See before
--blocks 1 \ # See before
--slurm \ # See before
--partition dedicated_slurm_partition \ # Submit to a particular slurm partition
--simulation-time 30s \ # Specify the time limit per simulation
--initial-time 5min \ # Specify the initial time limit
--conda-env custom-flepimop-env \ # Specify a custom conda env to use
--cpus 4 \ # Use 4 CPUs per job
-vvv \ # Use the max verbosity
config_sample_2pop_inference.yml
```
"""
# Generic setup
now = datetime.now(timezone.utc)
logger = get_script_logger(__name__, kwargs["verbosity"])
Expand Down Expand Up @@ -1073,28 +1126,22 @@ def _click_submit(ctx: click.Context = mock_context, **kwargs: Any) -> None:
logger.info("Setting a total job time limit of %s minutes", job_time_limit.format())

# Job resources
memory = None if kwargs["memory"] is None else math.ceil(kwargs["memory"])
if memory != kwargs["memory"]:
logger.warning(
"The requested memory of %.3fMB has been rounded up to %uMB for submission",
kwargs["memory"],
memory,
)
job_resources = JobResources.from_presets(
job_size,
inference_method,
nodes=kwargs["nodes"],
cpus=kwargs["cpus"],
memory=kwargs["memory"],
memory=memory,
)
logger.info("Requesting the resources %s for this job.", job_resources)

# Outcome/seir modifier scenarios
outcome_modifiers_scenarios = (
cfg["outcome_modifiers"]["scenarios"].as_str_seq()
if cfg["outcome_modifiers"].exists()
and cfg["outcome_modifiers"]["scenarios"].exists()
else [None]
)
seir_modifiers_scenarios = (
cfg["seir_modifiers"]["scenarios"].as_str_seq()
if cfg["seir_modifiers"].exists() and cfg["seir_modifiers"]["scenarios"].exists()
else [None]
)

# Restart/continuation location
# TODO: Implement this

Expand All @@ -1119,6 +1166,19 @@ def _click_submit(ctx: click.Context = mock_context, **kwargs: Any) -> None:
"Dumped the final config for this batch submission to %s", config_out.absolute()
)

# Outcome/seir modifier scenarios
outcome_modifiers_scenarios = (
cfg["outcome_modifiers"]["scenarios"].as_str_seq()
if cfg["outcome_modifiers"].exists()
and cfg["outcome_modifiers"]["scenarios"].exists()
else [None]
)
seir_modifiers_scenarios = (
cfg["seir_modifiers"]["scenarios"].as_str_seq()
if cfg["seir_modifiers"].exists() and cfg["seir_modifiers"]["scenarios"].exists()
else [None]
)

# Loop over the scenarios
for outcome_modifiers_scenario, seir_modifiers_scenario in product(
outcome_modifiers_scenarios, seir_modifiers_scenarios
Expand Down
47 changes: 47 additions & 0 deletions flepimop/gempyor_pkg/src/gempyor/shared_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,53 @@ def convert(
NONNEGATIVE_DURATION = DurationParamType(nonnegative=True)


class MemoryParamType(click.ParamType):
    """
    A `click` parameter type for memory sizes such as '512mb' or '1.2GB'.

    A value is parsed as a number with an optional, case-insensitive unit
    suffix and converted to the unit this instance was constructed with.
    A value with no suffix is assumed to already be in that unit.
    """

    name = "memory"
    # Accepted unit suffixes (lowercase) mapped to their size in bytes.
    _units = {
        "kb": 1024.0**1.0,
        "k": 1024.0**1.0,
        "mb": 1024.0**2.0,
        "m": 1024.0**2.0,
        "gb": 1024.0**3.0,
        "g": 1024.0**3.0,
        "t": 1024.0**4.0,
        "tb": 1024.0**4.0,
    }

    def __init__(self, unit: str) -> None:
        """
        Args:
            unit: The target unit to convert values to, one of the keys of
                `_units` (case-insensitive).

        Raises:
            ValueError: If `unit` is not a supported unit.
        """
        super().__init__()
        if (unit := unit.lower()) not in self._units.keys():
            raise ValueError(
                f"The `unit` given is not valid, given '{unit}' and "
                f"must be one of: {', '.join(self._units.keys())}."
            )
        self._unit = unit
        # An int or float number optionally followed by a known unit suffix.
        self._regex = re.compile(
            rf"^(([0-9]+)?(\.[0-9]+)?)({'|'.join(self._units.keys())})?$",
            flags=re.IGNORECASE,
        )

    def convert(
        self, value: Any, param: click.Parameter | None, ctx: click.Context | None
    ) -> float:
        """
        Convert `value` to a float expressed in this instance's unit.

        Args:
            value: The raw value to parse, e.g. '30mb', '1.5gb', or '512'.
            param: The click parameter this conversion is for, if any.
            ctx: The click context, if any.

        Returns:
            The memory size as a float in `self._unit` units.
        """
        value = str(value).strip()
        if (m := self._regex.match(value)) is None:
            self.fail(f"{value!r} is not a valid memory size.", param, ctx)
        number, _, _, unit = m.groups()
        if not number:
            # The regex also matches a bare unit with no number (e.g. 'mb');
            # reject it here instead of letting `float('')` raise.
            self.fail(f"{value!r} is not a valid memory size.", param, ctx)
        # A missing suffix means the value is already in the target unit.
        unit = self._unit if unit is None else unit.lower()
        if unit == self._unit:
            return float(number)
        return (self._units[unit] * float(number)) / self._units[self._unit]


# Pre-constructed memory parameter types for the common target units, for use
# as the `type=` of click options that accept a memory size.
MEMORY_KB = MemoryParamType("kb")
MEMORY_MB = MemoryParamType("mb")
MEMORY_GB = MemoryParamType("gb")
MEMORY_TB = MemoryParamType("tb")


def click_helpstring(
params: click.Parameter | list[click.Parameter],
) -> Callable[[Callable[..., Any]], Callable[..., Any]]:
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
import random
from typing import Any

from click.exceptions import BadParameter
import pytest

from gempyor.shared_cli import MemoryParamType


@pytest.mark.parametrize("unit", ("Nope", "NO CHANCE", "wrong", "bb"))
def test_invalid_unit_value_error(unit: str) -> None:
    """An unsupported unit raises a `ValueError` naming the bad unit."""
    expected_message = (
        "^The `unit` given is not valid, given "
        f"'{unit.lower()}' and must be one of:.*.$"
    )
    with pytest.raises(ValueError, match=expected_message):
        MemoryParamType(unit)


@pytest.mark.parametrize("value", ("1..2MB", "3.4cb", "56.abc", "-1GB"))
def test_invalid_value_bad_parameter(value: Any) -> None:
    """Malformed memory strings are rejected with a `BadParameter` error."""
    with pytest.raises(BadParameter, match="^.* is not a valid memory size.$"):
        MemoryParamType("mb").convert(value, None, None)


@pytest.mark.parametrize("unit", MemoryParamType._units.keys())
@pytest.mark.parametrize(
    "number",
    [random.randint(1, 1000) for _ in range(3)]  # int
    + [random.random() for _ in range(3)]  # float without numbers left of decimal
    + [
        random.randint(1, 25) + random.random() for _ in range(3)
    ],  # float with numbers left of the decimal
)
def test_convert_acts_as_identity(unit: str, number: int) -> None:
    """Converting a value already in the target unit returns it unchanged."""
    memory = MemoryParamType(unit)
    # Both lower- and upper-case unit suffixes must parse identically.
    for suffix in (unit, unit.upper()):
        assert memory.convert(f"{number}{suffix}".lstrip("0"), None, None) == number


@pytest.mark.parametrize(
    ("unit", "value", "expected"),
    (
        ("gb", "1.2gb", 1.2),
        ("kb", "1mb", 1024.0),
        ("gb", "30mb", 30.0 / 1024.0),
        ("kb", "2tb", 2.0 * (1024.0**3.0)),
        ("mb", "0.1gb", 0.1 * 1024.0),
    ),
)
def test_exact_results_for_select_inputs(unit: str, value: Any, expected: float) -> None:
    """Cross-unit conversions produce the exact expected float values."""
    assert MemoryParamType(unit).convert(value, None, None) == expected

0 comments on commit d2b2f69

Please sign in to comment.