From e6e9a243806fc67fd7e591b6a5aa5930e1a2246b Mon Sep 17 00:00:00 2001 From: "@jmmshn" Date: Wed, 10 Jan 2024 16:35:21 -0800 Subject: [PATCH 1/3] debug --- src/jobflow/managers/local.py | 19 ++++++++++++------- src/jobflow/settings.py | 6 ++++++ 2 files changed, 18 insertions(+), 7 deletions(-) diff --git a/src/jobflow/managers/local.py b/src/jobflow/managers/local.py index 1b09b186..0a39ffbc 100644 --- a/src/jobflow/managers/local.py +++ b/src/jobflow/managers/local.py @@ -102,14 +102,19 @@ def _run_job(job: jobflow.Job, parents): errored.add(job.uuid) return None, False - try: + if SETTINGS.RAISE_IMMEDIATELY: response = job.run(store=store) - except Exception: - import traceback - - logger.info(f"{job.name} failed with exception:\n{traceback.format_exc()}") - errored.add(job.uuid) - return None, False + else: + try: + response = job.run(store=store) + except Exception: + import traceback + + logger.info( + f"{job.name} failed with exception:\n{traceback.format_exc()}" + ) + errored.add(job.uuid) + return None, False responses[job.uuid][job.index] = response diff --git a/src/jobflow/settings.py b/src/jobflow/settings.py index 620914e1..5a5e3ad7 100644 --- a/src/jobflow/settings.py +++ b/src/jobflow/settings.py @@ -117,6 +117,12 @@ class JobflowSettings(BaseSettings): "%Y-%m-%d-%H-%M-%S-%f", description="Date stamp format used to create directories", ) + RAISE_IMMEDIATELY: bool = Field( + default=False, + description="Whether to raise an error immediately when the job fails. " + "Usually we allow jobs to fail and check the status post mortem." + "Sometimes, debugging is easier if the error is raised immediately.", + ) model_config = SettingsConfigDict(env_prefix="jobflow_") @model_validator(mode="before") From 9af1614e0937fbe30defe3e907d4865e64521f18 Mon Sep 17 00:00:00 2001 From: "@jmmshn" Date: Fri, 19 Jan 2024 13:34:40 -0800 Subject: [PATCH 2/3] function arg --- src/jobflow/managers/local.py | 7 ++++++- src/jobflow/settings.py | 6 ------ 2 files changed, 6 insertions(+), 7 deletions(-) diff --git a/src/jobflow/managers/local.py b/src/jobflow/managers/local.py index 0a39ffbc..002fd212 100644 --- a/src/jobflow/managers/local.py +++ b/src/jobflow/managers/local.py @@ -21,6 +21,7 @@ def run_locally( root_dir: str | Path | None = None, ensure_success: bool = False, allow_external_references: bool = False, + raise_immediately: bool = False, ) -> dict[str, dict[int, jobflow.Response]]: """ Run a :obj:`Job` or :obj:`Flow` locally. @@ -46,6 +47,10 @@ def run_locally( allow_external_references : bool If False all the references to other outputs should be from other Jobs of the Flow. + raise_immediately : bool + If True, raise an exception immediately if a job fails. If False, continue + running the flow and only raise an exception at the end if the flow did not + finish running successfully. Returns ------- @@ -102,7 +107,7 @@ def _run_job(job: jobflow.Job, parents): errored.add(job.uuid) return None, False - if SETTINGS.RAISE_IMMEDIATELY: + if raise_immediately: response = job.run(store=store) else: try: diff --git a/src/jobflow/settings.py b/src/jobflow/settings.py index 5a5e3ad7..620914e1 100644 --- a/src/jobflow/settings.py +++ b/src/jobflow/settings.py @@ -117,12 +117,6 @@ class JobflowSettings(BaseSettings): "%Y-%m-%d-%H-%M-%S-%f", description="Date stamp format used to create directories", ) - RAISE_IMMEDIATELY: bool = Field( - default=False, - description="Whether to raise an error immediately when the job fails. " - "Usually we allow jobs to fail and check the status post mortem." - "Sometimes, debugging is easier if the error is raised immediately.", - ) model_config = SettingsConfigDict(env_prefix="jobflow_") @model_validator(mode="before") From a76586a385b80a4967d61e17751b5a3c69394d92 Mon Sep 17 00:00:00 2001 From: "@jmmshn" Date: Fri, 19 Jan 2024 13:37:12 -0800 Subject: [PATCH 3/3] function arg --- tests/managers/test_local.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/tests/managers/test_local.py b/tests/managers/test_local.py index bc85be12..66751e53 100644 --- a/tests/managers/test_local.py +++ b/tests/managers/test_local.py @@ -331,6 +331,9 @@ def test_error_flow(memory_jobstore, clean_dir, error_flow, capsys): with pytest.raises(RuntimeError): run_locally(flow, store=memory_jobstore, ensure_success=True) + with pytest.raises(ValueError, match="errored"): + run_locally(flow, store=memory_jobstore, raise_immediately=True) + def test_ensure_success_with_replace(memory_jobstore, error_replace_flow, capsys): from jobflow import run_locally