diff --git a/scripts/dispatcher.py b/scripts/dispatcher.py
index 669f125111..c766de4307 100644
--- a/scripts/dispatcher.py
+++ b/scripts/dispatcher.py
@@ -1,9 +1,9 @@
 """
 usage: teuthology-dispatcher --help
-       teuthology-dispatcher --supervisor [-v] --bin-path BIN_PATH --job-config COFNFIG --archive-dir DIR
-       teuthology-dispatcher [-v] [--archive-dir DIR] --log-dir LOG_DIR --tube TUBE
+       teuthology-dispatcher --supervisor [-v] --bin-path BIN_PATH --job-config CONFIG --archive-dir DIR
+       teuthology-dispatcher [-v] [--archive-dir DIR] --log-dir LOG_DIR --machine-type MACHINE_TYPE
 
-Start a dispatcher for the specified tube. Grab jobs from a beanstalk
+Start a dispatcher for the specified machine type. Grab jobs from a paddles
 queue and run the teuthology tests they describe as subprocesses. The
 subprocess invoked is a teuthology-dispatcher command run in supervisor
 mode.
@@ -15,9 +15,9 @@
 standard arguments:
   -h, --help                    show this help message and exit
   -v, --verbose                 be more verbose
-  -t, --tube TUBE               which beanstalk tube to read jobs from
   -l, --log-dir LOG_DIR         path in which to store logs
   -a DIR, --archive-dir DIR     path to archive results in
+  --machine-type MACHINE_TYPE   the machine type for the job
   --supervisor                  run dispatcher in job supervisor mode
   --bin-path BIN_PATH           teuthology bin path
   --job-config CONFIG           file descriptor of job's config file
diff --git a/scripts/kill.py b/scripts/kill.py
index 31acc8b1a4..e2a1a4ef09 100644
--- a/scripts/kill.py
+++ b/scripts/kill.py
@@ -12,7 +12,7 @@
 teuthology-kill [-p] -o OWNER -m MACHINE_TYPE -r RUN
 
 Kill running teuthology jobs:
-1. Removes any queued jobs from the beanstalk queue
+1. Removes any queued jobs from the paddles queue
 2. Kills any running jobs
 3. Nukes any machines involved
diff --git a/scripts/queue.py b/scripts/queue.py
index 8ea5ca5c2c..285a0adac9 100644
--- a/scripts/queue.py
+++ b/scripts/queue.py
@@ -1,21 +1,22 @@
 import docopt
 
 import teuthology.config
-import teuthology.beanstalk
+import teuthology.paddles_queue
 
 doc = """
 usage: teuthology-queue -h
-       teuthology-queue [-s|-d|-f] -m MACHINE_TYPE
-       teuthology-queue [-r] -m MACHINE_TYPE
-       teuthology-queue -m MACHINE_TYPE -D PATTERN
-       teuthology-queue -p SECONDS [-m MACHINE_TYPE]
+       teuthology-queue -s -m MACHINE_TYPE
+       teuthology-queue [-d|-f] -m MACHINE_TYPE [-P PRIORITY] -u USER
+       teuthology-queue [-r] -m MACHINE_TYPE -u USER
+       teuthology-queue -m MACHINE_TYPE -D PATTERN -u USER
+       teuthology-queue -p SECONDS -m MACHINE_TYPE -u USER
 
 List Jobs in queue.
 If -D is passed, then jobs with PATTERN in the job name are deleted from the
 queue.
 
 Arguments:
-   -m, --machine_type MACHINE_TYPE [default: multi]
+   -m, --machine_type MACHINE_TYPE
                          Which machine type queue to work on.
 
 optional arguments:
@@ -28,9 +29,12 @@
    -p, --pause SECONDS   Pause queues for a number of seconds. A value of 0
                          will unpause. If -m is passed, pause that queue,
                          otherwise pause all queues.
+   -P, --priority PRIORITY
+                         Change priority of queued jobs
+   -u, --user USER       User who owns the jobs
 """
 
 
 def main():
     args = docopt.docopt(doc)
-    teuthology.beanstalk.main(args)
+    teuthology.paddles_queue.main(args)
diff --git a/scripts/schedule.py b/scripts/schedule.py
index ffcb112b45..c3f9067d81 100644
--- a/scripts/schedule.py
+++ b/scripts/schedule.py
@@ -20,7 +20,7 @@
                              Queue backend name, use prefix '@' to append job config
                              to the given file path as yaml.
-                             [default: beanstalk]
+                             [default: paddles]
  -n , --name                  Name of suite run the job is part of
  -d , --description           Job description
  -o , --owner                 Job owner
diff --git a/scripts/worker.py b/scripts/worker.py
index a3e12c20d7..8d3228d8d0 100644
--- a/scripts/worker.py
+++ b/scripts/worker.py
@@ -9,7 +9,7 @@ def main():
 
 
 def parse_args():
     parser = argparse.ArgumentParser(description="""
-Grab jobs from a beanstalk queue and run the teuthology tests they
+Grab jobs from a paddles queue and run the teuthology tests they
 describe. One job is run at a time.
 """)
     parser.add_argument(
@@ -29,8 +29,8 @@ def parse_args():
         required=True,
     )
     parser.add_argument(
-        '-t', '--tube',
-        help='which beanstalk tube to read jobs from',
+        '-m', '--machine-type',
+        help='which machine type the jobs will run on',
        required=True,
    )
diff --git a/setup.py b/setup.py
index 87c573a4e5..c4c33267ca 100644
--- a/setup.py
+++ b/setup.py
@@ -72,7 +72,6 @@
         'orchestra': [
             # For apache-libcloud when using python < 2.7.9
             'backports.ssl_match_hostname',
-            'beanstalkc3 >= 0.4.0',
             'httplib2',
             'ndg-httpsclient',  # for requests, urllib3
             'pyasn1',  # for requests, urllib3
diff --git a/teuthology/dispatcher/__init__.py b/teuthology/dispatcher/__init__.py
index f6eea469d9..06ce581e35 100644
--- a/teuthology/dispatcher/__init__.py
+++ b/teuthology/dispatcher/__init__.py
@@ -3,11 +3,11 @@
 import subprocess
 import sys
 import yaml
+import json
 
 from datetime import datetime
 
 from teuthology import setup_log_file, install_except_hook
-from teuthology import beanstalk
 from teuthology import report
 from teuthology.config import config as teuth_config
 from teuthology.exceptions import SkipJob
@@ -54,6 +54,12 @@ def load_config(archive_dir=None):
     else:
         teuth_config.archive_base = archive_dir
 
+def clean_config(config):
+    result = {}
+    for key in config:
+        if config[key] is not None:
+            result[key] = config[key]
+    return result
 
 def main(args):
     # run dispatcher in job supervisor mode if --supervisor passed
@@ -61,26 +67,26 @@
         return supervisor.main(args)
 
     verbose = args["--verbose"]
-    tube = args["--tube"]
+    machine_type = args["--machine-type"]
     log_dir = args["--log-dir"]
     archive_dir = args["--archive-dir"]
 
     if archive_dir is None:
         archive_dir = teuth_config.archive_base
 
+    if machine_type is None and teuth_config.machine_type is None:
+        return
 
     # setup logging for disoatcher in {log_dir}
     loglevel = logging.INFO
     if verbose:
         loglevel = logging.DEBUG
     log.setLevel(loglevel)
-    log_file_path = os.path.join(log_dir, f"dispatcher.{tube}.{os.getpid()}")
+    log_file_path = os.path.join(log_dir, f"dispatcher.{machine_type}.{os.getpid()}")
     setup_log_file(log_file_path)
     install_except_hook()
 
     load_config(archive_dir=archive_dir)
 
-    connection = beanstalk.connect()
-    beanstalk.watch_tube(connection, tube)
     result_proc = None
 
     if teuth_config.teuthology_path is None:
@@ -103,18 +109,16 @@
 
         load_config()
 
-        job = connection.reserve(timeout=60)
+        job = report.get_queued_job(machine_type)
         if job is None:
             continue
-
-        # bury the job so it won't be re-run if it fails
-        job.bury()
-        job_id = job.jid
-        log.info('Reserved job %d', job_id)
-        log.info('Config is: %s', job.body)
-        job_config = yaml.safe_load(job.body)
-        job_config['job_id'] = str(job_id)
-
+        job = clean_config(job)
+        report.try_push_job_info(job, dict(status='running'))
+        job_id = job.get('job_id')
+        log.info('Reserved job %s', job_id)
+        log.info('Config is: %s', job)
+        job_config = job
+
         if job_config.get('stop_worker'):
             keep_running = False
@@ -164,12 +168,6 @@
                 status='fail',
                 failure_reason=error_message))
 
-        # This try/except block is to keep the worker from dying when
-        # beanstalkc throws a SocketError
-        try:
-            job.delete()
-        except Exception:
-            log.exception("Saw exception while trying to delete job")
 
 
 def lock_machines(job_config):
diff --git a/teuthology/dispatcher/supervisor.py b/teuthology/dispatcher/supervisor.py
index 4ce9fa8531..5e03eb0315 100644
--- a/teuthology/dispatcher/supervisor.py
+++ b/teuthology/dispatcher/supervisor.py
@@ -120,7 +120,7 @@ def run_job(job_config, teuth_bin_path, archive_dir, verbose):
         '--archive', job_config['archive_path'],
         '--name', job_config['name'],
     ])
-    if job_config['description'] is not None:
+    if 'description' in job_config:
         arg.extend(['--description', job_config['description']])
     job_archive = os.path.join(job_config['archive_path'], 'orig.config.yaml')
     arg.extend(['--', job_archive])
diff --git a/teuthology/kill.py b/teuthology/kill.py
index b8ae348cf4..623e2a0b9e 100755
--- a/teuthology/kill.py
+++ b/teuthology/kill.py
@@ -9,7 +9,6 @@
 
 import getpass
 
-from teuthology import beanstalk
 from teuthology import report
 from teuthology.config import config
 from teuthology import misc
@@ -59,7 +58,6 @@ def kill_run(run_name, archive_base=None, owner=None, machine_type=None,
                 "you must also pass --machine-type")
 
     if not preserve_queue:
-        remove_beanstalk_jobs(run_name, machine_type)
         remove_paddles_jobs(run_name)
         kill_processes(run_name, run_info.get('pids'))
         if owner is not None:
@@ -101,7 +99,6 @@ def find_run_info(serializer, run_name):
         if not os.path.isdir(job_dir):
             continue
         job_num += 1
-        beanstalk.print_progress(job_num, job_total, 'Reading Job: ')
         job_info = serializer.job_info(run_name, job_id, simple=True)
         for key in job_info.keys():
             if key in run_info_fields and key not in run_info:
@@ -120,41 +117,6 @@ def remove_paddles_jobs(run_name):
         report.try_delete_jobs(run_name, job_ids)
 
 
-def remove_beanstalk_jobs(run_name, tube_name):
-    qhost = config.queue_host
-    qport = config.queue_port
-    if qhost is None or qport is None:
-        raise RuntimeError(
-            'Beanstalk queue information not found in {conf_path}'.format(
-                conf_path=config.yaml_path))
-    log.info("Checking Beanstalk Queue...")
-    beanstalk_conn = beanstalk.connect()
-    real_tube_name = beanstalk.watch_tube(beanstalk_conn, tube_name)
-
-    curjobs = beanstalk_conn.stats_tube(real_tube_name)['current-jobs-ready']
-    if curjobs != 0:
-        x = 1
-        while x != curjobs:
-            x += 1
-            job = beanstalk_conn.reserve(timeout=20)
-            if job is None:
-                continue
-            job_config = yaml.safe_load(job.body)
-            if run_name == job_config['name']:
-                job_id = job.stats()['id']
-                msg = "Deleting job from queue. ID: " + \
-                    "{id} Name: {name} Desc: {desc}".format(
-                        id=str(job_id),
-                        name=job_config['name'],
-                        desc=job_config['description'],
-                    )
-                log.info(msg)
-                job.delete()
-    else:
-        print("No jobs in Beanstalk Queue")
-    beanstalk_conn.close()
-
-
 def kill_processes(run_name, pids=None):
     if pids:
         to_kill = set(pids).intersection(psutil.pids())
diff --git a/teuthology/beanstalk.py b/teuthology/paddles_queue.py
similarity index 57%
rename from teuthology/beanstalk.py
rename to teuthology/paddles_queue.py
index a1165becca..74ca185ab9 100644
--- a/teuthology/beanstalk.py
+++ b/teuthology/paddles_queue.py
@@ -1,5 +1,3 @@
-import beanstalkc
-import yaml
 import logging
 import pprint
 import sys
@@ -8,57 +6,32 @@
 from teuthology.config import config
 from teuthology import report
 
+
 log = logging.getLogger(__name__)
 
 
-def connect():
-    host = config.queue_host
-    port = config.queue_port
-    if host is None or port is None:
-        raise RuntimeError(
-            'Beanstalk queue information not found in {conf_path}'.format(
-                conf_path=config.teuthology_yaml))
-    return beanstalkc.Connection(host=host, port=port)
-
-
-def watch_tube(connection, tube_name):
-    """
-    Watch a given tube, potentially correcting to 'multi' if necessary. Returns
-    the tube_name that was actually used.
-    """
-    if ',' in tube_name:
-        log.debug("Correcting tube name to 'multi'")
-        tube_name = 'multi'
-    connection.watch(tube_name)
-    connection.ignore('default')
-    return tube_name
-
-
-def walk_jobs(connection, tube_name, processor, pattern=None):
-    """
-    def callback(jobs_dict)
-    """
-    log.info("Checking Beanstalk Queue...")
-    job_count = connection.stats_tube(tube_name)['current-jobs-ready']
-    if job_count == 0:
-        log.info('No jobs in Beanstalk Queue')
-        return
+def stats_queue(machine_type):
+    stats = report.get_queue_stats(machine_type)
+    if stats['paused'] is None:
+        log.info("%s queue is currently running with %s jobs queued",
+                 stats['name'],
+                 stats['count'])
+    else:
+        log.info("%s queue is paused with %s jobs queued",
+                 stats['name'],
+                 stats['count'])
 
-    # Try to figure out a sane timeout based on how many jobs are in the queue
-    timeout = job_count / 2000.0 * 60
-    for i in range(1, job_count + 1):
-        print_progress(i, job_count, "Loading")
-        job = connection.reserve(timeout=timeout)
-        if job is None or job.body is None:
-            continue
-        job_config = yaml.safe_load(job.body)
-        job_name = job_config['name']
-        job_id = job.stats()['id']
-        if pattern is not None and pattern not in job_name:
-            continue
-        processor.add_job(job_id, job_config, job)
-    end_progress()
-    processor.complete()
+
+def update_priority(machine_type, priority, user):
+    jobs = report.get_user_jobs_queue(machine_type, user)
+    for job in jobs:
+        job['priority'] = priority
+        report.try_push_job_info(job)
+
+
+def pause_queue(machine_type, pause_duration, paused_by):
+    report.pause_queue(machine_type, paused_by, pause_duration)
 
 
 def print_progress(index, total, message=None):
@@ -73,6 +46,29 @@ def end_progress():
     sys.stderr.flush()
 
 
+def walk_jobs(machine_type, processor, user):
+    log.info("Checking paddles queue...")
+    job_count = report.get_queue_stats(machine_type)['count']
+
+    jobs = report.get_user_jobs_queue(machine_type, user)
+    if job_count == 0:
+        log.info('No jobs in queue')
+        return
+
+    # Try to figure out a sane timeout based on how many jobs are in the queue
+    timeout = job_count / 2000.0 * 60
+    for i in range(1, job_count + 1):
+        print_progress(i, job_count, "Loading")
+        job = jobs[i-1]
+        if job is None:
+            continue
+        job_name = job['name']
+        job_id = job['job_id']
+        processor.add_job(job_id, job)
+    end_progress()
+    processor.complete()
+
+
 class JobProcessor(object):
     def __init__(self):
         self.jobs = OrderedDict()
@@ -151,38 +147,13 @@ def process_job(self, job_id):
             job_id=job_id,
             job_name=job_name,
         ))
-        job_obj = self.jobs[job_id].get('job_obj')
-        if job_obj:
-            job_obj.delete()
         report.try_delete_jobs(job_name, job_id)
 
 
-def pause_tube(connection, tube, duration):
-    duration = int(duration)
-    if not tube:
-        tubes = sorted(connection.tubes())
-    else:
-        tubes = [tube]
-
-    prefix = 'Unpausing' if duration == 0 else "Pausing for {dur}s"
-    templ = prefix + ": {tubes}"
-    log.info(templ.format(dur=duration, tubes=tubes))
-    for tube in tubes:
-        connection.pause_tube(tube, duration)
-
-
-def stats_tube(connection, tube):
-    stats = connection.stats_tube(tube)
-    result = dict(
-        name=tube,
-        count=stats['current-jobs-ready'],
-        paused=(stats['pause'] != 0),
-    )
-    return result
-
-
 def main(args):
     machine_type = args['--machine_type']
+    user = args['--user']
+    priority = args['--priority']
     status = args['--status']
     delete = args['--delete']
     runs = args['--runs']
@@ -190,25 +161,21 @@
     full = args['--full']
     pause_duration = args['--pause']
     try:
-        connection = connect()
-        if machine_type and not pause_duration:
-            # watch_tube needs to be run before we inspect individual jobs;
-            # it is not needed for pausing tubes
-            watch_tube(connection, machine_type)
         if status:
-            print(stats_tube(connection, machine_type))
+            stats_queue(machine_type)
         elif pause_duration:
-            pause_tube(connection, machine_type, pause_duration)
+            pause_queue(machine_type, pause_duration, user)
+        elif priority:
+            update_priority(machine_type, priority, user)
         elif delete:
-            walk_jobs(connection, machine_type,
-                      JobDeleter(delete))
+            walk_jobs(machine_type,
+                      JobDeleter(delete), user)
         elif runs:
-            walk_jobs(connection, machine_type,
-                      RunPrinter())
+            walk_jobs(machine_type,
+                      RunPrinter(), user)
        else:
-            walk_jobs(connection, machine_type,
-                      JobPrinter(show_desc=show_desc, full=full))
+            walk_jobs(machine_type,
+                      JobPrinter(show_desc=show_desc, full=full),
+                      user)
     except KeyboardInterrupt:
         log.info("Interrupted.")
-    finally:
-        connection.close()
diff --git a/teuthology/report.py b/teuthology/report.py
index 2d06356772..df7da03c92 100644
--- a/teuthology/report.py
+++ b/teuthology/report.py
@@ -6,6 +6,7 @@
 import logging
 import random
 import socket
+import threading
 from datetime import datetime
 
 import teuthology
@@ -262,6 +263,39 @@ def report_run(self, run_name, dead=False):
             self.log.debug("    no jobs; skipped")
         return len(jobs)
 
+    def write_new_job(self, run_name, job_info):
+        """
+        Report a new job to the results server.
+
+        :param run_name: The name of the run. The run must already exist.
+        :param job_info: The job's info dict.
+                         Must be present since this is a new job
+        """
+        if job_info is None or not isinstance(job_info, dict):
+            raise TypeError("Job info must be a dict")
+        run_uri = "{base}/runs/{name}/jobs/".format(
+            base=self.base_uri, name=run_name,
+        )
+        job_json = json.dumps(job_info)
+        headers = {'content-type': 'application/json'}
+        response = self.session.post(run_uri, data=job_json, headers=headers)
+
+        if response.status_code == 200:
+            resp_json = response.json()
+            job_id = resp_json['job_id']
+            return job_id
+        else:
+            msg = response.text
+            self.log.error(
+                "POST to {uri} failed with status {status}: {msg}".format(
+                    uri=run_uri,
+                    status=response.status_code,
+                    msg=msg,
+                ))
+
+        response.raise_for_status()
+        return None
+
 
     def report_jobs(self, run_name, job_ids, dead=False):
         """
         Report several jobs to the results server.
@@ -291,12 +325,13 @@ def report_job(self, run_name, job_id, job_info=None, dead=False):
             set_status(job_info, 'dead')
         job_json = json.dumps(job_info)
         headers = {'content-type': 'application/json'}
+        job_uri = os.path.join(run_uri, job_id, '')
         inc = random.uniform(0, 1)
         with safe_while(
                 sleep=1, increment=inc, action=f'report job {job_id}') as proceed:
             while proceed():
-                response = self.session.post(run_uri, data=job_json, headers=headers)
+                response = self.session.put(job_uri, data=job_json, headers=headers)
 
                 if response.status_code == 200:
                     return
@@ -313,15 +348,9 @@
                 else:
                     msg = response.text
-                if msg and msg.endswith('already exists'):
-                    job_uri = os.path.join(run_uri, job_id, '')
-                    response = self.session.put(job_uri, data=job_json,
-                                                headers=headers)
-                    if response.status_code == 200:
-                        return
-                elif msg:
+                if msg:
                     self.log.error(
-                        "POST to {uri} failed with status {status}: {msg}".format(
+                        "PUT to {uri} failed with status {status}: {msg}".format(
                             uri=run_uri,
                             status=response.status_code,
                             msg=msg,
@@ -351,6 +380,14 @@ def last_run(self):
         self.__last_run = None
         if os.path.exists(self.last_run_file):
             os.remove(self.last_run_file)
+
+    def get_top_job(self, machine_type):
+
+        uri = "{base}/queue/pop_queue?machine_type={machine_type}".format(
+            base=self.base_uri, machine_type=machine_type)
+        response = self.session.get(uri)
+        response.raise_for_status()
+        return response.json()
 
     def get_jobs(self, run_name, job_id=None, fields=None):
         """
@@ -453,6 +490,164 @@ def delete_run(self, run_name):
         response = self.session.delete(uri)
         response.raise_for_status()
 
+    def create_queue(self, machine_type):
+        """
+        Create a queue on the results server
+
+        :param machine_type: The machine type specified for the job
+        """
+        uri = "{base}/queue/".format(
+            base=self.base_uri
+        )
+        queue_info = {'machine_type': machine_type}
+        queue_json = json.dumps(queue_info)
+        headers = {'content-type': 'application/json'}
+        response = self.session.post(uri, data=queue_json, headers=headers)
+
+        if response.status_code == 200:
+            self.log.info("Successfully created queue for {machine_type}".format(
+                machine_type=machine_type,
+            ))
+        else:
+            resp_json = response.json()
+            if resp_json:
+                msg = resp_json.get('message', '')
+            else:
+                msg = response.text
+            if msg and msg.endswith('already exists'):
+                return
+            self.log.error(
+                "POST to {uri} failed with status {status}: {msg}".format(
+                    uri=uri,
+                    status=response.status_code,
+                    msg=msg,
+                ))
+
+        response.raise_for_status()
+
+    def update_queue(self, machine_type, paused, paused_by=None, pause_duration=None):
+        uri = "{base}/queue/".format(
+            base=self.base_uri
+        )
+        if pause_duration is not None:
+            pause_duration = int(pause_duration)
+        queue_info = {'machine_type': machine_type, 'paused': paused, 'paused_by': paused_by,
+                      'pause_duration': pause_duration}
+        queue_json = json.dumps(queue_info)
+        headers = {'content-type': 'application/json'}
+        response = self.session.put(uri, data=queue_json, headers=headers)
+
+        if response.status_code == 200:
+            self.log.info("Successfully updated queue for {machine_type}".format(
+                machine_type=machine_type,
+            ))
+        else:
+            msg = response.text
+            self.log.error(
+                "PUT to {uri} failed with status {status}: {msg}".format(
+                    uri=uri,
+                    status=response.status_code,
+                    msg=msg,
+                ))
+
+        response.raise_for_status()
+
+    def queue_stats(self, machine_type):
+        uri = "{base}/queue/stats/".format(
+            base=self.base_uri
+        )
+        queue_info = {'machine_type': machine_type}
+        queue_json = json.dumps(queue_info)
+
+        headers = {'content-type': 'application/json'}
+        response = self.session.post(uri, data=queue_json, headers=headers)
+
+        if response.status_code == 200:
+            self.log.info("Successfully retrieved stats for queue {machine_type}".format(
+                machine_type=machine_type,
+            ))
+            return response.json()
+        else:
+            msg = response.text
+            self.log.error(
+                "POST to {uri} failed with status {status}: {msg}".format(
+                    uri=uri,
+                    status=response.status_code,
+                    msg=msg,
+                ))
+            response.raise_for_status()
+
+    def queued_jobs(self, machine_type, user):
+        uri = "{base}/queue/queued_jobs/".format(
+            base=self.base_uri
+        )
+        request_info = {'machine_type': machine_type,
+                        'user': user}
+        request_json = json.dumps(request_info)
+        headers = {'content-type': 'application/json'}
+        response = self.session.post(uri, data=request_json, headers=headers)
+
+        if response.status_code == 200:
+            self.log.info("Successfully retrieved jobs for user {user}".format(
+                user=user,
+            ))
+            return response.json()
+        else:
+            msg = response.text
+            self.log.error(
+                "POST to {uri} failed with status {status}: {msg}".format(
+                    uri=uri,
+                    status=response.status_code,
+                    msg=msg,
+                ))
+            response.raise_for_status()
+
+
+def create_machine_type_queue(machine_type):
+    reporter = ResultsReporter()
+    if not reporter.base_uri:
+        return
+    reporter.create_queue(machine_type)
+
+
+def get_user_jobs_queue(machine_type, user):
+    reporter = ResultsReporter()
+    if not reporter.base_uri:
+        return
+    return reporter.queued_jobs(machine_type, user)
+
+
+def pause_queue(machine_type, paused_by, pause_duration):
+    reporter = ResultsReporter()
+    if not reporter.base_uri:
+        return
+    paused = True
+    reporter.update_queue(machine_type, paused, paused_by, pause_duration)
+    paused = False
+    timer = threading.Timer(int(pause_duration), reporter.update_queue,
+                            [machine_type, paused, paused_by])
+    timer.daemon = True
+    timer.start()
+    timer.join()
+
+
+def is_queue_paused(machine_type):
+    reporter = ResultsReporter()
+    if not reporter.base_uri:
+        return
+    stats = reporter.queue_stats(machine_type)
+    if stats['paused'] != 0 and stats['paused'] is not None:
+        return True
+    return False
+
+
+def get_queue_stats(machine_type):
+    reporter = ResultsReporter()
+    if not reporter.base_uri:
+        return
+    stats = reporter.queue_stats(machine_type)
+    return stats
+
 
 def push_job_info(run_name, job_id, job_info, base_uri=None):
     """
@@ -470,6 +665,23 @@ def push_job_info(run_name, job_id, job_info, base_uri=None):
     reporter.report_job(run_name, job_id, job_info)
 
 
+def get_queued_job(machine_type):
+    """
+    Retrieve a job that is queued, depending on priority.
+    """
+    log = init_logging()
+    reporter = ResultsReporter()
+    if not reporter.base_uri:
+        return
+    if is_queue_paused(machine_type):
+        log.info("Teuthology queue for machine type %s is currently paused",
+                 machine_type)
+        return None
+    else:
+        return reporter.get_top_job(machine_type)
+
+
 def try_push_job_info(job_config, extra_info=None):
     """
     Wrap push_job_info, gracefully doing nothing if:
@@ -509,6 +721,36 @@ def try_push_job_info(job_config, extra_info=None):
             config.results_server)
 
 
+def try_create_job(job_config, extra_info=None):
+    log = init_logging()
+
+    if not config.results_server:
+        log.warning('No results_server in config; not reporting results')
+        return
+
+    reporter = ResultsReporter()
+    if not reporter.base_uri:
+        return
+
+    run_name = job_config['name']
+
+    if extra_info is not None:
+        job_info = extra_info.copy()
+        job_info.update(job_config)
+    else:
+        job_info = job_config
+
+    try:
+        log.debug("Writing job info to %s", config.results_server)
+        job_id = reporter.write_new_job(run_name, job_info)
+        log.info("Job ID: %s", job_id)
+        if job_id is not None:
+            return job_id
+    except report_exceptions:
+        log.exception("Could not report results to %s",
+                      config.results_server)
+
+
 def try_delete_jobs(run_name, job_ids, delete_empty_run=True):
     """
     Using the same error checking and retry mechanism as try_push_job_info(),
diff --git a/teuthology/schedule.py b/teuthology/schedule.py
index 01c998367e..5fbc7601a4 100644
--- a/teuthology/schedule.py
+++ b/teuthology/schedule.py
@@ -1,9 +1,9 @@
 import os
 import yaml
 
-import teuthology.beanstalk
 from teuthology.misc import get_user, merge_configs
 from teuthology import report
+from teuthology.config import config
 
 
 def main(args):
@@ -35,13 +35,13 @@
     backend = args['--queue-backend']
     if args['--dry-run']:
         print('---\n' + yaml.safe_dump(job_config))
-    elif backend == 'beanstalk':
+    elif backend == 'paddles':
         schedule_job(job_config, args['--num'], report_status)
     elif backend.startswith('@'):
         dump_job_to_file(backend.lstrip('@'), job_config, args['--num'])
     else:
         raise ValueError("Provided schedule backend '%s' is not supported. "
-                         "Try 'beanstalk' or '@path-to-a-file" % backend)
+                         "Try 'paddles' or '@path-to-a-file'" % backend)
 
 
 def build_config(args):
@@ -95,22 +95,19 @@ def schedule_job(job_config, num=1, report_status=True):
     """
     num = int(num)
     job = yaml.safe_dump(job_config)
-    tube = job_config.pop('tube')
-    beanstalk = teuthology.beanstalk.connect()
-    beanstalk.use(tube)
+
+    # Add 'machine_type' queue to DB here.
+    report.create_machine_type_queue(job_config['machine_type'])
     while num > 0:
-        jid = beanstalk.put(
-            job,
-            ttr=60 * 60 * 24,
-            priority=job_config['priority'],
-        )
-        print('Job scheduled with name {name} and ID {jid}'.format(
-            name=job_config['name'], jid=jid))
-        job_config['job_id'] = str(jid)
-        if report_status:
-            report.try_push_job_info(job_config, dict(status='queued'))
-        num -= 1
+        job_id = report.try_create_job(job_config, dict(status='queued'))
+        print('Job scheduled with name {name} and ID {job_id}'.format(
+            name=job_config['name'], job_id=job_id))
+        job_config['job_id'] = str(job_id)
+
+        num -= 1
 
 
 def dump_job_to_file(path, job_config, num=1):
     """
diff --git a/teuthology/worker.py b/teuthology/worker.py
index 1437083e60..ff68acf1c3 100644
--- a/teuthology/worker.py
+++ b/teuthology/worker.py
@@ -9,7 +9,6 @@
 from datetime import datetime
 
 from teuthology import setup_log_file, install_except_hook
-from teuthology import beanstalk
 from teuthology import report
 from teuthology import safepath
 from teuthology.config import config as teuth_config
@@ -58,14 +57,22 @@ def load_config(ctx=None):
             teuth_config.archive_base = ctx.archive_dir
 
 
+def clean_config(config):
+    result = {}
+    for key in config:
+        if config[key] is not None:
+            result[key] = config[key]
+    return result
+
+
 def main(ctx):
     loglevel = logging.INFO
     if ctx.verbose:
         loglevel = logging.DEBUG
     log.setLevel(loglevel)
 
-    log_file_path = os.path.join(ctx.log_dir, 'worker.{tube}.{pid}'.format(
-        pid=os.getpid(), tube=ctx.tube,))
+    log_file_path = os.path.join(ctx.log_dir, 'worker.{machine_type}.{pid}'.format(
+        pid=os.getpid(), machine_type=ctx.machine_type,))
     setup_log_file(log_file_path)
     install_except_hook()
 
@@ -74,8 +81,6 @@ def main(ctx):
 
     set_config_attr(ctx)
 
-    connection = beanstalk.connect()
-    beanstalk.watch_tube(connection, ctx.tube)
     result_proc = None
 
     if teuth_config.teuthology_path is None:
@@ -98,17 +103,16 @@ def main(ctx):
 
         load_config()
 
-        job = connection.reserve(timeout=60)
+        job = report.get_queued_job(ctx.machine_type)
         if job is None:
             continue
 
-        # bury the job so it won't be re-run if it fails
-        job.bury()
-        job_id = job.jid
-        log.info('Reserved job %d', job_id)
-        log.info('Config is: %s', job.body)
-        job_config = yaml.safe_load(job.body)
-        job_config['job_id'] = str(job_id)
+        job = clean_config(job)
+        report.try_push_job_info(job, dict(status='running'))
+        job_id = job.get('job_id')
+        log.info('Reserved job %s', job_id)
+        log.info('Config is: %s', job)
+        job_config = job
 
         if job_config.get('stop_worker'):
             keep_running = False
@@ -128,13 +132,6 @@ def main(ctx):
         except SkipJob:
             continue
 
-        # This try/except block is to keep the worker from dying when
-        # beanstalkc throws a SocketError
-        try:
-            job.delete()
-        except Exception:
-            log.exception("Saw exception while trying to delete job")
-
 
 def prep_job(job_config, log_file_path, archive_dir):
     job_id = job_config['job_id']
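
Not part of the patch: a minimal sketch of how the scheduling and dispatching sides fit together after this change, using only the functions the patch adds to teuthology.report. It assumes a teuthology checkout with a reachable paddles results_server configured; the run name and machine type below are hypothetical.

from teuthology import report

machine_type = 'smithi'        # hypothetical machine type
job_config = {
    'name': 'example-run',     # hypothetical run name
    'machine_type': machine_type,
    'priority': 100,
}

# Scheduler side (teuthology/schedule.py): make sure the per-machine-type
# queue exists in paddles, then create the job with status 'queued'.
report.create_machine_type_queue(machine_type)
job_id = report.try_create_job(job_config, dict(status='queued'))

# Dispatcher/worker side: pop the next queued job for this machine type
# (returns None while the queue is paused) and mark it running.
job = report.get_queued_job(machine_type)
if job is not None:
    report.try_push_job_info(job, dict(status='running'))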
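
Also not part of the patch: an illustration of the paddles REST routes the new ResultsReporter methods rely on, exercised with plain requests calls. The base URI is hypothetical; paddles must actually implement these routes for the queue backend to work.

import requests

base_uri = 'http://paddles.example.com'   # hypothetical results server
machine_type = 'smithi'

# POST /queue/ creates the machine-type queue (ResultsReporter.create_queue)
requests.post(f'{base_uri}/queue/', json={'machine_type': machine_type})

# POST /queue/stats/ returns the queue's name, job count, and paused state
# (ResultsReporter.queue_stats)
stats = requests.post(f'{base_uri}/queue/stats/',
                      json={'machine_type': machine_type}).json()

# GET /queue/pop_queue pops the highest-priority queued job
# (ResultsReporter.get_top_job)
job = requests.get(f'{base_uri}/queue/pop_queue',
                   params={'machine_type': machine_type}).json()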