Add NSSP secondary source #2074

Open
wants to merge 17 commits into
base: main
9 changes: 8 additions & 1 deletion nssp/DETAILS.md
@@ -2,6 +2,11 @@

We import the NSSP Emergency Department Visit data, including percentage and smoothed percentage of ER visits attributable to a given pathogen, from the CDC website. The data is provided at the county level, state level and national level; we do a population-weighted mean to aggregate from county data up to the HRR and MSA levels.

There are 2 sources we grab data from for nssp:
@aysim319 (Contributor), Nov 11, 2024:

Mention that some of the differences are significant: in my run, about 4% of the data differed by more than 0.1. I would also mention this to logan/daniel/etc. as an FYI.

Specifically, HHS region 7 seems to be the least similar, at least in this run.

@minhkhul (Contributor Author), Nov 11, 2024:

Yeah, it's expected that hhs shows wilder differences: the primary source signal values at the hhs geo level were (1) aggregated from state-level data, (2) using 2020 population weights, rather than published directly by the source like the equivalent secondary signals for hhs geos. The public-facing docs already mention this weighting, but I'll update them and link here with more info once the pipeline is in a good spot.
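
To make the weighting point concrete, here is a minimal sketch of a population-weighted mean over state values. The numbers are made up for illustration, and `weighted_mean` is a hypothetical helper, not the pipeline's actual geo-aggregation code.

```python
def weighted_mean(values, weights):
    """Population-weighted mean of per-state percentages."""
    total = sum(weights)
    if total == 0:
        raise ValueError("weights must not sum to zero")
    return sum(v * w for v, w in zip(values, weights)) / total


# Made-up example: two states in one hypothetical region.
state_pct = [2.0, 4.0]                # percent of ED visits per state (illustrative)
state_pop = [1_000_000, 3_000_000]    # population weights (illustrative)

region_pct = weighted_mean(state_pct, state_pop)
print(region_pct)  # 3.5
```

If the source derives its regional figure with different weights (which is an assumption here, since the thread does not say how the published values are computed), the aggregated and published values will not match exactly.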

- Primary source: https://data.cdc.gov/Public-Health-Surveillance/NSSP-Emergency-Department-Visit-Trajectories-by-St/rdmq-nq56/data_preview
- Secondary source: https://data.cdc.gov/Public-Health-Surveillance/2023-Respiratory-Virus-Response-NSSP-Emergency-Dep/7mra-9cq9/data_preview
The primary source outputs 8 signals and the secondary source outputs 4. Secondary source data is only available at the state level and above (state, HHS region, nation), though it might be updated more often.

## Geographical Levels
* `state`: reported using two-letter postal code
* `county`: reported using fips code
@@ -10,4 +15,6 @@ We import the NSSP Emergency Department Visit data, including percentage and smo
* `percent_visits_covid`, `percent_visits_rsv`, `percent_visits_influenza`: percentage of emergency department patient visits for specified pathogen.
* `percent_visits_combined`: sum of the three percentages of visits for flu, rsv and covid.
* `smoothed_percent_visits_covid`, `smoothed_percent_visits_rsv`, `smoothed_percent_visits_influenza`: 3 week moving average of the percentage of emergency department patient visits for specified pathogen.
* `smoothed_percent_visits_combined`: 3 week moving average of the sum of the three percentages of visits for flu, rsv and covid.
* `percent_visits_covid_secondary`, `percent_visits_rsv_secondary`, `percent_visits_influenza_secondary`: percentage of emergency department patient visits for the specified pathogen, taken from the secondary source.
* `percent_visits_combined_secondary`: sum of the three percentages of visits for flu, rsv and covid, taken from the secondary source.
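
As a rough illustration of what the combined and smoothed signals in the list above represent, the sketch below sums three per-pathogen percentages and applies a 3-week moving average. The smoothed values are published by the source, so this is not the pipeline's code; the trailing window and the handling of the first weeks are assumptions, and the helper names are hypothetical.

```python
def combined(covid, flu, rsv):
    """Sum of the three per-pathogen percentages for one week."""
    return covid + flu + rsv


def trailing_mean(series, window=3):
    """Trailing moving average; early weeks average whatever is available so far.

    Assumption: a trailing (not centered) window. The source does not
    document the exact smoothing here.
    """
    out = []
    for i in range(len(series)):
        chunk = series[max(0, i - window + 1): i + 1]
        out.append(sum(chunk) / len(chunk))
    return out


# Made-up weekly values, chosen so the arithmetic is exact.
weekly_combined = [3.0, 6.0, 9.0, 12.0]
print(combined(1.5, 0.5, 0.75))        # 2.75
print(trailing_mean(weekly_combined))  # [3.0, 4.5, 6.0, 9.0]
```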
5 changes: 5 additions & 0 deletions nssp/README.md
@@ -1,6 +1,11 @@
# NSSP Emergency Department Visit data

We import the NSSP Emergency Department Visit data, including the percentage and smoothed percentage of ER visits attributable to a given pathogen, from the CDC website, aggregate it to higher geographic levels, and export the aggregated data.

There are 2 sources we grab data from for nssp:
- Primary source: https://data.cdc.gov/Public-Health-Surveillance/NSSP-Emergency-Department-Visit-Trajectories-by-St/rdmq-nq56/data_preview
- Secondary source: https://data.cdc.gov/Public-Health-Surveillance/2023-Respiratory-Virus-Response-NSSP-Emergency-Dep/7mra-9cq9/data_preview

For details see the `DETAILS.md` file in this directory.

## Create a MyAppToken
26 changes: 26 additions & 0 deletions nssp/delphi_nssp/constants.py
@@ -41,3 +41,29 @@
"fips": str,
}
)

SECONDARY_COLS_MAP = {
    "week_end": "timestamp",
    "geography": "geo_value",
    "percent_visits": "val",
    "pathogen": "signal",
}

SECONDARY_SIGNALS_MAP = {
    "COVID-19": "pct_ed_visits_covid_secondary",
    "Influenza": "pct_ed_visits_influenza_secondary",
    "RSV": "pct_ed_visits_rsv_secondary",
    "Combined": "pct_ed_visits_combined_secondary",
}

SECONDARY_SIGNALS = list(SECONDARY_SIGNALS_MAP.values())
SECONDARY_GEOS = ["state", "nation", "hhs"]

SECONDARY_TYPE_DICT = {
    "timestamp": "datetime64[ns]",
    "geo_value": str,
    "val": float,
    "geo_type": str,
    "signal": str,
}
SECONDARY_KEEP_COLS = list(SECONDARY_TYPE_DICT.keys())
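
To show what the two maps above are for, here is a small plain-Python sketch that applies them to one raw record shaped like the test data in this PR. The diff does this with pandas (`rename` and `Series.map`); `normalize_record` is a hypothetical helper for illustration only, and returning `None` for an unknown pathogen is an assumption of the sketch.

```python
SECONDARY_COLS_MAP = {
    "week_end": "timestamp",
    "geography": "geo_value",
    "percent_visits": "val",
    "pathogen": "signal",
}

SECONDARY_SIGNALS_MAP = {
    "COVID-19": "pct_ed_visits_covid_secondary",
    "Influenza": "pct_ed_visits_influenza_secondary",
    "RSV": "pct_ed_visits_rsv_secondary",
    "Combined": "pct_ed_visits_combined_secondary",
}


def normalize_record(raw):
    """Rename source columns and translate the pathogen label to a signal name."""
    # Keep only the columns we know how to rename; extras such as "status" are dropped.
    row = {SECONDARY_COLS_MAP[k]: v for k, v in raw.items() if k in SECONDARY_COLS_MAP}
    # Unknown pathogen labels become None in this sketch.
    row["signal"] = SECONDARY_SIGNALS_MAP.get(row["signal"])
    row["val"] = float(row["val"])
    return row


raw = {
    "week_end": "2022-10-01T00:00:00.000",
    "pathogen": "COVID-19",
    "geography": "National",
    "percent_visits": "1.8",
    "status": "Reporting",
}
print(normalize_record(raw))
```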
100 changes: 84 additions & 16 deletions nssp/delphi_nssp/pull.py
@@ -6,7 +6,16 @@
import pandas as pd
from sodapy import Socrata

from .constants import NEWLINE, SIGNALS, SIGNALS_MAP, TYPE_DICT
from .constants import (
    NEWLINE,
    SECONDARY_COLS_MAP,
    SECONDARY_KEEP_COLS,
    SECONDARY_SIGNALS_MAP,
    SECONDARY_TYPE_DICT,
    SIGNALS,
    SIGNALS_MAP,
    TYPE_DICT,
)


def warn_string(df, type_dict):
@@ -27,38 +36,50 @@ def warn_string(df, type_dict):
return warn


def pull_nssp_data(socrata_token: str):
"""Pull the latest NSSP ER visits data, and conforms it into a dataset.

The output dataset has:

- Each row corresponds to a single observation
- Each row additionally has columns for the signals in SIGNALS
def pull_with_socrata_api(socrata_token: str, dataset_id: str):
"""Pull data from Socrata API.

Parameters
----------
socrata_token: str
My App Token for pulling the NWSS data (could be the same as the nchs data)
test_file: Optional[str]
When not null, name of file from which to read test data
My App Token for pulling the NSSP data (could be the same as the nchs data)
dataset_id: str
The dataset id to pull data from

Returns
-------
pd.DataFrame
Dataframe as described above.
list of dictionaries, each representing a row in the dataset
"""
# Pull data from Socrata API
client = Socrata("data.cdc.gov", socrata_token)
results = []
offset = 0
limit = 50000 # maximum limit allowed by SODA 2.0
while True:
page = client.get("rdmq-nq56", limit=limit, offset=offset)
page = client.get(dataset_id, limit=limit, offset=offset)
if not page:
break # exit the loop if no more results
results.extend(page)
offset += limit
df_ervisits = pd.DataFrame.from_records(results)
return results


def pull_nssp_data(socrata_token: str):
"""Pull the latest NSSP ER visits primary dataset.

https://data.cdc.gov/Public-Health-Surveillance/NSSP-Emergency-Department-Visit-Trajectories-by-St/rdmq-nq56/data_preview

Parameters
----------
socrata_token: str
My App Token for pulling the NSSP data (could be the same as the nchs data)

Returns
-------
pd.DataFrame
Dataframe as described above.
"""
socrata_results = pull_with_socrata_api(socrata_token, "rdmq-nq56")
df_ervisits = pd.DataFrame.from_records(socrata_results)
df_ervisits = df_ervisits.rename(columns={"week_end": "timestamp"})
df_ervisits = df_ervisits.rename(columns=SIGNALS_MAP)

@@ -72,3 +93,50 @@ def pull_nssp_data(socrata_token: str):

keep_columns = ["timestamp", "geography", "county", "fips"]
return df_ervisits[SIGNALS + keep_columns]


def secondary_pull_nssp_data(socrata_token: str):
    """Pull the latest NSSP ER visits secondary dataset.

    https://data.cdc.gov/Public-Health-Surveillance/2023-Respiratory-Virus-Response-NSSP-Emergency-Dep/7mra-9cq9/data_preview

    The output dataset has:

    - Each row corresponds to a single observation

    Parameters
    ----------
    socrata_token: str
        My App Token for pulling the NSSP data (could be the same as the nchs data)

    Returns
    -------
    pd.DataFrame
        Dataframe as described above.
    """
    socrata_results = pull_with_socrata_api(socrata_token, "7mra-9cq9")
    df_ervisits = pd.DataFrame.from_records(socrata_results)
    df_ervisits = df_ervisits.rename(columns=SECONDARY_COLS_MAP)

    # geo_type is not provided in the dataset, so we infer it from the geo_value
    # which is either state names, "National" or hhs region numbers
    df_ervisits["geo_type"] = "state"

    df_ervisits.loc[df_ervisits["geo_value"] == "National", "geo_type"] = "nation"

    hhs_region_mask = df_ervisits["geo_value"].str.startswith("Region ")
    df_ervisits.loc[hhs_region_mask, "geo_value"] = df_ervisits.loc[hhs_region_mask, "geo_value"].str.replace(
        "Region ", ""
    )
    df_ervisits.loc[hhs_region_mask, "geo_type"] = "hhs"

    df_ervisits["signal"] = df_ervisits["signal"].map(SECONDARY_SIGNALS_MAP)

    df_ervisits = df_ervisits[SECONDARY_KEEP_COLS]

    try:
        df_ervisits = df_ervisits.astype(SECONDARY_TYPE_DICT)
    except KeyError as exc:
        raise ValueError(warn_string(df_ervisits, SECONDARY_TYPE_DICT)) from exc

    return df_ervisits
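
The geo_type inference in `secondary_pull_nssp_data` boils down to a three-way rule on the `geography` label. The sketch below restates that rule in plain Python for a single label; `infer_geo` is a hypothetical helper, not part of the diff, which applies the same logic with pandas masks.

```python
def infer_geo(geo_value):
    """Return (geo_type, cleaned geo_value) for one `geography` label."""
    if geo_value == "National":
        return "nation", geo_value
    if geo_value.startswith("Region "):
        # HHS regions arrive as "Region 7"; keep only the number.
        return "hhs", geo_value.replace("Region ", "")
    # Everything else is assumed to be a state name.
    return "state", geo_value


for label in ["National", "Region 7", "Ohio"]:
    print(label, "->", infer_geo(label))
# National -> ('nation', 'National')
# Region 7 -> ('hhs', '7')
# Ohio -> ('state', 'Ohio')
```

Because "state" is the fallback, any label the source might add later would be treated as a state here and would only be caught downstream by the unexpected-state-name check in `run.py`.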
52 changes: 50 additions & 2 deletions nssp/delphi_nssp/run.py
@@ -31,8 +31,8 @@
from delphi_utils.geomap import GeoMapper
from delphi_utils.nancodes import add_default_nancodes

from .constants import AUXILIARY_COLS, CSV_COLS, GEOS, SIGNALS
from .pull import pull_nssp_data
from .constants import AUXILIARY_COLS, CSV_COLS, GEOS, SECONDARY_GEOS, SECONDARY_SIGNALS, SIGNALS
from .pull import pull_nssp_data, secondary_pull_nssp_data


def add_needed_columns(df, col_names=None):
@@ -81,6 +81,8 @@ def run_module(params):
socrata_token = params["indicator"]["socrata_token"]

run_stats = []

logger.info("Generating primary signals")
## build the base version of the signal at the most detailed geo level you can get.
## compute stuff here or farm out to another function or file
df_pull = pull_nssp_data(socrata_token)
@@ -137,5 +139,51 @@ def run_module(params):
if len(dates) > 0:
run_stats.append((max(dates), len(dates)))

    logger.info("Generating secondary signals")
    secondary_df_pull = secondary_pull_nssp_data(socrata_token)
    for signal in SECONDARY_SIGNALS:
        secondary_df_pull_signal = secondary_df_pull[secondary_df_pull["signal"] == signal]
        if secondary_df_pull_signal.empty:
            continue
        for geo in SECONDARY_GEOS:
            df = secondary_df_pull_signal.copy()
            logger.info("Generating signal and exporting to CSV", geo_type=geo, signal=signal)
            if geo == "state":
                df = df[(df["geo_type"] == "state")]
                df["geo_id"] = df["geo_value"].apply(
                    lambda x: (
                        us.states.lookup(x).abbr.lower()
                        if us.states.lookup(x)
                        else ("dc" if x == "District of Columbia" else x)
                    )
                )
                unexpected_state_names = df[df["geo_id"] == df["geo_value"]]
                if unexpected_state_names.shape[0] > 0:
                    logger.error(
                        "Unexpected state names",
                        unexpected_state_names=unexpected_state_names["geo_value"].unique(),
                    )
                    raise RuntimeError
            elif geo == "nation":
                df = df[(df["geo_type"] == "nation")]
                df["geo_id"] = "us"
            elif geo == "hhs":
                df = df[(df["geo_type"] == "hhs")]
                df["geo_id"] = df["geo_value"]
            # add se, sample_size, and na codes
            missing_cols = set(CSV_COLS) - set(df.columns)
            df = add_needed_columns(df, col_names=list(missing_cols))
            df_csv = df[CSV_COLS + ["timestamp"]]
            # actual export
            dates = create_export_csv(
                df_csv,
                geo_res=geo,
                export_dir=export_dir,
                sensor=signal,
                weekly_dates=True,
            )
            if len(dates) > 0:
                run_stats.append((max(dates), len(dates)))

## log this indicator run
logging(start_time, run_stats, logger)
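
The state branch above converts full state names to lowercase postal codes and aborts the run if any name cannot be converted. Below is a simplified sketch of that fail-loudly pattern. It swaps the `us` library for a tiny hypothetical lookup table (`STATE_ABBR`), and it collects unknown names directly rather than comparing `geo_id` with `geo_value` as the diff does.

```python
# Hypothetical stand-in for `us.states.lookup`; only a few names are included.
STATE_ABBR = {"Ohio": "oh", "Texas": "tx", "District of Columbia": "dc"}


def to_geo_ids(names):
    """Map state names to lowercase postal codes, failing loudly on unknown names."""
    geo_ids = {}
    unexpected = []
    for name in names:
        abbr = STATE_ABBR.get(name)
        if abbr is None:
            unexpected.append(name)
        else:
            geo_ids[name] = abbr
    if unexpected:
        # Abort rather than export rows under an unrecognized geo id.
        raise RuntimeError(f"Unexpected state names: {sorted(set(unexpected))}")
    return geo_ids


print(to_geo_ids(["Ohio", "District of Columbia"]))
# {'Ohio': 'oh', 'District of Columbia': 'dc'}
```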
73 changes: 73 additions & 0 deletions nssp/tests/test_data/secondary_page.txt
@@ -0,0 +1,73 @@
[
{
"week_end": "2022-10-01T00:00:00.000",
"pathogen": "COVID-19", "geography": "National",
"percent_visits": "1.8",
"status": "Reporting",
"trend_on_date": "Decreasing",
"recent_trend": "Decreasing"
},
{
"week_end": "2022-10-01T00:00:00.000",
"pathogen": "Influenza",
"geography": "National",
"percent_visits": "0.5",
"status": "Reporting",
"trend_on_date": "Increasing",
"recent_trend": "Increasing"
},
{
"week_end": "2022-10-01T00:00:00.000",
"pathogen": "RSV",
"geography": "National",
"percent_visits": "0.5",
"status": "Reporting",
"trend_on_date": "Increasing",
"recent_trend": "Increasing"
},
{
"week_end": "2022-10-01T00:00:00.000",
"pathogen": "Combined",
"geography": "National",
"percent_visits": "2.8",
"status": "Reporting",
"trend_on_date": "Decreasing",
"recent_trend": "Decreasing"
},
{
"week_end": "2022-10-15T00:00:00.000",
"pathogen": "COVID-19",
"geography": "National",
"percent_visits": "1.6",
"status": "Reporting",
"trend_on_date": "Decreasing",
"recent_trend": "Decreasing"
},
{
"week_end": "2022-10-15T00:00:00.000",
"pathogen": "Influenza",
"geography": "National",
"percent_visits": "0.9",
"status": "Reporting",
"trend_on_date": "Increasing",
"recent_trend": "Increasing"
},
{
"week_end": "2022-10-15T00:00:00.000",
"pathogen": "RSV",
"geography": "National",
"percent_visits": "0.7",
"status": "Reporting",
"trend_on_date": "Increasing",
"recent_trend": "Increasing"
},
{
"week_end": "2022-10-15T00:00:00.000",
"pathogen": "Combined",
"geography": "National",
"percent_visits": "3.2",
"status": "Reporting",
"trend_on_date": "Increasing",
"recent_trend": "Decreasing"
}
]
36 changes: 35 additions & 1 deletion nssp/tests/test_pull.py
@@ -12,10 +12,16 @@

from delphi_nssp.pull import (
pull_nssp_data,
secondary_pull_nssp_data,
pull_with_socrata_api,
)
from delphi_nssp.constants import (
SIGNALS,
NEWLINE,
SECONDARY_COLS_MAP,
SECONDARY_KEEP_COLS,
SECONDARY_SIGNALS_MAP,
SECONDARY_TYPE_DICT,
SIGNALS,
SIGNALS_MAP,
TYPE_DICT,
)
@@ -55,6 +61,34 @@ def test_pull_nssp_data(self, mock_socrata):
for signal in SIGNALS:
assert result[signal].notnull().all(), f"{signal} has rogue NaN"

    @patch("delphi_nssp.pull.Socrata")
    def test_secondary_pull_nssp_data(self, mock_socrata):
        # Load test data
        with open("test_data/secondary_page.txt", "r") as f:
            test_data = json.load(f)

        # Mock Socrata client and its get method
        mock_client = MagicMock()
        mock_client.get.side_effect = [test_data, []]  # Return test data on first call, empty list on second call
        mock_socrata.return_value = mock_client

        # Call function with test token
        test_token = "test_token"
        result = secondary_pull_nssp_data(test_token)
        # print(result)

        # Check that Socrata client was initialized with correct arguments
        mock_socrata.assert_called_once_with("data.cdc.gov", test_token)

        # Check that get method was called with correct arguments
        mock_client.get.assert_any_call("7mra-9cq9", limit=50000, offset=0)

        for col in SECONDARY_KEEP_COLS:
            assert result[col].notnull().all(), f"{col} has rogue NaN"

        assert result[result['geo_value'].str.startswith('Region')].empty, "'Region ' need to be removed from geo_value for geo_type 'hhs'"
        assert (result[result['geo_type'] == 'nation']['geo_value'] == 'National').all(), "All rows with geo_type 'nation' must have geo_value 'National'"


if __name__ == "__main__":
unittest.main()
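
The mock in the test above returns one page of data and then an empty list, which is exactly what the offset-based loop in `pull_with_socrata_api` needs in order to stop. The sketch below reproduces that loop against a hypothetical in-memory client (`FakeClient` and `pull_all` are illustrative names, not part of the PR) to show how many requests a pull makes.

```python
class FakeClient:
    """Hypothetical stand-in for a Socrata client; serves rows in pages."""

    def __init__(self, rows):
        self.rows = rows
        self.calls = []

    def get(self, dataset_id, limit, offset):
        self.calls.append((dataset_id, limit, offset))
        return self.rows[offset: offset + limit]


def pull_all(client, dataset_id, limit):
    """Fetch pages until an empty one comes back, mirroring the loop in the diff."""
    results = []
    offset = 0
    while True:
        page = client.get(dataset_id, limit=limit, offset=offset)
        if not page:
            break  # an empty page signals the end of the dataset
        results.extend(page)
        offset += limit
    return results


client = FakeClient([{"id": i} for i in range(5)])
rows = pull_all(client, "7mra-9cq9", limit=2)
print(len(rows), len(client.calls))  # 5 4
```

Note that the loop always issues one final request that returns nothing, which is why the test's `side_effect` needs the trailing empty list.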