Skip to content

Commit

Permalink
on cache hit, log the original run dir
Browse files: browse the repository at this point in the history
  • Loading branch information
mlin committed Sep 17, 2023
1 parent 6052fa8 commit bd8e7ec
Show file tree
Hide file tree
Showing 4 changed files with 34 additions and 8 deletions.
29 changes: 25 additions & 4 deletions WDL/runtime/cache.py
Original file line number | Diff line number | Diff line change
Expand Up @@ -88,17 +88,30 @@ def get(
return None

cache = None
run_dir = None
try:
with open(file_path, "rb") as file_reader:
cache = values_from_json(json.loads(file_reader.read()), output_types) # pyre-fixme
envelope = json.loads(file_reader.read())
if "miniwdlCallCacheVersion" in envelope:
outputs = envelope["outputs"]
run_dir = envelope.get("dir", None)
else:
outputs = envelope
cache = values_from_json(outputs, output_types) # pyre-fixme
except FileNotFoundError:
self._logger.info(_("call cache miss", cache_file=file_path))
except Exception as exn:
self._logger.warning(
_("call cache entry present, but unreadable", cache_file=file_path, error=str(exn))
)
if cache:
self._logger.notice(_("call cache hit", cache_file=file_path)) # pyre-fixme
self._logger.notice( # pyre-fixme
_(
"call cache hit",
run_dir=(run_dir if run_dir else "?"),
cache_file=file_path,
)
)
# check that no files/directories referenced by the inputs & cached outputs are newer
# than the cache file itself
if _check_files_coherence(
Expand All @@ -119,7 +132,9 @@ def get(
)
return None

def put(self, key: str, outputs: Env.Bindings[Value.Base]) -> None:
def put(
self, key: str, outputs: Env.Bindings[Value.Base], run_dir: Optional[str] = None
) -> None:
"""
Store call outputs for future reuse
"""
Expand All @@ -128,7 +143,13 @@ def put(self, key: str, outputs: Env.Bindings[Value.Base]) -> None:
if self._cfg["call_cache"].get_bool("put"):
filename = os.path.join(self._call_cache_dir, key + ".json")
Path(filename).parent.mkdir(parents=True, exist_ok=True)
write_atomic(json.dumps(values_to_json(outputs), indent=2), filename) # pyre-ignore
envelope = {
"miniwdlCallCacheVersion": 1,
"outputs": values_to_json(outputs), # pyre-ignore
}
if run_dir:
envelope["dir"] = run_dir
write_atomic(json.dumps(envelope, indent=2), filename)
self._logger.info(_("call cache insert", cache_file=filename))

# specialized caching logic for file downloads (not sensitive to the downloader task details,
Expand Down
2 changes: 1 addition & 1 deletion WDL/runtime/task.py
Original file line number | Diff line number | Diff line change
Expand Up @@ -230,7 +230,7 @@ def run_local_task(
)
logger.notice("done") # pyre-fixme
if not run_id.startswith("download-"):
cache.put(cache_key, outputs)
cache.put(cache_key, outputs, run_dir=run_dir)
return (run_dir, outputs)
except Exception as exn:
tbtxt = traceback.format_exc()
Expand Down
8 changes: 6 additions & 2 deletions WDL/runtime/workflow.py
Original file line number | Diff line number | Diff line change
Expand Up @@ -715,7 +715,11 @@ def _virtualize_filename(self, filename: str) -> str:
filename = cache_out.resolve("file").value
else:
# otherwise, put our newly-written file to the cache, and proceed to use it
self.cache.put(cache_key, Env.Bindings().bind("file", Value.File(filename)))
self.cache.put(
cache_key,
Env.Bindings().bind("file", Value.File(filename)),
run_dir=self.state.run_dir,
)

# whichever path we took: allow-list the filename
self.state.fspath_allowlist.add(filename)
Expand Down Expand Up @@ -936,7 +940,7 @@ def run_local_workflow(
os.kill(os.getpid(), signal.SIGUSR1)
raise

cache.put(cache_key, outputs)
cache.put(cache_key, outputs, run_dir=run_dir)

return (run_dir, outputs)

Expand Down
3 changes: 2 additions & 1 deletion tests/test_8cache.py
Original file line number | Diff line number | Diff line change
Expand Up @@ -185,7 +185,8 @@ def test_task_input_cache_matches_output(self):
task_digest = self.doc.tasks[0].digest
with open(os.path.join(self.cache_dir, f"{self.doc.tasks[0].name}/{task_digest}/{input_digest}.json")) as f:
read_data = json.loads(f.read())
self.assertEqual(read_data, WDL.values_to_json(outputs))
self.assertEqual(read_data["outputs"], WDL.values_to_json(outputs))
self.assertTrue(os.path.isdir(read_data["dir"]))

def test_cache_prevents_task_rerun(self):
# run task twice, check _try_task not called for second run
Expand Down

0 comments on commit bd8e7ec

Please sign in to comment.