From 64c0192802723890ffa06bd10552eca7603b2ef4 Mon Sep 17 00:00:00 2001 From: MetalnAlloys Date: Wed, 24 Apr 2024 11:38:19 +0200 Subject: [PATCH 1/7] Add a KillTask --- poltergust/__init__.py | 55 ++++++++++++++++++++++++++++++++++++++---- 1 file changed, 50 insertions(+), 5 deletions(-) diff --git a/poltergust/__init__.py b/poltergust/__init__.py index 79116e3..1884f6c 100644 --- a/poltergust/__init__.py +++ b/poltergust/__init__.py @@ -19,6 +19,7 @@ import poltergust_luigi_utils.logging_task import poltergust_luigi_utils.gcs_opener import fnmatch +import psutil DB_URL = os.environ.get("DB_URL") ENVIRONMENTS_DIR = os.environ.get("ENVIRONMENTS_DIR", "/tmp/environments") @@ -86,6 +87,48 @@ def download_environment(envpath, path, log): with zipfile.ZipFile(z, mode="r") as arc: arc.extractall(envpath) + +def kill_task(pid): + try: + parent = psutil.Process(pid) + children = parent.children(recursive=True) + + for child in children: + try: + child.kill() + except psutil.NoSuchProcess: + continue + except psutil.NoSuchProcess: + return f"No process found: {pid}" + + return f"Process: {pid} killed!" + + + +class KillTask(luigi.Task): + accepts_messages = True + # scheduler_url = "http://scheduler:8082" + + def run(self): + while True: + if not self.scheduler_messages.empty(): + msg = self.scheduler_messages.get() + content = msg.content + if content.startswith("terminate"): + pid = content.split(":")[-1] + ret = kill_task(int(pid)) + msg.respond(ret) + else: + msg.respond("unknown message") + time.sleep(1) + + def output(self): + return luigi.local_target.LocalTarget('killed_tasks.pid') + + + + + class MakeEnvironment(poltergust_luigi_utils.logging_task.LoggingTask, luigi.Task): path = luigi.Parameter() hostname = luigi.Parameter() @@ -257,13 +300,13 @@ def run(self): while True: tasks = [RunTask(path=path.replace(".config.yaml", ""), hostname=self.hostname) for path in list_wildcard(self.output().fs, '%s/*.config.yaml' % (self.path,))] - print("=============================================") - print(tasks) + #print("=============================================") + #print(tasks) for task in tasks: res = luigi.build([task], scheduler_url=self.scheduler_url, detailed_summary=True) - print("<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<") - print(res.summary_text) - print(">>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>") + #print("<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<") + #print(res.summary_text) + #print(">>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>") time.sleep(1) def output(self): @@ -282,9 +325,11 @@ def run(self): for x in range(t): self.set_progress_percentage(100 * x / t) time.sleep(1) + print("¯\_( ͡° ͜ʖ ͡°)_/¯\n") with self.output().open("w") as f: f.write("DONE") def output(self): return luigi.contrib.opener.OpenerTarget(self.name) + From 98da90ec59aa9f94cb798520cb97f37d069a37f1 Mon Sep 17 00:00:00 2001 From: MetalnAlloys Date: Wed, 24 Apr 2024 11:38:47 +0200 Subject: [PATCH 2/7] run KillTask --- main.sh | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/main.sh b/main.sh index fee27bc..3a9147f 100755 --- a/main.sh +++ b/main.sh @@ -4,6 +4,12 @@ if [ "$HOSTNAME" == "" ]; then HOSTNAME="$(hostname -f)" fi +#PIPELINE_URL="gs://experimental-pipeline-inversion-ali-v1/pipeline" +#SCHEDULER_URL="http://localhost:8082" + +luigi KillTask --module poltergust --scheduler-url="${SCHEDULER_URL}" & +##>/dev/null & + while true; do luigi RunTasks --module poltergust --hostname="${HOSTNAME}" --path="${PIPELINE_URL}" --scheduler-url="${SCHEDULER_URL}" sleep 1 From f96ce49b18b7bec4bbf6ba054f63f9ab3b55e579 Mon Sep 17 00:00:00 2001 From: MetalnAlloys Date: Wed, 24 Apr 2024 20:57:34 +0200 Subject: [PATCH 3/7] kill whole process tree --- poltergust/__init__.py | 40 ++++++++++++++++++++-------------------- 1 file changed, 20 insertions(+), 20 deletions(-) diff --git a/poltergust/__init__.py b/poltergust/__init__.py index 1884f6c..6b03788 100644 --- a/poltergust/__init__.py +++ b/poltergust/__init__.py @@ -20,6 +20,7 @@ import poltergust_luigi_utils.gcs_opener import fnmatch import psutil +import signal DB_URL = os.environ.get("DB_URL") ENVIRONMENTS_DIR = os.environ.get("ENVIRONMENTS_DIR", "/tmp/environments") @@ -88,26 +89,25 @@ def download_environment(envpath, path, log): arc.extractall(envpath) -def kill_task(pid): - try: - parent = psutil.Process(pid) - children = parent.children(recursive=True) - - for child in children: - try: - child.kill() - except psutil.NoSuchProcess: - continue - except psutil.NoSuchProcess: - return f"No process found: {pid}" - - return f"Process: {pid} killed!" +def kill_proc_tree(pid, sig=signal.SIGTERM): + # assert pid != os.getpid(), "Not me!" + parent = psutil.Process(pid) + children = parent.children(recursive=True) + children.append(parent) + for proc in children: + try: + proc.send_signal(sig) + except psutil.NoSuchProcess: + pass + gone, alive = psutil.wait_procs(children, timeout=5) + for p in alive: + p.kill() class KillTask(luigi.Task): accepts_messages = True - # scheduler_url = "http://scheduler:8082" + #priority = 10 def run(self): while True: @@ -116,16 +116,15 @@ def run(self): content = msg.content if content.startswith("terminate"): pid = content.split(":")[-1] - ret = kill_task(int(pid)) - msg.respond(ret) + assert pid != os.getpid(), msg.respond("Not me!") + kill_proc_tree(int(pid), sig=signal.SIGTERM) + msg.respond(f"Killed: {pid}") else: - msg.respond("unknown message") + msg.respond("Unknown message!") time.sleep(1) def output(self): return luigi.local_target.LocalTarget('killed_tasks.pid') - - @@ -316,6 +315,7 @@ def output(self): # luigi --module emerald_algorithms_evaluation.luigi Pipeline --param-evaluation_name=test-luigi-redhog-1 +import random class TestTask(luigi.Task): name = luigi.Parameter() time = luigi.Parameter() From bb17e54aa821b0087f2d85200d720aab3588adf0 Mon Sep 17 00:00:00 2001 From: MetalnAlloys Date: Thu, 25 Apr 2024 12:26:57 +0200 Subject: [PATCH 4/7] refactor --- poltergust/__init__.py | 333 ----------------------------------------- poltergust/lib.py | 124 +++++++++++++++ poltergust/tasks.py | 256 +++++++++++++++++++++++++++++++ 3 files changed, 380 insertions(+), 333 deletions(-) create mode 100644 poltergust/lib.py create mode 100644 poltergust/tasks.py diff --git a/poltergust/__init__.py b/poltergust/__init__.py index 6b03788..139597f 100644 --- a/poltergust/__init__.py +++ b/poltergust/__init__.py @@ -1,335 +1,2 @@ -import os -import re -import os.path -import yaml -from pathlib import Path -import sys -import traceback -import zipfile -import luigi -import luigi.contrib.opener -import luigi.local_target -import pieshell -import traceback -import math -import time -import requests -import datetime -import poltergust_luigi_utils # Add GCS luigi opener -import poltergust_luigi_utils.logging_task -import poltergust_luigi_utils.gcs_opener -import fnmatch -import psutil -import signal -DB_URL = os.environ.get("DB_URL") -ENVIRONMENTS_DIR = os.environ.get("ENVIRONMENTS_DIR", "/tmp/environments") -DOWNLOAD_ENVIRONMENT = os.environ.get("DOWNLOAD_ENVIRONMENT", False) -UPLOAD_ENVIRONMENT = os.environ.get("UPLOAD_ENVIRONMENT", False) -TAG = r"{{POLTERGUST_PIP}}" - -def strnow(): - return datetime.datetime.now().strftime("%Y-%m-%d %H-%M-%S.%f") - -def make_environment(envpath, environment, log): - _ = pieshell.env(exports=dict(pieshell.env._exports)) - if not os.path.exists(envpath): - envdir = os.path.dirname(envpath) - if not os.path.exists(envdir): - os.makedirs(envdir) - +_.virtualenv(envpath, **environment.get("virtualenv", {})) - +_.bashsource(envpath + "/bin/activate") - for dep in environment["dependencies"]: - if TAG in dep: - dep = re.sub(TAG, _._exports["GITHUB_TOKEN_EMRLD"], dep) - for line in _.pip.install(dep): - log(line) - - -def zip_environment(envpath, log): - """ - Create a zip compression level 6 archive of a python virtualenv - Note that zipping virtualenvs is not recommended according to PEP405: - "Not considered as movable or copyable – you just recreate the - same environment in the target location" - - However, this will work if the two execution environments are "exactly" - the same - - @Args: - envpath: local path to the virtualenv directory - @Returns: - archive: name of the zipped archive - """ - archive = envpath + ".zip" - envdir = Path(envpath) - with zipfile.ZipFile(archive, 'w', compression=zipfile.ZIP_DEFLATED, compresslevel=6) as f: - for file in envdir.rglob('*'): - f.write(file, arcname=file.relative_to(envdir)) - log("Zipped: {}".format(archive)) - return archive - - -def download_environment(envpath, path, log): - """ - Download a python virtualenv and extract it - @Args - envpath: local path to the virtualenv directory - path: path to downloadable virtualenv (GCS/local disk) - log: luigi logger - """ - if not os.path.exists(envpath): - envdir = os.path.dirname(envpath) - if not os.path.exists(envdir): - os.makedirs(envdir) - - fs = luigi.contrib.opener.OpenerTarget(path).fs - with fs.download(path) as z: - with zipfile.ZipFile(z, mode="r") as arc: - arc.extractall(envpath) - - -def kill_proc_tree(pid, sig=signal.SIGTERM): - # assert pid != os.getpid(), "Not me!" - parent = psutil.Process(pid) - children = parent.children(recursive=True) - children.append(parent) - - for proc in children: - try: - proc.send_signal(sig) - except psutil.NoSuchProcess: - pass - gone, alive = psutil.wait_procs(children, timeout=5) - for p in alive: - p.kill() - - -class KillTask(luigi.Task): - accepts_messages = True - #priority = 10 - - def run(self): - while True: - if not self.scheduler_messages.empty(): - msg = self.scheduler_messages.get() - content = msg.content - if content.startswith("terminate"): - pid = content.split(":")[-1] - assert pid != os.getpid(), msg.respond("Not me!") - kill_proc_tree(int(pid), sig=signal.SIGTERM) - msg.respond(f"Killed: {pid}") - else: - msg.respond("Unknown message!") - time.sleep(1) - - def output(self): - return luigi.local_target.LocalTarget('killed_tasks.pid') - - - -class MakeEnvironment(poltergust_luigi_utils.logging_task.LoggingTask, luigi.Task): - path = luigi.Parameter() - hostname = luigi.Parameter() - retry_on_error = luigi.Parameter(default=False) - - def run(self): - with self.logging(self.retry_on_error): - zip_path = str(self.path) + ".zip" - if DOWNLOAD_ENVIRONMENT: - if luigi.contrib.opener.OpenerTarget(zip_path).exists(): - download_environment(self.envdir().path, zip_path, self.log) - else: - with luigi.contrib.opener.OpenerTarget(self.path).open("r") as f: - environment = yaml.load(f, Loader=yaml.SafeLoader) - print(environment) - make_environment(self.envdir().path, environment, self.log) - with self.output().open("w") as f: - f.write("DONE") - if UPLOAD_ENVIRONMENT: - archive = zip_environment(self.envdir().path, self.log) - fs = luigi.contrib.opener.OpenerTarget(self.path).fs - fs.put(archive, zip_path) - - - def envdir(self): - return luigi.local_target.LocalTarget( - os.path.join(ENVIRONMENTS_DIR, self.path.replace("://", "/").lstrip("/"))) - - def logfile(self): - return luigi.contrib.opener.OpenerTarget("%s.%s.log.txt" % (self.path, self.hostname)) - - def output(self): - return luigi.local_target.LocalTarget( - os.path.join(ENVIRONMENTS_DIR, self.path.replace("://", "/").lstrip("/"), "done")) - -def get_scheduler_url(task): - return task.set_progress_percentage.__self__._scheduler._url - -class RunTask(poltergust_luigi_utils.logging_task.LoggingTask, luigi.Task): - path = luigi.Parameter() - hostname = luigi.Parameter(significant=False, visibility=luigi.parameter.ParameterVisibility.HIDDEN) - retry_on_error = luigi.Parameter(default=False) - - @property - def scheduler(self): - return self.set_progress_percentage.__self__._scheduler - - @property - def scheduler_url(self): - return get_scheduler_url(self) - - def run(self): - with self.logging(self.retry_on_error): - self.log('RunTask start') - - src = '%s.config.yaml' % (self.path,) - fs = self.output().fs - - try: - cfg = luigi.contrib.opener.OpenerTarget('%s.config.yaml' % (self.path,)) - try: - with cfg.open("r") as f: - task = yaml.load(f, Loader=yaml.SafeLoader) - except: - if not cfg.fs.exists(cfg.path): - # The task has already been marked as done by another worker. - return - raise - self.log('RunTask config loaded') - - with luigi.contrib.opener.OpenerTarget(task["environment"]).open("r") as f: - environment = yaml.load(f, Loader=yaml.SafeLoader) - - env = MakeEnvironment(path=task["environment"], hostname=self.hostname, retry_on_error=self.retry_on_error) - if not env.output().exists(): - yield env - envpath = env.envdir().path - self.log('RunTask environment made') - - _ = pieshell.env(envpath, interactive=True) - +_.bashsource(envpath + "/bin/activate") - - _._exports.update(environment.get("variables", {})) - _._exports.update(task.get("variables", {})) - - self.log('RunTask loaded environment') - - command = task.get("command", None) - - task_args = dict(task.get("task", {})) - task_name = task_args.pop("name", None) - task_args["scheduler-url"] = self.scheduler_url - task_args["retcode-already-running"] = "10" - task_args["retcode-missing-data"] = "20" - task_args["retcode-not-run"] = "25" - task_args["retcode-task-failed"] = "30" - task_args["retcode-scheduling-error"] = "35" - task_args["retcode-unhandled-exception"] = "40" - - if command is None: - command = "luigi(task_name, **task_args)" - - scope = pieshell.environ.EnvScope(env=_) - scope["task_name"] = task_name - scope["task_args"] = task_args - - self.log('RunTask starting actual task' + strnow()) - - # Rerun the task until it is actually being run (or is done!) - # We need to do this, since luigi might exit because all - # dependencies of a task are already being run by other - # workers, and our task can't even start... - while True: - try: - for line in eval(command, scope): - self.log(line) - except pieshell.PipelineFailed as e: - self.log(str(e)) - if e.pipeline.exit_code <= 25: - time.sleep(5) - continue - raise - break - - self.log('RunTask end') - dst = '%s.done.yaml' % (self.path,) - - except Exception as e: - dst = '%s.error.yaml' % (self.path,) - self.log("Task failed: %s" % e) - self.log(traceback.format_exc()) - - src = '%s.config.yaml' % (self.path,) - fs = self.output().fs - try: - fs.move(src, dst) - except: - # Might already have been moved by another node... - pass - - if DB_URL is not None: - r = requests.get(DB_URL, params={"pipeline_url": self.path}) - if r.status_code != 200: - print("Unable to update status", r.status_code, r.text) - - def logfile(self): - return luigi.contrib.opener.OpenerTarget("%s.%s.log.txt" % (self.path, self.hostname)) - - def output(self): - return luigi.contrib.opener.OpenerTarget( - '%s.done.yaml' % (self.path,)) - -def list_wildcard(fs, path): - if hasattr(fs, "list_wildcard"): - return fs.list_wildcard(path) - else: - dirpath, pattern = path.rsplit("/", 1) - return fnmatch.filter(fs.listdir(dirpath), path) - -class RunTasks(luigi.Task): - path = luigi.Parameter() - hostname = luigi.Parameter() - - @property - def scheduler_url(self): - return get_scheduler_url(self) - - def run(self): - while True: - tasks = [RunTask(path=path.replace(".config.yaml", ""), hostname=self.hostname) - for path in list_wildcard(self.output().fs, '%s/*.config.yaml' % (self.path,))] - #print("=============================================") - #print(tasks) - for task in tasks: - res = luigi.build([task], scheduler_url=self.scheduler_url, detailed_summary=True) - #print("<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<") - #print(res.summary_text) - #print(">>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>") - time.sleep(1) - - def output(self): - # This target should never actually be created... - return luigi.contrib.opener.OpenerTarget('%s/done' % (self.path,)) - - -# luigi --module emerald_algorithms_evaluation.luigi Pipeline --param-evaluation_name=test-luigi-redhog-1 - -import random -class TestTask(luigi.Task): - name = luigi.Parameter() - time = luigi.Parameter() - - def run(self): - t = int(self.time) - for x in range(t): - self.set_progress_percentage(100 * x / t) - time.sleep(1) - print("¯\_( ͡° ͜ʖ ͡°)_/¯\n") - - with self.output().open("w") as f: - f.write("DONE") - - def output(self): - return luigi.contrib.opener.OpenerTarget(self.name) diff --git a/poltergust/lib.py b/poltergust/lib.py new file mode 100644 index 0000000..981d3fd --- /dev/null +++ b/poltergust/lib.py @@ -0,0 +1,124 @@ +import os +import re +import os.path +from pathlib import Path +import zipfile +import luigi +import luigi.contrib.opener +import luigi.local_target +import pieshell +import datetime +import psutil +import signal + +DB_URL = os.environ.get("DB_URL") +ENVIRONMENTS_DIR = os.environ.get("ENVIRONMENTS_DIR", "/tmp/environments") +DOWNLOAD_ENVIRONMENT = os.environ.get("DOWNLOAD_ENVIRONMENT", False) +UPLOAD_ENVIRONMENT = os.environ.get("UPLOAD_ENVIRONMENT", False) +TAG = r"{{POLTERGUST_PIP}}" +PIPELINE_URL = os.environ.get("PIPELINE_URL") + + +def strnow(): + return datetime.datetime.now().strftime("%Y-%m-%d %H-%M-%S.%f") + +def make_environment(envpath, environment, log): + _ = pieshell.env(exports=dict(pieshell.env._exports)) + if not os.path.exists(envpath): + envdir = os.path.dirname(envpath) + if not os.path.exists(envdir): + os.makedirs(envdir) + +_.virtualenv(envpath, **environment.get("virtualenv", {})) + +_.bashsource(envpath + "/bin/activate") + for dep in environment["dependencies"]: + if TAG in dep: + dep = re.sub(TAG, _._exports["GITHUB_TOKEN_EMRLD"], dep) + for line in _.pip.install(dep): + log(line) + + +def zip_environment(envpath, log): + """ + Create a zip compression level 6 archive of a python virtualenv + Note that zipping virtualenvs is not recommended according to PEP405: + "Not considered as movable or copyable – you just recreate the + same environment in the target location" + + However, this will work if the two execution environments are "exactly" + the same + + @Args: + envpath: local path to the virtualenv directory + @Returns: + archive: name of the zipped archive + """ + archive = envpath + ".zip" + envdir = Path(envpath) + with zipfile.ZipFile(archive, 'w', compression=zipfile.ZIP_DEFLATED, compresslevel=6) as f: + for file in envdir.rglob('*'): + f.write(file, arcname=file.relative_to(envdir)) + log("Zipped: {}".format(archive)) + return archive + + +def download_environment(envpath, path, log): + """ + Download a python virtualenv and extract it + @Args + envpath: local path to the virtualenv directory + path: path to downloadable virtualenv (GCS/local disk) + log: luigi logger + """ + if not os.path.exists(envpath): + envdir = os.path.dirname(envpath) + if not os.path.exists(envdir): + os.makedirs(envdir) + + fs = luigi.contrib.opener.OpenerTarget(path).fs + with fs.download(path) as z: + with zipfile.ZipFile(z, mode="r") as arc: + arc.extractall(envpath) + + +def kill_proc_tree(pid, sig=signal.SIGTERM, timeout=5): + parent = psutil.Process(pid) + children = parent.children(recursive=True) + # children.append(parent) + + parent.terminate() + try: + parent.wait(timeout) + except psutil.TimeoutExpired: + parent.kill() + + for proc in children: + try: + proc.send_signal(sig) + except psutil.NoSuchProcess: + pass + gone, alive = psutil.wait_procs(children, timeout=2) + for p in alive: + p.kill() + + return gone + + +def remove_config_from_pipeline(cfg): + cfg_src = cfg + ".config.yaml" + cfg_dst = cfg + ".done.yaml" + + fs = luigi.contrib.opener.OpenerTarget(cfg_src).fs + if PIPELINE_URL: + full_cfg_path = PIPELINE_URL + "/" + cfg_src + if fs.exists(full_cfg_path): + try: + fs.move(full_cfg_path, cfg_dst) + except: + pass + finally: + return f"{full_cfg_path} moved!" + else: + return f"not exists!: {full_cfg_path}" + return "PIPELINE_URL not found!" + + diff --git a/poltergust/tasks.py b/poltergust/tasks.py new file mode 100644 index 0000000..8565ffc --- /dev/null +++ b/poltergust/tasks.py @@ -0,0 +1,256 @@ +import os +import os.path +import yaml +import traceback +import luigi +import luigi.contrib.opener +import luigi.local_target +import pieshell +import traceback +import time +import requests +import poltergust_luigi_utils # Add GCS luigi opener +import poltergust_luigi_utils.logging_task +import poltergust_luigi_utils.gcs_opener +import fnmatch + +from .lib import * + +class KillTask(luigi.Task, poltergust_luigi_utils.logging_task.LoggingTask): + accepts_messages = True + #priority = 10 + + def run(self): + while True: + if not self.scheduler_messages.empty(): + msg = self.scheduler_messages.get() + content = msg.content + if content.startswith("kill"): + args = content.split(":") + pid = args[1] + assert pid != os.getpid(), msg.respond("Not me!") + + cfg = args[2] + status = remove_config_from_pipeline(cfg) + msg.respond(status) + + gone = kill_proc_tree(int(pid), sig=signal.SIGTERM) + print(gone) + self.log(gone) + msg.respond(f"Killed: {pid}") + else: + msg.respond("Unknown message!") + time.sleep(1) + + def output(self): + return luigi.local_target.LocalTarget('killed_tasks.pid') + + + +class MakeEnvironment(poltergust_luigi_utils.logging_task.LoggingTask, luigi.Task): + path = luigi.Parameter() + hostname = luigi.Parameter() + retry_on_error = luigi.Parameter(default=False) + + def run(self): + with self.logging(self.retry_on_error): + zip_path = str(self.path) + ".zip" + if DOWNLOAD_ENVIRONMENT: + if luigi.contrib.opener.OpenerTarget(zip_path).exists(): + download_environment(self.envdir().path, zip_path, self.log) + else: + with luigi.contrib.opener.OpenerTarget(self.path).open("r") as f: + environment = yaml.load(f, Loader=yaml.SafeLoader) + print(environment) + make_environment(self.envdir().path, environment, self.log) + with self.output().open("w") as f: + f.write("DONE") + if UPLOAD_ENVIRONMENT: + archive = zip_environment(self.envdir().path, self.log) + fs = luigi.contrib.opener.OpenerTarget(self.path).fs + fs.put(archive, zip_path) + + + def envdir(self): + return luigi.local_target.LocalTarget( + os.path.join(ENVIRONMENTS_DIR, self.path.replace("://", "/").lstrip("/"))) + + def logfile(self): + return luigi.contrib.opener.OpenerTarget("%s.%s.log.txt" % (self.path, self.hostname)) + + def output(self): + return luigi.local_target.LocalTarget( + os.path.join(ENVIRONMENTS_DIR, self.path.replace("://", "/").lstrip("/"), "done")) + +def get_scheduler_url(task): + return task.set_progress_percentage.__self__._scheduler._url + +class RunTask(poltergust_luigi_utils.logging_task.LoggingTask, luigi.Task): + path = luigi.Parameter() + hostname = luigi.Parameter(significant=False, visibility=luigi.parameter.ParameterVisibility.HIDDEN) + retry_on_error = luigi.Parameter(default=False) + + @property + def scheduler(self): + return self.set_progress_percentage.__self__._scheduler + + @property + def scheduler_url(self): + return get_scheduler_url(self) + + def run(self): + with self.logging(self.retry_on_error): + self.log('RunTask start') + + src = '%s.config.yaml' % (self.path,) + fs = self.output().fs + + try: + cfg = luigi.contrib.opener.OpenerTarget('%s.config.yaml' % (self.path,)) + try: + with cfg.open("r") as f: + task = yaml.load(f, Loader=yaml.SafeLoader) + except: + if not cfg.fs.exists(cfg.path): + # The task has already been marked as done by another worker. + return + raise + self.log('RunTask config loaded') + + with luigi.contrib.opener.OpenerTarget(task["environment"]).open("r") as f: + environment = yaml.load(f, Loader=yaml.SafeLoader) + + env = MakeEnvironment(path=task["environment"], hostname=self.hostname, retry_on_error=self.retry_on_error) + if not env.output().exists(): + yield env + envpath = env.envdir().path + self.log('RunTask environment made') + + _ = pieshell.env(envpath, interactive=True) + +_.bashsource(envpath + "/bin/activate") + + _._exports.update(environment.get("variables", {})) + _._exports.update(task.get("variables", {})) + + self.log('RunTask loaded environment') + + command = task.get("command", None) + + task_args = dict(task.get("task", {})) + task_name = task_args.pop("name", None) + task_args["scheduler-url"] = self.scheduler_url + task_args["retcode-already-running"] = "10" + task_args["retcode-missing-data"] = "20" + task_args["retcode-not-run"] = "25" + task_args["retcode-task-failed"] = "30" + task_args["retcode-scheduling-error"] = "35" + task_args["retcode-unhandled-exception"] = "40" + + if command is None: + command = "luigi(task_name, **task_args)" + + scope = pieshell.environ.EnvScope(env=_) + scope["task_name"] = task_name + scope["task_args"] = task_args + + self.log('RunTask starting actual task' + strnow()) + + # Rerun the task until it is actually being run (or is done!) + # We need to do this, since luigi might exit because all + # dependencies of a task are already being run by other + # workers, and our task can't even start... + while True: + try: + for line in eval(command, scope): + self.log(line) + except pieshell.PipelineFailed as e: + self.log(str(e)) + if e.pipeline.exit_code <= 25: + time.sleep(5) + continue + raise + break + + self.log('RunTask end') + dst = '%s.done.yaml' % (self.path,) + + except Exception as e: + dst = '%s.error.yaml' % (self.path,) + self.log("Task failed: %s" % e) + self.log(traceback.format_exc()) + + src = '%s.config.yaml' % (self.path,) + fs = self.output().fs + try: + fs.move(src, dst) + except: + # Might already have been moved by another node... + pass + + if DB_URL is not None: + r = requests.get(DB_URL, params={"pipeline_url": self.path}) + if r.status_code != 200: + print("Unable to update status", r.status_code, r.text) + + def logfile(self): + return luigi.contrib.opener.OpenerTarget("%s.%s.log.txt" % (self.path, self.hostname)) + + def output(self): + return luigi.contrib.opener.OpenerTarget( + '%s.done.yaml' % (self.path,)) + +def list_wildcard(fs, path): + if hasattr(fs, "list_wildcard"): + return fs.list_wildcard(path) + else: + dirpath, pattern = path.rsplit("/", 1) + return fnmatch.filter(fs.listdir(dirpath), path) + +class RunTasks(luigi.Task): + path = luigi.Parameter() + hostname = luigi.Parameter() + + @property + def scheduler_url(self): + return get_scheduler_url(self) + + def run(self): + while True: + tasks = [RunTask(path=path.replace(".config.yaml", ""), hostname=self.hostname) + for path in list_wildcard(self.output().fs, '%s/*.config.yaml' % (self.path,))] + print("=============================================") + print(tasks) + for task in tasks: + res = luigi.build([task], scheduler_url=self.scheduler_url, detailed_summary=True) + print("<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<") + print(res.summary_text) + print(">>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>") + time.sleep(1) + + def output(self): + # This target should never actually be created... + return luigi.contrib.opener.OpenerTarget('%s/done' % (self.path,)) + + + +import random +class TestTask(luigi.Task): + name = luigi.Parameter() + time = luigi.Parameter() + + def run(self): + t = int(self.time) + for x in range(t): + self.set_progress_percentage(100 * x / t) + time.sleep(1) + print("¯\_( ͡° ͜ʖ ͡°)_/¯\n") + + with self.output().open("w") as f: + f.write("DONE") + + def output(self): + return luigi.contrib.opener.OpenerTarget(self.name) + + + +# luigi --module emerald_algorithms_evaluation.luigi Pipeline --param-evaluation_name=test-luigi-redhog-1 From ffc30ea0da55fa5b741202c7732c08d3ddde7f6f Mon Sep 17 00:00:00 2001 From: MetalnAlloys Date: Thu, 25 Apr 2024 12:27:20 +0200 Subject: [PATCH 5/7] refactor main script --- main.sh | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/main.sh b/main.sh index 3a9147f..2e5b38c 100755 --- a/main.sh +++ b/main.sh @@ -4,13 +4,9 @@ if [ "$HOSTNAME" == "" ]; then HOSTNAME="$(hostname -f)" fi -#PIPELINE_URL="gs://experimental-pipeline-inversion-ali-v1/pipeline" -#SCHEDULER_URL="http://localhost:8082" - -luigi KillTask --module poltergust --scheduler-url="${SCHEDULER_URL}" & -##>/dev/null & +luigi KillTask --module poltergust.tasks --scheduler-url="${SCHEDULER_URL}" &>/dev/null & while true; do - luigi RunTasks --module poltergust --hostname="${HOSTNAME}" --path="${PIPELINE_URL}" --scheduler-url="${SCHEDULER_URL}" + luigi RunTasks --module poltergust.tasks --hostname="${HOSTNAME}" --path="${PIPELINE_URL}" --scheduler-url="${SCHEDULER_URL}" sleep 1 done From 62e9e55d47edd9c0f772443091c36d6c459f0e14 Mon Sep 17 00:00:00 2001 From: MetalnAlloys Date: Thu, 25 Apr 2024 23:15:36 +0200 Subject: [PATCH 6/7] improve KillTask + refactor --- poltergust/tasks.py | 79 ++++++++++++++++++++++++++------------------- 1 file changed, 45 insertions(+), 34 deletions(-) diff --git a/poltergust/tasks.py b/poltergust/tasks.py index 8565ffc..ae59bcd 100644 --- a/poltergust/tasks.py +++ b/poltergust/tasks.py @@ -12,41 +12,58 @@ import poltergust_luigi_utils # Add GCS luigi opener import poltergust_luigi_utils.logging_task import poltergust_luigi_utils.gcs_opener -import fnmatch - -from .lib import * - -class KillTask(luigi.Task, poltergust_luigi_utils.logging_task.LoggingTask): +import signal +# from multiprocessing import Queue +import queue as Queue +from multiprocessing import Event + +from .lib import Config +from .lib import ( + strnow, + make_environment, + download_environment, + kill_proc_tree, + zip_environment, + remove_config_from_pipeline, + list_wildcard, +) + + +class KillTask(luigi.Task): accepts_messages = True - #priority = 10 + shutdown_event = Event() def run(self): - while True: - if not self.scheduler_messages.empty(): - msg = self.scheduler_messages.get() + while not self.shutdown_event.is_set(): + try: + msg = self.scheduler_messages.get(block=True, timeout=0.05) content = msg.content + if content == "END": + break if content.startswith("kill"): - args = content.split(":") - pid = args[1] - assert pid != os.getpid(), msg.respond("Not me!") + argv = content.split(",") + pid = int(argv[0].split("=")[1]) + if pid == os.getpid(): + msg.respond("Not me!") + continue - cfg = args[2] - status = remove_config_from_pipeline(cfg) - msg.respond(status) + if len(argv) == 2: + cfg = argv[1].split("=")[1] + status = remove_config_from_pipeline(cfg) + print(status) gone = kill_proc_tree(int(pid), sig=signal.SIGTERM) - print(gone) - self.log(gone) + print(f"[X] {gone}") msg.respond(f"Killed: {pid}") else: msg.respond("Unknown message!") - time.sleep(1) - + except Queue.Empty: + continue + def output(self): return luigi.local_target.LocalTarget('killed_tasks.pid') - class MakeEnvironment(poltergust_luigi_utils.logging_task.LoggingTask, luigi.Task): path = luigi.Parameter() hostname = luigi.Parameter() @@ -55,7 +72,7 @@ class MakeEnvironment(poltergust_luigi_utils.logging_task.LoggingTask, luigi.Tas def run(self): with self.logging(self.retry_on_error): zip_path = str(self.path) + ".zip" - if DOWNLOAD_ENVIRONMENT: + if Config.DOWNLOAD_ENVIRONMENT: if luigi.contrib.opener.OpenerTarget(zip_path).exists(): download_environment(self.envdir().path, zip_path, self.log) else: @@ -65,7 +82,7 @@ def run(self): make_environment(self.envdir().path, environment, self.log) with self.output().open("w") as f: f.write("DONE") - if UPLOAD_ENVIRONMENT: + if Config.UPLOAD_ENVIRONMENT: archive = zip_environment(self.envdir().path, self.log) fs = luigi.contrib.opener.OpenerTarget(self.path).fs fs.put(archive, zip_path) @@ -73,14 +90,14 @@ def run(self): def envdir(self): return luigi.local_target.LocalTarget( - os.path.join(ENVIRONMENTS_DIR, self.path.replace("://", "/").lstrip("/"))) + os.path.join(Config.ENVIRONMENTS_DIR, self.path.replace("://", "/").lstrip("/"))) def logfile(self): return luigi.contrib.opener.OpenerTarget("%s.%s.log.txt" % (self.path, self.hostname)) def output(self): return luigi.local_target.LocalTarget( - os.path.join(ENVIRONMENTS_DIR, self.path.replace("://", "/").lstrip("/"), "done")) + os.path.join(Config.ENVIRONMENTS_DIR, self.path.replace("://", "/").lstrip("/"), "done")) def get_scheduler_url(task): return task.set_progress_percentage.__self__._scheduler._url @@ -187,8 +204,8 @@ def run(self): # Might already have been moved by another node... pass - if DB_URL is not None: - r = requests.get(DB_URL, params={"pipeline_url": self.path}) + if Config.DB_URL is not None: + r = requests.get(Config.DB_URL, params={"pipeline_url": self.path}) if r.status_code != 200: print("Unable to update status", r.status_code, r.text) @@ -199,13 +216,8 @@ def output(self): return luigi.contrib.opener.OpenerTarget( '%s.done.yaml' % (self.path,)) -def list_wildcard(fs, path): - if hasattr(fs, "list_wildcard"): - return fs.list_wildcard(path) - else: - dirpath, pattern = path.rsplit("/", 1) - return fnmatch.filter(fs.listdir(dirpath), path) + class RunTasks(luigi.Task): path = luigi.Parameter() hostname = luigi.Parameter() @@ -233,7 +245,6 @@ def output(self): -import random class TestTask(luigi.Task): name = luigi.Parameter() time = luigi.Parameter() @@ -243,7 +254,7 @@ def run(self): for x in range(t): self.set_progress_percentage(100 * x / t) time.sleep(1) - print("¯\_( ͡° ͜ʖ ͡°)_/¯\n") + print(x) with self.output().open("w") as f: f.write("DONE") From 9bc3b8175efba0e249d9e445f17557d334bae4ba Mon Sep 17 00:00:00 2001 From: MetalnAlloys Date: Thu, 25 Apr 2024 23:15:51 +0200 Subject: [PATCH 7/7] more fucntions + refactor --- poltergust/lib.py | 81 +++++++++++++++++++++++++++++++---------------- 1 file changed, 53 insertions(+), 28 deletions(-) diff --git a/poltergust/lib.py b/poltergust/lib.py index 981d3fd..e177c61 100644 --- a/poltergust/lib.py +++ b/poltergust/lib.py @@ -10,25 +10,46 @@ import datetime import psutil import signal +import fnmatch -DB_URL = os.environ.get("DB_URL") -ENVIRONMENTS_DIR = os.environ.get("ENVIRONMENTS_DIR", "/tmp/environments") -DOWNLOAD_ENVIRONMENT = os.environ.get("DOWNLOAD_ENVIRONMENT", False) -UPLOAD_ENVIRONMENT = os.environ.get("UPLOAD_ENVIRONMENT", False) -TAG = r"{{POLTERGUST_PIP}}" -PIPELINE_URL = os.environ.get("PIPELINE_URL") + +class Config: + DB_URL = os.environ.get("DB_URL") + ENVIRONMENTS_DIR = os.environ.get("ENVIRONMENTS_DIR", "/tmp/environments") + DOWNLOAD_ENVIRONMENT = os.environ.get("DOWNLOAD_ENVIRONMENT", False) + UPLOAD_ENVIRONMENT = os.environ.get("UPLOAD_ENVIRONMENT", False) + PIPELINE_URL = os.environ.get("PIPELINE_URL", "") + TAG = r"{{POLTERGUST_PIP}}" def strnow(): return datetime.datetime.now().strftime("%Y-%m-%d %H-%M-%S.%f") -def make_environment(envpath, environment, log): - _ = pieshell.env(exports=dict(pieshell.env._exports)) + +def makedirs_recursive(envpath): if not os.path.exists(envpath): envdir = os.path.dirname(envpath) if not os.path.exists(envdir): os.makedirs(envdir) - +_.virtualenv(envpath, **environment.get("virtualenv", {})) + return envpath + + +def list_wildcard(fs, path): + if hasattr(fs, "list_wildcard"): + return fs.list_wildcard(path) + else: + dirpath, pattern = path.rsplit("/", 1) + return fnmatch.filter(fs.listdir(dirpath), path) + + +def make_environment(envpath, environment, log): + _ = pieshell.env(exports=dict(pieshell.env._exports)) + # if not os.path.exists(envpath): + # envdir = os.path.dirname(envpath) + # if not os.path.exists(envdir): + # os.makedirs(envdir) + makedirs_recursive(envpath) + +_.virtualenv(envpath, **environment.get("virtualenv", {})) +_.bashsource(envpath + "/bin/activate") for dep in environment["dependencies"]: if TAG in dep: @@ -69,11 +90,7 @@ def download_environment(envpath, path, log): path: path to downloadable virtualenv (GCS/local disk) log: luigi logger """ - if not os.path.exists(envpath): - envdir = os.path.dirname(envpath) - if not os.path.exists(envdir): - os.makedirs(envdir) - + makedirs_recursive(envpath) fs = luigi.contrib.opener.OpenerTarget(path).fs with fs.download(path) as z: with zipfile.ZipFile(z, mode="r") as arc: @@ -103,22 +120,30 @@ def kill_proc_tree(pid, sig=signal.SIGTERM, timeout=5): return gone -def remove_config_from_pipeline(cfg): +def rename_file(src, dst): + fs = luigi.contrib.opener.OpenerTarget(src).fs + if fs.exists(src): + try: + fs.move(src, dst) + except: + pass + finally: + return f"{src} moved to {dst}" + else: + return f"not exists!: {src}" + + +def remove_config_from_pipeline(cfg, dst=None, gcs=True): cfg_src = cfg + ".config.yaml" cfg_dst = cfg + ".done.yaml" - fs = luigi.contrib.opener.OpenerTarget(cfg_src).fs - if PIPELINE_URL: - full_cfg_path = PIPELINE_URL + "/" + cfg_src - if fs.exists(full_cfg_path): - try: - fs.move(full_cfg_path, cfg_dst) - except: - pass - finally: - return f"{full_cfg_path} moved!" - else: - return f"not exists!: {full_cfg_path}" - return "PIPELINE_URL not found!" + if not Config.PIPELINE_URL.startswith("gs://"): + return f"GCS PIPELINE_URL not found!, {Config.PIPELINE_URL}" + full_cfg_path = Config.PIPELINE_URL + "/" + cfg_src + full_dst_path = Config.PIPELINE_URL + "/" + cfg_dst + ret = rename_file(full_cfg_path, full_dst_path) + return ret + # ret = rename_file(cfg, dst) + # return ret