From fe3d0c90f2863d9c24aa208f5926a2ac5a3ed6ef Mon Sep 17 00:00:00 2001 From: Aishwarya Mathuria Date: Wed, 4 May 2022 19:37:40 +0530 Subject: [PATCH] teuthology/queue: Single command for queue operations Makes the same teuthology-queue commands work regardless of the queue backend, Paddles or Beanstalk. Signed-off-by: Aishwarya Mathuria --- scripts/paddles_queue.py | 45 ------------ scripts/queue.py | 15 +++- scripts/worker.py | 6 +- setup.py | 3 +- teuthology/dispatcher/__init__.py | 5 ++ teuthology/dispatcher/supervisor.py | 5 +- teuthology/queue/__init__.py | 106 ---------------------------- teuthology/queue/beanstalk.py | 1 + teuthology/queue/paddles.py | 35 +++++---- teuthology/queue/util.py | 99 ++++++++++++++++++++++++++ teuthology/report.py | 20 +++--- teuthology/schedule.py | 4 +- 12 files changed, 153 insertions(+), 191 deletions(-) delete mode 100644 scripts/paddles_queue.py create mode 100644 teuthology/queue/util.py diff --git a/scripts/paddles_queue.py b/scripts/paddles_queue.py deleted file mode 100644 index 8487fd938e..0000000000 --- a/scripts/paddles_queue.py +++ /dev/null @@ -1,45 +0,0 @@ -import docopt - -import teuthology.config -import teuthology.queue.paddles_queue -doc = """ -usage: teuthology-paddles-queue -h - teuthology-paddles-queue -s -m MACHINE_TYPE - teuthology-paddles-queue [-d|-f] -m MACHINE_TYPE -U USER - teuthology-paddles-queue -m MACHINE_TYPE -P PRIORITY [-U USER|-R RUN_NAME] - teuthology-paddles-queue [-r] -m MACHINE_TYPE -U USER - teuthology-paddles-queue -m MACHINE_TYPE -D PATTERN -U USER - teuthology-paddles-queue -p [-t SECONDS] -m MACHINE_TYPE -U USER - teuthology-paddles-queue -u -m MACHINE_TYPE -U USER - -List Jobs in queue. -If -D is passed, then jobs with PATTERN in the job name are deleted from the -queue. - -Arguments: - -m, --machine_type MACHINE_TYPE - Which machine type queue to work on. - -optional arguments: - -h, --help Show this help message and exit - -D, --delete PATTERN Delete Jobs with PATTERN in their name - -d, --description Show job descriptions - -r, --runs Only show run names - -f, --full Print the entire job config. Use with caution. - -s, --status Prints the status of the queue - -t, --time SECONDS Pause queues for a number of seconds. - If -m is passed, pause that queue, - otherwise pause all queues. - -p, --pause Pause queue - -u, --unpause Unpause queue - -P, --priority PRIORITY - Change priority of queued jobs - -U, --user USER User who owns the jobs - -R, --run-name RUN_NAME - Used to change priority of all jobs in the run. -""" - - -def main(): - args = docopt.docopt(doc) - teuthology.paddles_queue.main(args) diff --git a/scripts/queue.py b/scripts/queue.py index 2c466a7be9..1d9112c22e 100644 --- a/scripts/queue.py +++ b/scripts/queue.py @@ -1,15 +1,16 @@ import docopt -import teuthology.config import teuthology.queue.beanstalk import teuthology.queue.paddles +from teuthology.config import config doc = """ usage: teuthology-queue -h teuthology-queue [-s|-d|-f] -m MACHINE_TYPE teuthology-queue [-r] -m MACHINE_TYPE teuthology-queue -m MACHINE_TYPE -D PATTERN - teuthology-queue -p SECONDS [-m MACHINE_TYPE] + teuthology-queue -p SECONDS [-m MACHINE_TYPE] [-U USER] + teuthology-queue -m MACHINE_TYPE -P PRIORITY [-U USER|-R RUN_NAME] List Jobs in queue. If -D is passed, then jobs with PATTERN in the job name are deleted from the @@ -29,9 +30,17 @@ -p, --pause SECONDS Pause queues for a number of seconds. A value of 0 will unpause. If -m is passed, pause that queue, otherwise pause all queues. + -P, --priority PRIORITY + Change priority of queued jobs (only in Paddles queues) + -U, --user USER User who owns the jobs + -R, --run-name RUN_NAME + Used to change priority of all jobs in the run. """ def main(): args = docopt.docopt(doc) - teuthology.queue.main(args) + if config.backend == 'beanstalk': + teuthology.queue.beanstalk.main(args) + else: + teuthology.queue.paddles.main(args) diff --git a/scripts/worker.py b/scripts/worker.py index 8d3228d8d0..a3e12c20d7 100644 --- a/scripts/worker.py +++ b/scripts/worker.py @@ -9,7 +9,7 @@ def main(): def parse_args(): parser = argparse.ArgumentParser(description=""" -Grab jobs from a paddles queue and run the teuthology tests they +Grab jobs from a beanstalk queue and run the teuthology tests they describe. One job is run at a time. """) parser.add_argument( @@ -29,8 +29,8 @@ def parse_args(): required=True, ) parser.add_argument( - '-m', '--machine-type', - help='which machine type the jobs will run on', + '-t', '--tube', + help='which beanstalk tube to read jobs from', required=True, ) diff --git a/setup.py b/setup.py index 674f75a282..87c573a4e5 100644 --- a/setup.py +++ b/setup.py @@ -126,8 +126,7 @@ 'teuthology-results = scripts.results:main', 'teuthology-report = scripts.report:main', 'teuthology-kill = scripts.kill:main', - 'teuthology-paddles-queue = scripts.paddles_queue:main', - 'teuthology-beanstalk-queue = scripts.beanstalk_queue:main', + 'teuthology-queue = scripts.queue:main', 'teuthology-prune-logs = scripts.prune_logs:main', 'teuthology-describe = scripts.describe:main', 'teuthology-reimage = scripts.reimage:main', diff --git a/teuthology/dispatcher/__init__.py b/teuthology/dispatcher/__init__.py index e29600463d..427941ae53 100644 --- a/teuthology/dispatcher/__init__.py +++ b/teuthology/dispatcher/__init__.py @@ -95,6 +95,8 @@ def main(args): if backend == 'beanstalk': connection = beanstalk.connect() beanstalk.watch_tube(connection, machine_type) + elif backend == 'paddles': + report.create_machine_type_queue(machine_type) result_proc = None @@ -131,6 +133,9 @@ def main(args): else: job = report.get_queued_job(machine_type) if job is None: + if exit_on_empty_queue and not job_procs: + log.info("Queue is empty and no supervisor processes running; exiting!") + break continue job = clean_config(job) report.try_push_job_info(job, dict(status='running')) diff --git a/teuthology/dispatcher/supervisor.py b/teuthology/dispatcher/supervisor.py index cea0edf4c5..6a14057ba6 100644 --- a/teuthology/dispatcher/supervisor.py +++ b/teuthology/dispatcher/supervisor.py @@ -71,8 +71,7 @@ def run_job(job_config, teuth_bin_path, archive_dir, verbose): try: report.try_delete_jobs(job_config['name'], job_config['job_id']) except Exception as e: - log.warning("Unable to delete job %s, exception occurred: %s", - job_config['job_id'], e) + log.exception("Unable to delete job %s", job_config['job_id']) job_archive = os.path.join(archive_dir, safe_archive) args = [ os.path.join(teuth_bin_path, 'teuthology-results'), @@ -128,7 +127,7 @@ def run_job(job_config, teuth_bin_path, archive_dir, verbose): '--archive', job_config['archive_path'], '--name', job_config['name'], ]) - if 'description' in job_config: + if job_config.get('description') is not None: arg.extend(['--description', job_config['description']]) job_archive = os.path.join(job_config['archive_path'], 'orig.config.yaml') arg.extend(['--', job_archive]) diff --git a/teuthology/queue/__init__.py b/teuthology/queue/__init__.py index 2a0b6ff363..e69de29bb2 100644 --- a/teuthology/queue/__init__.py +++ b/teuthology/queue/__init__.py @@ -1,106 +0,0 @@ -import logging -import pprint -import sys -from collections import OrderedDict - -from teuthology import report -from teuthology.config import config - -log = logging.getLogger(__name__) - -def print_progress(index, total, message=None): - msg = "{m} ".format(m=message) if message else '' - sys.stderr.write("{msg}{i}/{total}\r".format( - msg=msg, i=index, total=total)) - sys.stderr.flush() - -def end_progress(): - sys.stderr.write('\n') - sys.stderr.flush() - -class JobProcessor(object): - def __init__(self): - self.jobs = OrderedDict() - - def add_job(self, job_id, job_config, job_obj=None): - job_id = str(job_id) - - job_dict = dict( - index=(len(self.jobs) + 1), - job_config=job_config, - ) - if job_obj: - job_dict['job_obj'] = job_obj - self.jobs[job_id] = job_dict - - self.process_job(job_id) - - def process_job(self, job_id): - pass - - def complete(self): - pass - - -class JobPrinter(JobProcessor): - def __init__(self, show_desc=False, full=False): - super(JobPrinter, self).__init__() - self.show_desc = show_desc - self.full = full - - def process_job(self, job_id): - job_config = self.jobs[job_id]['job_config'] - job_index = self.jobs[job_id]['index'] - job_priority = job_config['priority'] - job_name = job_config['name'] - job_desc = job_config['description'] - print('Job: {i:>4} priority: {pri:>4} {job_name}/{job_id}'.format( - i=job_index, - pri=job_priority, - job_id=job_id, - job_name=job_name, - )) - if self.full: - pprint.pprint(job_config) - elif job_desc and self.show_desc: - for desc in job_desc.split(): - print('\t {}'.format(desc)) - - -class RunPrinter(JobProcessor): - def __init__(self): - super(RunPrinter, self).__init__() - self.runs = list() - - def process_job(self, job_id): - run = self.jobs[job_id]['job_config']['name'] - if run not in self.runs: - self.runs.append(run) - print(run) - - -class JobDeleter(JobProcessor): - def __init__(self, pattern): - self.pattern = pattern - super(JobDeleter, self).__init__() - - def add_job(self, job_id, job_config, job_obj=None): - job_name = job_config['name'] - if self.pattern in job_name: - super(JobDeleter, self).add_job(job_id, job_config, job_obj) - - def process_job(self, job_id): - job_config = self.jobs[job_id]['job_config'] - job_name = job_config['name'] - print('Deleting {job_name}/{job_id}'.format( - job_id=job_id, - job_name=job_name, - )) - report.try_delete_jobs(job_name, job_id) - - -def main(args): - if config.backend == 'paddles': - paddles.main(args) - else: - beanstalk.main(args) \ No newline at end of file diff --git a/teuthology/queue/beanstalk.py b/teuthology/queue/beanstalk.py index 90b1cbd6d3..4aff445790 100644 --- a/teuthology/queue/beanstalk.py +++ b/teuthology/queue/beanstalk.py @@ -7,6 +7,7 @@ from teuthology.config import config from teuthology import report +from teuthology.queue.util import * log = logging.getLogger(__name__) diff --git a/teuthology/queue/paddles.py b/teuthology/queue/paddles.py index f2ea8b84c8..ba4ad4338f 100644 --- a/teuthology/queue/paddles.py +++ b/teuthology/queue/paddles.py @@ -5,7 +5,7 @@ from teuthology import report from teuthology.dispatcher import pause_queue - +from teuthology.queue.util import * log = logging.getLogger(__name__) @@ -14,19 +14,17 @@ def stats_queue(machine_type): stats = report.get_queue_stats(machine_type) if stats['paused'] is None: log.info("%s queue is currently running with %s jobs queued", - stats['name'], - stats['count']) + stats['queue'], + stats['queued_jobs']) else: log.info("%s queue is paused with %s jobs queued", - stats['name'], - stats['count']) + stats['queue'], + stats['queued_jobs']) -def update_priority(machine_type, priority, user, run_name=None): +def update_priority(machine_type, priority, run_name=None): if run_name is not None: - jobs = report.get_user_jobs_queue(machine_type, user, run_name) - else: - jobs = report.get_user_jobs_queue(machine_type, user) + jobs = report.get_jobs_by_run(machine_type, run_name) for job in jobs: job['priority'] = priority report.try_push_job_info(job) @@ -34,11 +32,11 @@ def update_priority(machine_type, priority, user, run_name=None): def walk_jobs(machine_type, processor, user): log.info("Checking paddles queue...") - job_count = report.get_queue_stats(machine_type)['count'] + job_count = report.get_queue_stats(machine_type)['queued_jobs'] jobs = report.get_user_jobs_queue(machine_type, user) if job_count == 0: - log.info('No jobs in queue') + log.info('No jobs in Paddles queue') return for i in range(1, job_count + 1): @@ -54,24 +52,23 @@ def walk_jobs(machine_type, processor, user): def main(args): machine_type = args['--machine_type'] - #user = args['--user'] - #run_name = args['--run_name'] - #priority = args['--priority'] + user = args['--user'] + run_name = args['--run-name'] status = args['--status'] delete = args['--delete'] runs = args['--runs'] show_desc = args['--description'] full = args['--full'] pause_duration = args['--pause'] - #unpause = args['--unpause'] - #pause_duration = args['--time'] + priority = args['--priority'] try: if status: stats_queue(machine_type) if pause_duration: - pause_queue(machine_type, pause, user, pause_duration) - #else: - #pause_queue(machine_type, pause, user) + if not user: + log.info('Please enter user to pause Paddles queue') + return + report.pause_queue(machine_type, user, pause_duration) elif priority: update_priority(machine_type, priority, run_name) elif delete: diff --git a/teuthology/queue/util.py b/teuthology/queue/util.py new file mode 100644 index 0000000000..18f2d3307e --- /dev/null +++ b/teuthology/queue/util.py @@ -0,0 +1,99 @@ +import logging +import pprint +import sys +from collections import OrderedDict + +from teuthology import report +from teuthology.config import config + +log = logging.getLogger(__name__) + +def print_progress(index, total, message=None): + msg = "{m} ".format(m=message) if message else '' + sys.stderr.write("{msg}{i}/{total}\r".format( + msg=msg, i=index, total=total)) + sys.stderr.flush() + +def end_progress(): + sys.stderr.write('\n') + sys.stderr.flush() + +class JobProcessor(object): + def __init__(self): + self.jobs = OrderedDict() + + def add_job(self, job_id, job_config, job_obj=None): + job_id = str(job_id) + + job_dict = dict( + index=(len(self.jobs) + 1), + job_config=job_config, + ) + if job_obj: + job_dict['job_obj'] = job_obj + self.jobs[job_id] = job_dict + + self.process_job(job_id) + + def process_job(self, job_id): + pass + + def complete(self): + pass + + +class JobPrinter(JobProcessor): + def __init__(self, show_desc=False, full=False): + super(JobPrinter, self).__init__() + self.show_desc = show_desc + self.full = full + + def process_job(self, job_id): + job_config = self.jobs[job_id]['job_config'] + job_index = self.jobs[job_id]['index'] + job_priority = job_config['priority'] + job_name = job_config['name'] + job_desc = job_config['description'] + print('Job: {i:>4} priority: {pri:>4} {job_name}/{job_id}'.format( + i=job_index, + pri=job_priority, + job_id=job_id, + job_name=job_name, + )) + if self.full: + pprint.pprint(job_config) + elif job_desc and self.show_desc: + for desc in job_desc.split(): + print('\t {}'.format(desc)) + + +class RunPrinter(JobProcessor): + def __init__(self): + super(RunPrinter, self).__init__() + self.runs = list() + + def process_job(self, job_id): + run = self.jobs[job_id]['job_config']['name'] + if run not in self.runs: + self.runs.append(run) + print(run) + + +class JobDeleter(JobProcessor): + def __init__(self, pattern): + self.pattern = pattern + super(JobDeleter, self).__init__() + + def add_job(self, job_id, job_config, job_obj=None): + job_name = job_config['name'] + if self.pattern in job_name: + super(JobDeleter, self).add_job(job_id, job_config, job_obj) + + def process_job(self, job_id): + job_config = self.jobs[job_id]['job_config'] + job_name = job_config['name'] + print('Deleting {job_name}/{job_id}'.format( + job_id=job_id, + job_name=job_name, + )) + report.try_delete_jobs(job_name, job_id) diff --git a/teuthology/report.py b/teuthology/report.py index 9b11f18056..d21ba47f82 100644 --- a/teuthology/report.py +++ b/teuthology/report.py @@ -387,7 +387,7 @@ def last_run(self): def get_top_job(self, queue): - uri = "{base}/queue/pop_queue?queue_name={queue}".format(base=self.base_uri, + uri = "{base}/queue/pop_queue?queue={queue}".format(base=self.base_uri, queue=queue) inc = random.uniform(0, 1) with safe_while( @@ -541,13 +541,14 @@ def create_queue(self, queue): response.raise_for_status() - def update_queue(self, queue, paused, paused_by, pause_duration=None): + def update_queue(self, queue, paused_by, pause_duration=None): uri = "{base}/queue/".format( base=self.base_uri ) + if pause_duration is not None: pause_duration = int(pause_duration) - queue_info = {'queue': queue, 'paused': paused, 'paused_by': paused_by, + queue_info = {'queue': queue, 'paused_by': paused_by, 'pause_duration': pause_duration} queue_json = json.dumps(queue_info) headers = {'content-type': 'application/json'} @@ -590,9 +591,6 @@ def queue_stats(self, queue): response = self.session.post(uri, data=queue_json, headers=headers) if response.status_code == 200: - self.log.info("Successfully retrieved stats for queue {queue}".format( - queue=queue, - )) return response.json() else: msg = response.text @@ -664,12 +662,18 @@ def get_user_jobs_queue(queue, user, run_name=None): return return reporter.queued_jobs(queue, user, run_name) +def get_jobs_by_run(queue, run_name): + reporter = ResultsReporter() + if not reporter.base_uri: + return + return reporter.queued_jobs(queue, None, run_name) + -def pause_queue(queue, paused, paused_by, pause_duration=None): +def pause_queue(queue, paused_by, pause_duration=None): reporter = ResultsReporter() if not reporter.base_uri: return - reporter.update_queue(queue, paused, paused_by, pause_duration) + reporter.update_queue(queue, paused_by, pause_duration) def is_queue_paused(queue): diff --git a/teuthology/schedule.py b/teuthology/schedule.py index efcffa5dfc..cfeb595e84 100644 --- a/teuthology/schedule.py +++ b/teuthology/schedule.py @@ -1,7 +1,7 @@ import os import yaml -import teuthology.beanstalk +import teuthology.queue.beanstalk from teuthology.misc import get_user, merge_configs from teuthology import report @@ -114,7 +114,7 @@ def beanstalk_schedule_job(job_config, backend, num=1): """ num = int(num) tube = job_config.pop('tube') - beanstalk = teuthology.beanstalk.connect() + beanstalk = teuthology.queue.beanstalk.connect() beanstalk.use(tube) queue = report.create_machine_type_queue(job_config['machine_type']) job_config['queue'] = queue