From 53a0a59580276a7a7fdb36b7c360376421ddfdab Mon Sep 17 00:00:00 2001 From: Adrien Coulier Date: Mon, 2 Oct 2023 10:56:02 +0200 Subject: [PATCH] Improve error handling --- .../services/local_runner_service.py | 68 ++++++++++--------- tests/conftest.py | 14 ++++ .../services/test_local_runner_service.py | 56 +++++++++++++++ 3 files changed, 105 insertions(+), 33 deletions(-) create mode 100644 tests/conftest.py diff --git a/sequencing_report_service/services/local_runner_service.py b/sequencing_report_service/services/local_runner_service.py index 264bdcd..19d7848 100644 --- a/sequencing_report_service/services/local_runner_service.py +++ b/sequencing_report_service/services/local_runner_service.py @@ -51,51 +51,53 @@ def __init__(self, job_repo_factory, nextflow_command_generator, nextflow_log_di async def _start_process(self, job_id): with self._job_repo_factory() as job_repo: job = job_repo.get_job(job_id) - sys_env = os.environ.copy() or {} - job_env = job.environment or {} - env = {**sys_env, **job_env} + assert job working_dir = os.path.join( self._nextflow_log_dirs, str(job_id)) os.mkdir(working_dir) nxf_log = os.path.join(working_dir, "nextflow.out") - nxf_log_fh = open(nxf_log, "w", encoding="utf-8") - + sys_env = os.environ.copy() or {} + job_env = job.environment or {} + env = {**sys_env, **job_env} cmd = shlex.split(shlex.quote(" ".join(job.command))) - log.debug("Will start command %s", cmd) - process = Subprocess( - cmd, - stdout=nxf_log_fh, - stderr=nxf_log_fh, - env=env, - cwd=working_dir, - shell=True, - ) - - job_repo.set_state_of_job(job_id=job.job_id, state=State.STARTED) - job_repo.set_pid_of_job(job.job_id, process.pid) - - return_code = await process.wait_for_exit() - - nxf_log_fh.close() - - with open(nxf_log, encoding="utf-8") as log_file: - cmd_log = log_file.read() - if return_code == 0: + + try: + with open(nxf_log, "w", encoding="utf-8") as nxf_log_fh: + log.debug("Will start command %s", cmd) + process = Subprocess( + cmd, + stdout=nxf_log_fh, + stderr=nxf_log_fh, + env=env, + cwd=working_dir, + shell=True, + ) + + job_repo.set_state_of_job(job_id=job.job_id, state=State.STARTED) + job_repo.set_pid_of_job(job.job_id, process.pid) + + await process.wait_for_exit() + + with open(nxf_log, encoding="utf-8") as log_file: + cmd_log = log_file.read() + log.info("Successfully completed process: %s", job.command) job_repo.set_state_of_job( job_id=job.job_id, state=State.DONE, cmd_log=cmd_log, ) - else: - log.error( - "Found non-zero exit code: %s for command: %s", - return_code, - job.command, - ) + except subprocess.CalledProcessError: + job = job_repo.get_job(job_id) + if job.state == State.CANCELLED: + return + + with open(nxf_log, encoding="utf-8") as log_file: + cmd_log = log_file.read() + log.exception('Job failed with the following error:') job_repo.set_state_of_job( - job_id=job.job_id, + job_id=job_id, state=State.ERROR, cmd_log=cmd_log, ) @@ -128,8 +130,8 @@ def stop(self, job_id): return job.job_id if job and job.state == State.STARTED: log.info("Will stop the currently running job.") - os.kill(job.pid, signal.SIGTERM) job_repo.set_state_of_job(job_id, State.CANCELLED) + os.kill(job.pid, signal.SIGTERM) return job.job_id log.debug("Found no job to cancel with with job id: {}. Or it was not in a cancellable state.") raise UnableToStopJob() diff --git a/tests/conftest.py b/tests/conftest.py new file mode 100644 index 0000000..cf1ef93 --- /dev/null +++ b/tests/conftest.py @@ -0,0 +1,14 @@ +import asyncio +import pytest_asyncio + + +# This is somehow necessary to make it possible to run all the async tests. +# From: https://stackoverflow.com/questions/66054356/multiple-async-unit-tests-fail-but-running-them-one-by-one-will-pass + + +@pytest_asyncio.fixture(scope="session") +def event_loop(request): + """Create an instance of the default event loop for the whole session.""" + loop = asyncio.get_event_loop_policy().new_event_loop() + yield loop + loop.close() diff --git a/tests/unit/services/test_local_runner_service.py b/tests/unit/services/test_local_runner_service.py index 6ced9f7..255116a 100644 --- a/tests/unit/services/test_local_runner_service.py +++ b/tests/unit/services/test_local_runner_service.py @@ -72,3 +72,59 @@ async def test_stop( assert local_runner_service.get_job(stopped_id).state == State.CANCELLED assert stopped_id == job_id + + @pytest.mark.asyncio + async def test_start_process( + self, + nextflow_cmd_generator, + job_repo_factory, + nextflow_log_dirs + ): + local_runner_service = LocalRunnerService( + job_repo_factory, + nextflow_cmd_generator, + nextflow_log_dirs, + ) + + command_with_env = { + "command": ["sleep", "1"], + "environment": {}, + } + + with local_runner_service._job_repo_factory() as job_repo: + job = job_repo.add_job(command_with_env=command_with_env) + job_id = job.job_id + assert job.state == State.PENDING + + await local_runner_service._start_process(job_id) + + job = job_repo.get_job(job_id) + assert job.state == State.DONE + + @pytest.mark.asyncio + async def test_start_process_fail( + self, + nextflow_cmd_generator, + job_repo_factory, + nextflow_log_dirs + ): + local_runner_service = LocalRunnerService( + job_repo_factory, + nextflow_cmd_generator, + nextflow_log_dirs, + ) + + command_with_env = { + "command": ["fakecommand"], + "environment": {}, + } + + with local_runner_service._job_repo_factory() as job_repo: + job = job_repo.add_job(command_with_env=command_with_env) + job_id = job.job_id + assert job.state == State.PENDING + + await local_runner_service._start_process(job_id) + + job = job_repo.get_job(job_id) + assert job.state == State.ERROR