diff --git a/bin/cosmos b/bin/cosmos index 457899fa..4ccc1059 100755 --- a/bin/cosmos +++ b/bin/cosmos @@ -38,7 +38,8 @@ def shell(db_url_or_path_to_sqlite): cosmos_app.shell() -def run(in_jobs, default_drm, default_queue, restart, max_cores): +def run(in_jobs, default_drm, default_queue, restart, max_cores, + default_job_class=None): """ Create an embarassingly parallel workflow from all jobs in `in_jobs`. `in_jobs` should be a json of a dict keyed by uid => command. @@ -48,6 +49,7 @@ def run(in_jobs, default_drm, default_queue, restart, max_cores): cosmos_app = Cosmos(database_url='sqlite:///cosmos.sqlite', default_drm=default_drm, + default_job_class=default_job_class, default_queue=default_queue, get_submit_args=partial(default_get_submit_args, parallel_env='smp')) cosmos_app.initdb() @@ -77,6 +79,7 @@ if __name__ == '__main__': sp = sps.add_parser('run') sp.add_argument('in_jobs') sp.add_argument('--default-drm', '-drm', default='ge') + sp.add_argument('--job-class', '-j') sp.add_argument('--queue', '-q') sp.add_argument('--max_cores', '--max-cores', '-c', type=int, diff --git a/cosmos/VERSION b/cosmos/VERSION index fef250dd..848abb76 100644 --- a/cosmos/VERSION +++ b/cosmos/VERSION @@ -1 +1 @@ -2.6.23 +2.6.24 diff --git a/cosmos/job/drm/drm_ge.py b/cosmos/job/drm/drm_ge.py index 74f2c542..f067e60a 100644 --- a/cosmos/job/drm/drm_ge.py +++ b/cosmos/job/drm/drm_ge.py @@ -1,17 +1,19 @@ import json +import logging import os import re import subprocess +import sys import time - from collections import OrderedDict from more_itertools import grouper from cosmos import TaskStatus from cosmos.job.drm.DRM_Base import DRM -from cosmos.job.drm.util import (check_output_and_stderr, convert_size_to_kb, div, - exit_process_group, DetailedCalledProcessError) +from cosmos.job.drm.util import (DetailedCalledProcessError, + check_output_and_stderr, convert_size_to_kb, + div, exit_process_group) from cosmos.util.signal_handlers import sleep_through_signals @@ -20,29 +22,15 @@ class DRM_GE(DRM): poll_interval = 5 def submit_job(self, task): - for p in [task.output_stdout_path, task.output_stderr_path]: - if os.path.exists(p): - os.unlink(p) + task.drm_jobID, task.status = qsub( + cmd_fn=task.output_command_script_path, + stdout_fn=task.output_stdout_path, + stderr_fn=task.output_stderr_path, + addl_args=task.drm_native_specification, + drm_name=task.drm, + logger=task.log, + log_prefix=str(task)) - ns = ' ' + task.drm_native_specification if task.drm_native_specification else '' - qsub = 'qsub -terse -o {stdout} -e {stderr} -b y -w e -cwd -S /bin/bash -V{ns} '.format( - stdout=task.output_stdout_path, stderr=task.output_stderr_path, ns=ns) - - try: - out = subprocess.check_output( - '{qsub} "{cmd_str}"'.format(cmd_str=task.output_command_script_path, qsub=qsub), - env=os.environ, preexec_fn=exit_process_group, shell=True, stderr=subprocess.STDOUT).decode() - - task.drm_jobID = unicode(int(out)) - except subprocess.CalledProcessError as cpe: - task.log.error('%s submission to %s failed with error %s: %s' % - (task, task.drm, cpe.returncode, cpe.output.decode().strip())) - task.status = TaskStatus.failed - except ValueError: - task.log.error('%s submission to %s returned unexpected text: %s' % (task, task.drm, out)) - task.status = TaskStatus.failed - else: - task.status = TaskStatus.submitted def filter_is_done(self, tasks): """ @@ -57,7 +45,7 @@ def filter_is_done(self, tasks): has been affected by this SGE bug. 
""" if tasks: - qjobs = _qstat_all() + qjobs = qstat() corrupt_data = {} for task in tasks: @@ -97,7 +85,7 @@ def drm_statuses(self, tasks): :returns: (dict) task.drm_jobID -> drm_status """ if tasks: - qjobs = _qstat_all() + qjobs = qstat() def f(task): return qjobs.get(unicode(task.drm_jobID), dict()).get('state', 'UNK_JOB_STATE') @@ -115,10 +103,10 @@ def _get_task_return_data(self, task): [1] a boolean indicating whether the metadata in [0] are affected by an SGE bug that causes qacct to occasionally return corrupt results. """ - d = _qacct_raw(task) + d = self.task_qacct(task) job_failed = d['failed'][0] != '0' - data_are_corrupt = _is_corrupt(d) + data_are_corrupt = is_corrupt(d) if job_failed or data_are_corrupt: task.workflow.log.warn('%s SGE (qacct -j %s) reports %s:\n%s' % @@ -137,15 +125,15 @@ def _get_task_return_data(self, task): system_time=float(d['ru_stime']), avg_rss_mem=d['ru_ixrss'], - max_rss_mem_kb=convert_size_to_kb(d['ru_maxrss']), + max_rss_mem_kb=convert_size_to_kb(d['maxrss']), avg_vms_mem_kb=None, max_vms_mem_kb=convert_size_to_kb(d['maxvmem']), io_read_count=int(d['ru_inblock']), io_write_count=int(d['ru_oublock']), io_wait=float(d['iow']), - io_read_kb=float(d['io']), - io_write_kb=float(d['io']), + io_read_kb=convert_size_to_kb("%fG" % float(d['io'])), + io_write_kb=convert_size_to_kb("%fG" % float(d['io'])), ctx_switch_voluntary=int(d['ru_nvcsw']), ctx_switch_involuntary=int(d['ru_nivcsw']), @@ -161,6 +149,14 @@ def _get_task_return_data(self, task): return processed_data, data_are_corrupt + @staticmethod + def task_qacct(task, timeout=1200, quantum=15): + """ + Return qacct data for the specified task. + """ + return qacct(task.drm_jobID, timeout, quantum, + task.workflow.log, str(task)) + def kill(self, task): """Terminate a task.""" raise NotImplementedError @@ -172,8 +168,21 @@ def kill_tasks(self, tasks): subprocess.call(['qdel', pids], preexec_fn=exit_process_group) -def _is_corrupt(qacct_dict): +def _get_null_logger(): + """ + Return a logger that drops all messages passed to it. """ + logger = logging.getLogger( + ".".join([sys.modules[__name__].__name__, "null_logger"])) + # only initialize the null logger the first time we load it + if not logger.handlers: + logger.addHandler(logging.NullHandler()) + + +def is_corrupt(qacct_dict): + """ + Return true if qacct returns bogus job data for a job id. + qacct may return multiple records for a job. They may all be corrupt. Yuk. This was allegedly fixed in 6.0u10 but we've seen it in UGE 8.3.1. @@ -195,14 +204,19 @@ def _is_corrupt(qacct_dict): ("before writing exit_status" not in qacct_dict.get('failed', '')) -def _qacct_raw(task, timeout=600, quantum=15): +def qacct(job_id, timeout=1200, quantum=15, logger=None, log_prefix=""): """ Parse qacct output into key/value pairs. If qacct reports results in multiple blocks (separated by a row of ===='s), - the most recently-generated block with valid data is returned. If no such - block exists, then return the most recently-generated block of corrupt data. + the most recently-generated block with valid data is returned. If no block + with valid data exists, then return the most recently-generated block of + corrupt data. Call ``is_corrupt()`` on the output of this method to see if + the data are suitable for use. 
""" + if not logger: + logger = _get_null_logger() + start = time.time() curr_qacct_dict = None good_qacct_dict = None @@ -212,7 +226,7 @@ def _qacct_raw(task, timeout=600, quantum=15): qacct_returncode = 0 try: qacct_stdout_str, qacct_stderr_str = check_output_and_stderr( - ['qacct', '-j', unicode(task.drm_jobID)], + ['qacct', '-j', unicode(job_id)], preexec_fn=exit_process_group) if qacct_stdout_str.strip(): break @@ -223,34 +237,34 @@ def _qacct_raw(task, timeout=600, quantum=15): if qacct_stderr_str and re.match(r'error: job id \d+ not found', qacct_stderr_str): if i > 0: - task.workflow.log.info('%s SGE (qacct -j %s) reports "not found"; this may mean ' - 'qacct is merely slow, or %s died in the \'qw\' state', - task, task.drm_jobID, task.drm_jobID) + logger.info('%s SGE (qacct -j %s) reports "not found"; this may mean ' + 'qacct is merely slow, or %s died in the \'qw\' state', + log_prefix, job_id, job_id) else: - task.workflow.log.error('%s SGE (qacct -j %s) returned error code %d', - task, task.drm_jobID, qacct_returncode) + logger.error('%s SGE (qacct -j %s) returned error code %d', + log_prefix, job_id, qacct_returncode) if qacct_stdout_str or qacct_stderr_str: - task.workflow.log.error('%s SGE (qacct -j %s) printed the following', task, task.drm_jobID) + logger.error('%s SGE (qacct -j %s) printed the following', log_prefix, job_id) if qacct_stdout_str: - task.workflow.log.error('stdout: "%s"', qacct_stdout_str) + logger.error('stdout: "%s"', qacct_stdout_str) if qacct_stderr_str: - task.workflow.log.error('stderr: "%s"', qacct_stderr_str) + logger.error('stderr: "%s"', qacct_stderr_str) if i > 0: - task.workflow.log.info( + logger.info( '%s SGE (qacct -j %s) attempt %d failed %d sec after first attempt%s', - task, task.drm_jobID, i + 1, time.time() - start, + log_prefix, job_id, i + 1, time.time() - start, '. Will recheck job status after %d sec' % quantum if i + 1 < num_retries else '') if i + 1 < num_retries: sleep_through_signals(timeout=quantum) else: # fallthrough: all retries failed - raise ValueError('No valid `qacct -j %s` output after %d tries and %d sec' % - (task.drm_jobID, i, time.time() - start)) + raise ValueError('%s No valid SGE (qacct -j %s) output after %d tries and %d sec' % + (log_prefix, job_id, i, time.time() - start)) for line in qacct_stdout_str.strip().split('\n'): if line.startswith('='): - if curr_qacct_dict and not _is_corrupt(curr_qacct_dict): + if curr_qacct_dict and not is_corrupt(curr_qacct_dict): # # Cache this non-corrupt block of qacct data just # in case all the more recent blocks are corrupt. @@ -263,22 +277,24 @@ def _qacct_raw(task, timeout=600, quantum=15): try: k, v = re.split(r'\s+', line, maxsplit=1) except ValueError: - raise EnvironmentError('%s with drm_jobID=%s has unparseable qacct output:\n%s' % - (task, task.drm_jobID, qacct_stdout_str)) + raise EnvironmentError('%s SGE (qacct -j %s) output is unparseable:\n%s' % + (log_prefix, job_id, qacct_stdout_str)) curr_qacct_dict[k] = v.strip() # if the last block of qacct data looks good, promote it - if curr_qacct_dict and not _is_corrupt(curr_qacct_dict): + if curr_qacct_dict and not is_corrupt(curr_qacct_dict): good_qacct_dict = curr_qacct_dict return good_qacct_dict if good_qacct_dict else curr_qacct_dict -def _qstat_all(): +def qstat(): """ - returns a dict keyed by lsf job ids, who's values are a dict of bjob - information about the job + Return a mapping of job ids to a dict of GE information about each job. 
+
+    The exact contents of the sub-dictionaries in the returned dictionary's
+    values() depend on the installed GE version.
     """
     try:
         lines = subprocess.check_output(['qstat'], preexec_fn=exit_process_group).decode().strip().split('\n')
@@ -290,3 +307,48 @@
         items = re.split(r"\s+", l.strip())
         bjobs[items[0]] = dict(zip(keys, items))
     return bjobs
+
+
+def qsub(cmd_fn, stdout_fn, stderr_fn, addl_args=None, drm_name="GE", logger=None, log_prefix=""):
+    """
+    Submit the requested (bash-parseable) script stored in cmd_fn to GE.
+
+    The command is submitted relative to the current CWD; callers should change
+    directory before calling if the job needs to run somewhere else.
+
+    Output will be written to two filenames, specified in stdout_fn and stderr_fn.
+    Additional arguments to SGE may be specified as a single string in addl_args.
+    Callers can optionally supply a logger object and a prefix to prepend to log messages.
+    """
+    if not logger:
+        logger = _get_null_logger()
+
+    for p in [stdout_fn, stderr_fn]:
+        if os.path.exists(p):
+            os.unlink(p)
+
+    qsub_cli = 'qsub -terse -o {stdout_fn} -e {stderr_fn} -b y -w e -cwd -S /bin/bash -V'.format(
+        stdout_fn=stdout_fn, stderr_fn=stderr_fn)
+
+    if addl_args:
+        qsub_cli += ' %s' % addl_args
+
+    job_id = None
+    try:
+        out = subprocess.check_output(
+            '{qsub_cli} "{cmd_fn}"'.format(cmd_fn=cmd_fn, qsub_cli=qsub_cli),
+            env=os.environ, preexec_fn=exit_process_group, shell=True,
+            stderr=subprocess.STDOUT).decode()
+
+        job_id = unicode(int(out))
+    except subprocess.CalledProcessError as cpe:
+        logger.error('%s submission to %s (%s) failed with error %s: %s' %
+                     (log_prefix, drm_name, qsub_cli, cpe.returncode, cpe.output.decode().strip()))
+        status = TaskStatus.failed
+    except ValueError:
+        logger.error('%s submission to %s returned unexpected text: %s' %
+                     (log_prefix, drm_name, out))
+        status = TaskStatus.failed
+    else:
+        status = TaskStatus.submitted
+
+    return (job_id, status)
diff --git a/cosmos/models/Cosmos.py b/cosmos/models/Cosmos.py
index 5a9baf05..d14ddccb 100644
--- a/cosmos/models/Cosmos.py
+++ b/cosmos/models/Cosmos.py
@@ -32,9 +32,10 @@ def default_get_submit_args(task, parallel_env='orte'):
         time = ' -W 0:{0}'.format(task.time_req) if task.time_req else ''
         return '-R "{rusage}span[hosts=1]" -n {task.core_req}{time}{queue} -J "{jobname}"'.format(**locals())
     elif task.drm in ['ge', 'drmaa:ge']:
-        return '-cwd -pe {parallel_env} {core_req}{priority} -N "{jobname}"{queue}'.format(
+        return '-cwd -pe {parallel_env} {core_req}{priority} -N "{jobname}"{job_class}{queue}'.format(
             priority=' -p %s' % default_job_priority if default_job_priority else '',
-            queue=' -q %s' % task.queue or '',
+            job_class=' -jc %s' % task.job_class if task.job_class else '',
+            queue=' -q %s' % task.queue if task.queue else '',
             jobname=jobname,
             core_req=task.core_req,
             parallel_env=parallel_env)
@@ -60,7 +61,8 @@ def __init__(self,
                  default_queue=None,
                  default_time_req=None,
                  default_max_attempts=1,
-                 flask_app=None):
+                 flask_app=None,
+                 default_job_class=None):
         """
         :param str database_url: A `sqlalchemy database url `_.
ex: sqlite:///home/user/sqlite.db or mysql://user:pass@localhost/database_name or postgresql+psycopg2://user:pass@localhost/database_name @@ -109,6 +111,7 @@ def shutdown_session(exception=None): self.session.remove() self.default_drm = default_drm + self.default_job_class = default_job_class self.default_queue = default_queue self.default_max_attempts = default_max_attempts self.default_time_req = default_time_req diff --git a/cosmos/models/Task.py b/cosmos/models/Task.py index 0efc9f33..ebc8ca9d 100644 --- a/cosmos/models/Task.py +++ b/cosmos/models/Task.py @@ -7,16 +7,15 @@ import networkx as nx from flask import url_for from sqlalchemy.ext.declarative import declared_attr -from sqlalchemy.orm import relationship, synonym +from sqlalchemy.ext.declarative.base import _declarative_constructor +from sqlalchemy.orm import reconstructor, relationship, synonym from sqlalchemy.schema import Column, ForeignKey, UniqueConstraint -from sqlalchemy.types import Boolean, Integer, String, DateTime +from sqlalchemy.types import Boolean, DateTime, Integer, String -from cosmos import TaskStatus, StageStatus, signal_task_status_change +from cosmos import StageStatus, TaskStatus, signal_task_status_change from cosmos.db import Base from cosmos.util.helpers import wait_for_file -from cosmos.util.sqla import Enum_ColumnType, MutableDict, JSONEncodedDict - -opj = os.path.join +from cosmos.util.sqla import Enum_ColumnType, JSONEncodedDict, MutableDict class ExpectedError(Exception): pass @@ -56,8 +55,10 @@ def task_status_changed(task): elif task.status == TaskStatus.submitted: task.stage.status = StageStatus.running if not task.NOOP: - task.log.info('%s %s. drm=%s; drm_jobid=%s; queue=%s' % (task, task.status, repr(task.drm), - repr(task.drm_jobID), repr(task.queue))) + task.log.info( + '%s %s. 
drm=%s; drm_jobid=%s; job_class=%s; queue=%s' % + (task, task.status, repr(task.drm), repr(task.drm_jobID), + repr(task.job_class), repr(task.queue))) task.submitted_on = datetime.datetime.now() elif task.status == TaskStatus.failed: @@ -112,7 +113,8 @@ def task_status_changed(task): def logplus(filename): prefix, suffix = os.path.splitext(filename) - return property(lambda self: opj(self.log_dir, "{0}_attempt{1}{2}".format(prefix, self.attempt, suffix))) + return property(lambda self: os.path.join( + self.log_dir, "{0}_attempt{1}{2}".format(prefix, self.attempt, suffix))) def readfile(path): @@ -174,6 +176,8 @@ class Task(Base): attempt = Column(Integer, nullable=False) must_succeed = Column(Boolean, nullable=False) drm = Column(String(255)) + # FIXME consider making job_class a proper field next time the schema changes + # job_class = Column(String(255)) queue = Column(String(255)) max_attempts = Column(Integer) parents = relationship("Task", @@ -348,3 +352,12 @@ def __repr__(self): def __str__(self): return self.__repr__() + + # FIXME consider making job_class a proper field next time the schema changes + def __init__(self, **kwargs): + self.job_class = kwargs.pop('job_class', None) + _declarative_constructor(self, **kwargs) + + @reconstructor + def init_on_load(self): + self.job_class = None diff --git a/cosmos/models/Workflow.py b/cosmos/models/Workflow.py index 06ba194c..e504f3c4 100644 --- a/cosmos/models/Workflow.py +++ b/cosmos/models/Workflow.py @@ -143,7 +143,7 @@ def make_output_dirs(self): def add_task(self, func, params=None, parents=None, stage_name=None, uid=None, drm=None, queue=None, must_succeed=True, time_req=None, core_req=None, mem_req=None, - max_attempts=None, noop=False): + max_attempts=None, noop=False, job_class=None): """ Adds a new Task to the Workflow. If the Task already exists (and was successful), return the successful Task stored in the database @@ -157,6 +157,7 @@ def add_task(self, func, params=None, parents=None, stage_name=None, uid=None, d database version will be returned and a new one will not be created. :param str stage_name: The name of the Stage to add this Task to. Defaults to `func.__name__`. :param str drm: The drm to use for this Task (example 'local', 'ge' or 'drmaa:lsf'). Defaults to the `default_drm` parameter of :meth:`Cosmos.start` + :param job_class: The name of a job_class to submit to; defaults to the `default_job_class` parameter of :meth:`Cosmos.start` :param queue: The name of a queue to submit to; defaults to the `default_queue` parameter of :meth:`Cosmos.start` :param bool must_succeed: Default True. If False, the Workflow will not fail if this Task does not succeed. Dependent Jobs will not be executed. :param bool time_req: The time requirement; will set the Task.time_req attribute which is intended to be used by :func:`get_submit_args` to request resources. 
@@ -259,6 +260,7 @@ def params_or_signature_default_or(name, default): output_map=output_map, uid=uid, drm=drm if drm is not None else self.cosmos_app.default_drm, + job_class=job_class if job_class is not None else self.cosmos_app.default_job_class, queue=queue if queue is not None else self.cosmos_app.default_queue, must_succeed=must_succeed, core_req=core_req if core_req is not None else params_or_signature_default_or('core_req', 1), diff --git a/examples/ex2.py b/examples/ex2.py index 83a9d60e..527d09f4 100644 --- a/examples/ex2.py +++ b/examples/ex2.py @@ -78,7 +78,8 @@ def recipe(workflow): if __name__ == '__main__': p = argparse.ArgumentParser() p.add_argument('-drm', default='local', help='', choices=('local', 'drmaa:ge', 'ge', 'slurm')) - p.add_argument('-q', '--queue', help='Submit to this queue of the DRM supports it') + p.add_argument('-j', '--job-class', help='Submit to this job class if the DRM supports it') + p.add_argument('-q', '--queue', help='Submit to this queue if the DRM supports it') args = p.parse_args() @@ -87,6 +88,7 @@ def recipe(workflow): get_submit_args=partial(default_get_submit_args, parallel_env='smp'), default_drm=args.drm, default_max_attempts=2, + default_job_class=args.job_class, default_queue=args.queue) cosmos.initdb() diff --git a/examples_py2/ex2.py b/examples_py2/ex2.py index 9100fde3..a4f26471 100644 --- a/examples_py2/ex2.py +++ b/examples_py2/ex2.py @@ -54,7 +54,8 @@ def recipe(workflow): p = argparse.ArgumentParser() p.add_argument('-drm', default='local', help='', choices=('local', 'drmaa:ge', 'ge', 'slurm')) - p.add_argument('-q', '--queue', help='Submit to this queue of the DRM supports it') + p.add_argument('-j', '--job-class', help='Submit to this job class if the DRM supports it') + p.add_argument('-q', '--queue', help='Submit to this queue if the DRM supports it') args = p.parse_args() @@ -62,6 +63,7 @@ def recipe(workflow): # example of how to change arguments if you're NOT using default_drm='local' get_submit_args=partial(default_get_submit_args, parallel_env='smp'), default_drm=args.drm, + default_job_class=args.job_class, default_queue=args.queue) cosmos.initdb()
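
Reviewer's note (not part of the patch): a minimal end-to-end sketch of the new
job-class plumbing, modeled on examples/ex2.py. It assumes a GE/UGE cluster whose
qsub understands the -jc switch; the queue name ('all.q'), the job-class names
('short.jc', 'long.jc'), and the echo task are invented for illustration.

    from functools import partial

    from cosmos.api import Cosmos, default_get_submit_args


    def echo(out_file):
        # Cosmos task functions return the shell command to submit.
        return 'echo hello > {0}'.format(out_file)


    cosmos = Cosmos(database_url='sqlite:///cosmos.sqlite',
                    default_drm='ge',
                    default_queue='all.q',           # hypothetical queue name
                    default_job_class='short.jc',    # hypothetical job class
                    get_submit_args=partial(default_get_submit_args,
                                            parallel_env='smp'))
    cosmos.initdb()

    workflow = cosmos.start('jc_example', skip_confirm=True)
    # job_class mirrors drm/queue: a per-task value overrides the Cosmos-wide
    # default, and default_get_submit_args renders it as '-jc <job_class>'.
    workflow.add_task(func=echo, params=dict(out_file='out.txt'), uid='echo',
                      job_class='long.jc')
    workflow.run(max_cores=1)

The same default can be supplied on the command line through the new bin/cosmos
flag, e.g. something like: cosmos run jobs.json -drm ge -j short.jc -q all.q -c 10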
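
A second sketch, also not part of the patch: qsub(), qstat(), qacct(), and
is_corrupt() are now module-level functions in cosmos.job.drm.drm_ge that take
plain job ids instead of Task objects, so they can be driven standalone. The
script path, output paths, queue, and log prefix below are illustrative only.

    import logging

    from cosmos.job.drm.drm_ge import is_corrupt, qacct, qstat, qsub

    logging.basicConfig(level=logging.INFO)
    logger = logging.getLogger('ge_demo')

    # Returns (job_id, TaskStatus.submitted) on success, or
    # (None, TaskStatus.failed) if qsub rejects the submission.
    job_id, status = qsub(cmd_fn='./script.sh',      # illustrative script
                          stdout_fn='script.out',
                          stderr_fn='script.err',
                          addl_args='-q all.q',      # optional extra qsub args
                          logger=logger,
                          log_prefix='demo')

    # qstat() only lists pending/running jobs; a missing key means the
    # job has already left the queue (or was never accepted).
    print('qstat state: %s' % qstat().get(job_id, {}).get('state', 'finished'))

    # qacct() polls SGE accounting, retrying internally for up to `timeout`
    # seconds; check is_corrupt() before trusting the returned block.
    d = qacct(job_id, timeout=600, quantum=15, logger=logger, log_prefix='demo')
    if not is_corrupt(d):
        print('exit_status: %s' % d.get('exit_status'))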