Skip to content

Commit

Permalink
fix merge issues, and a hidden bug in the compatibility layer when ig…
Browse files Browse the repository at this point in the history
…noring code changes
  • Loading branch information
TyberiusPrime committed Nov 28, 2024
1 parent f88a57b commit 6b28384
Show file tree
Hide file tree
Showing 4 changed files with 19 additions and 9 deletions.
14 changes: 8 additions & 6 deletions python/pypipegraph2/jobs.py
100644 → 100755
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
from functools import total_ordering

from . import hashers, exceptions, ppg_traceback
from .enums import Resources,RunMode
from .enums import Resources, RunMode
from .util import escape_logging
import hashlib
import shutil
Expand Down Expand Up @@ -2107,9 +2107,9 @@ def load():
return CachedJobTuple(load_job, cache_job)


(??)class AttributeLoadingJob(
(??) Job
(??)): # Todo: refactor with DataLoadingJob. Also figure out how to hash the result?
class AttributeLoadingJob(
Job, _InputHashAwareJobMixin
): # Todo: refactor with DataLoadingJob. Also figure out how to hash the result?
eval_job_kind = "Ephemeral"

def __new__(cls, job_id, *args, **kwargs):
Expand Down Expand Up @@ -2178,7 +2178,9 @@ def readd(self): # Todo: refactor
def run(self, runner, historical_output):
current_hash = self._derive_output_name(runner)
last_hash = self.get_hash()
log_warning(f"{self.job_id} loaded hash hash {last_hash} -current: {current_hash}")
log_warning(
f"{self.job_id} loaded hash hash {last_hash} -current: {current_hash}"
)

if current_hash != last_hash:
value = self.callback()
Expand Down Expand Up @@ -3199,7 +3201,7 @@ def ExternalJob(
resources: Resources = Resources.SingleCore,
cwd: Optional[Union(Path, str)] = None,
start_new_session: bool = False,
std_prefix = ""
std_prefix="",
):
"""A job that calls an external program,
logging the command, stdout & stderr to files
Expand Down
10 changes: 7 additions & 3 deletions python/pypipegraph2/ppg1_compatibility.py
100644 → 100755
Original file line number Diff line number Diff line change
Expand Up @@ -308,17 +308,21 @@ def _ignore_code_changes(job):
if hasattr(job, "func_invariant"):
log_job_trace(f"ignoring changes for {job.job_id}")
util.global_pipegraph.job_dag.remove_edge(job.func_invariant.job_id, job.job_id)
for k in job.func_invariant.outputs:
print(f"removing from {job.job_id} input: {k}, id: {util.global_pipegraph.job_inputs[job.job_id]}")
util.global_pipegraph.job_inputs[job.job_id].remove(k)

if hasattr(job.func_invariant, "usage_counter"):
job.func_invariant.usage_counter -= 1
if (
not hasattr(job.func_invariant, "usage_counter")
or job.func_invariant.usage_counter == 0
or job.func_invariant.usage_counter <= 0
):
util.global_pipegraph.job_dag.remove_node(job.func_invariant.job_id)
for k in job.func_invariant.outputs:
util.global_pipegraph.job_inputs[job.job_id].remove(k)
del util.global_pipegraph.jobs[job.func_invariant.job_id]
for k in job.func_invariant.outputs:
del util.global_pipegraph.outputs_to_job_ids[k]


del job.func_invariant
if hasattr(job, "lfg"):
Expand Down
2 changes: 2 additions & 0 deletions python/pypipegraph2/runner.py
100644 → 100755
Original file line number Diff line number Diff line change
Expand Up @@ -179,9 +179,11 @@ def __init__(
with _with_changed_global_pipegraph(JobCollector(job_graph.run_mode)):
self.job_graph = job_graph
self.jobs = job_graph.jobs.copy()
print("Now cloning job_inputs")
self.job_inputs = copy.deepcopy(
job_graph.job_inputs
) # job_graph.job_inputs.copy()
print(self.job_inputs)
self.outputs_to_job_ids = job_graph.outputs_to_job_ids.copy()
self.next_job_number = self.job_graph.next_job_number
self.core_lock = CoreLock(job_graph.cores)
Expand Down
2 changes: 2 additions & 0 deletions tests/ppg1_compatibility_layer/test_cache_jobs.py
100644 → 100755
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,9 @@ def calc2():
return ", ".join(str(x) for x in range(0, 200))

job = ppg.CachedAttributeLoadingJob("out/mycalc", o, "a", calc2)
print("before ignore code changes", ppg.util.global_pipegraph.job_inputs)
job.ignore_code_changes()
print("after ignore code changes", ppg.util.global_pipegraph.job_inputs)
ppg.FileGeneratingJob(of, do_write).depends_on(job)
ppg.run_pipegraph()
assert read(of) == ", ".join(str(x) for x in range(0, 200)) # ppg2 change
Expand Down

0 comments on commit 6b28384

Please sign in to comment.