Skip to content

Commit d7a7803

Browse files
author
Ariana Barzinpour
committed
initial work towards centralised ds to stac logic
1 parent cf7b897 commit d7a7803

File tree

2 files changed

+299
-0
lines changed

2 files changed

+299
-0
lines changed

odc/stac/eo3/_eo3converter.py

+1
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@
5353
"view:azimuth": "eo:azimuth",
5454
"view:sun_azimuth": "eo:sun_azimuth",
5555
"view:sun_elevation": "eo:sun_elevation",
56+
"created": "odc:processing_datetime",
5657
}
5758

5859
(_eo3,) = (

odc/stac/eo3/_stacconverter.py

+298
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,298 @@
1+
import datetime
2+
import math
3+
from pathlib import Path
4+
from typing import Any, Dict, Iterable, Iterator, List, Optional, Sequence
5+
from urllib.parse import urljoin
6+
import mimetypes
7+
8+
import pystac.asset
9+
import pystac.collection
10+
import pystac.errors
11+
import pystac.item
12+
from pystac.extensions.eo import Band, EOExtension
13+
from pystac.extensions.projection import ProjectionExtension
14+
from pystac.extensions.view import ViewExtension
15+
from pystac import Asset, Item, Link, MediaType
16+
from pystac.utils import datetime_to_str
17+
from pystac.errors import STACError
18+
from odc.geo.geom import Geometry
19+
20+
21+
from datacube.model import Dataset, Product
22+
from datacube.utils.uris import uri_resolve
23+
from datacube.index.eo3 import EO3Grid
24+
from odc.geo import CRS
25+
from odc.geo.geobox import GeoBox
26+
from toolz import dicttoolz
27+
28+
from ._eo3converter import STAC_TO_EO3_RENAMES
29+
30+
# Reverse lookup (EO3 property name -> STAC property name), built by inverting
# the STAC -> EO3 rename table. If several STAC names map to the same EO3
# name, the entry that appears last in the table wins.
MAPPING_EO3_TO_STAC = {
    eo3_name: stac_name for stac_name, eo3_name in STAC_TO_EO3_RENAMES.items()
}
31+
32+
def _as_stac_instruments(value: str):
    """
    Split an EO3 instrument name into a list of lowercase instrument names.

    The name is split on underscores; leading and trailing ``+``/``-``
    characters are removed from every part before lowercasing it.

    >>> _as_stac_instruments('TM')
    ['tm']
    >>> _as_stac_instruments('OLI')
    ['oli']
    >>> _as_stac_instruments('ETM+')
    ['etm']
    >>> _as_stac_instruments('OLI_TIRS')
    ['oli', 'tirs']
    """
    instruments = []
    for part in value.split("_"):
        instruments.append(part.strip("+-").lower())
    return instruments
44+
45+
46+
# may need to be more robust
47+
def _convert_value_to_stac_type(key: str, value):
    """
    Return ``value`` in the form used for the STAC property derived from ``key``.

    :param key: the EO3 property name (before any renaming to its STAC name)
    :param value: the EO3 property value
    :return: a list of instrument names for ``eo:instrument``, a string for any
             datetime stored under a key other than ``datetime``, and the value
             unchanged in every other case
    """
    # In the STAC spec, "instruments" is a list of strings.
    if key == "eo:instrument":
        return _as_stac_instruments(value)

    # Only the non-default datetimes are stringified; the value stored under
    # "datetime" itself is passed through untouched.
    is_extra_datetime = isinstance(value, datetime.datetime) and key != "datetime"
    if is_extra_datetime:
        return datetime_to_str(value)

    return value
59+
60+
61+
def _media_type(path: Path) -> str:
    """
    Work out the media type to record on the asset located at ``path``.

    ``.sha1`` and ``.yaml`` suffixes are special-cased; anything else is guessed
    from the file name, with TIFF images reported as Cloud Optimised GeoTIFFs
    and unrecognised names falling back to a generic binary type.
    """
    suffix = path.suffix
    if suffix == ".sha1":
        return MediaType.TEXT
    if suffix == ".yaml":
        return "text/yaml"

    guessed, _encoding = mimetypes.guess_type(path.name)
    if not guessed:
        return "application/octet-stream"
    if guessed == "image/tiff":
        return MediaType.COG
    return guessed
77+
78+
79+
def _asset_roles_fields(asset_name: str) -> list[str]:
    """
    Pick the STAC roles for an accessory asset from its name.

    Names starting with ``thumbnail`` get the thumbnail role; everything else
    is treated as metadata.
    """
    is_thumbnail = asset_name.startswith("thumbnail")
    return ["thumbnail"] if is_thumbnail else ["metadata"]
87+
88+
89+
def _asset_title_fields(asset_name: str) -> str | None:
90+
"""
91+
Add title of the asset object
92+
"""
93+
if asset_name.startswith("thumbnail"):
94+
return "Thumbnail image"
95+
else:
96+
return None
97+
98+
99+
def _proj_fields(grids: dict[str, EO3Grid], grid_name: str = "default") -> dict:
    """
    Collect the proj (STAC projection extension) fields available for a grid.

    :param grids: the dataset's grids, keyed by grid name
    :param grid_name: which grid to describe; a falsy name means ``"default"``
    :return: the grid's ``shape`` and ``transform``, or an empty dict when there
             are no grids or the requested grid is not among them
    """
    selected = grids.get(grid_name or "default") if grids else None
    if selected:
        return {"shape": selected.shape, "transform": selected.transform}
    return {}
114+
115+
116+
def _lineage_fields(dataset: "Dataset") -> dict:
    """
    Build the custom ``odc:lineage`` property for a STAC Item.

    Resolved source datasets (``dataset.sources``) take precedence. When there
    are none, the ``lineage`` section of the dataset's metadata document is
    used as-is.

    :param dataset: the ODC dataset whose lineage should be described
    :return: ``{"odc:lineage": {classifier: [source ids]}}``, or an empty dict
             when the dataset carries no lineage information
    """
    # using sources vs source_tree?
    sources = dataset.sources
    if sources:
        # ``sources`` maps classifier -> source dataset, so the pairs have to be
        # read with .items(); iterating the mapping directly yields only keys.
        lineage = {
            classifier: [str(source.id)] for classifier, source in sources.items()
        }
        return {"odc:lineage": lineage}

    # sometimes lineage is included at 'lineage' instead of 'lineage.source_datasets'
    # in which case it should already be in {classifier: [ids]} format
    # shouldn't need to account for legacy embedded lineage at this point
    lineage = dataset.metadata_doc.get("lineage")
    if lineage:
        return {"odc:lineage": lineage}

    # it seems like derived are not accounted for at all - on purpose?
    return {}
134+
135+
136+
def eo3_to_stac_properties(dataset: Dataset) -> dict:
    """
    Convert the dataset's EO3 properties dictionary to the Stac equivalent.

    Property names with a known STAC counterpart are renamed; all other names
    are kept as they are. Values are converted with
    ``_convert_value_to_stac_type``.

    :param dataset: the ODC dataset whose properties should be converted
    :return: the STAC properties, with the dataset label (if any) first as ``title``
    """
    title = dataset.metadata.label
    # explorer has logic to try and figure out a label if missing, should it be included here?

    # The metadata document is a plain dict (it is read with .get() for grids and
    # lineage too), so the properties section must be looked up by key rather
    # than by attribute. A missing or empty section yields no properties.
    eo3_properties = dataset.metadata_doc.get("properties") or {}

    properties = {}
    # Put the title at the top for document readability.
    if title:
        properties["title"] = title
    for key, val in eo3_properties.items():
        properties[MAPPING_EO3_TO_STAC.get(key, key)] = _convert_value_to_stac_type(key, val)

    return properties
152+
153+
154+
def ds_to_item(
    dataset: Dataset,
    stac_item_url: str | None = None,  # Either ds.uri or '/collections/<collection>/items/<dataset_id>'
    # dataset_location: str | None = None,  # I don't think it's necessary to keep this
    # odc_dataset_metadata_url: str | None = None,  # dataset.raw_doc in explorer (link to odc-metadata.yaml)
    # explorer_base_url: str | None = None,  # default_redirect in explorer. Not optional unless collection_url is provided
    # collection_url: str | None = None,  # normally just '/collection/<product.name>', so only needs base url
) -> pystac.Item:
    """
    Convert the given ODC Dataset into a Stac Item document.

    Note: You may want to call `validate_item(doc)` on the outputs to find any
    incomplete properties.

    :param dataset: The EO3 dataset to convert.
    :param stac_item_url: Public 'self' URL where the stac document will be findable.
                          When given, it is added to the item as a ``self`` link.
                          May only be omitted if the dataset has a uri.
    :return: The Stac Item, with one asset per measurement and per accessory
             that can be linked to.
    :raises STACError: if the dataset is not EO3, or it has an extent but no CRS.
    :raises ValueError: if neither ``stac_item_url`` nor a dataset uri is available,
                        or the dataset has no datetime information.
    """
    if not dataset.is_eo3:
        raise STACError("Cannot convert non-eo3 datasets to STAC")

    if stac_item_url is None and not dataset.uri:
        raise ValueError("No dataset location provided")

    extent = dataset.extent
    if extent is not None:
        wgs84_geometry = extent.to_crs("EPSG:4326", math.inf)
        geometry = wgs84_geometry.json
        # Stac bboxes are plain lists of numbers.
        bbox = list(wgs84_geometry.boundingbox.bbox)
    else:
        geometry = None
        bbox = None

    properties = eo3_to_stac_properties(dataset)
    properties.update(_lineage_fields(dataset))

    dt = dataset.time
    if dt is None:
        raise ValueError("Cannot convert dataset with no datetime information")
    # `datetime` always has to be handed to the Item constructor. A dataset that
    # covers a time range additionally gets start/end datetimes, with the start
    # of the range used as its nominal datetime.
    dt_info = {"datetime": dt.begin}
    if dt.begin != dt.end:
        dt_info["start_datetime"] = dt.begin
        dt_info["end_datetime"] = dt.end
    # The item manages its own datetime, so drop the copy from the raw properties.
    properties.pop("datetime", None)

    item = pystac.item.Item(
        id=str(dataset.id),
        **dt_info,
        properties=properties,
        geometry=geometry,
        bbox=bbox,
        collection=dataset.product.name,
    )

    EOExtension.ext(item, add_if_missing=True)

    # while dataset._gs has already handled grid information, it doesn't allow us to
    # access all the information we need here
    grids = {name: EO3Grid(grid_spec) for name, grid_spec in dataset.metadata_doc.get("grids", {}).items()}

    # Describe the CRS once, for both the item and its assets: by EPSG code
    # when there is one, otherwise as a WKT string (not the CRS object itself).
    crs = dataset.crs
    if crs is None:
        crs_fields = None
    elif crs.epsg is not None:
        crs_fields = {"epsg": crs.epsg}
    else:
        # epsg is passed explicitly (as None) because some pystac versions
        # declare it as a required argument of `apply` -- TODO confirm the
        # minimum supported pystac version.
        crs_fields = {"epsg": None, "wkt2": crs.to_wkt()}

    if geometry:
        proj = ProjectionExtension.ext(item, add_if_missing=True)

        if crs_fields is None:
            raise STACError("Projection extension requires either epsg or wkt for crs.")
        proj.apply(**crs_fields, **_proj_fields(grids))

    # To pass validation, only add 'view' extension when we're using it somewhere.
    if any(k.startswith("view:") for k in properties):
        ViewExtension.ext(item, add_if_missing=True)

    # Without a dataset location, all paths will be relative.
    dataset_location = dataset.uri  # or should we default to stac_url?

    # Add assets that are data
    for name, measurement in dataset.measurements.items():
        path = measurement.get("path")
        if not dataset_location and not path:
            # No URL to link to. URL is mandatory for Stac validation.
            continue

        href = uri_resolve(dataset_location, path)
        asset = Asset(
            href=href,
            # A measurement without its own path is linked to the dataset
            # location, so guess the media type from that instead of failing
            # on a missing path.
            media_type=_media_type(Path(path or href)),
            title=name,
            roles=["data"],
        )
        eo = EOExtension.ext(asset)

        # TODO: pull out more information about the band
        band = Band.create(name)
        eo.apply(bands=[band])

        # Asset-level projection details need a CRS to be meaningful; without
        # one there is nothing valid to record.
        if grids and crs_fields is not None:
            proj = ProjectionExtension.ext(asset)
            proj.apply(**crs_fields, **_proj_fields(grids, measurement.get("grid")))

        item.add_asset(name, asset=asset)

    # Add assets that are accessories
    for name, acc in dataset.accessories.items():
        path = acc.get("path")
        if not dataset_location and not path:
            # No URL to link to. URL is mandatory for Stac validation.
            continue

        href = uri_resolve(dataset_location, path)
        asset = Asset(
            href=href,
            media_type=_media_type(Path(path or href)),
            title=_asset_title_fields(name),
            roles=_asset_roles_fields(name),
        )

        item.add_asset(name, asset=asset)

    # should all item links be handled externally?
    if stac_item_url:
        item.links.append(
            Link(
                rel="self",
                media_type=MediaType.JSON,
                target=stac_item_url,
            )
        )

    return item
294+
295+
296+
def ds2stac(datasets: Iterable[Dataset]) -> Iterator[pystac.item.Item]:
    """
    Lazily convert datasets into Stac Items.

    Each dataset is converted with ``ds_to_item``, passing the dataset's own
    uri as the item's url.
    """
    yield from (ds_to_item(ds, ds.uri) for ds in datasets)

0 commit comments

Comments
 (0)