diff --git a/scripts/beanstalk_queue.py b/scripts/beanstalk_queue.py
new file mode 100644
index 0000000000..88a8242847
--- /dev/null
+++ b/scripts/beanstalk_queue.py
@@ -0,0 +1,35 @@
+import docopt
+
+import teuthology.config
+import teuthology.beanstalk
+
+doc = """
+usage: teuthology-beanstalk-queue -h
+       teuthology-beanstalk-queue [-s|-d|-f] -m MACHINE_TYPE
+       teuthology-beanstalk-queue [-r] -m MACHINE_TYPE
+       teuthology-beanstalk-queue -m MACHINE_TYPE -D PATTERN
+       teuthology-beanstalk-queue -p SECONDS [-m MACHINE_TYPE]
+List jobs in the queue.
+If -D is passed, then jobs with PATTERN in the job name are deleted from the
+queue.
+Arguments:
+  -m, --machine_type MACHINE_TYPE [default: multi]
+                        Which machine type queue to work on.
+optional arguments:
+  -h, --help            Show this help message and exit
+  -D, --delete PATTERN  Delete Jobs with PATTERN in their name
+  -d, --description     Show job descriptions
+  -r, --runs            Only show run names
+  -f, --full            Print the entire job config. Use with caution.
+  -s, --status          Show the status of the queue
+  -p, --pause SECONDS   Pause queues for a number of seconds. A value of 0
+                        will unpause. If -m is passed, pause that queue,
+                        otherwise pause all queues.
+"""
+
+
+def main():
+    args = docopt.docopt(doc)
+    teuthology.beanstalk.main(args)
diff --git a/scripts/dispatcher.py b/scripts/dispatcher.py
index c766de4307..21df578429 100644
--- a/scripts/dispatcher.py
+++ b/scripts/dispatcher.py
@@ -1,9 +1,9 @@
 """
 usage: teuthology-dispatcher --help
        teuthology-dispatcher --supervisor [-v] --bin-path BIN_PATH --job-config CONFIG --archive-dir DIR
-       teuthology-dispatcher [-v] [--archive-dir DIR] --log-dir LOG_DIR --machine-type MACHINE_TYPE
+       teuthology-dispatcher [-v] [--archive-dir DIR] --log-dir LOG_DIR --machine-type MACHINE_TYPE [--queue-backend BACKEND]
 
-Start a dispatcher for the specified machine type. Grab jobs from a paddles
+Start a dispatcher for the specified machine type. Grab jobs from a paddles or beanstalk
 queue and run the teuthology tests they describe as subprocesses. The
 subprocess invoked is a teuthology-dispatcher command run in supervisor
 mode.
@@ -21,6 +21,7 @@
   --supervisor                   run dispatcher in job supervisor mode
   --bin-path BIN_PATH            teuthology bin path
   --job-config CONFIG            file descriptor of job's config file
+  --queue-backend BACKEND        queue backend to use, 'paddles' (the default) or 'beanstalk'
 """
 
 import docopt
diff --git a/scripts/queue.py b/scripts/paddles_queue.py
similarity index 71%
rename from scripts/queue.py
rename to scripts/paddles_queue.py
index a07598a92f..6ecfe43406 100644
--- a/scripts/queue.py
+++ b/scripts/paddles_queue.py
@@ -4,14 +4,14 @@
 import teuthology.paddles_queue
 
 doc = """
-usage: teuthology-queue -h
-       teuthology-queue -s -m MACHINE_TYPE
-       teuthology-queue [-d|-f] -m MACHINE_TYPE -U USER
-       teuthology-queue -m MACHINE_TYPE -P PRIORITY [-U USER|-R RUN_NAME]
-       teuthology-queue [-r] -m MACHINE_TYPE -U USER
-       teuthology-queue -m MACHINE_TYPE -D PATTERN -U USER
-       teuthology-queue -p [-t SECONDS] -m MACHINE_TYPE -U USER
-       teuthology-queue -u -m MACHINE_TYPE -U USER
+usage: teuthology-paddles-queue -h
+       teuthology-paddles-queue -s -m MACHINE_TYPE
+       teuthology-paddles-queue [-d|-f] -m MACHINE_TYPE -U USER
+       teuthology-paddles-queue -m MACHINE_TYPE -P PRIORITY [-U USER|-R RUN_NAME]
+       teuthology-paddles-queue [-r] -m MACHINE_TYPE -U USER
+       teuthology-paddles-queue -m MACHINE_TYPE -D PATTERN -U USER
+       teuthology-paddles-queue -p [-t SECONDS] -m MACHINE_TYPE -U USER
+       teuthology-paddles-queue -u -m MACHINE_TYPE -U USER
 
 List Jobs in queue.
 If -D is passed, then jobs with PATTERN in the job name are deleted from the
diff --git a/setup.py b/setup.py
index c4c33267ca..674f75a282 100644
--- a/setup.py
+++ b/setup.py
@@ -72,6 +72,7 @@
         'orchestra': [
             # For apache-libcloud when using python < 2.7.9
             'backports.ssl_match_hostname',
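+            # Python 3 fork of the beanstalkc client, used by teuthology.beanstalk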
+            'beanstalkc3 >= 0.4.0',
             'httplib2',
             'ndg-httpsclient',  # for requests, urllib3
             'pyasn1',           # for requests, urllib3
@@ -125,7 +126,8 @@
             'teuthology-results = scripts.results:main',
             'teuthology-report = scripts.report:main',
             'teuthology-kill = scripts.kill:main',
-            'teuthology-queue = scripts.queue:main',
+            'teuthology-paddles-queue = scripts.paddles_queue:main',
+            'teuthology-beanstalk-queue = scripts.beanstalk_queue:main',
             'teuthology-prune-logs = scripts.prune_logs:main',
             'teuthology-describe = scripts.describe:main',
             'teuthology-reimage = scripts.reimage:main',
diff --git a/teuthology/beanstalk.py b/teuthology/beanstalk.py
new file mode 100644
index 0000000000..a1165becca
--- /dev/null
+++ b/teuthology/beanstalk.py
@@ -0,0 +1,214 @@
+import beanstalkc
+import yaml
+import logging
+import pprint
+import sys
+from collections import OrderedDict
+
+from teuthology.config import config
+from teuthology import report
+
+log = logging.getLogger(__name__)
+
+
+def connect():
+    host = config.queue_host
+    port = config.queue_port
+    if host is None or port is None:
+        raise RuntimeError(
+            'Beanstalk queue information not found in {conf_path}'.format(
+                conf_path=config.teuthology_yaml))
+    return beanstalkc.Connection(host=host, port=port)
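+# connect() reads the queue location from the teuthology config file
+# (~/.teuthology.yaml). A hypothetical example, 11300 being beanstalkd's
+# default port:
+#   queue_host: beanstalk.example.com
+#   queue_port: 11300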
+
+
+def watch_tube(connection, tube_name):
+    """
+    Watch a given tube, potentially correcting to 'multi' if necessary. Returns
+    the tube_name that was actually used.
+    """
+    if ',' in tube_name:
+        log.debug("Correcting tube name to 'multi'")
+        tube_name = 'multi'
+    connection.watch(tube_name)
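+    # stop watching the 'default' tube so only this tube's jobs are reserved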
+    connection.ignore('default')
+    return tube_name
+
+
+def walk_jobs(connection, tube_name, processor, pattern=None):
+    """
+    def callback(jobs_dict)
+    """
+    log.info("Checking Beanstalk Queue...")
+    job_count = connection.stats_tube(tube_name)['current-jobs-ready']
+    if job_count == 0:
+        log.info('No jobs in Beanstalk Queue')
+        return
+
+    # Try to figure out a sane timeout based on how many jobs are in the queue
+    timeout = job_count / 2000.0 * 60
+    for i in range(1, job_count + 1):
+        print_progress(i, job_count, "Loading")
+        job = connection.reserve(timeout=timeout)
+        if job is None or job.body is None:
+            continue
+        job_config = yaml.safe_load(job.body)
+        job_name = job_config['name']
+        job_id = job.stats()['id']
+        if pattern is not None and pattern not in job_name:
+            continue
+        processor.add_job(job_id, job_config, job)
+    end_progress()
+    processor.complete()
+
+
+def print_progress(index, total, message=None):
+    msg = "{m} ".format(m=message) if message else ''
+    sys.stderr.write("{msg}{i}/{total}\r".format(
+        msg=msg, i=index, total=total))
+    sys.stderr.flush()
+
+
+def end_progress():
+    sys.stderr.write('\n')
+    sys.stderr.flush()
+
+
+class JobProcessor(object):
+    def __init__(self):
+        self.jobs = OrderedDict()
+
+    def add_job(self, job_id, job_config, job_obj=None):
+        job_id = str(job_id)
+
+        job_dict = dict(
+            index=(len(self.jobs) + 1),
+            job_config=job_config,
+        )
+        if job_obj:
+            job_dict['job_obj'] = job_obj
+        self.jobs[job_id] = job_dict
+
+        self.process_job(job_id)
+
+    def process_job(self, job_id):
+        pass
+
+    def complete(self):
+        pass
+
+
+class JobPrinter(JobProcessor):
+    def __init__(self, show_desc=False, full=False):
+        super(JobPrinter, self).__init__()
+        self.show_desc = show_desc
+        self.full = full
+
+    def process_job(self, job_id):
+        job_config = self.jobs[job_id]['job_config']
+        job_index = self.jobs[job_id]['index']
+        job_priority = job_config['priority']
+        job_name = job_config['name']
+        job_desc = job_config['description']
+        print('Job: {i:>4} priority: {pri:>4} {job_name}/{job_id}'.format(
+            i=job_index,
+            pri=job_priority,
+            job_id=job_id,
+            job_name=job_name,
+            ))
+        if self.full:
+            pprint.pprint(job_config)
+        elif job_desc and self.show_desc:
+            for desc in job_desc.splitlines():
+                print('\t {}'.format(desc))
+
+
+class RunPrinter(JobProcessor):
+    def __init__(self):
+        super(RunPrinter, self).__init__()
+        self.runs = list()
+
+    def process_job(self, job_id):
+        run = self.jobs[job_id]['job_config']['name']
+        if run not in self.runs:
+            self.runs.append(run)
+            print(run)
+
+
+class JobDeleter(JobProcessor):
+    def __init__(self, pattern):
+        self.pattern = pattern
+        super(JobDeleter, self).__init__()
+
+    def add_job(self, job_id, job_config, job_obj=None):
+        job_name = job_config['name']
+        if self.pattern in job_name:
+            super(JobDeleter, self).add_job(job_id, job_config, job_obj)
+
+    def process_job(self, job_id):
+        job_config = self.jobs[job_id]['job_config']
+        job_name = job_config['name']
+        print('Deleting {job_name}/{job_id}'.format(
+            job_id=job_id,
+            job_name=job_name,
+            ))
+        job_obj = self.jobs[job_id].get('job_obj')
+        if job_obj:
+            job_obj.delete()
+        report.try_delete_jobs(job_name, job_id)
+
+
+def pause_tube(connection, tube, duration):
+    duration = int(duration)
+    if not tube:
+        tubes = sorted(connection.tubes())
+    else:
+        tubes = [tube]
+
+    prefix = 'Unpausing' if duration == 0 else "Pausing for {dur}s"
+    templ = prefix + ": {tubes}"
+    log.info(templ.format(dur=duration, tubes=tubes))
+    for tube in tubes:
+        connection.pause_tube(tube, duration)
+
+
+def stats_tube(connection, tube):
+    stats = connection.stats_tube(tube)
+    result = dict(
+        name=tube,
+        count=stats['current-jobs-ready'],
+        paused=(stats['pause'] != 0),
+    )
+    return result
+
+
+def main(args):
+    machine_type = args['--machine_type']
+    status = args['--status']
+    delete = args['--delete']
+    runs = args['--runs']
+    show_desc = args['--description']
+    full = args['--full']
+    pause_duration = args['--pause']
+    connection = None
+    try:
+        connection = connect()
+        if machine_type and not pause_duration:
+            # watch_tube needs to be run before we inspect individual jobs;
+            # it is not needed for pausing tubes
+            watch_tube(connection, machine_type)
+        if status:
+            print(stats_tube(connection, machine_type))
+        elif pause_duration:
+            pause_tube(connection, machine_type, pause_duration)
+        elif delete:
+            walk_jobs(connection, machine_type,
+                      JobDeleter(delete))
+        elif runs:
+            walk_jobs(connection, machine_type,
+                      RunPrinter())
+        else:
+            walk_jobs(connection, machine_type,
+                      JobPrinter(show_desc=show_desc, full=full))
+    except KeyboardInterrupt:
+        log.info("Interrupted.")
+    finally:
+        if connection is not None:
+            connection.close()
diff --git a/teuthology/config.py b/teuthology/config.py
index 18e26d2cc3..4a2c7b13d9 100644
--- a/teuthology/config.py
+++ b/teuthology/config.py
@@ -140,6 +140,7 @@ class TeuthologyConfig(YamlConfig):
         'archive_upload_key': None,
         'archive_upload_url': None,
         'automated_scheduling': False,
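+        # queue backend for scheduling/dispatching: 'paddles' or 'beanstalk'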
+        'backend': 'paddles',
         'reserve_machines': 5,
         'ceph_git_base_url': 'https://github.com/ceph/',
         'ceph_git_url': None,
diff --git a/teuthology/dispatcher/__init__.py b/teuthology/dispatcher/__init__.py
index ce4144307c..718772f2eb 100644
--- a/teuthology/dispatcher/__init__.py
+++ b/teuthology/dispatcher/__init__.py
@@ -8,6 +8,7 @@
 from time import sleep
 
 from teuthology import setup_log_file, install_except_hook
+from teuthology import beanstalk
 from teuthology import report
 from teuthology.config import config as teuth_config
 from teuthology.exceptions import SkipJob
@@ -57,6 +58,8 @@ def load_config(archive_dir=None):
 def clean_config(config):
     result = {}
     for key in config:
+        if key == 'status':
+            continue
         if config[key] is not None:
             result[key] = config[key]
     return result
@@ -70,6 +73,7 @@ def main(args):
     machine_type = args["--machine-type"]
     log_dir = args["--log-dir"]
     archive_dir = args["--archive-dir"]
+    backend = args['--queue-backend']
 
     if archive_dir is None:
         archive_dir = teuth_config.archive_base
@@ -87,6 +91,10 @@ def main(args):
 
     load_config(archive_dir=archive_dir)
 
+    if backend == 'beanstalk':
+        connection = beanstalk.connect()
+        beanstalk.watch_tube(connection, machine_type)
+
     result_proc = None
 
     if teuth_config.teuthology_path is None:
@@ -108,17 +116,26 @@ def main(args):
             stop()
 
         load_config()
+        if backend == 'beanstalk':
+            job = connection.reserve(timeout=60)
+            if job is None:
+                continue
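+            # bury the job so it won't be re-run if it fails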
+            job.bury()
+            job_config = yaml.safe_load(job.body)
+            job_id = job_config.get('job_id')
+            log.info('Reserved job %s', job_id)
+            log.info('Config is: %s', job.body)
+        else:
+            job = report.get_queued_job(machine_type)
+            if job is None:
+                continue
+            job = clean_config(job)
+            report.try_push_job_info(job, dict(status='running'))
+            job_id = job.get('job_id')
+            log.info('Reserved job %s', job_id)
+            log.info('Config is: %s', job)
+            job_config = job
 
-        job = report.get_queued_job(machine_type)
-        if job is None:
-            continue
-        job = clean_config(job)
-        report.try_push_job_info(job, dict(status='running'))
-        job_id = job.get('job_id')
-        log.info('Reserved job %s', job_id)
-        log.info('Config is: %s', job)
-        job_config = job
-        
         if job_config.get('stop_worker'):
             keep_running = False
 
@@ -168,6 +185,13 @@ def main(args):
                 status='fail',
                 failure_reason=error_message))
 
+        # This try/except block is to keep the dispatcher from dying when
+        # beanstalkc throws a SocketError
+        if backend == 'beanstalk':
+            try:
+                job.delete()
+            except Exception:
+                log.exception("Saw exception while trying to delete job")
 
 
 def lock_machines(job_config):
@@ -189,7 +213,7 @@ def create_job_archive(job_name, job_archive_path, archive_dir):
 
 
 def pause_queue(machine_type, paused, paused_by, pause_duration=None):
-    if paused == True:
+    if paused:
         report.pause_queue(machine_type, paused, paused_by, pause_duration)
         '''
         If there is a pause duration specified
@@ -199,5 +223,5 @@ def pause_queue(machine_type, paused, paused_by, pause_duration=None):
             sleep(int(pause_duration))
             paused = False
             report.pause_queue(machine_type, paused, paused_by)
-    elif paused == False:
+    elif not paused:
         report.pause_queue(machine_type, paused, paused_by)
diff --git a/teuthology/dispatcher/supervisor.py b/teuthology/dispatcher/supervisor.py
index 5e03eb0315..31769dab35 100644
--- a/teuthology/dispatcher/supervisor.py
+++ b/teuthology/dispatcher/supervisor.py
@@ -65,6 +65,12 @@ def main(args):
 def run_job(job_config, teuth_bin_path, archive_dir, verbose):
     safe_archive = safepath.munge(job_config['name'])
     if job_config.get('first_in_suite') or job_config.get('last_in_suite'):
+        if teuth_config.results_server:
+            try:
+                report.try_delete_jobs(job_config['name'], job_config['job_id'])
+            except Exception as e:
+                log.warning("Unable to delete job %s, exception occurred: %s",
+                            job_config['job_id'], e)
         job_archive = os.path.join(archive_dir, safe_archive)
         args = [
             os.path.join(teuth_bin_path, 'teuthology-results'),
diff --git a/teuthology/kill.py b/teuthology/kill.py
index ac00795d17..cfa90e3e3a 100755
--- a/teuthology/kill.py
+++ b/teuthology/kill.py
@@ -8,8 +8,9 @@
 import logging
 import getpass
 
-
+from teuthology import beanstalk
 from teuthology import report
+from teuthology.config import config
 from teuthology import misc
 
 log = logging.getLogger(__name__)
@@ -57,6 +58,7 @@ def kill_run(run_name, archive_base=None, owner=None, machine_type=None,
                                "you must also pass --machine-type")
 
     if not preserve_queue:
+        if config.backend == 'beanstalk':
+            remove_beanstalk_jobs(run_name, machine_type)
         remove_paddles_jobs(run_name)
     kill_processes(run_name, run_info.get('pids'))
     if owner is not None:
@@ -93,10 +95,13 @@ def find_run_info(serializer, run_name):
     job_info = {}
     job_num = 0
     jobs = serializer.jobs_for_run(run_name)
+    job_total = len(jobs)
     for (job_id, job_dir) in jobs.items():
         if not os.path.isdir(job_dir):
             continue
         job_num += 1
+        if config.backend == 'beanstalk':
+            beanstalk.print_progress(job_num, job_total, 'Reading Job: ')
         job_info = serializer.job_info(run_name, job_id, simple=True)
         for key in job_info.keys():
             if key in run_info_fields and key not in run_info:
@@ -115,6 +120,41 @@ def remove_paddles_jobs(run_name):
         report.try_delete_jobs(run_name, job_ids)
 
 
+def remove_beanstalk_jobs(run_name, tube_name):
+    log.info("Checking Beanstalk Queue...")
+    beanstalk_conn = beanstalk.connect()
+    real_tube_name = beanstalk.watch_tube(beanstalk_conn, tube_name)
+
+    curjobs = beanstalk_conn.stats_tube(real_tube_name)['current-jobs-ready']
+    if curjobs != 0:
+        for _ in range(curjobs):
+            job = beanstalk_conn.reserve(timeout=20)
+            if job is None:
+                continue
+            job_config = yaml.safe_load(job.body)
+            if run_name == job_config['name']:
+                job_id = job_config['job_id']
+                msg = "Deleting job from queue. ID: " + \
+                    "{id} Name: {name} Desc: {desc}".format(
+                        id=str(job_id),
+                        name=job_config['name'],
+                        desc=job_config['description'],
+                    )
+                log.info(msg)
+                job.delete()
+    else:
+        print("No jobs in Beanstalk Queue")
+    beanstalk_conn.close()
+
+
 def kill_processes(run_name, pids=None):
     if pids:
         to_kill = set(pids).intersection(psutil.pids())
diff --git a/teuthology/report.py b/teuthology/report.py
index c149632b0b..83b0cc0d51 100644
--- a/teuthology/report.py
+++ b/teuthology/report.py
@@ -737,8 +737,8 @@ def try_push_job_info(job_config, extra_info=None):
     job_id = job_config['job_id']
 
     if extra_info is not None:
-        job_info = extra_info.copy()
-        job_info.update(job_config)
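+        # values in extra_info take precedence over those in job_config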
+        job_info = job_config.copy()
+        job_info.update(extra_info)
     else:
         job_info = job_config
 
diff --git a/teuthology/schedule.py b/teuthology/schedule.py
index 3a2b875b2a..db12b5d63d 100644
--- a/teuthology/schedule.py
+++ b/teuthology/schedule.py
@@ -1,6 +1,7 @@
 import os
 import yaml
 
+import teuthology.beanstalk
 from teuthology.misc import get_user, merge_configs
 from teuthology import report
 
@@ -22,11 +23,6 @@ def main(args):
             if args[opt]:
                 raise ValueError(msg_fmt.format(opt=opt))
 
-    if args['--first-in-suite'] or args['--last-in-suite']:
-        report_status = False
-    else:
-        report_status = True
-
     name = args['--name']
     if not name or name.isdigit():
         raise ValueError("Please use a more descriptive value for --name")
@@ -34,13 +30,15 @@ def main(args):
     backend = args['--queue-backend']
     if args['--dry-run']:
         print('---\n' + yaml.safe_dump(job_config))
-    elif backend == 'paddles':
-        schedule_job(job_config, args['--num'], report_status)
     elif backend.startswith('@'):
         dump_job_to_file(backend.lstrip('@'), job_config, args['--num'])
+    elif backend == 'paddles':
+        paddles_schedule_job(job_config, args['--num'])
+    elif backend == 'beanstalk':
+        beanstalk_schedule_job(job_config, args['--num'])
     else:
         raise ValueError("Provided schedule backend '%s' is not supported. "
-                         "Try 'paddles' or '@path-to-a-file" % backend)
+                         "Try 'paddles', 'beanstalk' or '@path-to-a-file" % backend)
 
 
 def build_config(args):
@@ -85,9 +83,9 @@ def build_config(args):
     return job_config
 
 
-def schedule_job(job_config, num=1, report_status=True):
+def paddles_schedule_job(job_config, num=1):
     """
-    Schedule a job.
+    Schedule a job with Paddles as the backend.
 
     :param job_config: The complete job dict
     :param num:      The number of times to schedule the job
@@ -98,14 +96,40 @@ def schedule_job(job_config, num=1, report_status=True):
     '''
     report.create_machine_type_queue(job_config['machine_type'])
     while num > 0:
-
         job_id = report.try_create_job(job_config, dict(status='queued'))
-        print('Job scheduled with name {name} and ID {job_id}'.format(
+        print('Job scheduled in Paddles with name {name} and ID {job_id}'.format(
             name=job_config['name'], job_id=job_id))
         job_config['job_id'] = str(job_id)
-        
+
         num -= 1
 
+
+def beanstalk_schedule_job(job_config, num=1):
+    """
+    Schedule a job with Beanstalk as the backend.
+
+    :param job_config: The complete job dict
+    :param num:      The number of times to schedule the job
+    """
+    num = int(num)
+    tube = job_config.pop('tube')
+    beanstalk = teuthology.beanstalk.connect()
+    beanstalk.use(tube)
+    report.create_machine_type_queue(job_config['machine_type'])
+    while num > 0:
+        job_id = report.try_create_job(job_config, dict(status='queued'))
+        job_config['job_id'] = str(job_id)
+        job = yaml.safe_dump(job_config)
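+        # a TTR (time-to-run) of 24 hours keeps long-running jobs reserved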
+        _ = beanstalk.put(
+            job,
+            ttr=60 * 60 * 24,
+            priority=job_config['priority'],
+        )
+        print('Job scheduled in Beanstalk with name {name} and ID {job_id}'.format(
+            name=job_config['name'], job_id=job_id))
+        num -= 1
+
+
 def dump_job_to_file(path, job_config, num=1):
     """
     Schedule a job.
@@ -133,4 +157,3 @@ def dump_job_to_file(path, job_config, num=1):
             num -= 1
     with open(count_file_path, 'w') as f:
         f.write(str(jid))
-
diff --git a/teuthology/test/test_dispatcher.py b/teuthology/test/test_dispatcher.py
index 6b0dddfe2f..9a6d0ff564 100644
--- a/teuthology/test/test_dispatcher.py
+++ b/teuthology/test/test_dispatcher.py
@@ -65,7 +65,8 @@ def test_dispatcher_main(self, m_fetch_teuthology, m_fetch_qa_suite,
             '--description': 'the_description',
             '--machine-type': 'test_queue',
             '--supervisor': False,
-            '--verbose': False
+            '--verbose': False,
+            '--queue-backend': 'paddles'
         }
 
         m = mock.MagicMock()
diff --git a/teuthology/test/test_worker.py b/teuthology/test/test_worker.py
index e3c831d7d8..141eb88ec5 100644
--- a/teuthology/test/test_worker.py
+++ b/teuthology/test/test_worker.py
@@ -1,3 +1,4 @@
+import beanstalkc
 import os
 
 from unittest.mock import patch, Mock, MagicMock
@@ -43,7 +44,8 @@ def test_does_not_need_restart(self, m_datetime, m_exists, getmtime):
     @patch("os.symlink")
     def test_symlink_success(self, m_symlink):
         worker.symlink_worker_log("path/to/worker.log", "path/to/archive")
-        m_symlink.assert_called_with("path/to/worker.log", "path/to/archive/worker.log")
+        m_symlink.assert_called_with(
+            "path/to/worker.log", "path/to/archive/worker.log")
 
     @patch("teuthology.worker.log")
     @patch("os.symlink")
@@ -135,7 +137,8 @@ def test_run_job_no_watchdog(self, m_tempfile, m_safe_dump, m_mkdir,
         m_popen.return_value = m_p
         m_t_config.results_server = False
         worker.run_job(config, "teuth/bin/path", "archive/dir", verbose=False)
-        m_symlink_log.assert_called_with(config["worker_log"], config["archive_path"])
+        m_symlink_log.assert_called_with(
+            config["worker_log"], config["archive_path"])
 
     @patch("teuthology.worker.report.try_push_job_info")
     @patch("teuthology.worker.symlink_worker_log")
@@ -151,7 +154,8 @@ def test_run_with_watchdog_no_reporting(self, m_sleep, m_symlink_log, m_try_push
         process = Mock()
         process.poll.return_value = "not None"
         worker.run_with_watchdog(process, config)
-        m_symlink_log.assert_called_with(config["worker_log"], config["archive_path"])
+        m_symlink_log.assert_called_with(
+            config["worker_log"], config["archive_path"])
         m_try_push.assert_called_with(
             dict(name=config["name"], job_id=config["job_id"]),
             dict(status='dead')
@@ -160,7 +164,8 @@ def test_run_with_watchdog_no_reporting(self, m_sleep, m_symlink_log, m_try_push
     @patch("subprocess.Popen")
     @patch("teuthology.worker.symlink_worker_log")
     @patch("time.sleep")
-    def test_run_with_watchdog_with_reporting(self, m_sleep, m_symlink_log, m_popen):
+    @patch("teuthology.worker.report.try_push_job_info")
+    def test_run_with_watchdog_with_reporting(self, m_tpji, m_sleep, m_symlink_log, m_popen):
         config = {
             "name": "the_name",
             "job_id": "1",
@@ -174,12 +179,15 @@ def test_run_with_watchdog_with_reporting(self, m_sleep, m_symlink_log, m_popen)
         m_proc.poll.return_value = "not None"
         m_popen.return_value = m_proc
         worker.run_with_watchdog(process, config)
-        m_symlink_log.assert_called_with(config["worker_log"], config["archive_path"])
+        m_symlink_log.assert_called_with(
+            config["worker_log"], config["archive_path"])
 
     @patch("os.path.isdir")
     @patch("teuthology.worker.fetch_teuthology")
+    @patch("teuthology.worker.teuth_config")
     @patch("teuthology.worker.fetch_qa_suite")
     def test_prep_job(self, m_fetch_qa_suite,
+                      m_teuth_config,
                       m_fetch_teuthology, m_isdir):
         config = dict(
             name="the_name",
@@ -190,6 +198,7 @@ def test_prep_job(self, m_fetch_qa_suite,
         m_fetch_teuthology.return_value = '/teuth/path'
         m_fetch_qa_suite.return_value = '/suite/path'
         m_isdir.return_value = True
+        m_teuth_config.teuthology_path = None
         got_config, teuth_bin_path = worker.prep_job(
             config,
             log_file_path,
@@ -207,63 +216,97 @@ def test_prep_job(self, m_fetch_qa_suite,
         assert m_fetch_qa_suite.called_once_with_args(branch='master')
         assert got_config['suite_path'] == '/suite/path'
 
-    def build_fake_jobs(self, job_bodies):
+    def build_fake_jobs(self, m_connection, m_job, job_bodies):
         """
+        Given patched copies of:
+            beanstalkc.Connection
+            beanstalkc.Job
         And a list of basic job bodies, return a list of mocked Job objects
         """
+        # Make sure instantiating m_job returns a new object each time
+        m_job.side_effect = lambda **kwargs: Mock(spec=beanstalkc.Job)
         jobs = []
         job_id = 0
         for job_body in job_bodies:
             job_id += 1
-            job = {}
-            job['job_id'] = job_id
-            job['body'] = job_body
+            job = m_job(conn=m_connection, jid=job_id, body=job_body)
+            job.jid = job_id
+            job_body += '\njob_id: ' + str(job_id)
+            job.body = job_body
             jobs.append(job)
         return jobs
 
-
-    @patch("teuthology.worker.setup_log_file")
-    @patch("os.path.isdir", return_value=True)
-    @patch("teuthology.worker.fetch_teuthology")
-    @patch("teuthology.worker.fetch_qa_suite")
     @patch("teuthology.worker.run_job")
+    @patch("teuthology.worker.prep_job")
+    @patch("beanstalkc.Job", autospec=True)
+    @patch("teuthology.worker.fetch_qa_suite")
+    @patch("teuthology.worker.fetch_teuthology")
+    @patch("teuthology.worker.beanstalk.watch_tube")
+    @patch("teuthology.worker.beanstalk.connect")
+    @patch("os.path.isdir", return_value=True)
+    @patch("teuthology.worker.setup_log_file")
+    def test_main_loop(
+        self, m_setup_log_file, m_isdir, m_connect, m_watch_tube,
+        m_fetch_teuthology, m_fetch_qa_suite, m_job, m_prep_job, m_run_job,
+    ):
+        m_connection = Mock()
+        jobs = self.build_fake_jobs(
+            m_connection,
+            m_job,
+            [
+                'foo: bar',
+                'stop_worker: true',
+            ],
+        )
+        m_connection.reserve.side_effect = jobs
+        m_connect.return_value = m_connection
+        m_prep_job.return_value = (dict(), '/bin/path')
+        worker.main(self.ctx)
+        # There should be one reserve call per item in the jobs list
+        expected_reserve_calls = [
+            dict(timeout=60) for i in range(len(jobs))
+        ]
+        got_reserve_calls = [
+            call[1] for call in m_connection.reserve.call_args_list
+        ]
+        assert got_reserve_calls == expected_reserve_calls
+        for job in jobs:
+            job.bury.assert_called_once_with()
+            job.delete.assert_called_once_with()
+
     @patch("teuthology.worker.report.try_push_job_info")
-    @patch("teuthology.worker.report.get_queued_job")
-    @patch("teuthology.worker.clean_config")
+    @patch("teuthology.worker.run_job")
+    @patch("beanstalkc.Job", autospec=True)
+    @patch("teuthology.worker.fetch_qa_suite")
+    @patch("teuthology.worker.fetch_teuthology")
+    @patch("teuthology.worker.beanstalk.watch_tube")
+    @patch("teuthology.worker.beanstalk.connect")
+    @patch("os.path.isdir", return_value=True)
+    @patch("teuthology.worker.setup_log_file")
     def test_main_loop_13925(
-        self, m_setup_log_file, m_isdir,
-        m_fetch_teuthology, m_fetch_qa_suite, m_run_job,
-        m_try_push_job_info, m_get_queued_job, m_clean_config
-                       ):
+        self, m_setup_log_file, m_isdir, m_connect, m_watch_tube,
+        m_fetch_teuthology, m_fetch_qa_suite, m_job, m_run_job,
+        m_try_push_job_info,
+    ):
+        m_connection = Mock()
+        jobs = self.build_fake_jobs(
+            m_connection,
+            m_job,
+            [
+                'name: name',
+                'name: name\nstop_worker: true',
+            ],
+        )
+        m_connection.reserve.side_effect = jobs
+        m_connect.return_value = m_connection
         m_fetch_qa_suite.side_effect = [
             '/suite/path',
             MaxWhileTries(),
             MaxWhileTries(),
         ]
-        job = {
-            'job_id': '1',
-            'description': 'DESC',
-            'email': 'EMAIL',
-            'first_in_suite': False,
-            'last_in_suite': True,
-            'machine_type': 'test_queue',
-            'name': 'NAME',
-            'owner': 'OWNER',
-            'priority': 99,
-            'results_timeout': '6',
-            'verbose': False,
-            'stop_worker': True
-        }
-        m_get_queued_job.return_value = job
-        m_clean_config.return_value = job
-
-        mock_prep_job_patcher = patch('teuthology.worker.prep_job')
-        mock_prep_job = mock_prep_job_patcher.start()
-        mock_prep_job.return_value = (dict(), '/teuth/bin/path')
-
         worker.main(self.ctx)
-        mock_prep_job_patcher.stop()
-        assert len(m_run_job.call_args_list) == 1
-        assert len(m_try_push_job_info.call_args_list) == 1
-        assert m_try_push_job_info.called_once_with(job, dict(status='running'))
-
+        assert len(m_run_job.call_args_list) == 0
+        assert len(m_try_push_job_info.call_args_list) == len(jobs)
+        for i in range(len(jobs)):
+            push_call = m_try_push_job_info.call_args_list[i]
+            assert push_call[0][1]['status'] == 'dead'
diff --git a/teuthology/worker.py b/teuthology/worker.py
index ff68acf1c3..1bb7bbff55 100644
--- a/teuthology/worker.py
+++ b/teuthology/worker.py
@@ -9,6 +9,7 @@
 from datetime import datetime
 
 from teuthology import setup_log_file, install_except_hook
+from teuthology import beanstalk
 from teuthology import report
 from teuthology import safepath
 from teuthology.config import config as teuth_config
@@ -57,14 +58,6 @@ def load_config(ctx=None):
             teuth_config.archive_base = ctx.archive_dir
 
 
-def clean_config(config):
-    result = {}
-    for key in config:
-        if config[key] is not None:
-            result[key] = config[key]
-    return result
-
-
 def main(ctx):
     loglevel = logging.INFO
     if ctx.verbose:
@@ -81,6 +74,8 @@ def main(ctx):
 
     set_config_attr(ctx)
 
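+    # consume jobs from the beanstalk tube named by the worker's --tube option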
+    connection = beanstalk.connect()
+    beanstalk.watch_tube(connection, ctx.tube)
     result_proc = None
 
     if teuth_config.teuthology_path is None:
@@ -103,16 +98,16 @@ def main(ctx):
 
         load_config()
 
-        job = report.get_queued_job(ctx.machine_type)
+        job = connection.reserve(timeout=60)
         if job is None:
             continue
 
-        job = clean_config(job)
-        report.try_push_job_info(job, dict(status='running'))
-        job_id = job.get('job_id')
+        # bury the job so it won't be re-run if it fails
+        job.bury()
+        job_config = yaml.safe_load(job.body)
+        job_id = job_config.get('job_id')
         log.info('Reserved job %s', job_id)
-        log.info('Config is: %s', job)
-        job_config = job
+        log.info('Config is: %s', job.body)
 
         if job_config.get('stop_worker'):
             keep_running = False
@@ -132,6 +127,13 @@ def main(ctx):
         except SkipJob:
             continue
 
+        # This try/except block is to keep the worker from dying when
+        # beanstalkc throws a SocketError
+        try:
+            job.delete()
+        except Exception:
+            log.exception("Saw exception while trying to delete job")
+
 
 def prep_job(job_config, log_file_path, archive_dir):
     job_id = job_config['job_id']