Skip to content

Commit

Permalink
debugged test_hash and test_typing
Browse files Browse the repository at this point in the history
  • Loading branch information
tclose committed Jan 30, 2025
1 parent 290bdca commit d584074
Show file tree
Hide file tree
Showing 9 changed files with 119 additions and 139 deletions.
28 changes: 20 additions & 8 deletions pydra/engine/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -357,7 +357,11 @@ def run(self, rerun: bool = False):
self._populate_filesystem()
os.chdir(self.output_dir)
result = Result(
outputs=None, runtime=None, errored=False, output_dir=self.output_dir
outputs=None,
runtime=None,
errored=False,
output_dir=self.output_dir,
definition=self.definition,
)
self.hooks.pre_run_task(self)
self.audit.start_audit(odir=self.output_dir)
Expand Down Expand Up @@ -409,7 +413,11 @@ async def run_async(self, rerun: bool = False) -> Result:
cwd = os.getcwd()
self._populate_filesystem()
result = Result(
outputs=None, runtime=None, errored=False, output_dir=self.output_dir
outputs=None,
runtime=None,
errored=False,
output_dir=self.output_dir,
definition=self.definition,
)
self.hooks.pre_run_task(self)
self.audit.start_audit(odir=self.output_dir)
Expand Down Expand Up @@ -506,7 +514,11 @@ def result(self, return_inputs=False):
"""
if self.errored:
return Result(
outputs=None, runtime=None, errored=True, output_dir=self.output_dir
outputs=None,
runtime=None,
errored=True,
output_dir=self.output_dir,
definition=self.definition,
)

checksum = self.checksum
Expand Down Expand Up @@ -801,17 +813,17 @@ def _create_graph(
# adding an edge to the graph if task id expecting output from a different task
if lf.name != self.name:
# checking if the connection is already in the graph
if (self[lf.name], node) not in graph.edges:
graph.add_edges((self[lf.name], node))
if (graph.node(lf.name), node) not in graph.edges:
graph.add_edges((graph.node(lf.name), node))
if detailed:
graph.add_edges_description(
(node.name, field.name, lf.name, lf.field)
)
logger.debug("Connecting %s to %s", lf.name, node.name)
# adding a state from the previous task to other_states
if (
self[lf.name].state
and self[lf.name].state.splitter_rpn_final
graph.node(lf.name).state
and graph.node(lf.name).state.splitter_rpn_final
):
# variables that are part of inner splitters should be
# treated as a containers
Expand All @@ -823,7 +835,7 @@ def _create_graph(
# adding task_name: (task.state, [a field from the connection]
if lf.name not in other_states:
other_states[lf.name] = (
self[lf.name].state,
graph.node(lf.name).state,
[field.name],
)
else:
Expand Down
29 changes: 19 additions & 10 deletions pydra/engine/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
from fileformats.core import FileSet

if ty.TYPE_CHECKING:
from .specs import TaskDef, Result
from .specs import TaskDef, Result, WorkflowOutputs
from .core import Task
from pydra.design.base import Field

Expand Down Expand Up @@ -149,6 +149,7 @@ def save(
task : :class:`~pydra.engine.core.TaskBase`
Task to pickle and write
"""
from pydra.engine.core import is_workflow

if task is None and result is None:
raise ValueError("Nothing to be saved")
Expand All @@ -162,27 +163,35 @@ def save(
lockfile = task_path.parent / (task_path.name + "_save.lock")
with SoftFileLock(lockfile):
if result:
if task_path.name.startswith("Workflow") and result.outputs is not None:
if (
result.definition
and is_workflow(result.definition)
and result.outputs is not None
):
# copy files to the workflow directory
result.outputs = copyfile_workflow(wf_path=task_path, result=result)
result.outputs = copyfile_workflow(
wf_path=task_path, outputs=result.outputs
)
with (task_path / f"{name_prefix}_result.pklz").open("wb") as fp:
cp.dump(result, fp)
if task:
with (task_path / f"{name_prefix}_task.pklz").open("wb") as fp:
cp.dump(task, fp)


def copyfile_workflow(wf_path: os.PathLike, result: "Result") -> "Result":
def copyfile_workflow(
wf_path: os.PathLike, outputs: "WorkflowOutputs"
) -> "WorkflowOutputs":
"""if file in the wf results, the file will be copied to the workflow directory"""
from .helpers_file import copy_nested_files

for field in attrs_fields(result.outputs):
value = getattr(result.outputs, field.name)
for field in attrs_fields(outputs):
value = getattr(outputs, field.name)
# if the field is a path or it can contain a path _copyfile_single_value is run
# to move all files and directories to the workflow directory
new_value = copy_nested_files(value, wf_path, mode=FileSet.CopyMode.hardlink)
setattr(result.outputs, field.name, new_value)
return result
setattr(outputs, field.name, new_value)
return outputs


def gather_runtime_info(fname):
Expand Down Expand Up @@ -457,7 +466,7 @@ def load_and_run(task_pkl: Path, rerun: bool = False) -> Path:
etype, eval, etr = sys.exc_info()
traceback = format_exception(etype, eval, etr)
errorfile = record_error(task_pkl.parent, error=traceback)
result = Result(output=None, runtime=None, errored=True)
result = Result(output=None, runtime=None, errored=True, definition=None)
save(task_pkl.parent, result=result)
raise

Expand All @@ -472,7 +481,7 @@ def load_and_run(task_pkl: Path, rerun: bool = False) -> Path:
traceback = format_exception(etype, eval, etr)
errorfile = record_error(task.output_dir, error=traceback)
if not resultfile.exists(): # not sure if this is needed
result = Result(output=None, runtime=None, errored=True)
result = Result(output=None, runtime=None, errored=True, definition=None)
save(task.output_dir, result=result)
e.add_note(f" full crash report is here: {errorfile}")
raise
Expand Down
1 change: 1 addition & 0 deletions pydra/engine/specs.py
Original file line number Diff line number Diff line change
Expand Up @@ -519,6 +519,7 @@ class Result(ty.Generic[OutputsType]):
outputs: OutputsType | None = None
runtime: Runtime | None = None
errored: bool = False
definition: TaskDef[OutputsType] | None = None

def __getstate__(self):
state = attrs_values(self)
Expand Down
9 changes: 7 additions & 2 deletions pydra/engine/submitter.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
from .node import Node
from .specs import TaskDef, WorkflowDef
from .environments import Environment
from .state import State

DefType = ty.TypeVar("DefType", bound="TaskDef")

Expand Down Expand Up @@ -456,7 +457,7 @@ def __init__(
self.queued = {}
self.running = {} # Not used in logic, but may be useful for progress tracking
self.unrunnable = defaultdict(list)
self.state_names = self.node.state.names
self.state_names = self.node.state.names if self.node.state else []
self.workflow_inputs = workflow_inputs
self.graph = None

Expand All @@ -468,6 +469,10 @@ def inputs(self) -> "Node.Inputs":
def _definition(self) -> "Node":
return self.node._definition

@property
def state(self) -> "State":
return self.node.state

@property
def tasks(self) -> ty.Iterable["Task[DefType]"]:
if self._tasks is None:
Expand Down Expand Up @@ -535,7 +540,7 @@ def _generate_tasks(self) -> ty.Iterable["Task[DefType]"]:
yield Task(
definition=self.node._definition._resolve_lazy_inputs(
workflow_inputs=self.workflow_inputs,
exec_graph=self.graph,
graph=self.graph,
state_index=None,
),
submitter=self.submitter,
Expand Down
17 changes: 10 additions & 7 deletions pydra/engine/tests/test_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@
import pytest
import cloudpickle as cp
from unittest.mock import Mock
from pydra.engine.submitter import Submitter
from pydra.engine.specs import Result
from pydra.engine.core import Task
from fileformats.generic import Directory, File
from fileformats.core import FileSet
from .utils import Multiply, RaiseXeq1
Expand All @@ -24,24 +27,24 @@ def test_save(tmpdir):
outdir = Path(tmpdir)
with pytest.raises(ValueError):
save(tmpdir)
foo = Multiply(name="mult", x=1, y=2)
foo = Task(name="mult", definition=Multiply(x=1, y=2), submitter=Submitter())
# save task
save(outdir, task=foo)
del foo
# load saved task
task_pkl = outdir / "_task.pklz"
foo = cp.loads(task_pkl.read_bytes())
foo: Task = cp.loads(task_pkl.read_bytes())
assert foo.name == "mult"
assert foo.inputs.x == 1 and foo.inputs.y == 2
assert foo.inputs["x"] == 1 and foo.inputs["y"] == 2
# execute task and save result
res = foo()
assert res.output.out == 2
res: Result = foo.run()
assert res.outputs.out == 2
save(outdir, result=res)
del res
# load saved result
res_pkl = outdir / "_result.pklz"
res = cp.loads(res_pkl.read_bytes())
assert res.output.out == 2
res: Result = cp.loads(res_pkl.read_bytes())
assert res.outputs.out == 2


def test_hash_file(tmpdir):
Expand Down
8 changes: 4 additions & 4 deletions pydra/utils/tests/test_hash.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
import typing as ty
from fileformats.application import Zip, Json
from fileformats.text import TextFile
from ..hash import (
from pydra.utils.hash import (
Cache,
bytes_repr,
hash_object,
Expand Down Expand Up @@ -190,9 +190,9 @@ def test_bytes_repr_type2():
class MyClass(ty.Generic[T]):
pass

obj_repr = join_bytes_repr(MyClass[int])
assert (
obj_repr == b"type:(pydra.utils.tests.test_hash.MyClass[type:(builtins.int)])"
obj_repr = join_bytes_repr(MyClass[int]).decode()
assert re.match(
r"type:\([\w\.]*test_hash.MyClass\[type:\(builtins.int\)\]\)", obj_repr
)


Expand Down
Loading

0 comments on commit d584074

Please sign in to comment.