-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathadapter.py
142 lines (114 loc) · 4.49 KB
/
adapter.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
""" Data Services Swath Projector service for Harmony. """
import mimetypes
import os
import shutil
from tempfile import mkdtemp
from harmony import BaseHarmonyAdapter
from harmony.message import Source as HarmonySource
from harmony.util import HarmonyException, download, generate_output_filename, stage
from pystac import Asset, Item
from swath_projector.reproject import reproject
class SwathProjectorAdapter(BaseHarmonyAdapter):
    """Data Services Swath Projector service for Harmony.

    This class uses the Harmony utility library for processing the
    service input options.

    """

    def invoke(self):
        """Validate the inbound message, then run the default invocation.

        Adds validation to the default `process_item`-based invocation
        provided by the parent class.

        Returns
        -------
        pystac.Catalog
            the output catalog

        Raises
        ------
        HarmonyException
            if `validate_message` rejects the request

        """
        logger = self.logger
        logger.info('Starting Data Services Swath Projector Service')

        # Set for the whole process before any data file is opened.
        # NOTE(review): this is the HDF5 library switch that relaxes its
        # header/library version-mismatch check - confirm it is still needed.
        os.environ['HDF5_DISABLE_VERSION_CHECK'] = '1'

        self.validate_message()
        return super().invoke()

    def process_item(self, item: Item, source: HarmonySource) -> Item:
        """Reproject the data file of a single input item and stage the output.

        The asset of `item` that carries the 'data' role is downloaded to a
        temporary working directory, passed to `reproject`, and the resulting
        file is staged at the message's staging location. The temporary
        directory is always removed before returning or raising.

        Parameters
        ----------
        item : pystac.Item
            the item that should be processed
        source : harmony.message.Source
            the input source; its `shortName` is forwarded to `reproject`

        Returns
        -------
        pystac.Item
            a clone of `item` whose only asset, keyed 'data', references the
            staged, reprojected output file

        Raises
        ------
        HarmonyException
            if any step (asset lookup, download, reprojection or staging)
            fails; the original exception is chained as the cause

        """
        logger = self.logger

        # The output item keeps the input metadata but none of its assets.
        result = item.clone()
        result.assets = {}

        # Create a temporary dir for processing we may do
        workdir = mkdtemp()

        try:
            # Get the data file. A default is supplied so that a missing
            # 'data' asset produces a descriptive error rather than a bare
            # StopIteration, whose string form is empty.
            input_asset = next(
                (
                    candidate
                    for candidate in item.assets.values()
                    if 'data' in (candidate.roles or [])
                ),
                None,
            )
            if input_asset is None:
                raise ValueError('Input item has no asset with the "data" role')

            granule_url = input_asset.href
            input_filename = download(
                granule_url,
                workdir,
                logger=logger,
                access_token=self.message.accessToken,
                cfg=self.config,
            )
            logger.info('Granule data copied')

            # Call Reprojection utility
            working_filename = reproject(
                self.message,
                source.shortName,
                granule_url,
                input_filename,
                workdir,
                logger,
            )

            # Stage the output file with a conventional filename
            output_filename = generate_output_filename(
                input_asset.href, is_regridded=True
            )

            # `guess_type` always returns a 2-tuple, and `(None, None)` is
            # truthy, so the fallback has to be applied to the first element
            # rather than to the tuple as a whole.
            mimetype = (
                mimetypes.guess_type(output_filename, False)[0]
                or 'application/x-netcdf4'
            )

            url = stage(
                working_filename,
                output_filename,
                mimetype,
                location=self.message.stagingLocation,
                logger=self.logger,
            )

            # Update the STAC record
            result.assets['data'] = Asset(
                url, title=output_filename, media_type=mimetype, roles=['data']
            )

            # Return the output file back to Harmony
            logger.info('Reprojection complete')
            return result
        except Exception as err:
            logger.error('Reprojection failed: ' + str(err), exc_info=1)
            raise HarmonyException(
                'Reprojection failed with error: ' + str(err)
            ) from err
        finally:
            # Clean up any intermediate resources
            shutil.rmtree(workdir, ignore_errors=True)

    def validate_message(self):
        """Check the service was triggered by a valid message containing
        the expected number of granules.

        A request is accepted when either the message lists granules or the
        input catalog yields at least one item. If the message exposes a
        `granules` attribute, it must be a list.

        Raises
        ------
        HarmonyException
            if there is no message, if neither granules nor catalog items
            are present, or if `message.granules` exists but is not a list

        """
        if not hasattr(self, 'message'):
            raise HarmonyException('No message request')

        message = self.message
        granules_present = hasattr(message, 'granules')
        granules = message.granules if granules_present else None
        has_granules = bool(granules)

        # Only the first item is pulled from the iterator; that is enough to
        # know whether the catalog is non-empty.
        has_items = (
            bool(self.catalog)
            and next(self.catalog.get_all_items(), None) is not None
        )

        if not has_granules and not has_items:
            raise HarmonyException('No granules specified for reprojection')

        # The type check applies only when the attribute exists, so a message
        # validated through catalog items alone does not hit AttributeError.
        if granules_present and not isinstance(granules, list):
            raise HarmonyException('Invalid granule list')