Skip to content

Commit

Permalink
Merge branch 'master' of github.com:owid/etl into add-daily-calories-…
Browse files Browse the repository at this point in the history
…data
  • Loading branch information
pabloarosado committed May 27, 2024
2 parents 339a45f + 0e18c57 commit e750a5a
Show file tree
Hide file tree
Showing 70 changed files with 1,461 additions and 1,048 deletions.
2 changes: 1 addition & 1 deletion api/v1/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
from jsonschema import validate
from jsonschema.exceptions import ValidationError
from sqlalchemy.exc import NoResultFound
from sqlmodel import Session
from sqlalchemy.orm import Session

from apps.backport.datasync.datasync import upload_gzip_dict
from etl import config, paths
Expand Down
5 changes: 3 additions & 2 deletions apps/backport/backport.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
from git.repo import Repo
from owid.catalog import Source
from sqlalchemy.engine import Engine
from sqlmodel import Session
from sqlalchemy.orm import Session

from apps.backport.datasync.data_metadata import (
_variable_metadata,
Expand Down Expand Up @@ -266,7 +266,7 @@ def _upload_data_metadata(lg: Any, backport_short_name: str, dry_run: bool) -> N

# artificial variable with id just to get s3 paths
db_var = gm.Variable(
id=db_variable_row["id"],
description="",
datasetId=1,
unit="",
coverage="",
Expand All @@ -275,6 +275,7 @@ def _upload_data_metadata(lg: Any, backport_short_name: str, dry_run: bool) -> N
display={},
dimensions=None,
)
db_var.id = db_variable_row["id"]

upload_variable_data = variable_data(var_data)
if not dry_run:
Expand Down
27 changes: 12 additions & 15 deletions apps/backport/datasync/data_metadata.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,16 @@

import numpy as np
import pandas as pd
from sqlalchemy import text
from sqlalchemy.engine import Engine
from sqlmodel import Session
from sqlalchemy.orm import Session
from tenacity import Retrying
from tenacity.retry import retry_if_exception_type
from tenacity.stop import stop_after_attempt
from tenacity.wait import wait_fixed

from etl import config
from etl.db import read_sql


def _fetch_data_df_from_s3(variable_id: int):
Expand Down Expand Up @@ -67,14 +69,9 @@ def _fetch_entities(session: Session, entity_ids: List[int]) -> pd.DataFrame:
name AS entityName,
code AS entityCode
FROM entities
WHERE id in :entity_ids
WHERE id in %(entity_ids)s
"""

# Execute the SQL using session
result_proxy = session.execute(q, {"entity_ids": entity_ids}) # type: ignore

# Convert the result into a DataFrame
return pd.DataFrame(result_proxy.fetchall(), columns=result_proxy.keys())
return read_sql(q, session, params={"entity_ids": entity_ids})


def add_entity_code_and_name(session: Session, df: pd.DataFrame) -> pd.DataFrame:
Expand Down Expand Up @@ -124,11 +121,11 @@ def _load_variable(session: Session, variable_id: int) -> Dict[str, Any]:
"""

# Using the session to execute raw SQL and fetching one row as a result
result = session.execute(sql, {"variable_id": variable_id}).fetchone() # type: ignore
result = session.execute(text(sql), {"variable_id": variable_id}).fetchone()

# Ensure result exists and convert to dictionary
assert result, f"variableId `{variable_id}` not found"
return dict(result)
return dict(result._mapping)


def _load_topic_tags(session: Session, variable_id: int) -> List[str]:
Expand All @@ -142,7 +139,7 @@ def _load_topic_tags(session: Session, variable_id: int) -> List[str]:
"""

# Using the session to execute raw SQL
result = session.execute(sql, {"variable_id": variable_id}).fetchall() # type: ignore
result = session.execute(text(sql), {"variable_id": variable_id}).fetchall()

# Extract tag names from the result and return as a list
return [row[0] for row in result]
Expand All @@ -159,10 +156,10 @@ def _load_faqs(session: Session, variable_id: int) -> List[Dict[str, Any]]:
"""

# Using the session to execute raw SQL
result = session.execute(sql, {"variable_id": variable_id}).fetchall() # type: ignore
result = session.execute(text(sql), {"variable_id": variable_id}).fetchall()

# Convert the result rows to a list of dictionaries
return [dict(row) for row in result]
return [dict(row._mapping) for row in result]


def _load_origins_df(session: Session, variable_id: int) -> pd.DataFrame:
Expand All @@ -176,7 +173,7 @@ def _load_origins_df(session: Session, variable_id: int) -> pd.DataFrame:
"""

# Use the session to execute the raw SQL
result_proxy = session.execute(sql, {"variable_id": variable_id}) # type: ignore
result_proxy = session.execute(text(sql), {"variable_id": variable_id})

# Fetch the results into a DataFrame
df = pd.DataFrame(result_proxy.fetchall(), columns=result_proxy.keys())
Expand Down Expand Up @@ -291,7 +288,7 @@ def _variable_metadata(
# convert timestamp to string
time_format = "%Y-%m-%dT%H:%M:%S.000Z"
for col in ("createdAt", "updatedAt"):
variableMetadata[col] = variableMetadata[col].strftime(time_format) # type: ignore
variableMetadata[col] = variableMetadata[col].strftime(time_format)

# add origins
variableMetadata["origins"] = _move_population_origin_to_end(
Expand Down
2 changes: 1 addition & 1 deletion apps/metadata_migrate/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
from rich.syntax import Syntax
from rich_click.rich_command import RichCommand
from sqlalchemy.engine import Engine
from sqlmodel import Session
from sqlalchemy.orm import Session

from etl import config
from etl import grapher_model as gm
Expand Down
2 changes: 1 addition & 1 deletion apps/owidbot/chart_diff.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import pandas as pd
from sqlmodel import Session
from sqlalchemy.orm import Session
from structlog import get_logger

from apps.staging_sync.cli import _modified_chart_ids_by_admin
Expand Down
17 changes: 9 additions & 8 deletions apps/owidbot/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,15 +59,8 @@ def cli(
repo = gh_utils.get_repo(repo_name)
pr = gh_utils.get_pr(repo, branch)

comment = gh_utils.get_comment_from_pr(pr)

# prefill services from existing PR comment
if comment:
services_body = services_from_comment(comment)
else:
services_body = {}

# recalculate services
services_body = {}
for service in services:
if service == "data-diff":
services_body["data-diff"] = data_diff.run(include)
Expand All @@ -84,6 +77,14 @@ def cli(
else:
raise AssertionError("Invalid service")

# get existing comment (do this as late as possible to avoid race conditions)
comment = gh_utils.get_comment_from_pr(pr)

# fill in services from existing comment if not run via --services to avoid deleting them
if comment:
for service, content in services_from_comment(comment).items():
services_body.setdefault(service, content)

body = create_comment_body(branch, services_body, start_time)

if dry_run:
Expand Down
9 changes: 6 additions & 3 deletions apps/staging_sync/admin_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@
from typing import Any, Dict, List, Optional

import requests
from sqlmodel import Session
from sqlalchemy import text
from sqlalchemy.orm import Session

from etl import grapher_model as gm
from etl.config import GRAPHER_USER_ID, TAILSCALE_ADMIN_HOST
Expand Down Expand Up @@ -92,12 +93,14 @@ def _create_user_session(session: Session, user_email: str, expiration_seconds=3
# Base64 encode
session_data = base64.b64encode(("prefix:" + json_str).encode("utf-8")).decode("utf-8")

query = """
query = text(
"""
INSERT INTO sessions (session_key, session_data, expire_date)
VALUES (:session_key, :session_data, :expire_date);
"""
)
session.execute(
query, # type: ignore
query,
params={
"session_key": session_key,
"session_data": session_data,
Expand Down
12 changes: 8 additions & 4 deletions apps/staging_sync/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
from rich_click.rich_command import RichCommand
from slack_sdk import WebClient
from sqlalchemy.exc import IntegrityError
from sqlmodel import Session
from sqlalchemy.orm import Session

from apps.staging_sync.admin_api import AdminAPI
from apps.wizard.pages.chart_diff.chart_diff import ChartDiffModified
Expand Down Expand Up @@ -260,10 +260,14 @@ def cli(
if not dry_run:
target_api.update_chart(chart_id, diff.source_chart.config)
else:
assert config.GRAPHER_USER_ID
GRAPHER_USER_ID = int(config.GRAPHER_USER_ID)

# there's already a chart with the same slug, create a new revision
chart_revision = gm.SuggestedChartRevisions(
chartId=chart_id,
createdBy=int(config.GRAPHER_USER_ID), # type: ignore
createdBy=GRAPHER_USER_ID,
updatedBy=GRAPHER_USER_ID,
originalConfig=diff.target_chart.config,
suggestedConfig=diff.source_chart.config,
status="pending",
Expand All @@ -277,8 +281,8 @@ def cli(
.filter_by(
chartId=chart_id,
status="pending",
createdBy=int(config.GRAPHER_USER_ID), # type: ignore
) # type: ignore
createdBy=GRAPHER_USER_ID,
)
.filter(gm.SuggestedChartRevisions.createdAt > staging_created_at)
.delete()
)
Expand Down
3 changes: 2 additions & 1 deletion apps/step_update/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -287,7 +287,8 @@ def _update_snapshot_step(
metadata = SnapshotMeta.load_from_yaml(step_dvc_file)
# Update version and date accessed.
metadata.version = step_version_new # type: ignore
metadata.origin.date_accessed = step_version_new # type: ignore
if metadata.origin:
metadata.origin.date_accessed = step_version_new # type: ignore
# Write metadata to new file.
step_dvc_file_new.write_text(metadata.to_yaml())

Expand Down
74 changes: 53 additions & 21 deletions apps/wizard/pages/chart_diff/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,14 @@

import streamlit as st
from sqlalchemy.engine.base import Engine
from sqlmodel import Session
from sqlalchemy.orm import Session
from st_pages import add_indentation
from structlog import get_logger

from apps.staging_sync.cli import _modified_chart_ids_by_admin
from apps.wizard.pages.chart_diff.chart_diff import ChartDiffModified
from apps.wizard.pages.chart_diff.config_diff import st_show_diff
from apps.wizard.utils import chart_html, set_states
from apps.wizard.utils import Pagination, chart_html, set_states
from apps.wizard.utils.env import OWID_ENV, OWIDEnv
from etl import config

Expand Down Expand Up @@ -53,13 +53,15 @@
st.warning(warning_msg)
TARGET = OWIDEnv.from_staging("master")

CHART_PER_PAGE = 10


########################################
# FUNCTIONS
########################################


def get_chart_diffs(source_engine, target_engine):
def get_chart_diffs(source_engine, target_engine) -> dict[int, ChartDiffModified]:
with Session(source_engine) as source_session:
with Session(target_engine) as target_session:
# Get IDs from modified charts
Expand All @@ -72,6 +74,19 @@ def get_chart_diffs(source_engine, target_engine):
)
for chart_id in chart_ids
}

# TODO: parallelize it, doesn't work with current version of SQLALchemy
# from concurrent.futures import ThreadPoolExecutor, as_completed
# with ThreadPoolExecutor(max_workers=5) as executor:
# chart_diffs_futures = {
# chart_id: executor.submit(ChartDiffModified.from_chart_id, chart_id, source_session, target_session)
# for chart_id in chart_ids
# }
# chart_diffs = {}
# for chart_id, future in chart_diffs_futures.items():

# chart_diffs[chart_id] = future.result()

return chart_diffs


Expand Down Expand Up @@ -118,13 +133,16 @@ def refresh_on_click(source_session=source_session, target_session=None):

# Actually show stuff
with st.expander(label, not diff.approved):
col1, col2 = st.columns([1, 3])

# Refresh
st.button(
"🔄 Refresh",
key=f"refresh-btn-{diff.chart_id}",
on_click=lambda s=source_session, t=target_session: refresh_on_click(s, t),
help="Get the latest version of the chart from the staging server.",
)
with col2:
st.button(
"🔄 Refresh",
key=f"refresh-btn-{diff.chart_id}",
on_click=lambda s=source_session, t=target_session: refresh_on_click(s, t),
help="Get the latest version of the chart from the staging server.",
)

options = ["Pending", "Approve"]
options = {
Expand All @@ -133,16 +151,17 @@ def refresh_on_click(source_session=source_session, target_session=None):
# "Reject": "red",
}
option_names = list(options.keys())
st.radio(
label="Approve or reject chart",
key=f"radio-{diff.chart_id}",
options=option_names,
horizontal=True,
format_func=lambda x: f":{options.get(x)}-background[{x}]",
index=option_names.index("Approve") if diff.approved else option_names.index("Pending"),
on_change=lambda diff=diff, session=source_session: tgl_on_change(diff, session),
# label_visibility="collapsed",
)
with col1:
st.radio(
label="Approve or reject chart",
key=f"radio-{diff.chart_id}",
options=option_names,
horizontal=True,
format_func=lambda x: f":{options.get(x)}-background[{x}]",
index=option_names.index("Approve") if diff.approved else option_names.index("Pending"),
on_change=lambda diff=diff, session=source_session: tgl_on_change(diff, session),
# label_visibility="collapsed",
)

if diff.is_modified:
tab1, tab2 = st.tabs(["Charts", "Config diff"])
Expand Down Expand Up @@ -304,16 +323,29 @@ def main():
if chart_diffs_modified:
st.header("Modified charts")
st.markdown(f"{len(chart_diffs_modified)} charts modified in [{OWID_ENV.name}]({OWID_ENV.site})")
for chart_diff in chart_diffs_modified:

modified_charts_pagination = Pagination(
chart_diffs_modified, items_per_page=CHART_PER_PAGE, pagination_key="pagination_modified"
)
modified_charts_pagination.show_controls()

for chart_diff in modified_charts_pagination.get_page_items():
st_show(chart_diff, source_session, target_session)
else:
st.warning(
"No chart modifications found in the staging environment. Try unchecking the 'Hide approved charts' toggle in case there are hidden ones."
)

if chart_diffs_new:
st.header("New charts")
st.markdown(f"{len(chart_diffs_new)} new charts in [{OWID_ENV.name}]({OWID_ENV.site})")
for chart_diff in chart_diffs_new:

new_charts_pagination = Pagination(
chart_diffs_new, items_per_page=CHART_PER_PAGE, pagination_key="pagination_new"
)
new_charts_pagination.show_controls()

for chart_diff in new_charts_pagination.get_page_items():
st_show(chart_diff, source_session, target_session)
else:
st.warning(
Expand Down
Loading

0 comments on commit e750a5a

Please sign in to comment.