wip
Marigold committed Jan 10, 2025
1 parent bd542a0 commit 21a594a
Showing 2 changed files with 55 additions and 4 deletions.
57 changes: 54 additions & 3 deletions etl/scripts/archive/run_all_snapshots/run_all_snapshots.py
@@ -10,6 +10,7 @@
 """
 
 import base64
+import hashlib
 import json
 import subprocess
@@ -19,6 +20,7 @@
 
 import click
 import requests
+import yaml
 from owid.datautils.io import save_json
 from structlog import get_logger
 from tqdm.auto import tqdm
@@ -48,6 +50,21 @@ def get_active_snapshots() -> Set[str]:
     return {s.split(".")[0] + ".py" for s in active_snapshots}
 
 
+def fetch_file(file_path, branch):
+    headers = {
+        "Authorization": f"token {GITHUB_TOKEN}",
+        "Accept": "application/vnd.github.v3+json",
+    }
+
+    url = f"{GITHUB_API_BASE}/contents/{file_path}?ref={branch}"
+    resp = requests.get(url, headers=headers)
+    resp.raise_for_status()
+    data = resp.json()
+
+    content_base64 = data["content"]  # Base64-encoded
+    return base64.b64decode(content_base64).decode("utf-8")
+
+
 def create_autoupdate_pr(snapshot: str):
     assert snapshot.endswith(".py"), f"Expected snapshot to end with .py, got {snapshot}"
 
Expand Down Expand Up @@ -91,6 +108,11 @@ def create_autoupdate_pr(snapshot: str):
with filepath.open("r", encoding="utf-8") as fp:
content = fp.read()

# Skip if the remote content is the same
remote_content = fetch_file(filepath.relative_to(BASE_DIR), branch_name)
if remote_content == content:
continue

# Build the tree structure
repo_path = str(filepath.relative_to(BASE_DIR))
tree_items.append(
@@ -102,6 +124,11 @@
             }
         )
 
+    # Don't update if there are no changes
+    if not tree_items:
+        log.info(f"No changes in {snapshot}")
+        return
+
     # Create a tree for all files
     tree_data = {"base_tree": master_sha, "tree": tree_items}
     create_tree_resp = requests.post(f"{GITHUB_API_BASE}/git/trees", json=tree_data, headers=headers)
@@ -210,15 +237,39 @@ def main(dry_run: bool, create_pr: bool, filter: str, timeout: int):
         try:
             log.info(f"Executing {snapshot}.")
 
+            # Find .dvc file belonging to the snapshot script
+            files = list(snapshot_script.parent.glob(f"{snapshot_script.stem}.*.dvc"))
+            assert len(files) == 1, f"Expected to find exactly one .dvc file, got {files}"
+            dvc_file = files[0]
+
+            subprocess.run(["git", "restore", "--", str(dvc_file)])
+
+            # Load md5 and size from the .dvc file from YAML
+            with open(dvc_file, "r") as f:
+                original_outs = yaml.safe_load(f)["outs"][0]
+
             # Try to execute snapshot.
-            result = subprocess.run(
+            subprocess.run(
                 ["python", snapshot_script, "--upload"], check=True, capture_output=True, timeout=timeout, text=True
             )
 
+            # Load md5 and size from the (possibly) updated file
+            with open(dvc_file, "r") as f:
+                new_outs = yaml.safe_load(f)["outs"][0]
+
             execution_results[snapshot]["status"] = "SUCCESS"
 
-            # Data is not new, MD5 is identical.
-            execution_results[snapshot]["identical"] = "File already exists with the same md5" in result.stdout
+            # Data is not new, MD5 or size is identical.
+            # NOTE: Some snapshots may have the same data but different md5s (e.g. scraped htmls).
+            if original_outs["md5"] == new_outs["md5"]:
+                execution_results[snapshot]["identical"] = True
+
+            elif original_outs["size"] == new_outs["size"]:
+                execution_results[snapshot]["identical"] = True
+                subprocess.run(["git", "restore", "--", str(dvc_file)])
+
+            else:
+                execution_results[snapshot]["identical"] = False
 
             # Add duration time for successfully executed snapshot.
             duration = time.time() - start_time
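For reference, the new logic in main() decides whether a snapshot produced new data by comparing the single entry under outs in the .dvc file before and after the run. Below is a minimal, self-contained sketch of that comparison, assuming the usual DVC layout with one outs entry; the metadata values are made up and not taken from this repository.

import yaml

# Hypothetical .dvc contents before and after running a snapshot; values are illustrative.
OLD_DVC = """\
outs:
- md5: 5d41402abc4b2a76b9719d911017c592
  size: 1024
  path: example.csv
"""

NEW_DVC = """\
outs:
- md5: 7d793037a0760186574b0282f2f435e7
  size: 1024
  path: example.csv
"""

original_outs = yaml.safe_load(OLD_DVC)["outs"][0]
new_outs = yaml.safe_load(NEW_DVC)["outs"][0]

# Same decision rule as the diff: identical if the md5 matches, or if only the
# size matches (some snapshots, e.g. scraped HTML, change md5 without changing data).
if original_outs["md5"] == new_outs["md5"]:
    identical = True
elif original_outs["size"] == new_outs["size"]:
    identical = True
else:
    identical = False

print(identical)  # True in this example: md5 differs but size is unchanged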
2 changes: 1 addition & 1 deletion etl/snapshot.py
@@ -163,7 +163,7 @@ def dvc_add(self, upload: bool) -> None:
             meta = ruamel_load(f)
 
         # If the file already exists with the same md5, skip the upload
-        if meta["outs"] and meta["outs"][0]["md5"] == md5:
+        if "outs" in meta and meta["outs"][0]["md5"] == md5:
             log.info("File already exists with the same md5, skipping upload", snapshot=self.uri)
             return
 

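For reference, import hashlib is added to run_all_snapshots.py above but is not used in any of the hunks shown. A typical way a script like this could compute a local file's md5 for comparisons such as the one in dvc_add is sketched below; the file_md5 helper is hypothetical and not part of this commit.

import hashlib
from pathlib import Path


def file_md5(path: Path, chunk_size: int = 2**20) -> str:
    # Stream the file in chunks so large snapshot files don't have to fit in memory.
    digest = hashlib.md5()
    with open(path, "rb") as f:
        for chunk in iter(lambda: f.read(chunk_size), b""):
            digest.update(chunk)
    return digest.hexdigest()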