Feature/stageclasses #56
Merged

Changes from 8 of 9 commits:
- a137310: devalidate based on the source of the object instead of the file's
- 3918fd5: don't raise error if pipeline file isn't there, but mkdir with warn
- c923afe: allow option to get required stages directly from cache
- 5ed276e: util class for interactive use (especially useful with rerun_required…
- 9b51f6d: fix hashing and make configure optional
- ecc8123: fix type hint
- 61559f4: ensuring working_directory should be optional and off by default (exc…
- 723dd39 (davibicudo): add a couple of tests
- 194b95b (davibicudo): Update CHANGELOG.md
@@ -8,6 +8,8 @@
 import os, stat, errno
 import pickle
 import shutil
+from typing import Dict, List, Union, Callable
+from types import ModuleType

 import networkx as nx
 import yaml
@@ -60,44 +62,40 @@ def execute(self, context):
         else:
             raise RuntimeError("Stage %s does not have execute method" % self.name)

-def get_module_hash(module):
-    if os.path.exists(module.__file__):
-        with open(module.__file__) as f:
-            hash = hashlib.md5()
-            hash.update(f.read().encode("utf-8"))
-            return hash.hexdigest()
-
-    return None
+def get_stage_hash(descriptor):
+    source = inspect.getsource(descriptor)
+    hash = hashlib.md5()
+    hash.update(source.encode("utf-8"))
+    return hash.hexdigest()

 def resolve_stage(descriptor):
-    module_hash = None
+    stage_hash = None

     if isinstance(descriptor, str):
         try:
             # Try to get the module referenced by the string
             descriptor = importlib.import_module(descriptor)
-            module_hash = get_module_hash(descriptor)
+            stage_hash = get_stage_hash(descriptor)
         except ModuleNotFoundError:
             # Not a module, but maybe a class?
             parts = descriptor.split(".")

             module = importlib.import_module(".".join(parts[:-1]))
-            module_hash = get_module_hash(module)
+            stage_hash = get_stage_hash(module)

             constructor = getattr(module, parts[-1])
             descriptor = constructor()

     if inspect.ismodule(descriptor):
-        module_hash = get_module_hash(descriptor)
-        return StageInstance(descriptor, descriptor.__name__, module_hash)
+        stage_hash = get_stage_hash(descriptor)
+        return StageInstance(descriptor, descriptor.__name__, stage_hash)

     if inspect.isclass(descriptor):
-        module = importlib.import_module(descriptor.__module__)
-        module_hash = get_module_hash(module)
-        return StageInstance(descriptor(), "%s.%s" % (descriptor.__module__, descriptor.__name__), module_hash)
+        stage_hash = get_stage_hash(descriptor)
+        return StageInstance(descriptor(), "%s.%s" % (descriptor.__module__, descriptor.__name__), stage_hash)

     clazz = descriptor.__class__
-    return StageInstance(descriptor, "%s.%s" % (clazz.__module__, clazz.__name__), module_hash)
+    return StageInstance(descriptor, "%s.%s" % (clazz.__module__, clazz.__name__), stage_hash)

 def get_config_path(name, config):
     if name in config:
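The key change here: a stage is now hashed from the source of the resolved object via inspect.getsource, rather than from the whole file behind module.__file__, so a stage is only devalidated when its own code changes. A minimal standalone sketch of the mechanism (the ExampleStage class is hypothetical and used only for illustration):

    import hashlib
    import inspect

    def get_stage_hash(descriptor):
        # Hash only the source of the stage object itself (module, class, or function).
        source = inspect.getsource(descriptor)
        digest = hashlib.md5()
        digest.update(source.encode("utf-8"))
        return digest.hexdigest()

    class ExampleStage:  # hypothetical stage for illustration
        def execute(self, context):
            return 42

    # Editing unrelated code elsewhere in the same file leaves this hash
    # unchanged; editing ExampleStage itself changes it.
    print(get_stage_hash(ExampleStage))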
@@ -159,7 +157,8 @@ def __init__(self, instance, config, configuration_context):
         self.hashed_name = hash_name(instance.name, configuration_context.required_config)

     def configure(self, context):
-        return self.instance.configure(context)
+        if hasattr(self.instance, "configure"):
+            return self.instance.configure(context)

     def execute(self, context):
         return self.instance.execute(context)
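This guard is what makes configure optional: a stage that only defines execute is now accepted. A small sketch (the class name is hypothetical):

    class MinimalStage:
        # No configure() method is needed anymore.
        def execute(self, context):
            return "done"

    stage = MinimalStage()
    if hasattr(stage, "configure"):  # mirrors the guard in the diff
        stage.configure(None)
    print(stage.execute(None))  # -> done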
@@ -481,15 +480,19 @@ def update_json(meta, working_directory):
     shutil.move("%s/pipeline.json.new" % working_directory, "%s/pipeline.json" % working_directory)


-def run(definitions, config = {}, working_directory = None, flowchart_path = None, dryrun = False, verbose = False, logger = logging.getLogger("synpp")):
+def run(definitions, config = {}, working_directory = None, flowchart_path = None, dryrun = False, verbose = False,
+        logger = logging.getLogger("synpp"), rerun_required=True, ensure_working_directory=False):
     # 0) Construct pipeline config
     pipeline_config = {}
     if "processes" in config: pipeline_config["processes"] = config["processes"]
     if "progress_interval" in config: pipeline_config["progress_interval"] = config["progress_interval"]

-    if not working_directory is None:
-        if not os.path.isdir(working_directory):
-            raise PipelineError("Working directory does not exist: %s" % working_directory)
+    if ensure_working_directory and working_directory is None:
+        working_directory = '.synpp_cache'
+    if working_directory is not None:
+        if not os.path.isdir(working_directory) and ensure_working_directory:
+            logger.warning("Working directory does not exist, it will be created: %s" % working_directory)
+            os.mkdir(working_directory)

         working_directory = os.path.realpath(working_directory)
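With ensure_working_directory=True, a missing working directory is created instead of raising PipelineError, and when none is given at all it defaults to .synpp_cache. A usage sketch (the stage descriptor is hypothetical):

    import synpp

    results = synpp.run(
        [{"descriptor": "my_project.my_stage"}],  # hypothetical stage
        working_directory=None,        # falls back to ./.synpp_cache
        ensure_working_directory=True,
    )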
@@ -580,9 +583,11 @@ def run(definitions, config = {}, working_directory = None, flowchart_path = Non

     # 4) Devalidate stages
     sorted_cached_hashes = sorted_hashes - ephemeral_counts.keys()
+    stale_hashes = set()

-    # 4.1) Devalidate if they are required
-    stale_hashes = set(required_hashes)
+    # 4.1) Devalidate if they are required (optional, otherwise will reload from cache)
+    if rerun_required:
+        stale_hashes.update(required_hashes)

     # 4.2) Devalidate if not in meta
     for hash in sorted_cached_hashes:

Review comment (on the if rerun_required: line): Is this covered by a unit test? If not, can you add one?
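To illustrate the new semantics: with rerun_required=True (the default), every requested stage is marked stale up front and recomputed; with False, only the later devalidation steps (missing cache, changed hash, updated parents, and so on) can mark it stale, so a still-valid cached stage is reused. A toy sketch of the set logic, not the library's actual code:

    required_hashes = ["a", "b"]
    cached = {"a"}                 # stages with a valid cache entry
    rerun_required = False

    stale_hashes = set()
    if rerun_required:
        stale_hashes.update(required_hashes)

    # Step 4.5 below still devalidates stages without a cache entry:
    stale_hashes.update(h for h in required_hashes if h not in cached)
    print(stale_hashes)  # {'b'} -> only the uncached stage is recomputed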
@@ -592,7 +597,7 @@ def run(definitions, config = {}, working_directory = None, flowchart_path = Non
     # 4.3) Devalidate if configuration values have changed
     # This devalidation step is obsolete since we have implicit config parameters

-    # 4.3) Devalidate if module hash of a stage has changed
+    # 4.4) Devalidate if module hash of a stage has changed
     for hash in sorted_cached_hashes:
         if hash in meta:
             if not "module_hash" in meta[hash]:

@@ -605,7 +610,7 @@ def run(definitions, config = {}, working_directory = None, flowchart_path = Non
             if previous_module_hash != current_module_hash:
                 stale_hashes.add(hash)

-    # 4.3) Devalidate if cache is not existant
+    # 4.5) Devalidate if cache is not existant
     if not working_directory is None:
         for hash in sorted_cached_hashes:
             directory_path = "%s/%s.cache" % (working_directory, hash)

@@ -614,7 +619,7 @@ def run(definitions, config = {}, working_directory = None, flowchart_path = Non
         if not hash in cache_available:
             stale_hashes.add(hash)

-    # 4.4) Devalidate if parent has been updated
+    # 4.6) Devalidate if parent has been updated
     for hash in sorted_cached_hashes:
         if not hash in stale_hashes and hash in meta:
             for dependency_hash, dependency_update in meta[hash]["dependencies"].items():

@@ -624,7 +629,7 @@ def run(definitions, config = {}, working_directory = None, flowchart_path = Non
                 if meta[dependency_hash]["updated"] > dependency_update:
                     stale_hashes.add(hash)

-    # 4.5) Devalidate if parents are not the same anymore
+    # 4.7) Devalidate if parents are not the same anymore
     for hash in sorted_cached_hashes:
         if not hash in stale_hashes and hash in meta:
             cached_hashes = set(meta[hash]["dependencies"].keys())

@@ -633,7 +638,7 @@ def run(definitions, config = {}, working_directory = None, flowchart_path = Non
             if not cached_hashes == current_hashes:
                 stale_hashes.add(hash)

-    # 4.6) Manually devalidate stages
+    # 4.8) Manually devalidate stages
     for hash in sorted_cached_hashes:
         stage = registry[hash]
         cache_path = "%s/%s.cache" % (working_directory, hash)

@@ -645,13 +650,13 @@ def run(definitions, config = {}, working_directory = None, flowchart_path = Non
             if not validation_token == existing_token:
                 stale_hashes.add(hash)

-    # 4.7) Devalidate descendants of devalidated stages
+    # 4.9) Devalidate descendants of devalidated stages
     for hash in set(stale_hashes):
         for descendant_hash in nx.descendants(graph, hash):
             if not descendant_hash in stale_hashes:
                 stale_hashes.add(descendant_hash)

-    # 4.8) Devalidate ephemeral stages if necessary
+    # 4.10) Devalidate ephemeral stages if necessary
     pending = set(stale_hashes)

     while len(pending) > 0:
@@ -761,6 +766,14 @@ def run(definitions, config = {}, working_directory = None, flowchart_path = Non
                 progress, len(stale_hashes), 100 * progress / len(stale_hashes)
             ))

+    if not rerun_required:
+        # Load remaining previously cached results
+        for hash in required_hashes:
+            if results[required_hashes.index(hash)] is None:
+                with open("%s/%s.p" % (working_directory, hash), "rb") as f:
+                    logger.info("Loading cache for %s ..." % hash)
+                    results[required_hashes.index(hash)] = pickle.load(f)

     if verbose:
         info = {}
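The loading path mirrors how results are pickled per stage hash (<working_directory>/<hash>.p). From the caller's side, the two new flags combine into a cheap way to query an already-computed pipeline (stage name hypothetical):

    import synpp

    results = synpp.run(
        [{"descriptor": "my_project.my_stage"}],  # hypothetical
        working_directory=".synpp_cache",
        rerun_required=False,  # reuse cached results when still valid
    )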
@@ -778,26 +791,61 @@ def run(definitions, config = {}, working_directory = None, flowchart_path = Non


 def run_from_yaml(path):
-    with open(path) as f:
-        settings = yaml.load(f, Loader = yaml.SafeLoader)
-
-    definitions = []
-
-    for item in settings["run"]:
-        parameters = {}
-
-        if type(item) == dict:
-            key = list(item.keys())[0]
-            parameters = item[key]
-            item = key
-
-        definitions.append({
-            "descriptor": item, "config": parameters
-        })
-
-    config = settings["config"] if "config" in settings else {}
-    working_directory = settings["working_directory"] if "working_directory" in settings else None
-    flowchart_path = settings["flowchart_path"] if "flowchart_path" in settings else None
-    dryrun = settings["dryrun"] if "dryrun" in settings else False
-
-    run(definitions=definitions, config=config, working_directory=working_directory, flowchart_path=flowchart_path, dryrun=dryrun)
+    Synpp.build_from_yml(path).run_pipeline()
+
+
+# Convenience class mostly for running stages individually (possibly interactively, e.g. in Jupyter)
+class Synpp:
+    def __init__(self, config: dict, working_directory: str = None, logger: logging.Logger = logging.getLogger("synpp"),
+                 definitions: List[Dict[str, Union[str, Callable, ModuleType]]] = None, flowchart_path: str = None,
+                 dryrun: bool = False):
+        self.config = config
+        self.working_directory = working_directory
+        self.logger = logger
+        self.definitions = definitions
+        self.flowchart_path = flowchart_path
+        self.dryrun = dryrun
+
+    def run_pipeline(self, definitions=None, rerun_required=False, dryrun=None, verbose=False, flowchart_path=None):
+        if definitions is None and self.definitions is None:
+            raise PipelineError("A list of stage definitions must be available in object or provided explicitly.")
+        elif definitions is None:
+            definitions = self.definitions
+        if dryrun is None:
+            dryrun = self.dryrun
+        return run(definitions, self.config, self.working_directory, flowchart_path=flowchart_path,
+                   dryrun=dryrun, verbose=verbose, logger=self.logger, rerun_required=rerun_required,
+                   ensure_working_directory=True)
+
+    def run_single(self, descriptor, config={}, rerun_if_cached=False, dryrun=False, verbose=False):
+        return run([{'descriptor': descriptor, 'config': config}], self.config, self.working_directory,
+                   dryrun=dryrun, verbose=verbose, logger=self.logger, rerun_required=rerun_if_cached,
+                   flowchart_path=self.flowchart_path, ensure_working_directory=True)[0]

Review comment (on run_single): Really nice! If it had a unit test, would be even better :)
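For interactive sessions, run_single wraps a one-stage pipeline and returns that stage's result directly. A usage sketch, assuming Synpp is exposed at package level (stage and config values are hypothetical):

    import synpp

    pipeline = synpp.Synpp(
        config={"random_seed": 1234},  # hypothetical global config
        working_directory="cache",
    )

    # e.g. from a Jupyter cell: compute (or load from cache) one stage
    result = pipeline.run_single("my_project.my_stage", config={"sample": 0.1})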
+    @staticmethod
+    def build_from_yml(config_path):
+        with open(config_path) as f:
+            settings = yaml.load(f, Loader=yaml.SafeLoader)
+
+        definitions = []
+
+        for item in settings["run"]:
+            parameters = {}
+
+            if type(item) == dict:
+                key = list(item.keys())[0]
+                parameters = item[key]
+                item = key
+
+            definitions.append({
+                "descriptor": item, "config": parameters
+            })
+
+        config = settings["config"] if "config" in settings else {}
+        working_directory = settings["working_directory"] if "working_directory" in settings else None
+        flowchart_path = settings["flowchart_path"] if "flowchart_path" in settings else None
+        dryrun = settings["dryrun"] if "dryrun" in settings else False
+
+        return Synpp(config=config, working_directory=working_directory, definitions=definitions,
+                     flowchart_path=flowchart_path, dryrun=dryrun)
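For reference, build_from_yml expects a YAML file of the following shape; all names and values below are hypothetical:

    # config.yml (hypothetical)
    working_directory: cache
    dryrun: false

    config:
      random_seed: 1234

    run:
      - my_project.first_stage        # plain descriptor string
      - my_project.second_stage:      # descriptor mapped to its stage config
          sample: 0.1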
Review comment: I see the point, but so far I was deliberately avoiding this, to make sure the user is 100% aware of where all the caching files will go. Maybe we can add a config flag that defaults to "false", like "auto_generate_working_directory"?