
Commit

Merge branch 'master' into covid-mdims-b5
lucasrodes committed Dec 11, 2024
2 parents cca85b2 + b8d4a0c commit afd3eb3
Showing 1,731 changed files with 56,091 additions and 79,887 deletions.
6 changes: 0 additions & 6 deletions .flake8

This file was deleted.

6 changes: 0 additions & 6 deletions .vscode/settings.json
@@ -37,17 +37,11 @@
"**/docs/architecture/*.md"
],
"files.exclude": {
"etl/steps/archive": true,
"snapshots/archive": true,
"**/dataset_*_config.json": true,
"**/dataset_*_values.json": true,
"**/dataset_*.json.dvc": true,
"**/dataset_*.feather.dvc": true
},
"search.exclude": {
"etl/steps/archive": true,
"snapshots/archive": true
},
"yaml.format.printWidth": 999,
"ruff.path": [
".venv/bin/ruff"
69 changes: 62 additions & 7 deletions apps/anomalist/anomalist_api.py
@@ -1,3 +1,4 @@
import random
import tempfile
import time
from pathlib import Path
@@ -90,7 +91,13 @@ def get_variables_views_in_charts(
if len(df) == 0:
df = pd.DataFrame(columns=["variable_id", "chart_id", "chart_slug", "views_7d", "views_14d", "views_365d"])

return df
return df.astype(
{
"views_7d": "Int64",
"views_14d": "Int64",
"views_365d": "Int64",
}
).fillna(0)
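
A quick standalone sketch (toy frame, not from the codebase) of why the view columns are cast to the nullable Int64 dtype before filling:

import pandas as pd

df = pd.DataFrame({"variable_id": [1, 2], "views_7d": [10, None]})

# A plain int cast would fail on the missing value, and float would turn counts into 10.0.
# Nullable Int64 keeps the column integer-typed and lets fillna(0) produce integer zeros.
df = df.astype({"views_7d": "Int64"}).fillna(0)
print(df.dtypes)  # views_7d -> Int64
print(df)         # missing view count becomes 0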


def renormalize_score(
@@ -114,14 +121,14 @@ def renormalize_score(

# Function to format population numbers.
def pretty_print_number(number):
if number >= 1e9:
if pd.isna(number):
return "?"
elif int(number) >= 1e9:
return f"{number/1e9:.1f}B"
elif number >= 1e6:
return f"{number/1e6:.1f}M"
elif number >= 1e3:
return f"{number/1e3:.1f}k"
elif pd.isna(number):
return "?"
else:
return f"{int(number)}"

@@ -394,6 +401,9 @@ def add_analytics_score(df_reduced: pd.DataFrame) -> pd.DataFrame:
# Fill them with a low value (e.g. 0.1) to avoid zeros when calculating the final score.
df_reduced["score_analytics"] = df_reduced["score_analytics"].fillna(fillna_value)

# Fill missing views
df_reduced["views"] = df_reduced["views"].fillna(0)

return df_reduced


@@ -436,6 +446,7 @@ def anomaly_detection(
dry_run: bool = False,
force: bool = False,
reset_db: bool = False,
sample_n: Optional[int] = None,
) -> None:
"""Detect anomalies."""
engine = get_engine()
@@ -475,21 +486,34 @@ def anomaly_detection(
dataset_variable_ids[variable.datasetId].append(variable)

for dataset_id, variables_in_dataset in dataset_variable_ids.items():
# Limit the number of variables.
if sample_n and len(variables_in_dataset) > sample_n:
variables_in_dataset = _sample_variables(variables_in_dataset, sample_n)

# Get dataset's checksum
with Session(engine) as session:
dataset = gm.Dataset.load_dataset(session, dataset_id)

log.info("loading_data.start")
log.info("loading_data.start", variables=len(variables_in_dataset))
variables_old = [
variables[variable_id_old]
for variable_id_old in variable_mapping.keys()
if variable_mapping[variable_id_old] in [variable.id for variable in variables_in_dataset]
]
variables_old_and_new = variables_in_dataset + variables_old
t = time.time()
df = load_data_for_variables(engine=engine, variables=variables_old_and_new)
try:
df = load_data_for_variables(engine=engine, variables=variables_old_and_new)
except FileNotFoundError as e:
# This happens when a dataset is in DB, but not in a local catalog.
log.error("loading_data.error", error=str(e))
continue

log.info("loading_data.end", t=time.time() - t)

if df.empty:
continue

for anomaly_type in anomaly_types:
# Instantiate the anomaly detector.
if anomaly_type not in ANOMALY_DETECTORS:
@@ -628,6 +652,10 @@ def load_data_for_variables(engine: Engine, variables: list[gm.Variable]) -> pd.
df = pd.DataFrame(variable_data_table_from_catalog(engine, variables=variables))
df = df.rename(columns={"country": "entity_name"})

if "year" not in df.columns and "date" in df.columns:
log.warning("Anomalist does not work for datasets with `date` column yet.")
return pd.DataFrame()

# Define the list of columns that are not index columns.
data_columns = [v.id for v in variables]

@@ -642,7 +670,7 @@ def load_data_for_variables(engine: Engine, variables: list[gm.Variable]) -> pd.

# Sort data (which may be needed for some detectors).
# NOTE: Here, we first convert the entity_name to string, because otherwise the sorting will be based on categorical order (which can be arbitrary).
df = df.astype({"entity_name": str}).sort_values(INDEX_COLUMNS).reset_index(drop=True)
df = df.astype({"entity_name": "string[pyarrow]"}).sort_values(INDEX_COLUMNS).reset_index(drop=True)

return df
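
A small standalone illustration of the categorical-sorting pitfall mentioned in the NOTE above (toy data; assumes pyarrow is installed, as the new string[pyarrow] dtype implies):

import pandas as pd

s = pd.Series(pd.Categorical(["France", "Albania"], categories=["France", "Albania"]))

# Sorting a categorical column follows the category order, not alphabetical order.
print(s.sort_values().tolist())                             # ['France', 'Albania']

# Casting to a plain string dtype restores lexicographic sorting.
print(s.astype("string[pyarrow]").sort_values().tolist())   # ['Albania', 'France']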

@@ -681,3 +709,30 @@ def combine_and_reduce_scores_df(anomalies: List[gm.Anomaly]) -> pd.DataFrame:
# df = df.astype({"year": int})

return df_reduced


def _sample_variables(variables: List[gm.Variable], n: int) -> List[gm.Variable]:
"""Sample n variables. Prioritize variables that are used in charts, then fill the rest
with random variables."""
if len(variables) <= n:
return variables

# Include all variables that are used in charts.
# NOTE: if we run this before the indicator upgrader, none of the new variables will be used in charts yet,
# so the first round of anomalies with random sampling won't be very useful. Subsequent runs should be.
df_views = get_variables_views_in_charts(variable_ids=[v.id for v in variables])
sample_ids = set(df_views.sort_values("views_365d", ascending=False).head(n)["variable_id"])

# Fill the rest with random variables.
unused_ids = list(set(v.id for v in variables) - sample_ids)
random.seed(1)
if len(sample_ids) < n:
sample_ids |= set(np.random.choice(unused_ids, n - len(sample_ids), replace=False))

log.info(
"sampling_variables",
original_n=len(variables),
new_n=len(sample_ids),
)
return [v for v in variables if v.id in sample_ids]
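
A rough standalone sketch of the sampling strategy above (chart-backed variables first, random fill afterwards), using plain ints in place of gm.Variable objects, a made-up views mapping, and a seeded numpy Generator:

import numpy as np
import pandas as pd

def sample_prioritizing_charts(variable_ids, views_365d, n, seed=1):
    # Keep the n most viewed chart-backed variables first.
    df_views = pd.DataFrame({"variable_id": list(views_365d), "views_365d": list(views_365d.values())})
    sample_ids = set(df_views.sort_values("views_365d", ascending=False).head(n)["variable_id"])

    # Fill the remaining slots with a random draw from the unused variables.
    unused_ids = sorted(set(variable_ids) - sample_ids)
    rng = np.random.default_rng(seed)
    if len(sample_ids) < n:
        sample_ids |= set(rng.choice(unused_ids, n - len(sample_ids), replace=False))

    return [v for v in variable_ids if v in sample_ids]

# Variables 2 and 4 are used in charts, so they are kept; one more id is drawn at random.
print(sample_prioritizing_charts([1, 2, 3, 4, 5], {2: 100, 4: 10}, n=3))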
8 changes: 8 additions & 0 deletions apps/anomalist/cli.py
@@ -62,6 +62,12 @@
type=bool,
help="Drop anomalies table and recreate it. This is useful for development when the schema changes.",
)
@click.option(
"--sample-n",
type=int,
default=500,
help="Sample at most N variables from a dataset",
)
def cli(
anomaly_types: Optional[Tuple[str, ...]],
dataset_ids: Optional[list[int]],
@@ -70,6 +76,7 @@ def cli(
dry_run: bool,
force: bool,
reset_db: bool,
sample_n: Optional[int],
) -> None:
"""TBD
@@ -140,6 +147,7 @@ def cli(
dry_run=dry_run,
force=force,
reset_db=reset_db,
sample_n=sample_n,
)


12 changes: 2 additions & 10 deletions apps/anomalist/detectors.py
@@ -1,6 +1,5 @@
from typing import Dict, List

import numpy as np
import pandas as pd
import structlog
from sklearn.ensemble import IsolationForest
@@ -71,17 +70,10 @@ def get_scale_df(self, df: pd.DataFrame, variable_ids: List[int], variable_mappi

def get_zeros_df(self, df: pd.DataFrame, variable_ids: List[int]) -> pd.DataFrame:
# Create a dataframe of zeros.
df_zeros = pd.DataFrame(np.zeros_like(df), columns=df.columns)[INDEX_COLUMNS + variable_ids]
df_zeros[INDEX_COLUMNS] = df[INDEX_COLUMNS].copy()
df_zeros = df[INDEX_COLUMNS + variable_ids].copy()
df_zeros[variable_ids] = 0
return df_zeros

def get_nans_df(self, df: pd.DataFrame, variable_ids: List[int]) -> pd.DataFrame:
# Create a dataframe of nans.
df_nans = pd.DataFrame(np.empty_like(df), columns=df.columns)[INDEX_COLUMNS + variable_ids]
df_nans[variable_ids] = np.nan
df_nans[INDEX_COLUMNS] = df[INDEX_COLUMNS].copy()
return df_nans


class AnomalyUpgradeMissing(AnomalyDetector):
"""New data misses entity-years that used to exist in old version."""
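For reference, a tiny standalone sketch of what the simplified get_zeros_df above does (toy frame; INDEX_COLUMNS assumed to be ["entity_name", "year"], matching the melt id_vars used later in this diff):

import pandas as pd

INDEX_COLUMNS = ["entity_name", "year"]  # assumed
variable_ids = [101, 102]

df = pd.DataFrame({
    "entity_name": ["A", "B"],
    "year": [2000, 2000],
    101: [1.5, 2.5],
    102: [3.0, 4.0],
})

# Copy the index columns as-is and overwrite only the variable columns with zeros,
# keeping the entity/year dtypes intact without building a separate zeros array.
df_zeros = df[INDEX_COLUMNS + variable_ids].copy()
df_zeros[variable_ids] = 0
print(df_zeros)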
32 changes: 21 additions & 11 deletions apps/anomalist/gp_detector.py
@@ -100,6 +100,7 @@ def get_score_df(self, df: pd.DataFrame, variable_ids: List[int], variable_mappi
return pd.DataFrame()

# Create a processing queue with (entity_name, variable_id) pairs
# TODO: we could make probabilities proportional to "relevance" score in anomalist
items = _processing_queue(
items=list(df_wide.index.unique()),
)
@@ -118,8 +119,8 @@ def get_score_df(self, df: pd.DataFrame, variable_ids: List[int], variable_mappi
# Get the data for the current entity and variable
group = df_wide.loc[(entity_name, variable_id)]

# Skip if the series has only one or fewer data points
if isinstance(group, pd.Series) or len(group) <= 1:
# Skip if the series has only three or fewer data points
if isinstance(group, pd.Series) or len(group) <= 3:
continue

# Prepare the input features (X) and target values (y) for Gaussian Process
@@ -161,9 +162,13 @@ def get_score_df(self, df: pd.DataFrame, variable_ids: List[int], variable_mappi
df_score_long["p_value"] = 2 * (1 - norm.cdf(np.abs(df_score_long["z"])))

# Adjust p-values for multiple testing
df_score_long["adj_p_value"] = df_score_long.groupby(["entity_name", "variable_id"]).p_value.transform(
lambda p: multipletests(p, method="fdr_bh")[1]
)
df_score_long["adj_p_value"] = df_score_long.groupby(
["entity_name", "variable_id"], observed=True
).p_value.transform(lambda p: multipletests(p, method="fdr_bh")[1])

# Points with an adjusted p-value >= 0.1 are not interesting anomalies; drop them. This threshold
# could be made even stricter.
df_score_long = df_score_long[df_score_long["adj_p_value"] < 0.1]

# Final score is 1 - p-value
df_score_long["anomaly_score"] = 1 - df_score_long["adj_p_value"]
@@ -285,20 +290,25 @@ def viz(self, df: pd.DataFrame, variable: gm.Variable, country: Optional[str] =
def get_scale_df(self, df: pd.DataFrame, variable_ids: List[int], variable_mapping: Dict[int, int]) -> pd.DataFrame:
# NOTE: Ideally, for this detector, the scale should be the difference between a value and the mean, divided by the range of values of the variable. But calculating that may be hard to implement in an efficient way.

log.info("gp_outlier.get_scale_df.start")
t = time.time()

# Create a dataframe of zeros.
df_scale = self.get_zeros_df(df, variable_ids)

for variable_id in variable_ids:
# The scale is given by the size of changes in consecutive points (for a given country), as a fraction of the maximum range of values of that variable.
df_scale[variable_id] = abs(df[variable_id].diff().fillna(0)) / (
df[variable_id].max() - df[variable_id].min()
)
# The scale is given by the size of changes in consecutive points (for a given country), as a fraction of the maximum range of values of that variable.
ranges = df[variable_ids].max() - df[variable_ids].min()
diff = df[variable_ids].diff().fillna(0).abs()

# The previous procedure includes the calculation of the deviation between the last point of an entity and the first point of the next, which is meaningless.
# Therefore, make zero the first point of each entity_name for all columns.
df_scale.loc[df_scale["entity_name"] != df_scale["entity_name"].shift(), variable_ids] = 0
diff.loc[df["entity_name"] != df["entity_name"].shift(), :] = 0

df_scale[variable_ids] = diff / ranges

# Since this anomaly detector return a long dataframe, we need to melt it.
df_scale = df_scale.melt(id_vars=["entity_name", "year"], var_name="variable_id", value_name="score_scale")

log.info("gp_outlier.get_scale_df.end", t=time.time() - t)

return df_scale
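
As a rough illustration of the vectorized scale computation above (a standalone sketch on a made-up two-country frame; column names follow the diff):

import pandas as pd

df = pd.DataFrame({
    "entity_name": ["A", "A", "A", "B", "B"],
    "year": [2000, 2001, 2002, 2000, 2001],
    101: [1.0, 2.0, 10.0, 5.0, 5.5],   # toy variable id -> values
})
variable_ids = [101]

# Per-variable range and absolute consecutive differences, computed for all columns at once.
ranges = df[variable_ids].max() - df[variable_ids].min()
diff = df[variable_ids].diff().fillna(0).abs()

# Zero out the spurious difference between the last row of one entity and the first of the next.
diff.loc[df["entity_name"] != df["entity_name"].shift(), :] = 0

scale = diff / ranges
print(scale)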
@@ -11,7 +11,7 @@ def run(dest_dir: str) -> None:
# Load data from snapshot.
#
snap = paths.load_snapshot()
tb = snap.read().set_index(["country", "year"])
tb = snap.read(safe_types=False).set_index(["country", "year"])

#
# Save outputs.
