Skip to content

Commit

Permalink
Merge pull request #6 from Aratz/DEVELOP-2613_remove_queue_system
Browse files Browse the repository at this point in the history
Improve error handling
  • Loading branch information
Aratz authored Oct 3, 2023
2 parents b2ad387 + 53a0a59 commit 222930b
Show file tree
Hide file tree
Showing 3 changed files with 105 additions and 33 deletions.
68 changes: 35 additions & 33 deletions sequencing_report_service/services/local_runner_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,51 +51,53 @@ def __init__(self, job_repo_factory, nextflow_command_generator, nextflow_log_di
async def _start_process(self, job_id):
with self._job_repo_factory() as job_repo:
job = job_repo.get_job(job_id)
sys_env = os.environ.copy() or {}
job_env = job.environment or {}
env = {**sys_env, **job_env}
assert job

working_dir = os.path.join(
self._nextflow_log_dirs, str(job_id))
os.mkdir(working_dir)
nxf_log = os.path.join(working_dir, "nextflow.out")
nxf_log_fh = open(nxf_log, "w", encoding="utf-8")

sys_env = os.environ.copy() or {}
job_env = job.environment or {}
env = {**sys_env, **job_env}
cmd = shlex.split(shlex.quote(" ".join(job.command)))
log.debug("Will start command %s", cmd)
process = Subprocess(
cmd,
stdout=nxf_log_fh,
stderr=nxf_log_fh,
env=env,
cwd=working_dir,
shell=True,
)

job_repo.set_state_of_job(job_id=job.job_id, state=State.STARTED)
job_repo.set_pid_of_job(job.job_id, process.pid)

return_code = await process.wait_for_exit()

nxf_log_fh.close()

with open(nxf_log, encoding="utf-8") as log_file:
cmd_log = log_file.read()
if return_code == 0:

try:
with open(nxf_log, "w", encoding="utf-8") as nxf_log_fh:
log.debug("Will start command %s", cmd)
process = Subprocess(
cmd,
stdout=nxf_log_fh,
stderr=nxf_log_fh,
env=env,
cwd=working_dir,
shell=True,
)

job_repo.set_state_of_job(job_id=job.job_id, state=State.STARTED)
job_repo.set_pid_of_job(job.job_id, process.pid)

await process.wait_for_exit()

with open(nxf_log, encoding="utf-8") as log_file:
cmd_log = log_file.read()

log.info("Successfully completed process: %s", job.command)
job_repo.set_state_of_job(
job_id=job.job_id,
state=State.DONE,
cmd_log=cmd_log,
)
else:
log.error(
"Found non-zero exit code: %s for command: %s",
return_code,
job.command,
)
except subprocess.CalledProcessError:
job = job_repo.get_job(job_id)
if job.state == State.CANCELLED:
return

with open(nxf_log, encoding="utf-8") as log_file:
cmd_log = log_file.read()
log.exception('Job failed with the following error:')
job_repo.set_state_of_job(
job_id=job.job_id,
job_id=job_id,
state=State.ERROR,
cmd_log=cmd_log,
)
Expand Down Expand Up @@ -128,8 +130,8 @@ def stop(self, job_id):
return job.job_id
if job and job.state == State.STARTED:
log.info("Will stop the currently running job.")
os.kill(job.pid, signal.SIGTERM)
job_repo.set_state_of_job(job_id, State.CANCELLED)
os.kill(job.pid, signal.SIGTERM)
return job.job_id
log.debug("Found no job to cancel with with job id: {}. Or it was not in a cancellable state.")
raise UnableToStopJob()
Expand Down
14 changes: 14 additions & 0 deletions tests/conftest.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
import asyncio
import pytest_asyncio


# This is somehow necessary to make it possible to run all the async tests.
# From: https://stackoverflow.com/questions/66054356/multiple-async-unit-tests-fail-but-running-them-one-by-one-will-pass


@pytest_asyncio.fixture(scope="session")
def event_loop(request):
"""Create an instance of the default event loop for the whole session."""
loop = asyncio.get_event_loop_policy().new_event_loop()
yield loop
loop.close()
56 changes: 56 additions & 0 deletions tests/unit/services/test_local_runner_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,3 +72,59 @@ async def test_stop(

assert local_runner_service.get_job(stopped_id).state == State.CANCELLED
assert stopped_id == job_id

@pytest.mark.asyncio
async def test_start_process(
self,
nextflow_cmd_generator,
job_repo_factory,
nextflow_log_dirs
):
local_runner_service = LocalRunnerService(
job_repo_factory,
nextflow_cmd_generator,
nextflow_log_dirs,
)

command_with_env = {
"command": ["sleep", "1"],
"environment": {},
}

with local_runner_service._job_repo_factory() as job_repo:
job = job_repo.add_job(command_with_env=command_with_env)
job_id = job.job_id
assert job.state == State.PENDING

await local_runner_service._start_process(job_id)

job = job_repo.get_job(job_id)
assert job.state == State.DONE

@pytest.mark.asyncio
async def test_start_process_fail(
self,
nextflow_cmd_generator,
job_repo_factory,
nextflow_log_dirs
):
local_runner_service = LocalRunnerService(
job_repo_factory,
nextflow_cmd_generator,
nextflow_log_dirs,
)

command_with_env = {
"command": ["fakecommand"],
"environment": {},
}

with local_runner_service._job_repo_factory() as job_repo:
job = job_repo.add_job(command_with_env=command_with_env)
job_id = job.job_id
assert job.state == State.PENDING

await local_runner_service._start_process(job_id)

job = job_repo.get_job(job_id)
assert job.state == State.ERROR

0 comments on commit 222930b

Please sign in to comment.