
Feature/stageclasses #56

Merged
merged 9 commits on May 27, 2021
146 changes: 97 additions & 49 deletions src/synpp/pipeline.py
@@ -8,6 +8,8 @@
import os, stat, errno
import pickle
import shutil
from typing import Dict, List, Union, Callable
from types import ModuleType

import networkx as nx
import yaml
@@ -60,44 +62,40 @@ def execute(self, context):
else:
raise RuntimeError("Stage %s does not have execute method" % self.name)

def get_module_hash(module):
if os.path.exists(module.__file__):
with open(module.__file__) as f:
hash = hashlib.md5()
hash.update(f.read().encode("utf-8"))
return hash.hexdigest()

return None
def get_stage_hash(descriptor):
source = inspect.getsource(descriptor)
hash = hashlib.md5()
hash.update(source.encode("utf-8"))
return hash.hexdigest()

def resolve_stage(descriptor):
module_hash = None
stage_hash = None

if isinstance(descriptor, str):
try:
# Try to get the module referenced by the string
descriptor = importlib.import_module(descriptor)
module_hash = get_module_hash(descriptor)
stage_hash = get_stage_hash(descriptor)
except ModuleNotFoundError:
# Not a module, but maybe a class?
parts = descriptor.split(".")

module = importlib.import_module(".".join(parts[:-1]))
module_hash = get_module_hash(module)
stage_hash = get_stage_hash(module)

constructor = getattr(module, parts[-1])
descriptor = constructor()

if inspect.ismodule(descriptor):
module_hash = get_module_hash(descriptor)
return StageInstance(descriptor, descriptor.__name__, module_hash)
stage_hash = get_stage_hash(descriptor)
return StageInstance(descriptor, descriptor.__name__, stage_hash)

if inspect.isclass(descriptor):
module = importlib.import_module(descriptor.__module__)
module_hash = get_module_hash(module)
return StageInstance(descriptor(), "%s.%s" % (descriptor.__module__, descriptor.__name__), module_hash)
stage_hash = get_stage_hash(descriptor)
return StageInstance(descriptor(), "%s.%s" % (descriptor.__module__, descriptor.__name__), stage_hash)

clazz = descriptor.__class__
return StageInstance(descriptor, "%s.%s" % (clazz.__module__, clazz.__name__), module_hash)
return StageInstance(descriptor, "%s.%s" % (clazz.__module__, clazz.__name__), stage_hash)

def get_config_path(name, config):
if name in config:
@@ -159,7 +157,8 @@ def __init__(self, instance, config, configuration_context):
self.hashed_name = hash_name(instance.name, configuration_context.required_config)

def configure(self, context):
return self.instance.configure(context)
if hasattr(self.instance, "configure"):
return self.instance.configure(context)

def execute(self, context):
return self.instance.execute(context)
@@ -481,15 +480,19 @@ def update_json(meta, working_directory):
shutil.move("%s/pipeline.json.new" % working_directory, "%s/pipeline.json" % working_directory)


def run(definitions, config = {}, working_directory = None, flowchart_path = None, dryrun = False, verbose = False, logger = logging.getLogger("synpp")):
def run(definitions, config = {}, working_directory = None, flowchart_path = None, dryrun = False, verbose = False,
logger = logging.getLogger("synpp"), rerun_required=True, ensure_working_directory=False):
# 0) Construct pipeline config
pipeline_config = {}
if "processes" in config: pipeline_config["processes"] = config["processes"]
if "progress_interval" in config: pipeline_config["progress_interval"] = config["progress_interval"]

if not working_directory is None:
if not os.path.isdir(working_directory):
raise PipelineError("Working directory does not exist: %s" % working_directory)
if ensure_working_directory and working_directory is None:
working_directory = '.synpp_cache'
if working_directory is not None:
if not os.path.isdir(working_directory) and ensure_working_directory:
logger.warning("Working directory does not exist, it will be created: %s" % working_directory)
os.mkdir(working_directory)
Review comment (Contributor):
I see the point, but so far I was deliberately avoiding this, to make sure that the user is 100% aware of where all the caching files will go. Maybe we can add a config flag that defaults to "false", like "auto_generate_working_directory"?
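
A minimal sketch of what that opt-in could look like, purely hypothetical and not part of this PR (the flag name auto_generate_working_directory comes from the comment above; the PR as written exposes an ensure_working_directory argument on run() instead):

# Hypothetical opt-in, defaulting to False so the user stays aware of where cache files go.
auto_generate = config.get("auto_generate_working_directory", False)

if working_directory is None and auto_generate:
    working_directory = ".synpp_cache"

if working_directory is not None and not os.path.isdir(working_directory):
    if auto_generate:
        logger.warning("Working directory does not exist, it will be created: %s" % working_directory)
        os.mkdir(working_directory)
    else:
        raise PipelineError("Working directory does not exist: %s" % working_directory)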


working_directory = os.path.realpath(working_directory)

@@ -580,9 +583,11 @@ def run(definitions, config = {}, working_directory = None, flowchart_path = Non

# 4) Devalidate stages
sorted_cached_hashes = sorted_hashes - ephemeral_counts.keys()
stale_hashes = set()

# 4.1) Devalidate if they are required
stale_hashes = set(required_hashes)
# 4.1) Devalidate if they are required (optional, otherwise will reload from cache)
if rerun_required:
Review comment (Contributor):
Is this covered by a unit test? If not, can you add one?

stale_hashes.update(required_hashes)

# 4.2) Devalidate if not in meta
for hash in sorted_cached_hashes:
@@ -592,7 +597,7 @@ def run(definitions, config = {}, working_directory = None, flowchart_path = Non
# 4.3) Devalidate if configuration values have changed
# This devalidation step is obsolete since we have implicit config parameters

# 4.3) Devalidate if module hash of a stage has changed
# 4.4) Devalidate if module hash of a stage has changed
for hash in sorted_cached_hashes:
if hash in meta:
if not "module_hash" in meta[hash]:
@@ -605,7 +610,7 @@ def run(definitions, config = {}, working_directory = None, flowchart_path = Non
if previous_module_hash != current_module_hash:
stale_hashes.add(hash)

# 4.3) Devalidate if cache is not existant
# 4.5) Devalidate if cache is not existant
if not working_directory is None:
for hash in sorted_cached_hashes:
directory_path = "%s/%s.cache" % (working_directory, hash)
@@ -614,7 +619,7 @@ def run(definitions, config = {}, working_directory = None, flowchart_path = Non
if not hash in cache_available:
stale_hashes.add(hash)

# 4.4) Devalidate if parent has been updated
# 4.6) Devalidate if parent has been updated
for hash in sorted_cached_hashes:
if not hash in stale_hashes and hash in meta:
for dependency_hash, dependency_update in meta[hash]["dependencies"].items():
@@ -624,7 +629,7 @@ def run(definitions, config = {}, working_directory = None, flowchart_path = Non
if meta[dependency_hash]["updated"] > dependency_update:
stale_hashes.add(hash)

# 4.5) Devalidate if parents are not the same anymore
# 4.7) Devalidate if parents are not the same anymore
for hash in sorted_cached_hashes:
if not hash in stale_hashes and hash in meta:
cached_hashes = set(meta[hash]["dependencies"].keys())
@@ -633,7 +638,7 @@ def run(definitions, config = {}, working_directory = None, flowchart_path = Non
if not cached_hashes == current_hashes:
stale_hashes.add(hash)

# 4.6) Manually devalidate stages
# 4.8) Manually devalidate stages
for hash in sorted_cached_hashes:
stage = registry[hash]
cache_path = "%s/%s.cache" % (working_directory, hash)
@@ -645,13 +650,13 @@ def run(definitions, config = {}, working_directory = None, flowchart_path = Non
if not validation_token == existing_token:
stale_hashes.add(hash)

# 4.7) Devalidate descendants of devalidated stages
# 4.9) Devalidate descendants of devalidated stages
for hash in set(stale_hashes):
for descendant_hash in nx.descendants(graph, hash):
if not descendant_hash in stale_hashes:
stale_hashes.add(descendant_hash)

# 4.8) Devalidate ephemeral stages if necessary
# 4.10) Devalidate ephemeral stages if necessary
pending = set(stale_hashes)

while len(pending) > 0:
@@ -761,6 +766,14 @@ def run(definitions, config = {}, working_directory = None, flowchart_path = Non
progress, len(stale_hashes), 100 * progress / len(stale_hashes)
))

if not rerun_required:
# Load remaining previously cached results
for hash in required_hashes:
if results[required_hashes.index(hash)] is None:
with open("%s/%s.p" % (working_directory, hash), "rb") as f:
logger.info("Loading cache for %s ..." % hash)
results[required_hashes.index(hash)] = pickle.load(f)

if verbose:
info = {}

@@ -778,26 +791,61 @@ def run(definitions, config = {}, working_directory = None, flowchart_path = Non


def run_from_yaml(path):
with open(path) as f:
settings = yaml.load(f, Loader = yaml.SafeLoader)

definitions = []
Synpp.build_from_yml(path).run_pipeline()

for item in settings["run"]:
parameters = {}

if type(item) == dict:
key = list(item.keys())[0]
parameters = item[key]
item = key
# Convenience class mostly for running stages individually (possibly interactively, e.g. in Jupyter)
class Synpp:
def __init__(self, config: dict, working_directory: str = None, logger: logging.Logger = logging.getLogger("synpp"),
definitions: List[Dict[str, Union[str, Callable, ModuleType]]] = None, flowchart_path: str = None,
dryrun: bool = False):
self.config = config
self.working_directory = working_directory
self.logger = logger
self.definitions = definitions
self.flowchart_path = flowchart_path
self.dryrun = dryrun

def run_pipeline(self, definitions=None, rerun_required=False, dryrun=None, verbose=False, flowchart_path=None):
if definitions is None and self.definitions is None:
raise PipelineError("A list of stage definitions must be available in object or provided explicitly.")
elif definitions is None:
definitions = self.definitions
if dryrun is None:
dryrun = self.dryrun
return run(definitions, self.config, self.working_directory, flowchart_path=flowchart_path,
dryrun=dryrun, verbose=verbose, logger=self.logger, rerun_required=rerun_required,
ensure_working_directory=True)

def run_single(self, descriptor, config={}, rerun_if_cached=False, dryrun=False, verbose=False):
Review comment (Contributor):
Really nice! If it had a unit test, it would be even better :)
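
A minimal sketch of such a test, reusing the tests.fixtures.sum_config fixture and values that the test suite below relies on (a=5, b=9 giving 14); the test_wrapper test added later in this PR follows the same pattern:

import synpp

def test_run_single_sketch(tmpdir):
    # Build the convenience wrapper pointing at a temporary cache directory.
    pipeline = synpp.Synpp(config={}, working_directory=str(tmpdir.mkdir("cache")))

    # run_single executes exactly one stage and returns its result directly.
    assert 14 == pipeline.run_single(
        descriptor="tests.fixtures.sum_config", config={"a": 5, "b": 9})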

return run([{'descriptor': descriptor, 'config': config}], self.config, self.working_directory,
dryrun=dryrun, verbose=verbose, logger=self.logger, rerun_required=rerun_if_cached,
flowchart_path=self.flowchart_path, ensure_working_directory=True)[0]

@staticmethod
def build_from_yml(config_path):
with open(config_path) as f:
settings = yaml.load(f, Loader=yaml.SafeLoader)

definitions = []

for item in settings["run"]:
parameters = {}

if type(item) == dict:
key = list(item.keys())[0]
parameters = item[key]
item = key

definitions.append({
"descriptor": item, "config": parameters
})

definitions.append({
"descriptor": item, "config": parameters
})
config = settings["config"] if "config" in settings else {}
working_directory = settings["working_directory"] if "working_directory" in settings else None
flowchart_path = settings["flowchart_path"] if "flowchart_path" in settings else None
dryrun = settings["dryrun"] if "dryrun" in settings else False

config = settings["config"] if "config" in settings else {}
working_directory = settings["working_directory"] if "working_directory" in settings else None
flowchart_path = settings["flowchart_path"] if "flowchart_path" in settings else None
dryrun = settings["dryrun"] if "dryrun" in settings else False
return Synpp(config=config, working_directory=working_directory, definitions=definitions,
flowchart_path=flowchart_path, dryrun=dryrun)

run(definitions=definitions, config=config, working_directory=working_directory, flowchart_path=flowchart_path, dryrun=dryrun)
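
For reference, an illustrative YAML file in the layout that Synpp.build_from_yml above expects; the stage names and values are placeholders, not part of this PR:

# Hypothetical pipeline configuration; stage names are placeholders.
run:
  - my_package.prepare_data       # bare string: stage without parameters
  - my_package.sum_config:        # mapping: stage name -> its config parameters
      a: 5
      b: 9

config:
  processes: 4

working_directory: .synpp_cache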
22 changes: 22 additions & 0 deletions tests/test_run_stages.py
@@ -48,3 +48,25 @@ def test_cache_path(tmpdir):
}], working_directory = tmpdir.mkdir("sub"))

assert result[0] == "abc_uvw"

def test_rerun_required(tmpdir):
working_directory = tmpdir.mkdir("sub")

results1 = synpp.run([{
"descriptor": "tests.fixtures.sum_config",
"config": {"a": 5, "b": 9}
}], working_directory=working_directory)
results2 = synpp.run([{
"descriptor": "tests.fixtures.sum_config",
"config": {"a": 5, "b": 9},
}], working_directory=working_directory, rerun_required=False, verbose=True)
assert results2['results'][0] == results1[0]
assert len(results2['stale']) == 0

def test_wrapper(tmpdir):
working_directory = tmpdir.mkdir("sub")
wrapper = synpp.Synpp(config={'working_directory': working_directory})
assert 14 == wrapper.run_single(descriptor="tests.fixtures.sum_config", config={"a": 5, "b": 9})
res = wrapper.run_pipeline(definitions=[{"descriptor": "tests.fixtures.sum_config", "config": {"a": 5, "b": 9}}])
assert len(res) == 1
assert 14 == res[0]