diff --git a/main.sh b/main.sh index fee27bc..2e5b38c 100755 --- a/main.sh +++ b/main.sh @@ -4,7 +4,9 @@ if [ "$HOSTNAME" == "" ]; then HOSTNAME="$(hostname -f)" fi +luigi KillTask --module poltergust.tasks --scheduler-url="${SCHEDULER_URL}" &>/dev/null & + while true; do - luigi RunTasks --module poltergust --hostname="${HOSTNAME}" --path="${PIPELINE_URL}" --scheduler-url="${SCHEDULER_URL}" + luigi RunTasks --module poltergust.tasks --hostname="${HOSTNAME}" --path="${PIPELINE_URL}" --scheduler-url="${SCHEDULER_URL}" sleep 1 done diff --git a/poltergust/__init__.py b/poltergust/__init__.py index 79116e3..139597f 100644 --- a/poltergust/__init__.py +++ b/poltergust/__init__.py @@ -1,290 +1,2 @@ -import os -import re -import os.path -import yaml -from pathlib import Path -import sys -import traceback -import zipfile -import luigi -import luigi.contrib.opener -import luigi.local_target -import pieshell -import traceback -import math -import time -import requests -import datetime -import poltergust_luigi_utils # Add GCS luigi opener -import poltergust_luigi_utils.logging_task -import poltergust_luigi_utils.gcs_opener -import fnmatch -DB_URL = os.environ.get("DB_URL") -ENVIRONMENTS_DIR = os.environ.get("ENVIRONMENTS_DIR", "/tmp/environments") -DOWNLOAD_ENVIRONMENT = os.environ.get("DOWNLOAD_ENVIRONMENT", False) -UPLOAD_ENVIRONMENT = os.environ.get("UPLOAD_ENVIRONMENT", False) -TAG = r"{{POLTERGUST_PIP}}" -def strnow(): - return datetime.datetime.now().strftime("%Y-%m-%d %H-%M-%S.%f") - -def make_environment(envpath, environment, log): - _ = pieshell.env(exports=dict(pieshell.env._exports)) - if not os.path.exists(envpath): - envdir = os.path.dirname(envpath) - if not os.path.exists(envdir): - os.makedirs(envdir) - +_.virtualenv(envpath, **environment.get("virtualenv", {})) - +_.bashsource(envpath + "/bin/activate") - for dep in environment["dependencies"]: - if TAG in dep: - dep = re.sub(TAG, _._exports["GITHUB_TOKEN_EMRLD"], dep) - for line in _.pip.install(dep): - log(line) - - -def zip_environment(envpath, log): - """ - Create a zip compression level 6 archive of a python virtualenv - Note that zipping virtualenvs is not recommended according to PEP405: - "Not considered as movable or copyable – you just recreate the - same environment in the target location" - - However, this will work if the two execution environments are "exactly" - the same - - @Args: - envpath: local path to the virtualenv directory - @Returns: - archive: name of the zipped archive - """ - archive = envpath + ".zip" - envdir = Path(envpath) - with zipfile.ZipFile(archive, 'w', compression=zipfile.ZIP_DEFLATED, compresslevel=6) as f: - for file in envdir.rglob('*'): - f.write(file, arcname=file.relative_to(envdir)) - log("Zipped: {}".format(archive)) - return archive - - -def download_environment(envpath, path, log): - """ - Download a python virtualenv and extract it - @Args - envpath: local path to the virtualenv directory - path: path to downloadable virtualenv (GCS/local disk) - log: luigi logger - """ - if not os.path.exists(envpath): - envdir = os.path.dirname(envpath) - if not os.path.exists(envdir): - os.makedirs(envdir) - - fs = luigi.contrib.opener.OpenerTarget(path).fs - with fs.download(path) as z: - with zipfile.ZipFile(z, mode="r") as arc: - arc.extractall(envpath) - -class MakeEnvironment(poltergust_luigi_utils.logging_task.LoggingTask, luigi.Task): - path = luigi.Parameter() - hostname = luigi.Parameter() - retry_on_error = luigi.Parameter(default=False) - - def run(self): - with self.logging(self.retry_on_error): - zip_path = str(self.path) + ".zip" - if DOWNLOAD_ENVIRONMENT: - if luigi.contrib.opener.OpenerTarget(zip_path).exists(): - download_environment(self.envdir().path, zip_path, self.log) - else: - with luigi.contrib.opener.OpenerTarget(self.path).open("r") as f: - environment = yaml.load(f, Loader=yaml.SafeLoader) - print(environment) - make_environment(self.envdir().path, environment, self.log) - with self.output().open("w") as f: - f.write("DONE") - if UPLOAD_ENVIRONMENT: - archive = zip_environment(self.envdir().path, self.log) - fs = luigi.contrib.opener.OpenerTarget(self.path).fs - fs.put(archive, zip_path) - - - def envdir(self): - return luigi.local_target.LocalTarget( - os.path.join(ENVIRONMENTS_DIR, self.path.replace("://", "/").lstrip("/"))) - - def logfile(self): - return luigi.contrib.opener.OpenerTarget("%s.%s.log.txt" % (self.path, self.hostname)) - - def output(self): - return luigi.local_target.LocalTarget( - os.path.join(ENVIRONMENTS_DIR, self.path.replace("://", "/").lstrip("/"), "done")) - -def get_scheduler_url(task): - return task.set_progress_percentage.__self__._scheduler._url - -class RunTask(poltergust_luigi_utils.logging_task.LoggingTask, luigi.Task): - path = luigi.Parameter() - hostname = luigi.Parameter(significant=False, visibility=luigi.parameter.ParameterVisibility.HIDDEN) - retry_on_error = luigi.Parameter(default=False) - - @property - def scheduler(self): - return self.set_progress_percentage.__self__._scheduler - - @property - def scheduler_url(self): - return get_scheduler_url(self) - - def run(self): - with self.logging(self.retry_on_error): - self.log('RunTask start') - - src = '%s.config.yaml' % (self.path,) - fs = self.output().fs - - try: - cfg = luigi.contrib.opener.OpenerTarget('%s.config.yaml' % (self.path,)) - try: - with cfg.open("r") as f: - task = yaml.load(f, Loader=yaml.SafeLoader) - except: - if not cfg.fs.exists(cfg.path): - # The task has already been marked as done by another worker. - return - raise - self.log('RunTask config loaded') - - with luigi.contrib.opener.OpenerTarget(task["environment"]).open("r") as f: - environment = yaml.load(f, Loader=yaml.SafeLoader) - - env = MakeEnvironment(path=task["environment"], hostname=self.hostname, retry_on_error=self.retry_on_error) - if not env.output().exists(): - yield env - envpath = env.envdir().path - self.log('RunTask environment made') - - _ = pieshell.env(envpath, interactive=True) - +_.bashsource(envpath + "/bin/activate") - - _._exports.update(environment.get("variables", {})) - _._exports.update(task.get("variables", {})) - - self.log('RunTask loaded environment') - - command = task.get("command", None) - - task_args = dict(task.get("task", {})) - task_name = task_args.pop("name", None) - task_args["scheduler-url"] = self.scheduler_url - task_args["retcode-already-running"] = "10" - task_args["retcode-missing-data"] = "20" - task_args["retcode-not-run"] = "25" - task_args["retcode-task-failed"] = "30" - task_args["retcode-scheduling-error"] = "35" - task_args["retcode-unhandled-exception"] = "40" - - if command is None: - command = "luigi(task_name, **task_args)" - - scope = pieshell.environ.EnvScope(env=_) - scope["task_name"] = task_name - scope["task_args"] = task_args - - self.log('RunTask starting actual task' + strnow()) - - # Rerun the task until it is actually being run (or is done!) - # We need to do this, since luigi might exit because all - # dependencies of a task are already being run by other - # workers, and our task can't even start... - while True: - try: - for line in eval(command, scope): - self.log(line) - except pieshell.PipelineFailed as e: - self.log(str(e)) - if e.pipeline.exit_code <= 25: - time.sleep(5) - continue - raise - break - - self.log('RunTask end') - dst = '%s.done.yaml' % (self.path,) - - except Exception as e: - dst = '%s.error.yaml' % (self.path,) - self.log("Task failed: %s" % e) - self.log(traceback.format_exc()) - - src = '%s.config.yaml' % (self.path,) - fs = self.output().fs - try: - fs.move(src, dst) - except: - # Might already have been moved by another node... - pass - - if DB_URL is not None: - r = requests.get(DB_URL, params={"pipeline_url": self.path}) - if r.status_code != 200: - print("Unable to update status", r.status_code, r.text) - - def logfile(self): - return luigi.contrib.opener.OpenerTarget("%s.%s.log.txt" % (self.path, self.hostname)) - - def output(self): - return luigi.contrib.opener.OpenerTarget( - '%s.done.yaml' % (self.path,)) - -def list_wildcard(fs, path): - if hasattr(fs, "list_wildcard"): - return fs.list_wildcard(path) - else: - dirpath, pattern = path.rsplit("/", 1) - return fnmatch.filter(fs.listdir(dirpath), path) - -class RunTasks(luigi.Task): - path = luigi.Parameter() - hostname = luigi.Parameter() - - @property - def scheduler_url(self): - return get_scheduler_url(self) - - def run(self): - while True: - tasks = [RunTask(path=path.replace(".config.yaml", ""), hostname=self.hostname) - for path in list_wildcard(self.output().fs, '%s/*.config.yaml' % (self.path,))] - print("=============================================") - print(tasks) - for task in tasks: - res = luigi.build([task], scheduler_url=self.scheduler_url, detailed_summary=True) - print("<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<") - print(res.summary_text) - print(">>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>") - time.sleep(1) - - def output(self): - # This target should never actually be created... - return luigi.contrib.opener.OpenerTarget('%s/done' % (self.path,)) - - -# luigi --module emerald_algorithms_evaluation.luigi Pipeline --param-evaluation_name=test-luigi-redhog-1 - -class TestTask(luigi.Task): - name = luigi.Parameter() - time = luigi.Parameter() - - def run(self): - t = int(self.time) - for x in range(t): - self.set_progress_percentage(100 * x / t) - time.sleep(1) - - with self.output().open("w") as f: - f.write("DONE") - - def output(self): - return luigi.contrib.opener.OpenerTarget(self.name) diff --git a/poltergust/lib.py b/poltergust/lib.py new file mode 100644 index 0000000..e177c61 --- /dev/null +++ b/poltergust/lib.py @@ -0,0 +1,149 @@ +import os +import re +import os.path +from pathlib import Path +import zipfile +import luigi +import luigi.contrib.opener +import luigi.local_target +import pieshell +import datetime +import psutil +import signal +import fnmatch + + +class Config: + DB_URL = os.environ.get("DB_URL") + ENVIRONMENTS_DIR = os.environ.get("ENVIRONMENTS_DIR", "/tmp/environments") + DOWNLOAD_ENVIRONMENT = os.environ.get("DOWNLOAD_ENVIRONMENT", False) + UPLOAD_ENVIRONMENT = os.environ.get("UPLOAD_ENVIRONMENT", False) + PIPELINE_URL = os.environ.get("PIPELINE_URL", "") + TAG = r"{{POLTERGUST_PIP}}" + + +def strnow(): + return datetime.datetime.now().strftime("%Y-%m-%d %H-%M-%S.%f") + + +def makedirs_recursive(envpath): + if not os.path.exists(envpath): + envdir = os.path.dirname(envpath) + if not os.path.exists(envdir): + os.makedirs(envdir) + return envpath + + +def list_wildcard(fs, path): + if hasattr(fs, "list_wildcard"): + return fs.list_wildcard(path) + else: + dirpath, pattern = path.rsplit("/", 1) + return fnmatch.filter(fs.listdir(dirpath), path) + + +def make_environment(envpath, environment, log): + _ = pieshell.env(exports=dict(pieshell.env._exports)) + # if not os.path.exists(envpath): + # envdir = os.path.dirname(envpath) + # if not os.path.exists(envdir): + # os.makedirs(envdir) + makedirs_recursive(envpath) + +_.virtualenv(envpath, **environment.get("virtualenv", {})) + +_.bashsource(envpath + "/bin/activate") + for dep in environment["dependencies"]: + if TAG in dep: + dep = re.sub(TAG, _._exports["GITHUB_TOKEN_EMRLD"], dep) + for line in _.pip.install(dep): + log(line) + + +def zip_environment(envpath, log): + """ + Create a zip compression level 6 archive of a python virtualenv + Note that zipping virtualenvs is not recommended according to PEP405: + "Not considered as movable or copyable – you just recreate the + same environment in the target location" + + However, this will work if the two execution environments are "exactly" + the same + + @Args: + envpath: local path to the virtualenv directory + @Returns: + archive: name of the zipped archive + """ + archive = envpath + ".zip" + envdir = Path(envpath) + with zipfile.ZipFile(archive, 'w', compression=zipfile.ZIP_DEFLATED, compresslevel=6) as f: + for file in envdir.rglob('*'): + f.write(file, arcname=file.relative_to(envdir)) + log("Zipped: {}".format(archive)) + return archive + + +def download_environment(envpath, path, log): + """ + Download a python virtualenv and extract it + @Args + envpath: local path to the virtualenv directory + path: path to downloadable virtualenv (GCS/local disk) + log: luigi logger + """ + makedirs_recursive(envpath) + fs = luigi.contrib.opener.OpenerTarget(path).fs + with fs.download(path) as z: + with zipfile.ZipFile(z, mode="r") as arc: + arc.extractall(envpath) + + +def kill_proc_tree(pid, sig=signal.SIGTERM, timeout=5): + parent = psutil.Process(pid) + children = parent.children(recursive=True) + # children.append(parent) + + parent.terminate() + try: + parent.wait(timeout) + except psutil.TimeoutExpired: + parent.kill() + + for proc in children: + try: + proc.send_signal(sig) + except psutil.NoSuchProcess: + pass + gone, alive = psutil.wait_procs(children, timeout=2) + for p in alive: + p.kill() + + return gone + + +def rename_file(src, dst): + fs = luigi.contrib.opener.OpenerTarget(src).fs + if fs.exists(src): + try: + fs.move(src, dst) + except: + pass + finally: + return f"{src} moved to {dst}" + else: + return f"not exists!: {src}" + + +def remove_config_from_pipeline(cfg, dst=None, gcs=True): + cfg_src = cfg + ".config.yaml" + cfg_dst = cfg + ".done.yaml" + + if not Config.PIPELINE_URL.startswith("gs://"): + return f"GCS PIPELINE_URL not found!, {Config.PIPELINE_URL}" + + full_cfg_path = Config.PIPELINE_URL + "/" + cfg_src + full_dst_path = Config.PIPELINE_URL + "/" + cfg_dst + ret = rename_file(full_cfg_path, full_dst_path) + return ret + # ret = rename_file(cfg, dst) + # return ret + diff --git a/poltergust/tasks.py b/poltergust/tasks.py new file mode 100644 index 0000000..ae59bcd --- /dev/null +++ b/poltergust/tasks.py @@ -0,0 +1,267 @@ +import os +import os.path +import yaml +import traceback +import luigi +import luigi.contrib.opener +import luigi.local_target +import pieshell +import traceback +import time +import requests +import poltergust_luigi_utils # Add GCS luigi opener +import poltergust_luigi_utils.logging_task +import poltergust_luigi_utils.gcs_opener +import signal +# from multiprocessing import Queue +import queue as Queue +from multiprocessing import Event + +from .lib import Config +from .lib import ( + strnow, + make_environment, + download_environment, + kill_proc_tree, + zip_environment, + remove_config_from_pipeline, + list_wildcard, +) + + +class KillTask(luigi.Task): + accepts_messages = True + shutdown_event = Event() + + def run(self): + while not self.shutdown_event.is_set(): + try: + msg = self.scheduler_messages.get(block=True, timeout=0.05) + content = msg.content + if content == "END": + break + if content.startswith("kill"): + argv = content.split(",") + pid = int(argv[0].split("=")[1]) + if pid == os.getpid(): + msg.respond("Not me!") + continue + + if len(argv) == 2: + cfg = argv[1].split("=")[1] + status = remove_config_from_pipeline(cfg) + print(status) + + gone = kill_proc_tree(int(pid), sig=signal.SIGTERM) + print(f"[X] {gone}") + msg.respond(f"Killed: {pid}") + else: + msg.respond("Unknown message!") + except Queue.Empty: + continue + + def output(self): + return luigi.local_target.LocalTarget('killed_tasks.pid') + + +class MakeEnvironment(poltergust_luigi_utils.logging_task.LoggingTask, luigi.Task): + path = luigi.Parameter() + hostname = luigi.Parameter() + retry_on_error = luigi.Parameter(default=False) + + def run(self): + with self.logging(self.retry_on_error): + zip_path = str(self.path) + ".zip" + if Config.DOWNLOAD_ENVIRONMENT: + if luigi.contrib.opener.OpenerTarget(zip_path).exists(): + download_environment(self.envdir().path, zip_path, self.log) + else: + with luigi.contrib.opener.OpenerTarget(self.path).open("r") as f: + environment = yaml.load(f, Loader=yaml.SafeLoader) + print(environment) + make_environment(self.envdir().path, environment, self.log) + with self.output().open("w") as f: + f.write("DONE") + if Config.UPLOAD_ENVIRONMENT: + archive = zip_environment(self.envdir().path, self.log) + fs = luigi.contrib.opener.OpenerTarget(self.path).fs + fs.put(archive, zip_path) + + + def envdir(self): + return luigi.local_target.LocalTarget( + os.path.join(Config.ENVIRONMENTS_DIR, self.path.replace("://", "/").lstrip("/"))) + + def logfile(self): + return luigi.contrib.opener.OpenerTarget("%s.%s.log.txt" % (self.path, self.hostname)) + + def output(self): + return luigi.local_target.LocalTarget( + os.path.join(Config.ENVIRONMENTS_DIR, self.path.replace("://", "/").lstrip("/"), "done")) + +def get_scheduler_url(task): + return task.set_progress_percentage.__self__._scheduler._url + +class RunTask(poltergust_luigi_utils.logging_task.LoggingTask, luigi.Task): + path = luigi.Parameter() + hostname = luigi.Parameter(significant=False, visibility=luigi.parameter.ParameterVisibility.HIDDEN) + retry_on_error = luigi.Parameter(default=False) + + @property + def scheduler(self): + return self.set_progress_percentage.__self__._scheduler + + @property + def scheduler_url(self): + return get_scheduler_url(self) + + def run(self): + with self.logging(self.retry_on_error): + self.log('RunTask start') + + src = '%s.config.yaml' % (self.path,) + fs = self.output().fs + + try: + cfg = luigi.contrib.opener.OpenerTarget('%s.config.yaml' % (self.path,)) + try: + with cfg.open("r") as f: + task = yaml.load(f, Loader=yaml.SafeLoader) + except: + if not cfg.fs.exists(cfg.path): + # The task has already been marked as done by another worker. + return + raise + self.log('RunTask config loaded') + + with luigi.contrib.opener.OpenerTarget(task["environment"]).open("r") as f: + environment = yaml.load(f, Loader=yaml.SafeLoader) + + env = MakeEnvironment(path=task["environment"], hostname=self.hostname, retry_on_error=self.retry_on_error) + if not env.output().exists(): + yield env + envpath = env.envdir().path + self.log('RunTask environment made') + + _ = pieshell.env(envpath, interactive=True) + +_.bashsource(envpath + "/bin/activate") + + _._exports.update(environment.get("variables", {})) + _._exports.update(task.get("variables", {})) + + self.log('RunTask loaded environment') + + command = task.get("command", None) + + task_args = dict(task.get("task", {})) + task_name = task_args.pop("name", None) + task_args["scheduler-url"] = self.scheduler_url + task_args["retcode-already-running"] = "10" + task_args["retcode-missing-data"] = "20" + task_args["retcode-not-run"] = "25" + task_args["retcode-task-failed"] = "30" + task_args["retcode-scheduling-error"] = "35" + task_args["retcode-unhandled-exception"] = "40" + + if command is None: + command = "luigi(task_name, **task_args)" + + scope = pieshell.environ.EnvScope(env=_) + scope["task_name"] = task_name + scope["task_args"] = task_args + + self.log('RunTask starting actual task' + strnow()) + + # Rerun the task until it is actually being run (or is done!) + # We need to do this, since luigi might exit because all + # dependencies of a task are already being run by other + # workers, and our task can't even start... + while True: + try: + for line in eval(command, scope): + self.log(line) + except pieshell.PipelineFailed as e: + self.log(str(e)) + if e.pipeline.exit_code <= 25: + time.sleep(5) + continue + raise + break + + self.log('RunTask end') + dst = '%s.done.yaml' % (self.path,) + + except Exception as e: + dst = '%s.error.yaml' % (self.path,) + self.log("Task failed: %s" % e) + self.log(traceback.format_exc()) + + src = '%s.config.yaml' % (self.path,) + fs = self.output().fs + try: + fs.move(src, dst) + except: + # Might already have been moved by another node... + pass + + if Config.DB_URL is not None: + r = requests.get(Config.DB_URL, params={"pipeline_url": self.path}) + if r.status_code != 200: + print("Unable to update status", r.status_code, r.text) + + def logfile(self): + return luigi.contrib.opener.OpenerTarget("%s.%s.log.txt" % (self.path, self.hostname)) + + def output(self): + return luigi.contrib.opener.OpenerTarget( + '%s.done.yaml' % (self.path,)) + + + +class RunTasks(luigi.Task): + path = luigi.Parameter() + hostname = luigi.Parameter() + + @property + def scheduler_url(self): + return get_scheduler_url(self) + + def run(self): + while True: + tasks = [RunTask(path=path.replace(".config.yaml", ""), hostname=self.hostname) + for path in list_wildcard(self.output().fs, '%s/*.config.yaml' % (self.path,))] + print("=============================================") + print(tasks) + for task in tasks: + res = luigi.build([task], scheduler_url=self.scheduler_url, detailed_summary=True) + print("<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<") + print(res.summary_text) + print(">>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>") + time.sleep(1) + + def output(self): + # This target should never actually be created... + return luigi.contrib.opener.OpenerTarget('%s/done' % (self.path,)) + + + +class TestTask(luigi.Task): + name = luigi.Parameter() + time = luigi.Parameter() + + def run(self): + t = int(self.time) + for x in range(t): + self.set_progress_percentage(100 * x / t) + time.sleep(1) + print(x) + + with self.output().open("w") as f: + f.write("DONE") + + def output(self): + return luigi.contrib.opener.OpenerTarget(self.name) + + + +# luigi --module emerald_algorithms_evaluation.luigi Pipeline --param-evaluation_name=test-luigi-redhog-1