diff --git a/amqp_messager.py b/amqp_messager.py index 18de360..1cf0b24 100644 --- a/amqp_messager.py +++ b/amqp_messager.py @@ -7,13 +7,14 @@ class AMQPMessager(MessagingHandler): def __init__(self, server, receiver_queues, sender_queue, task_queue, - result_queue): + result_queue, preparation_queue): super(AMQPMessager, self).__init__() self.server = server self.receiver_queues = receiver_queues self.sender_queue = sender_queue self.tasks = task_queue self.results = result_queue + self.preparations = preparation_queue def on_start(self, event): conn = event.container.connect(self.server) @@ -28,8 +29,10 @@ def on_start(self, event): def on_message(self, event): # ToDO: ignore duplicate message - # print(event.message.body) - self.tasks.put(event.message.body) + if event.message.address != "preparations": + self.tasks.put(event.message.body) + elif event.message.address == "preparations": + self.preparations.put(event.message.body) def on_result(self, event): # check if we are finished diff --git a/backend.py b/backend.py index 7f86e51..3579851 100644 --- a/backend.py +++ b/backend.py @@ -8,6 +8,7 @@ import re import json import tempfile +from xml.etree.ElementPath import prepare_star import docker import requests import time @@ -34,6 +35,7 @@ def __init__(self, config_file): print("Using env AMQPServer %s"%os.getenv('AMQPServer')) self.tasks = multiprocessing.Queue(3) self.results = multiprocessing.Queue() + self.preparations = multiprocessing.Queue() self.running_computations = {} self.client = docker.from_env() # ToDO: store errors and send them within result-message back @@ -46,7 +48,8 @@ def __init__(self, config_file): self.config.getlist("AMQP", "computationqueues"), self.config["AMQP"]["resultqueue"], self.tasks, - self.results)) + self.results, + self.preparations)) self.messager_process = multiprocessing.Process(target=messager.run) self.messager_process.start() @@ -103,6 +106,20 @@ def main(self): comp2trash.append(key) for key in comp2trash: del self.running_computations[key] + + # prepare containers if available + # TODO also prepare matlab + try: + prepare_task = self.preparations.get(block=True, timeout=1) + except Empty: + pass + else: + print("--- Got preparation task. ---") + json_prepare_task = json.loads(prepare_task) + tmp_dir, files = self._prepare_all_environments(json_prepare_task) + if json_prepare_task["environment"] == "Container": + comp_conf = ConfigurationContainerSchema().load(json_prepare_task["configuration"]) + self.load_image(comp_conf, tmp_dir.name) def _prepare_all_environments(self, computation): # create tmp-dir for this computation and store files there @@ -137,7 +154,33 @@ def _prepare_container_backend(self, computation, tmp_dir): # ToDO: create in-between status messages for frontend comp_conf = ConfigurationContainerSchema().load( computation["configuration"]) + # load image + image_filename, image_id = self.load_image(comp_conf, tmp_dir) + + # create container + if comp_conf["volume"] is not None: + print("Creating volume ...") + volume = self.client.volumes.create(labels={"computation": computation['identifier'].hex}) + + print("Creating container ...") + container = self.client.containers.create( + image_id, + command=comp_conf["command_line_arguments"], + auto_remove=False, + cpu_quota=100000*comp_conf["num_cpus"], + detach=True, + entrypoint=comp_conf["entrypoint"], + mem_limit=comp_conf["memory"], + mounts=[Mount(comp_conf["volume"],volume.id)] \ + if comp_conf["volume"] is not None else None) + print("... Done.") + if comp_conf["volume"] is not None: + return container, image_filename, volume + else: + return container, image_filename, None + + def load_image(self, comp_conf, tmp_dir): image_filename = None image_uri = comp_conf["image"] if image_uri.startswith("file"): @@ -175,27 +218,7 @@ def _prepare_container_backend(self, computation, tmp_dir): with open(os.path.join(tmp_dir, image_filename), 'rb') as bf: image_id = self.client.images.load(bf)[0].id - # create container - if comp_conf["volume"] is not None: - print("Creating volume ...") - volume = self.client.volumes.create(labels={"computation": computation['identifier'].hex}) - - print("Creating container ...") - container = self.client.containers.create( - image_id, - command=comp_conf["command_line_arguments"], - auto_remove=False, - cpu_quota=100000*comp_conf["num_cpus"], - detach=True, - entrypoint=comp_conf["entrypoint"], - mem_limit=comp_conf["memory"], - mounts=[Mount(comp_conf["volume"],volume.id)] \ - if comp_conf["volume"] is not None else None) - print("... Done.") - if comp_conf["volume"] is not None: - return container, image_filename, volume - else: - return container, image_filename, None + return image_filename, image_id def copy_to_container(self, ip_add, basepath, files): print(basepath, files) @@ -210,7 +233,6 @@ def add_mimetypes(self): mimetypes.add_type("application/x-vgf", ".vgf") mimetypes.add_type("application/x-vgf3", ".vgf3") mimetypes.add_type("application/x-vgfc", ".vgfc") - class ResultStreamer(Thread): def __init__(self, stream, tmp_dir, files, result_queue, computation_id, sidekick): diff --git a/config.sample.ini b/config.sample.ini index a6c65d9..257f7e3 100644 --- a/config.sample.ini +++ b/config.sample.ini @@ -5,5 +5,5 @@ KeepContainer = no # yes for debugging [AMQP] Server = localhost:5672 -ComputationQueues = ["computations", "computations/container"] +ComputationQueues = ["computations", "computations/container", "preparations"] ResultQueue = results \ No newline at end of file