Skip to content

Commit

Permalink
Merge pull request #643 from opensafely-core/fix-error-erroring
Browse files Browse the repository at this point in the history
Remove left-over logic from switch to finalizing everything
  • Loading branch information
bloodearnest authored Aug 31, 2023
2 parents 4678033 + 6d7d502 commit 68db9b0
Show file tree
Hide file tree
Showing 5 changed files with 29 additions and 52 deletions.
42 changes: 12 additions & 30 deletions jobrunner/executors/local.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,20 +86,6 @@ def workspace_is_archived(workspace):
return False


def was_oomkilled(container):
    """Report whether Docker killed this container for exceeding its memory limit.

    Exit code 137 alone is ambiguous (SIGKILL from any source), so we also
    require the OOMKilled flag from `docker inspect` output.
    """
    state = container["State"]
    if state["ExitCode"] != 137:
        return False
    # Nb. this flag has been observed to be unreliable on some versions of Linux
    return state["OOMKilled"]


def oomkilled_message(container):
    """Build the user-facing message for an OOM-killed job.

    Appends the container's configured memory limit (from HostConfig.Memory,
    in bytes) when one was set; a limit of 0 means "no limit configured".
    """
    limit_bytes = container.get("HostConfig", {}).get("Memory", 0)
    if limit_bytes <= 0:
        return "Job ran out of memory"
    limit_gb = limit_bytes / (1024 ** 3)
    return f"Job ran out of memory (limit was {limit_gb:.2f}GB)"


class LocalDockerAPI(ExecutorAPI):
"""ExecutorAPI implementation using local docker service."""

Expand Down Expand Up @@ -291,19 +277,6 @@ def get_status(self, job_definition, timeout=15):
else:
# container present but not running, i.e. finished
# Nb. this does not include prepared jobs, as they have a volume but not a container
if job_definition.cancelled:
return JobStatus(
ExecutorState.EXECUTED,
f"Job cancelled by {job_definition.cancelled}",
)
if was_oomkilled(container):
return JobStatus(ExecutorState.ERROR, oomkilled_message(container))
if container["State"]["ExitCode"] == 137:
return JobStatus(
ExecutorState.ERROR,
"Job either ran out of memory or was killed by an admin",
)

timestamp_ns = datestr_to_ns_timestamp(container["State"]["FinishedAt"])
return JobStatus(ExecutorState.EXECUTED, timestamp_ns=timestamp_ns)

Expand Down Expand Up @@ -405,8 +378,16 @@ def finalize_job(job_definition):

if exit_code == 137 and job_definition.cancelled:
message = f"Job cancelled by {job_definition.cancelled}"
elif was_oomkilled(container_metadata):
message = oomkilled_message(container_metadata)
# Nb. this flag has been observed to be unreliable on some versions of Linux
elif (
container_metadata["State"]["ExitCode"] == 137
and container_metadata["State"]["OOMKilled"]
):
message = "Job ran out of memory"
memory_limit = container_metadata.get("HostConfig", {}).get("Memory", 0)
if memory_limit > 0:
gb_limit = memory_limit / (1024**3)
message += f" (limit was {gb_limit:.2f}GB)"
else:
message = config.DOCKER_EXIT_CODES.get(exit_code)

Expand Down Expand Up @@ -499,7 +480,8 @@ def persist_outputs(job_definition, outputs, job_metadata):
write_manifest_file(
medium_privacy_dir,
{
"repo": job_definition.study.git_repo_url,
# this currently needs to exist, but is not used
"repo": None,
"workspace": job_definition.workspace,
},
)
Expand Down
8 changes: 8 additions & 0 deletions jobrunner/job_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,13 +37,21 @@ class JobDefinition:


class ExecutorState(Enum):
    """States a job moves through in the executor's lifecycle.

    The string values are the wire/serialized form of each state.
    """

    # Job is currently preparing to run: creating volumes, copying files, etc
    PREPARING = "preparing"
    # Job volume is prepared and ready to run
    PREPARED = "prepared"
    # Job currently executing
    EXECUTING = "executing"
    # Job process has finished executing, and has an exit code
    EXECUTED = "executed"
    # Job is currently being inspected and finalized
    FINALIZING = "finalizing"
    # Job has finished finalization
    FINALIZED = "finalized"
    # Executor doesn't know anything about this job (it only tracks active jobs)
    UNKNOWN = "unknown"
    # There was an error with the executor (*not* the same thing as an error with job)
    ERROR = "error"


Expand Down
2 changes: 1 addition & 1 deletion jobrunner/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -566,7 +566,7 @@ def mark_job_as_failed(job, code, message, error=None, **attrs):
if error is None:
error = True

set_code(job, code, message, error=error, attrs=attrs)
set_code(job, code, message, error=error, **attrs)


def set_code(
Expand Down
17 changes: 6 additions & 11 deletions tests/test_integration.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,11 +42,8 @@ def test_integration_with_cohortextractor(
# alongside cohortextractor in this test, however once it supports a close
# enough set of dummy data we can merge them into a single test.
extraction_tool = "cohortextractor"

if extraction_tool == "cohortextractor":
generate_action = "generate_cohort"
else:
generate_action = "generate_dataset"
generate_action = "generate_cohort"
image = "cohortextractor"

api = get_executor_api()

Expand All @@ -56,10 +53,6 @@ def test_integration_with_cohortextractor(
# Disable repo URL checking so we can run using a local test repo
monkeypatch.setattr("jobrunner.config.ALLOWED_GITHUB_ORGS", None)

if extraction_tool == "cohortextractor":
image = "cohortextractor"
else:
image = "databuilder:v0.36.0"
ensure_docker_images_present(image, "python")

# Set up a mock job-server with a single job request
Expand Down Expand Up @@ -96,6 +89,7 @@ def test_integration_with_cohortextractor(
# Check that expected number of pending jobs are created
jobs = get_posted_jobs(requests_mock)
assert [job["status"] for job in jobs.values()] == ["pending"] * 7

# Execute one tick of the run loop and then sync
jobrunner.run.handle_jobs(api)
jobrunner.sync.sync()
Expand Down Expand Up @@ -142,6 +136,7 @@ def test_integration_with_cohortextractor(
jobrunner.sync.sync()

# Run the main loop until there are no jobs left and then sync

jobrunner.run.main(exit_callback=lambda active_jobs: len(active_jobs) == 0)
jobrunner.sync.sync()

Expand All @@ -168,7 +163,7 @@ def test_integration_with_cohortextractor(
manifest_file = medium_privacy_workspace / "metadata" / "manifest.json"
manifest = json.loads(manifest_file.read_text())
assert manifest["workspace"] == "testing"
assert manifest["repo"] == str(test_repo.path)
assert manifest["repo"] is None

if extraction_tool == "cohortextractor":
output_name = "input"
Expand Down Expand Up @@ -335,7 +330,7 @@ def test_integration_with_databuilder(
manifest_file = medium_privacy_workspace / "metadata" / "manifest.json"
manifest = json.loads(manifest_file.read_text())
assert manifest["workspace"] == "testing"
assert manifest["repo"] == str(test_repo.path)
assert manifest["repo"] is None

# Check that all the outputs have been produced
assert (high_privacy_workspace / "output/dataset.csv").exists()
Expand Down
12 changes: 2 additions & 10 deletions tests/test_local_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -422,11 +422,7 @@ def test_finalize_failed_137(docker_cleanup, job_definition, tmp_work_dir, volum
# impersonate an admin
docker.kill(local.container_name(job_definition))

wait_for_state(api, job_definition, ExecutorState.ERROR)
assert (
api.get_status(job_definition).message
== "Job either ran out of memory or was killed by an admin"
)
wait_for_state(api, job_definition, ExecutorState.EXECUTED)

status = api.finalize(job_definition)
assert status.state == ExecutorState.FINALIZED
Expand Down Expand Up @@ -458,11 +454,7 @@ def test_finalize_failed_oomkilled(docker_cleanup, job_definition, tmp_work_dir)
status = api.prepare(job_definition)
status = api.execute(job_definition)

wait_for_state(api, job_definition, ExecutorState.ERROR)
assert (
api.get_status(job_definition).message
== "Job ran out of memory (limit was 0.01GB)"
)
wait_for_state(api, job_definition, ExecutorState.EXECUTED)

status = api.finalize(job_definition)
assert status.state == ExecutorState.FINALIZED
Expand Down

0 comments on commit 68db9b0

Please sign in to comment.