Fix missing / incomplete Parquet & Intake metadata #7
Description
The source.discover() method shows some details about the internals of a data source within an Intake catalog. E.g.
import intake
pudl_cat = intake.cat.pudl_cat
pudl_cat.hourly_emissions_epacems.discover()
{
'dtype': {
'plant_id_eia': 'int32',
'unitid': 'object',
'operating_datetime_utc': 'datetime64[ns, UTC]',
'year': 'int32',
'state': 'int64',
'facility_id': 'int32',
'unit_id_epa': 'object',
'operating_time_hours': 'float32',
'gross_load_mw': 'float32',
'heat_content_mmbtu': 'float32',
'steam_load_1000_lbs': 'float32',
'so2_mass_lbs': 'float32',
'so2_mass_measurement_code': 'int64',
'nox_rate_lbs_mmbtu': 'float32',
'nox_rate_measurement_code': 'int64',
'nox_mass_lbs': 'float32',
'nox_mass_measurement_code': 'int64',
'co2_mass_tons': 'float32',
'co2_mass_measurement_code': 'int64'
},
'shape': (None, 19),
'npartitions': 1,
'metadata': {
'title': 'Continuous Emissions Monitoring System (CEMS) Hourly Data',
'type': 'application/parquet',
'provider': 'US Environmental Protection Agency Air Markets Program',
'path': 'https://ampd.epa.gov/ampd',
'license': {
'name': 'CC-BY-4.0',
'title': 'Creative Commons Attribution 4.0',
'path': 'https://creativecommons.org/licenses/by/4.0'
},
'catalog_dir': '/home/zane/code/catalyst/pudl-data-catalog/src/pudl_catalog/'
}
}
However, some of this information doesn't reflect what's in the Parquet files as well as it could. We should make sure:
- unitid and unit_id_epa show up as string, not object.
- The category columns state, so2_mass_measurement_code, nox_rate_measurement_code, nox_mass_measurement_code, and co2_mass_measurement_code show up as category instead of int64 (presumably they're appearing as integers because integers are keys in a dictionary of categorical values?).
- The shape tuple should indicate the number of rows in the dataset, rather than None, since that information is stored in the Parquet file metadata.
- The nullability of column dtypes should be preserved.
Some of these issues seem to be arising from Intake, and some of them seem to arise from the metadata that's getting written to the Parquet files in ETL. Looking at the type information for a sample of the data after it's been read back into a pandas dataframe:
filters = year_state_filter(
    years=[2019, 2020],
    states=["CO", "ID", "TX"],
)
epacems_df = (
    pudl_cat.hourly_emissions_epacems(filters=filters)
    .to_dask().compute()
)
epacems_df.info(show_counts=True, memory_usage="deep")
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 8006424 entries, 0 to 8006423
Data columns (total 19 columns):
 #   Column                     Non-Null Count    Dtype
---  ------                     --------------    -----
 0   plant_id_eia               8006424 non-null  int32
 1   unitid                     8006424 non-null  object
 2   operating_datetime_utc     8006424 non-null  datetime64[ns, UTC]
 3   year                       8006424 non-null  int32
 4   state                      8006424 non-null  category
 5   facility_id                8006424 non-null  int32
 6   unit_id_epa                8006424 non-null  object
 7   operating_time_hours       8003928 non-null  float32
 8   gross_load_mw              8006424 non-null  float32
 9   heat_content_mmbtu         8006424 non-null  float32
 10  steam_load_1000_lbs        33252 non-null    float32
 11  so2_mass_lbs               3586052 non-null  float32
 12  so2_mass_measurement_code  3586052 non-null  category
 13  nox_rate_lbs_mmbtu         3716001 non-null  float32
 14  nox_rate_measurement_code  3716001 non-null  category
 15  nox_mass_lbs               3716549 non-null  float32
 16  nox_mass_measurement_code  3716549 non-null  category
 17  co2_mass_tons              3688397 non-null  float32
 18  co2_mass_measurement_code  3688397 non-null  category
dtypes: category(5), datetime64[ns, UTC](1), float32(8), int32(3), object(2)
memory usage: 1.3 GB
The categorical values show up correctly as categories, but the other type issues (nullability, string vs. object) remain. In my experimentation with different ways of writing out the files, I think I did see strings, nullable types, and category types coming through fine in this information in the past, so I think there's something wrong with the Parquet metadata. Reading in one file and looking at the metadata directly, the types all appear to be correct:
import pyarrow.parquet as pq
epacems_pq = pq.read_table("../data/hourly_emissions_epacems/epacems-2020-ID.parquet")
{name: dtype for name, dtype in zip(epacems_pq.schema.names, epacems_pq.schema.types)}
{
'plant_id_eia': DataType(int32),
'unitid': DataType(string),
'operating_datetime_utc': TimestampType(timestamp[ms, tz=UTC]),
'year': DataType(int32),
'state': DictionaryType(dictionary<values=string, indices=int32, ordered=0>),
'facility_id': DataType(int32),
'unit_id_epa': DataType(string),
'operating_time_hours': DataType(float),
'gross_load_mw': DataType(float),
'heat_content_mmbtu': DataType(float),
'steam_load_1000_lbs': DataType(float),
'so2_mass_lbs': DataType(float),
'so2_mass_measurement_code': DictionaryType(dictionary<values=string, indices=int32, ordered=0>),
'nox_rate_lbs_mmbtu': DataType(float),
'nox_rate_measurement_code': DictionaryType(dictionary<values=string, indices=int32, ordered=0>),
'nox_mass_lbs': DataType(float),
'nox_mass_measurement_code': DictionaryType(dictionary<values=string, indices=int32, ordered=0>),
'co2_mass_tons': DataType(float),
'co2_mass_measurement_code': DictionaryType(dictionary<values=string, indices=int32, ordered=0>)
}
epacems_pq.schema
plant_id_eia: int32 not null
-- field metadata --
description: 'The unique six-digit facility identification number, also' + 69
unitid: string not null
-- field metadata --
description: 'Facility-specific unit id (e.g. Unit 4)'
operating_datetime_utc: timestamp[ms, tz=UTC] not null
-- field metadata --
description: 'Date and time measurement began (UTC).'
year: int32 not null
-- field metadata --
description: 'Year the data was reported in, used for partitioning EPA ' + 5
state: dictionary<values=string, indices=int32, ordered=0>
-- field metadata --
description: 'Two letter US state abbreviation.'
facility_id: int32
-- field metadata --
description: 'New EPA plant ID.'
unit_id_epa: string
-- field metadata --
description: 'Emissions (smokestake) unit monitored by EPA CEMS.'
operating_time_hours: float
-- field metadata --
description: 'Length of time interval measured.'
gross_load_mw: float not null
-- field metadata --
description: 'Average power in megawatts delivered during time interval' + 10
heat_content_mmbtu: float not null
-- field metadata --
description: 'The energy contained in fuel burned, measured in million ' + 4
steam_load_1000_lbs: float
-- field metadata --
description: 'Total steam pressure produced by a unit during the report' + 8
so2_mass_lbs: float
-- field metadata --
description: 'Sulfur dioxide emissions in pounds.'
so2_mass_measurement_code: dictionary<values=string, indices=int32, ordered=0>
-- field metadata --
description: 'Identifies whether the reported value of emissions was me' + 47
nox_rate_lbs_mmbtu: float
-- field metadata --
description: 'The average rate at which NOx was emitted during a given ' + 12
nox_rate_measurement_code: dictionary<values=string, indices=int32, ordered=0>
-- field metadata --
description: 'Identifies whether the reported value of emissions was me' + 47
nox_mass_lbs: float
-- field metadata --
description: 'NOx emissions in pounds.'
nox_mass_measurement_code: dictionary<values=string, indices=int32, ordered=0>
-- field metadata --
description: 'Identifies whether the reported value of emissions was me' + 47
co2_mass_tons: float
-- field metadata --
description: 'Carbon dioxide emissions in short tons.'
co2_mass_measurement_code: dictionary<values=string, indices=int32, ordered=0>
-- field metadata --
description: 'Identifies whether the reported value of emissions was me' + 47
-- schema metadata --
description: 'Hourly emissions and plant operational data reported via Co' + 68
primary_key: 'plant_id_eia,unitid,operating_datetime_utc'
However... epacems_pq.schema.pandas_metadata is None, so it's relying on the default mapping of PyArrow types to pandas types, which isn't what we want it to do.
Why isn't the pandas metadata being embedded in the Parquet file? Is it possible to explicitly insert it? The function that's writing the Parquet files is pudl.etl._etl_one_year_epacems() and it's using pa.Table.from_pandas() so.... wtf?
def _etl_one_year_epacems(
    year: int,
    states: List[str],
    pudl_db: str,
    out_dir: str,
    ds_kwargs: Dict[str, Any],
) -> None:
    """Process one year of EPA CEMS and output year-state partitioned Parquet files."""
    pudl_engine = sa.create_engine(pudl_db)
    ds = Datastore(**ds_kwargs)
    schema = Resource.from_id("hourly_emissions_epacems").to_pyarrow()
    for state in states:
        with pq.ParquetWriter(
            where=Path(out_dir) / f"epacems-{year}-{state}.parquet",
            schema=schema,
            compression="snappy",
            version="2.6",
        ) as pqwriter:
            logger.info(f"Processing EPA CEMS hourly data for {year}-{state}")
            df = pudl.extract.epacems.extract(year=year, state=state, ds=ds)
            df = pudl.transform.epacems.transform(df, pudl_engine=pudl_engine)
            pqwriter.write_table(
                pa.Table.from_pandas(df, schema=schema, preserve_index=False)
            )