Skip to content

Commit 8c752d9

Browse files
committed
Update utilities for NAN codes:
* Update the export utility to export, validate, and test the missing columns. * Handle deleted rows: replace them with NaN values. * Handle deleted files: replace each with an empty CSV file. * Handle comparisons between CSVs with and without missing columns.
1 parent 826614d commit 8c752d9

File tree

4 files changed

+298
-44
lines changed

4 files changed

+298
-44
lines changed

_delphi_utils_python/delphi_utils/archive.py

+39-13
Original file line numberDiff line numberDiff line change
@@ -40,9 +40,11 @@
4040
from git import Repo
4141
from git.refs.head import Head
4242
import pandas as pd
43+
import numpy as np
4344

4445
from .utils import read_params
4546
from .logger import get_structured_logger
47+
from .nancodes import Nans
4648

4749
Files = List[str]
4850
FileDiffMap = Dict[str, Optional[str]]
@@ -73,8 +75,10 @@ def diff_export_csv(
7375
changed_df is the pd.DataFrame of common rows from after_csv with changed values.
7476
added_df is the pd.DataFrame of added rows from after_csv.
7577
"""
76-
export_csv_dtypes = {"geo_id": str, "val": float,
77-
"se": float, "sample_size": float}
78+
export_csv_dtypes = {
79+
"geo_id": str, "val": float, "se": float, "sample_size": float,
80+
"missing_val": int, "missing_se": int, "missing_sample_size": int
81+
}
7882

7983
before_df = pd.read_csv(before_csv, dtype=export_csv_dtypes)
8084
before_df.set_index("geo_id", inplace=True)
@@ -89,12 +93,22 @@ def diff_export_csv(
8993
before_df_cmn = before_df.reindex(common_idx)
9094
after_df_cmn = after_df.reindex(common_idx)
9195

92-
# Exact comparisons, treating NA == NA as True
93-
same_mask = before_df_cmn == after_df_cmn
94-
same_mask |= pd.isna(before_df_cmn) & pd.isna(after_df_cmn)
96+
# If CSVs have different columns (no missingness), mark all values as new
97+
if ("missing_val" in before_df_cmn.columns) ^ ("missing_val" in after_df_cmn.columns):
98+
same_mask = after_df_cmn.copy()
99+
same_mask.loc[:] = False
100+
else:
101+
# Exact comparisons, treating NA == NA as True
102+
same_mask = before_df_cmn == after_df_cmn
103+
same_mask |= pd.isna(before_df_cmn) & pd.isna(after_df_cmn)
104+
105+
# Code deleted entries as nans with the deleted missing code
106+
deleted_df = before_df.loc[deleted_idx, :].copy()
107+
deleted_df[["val", "se", "sample_size"]] = np.nan
108+
deleted_df[["missing_val", "missing_se", "missing_sample_size"]] = Nans.DELETED
95109

96110
return (
97-
before_df.loc[deleted_idx, :],
111+
deleted_df,
98112
after_df_cmn.loc[~(same_mask.all(axis=1)), :],
99113
after_df.loc[added_idx, :])
100114

@@ -227,11 +241,11 @@ def diff_exports(self) -> Tuple[Files, FileDiffMap, Files]:
227241

228242
deleted_df, changed_df, added_df = diff_export_csv(
229243
before_file, after_file)
230-
new_issues_df = pd.concat([changed_df, added_df], axis=0)
244+
new_issues_df = pd.concat([deleted_df, changed_df, added_df], axis=0)
231245

232246
if len(deleted_df) > 0:
233247
print(
234-
f"Warning, diff has deleted indices in {after_file} that will be ignored")
248+
f"Diff has deleted indices in {after_file} that have been coded as nans.")
235249

236250
# Write the diffs to diff_file, if applicable
237251
if len(new_issues_df) > 0:
@@ -240,6 +254,17 @@ def diff_exports(self) -> Tuple[Files, FileDiffMap, Files]:
240254
new_issues_df.to_csv(diff_file, na_rep="NA")
241255
common_diffs[after_file] = diff_file
242256

257+
# Replace deleted files with empty versions, but only if the cached version is not
258+
# already empty
259+
for deleted_file in deleted_files:
260+
deleted_df = pd.read_csv(deleted_file)
261+
if not deleted_df.empty:
262+
print(
263+
f"Diff has deleted {deleted_file} and replaced it with an empty CSV.")
264+
empty_df = deleted_df[0:0]
265+
new_deleted_filename = join(self.export_dir, basename(deleted_file))
266+
empty_df.to_csv(new_deleted_filename, index=False)
267+
243268
return deleted_files, common_diffs, new_files
244269

245270
def archive_exports(self, exported_files: Files) -> Tuple[Files, Files]:
@@ -266,9 +291,10 @@ def filter_exports(self, common_diffs: FileDiffMap):
266291
Filter export directory to only contain relevant files.
267292
268293
Filters down the export_dir to only contain:
269-
1) New files, 2) Changed files, filtered-down to the ADDED and CHANGED rows only.
270-
Should be called after archive_exports() so we archive the raw exports before
271-
potentially modifying them.
294+
1) New files, 2) Changed files, filtered-down to the ADDED and CHANGED rows
295+
only, and 3) Deleted files replaced with empty CSVs with the same name. Should
296+
be called after archive_exports() so we archive the raw exports before potentially
297+
modifying them.
272298
273299
Parameters
274300
----------
@@ -297,9 +323,9 @@ def run(self):
297323
self.update_cache()
298324

299325
# Diff exports, and make incremental versions
300-
_, common_diffs, new_files = self.diff_exports()
326+
deleted_files, common_diffs, new_files = self.diff_exports()
301327

302-
# Archive changed and new files only
328+
# Archive changed, new, and emptied deleted files
303329
to_archive = [f for f, diff in common_diffs.items()
304330
if diff is not None]
305331
to_archive += new_files

_delphi_utils_python/delphi_utils/export.py

+40-2
Original file line numberDiff line numberDiff line change
@@ -3,10 +3,32 @@
33
from datetime import datetime
44
from os.path import join
55
from typing import Optional
6+
import logging
67

78
import numpy as np
89
import pandas as pd
910

11+
from .nancodes import Nans
12+
13+
def filter_contradicting_missing_codes(df, sensor, metric, date, logger=None):
14+
"""Find values with contradictory missingness codes, filter them, and log."""
15+
columns = ["val", "se", "sample_size"]
16+
# Get indices where the XNOR is true (i.e. both are true or both are false).
17+
masks = [
18+
~(df[column].isna() ^ df["missing_" + column].eq(Nans.NOT_MISSING))
19+
for column in columns
20+
]
21+
for mask in masks:
22+
if not logger is None and df.loc[mask].size > 0:
23+
logger.info(
24+
"Filtering contradictory missing code in " +
25+
"{0}_{1}_{2}.".format(sensor, metric, date.strftime(format="%Y-%m-%d"))
26+
)
27+
df = df.loc[~mask]
28+
elif logger is None and df.loc[mask].size > 0:
29+
df = df.loc[~mask]
30+
return df
31+
1032
def create_export_csv(
1133
df: pd.DataFrame,
1234
export_dir: str,
@@ -16,7 +38,8 @@ def create_export_csv(
1638
start_date: Optional[datetime] = None,
1739
end_date: Optional[datetime] = None,
1840
remove_null_samples: Optional[bool] = False,
19-
write_empty_days: Optional[bool] = False
41+
write_empty_days: Optional[bool] = False,
42+
logger: Optional[logging.Logger] = None
2043
):
2144
"""Export data in the format expected by the Delphi API.
2245
@@ -43,6 +66,8 @@ def create_export_csv(
4366
write_empty_days: Optional[bool]
4467
If true, every day in between start_date and end_date will have a CSV file written
4568
even if there is no data for the day. If false, only the days present are written.
69+
logger: Optional[logging.Logger]
70+
Pass a logger object here to log information about contradictory missing codes.
4671
4772
Returns
4873
---------
@@ -70,7 +95,20 @@ def create_export_csv(
7095
else:
7196
export_filename = f"{date.strftime('%Y%m%d')}_{geo_res}_{metric}_{sensor}.csv"
7297
export_file = join(export_dir, export_filename)
73-
export_df = df[df["timestamp"] == date][["geo_id", "val", "se", "sample_size",]]
98+
expected_columns = [
99+
"geo_id",
100+
"val",
101+
"se",
102+
"sample_size",
103+
"missing_val",
104+
"missing_se",
105+
"missing_sample_size"
106+
]
107+
export_df = df[df["timestamp"] == date].filter(items=expected_columns)
108+
if "missing_val" in export_df.columns:
109+
export_df = filter_contradicting_missing_codes(
110+
export_df, sensor, metric, date, logger=logger
111+
)
74112
if remove_null_samples:
75113
export_df = export_df[export_df["sample_size"].notnull()]
76114
export_df = export_df.round({"val": 7, "se": 7})

0 commit comments

Comments
 (0)