Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add NSSP secondary source #2074

Open
wants to merge 17 commits into
base: main
Choose a base branch
from
25 changes: 21 additions & 4 deletions nssp/DETAILS.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,29 @@

We import the NSSP Emergency Department Visit data, including percentage and smoothed percentage of ER visits attributable to a given pathogen, from the CDC website. The data is provided at the county level, state level and national level; we do a population-weighted mean to aggregate from county data up to the HRR and MSA levels.

There are 2 sources we grab data from for nssp:
Copy link
Contributor

@aysim319 aysim319 Nov 11, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Mention that there are some difference that's significant; I actually got 4% of the data having more that .1 difference and I would also mention this to logan/daniel/etc as an fyi

specifically for hhs region 7 seems to be the least similar... at least in this run

Copy link
Contributor Author

@minhkhul minhkhul Nov 11, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yea it's expected that hhs got wilder differences since the primary source signal values at hhs geo-level were 1. aggregated from state level data 2. using 2020 population weights, rather than something directly published by the source like the equivalent secondary signals for hhs geos. The public facing docs mentions this weighing already, but I'll update it and link stuff here with more info after the pipeline is in a good spot.

- Primary source: https://data.cdc.gov/Public-Health-Surveillance/NSSP-Emergency-Department-Visit-Trajectories-by-St/rdmq-nq56/data_preview
- Secondary source: https://data.cdc.gov/Public-Health-Surveillance/2023-Respiratory-Virus-Response-NSSP-Emergency-Dep/7mra-9cq9/data_preview
There are 8 signals output from the primary source and 4 output from secondary. There are no smoothed signals from secondary source.

Note that the data produced from secondary source are mostly the same as their primary source equivalent, with past analysis shows around 95% of datapoints having less than 0.1 value difference and the other 5% having a 0.1 to 1.2 value difference.

## Geographical Levels
* `state`: reported using two-letter postal code
* `county`: reported using fips code
* `national`: just `us` for now
Primary source:
* `state`: reported from source using two-letter postal code
* `county`: reported from source using fips code
* `national`: just `us` for now, reported from source
* `hhs`, `hrr`, `msa`: not reported from source, so we computed them from county-level data using a weighted mean. Each county is assigned a weight equal to its population in the last census (2020).

Secondary source:
* `state`: reported from source
* `hhs`: reported from source
* `national`: reported from source

## Metrics
* `percent_visits_covid`, `percent_visits_rsv`, `percent_visits_influenza`: percentage of emergency department patient visits for specified pathogen.
* `percent_visits_combined`: sum of the three percentages of visits for flu, rsv and covid.
* `smoothed_percent_visits_covid`, `smoothed_percent_visits_rsv`, `smoothed_percent_visits_influenza`: 3 week moving average of the percentage of emergency department patient visits for specified pathogen.
* `smoothed_percent_visits_combined`: 3 week moving average of the sum of the three percentages of visits for flu, rsv and covid.
* `smoothed_percent_visits_combined`: 3 week moving average of the sum of the three percentages of visits for flu, rsv and covid.
* `percent_visits_covid_secondary`, `percent_visits_rsv_secondary`, `percent_visits_influenza_secondary`: Taken from secondary source, percentage of emergency department patient visits for specified pathogen.
* `percent_visits_combined_secondary`: Taken from secondary source, sum of the three percentages of visits for flu, rsv and covid.
5 changes: 5 additions & 0 deletions nssp/README.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,11 @@
# NSSP Emergency Department Visit data

We import the NSSP Emergency Department Visit data, currently only the smoothed concentration, from the CDC website, aggregate to the state and national level from the wastewater sample site level, and export the aggregated data.

There are 2 sources we grab data from for nssp:
- Primary source: https://data.cdc.gov/Public-Health-Surveillance/NSSP-Emergency-Department-Visit-Trajectories-by-St/rdmq-nq56/data_preview
- Secondary source: https://data.cdc.gov/Public-Health-Surveillance/2023-Respiratory-Virus-Response-NSSP-Emergency-Dep/7mra-9cq9/data_preview

For details see the `DETAILS.md` file in this directory.

## Create a MyAppToken
Expand Down
26 changes: 26 additions & 0 deletions nssp/delphi_nssp/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,3 +41,29 @@
"fips": str,
}
)

SECONDARY_COLS_MAP = {
"week_end": "timestamp",
"geography": "geo_value",
"percent_visits": "val",
"pathogen": "signal",
}

SECONDARY_SIGNALS_MAP = {
"COVID-19": "pct_ed_visits_covid_secondary",
"Influenza": "pct_ed_visits_influenza_secondary",
"RSV": "pct_ed_visits_rsv_secondary",
"Combined": "pct_ed_visits_combined_secondary",
}

SECONDARY_SIGNALS = [val for (key, val) in SECONDARY_SIGNALS_MAP.items()]
SECONDARY_GEOS = ["state", "nation", "hhs"]

SECONDARY_TYPE_DICT = {
"timestamp": "datetime64[ns]",
"geo_value": str,
"val": float,
"geo_type": str,
"signal": str,
}
SECONDARY_KEEP_COLS = [key for (key, val) in SECONDARY_TYPE_DICT.items()]
105 changes: 86 additions & 19 deletions nssp/delphi_nssp/pull.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,16 @@
from delphi_utils import create_backup_csv
from sodapy import Socrata

from .constants import NEWLINE, SIGNALS, SIGNALS_MAP, TYPE_DICT
from .constants import (
NEWLINE,
SECONDARY_COLS_MAP,
SECONDARY_KEEP_COLS,
SECONDARY_SIGNALS_MAP,
SECONDARY_TYPE_DICT,
SIGNALS,
SIGNALS_MAP,
TYPE_DICT,
)


def warn_string(df, type_dict):
Expand All @@ -29,42 +38,50 @@ def warn_string(df, type_dict):
return warn


def pull_nssp_data(socrata_token: str, backup_dir: str, custom_run: bool, logger: Optional[logging.Logger] = None):
"""Pull the latest NSSP ER visits data, and conforms it into a dataset.

The output dataset has:

- Each row corresponds to a single observation
- Each row additionally has columns for the signals in SIGNALS
def pull_with_socrata_api(socrata_token: str, dataset_id: str):
"""Pull data from Socrata API.

Parameters
----------
socrata_token: str
My App Token for pulling the NSSP data (could be the same as the nchs data)
backup_dir: str
Directory to which to save raw backup data
custom_run: bool
Flag indicating if the current run is a patch. If so, don't save any data to disk
logger: Optional[logging.Logger]
logger object
dataset_id: str
The dataset id to pull data from

Returns
-------
pd.DataFrame
Dataframe as described above.
list of dictionaries, each representing a row in the dataset
"""
# Pull data from Socrata API
client = Socrata("data.cdc.gov", socrata_token)
results = []
offset = 0
limit = 50000 # maximum limit allowed by SODA 2.0
while True:
page = client.get("rdmq-nq56", limit=limit, offset=offset)
page = client.get(dataset_id, limit=limit, offset=offset)
if not page:
break # exit the loop if no more results
results.extend(page)
offset += limit
df_ervisits = pd.DataFrame.from_records(results)
return results


def pull_nssp_data(socrata_token: str, backup_dir: str, custom_run: bool, logger: Optional[logging.Logger] = None):
"""Pull the latest NSSP ER visits primary dataset.

https://data.cdc.gov/Public-Health-Surveillance/NSSP-Emergency-Department-Visit-Trajectories-by-St/rdmq-nq56/data_preview

Parameters
----------
socrata_token: str
My App Token for pulling the NSSP data (could be the same as the nchs data)

Returns
-------
pd.DataFrame
Dataframe as described above.
"""
socrata_results = pull_with_socrata_api(socrata_token, "rdmq-nq56")
df_ervisits = pd.DataFrame.from_records(socrata_results)
create_backup_csv(df_ervisits, backup_dir, custom_run, logger=logger)
df_ervisits = df_ervisits.rename(columns={"week_end": "timestamp"})
df_ervisits = df_ervisits.rename(columns=SIGNALS_MAP)
Expand All @@ -79,3 +96,53 @@ def pull_nssp_data(socrata_token: str, backup_dir: str, custom_run: bool, logger

keep_columns = ["timestamp", "geography", "county", "fips"]
return df_ervisits[SIGNALS + keep_columns]


def secondary_pull_nssp_data(
socrata_token: str, backup_dir: str, custom_run: bool, logger: Optional[logging.Logger] = None
):
"""Pull the latest NSSP ER visits secondary dataset.

https://data.cdc.gov/Public-Health-Surveillance/2023-Respiratory-Virus-Response-NSSP-Emergency-Dep/7mra-9cq9/data_preview

The output dataset has:

- Each row corresponds to a single observation

Parameters
----------
socrata_token: str
My App Token for pulling the NSSP data (could be the same as the nchs data)

Returns
-------
pd.DataFrame
Dataframe as described above.
"""
socrata_results = pull_with_socrata_api(socrata_token, "7mra-9cq9")
df_ervisits = pd.DataFrame.from_records(socrata_results)
create_backup_csv(df_ervisits, backup_dir, custom_run, sensor="secondary", logger=logger)
df_ervisits = df_ervisits.rename(columns=SECONDARY_COLS_MAP)
minhkhul marked this conversation as resolved.
Show resolved Hide resolved

# geo_type is not provided in the dataset, so we infer it from the geo_value
# which is either state names, "National" or hhs region numbers
df_ervisits["geo_type"] = "state"

df_ervisits.loc[df_ervisits["geo_value"] == "National", "geo_type"] = "nation"

hhs_region_mask = df_ervisits["geo_value"].str.lower().str.startswith("region ")
df_ervisits.loc[hhs_region_mask, "geo_value"] = df_ervisits.loc[hhs_region_mask, "geo_value"].str.replace(
"Region ", ""
)
df_ervisits.loc[hhs_region_mask, "geo_type"] = "hhs"

df_ervisits["signal"] = df_ervisits["signal"].map(SECONDARY_SIGNALS_MAP)

df_ervisits = df_ervisits[SECONDARY_KEEP_COLS]

try:
df_ervisits = df_ervisits.astype(SECONDARY_TYPE_DICT)
minhkhul marked this conversation as resolved.
Show resolved Hide resolved
except KeyError as exc:
raise ValueError(warn_string(df_ervisits, SECONDARY_TYPE_DICT)) from exc

return df_ervisits
53 changes: 51 additions & 2 deletions nssp/delphi_nssp/run.py
minhkhul marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,8 @@
from delphi_utils.geomap import GeoMapper
from delphi_utils.nancodes import add_default_nancodes

from .constants import AUXILIARY_COLS, CSV_COLS, GEOS, SIGNALS
from .pull import pull_nssp_data
from .constants import AUXILIARY_COLS, CSV_COLS, GEOS, SECONDARY_GEOS, SECONDARY_SIGNALS, SIGNALS
from .pull import pull_nssp_data, secondary_pull_nssp_data


def add_needed_columns(df, col_names=None):
Expand Down Expand Up @@ -83,6 +83,8 @@ def run_module(params):
socrata_token = params["indicator"]["socrata_token"]

run_stats = []

logger.info("Generating primary signals")
## build the base version of the signal at the most detailed geo level you can get.
## compute stuff here or farm out to another function or file
df_pull = pull_nssp_data(socrata_token, backup_dir, custom_run=custom_run, logger=logger)
Expand Down Expand Up @@ -139,5 +141,52 @@ def run_module(params):
if len(dates) > 0:
run_stats.append((max(dates), len(dates)))

logger.info("Generating secondary signals")
secondary_df_pull = secondary_pull_nssp_data(socrata_token, backup_dir, custom_run, logger)
for signal in SECONDARY_SIGNALS:
minhkhul marked this conversation as resolved.
Show resolved Hide resolved
secondary_df_pull_signal = secondary_df_pull[secondary_df_pull["signal"] == signal]
if secondary_df_pull_signal.empty:
logger.warning("No data found for signal", signal=signal)
continue
minhkhul marked this conversation as resolved.
Show resolved Hide resolved
for geo in SECONDARY_GEOS:
df = secondary_df_pull_signal.copy()
logger.info("Generating signal and exporting to CSV", geo_type=geo, signal=signal)
if geo == "state":
df = df[(df["geo_type"] == "state")]
df["geo_id"] = df["geo_value"].apply(
lambda x: (
us.states.lookup(x).abbr.lower()
if us.states.lookup(x)
else ("dc" if x == "District of Columbia" else x)
)
)
unexpected_state_names = df[df["geo_id"] == df["geo_value"]]
if unexpected_state_names.shape[0] > 0:
logger.error(
"Unexpected state names",
unexpected_state_names=unexpected_state_names["geo_value"].unique(),
)
raise RuntimeError
elif geo == "nation":
df = df[(df["geo_type"] == "nation")]
df["geo_id"] = "us"
elif geo == "hhs":
df = df[(df["geo_type"] == "hhs")]
df["geo_id"] = df["geo_value"]
# add se, sample_size, and na codes
missing_cols = set(CSV_COLS) - set(df.columns)
df = add_needed_columns(df, col_names=list(missing_cols))
df_csv = df[CSV_COLS + ["timestamp"]]
# actual export
dates = create_export_csv(
df_csv,
geo_res=geo,
export_dir=export_dir,
sensor=signal,
weekly_dates=True,
)
if len(dates) > 0:
run_stats.append((max(dates), len(dates)))

## log this indicator run
logging(start_time, run_stats, logger)
73 changes: 73 additions & 0 deletions nssp/tests/test_data/secondary_page.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
[
{
"week_end": "2022-10-01T00:00:00.000",
"pathogen": "COVID-19", "geography": "National",
"percent_visits": "1.8",
"status": "Reporting",
"trend_on_date": "Decreasing",
"recent_trend": "Decreasing"
},
{
"week_end": "2022-10-01T00:00:00.000",
"pathogen": "Influenza",
"geography": "National",
"percent_visits": "0.5",
"status": "Reporting",
"trend_on_date": "Increasing",
"recent_trend": "Increasing"
},
{
"week_end": "2022-10-01T00:00:00.000",
"pathogen": "RSV",
"geography": "National",
"percent_visits": "0.5",
"status": "Reporting",
"trend_on_date": "Increasing",
"recent_trend": "Increasing"
},
{
"week_end": "2022-10-01T00:00:00.000",
"pathogen": "Combined",
"geography": "National",
"percent_visits": "2.8",
"status": "Reporting",
"trend_on_date": "Decreasing",
"recent_trend": "Decreasing"
},
{
"week_end": "2022-10-15T00:00:00.000",
"pathogen": "COVID-19",
"geography": "National",
"percent_visits": "1.6",
"status": "Reporting",
"trend_on_date": "Decreasing",
"recent_trend": "Decreasing"
},
{
"week_end": "2022-10-15T00:00:00.000",
"pathogen": "Influenza",
"geography": "National",
"percent_visits": "0.9",
"status": "Reporting",
"trend_on_date": "Increasing",
"recent_trend": "Increasing"
},
{
"week_end": "2022-10-15T00:00:00.000",
"pathogen": "RSV",
"geography": "National",
"percent_visits": "0.7",
"status": "Reporting",
"trend_on_date": "Increasing",
"recent_trend": "Increasing"
},
{
"week_end": "2022-10-15T00:00:00.000",
"pathogen": "Combined",
"geography": "National",
"percent_visits": "3.2",
"status": "Reporting",
"trend_on_date": "Increasing",
"recent_trend": "Decreasing"
}
]
Loading
Loading