diff --git a/hhs_hosp/delphi_hhs/run.py b/hhs_hosp/delphi_hhs/run.py
index 5fc5c20d0..1dd47bc4a 100644
--- a/hhs_hosp/delphi_hhs/run.py
+++ b/hhs_hosp/delphi_hhs/run.py
@@ -9,14 +9,13 @@ import time
 
 from delphi_epidata import Epidata
-from delphi_utils.export import create_export_csv
-from delphi_utils.geomap import GeoMapper
-from delphi_utils import get_structured_logger
+from delphi_utils import create_export_csv, get_structured_logger, Nans, GeoMapper
 import numpy as np
 import pandas as pd
 
 from .constants import SIGNALS, GEOS, SMOOTHERS, CONFIRMED, SUM_CONF_SUSP, CONFIRMED_FLU
 
+
 def _date_to_int(d):
     """Return a date object as a yyyymmdd int."""
     return int(d.strftime("%Y%m%d"))
@@ -64,6 +63,19 @@ def generate_date_ranges(start, end):
     return output
 
 
+def add_nancodes(df):
+    """Add nancodes to a signal dataframe."""
+    # Default missingness codes
+    df["missing_val"] = Nans.NOT_MISSING
+    df["missing_se"] = Nans.NOT_APPLICABLE
+    df["missing_sample_size"] = Nans.NOT_APPLICABLE
+
+    # Mark any remaining nans with unknown
+    remaining_nans_mask = df["val"].isnull()
+    df.loc[remaining_nans_mask, "missing_val"] = Nans.OTHER
+    return df
+
+
 def run_module(params):
     """
     Generate ground truth HHS hospitalization data.
@@ -79,16 +91,16 @@
     """
     start_time = time.time()
     logger = get_structured_logger(
-        __name__, filename=params["common"].get("log_filename"),
-        log_exceptions=params["common"].get("log_exceptions", True))
+        __name__,
+        filename=params["common"].get("log_filename"),
+        log_exceptions=params["common"].get("log_exceptions", True),
+    )
     mapper = GeoMapper()
     request_all_states = ",".join(mapper.get_geo_values("state_id"))
     end_day = date.today()
-    if "epidata" in params["common"] and \
-       "as_of" in params["common"]["epidata"]:
+    if "epidata" in params["common"] and "as_of" in params["common"]["epidata"]:
         end_day = min(
-            end_day,
-            datetime.strptime(str(params["common"]["epidata"]["as_of"]), "%Y%m%d").date()
+            end_day, datetime.strptime(str(params["common"]["epidata"]["as_of"]), "%Y%m%d").date()
         )
     past_reference_day = date(year=2020, month=1, day=1)  # first available date in DB
     date_range = generate_date_ranges(past_reference_day, end_day)
@@ -100,33 +112,32 @@
             raise Exception(f"Bad result from Epidata for {r}: {response['message']}")
         if response["result"] == -2 and r == date_range[-1]:  # -2 code means no results
             continue
-        dfs.append(pd.DataFrame(response['epidata']))
+        dfs.append(pd.DataFrame(response["epidata"]))
 
     all_columns = pd.concat(dfs)
     geo_mapper = GeoMapper()
 
     stats = []
     for sensor, smoother, geo in product(SIGNALS, SMOOTHERS, GEOS):
-        logger.info("Generating signal and exporting to CSV",
-                    geo_res = geo,
-                    sensor = sensor,
-                    smoother = smoother)
-        df = geo_mapper.add_geocode(make_signal(all_columns, sensor),
-                                    "state_id",
-                                    "state_code",
-                                    from_col="state")
+        logger.info(
+            "Generating signal and exporting to CSV", geo_res=geo, sensor=sensor, smoother=smoother
+        )
+        df = geo_mapper.add_geocode(
+            make_signal(all_columns, sensor), "state_id", "state_code", from_col="state"
+        )
         if sensor.endswith("_prop"):
-            df=pop_proportion(df, geo_mapper)
+            df = pop_proportion(df, geo_mapper)
         df = make_geo(df, geo, geo_mapper)
+        df["se"] = np.nan
+        df["sample_size"] = np.nan
         df = smooth_values(df, smoother[0])
+        df = add_nancodes(df)
         if df.empty:
            continue
         sensor_name = sensor + smoother[1]
         # don't export first 6 days for smoothed signals since they'll be nan.
         start_date = min(df.timestamp) + timedelta(6) if smoother[1] else min(df.timestamp)
-        dates = create_export_csv(df,
-                                  params["common"]["export_dir"],
-                                  geo,
-                                  sensor_name,
-                                  start_date=start_date)
+        dates = create_export_csv(
+            df, params["common"]["export_dir"], geo, sensor_name, start_date=start_date
+        )
         if len(dates) > 0:
             stats.append((max(dates), len(dates)))
 
@@ -135,71 +146,75 @@
     csv_export_count = sum(s[-1] for s in stats)
     max_lag_in_days = min_max_date and (datetime.now() - min_max_date).days
     formatted_min_max_date = min_max_date and min_max_date.strftime("%Y-%m-%d")
-    logger.info("Completed indicator run",
-        elapsed_time_in_seconds = elapsed_time_in_seconds,
-        csv_export_count = csv_export_count,
-        max_lag_in_days = max_lag_in_days,
-        oldest_final_export_date = formatted_min_max_date)
+    logger.info(
+        "Completed indicator run",
+        elapsed_time_in_seconds=elapsed_time_in_seconds,
+        csv_export_count=csv_export_count,
+        max_lag_in_days=max_lag_in_days,
+        oldest_final_export_date=formatted_min_max_date,
+    )
 
 
 def smooth_values(df, smoother):
     """Smooth the value column in the dataframe."""
     df["val"] = df["val"].astype(float)
-    df["val"] = df[["geo_id", "val"]].groupby("geo_id")["val"].transform(
-        smoother.smooth
-    )
+    df["val"] = df[["geo_id", "val"]].groupby("geo_id")["val"].transform(smoother.smooth)
     return df
 
-def pop_proportion(df,geo_mapper):
+
+def pop_proportion(df, geo_mapper):
     """Get the population-proportionate variants as the dataframe val."""
-    pop_val=geo_mapper.add_population_column(df, "state_code")
-    df["val"]=round(df["val"]/pop_val["population"]*100000, 7)
+    pop_val = geo_mapper.add_population_column(df, "state_code")
+    df["val"] = round(df["val"] / pop_val["population"] * 100000, 7)
     pop_val.drop("population", axis=1, inplace=True)
     return df
 
+
 def make_geo(state, geo, geo_mapper):
     """Transform incoming geo (state) to another geo."""
     if geo == "state":
         exported = state.rename(columns={"state": "geo_id"})
     else:
-        exported = geo_mapper.replace_geocode(state, "state_code", geo, new_col="geo_id")
-    exported["se"] = np.nan
-    exported["sample_size"] = np.nan
+        exported = geo_mapper.replace_geocode(
+            state, "state_code", geo, new_col="geo_id", date_col="timestamp"
+        )
     return exported
 
 
 def make_signal(all_columns, sig):
     """Generate column sums according to signal name."""
-    assert sig in SIGNALS, f"Unexpected signal name '{sig}';" + \
-        " familiar names are '{', '.join(SIGNALS)}'"
+    assert sig in SIGNALS, (
+        f"Unexpected signal name '{sig}'; familiar names are {', '.join(SIGNALS)}"
+    )
     if sig.startswith(CONFIRMED):
-        df = pd.DataFrame({
-            "state": all_columns.state.apply(str.lower),
-            "timestamp":int_date_to_previous_day_datetime(all_columns.date),
-            "val": \
-                all_columns.previous_day_admission_adult_covid_confirmed + \
-                all_columns.previous_day_admission_pediatric_covid_confirmed
-        })
+        df = pd.DataFrame(
+            {
+                "state": all_columns.state.apply(str.lower),
+                "timestamp": int_date_to_previous_day_datetime(all_columns.date),
+                "val": all_columns.previous_day_admission_adult_covid_confirmed
+                + all_columns.previous_day_admission_pediatric_covid_confirmed,
+            }
+        )
     elif sig.startswith(SUM_CONF_SUSP):
-        df = pd.DataFrame({
-            "state": all_columns.state.apply(str.lower),
-            "timestamp":int_date_to_previous_day_datetime(all_columns.date),
-            "val": \
-                all_columns.previous_day_admission_adult_covid_confirmed + \
-                all_columns.previous_day_admission_adult_covid_suspected + \
-                all_columns.previous_day_admission_pediatric_covid_confirmed + \
-                all_columns.previous_day_admission_pediatric_covid_suspected,
-        })
+        df = pd.DataFrame(
+            {
+                "state": all_columns.state.apply(str.lower),
+                "timestamp": int_date_to_previous_day_datetime(all_columns.date),
+                "val": all_columns.previous_day_admission_adult_covid_confirmed
+                + all_columns.previous_day_admission_adult_covid_suspected
+                + all_columns.previous_day_admission_pediatric_covid_confirmed
+                + all_columns.previous_day_admission_pediatric_covid_suspected,
+            }
+        )
     elif sig.startswith(CONFIRMED_FLU):
-        df = pd.DataFrame({
-            "state": all_columns.state.apply(str.lower),
-            "timestamp":int_date_to_previous_day_datetime(all_columns.date),
-            "val": \
-                all_columns.previous_day_admission_influenza_confirmed
-        })
-    else:
-        raise Exception(
-            "Bad programmer: signal '{sig}' in SIGNALS but not handled in make_signal"
+        df = pd.DataFrame(
+            {
+                "state": all_columns.state.apply(str.lower),
+                "timestamp": int_date_to_previous_day_datetime(all_columns.date),
+                "val": all_columns.previous_day_admission_influenza_confirmed,
+            }
         )
+    else:
+        raise Exception(f"Bad programmer: signal '{sig}' in SIGNALS but not handled in make_signal")
     df["val"] = df.val.astype(float)
     return df
diff --git a/hhs_hosp/tests/test_run.py b/hhs_hosp/tests/test_run.py
index fc393e336..afb88604f 100644
--- a/hhs_hosp/tests/test_run.py
+++ b/hhs_hosp/tests/test_run.py
@@ -4,11 +4,11 @@
 import tempfile
 import os
 
-from delphi_hhs.run import _date_to_int, int_date_to_previous_day_datetime, generate_date_ranges, \
+from delphi_hhs.run import _date_to_int, add_nancodes, int_date_to_previous_day_datetime, generate_date_ranges, \
     make_signal, make_geo, run_module, pop_proportion
 from delphi_hhs.constants import SMOOTHERS, GEOS, SIGNALS, \
     CONFIRMED, SUM_CONF_SUSP, CONFIRMED_FLU, CONFIRMED_PROP, SUM_CONF_SUSP_PROP, CONFIRMED_FLU_PROP
-from delphi_utils.geomap import GeoMapper
+from delphi_utils import GeoMapper, Nans
 from freezegun import freeze_time
 import numpy as np
 import pandas as pd
@@ -85,7 +85,7 @@ def test_make_signal():
     })
     pd.testing.assert_frame_equal(expected_flu, make_signal(data, CONFIRMED_FLU))
     pd.testing.assert_frame_equal(expected_flu, make_signal(data, CONFIRMED_FLU_PROP))
-    
+
     with pytest.raises(Exception):
         make_signal(data, "zig")
 
@@ -93,7 +93,7 @@ def test_pop_proportion():
     geo_mapper = GeoMapper()
     state_pop = geo_mapper.get_crosswalk("state_code", "pop")
 
-    test_df = pd.DataFrame({ 
+    test_df = pd.DataFrame({
         'state': ['PA'],
         'state_code': [42],
         'timestamp': [datetime(year=2020, month=1, day=1)],
@@ -109,7 +109,7 @@
             'val': [15/pa_pop*100000],})
     )
 
-    test_df= pd.DataFrame({ 
+    test_df= pd.DataFrame({
         'state': ['WV'],
         'state_code': [54],
         'timestamp': [datetime(year=2020, month=1, day=1)],
@@ -137,30 +137,23 @@ def test_make_geo():
         'val': [1., 2., 4.],
     })
 
-    template = {
-        'se': np.nan,
-        'sample_size': np.nan,
-    }
     expecteds = {
         "state": pd.DataFrame(
-            dict(template,
-                 geo_id=data.state,
+            dict(geo_id=data.state,
                  timestamp=data.timestamp,
                  val=data.val)),
         "hhs": pd.DataFrame(
-            dict(template,
-                 geo_id=['3', '5'],
+            dict(geo_id=['3', '5'],
                  timestamp=[test_timestamp] * 2,
                  val=[3., 4.])),
         "nation": pd.DataFrame(
-            dict(template,
-                 geo_id=['us'],
+            dict(geo_id=['us'],
                  timestamp=[test_timestamp],
                  val=[7.]))
     }
     for geo, expected in expecteds.items():
         result = make_geo(data, geo, geo_mapper)
-        for series in ["geo_id", "timestamp", "val", "se", "sample_size"]:
+        for series in ["geo_id", "timestamp", "val"]:
             pd.testing.assert_series_equal(expected[series], result[series], obj=f"{geo}:{series}")
@@ -207,3 +200,25 @@ def test_ignore_last_range_no_results(mock_covid_hosp, mock_export):
         }
     }
     assert not run_module(params)  # function should not raise value error and has no return value
+
+def test_add_nancode():
+    data = pd.DataFrame({
+        'state': ['PA','WV','OH'],
+        'state_code': [42, 54, 39],
+        'timestamp': [pd.to_datetime("20200601")]*3,
+        'val': [1, 2, np.nan],
+        'se': [np.nan] * 3,
+        'sample_size': [np.nan] * 3,
+    })
+    expected = pd.DataFrame({
+        'state': ['PA','WV','OH'],
+        'state_code': [42, 54, 39],
+        'timestamp': [pd.to_datetime("20200601")]*3,
+        'val': [1, 2, np.nan],
+        'se': [np.nan] * 3,
+        'sample_size': [np.nan] * 3,
+        'missing_val': [Nans.NOT_MISSING] * 2 + [Nans.OTHER],
+        'missing_se': [Nans.NOT_APPLICABLE] * 3,
+        'missing_sample_size': [Nans.NOT_APPLICABLE] * 3,
+    })
+    pd.testing.assert_frame_equal(expected, add_nancodes(data))
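
Reviewer note: a minimal, standalone sketch of what the new add_nancodes helper does, for
anyone skimming the diff. The toy dataframe below is hypothetical (not taken from the
indicator); the helper only inspects the "val" column, so that is all the sketch sets up:

    import numpy as np
    import pandas as pd
    from delphi_utils import Nans
    from delphi_hhs.run import add_nancodes

    # Two reported values and one missing value.
    df = pd.DataFrame({"val": [1.0, 2.0, np.nan]})
    df = add_nancodes(df)

    # Every row gets the default codes; rows whose "val" is NaN are re-flagged as OTHER.
    assert list(df["missing_val"]) == [Nans.NOT_MISSING, Nans.NOT_MISSING, Nans.OTHER]
    assert (df["missing_se"] == Nans.NOT_APPLICABLE).all()
    assert (df["missing_sample_size"] == Nans.NOT_APPLICABLE).all()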