Add NSSP secondary source (#2074)
* base changes

* lint

* lint

* add test

* fix hhs bug + doc to readme + fix signal grouping

* fix weird nan

* logging + error details

* test data sync

* typo in pull.py

* region to lower

* add log

* fix str bug

* add backup data mechanism

* adjust details.md

* appease linter

* add tests for secondary source backup

* Update signal names to _2023RVR in constants.py

* clarify _2023RVR signals in DETAILS.md
minhkhul authored Nov 17, 2024
1 parent e7eccaa commit feea58d
Showing 7 changed files with 318 additions and 28 deletions.
25 changes: 21 additions & 4 deletions nssp/DETAILS.md
@@ -2,12 +2,29 @@

We import the NSSP Emergency Department Visit data, including percentage and smoothed percentage of ER visits attributable to a given pathogen, from the CDC website. The data is provided at the county level, state level and national level; we do a population-weighted mean to aggregate from county data up to the HRR and MSA levels.

We pull nssp data from two sources:
- Primary source: https://data.cdc.gov/Public-Health-Surveillance/NSSP-Emergency-Department-Visit-Trajectories-by-St/rdmq-nq56/data_preview
- Secondary (2023RVR) source: https://data.cdc.gov/Public-Health-Surveillance/2023-Respiratory-Virus-Response-NSSP-Emergency-Dep/7mra-9cq9/data_preview

The primary source yields 8 signals and the secondary source yields 4. There are no smoothed signals from the secondary source.

Note that the data produced from the secondary source are mostly the same as their primary-source equivalents: past analysis showed around 95% of datapoints differing by less than 0.1, with the remaining 5% differing by 0.1 to 1.2.
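A comparison like the one described above could be computed along these lines. This is an illustrative sketch only: the sample values and the 0.1 threshold logic are assumptions, and the real analysis would align the two sources on date, geography, and pathogen first.

```python
import pandas as pd

# Hypothetical sketch: quantify primary-vs-secondary agreement, assuming two
# frames already aligned row-by-row on (timestamp, geo_value, pathogen).
primary = pd.DataFrame({"val": [1.8, 0.5, 0.5, 2.8]})
secondary = pd.DataFrame({"val": [1.85, 0.5, 0.5, 4.0]})

diff = (primary["val"] - secondary["val"]).abs()
close = (diff < 0.1).mean()  # share of datapoints agreeing within 0.1
print(f"{close:.0%} of datapoints differ by less than 0.1")
```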

## Geographical Levels
* `state`: reported using two-letter postal code
* `county`: reported using fips code
* `national`: just `us` for now
Primary source:
* `state`: reported from source using two-letter postal code
* `county`: reported from source using fips code
* `national`: just `us` for now, reported from source
* `hhs`, `hrr`, `msa`: not reported from source, so we computed them from county-level data using a weighted mean. Each county is assigned a weight equal to its population in the last census (2020).
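The county-to-`hrr`/`msa` aggregation described above is a population-weighted mean. The following is a minimal standalone sketch of that computation with made-up values; it is not the pipeline's actual code, which goes through delphi_utils' GeoMapper.

```python
import pandas as pd

# Illustrative population-weighted aggregation of county values up to a
# coarser geography (a made-up "msa" column); weights stand in for 2020
# census county populations.
counties = pd.DataFrame({
    "msa": ["A", "A", "B"],
    "val": [2.0, 4.0, 3.0],          # percent of ED visits
    "population": [100, 300, 500],   # census population weights
})

counties["weighted_val"] = counties["val"] * counties["population"]
sums = counties.groupby("msa")[["weighted_val", "population"]].sum()
weighted = sums["weighted_val"] / sums["population"]
print(weighted.to_dict())  # {'A': 3.5, 'B': 3.0}
```

MSA "A" comes out to (2.0*100 + 4.0*300) / 400 = 3.5, i.e. the larger county dominates, which is the point of weighting by population.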

Secondary (2023RVR) source:
* `state`: reported from source
* `hhs`: reported from source
* `national`: reported from source

## Metrics
* `percent_visits_covid`, `percent_visits_rsv`, `percent_visits_influenza`: percentage of emergency department patient visits for the specified pathogen.
* `percent_visits_combined`: sum of the three percentages of visits for flu, rsv and covid.
* `smoothed_percent_visits_covid`, `smoothed_percent_visits_rsv`, `smoothed_percent_visits_influenza`: 3 week moving average of the percentage of emergency department patient visits for the specified pathogen.
* `smoothed_percent_visits_combined`: 3 week moving average of the sum of the three percentages of visits for flu, rsv and covid.
* `percent_visits_covid_2023RVR`, `percent_visits_rsv_2023RVR`, `percent_visits_influenza_2023RVR`: taken from the secondary source, percentage of emergency department patient visits for the specified pathogen.
* `percent_visits_combined_2023RVR`: taken from the secondary source, sum of the three percentages of visits for flu, rsv and covid.
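The `smoothed_` variants above are 3 week moving averages of the corresponding raw signals. A minimal sketch of that smoothing for one geography (column names and sample values are assumptions):

```python
import pandas as pd

# 3-week moving average over weekly values; the first two weeks have no
# full window, so they come out as NaN.
weekly = pd.Series(
    [1.0, 2.0, 3.0, 4.0],
    index=pd.date_range("2022-10-01", periods=4, freq="7D"),
    name="percent_visits_covid",
)
smoothed = weekly.rolling(window=3).mean()
print(smoothed.tolist())  # [nan, nan, 2.0, 3.0]
```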
5 changes: 5 additions & 0 deletions nssp/README.md
@@ -1,6 +1,11 @@
# NSSP Emergency Department Visit data

We import the NSSP Emergency Department Visit data, including the percentage of emergency department visits attributable to a given pathogen, from the CDC website, aggregate it to several geographic levels, and export the aggregated data.

We pull nssp data from two sources:
- Primary source: https://data.cdc.gov/Public-Health-Surveillance/NSSP-Emergency-Department-Visit-Trajectories-by-St/rdmq-nq56/data_preview
- Secondary source: https://data.cdc.gov/Public-Health-Surveillance/2023-Respiratory-Virus-Response-NSSP-Emergency-Dep/7mra-9cq9/data_preview

For details see the `DETAILS.md` file in this directory.

## Create a MyAppToken
26 changes: 26 additions & 0 deletions nssp/delphi_nssp/constants.py
@@ -41,3 +41,29 @@
"fips": str,
}
)

SECONDARY_COLS_MAP = {
"week_end": "timestamp",
"geography": "geo_value",
"percent_visits": "val",
"pathogen": "signal",
}

SECONDARY_SIGNALS_MAP = {
"COVID-19": "pct_ed_visits_covid_2023RVR",
"Influenza": "pct_ed_visits_influenza_2023RVR",
"RSV": "pct_ed_visits_rsv_2023RVR",
"Combined": "pct_ed_visits_combined_2023RVR",
}

SECONDARY_SIGNALS = list(SECONDARY_SIGNALS_MAP.values())
SECONDARY_GEOS = ["state", "nation", "hhs"]

SECONDARY_TYPE_DICT = {
"timestamp": "datetime64[ns]",
"geo_value": str,
"val": float,
"geo_type": str,
"signal": str,
}
SECONDARY_KEEP_COLS = list(SECONDARY_TYPE_DICT.keys())
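The constants above drive a rename-then-map transformation of the raw Socrata records. A minimal walkthrough with one record (the record values are taken from the test fixture in this PR; the trimmed-down maps are redefined locally so the snippet stands alone):

```python
import pandas as pd

# Local copies of (a subset of) the constants above, so this sketch runs
# without the delphi_nssp package.
SECONDARY_COLS_MAP = {"week_end": "timestamp", "geography": "geo_value",
                      "percent_visits": "val", "pathogen": "signal"}
SECONDARY_SIGNALS_MAP = {"COVID-19": "pct_ed_visits_covid_2023RVR"}

raw = pd.DataFrame([{"week_end": "2022-10-01T00:00:00.000",
                     "geography": "National", "percent_visits": "1.8",
                     "pathogen": "COVID-19"}])
df = raw.rename(columns=SECONDARY_COLS_MAP)          # raw names -> pipeline names
df["signal"] = df["signal"].map(SECONDARY_SIGNALS_MAP)  # pathogen -> signal name
print(df.loc[0, "signal"])  # pct_ed_visits_covid_2023RVR
```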
105 changes: 86 additions & 19 deletions nssp/delphi_nssp/pull.py
@@ -8,7 +8,16 @@
from delphi_utils import create_backup_csv
from sodapy import Socrata

from .constants import NEWLINE, SIGNALS, SIGNALS_MAP, TYPE_DICT
from .constants import (
NEWLINE,
SECONDARY_COLS_MAP,
SECONDARY_KEEP_COLS,
SECONDARY_SIGNALS_MAP,
SECONDARY_TYPE_DICT,
SIGNALS,
SIGNALS_MAP,
TYPE_DICT,
)


def warn_string(df, type_dict):
@@ -29,42 +38,50 @@ def warn_string(df, type_dict):
return warn


def pull_nssp_data(socrata_token: str, backup_dir: str, custom_run: bool, logger: Optional[logging.Logger] = None):
"""Pull the latest NSSP ER visits data, and conforms it into a dataset.
The output dataset has:
- Each row corresponds to a single observation
- Each row additionally has columns for the signals in SIGNALS
def pull_with_socrata_api(socrata_token: str, dataset_id: str):
"""Pull data from Socrata API.
Parameters
----------
socrata_token: str
My App Token for pulling the NSSP data (could be the same as the nchs data)
backup_dir: str
Directory to which to save raw backup data
custom_run: bool
Flag indicating if the current run is a patch. If so, don't save any data to disk
logger: Optional[logging.Logger]
logger object
dataset_id: str
The dataset id to pull data from
Returns
-------
pd.DataFrame
Dataframe as described above.
list of dictionaries, each representing a row in the dataset
"""
# Pull data from Socrata API
client = Socrata("data.cdc.gov", socrata_token)
results = []
offset = 0
limit = 50000 # maximum limit allowed by SODA 2.0
while True:
page = client.get("rdmq-nq56", limit=limit, offset=offset)
page = client.get(dataset_id, limit=limit, offset=offset)
if not page:
break # exit the loop if no more results
results.extend(page)
offset += limit
df_ervisits = pd.DataFrame.from_records(results)
return results
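The offset/limit pagination pattern in `pull_with_socrata_api` can be exercised without network access by swapping in a fake client. This is a hedged sketch: `FakeClient` is an invention for illustration, not a sodapy API, and `pull_all` is a local stand-in for the function above.

```python
# Fake in-memory stand-in for sodapy's Socrata client (illustrative only).
class FakeClient:
    def __init__(self, rows):
        self.rows = rows

    def get(self, dataset_id, limit, offset):
        # Return one page of rows; empty list once we run off the end.
        return self.rows[offset:offset + limit]

def pull_all(client, dataset_id, limit=2):
    """Same loop shape as pull_with_socrata_api: page until an empty page."""
    results, offset = [], 0
    while True:
        page = client.get(dataset_id, limit=limit, offset=offset)
        if not page:
            break
        results.extend(page)
        offset += limit
    return results

rows = [{"i": n} for n in range(5)]
assert pull_all(FakeClient(rows), "rdmq-nq56") == rows  # 3 pages: 2 + 2 + 1
```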


def pull_nssp_data(socrata_token: str, backup_dir: str, custom_run: bool, logger: Optional[logging.Logger] = None):
"""Pull the latest NSSP ER visits primary dataset.
https://data.cdc.gov/Public-Health-Surveillance/NSSP-Emergency-Department-Visit-Trajectories-by-St/rdmq-nq56/data_preview
Parameters
----------
socrata_token: str
My App Token for pulling the NSSP data (could be the same as the nchs data)
Returns
-------
pd.DataFrame
Dataframe as described above.
"""
socrata_results = pull_with_socrata_api(socrata_token, "rdmq-nq56")
df_ervisits = pd.DataFrame.from_records(socrata_results)
create_backup_csv(df_ervisits, backup_dir, custom_run, logger=logger)
df_ervisits = df_ervisits.rename(columns={"week_end": "timestamp"})
df_ervisits = df_ervisits.rename(columns=SIGNALS_MAP)
@@ -79,3 +96,53 @@ def pull_nssp_data(socrata_token: str, backup_dir: str, custom_run: bool, logger

keep_columns = ["timestamp", "geography", "county", "fips"]
return df_ervisits[SIGNALS + keep_columns]


def secondary_pull_nssp_data(
socrata_token: str, backup_dir: str, custom_run: bool, logger: Optional[logging.Logger] = None
):
"""Pull the latest NSSP ER visits secondary dataset.
https://data.cdc.gov/Public-Health-Surveillance/2023-Respiratory-Virus-Response-NSSP-Emergency-Dep/7mra-9cq9/data_preview
The output dataset has:
- Each row corresponds to a single observation
Parameters
----------
socrata_token: str
My App Token for pulling the NSSP data (could be the same as the nchs data)
Returns
-------
pd.DataFrame
Dataframe as described above.
"""
socrata_results = pull_with_socrata_api(socrata_token, "7mra-9cq9")
df_ervisits = pd.DataFrame.from_records(socrata_results)
create_backup_csv(df_ervisits, backup_dir, custom_run, sensor="secondary", logger=logger)
df_ervisits = df_ervisits.rename(columns=SECONDARY_COLS_MAP)

# geo_type is not provided in the dataset, so we infer it from the geo_value
# which is either state names, "National" or hhs region numbers
df_ervisits["geo_type"] = "state"

df_ervisits.loc[df_ervisits["geo_value"] == "National", "geo_type"] = "nation"

hhs_region_mask = df_ervisits["geo_value"].str.lower().str.startswith("region ")
df_ervisits.loc[hhs_region_mask, "geo_value"] = df_ervisits.loc[hhs_region_mask, "geo_value"].str.replace(
"Region ", ""
)
df_ervisits.loc[hhs_region_mask, "geo_type"] = "hhs"

df_ervisits["signal"] = df_ervisits["signal"].map(SECONDARY_SIGNALS_MAP)

df_ervisits = df_ervisits[SECONDARY_KEEP_COLS]

try:
df_ervisits = df_ervisits.astype(SECONDARY_TYPE_DICT)
except KeyError as exc:
raise ValueError(warn_string(df_ervisits, SECONDARY_TYPE_DICT)) from exc

return df_ervisits
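The geo_type inference in `secondary_pull_nssp_data` can be reproduced standalone on a few sample `geo_value` strings (state names, "National", and "Region N"):

```python
import pandas as pd

# Same inference as above: default everything to "state", then reclassify
# "National" rows and "Region N" rows.
df = pd.DataFrame({"geo_value": ["Utah", "National", "Region 4"]})
df["geo_type"] = "state"
df.loc[df["geo_value"] == "National", "geo_type"] = "nation"

hhs_mask = df["geo_value"].str.lower().str.startswith("region ")
df.loc[hhs_mask, "geo_value"] = df.loc[hhs_mask, "geo_value"].str.replace(
    "Region ", "", regex=False
)
df.loc[hhs_mask, "geo_type"] = "hhs"

print(df.to_dict("records"))
# [{'geo_value': 'Utah', 'geo_type': 'state'},
#  {'geo_value': 'National', 'geo_type': 'nation'},
#  {'geo_value': '4', 'geo_type': 'hhs'}]
```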
53 changes: 51 additions & 2 deletions nssp/delphi_nssp/run.py
@@ -31,8 +31,8 @@
from delphi_utils.geomap import GeoMapper
from delphi_utils.nancodes import add_default_nancodes

from .constants import AUXILIARY_COLS, CSV_COLS, GEOS, SIGNALS
from .pull import pull_nssp_data
from .constants import AUXILIARY_COLS, CSV_COLS, GEOS, SECONDARY_GEOS, SECONDARY_SIGNALS, SIGNALS
from .pull import pull_nssp_data, secondary_pull_nssp_data


def add_needed_columns(df, col_names=None):
@@ -83,6 +83,8 @@ def run_module(params):
socrata_token = params["indicator"]["socrata_token"]

run_stats = []

logger.info("Generating primary signals")
## build the base version of the signal at the most detailed geo level you can get.
## compute stuff here or farm out to another function or file
df_pull = pull_nssp_data(socrata_token, backup_dir, custom_run=custom_run, logger=logger)
@@ -139,5 +141,52 @@ def run_module(params):
if len(dates) > 0:
run_stats.append((max(dates), len(dates)))

logger.info("Generating secondary signals")
secondary_df_pull = secondary_pull_nssp_data(socrata_token, backup_dir, custom_run, logger)
for signal in SECONDARY_SIGNALS:
secondary_df_pull_signal = secondary_df_pull[secondary_df_pull["signal"] == signal]
if secondary_df_pull_signal.empty:
logger.warning("No data found for signal", signal=signal)
continue
for geo in SECONDARY_GEOS:
df = secondary_df_pull_signal.copy()
logger.info("Generating signal and exporting to CSV", geo_type=geo, signal=signal)
if geo == "state":
df = df[(df["geo_type"] == "state")]
df["geo_id"] = df["geo_value"].apply(
lambda x: (
us.states.lookup(x).abbr.lower()
if us.states.lookup(x)
else ("dc" if x == "District of Columbia" else x)
)
)
unexpected_state_names = df[df["geo_id"] == df["geo_value"]]
if unexpected_state_names.shape[0] > 0:
logger.error(
"Unexpected state names",
unexpected_state_names=unexpected_state_names["geo_value"].unique(),
)
raise RuntimeError("Unexpected state names found in secondary NSSP data")
elif geo == "nation":
df = df[(df["geo_type"] == "nation")]
df["geo_id"] = "us"
elif geo == "hhs":
df = df[(df["geo_type"] == "hhs")]
df["geo_id"] = df["geo_value"]
# add se, sample_size, and na codes
missing_cols = set(CSV_COLS) - set(df.columns)
df = add_needed_columns(df, col_names=list(missing_cols))
df_csv = df[CSV_COLS + ["timestamp"]]
# actual export
dates = create_export_csv(
df_csv,
geo_res=geo,
export_dir=export_dir,
sensor=signal,
weekly_dates=True,
)
if len(dates) > 0:
run_stats.append((max(dates), len(dates)))

## log this indicator run
logging(start_time, run_stats, logger)
73 changes: 73 additions & 0 deletions nssp/tests/test_data/secondary_page.txt
@@ -0,0 +1,73 @@
[
{
"week_end": "2022-10-01T00:00:00.000",
"pathogen": "COVID-19", "geography": "National",
"percent_visits": "1.8",
"status": "Reporting",
"trend_on_date": "Decreasing",
"recent_trend": "Decreasing"
},
{
"week_end": "2022-10-01T00:00:00.000",
"pathogen": "Influenza",
"geography": "National",
"percent_visits": "0.5",
"status": "Reporting",
"trend_on_date": "Increasing",
"recent_trend": "Increasing"
},
{
"week_end": "2022-10-01T00:00:00.000",
"pathogen": "RSV",
"geography": "National",
"percent_visits": "0.5",
"status": "Reporting",
"trend_on_date": "Increasing",
"recent_trend": "Increasing"
},
{
"week_end": "2022-10-01T00:00:00.000",
"pathogen": "Combined",
"geography": "National",
"percent_visits": "2.8",
"status": "Reporting",
"trend_on_date": "Decreasing",
"recent_trend": "Decreasing"
},
{
"week_end": "2022-10-15T00:00:00.000",
"pathogen": "COVID-19",
"geography": "National",
"percent_visits": "1.6",
"status": "Reporting",
"trend_on_date": "Decreasing",
"recent_trend": "Decreasing"
},
{
"week_end": "2022-10-15T00:00:00.000",
"pathogen": "Influenza",
"geography": "National",
"percent_visits": "0.9",
"status": "Reporting",
"trend_on_date": "Increasing",
"recent_trend": "Increasing"
},
{
"week_end": "2022-10-15T00:00:00.000",
"pathogen": "RSV",
"geography": "National",
"percent_visits": "0.7",
"status": "Reporting",
"trend_on_date": "Increasing",
"recent_trend": "Increasing"
},
{
"week_end": "2022-10-15T00:00:00.000",
"pathogen": "Combined",
"geography": "National",
"percent_visits": "3.2",
"status": "Reporting",
"trend_on_date": "Increasing",
"recent_trend": "Decreasing"
}
]
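A test could consume a fixture like `secondary_page.txt` by parsing the JSON page and checking that each record carries the fields the secondary pipeline relies on. This sketch inlines one record from the fixture rather than reading the file, so it stands alone:

```python
import json

# One record from the fixture above, inlined so the check is self-contained.
page = json.loads("""
[
  {"week_end": "2022-10-01T00:00:00.000", "pathogen": "COVID-19",
   "geography": "National", "percent_visits": "1.8",
   "status": "Reporting", "trend_on_date": "Decreasing",
   "recent_trend": "Decreasing"}
]
""")

# Fields SECONDARY_COLS_MAP renames; missing any would break the pull.
required = {"week_end", "pathogen", "geography", "percent_visits"}
assert all(required <= set(row) for row in page)
print(f"{len(page)} record(s) OK")
```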