Address issue where a small number of Sentinel-2 scenes in GCP bucket are missing some bands #110

Merged · 9 commits · Jan 16, 2025
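In short, a scene that is missing some band image files now raises CorruptItemException, and a scene whose MTD_MSIL1C.xml is absent from GCS raises MissingXMLException; both are caught and skipped when products are listed. Below is a minimal caller-side sketch of the new behavior; the `ds` variable and the scene name are illustrative assumptions, not code from this PR.

    # Minimal sketch; "ds" is assumed to be an already-configured Sentinel2
    # data source, and the scene name below is a hypothetical example.
    from rslearn.data_sources.gcp_public_data import (
        CorruptItemException,
        MissingXMLException,
    )

    try:
        item = ds.get_item_by_name(
            "S2A_MSIL1C_20170101T100000_N0204_R122_T10TES_20170101T100000"
        )
    except CorruptItemException as e:
        # The scene exists but some of its band image files are missing.
        print("skipping corrupt scene:", e.message)
    except MissingXMLException as e:
        # The scene folder exists but its metadata XML file does not.
        print("skipping scene with no metadata XML:", e.item_name)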
rslearn/data_sources/gcp_public_data.py (240 changes: 177 additions & 63 deletions)
@@ -72,6 +72,35 @@ def deserialize(d: dict[str, Any]) -> "Sentinel2Item":
)


class CorruptItemException(Exception):
"""A Sentinel-2 scene is corrupted or otherwise unreadable for a known reason."""

def __init__(self, message: str) -> None:
"""Create a new CorruptItemException.

Args:
message: error message.
"""
super().__init__(message)
self.message = message


class MissingXMLException(Exception):
"""Exception for when an item's XML file does not exist in GCS.

Some items that appear in the index on BigQuery, or that have a folder, lack an XML
file, and so in those cases this exception can be ignored.
"""

def __init__(self, item_name: str) -> None:
"""Create a new MissingXMLException.

Args:
item_name: the name of the item (Sentinel-2 scene) that is missing its XML
file in the GCS bucket.
"""
super().__init__(item_name)
self.item_name = item_name


# TODO: Distinguish between AWS and GCP data sources in class names.
class Sentinel2(DataSource):
"""A data source for Sentinel-2 data on Google Cloud Storage.
@@ -295,25 +324,16 @@ def _get_xml_by_name(self, name: str) -> ET.ElementTree:
if not cache_xml_fname.exists():
metadata_blob_path = base_url + "MTD_MSIL1C.xml"
blob = self.bucket.blob(metadata_blob_path)
if not blob.exists():
raise MissingXMLException(name)
with open_atomic(cache_xml_fname, "wb") as f:
blob.download_to_file(f)

with cache_xml_fname.open("rb") as f:
return ET.parse(f)

def get_item_by_name(self, name: str) -> Sentinel2Item:
"""Gets an item by name.

Reads the individual product metadata file (MTD_MSIL1C.xml) to get both the
expected blob path where images are stored as well as the detailed geometry of
the product (not just the bounding box).

Args:
name: the name of the item to get

Returns:
the item object
"""
def _get_item_by_name(self, name: str) -> Sentinel2Item:
# Get the item without the caching logic.
parts = name.split("_")
assert len(parts[5]) == 6
assert parts[5][0] == "T"
@@ -347,6 +367,19 @@ def get_item_by_name(self, name: str) -> Sentinel2Item:
raise ValueError(f"IMAGE_FILE is empty for {name}")
blob_prefix = base_url + elements[0].text.split("B01")[0]

# Some Sentinel-2 scenes in the bucket are missing a subset of image files. So
# here we verify that all the bands we know about are intact.
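# Each entry of self.BANDS is assumed to pair a file suffix (e.g. "B02.jp2")
# with its band names, so t[0] below is the suffix to look for in the bucket.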
expected_suffixes = {t[0] for t in self.BANDS}
for blob in self.bucket.list_blobs(prefix=blob_prefix):
assert blob.name.startswith(blob_prefix)
suffix = blob.name[len(blob_prefix) :]
if suffix in expected_suffixes:
expected_suffixes.remove(suffix)
if len(expected_suffixes) > 0:
raise CorruptItemException(
f"item is missing image files: {expected_suffixes}"
)

elements = list(tree.iter("PRODUCT_START_TIME"))
assert len(elements) == 1
if elements[0].text is None:
@@ -362,13 +395,112 @@ def get_item_by_name(self, name: str) -> Sentinel2Item:
geometry = STGeometry(WGS84_PROJECTION, shp, (start_time, start_time))
geometry = split_at_prime_meridian(geometry)

# Sometimes the geometry is not valid, so we apply make_valid to correct
# any such issues.
if not geometry.shp.is_valid:
geometry.shp = shapely.make_valid(geometry.shp)

return Sentinel2Item(
name,
geometry,
blob_prefix,
cloud_cover,
)

def get_item_by_name(self, name: str) -> Sentinel2Item:
"""Gets an item by name.

Reads the individual product metadata file (MTD_MSIL1C.xml) to get both the
expected blob path where images are stored as well as the detailed geometry of
the product (not just the bounding box).

Args:
name: the name of the item to get

Returns:
the item object

Raises:
CorruptItemException: if the scene is known to be missing image files.
MissingXMLException: if the scene's metadata XML file is absent from GCS.
"""
# Cache the result of _get_item_by_name.
# We cache the item when the call succeeds, but also cache the
# CorruptItemException if one is raised.
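# The cache file holds either the serialized item or {"error": message}.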
cache_item_fname = self.index_cache_dir / (name + ".json")

if cache_item_fname.exists():
with cache_item_fname.open() as f:
d = json.load(f)

if "error" in d:
raise CorruptItemException(d["error"])

return Sentinel2Item.deserialize(d)

try:
item = self._get_item_by_name(name)
except CorruptItemException as e:
with open_atomic(cache_item_fname, "w") as f:
json.dump({"error": e.message}, f)
raise

with open_atomic(cache_item_fname, "w") as f:
json.dump(item.serialize(), f)
return item

def _read_products_for_cell_year(
self, cell_id: str, year: int
) -> list[Sentinel2Item]:
# Read items for the given cell and year. These items are cached by
# _read_products together in one file.
cell_part1 = cell_id[0:2]
cell_part2 = cell_id[2:3]
cell_part3 = cell_id[3:5]
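# e.g. for a (hypothetical) cell_id "10TES", the folder is tiles/10/T/ES.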

items = []

for product_prefix in ["S2A_MSIL1C", "S2B_MSIL1C"]:
cell_folder = f"tiles/{cell_part1}/{cell_part2}/{cell_part3}"
blob_prefix = f"{cell_folder}/{product_prefix}_{year}"
blobs = self.bucket.list_blobs(prefix=blob_prefix, delimiter="/")

# Need to consume the iterator to obtain folder names.
# See https://cloud.google.com/storage/docs/samples/storage-list-files-with-prefix#storage_list_files_with_prefix-python # noqa: E501
# Previously we checked for .SAFE_$folder$ blobs here, but those do
# not exist for some years like 2017.
for _ in blobs:
pass

logger.debug(
"under %s, found %d folders to scan",
blob_prefix,
len(blobs.prefixes),
)

for prefix in blobs.prefixes:
folder_name = prefix.split("/")[-2]
expected_suffix = ".SAFE"
assert folder_name.endswith(expected_suffix)
item_name = folder_name.split(expected_suffix)[0]
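# e.g. a (hypothetical) folder name
# "S2A_MSIL1C_20170101T100000_N0204_R122_T10TES_20170101T100000.SAFE"
# maps to the same item name without the ".SAFE" suffix.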

try:
item = self.get_item_by_name(item_name)
except CorruptItemException as e:
logger.warning("skipping corrupt item %s: %s", item_name, e.message)
continue
except MissingXMLException:
# Sometimes there is a .SAFE folder but some files like the
# XML file are just missing for whatever reason. Since we
# know this happens occasionally, we just ignore the error
# here.
logger.warning(
"no metadata XML for Sentinel-2 folder %s/%s",
blob_prefix,
folder_name,
)
continue

items.append(item)

return items

def _read_products(
self, needed_cell_years: set[tuple[str, int]]
) -> Generator[Sentinel2Item, None, None]:
@@ -391,62 +523,15 @@ def _read_products(
cache_fname = self.index_cache_dir / f"{cell_id}_{year}.json"

if not cache_fname.exists():
cell_part1 = cell_id[0:2]
cell_part2 = cell_id[2:3]
cell_part3 = cell_id[3:5]

items = []

for product_prefix in ["S2A_MSIL1C", "S2B_MSIL1C"]:
cell_folder = f"tiles/{cell_part1}/{cell_part2}/{cell_part3}"
blob_prefix = f"{cell_folder}/{product_prefix}_{year}"
blobs = self.bucket.list_blobs(prefix=blob_prefix, delimiter="/")

# Need to consume the iterator to obtain folder names.
# See https://cloud.google.com/storage/docs/samples/storage-list-files-with-prefix#storage_list_files_with_prefix-python # noqa: E501
# Previously we checked for .SAFE_$folder$ blobs here, but those do
# not exist for some years like 2017.
for _ in blobs:
pass

logger.debug(
"under %s, found %d folders to scan",
blob_prefix,
len(blobs.prefixes),
)

for prefix in blobs.prefixes:
folder_name = prefix.split("/")[-2]
expected_suffix = ".SAFE"
assert folder_name.endswith(expected_suffix)
item_name = folder_name.split(expected_suffix)[0]

# Make sure metadata XML blob exists, otherwise we won't be
# able to load the item.
# (Sometimes there is a .SAFE folder but some files like the
# XML file are just missing for whatever reason.)
xml_blob_path = f"{cell_folder}/{folder_name}/MTD_MSIL1C.xml"
xml_blob = self.bucket.blob(xml_blob_path)
if not xml_blob.exists():
logger.warning(
"no metadata XML for Sentinel-2 folder %s at %s",
folder_name,
xml_blob_path,
)
continue

item = self.get_item_by_name(item_name)
items.append(item)

items = self._read_products_for_cell_year(cell_id, year)
with open_atomic(cache_fname, "w") as f:
json.dump([item.serialize() for item in items], f)

else:
with cache_fname.open() as f:
items = [Sentinel2Item.deserialize(d) for d in json.load(f)]

for item in items:
yield item
yield from items

def _get_candidate_items_index(
self, wgs84_geometries: list[STGeometry]
@@ -469,7 +554,23 @@ def _get_candidate_items_index(
continue
if not item.geometry.shp.intersects(geometry.shp):
continue
item = self.get_item_by_name(item.name)

# Get the item from XML to get its exact geometry (the index only
# knows the bounding box of the item).
try:
item = self.get_item_by_name(item.name)
except CorruptItemException as e:
logger.warning("skipping corrupt item %s: %s", item.name, e.message)
continue
except MissingXMLException:
# Sometimes a scene that appears in the BigQuery index does not
# actually have an XML file on GCS. Since we know this happens
# occasionally, we ignore the error here.
logger.warning(
"skipping item %s that is missing XML file", item.name
)
continue

if not item.geometry.shp.intersects(geometry.shp):
continue
candidates[idx].append(item)
Expand Down Expand Up @@ -586,7 +687,15 @@ def ingest(
with tempfile.TemporaryDirectory() as tmp_dir:
fname = os.path.join(tmp_dir, suffix)
blob = self.bucket.blob(item.blob_prefix + suffix)
logger.debug(
"gcp_public_data downloading raster file %s",
item.blob_prefix + suffix,
)
blob.download_to_filename(fname)
logger.debug(
"gcp_public_data ingesting raster file %s into tile store",
item.blob_prefix + suffix,
)

# Harmonize values if needed.
# TCI does not need harmonization.
@@ -611,3 +720,8 @@
tile_store.write_raster_file(
item.name, band_names, UPath(fname)
)

logger.debug(
"gcp_public_data done ingesting raster file %s",
item.blob_prefix + suffix,
)