
adds context manager for all xarray file reading
spjuhel committed Sep 26, 2024
1 parent 9bb8856 commit ab28661
Showing 3 changed files with 600 additions and 609 deletions.
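The diff below replaces explicit open/close pairs around xr.open_dataset with "with" blocks, so the underlying file handle is released even when a function returns early or raises. A minimal sketch of the pattern the commit applies, with an illustrative file name and selection:

import xarray as xr

# Before this commit: the handle had to be closed explicitly on every exit path.
# ncdf = xr.open_dataset("example.nc")
# ...
# ncdf.close()

# After: the context manager closes the file when the block exits,
# including on early returns and exceptions.
with xr.open_dataset("example.nc") as ncdf:
    subset = ncdf.sel(lat=slice(70, 35), lon=slice(-10, 30))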
18 changes: 9 additions & 9 deletions climada/hazard/isimip_data.py
@@ -50,12 +50,12 @@ def _read_one_nc(file_name, bbox=None, years=None):
Contains data in the specified bounding box and for the
specified time period
"""
data = xr.open_dataset(file_name, decode_times=False)
if not bbox:
bbox = bbox_world
if not years:
return data.sel(lat=slice(bbox[3], bbox[1]), lon=slice(bbox[0], bbox[2]))

time_id = years - int(data['time'].units[12:16])
return data.sel(lat=slice(bbox[3], bbox[1]), lon=slice(bbox[0], bbox[2]),
time=slice(time_id[0], time_id[1]))
with xr.open_dataset(file_name, decode_times=False) as data:
if not bbox:
bbox = bbox_world
if not years:
return data.sel(lat=slice(bbox[3], bbox[1]), lon=slice(bbox[0], bbox[2]))

time_id = years - int(data['time'].units[12:16])
return data.sel(lat=slice(bbox[3], bbox[1]), lon=slice(bbox[0], bbox[2]),
time=slice(time_id[0], time_id[1]))
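As a usage note, a hedged sketch of calling the updated helper; the path, bounding box and years are assumptions, and the bbox ordering (lon_min, lat_min, lon_max, lat_max) follows the slicing above:

import numpy as np

# Illustrative only: the file must expose a 'time' variable whose units string
# carries the reference year at characters 12-16, as the slice above expects.
bbox = [-10.0, 35.0, 30.0, 70.0]   # lon_min, lat_min, lon_max, lat_max
years = np.array([1990, 2000])     # first and last year of interest
data = _read_one_nc("isimip_example.nc", bbox=bbox, years=years)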
243 changes: 118 additions & 125 deletions climada/hazard/storm_europe.py
@@ -228,37 +228,33 @@ def _read_one_nc(cls, file_name, centroids, intensity_thres):
new_haz : StormEurope
Hazard instance for one single storm.
"""
ncdf = xr.open_dataset(file_name)

if centroids.size != (ncdf.sizes['latitude'] * ncdf.sizes['longitude']):
ncdf.close()
LOGGER.warning(('Centroids size doesn\'t match NCDF dimensions. '
'Omitting file %s.'), file_name)
return None

# xarray does not penalise repeated assignments, see
# http://xarray.pydata.org/en/stable/data-structures.html
stacked = ncdf['max_wind_gust'].stack(
intensity=('latitude', 'longitude', 'time')
)
stacked = stacked.where(stacked > intensity_thres)
stacked = stacked.fillna(0)

# fill in values from netCDF
ssi_wisc = np.array([float(ncdf.attrs['ssi'])])
intensity = sparse.csr_matrix(stacked)
new_haz = cls(ssi_wisc=ssi_wisc,
intensity=intensity,
event_name=[ncdf.attrs['storm_name']],
date=np.array([datetime64_to_ordinal(ncdf['time'].data[0])]),
# fill in default values
centroids=centroids,
event_id=np.array([1]),
frequency=np.array([1]),
orig=np.array([True]),)

ncdf.close()
return new_haz
with xr.open_dataset(file_name) as ncdf:
if centroids.size != (ncdf.sizes['latitude'] * ncdf.sizes['longitude']):
LOGGER.warning(('Centroids size doesn\'t match NCDF dimensions. '
'Omitting file %s.'), file_name)
return None

# xarray does not penalise repeated assignments, see
# http://xarray.pydata.org/en/stable/data-structures.html
stacked = ncdf['max_wind_gust'].stack(
intensity=('latitude', 'longitude', 'time')
)
stacked = stacked.where(stacked > intensity_thres)
stacked = stacked.fillna(0)

# fill in values from netCDF
ssi_wisc = np.array([float(ncdf.attrs['ssi'])])
intensity = sparse.csr_matrix(stacked)
new_haz = cls(ssi_wisc=ssi_wisc,
intensity=intensity,
event_name=[ncdf.attrs['storm_name']],
date=np.array([datetime64_to_ordinal(ncdf['time'].data[0])]),
# fill in default values
centroids=centroids,
event_id=np.array([1]),
frequency=np.array([1]),
orig=np.array([True]),)
return new_haz
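For reference, a minimal sketch of the stack/threshold/sparse pattern used above, on a synthetic DataArray; the dimension sizes, values and threshold are made up:

import numpy as np
import xarray as xr
from scipy import sparse

gust = xr.DataArray(
    np.random.rand(3, 4, 1) * 40.0,
    dims=("latitude", "longitude", "time"),
)
stacked = gust.stack(intensity=("latitude", "longitude", "time"))
stacked = stacked.where(stacked > 14.7).fillna(0)   # example threshold in m/s
intensity = sparse.csr_matrix(stacked)              # one storm becomes one sparse row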

def read_cosmoe_file(self, *args, **kwargs):
"""This function is deprecated, use StormEurope.from_cosmoe_file instead."""
@@ -309,69 +305,66 @@ def from_cosmoe_file(cls, fp_file, run_datetime, event_date=None,
intensity_thres = cls.intensity_thres if intensity_thres is None else intensity_thres

# read intensity from file
ncdf = xr.open_dataset(fp_file)
ncdf = ncdf.assign_coords(date=('time',ncdf["time"].dt.floor("D").values))

if event_date:
try:
stacked = ncdf.sel(
time=event_date.strftime('%Y-%m-%d')
).groupby('date').max().stack(intensity=('y_1', 'x_1'))
except KeyError as ker:
raise ValueError('Extraction of date and coordinates failed. This is most likely '
'because the selected event_date '
f'{event_date.strftime("%Y-%m-%d")} is not contained in the '
'weather forecast selected by fp_file {fp_file}. Please adjust '
f'event_date or fp_file.') from ker
considered_dates = np.datetime64(event_date)
else:
time_covered_step = ncdf['time'].diff('time')
time_covered_day = time_covered_step.groupby('date').sum()
# forecast run should cover at least 18 hours of a day
considered_dates_bool = time_covered_day >= np.timedelta64(18,'h')
stacked = ncdf.groupby('date').max()\
.sel(date=considered_dates_bool)\
.stack(intensity=('y_1', 'x_1'))
considered_dates = stacked['date'].values
stacked = stacked.stack(date_ensemble=('date', 'epsd_1'))
stacked = stacked.where(stacked['VMAX_10M'] > intensity_thres)
stacked = stacked.fillna(0)

# fill in values from netCDF
intensity = sparse.csr_matrix(stacked['VMAX_10M'].T)
event_id = np.arange(stacked['date_ensemble'].size) + 1
date = np.repeat(
np.array(datetime64_to_ordinal(considered_dates)),
np.unique(ncdf['epsd_1']).size
)
orig = np.full_like(event_id, False)
orig[(stacked['epsd_1'] == 0).values] = True
if description is None:
description = (model_name +
' weather forecast windfield ' +
'for run startet at ' +
run_datetime.strftime('%Y%m%d%H'))

# Create Hazard
haz = cls(
intensity=intensity,
event_id=event_id,
centroids = cls._centroids_from_nc(fp_file),
# fill in default values
orig=orig,
date=date,
event_name=[date_i + '_ens' + str(ens_i)
for date_i, ens_i
in zip(date_to_str(date), stacked['epsd_1'].values + 1)],
frequency=np.divide(
np.ones_like(event_id),
np.unique(ncdf['epsd_1']).size),
)
with xr.open_dataset(fp_file) as ncdf:
ncdf = ncdf.assign_coords(date=('time',ncdf["time"].dt.floor("D").values))
if event_date:
try:
stacked = ncdf.sel(
time=event_date.strftime('%Y-%m-%d')
).groupby('date').max().stack(intensity=('y_1', 'x_1'))
except KeyError as ker:
raise ValueError('Extraction of date and coordinates failed. This is most likely '
'because the selected event_date '
f'{event_date.strftime("%Y-%m-%d")} is not contained in the '
'weather forecast selected by fp_file {fp_file}. Please adjust '
f'event_date or fp_file.') from ker
considered_dates = np.datetime64(event_date)
else:
time_covered_step = ncdf['time'].diff('time')
time_covered_day = time_covered_step.groupby('date').sum()
# forecast run should cover at least 18 hours of a day
considered_dates_bool = time_covered_day >= np.timedelta64(18,'h')
stacked = ncdf.groupby('date').max()\
.sel(date=considered_dates_bool)\
.stack(intensity=('y_1', 'x_1'))
considered_dates = stacked['date'].values
stacked = stacked.stack(date_ensemble=('date', 'epsd_1'))
stacked = stacked.where(stacked['VMAX_10M'] > intensity_thres)
stacked = stacked.fillna(0)

# fill in values from netCDF
intensity = sparse.csr_matrix(stacked['VMAX_10M'].T)
event_id = np.arange(stacked['date_ensemble'].size) + 1
date = np.repeat(
np.array(datetime64_to_ordinal(considered_dates)),
np.unique(ncdf['epsd_1']).size
)
orig = np.full_like(event_id, False)
orig[(stacked['epsd_1'] == 0).values] = True
if description is None:
description = (model_name +
' weather forecast windfield ' +
'for run startet at ' +
run_datetime.strftime('%Y%m%d%H'))

# Create Hazard
haz = cls(
intensity=intensity,
event_id=event_id,
centroids = cls._centroids_from_nc(fp_file),
# fill in default values
orig=orig,
date=date,
event_name=[date_i + '_ens' + str(ens_i)
for date_i, ens_i
in zip(date_to_str(date), stacked['epsd_1'].values + 1)],
frequency=np.divide(
np.ones_like(event_id),
np.unique(ncdf['epsd_1']).size),
)

# close netcdf file
ncdf.close()
haz.check()
return haz
haz.check()
return haz
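For orientation, a rough sketch of the date/ensemble handling performed above, on a synthetic forecast. The variable and dimension names follow the COSMO-E fields in the hunk, the values are invented, and the 18-hour coverage filter is skipped:

import numpy as np
import pandas as pd
import xarray as xr

times = pd.date_range("2024-01-01", periods=48, freq="h")
ds = xr.Dataset(
    {"VMAX_10M": (("time", "epsd_1", "y_1", "x_1"),
                  np.random.rand(48, 2, 3, 4) * 40.0)},
    coords={"time": times, "epsd_1": [0, 1]},
)
ds = ds.assign_coords(date=("time", ds["time"].dt.floor("D").values))

daily = ds.groupby("date").max().stack(intensity=("y_1", "x_1"))
events = daily.stack(date_ensemble=("date", "epsd_1"))  # one entry per day and member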

def read_icon_grib(self, *args, **kwargs):
"""This function is deprecated, use StormEurope.from_icon_grib instead."""
@@ -444,11 +437,12 @@ def from_icon_grib(cls, run_datetime, event_date=None, model_name='icon-eu-eps',
gripfile_path_i = Path(file_i[:-4])
with open(file_i, 'rb') as source, open(gripfile_path_i, 'wb') as dest:
dest.write(bz2.decompress(source.read()))
ds_i = xr.open_dataset(gripfile_path_i, engine='cfgrib')
if ind_i == 0:
stacked = ds_i
else:
stacked = xr.concat([stacked,ds_i], 'valid_time')

with xr.open_dataset(gripfile_path_i, engine='cfgrib') as ds_i:
if ind_i == 0:
stacked = ds_i
else:
stacked = xr.concat([stacked,ds_i], 'valid_time')

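A condensed sketch of the decompress-and-read loop above; the member file names are illustrative, and the cfgrib engine must be installed for xarray to read GRIB data:

import bz2
from pathlib import Path
import xarray as xr

stacked = None
for ind_i, file_i in enumerate(["member_01.grib2.bz2", "member_02.grib2.bz2"]):
    grib_path_i = Path(file_i[:-4])                      # strip the ".bz2" suffix
    with open(file_i, "rb") as source, open(grib_path_i, "wb") as dest:
        dest.write(bz2.decompress(source.read()))
    with xr.open_dataset(grib_path_i, engine="cfgrib") as ds_i:
        stacked = ds_i if ind_i == 0 else xr.concat([stacked, ds_i], "valid_time")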
# create intensity matrix with max for each full day
stacked = stacked.assign_coords(
@@ -524,35 +518,34 @@ def _centroids_from_nc(file_name):
'longitude' variables in a netCDF file.
"""
LOGGER.info('Constructing centroids from %s', file_name)
ncdf = xr.open_dataset(file_name)
create_meshgrid = True
if hasattr(ncdf, 'latitude'):
lats = ncdf['latitude'].data
lons = ncdf['longitude'].data
elif hasattr(ncdf, 'lat'):
lats = ncdf['lat'].data
lons = ncdf['lon'].data
elif hasattr(ncdf, 'lat_1'):
if len(ncdf['lon_1'].shape)>1 & \
(ncdf['lon_1'].shape == ncdf['lat_1'].shape) \
:
lats = ncdf['lat_1'].data.flatten()
lons = ncdf['lon_1'].data.flatten()
with xr.open_dataset(file_name) as ncdf:
create_meshgrid = True
if hasattr(ncdf, 'latitude'):
lats = ncdf['latitude'].data
lons = ncdf['longitude'].data
elif hasattr(ncdf, 'lat'):
lats = ncdf['lat'].data
lons = ncdf['lon'].data
elif hasattr(ncdf, 'lat_1'):
if len(ncdf['lon_1'].shape)>1 & \
(ncdf['lon_1'].shape == ncdf['lat_1'].shape) \
:
lats = ncdf['lat_1'].data.flatten()
lons = ncdf['lon_1'].data.flatten()
create_meshgrid = False
else:
lats = ncdf['lat_1'].data
lons = ncdf['lon_1'].data
elif hasattr(ncdf, 'clat'):
lats = ncdf['clat'].data
lons = ncdf['clon'].data
if ncdf['clat'].attrs['units']=='radian':
lats = np.rad2deg(lats)
lons = np.rad2deg(lons)
create_meshgrid = False
else:
lats = ncdf['lat_1'].data
lons = ncdf['lon_1'].data
elif hasattr(ncdf, 'clat'):
lats = ncdf['clat'].data
lons = ncdf['clon'].data
if ncdf['clat'].attrs['units']=='radian':
lats = np.rad2deg(lats)
lons = np.rad2deg(lons)
create_meshgrid = False
else:
raise AttributeError('netcdf file has no field named latitude or '
'other know abrivation for coordinates.')
ncdf.close()
raise AttributeError('netcdf file has no field named latitude or '
'other know abrivation for coordinates.')

if create_meshgrid:
lats, lons = np.array([np.repeat(lats, len(lons)),
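The remainder of this hunk is truncated in the view above. As a rough sketch of the meshgrid expansion it begins (the np.tile counterpart is an assumption inferred from the visible np.repeat call), pairing every latitude with every longitude yields one (lat, lon) pair per centroid:

import numpy as np

lats = np.array([50.0, 51.0, 52.0])
lons = np.array([5.0, 6.0])
lat_grid = np.repeat(lats, len(lons))   # [50, 50, 51, 51, 52, 52]
lon_grid = np.tile(lons, len(lats))     # [ 5,  6,  5,  6,  5,  6]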
