1 change: 1 addition & 0 deletions integration_tests/small_multi_stream.yaml
@@ -186,6 +186,7 @@ training_config:
   time_step: 06:00:00
   num_steps: 2
   policy: "fixed"
+  offset: 1


# validation config; full validation config is merge of training and validation config
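As an illustration of the merge the comment above describes (the key values are hypothetical, and OmegaConf-style merging where validation keys override training keys is an assumption, not confirmed by this PR):

```python
from omegaconf import OmegaConf

training = OmegaConf.create(
    {"time_step": "06:00:00", "num_steps": 2, "policy": "fixed", "offset": 1}
)
validation = OmegaConf.create({"num_steps": 4})  # hypothetical override

# later arguments win, so validation keys override training keys
merged = OmegaConf.merge(training, validation)
print(merged.num_steps)  # 4
print(merged.offset)     # 1, inherited from the training config
```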
22 changes: 22 additions & 0 deletions packages/common/src/weathergen/common/config.py
@@ -301,6 +301,26 @@ def _apply_fixes(config: Config) -> Config:
     eventually removed.
     """
     config = _check_logging(config)
+    config = _check_datasets(config)
     return config
+
+
+def _check_datasets(config: Config) -> Config:
+    """
+    Collect dataset paths from legacy per-dataset keys into data_paths.
+    """
+    config = config.copy()
+    if config.get("data_paths") is None:  # TODO remove this for next version
+        legacy_keys = [
+            "data_path_anemoi",
+            "data_path_obs",
+            "data_path_eobs",
+            "data_path_fesom",
+            "data_path_icon",
+        ]
+        paths = [config.get(key) for key in legacy_keys]
+        config.data_paths = [path for path in paths if path is not None]
+
+    return config
 
 
@@ -526,6 +546,8 @@ def _load_private_conf(private_home: Path | None = None) -> DictConfig:
if "secrets" in private_cf:
del private_cf["secrets"]

private_cf = _check_datasets(private_cf) # TODO: remove temp backward compatibility fix

assert isinstance(private_cf, DictConfig)
return private_cf

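For reference, a minimal sketch of what the `_check_datasets` shim above does to a legacy config. The paths are hypothetical, and the OmegaConf `DictConfig` is an assumption based on the `_load_private_conf` signature:

```python
from omegaconf import OmegaConf

# legacy-style config: per-dataset path keys, no consolidated data_paths
legacy = OmegaConf.create(
    {
        "data_path_anemoi": "/data/anemoi",  # hypothetical path
        "data_path_obs": "/data/obs",        # hypothetical path
        # data_path_eobs, data_path_fesom, data_path_icon left unset
    }
)

# after config = _check_datasets(legacy), the expectation is:
#   config.data_paths == ["/data/anemoi", "/data/obs"]
# (unset keys are dropped; a config that already defines data_paths
# is returned unchanged)
```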
26 changes: 7 additions & 19 deletions packages/readers_extra/src/weathergen/readers_extra/registry.py
@@ -1,36 +1,24 @@
-from collections.abc import Callable
-from dataclasses import dataclass
-
-from weathergen.common.config import Config
-
-
-@dataclass
-class ReaderEntry:
-    data_path: str | None
-    constructor: Callable
-
-
-def get_extra_reader(name: str, cf: Config) -> object | None:
-    """Get an extra reader by name."""
+def get_extra_reader(stream_type: str) -> object | None:
+    """Get an extra reader class by stream type name."""
     # Uses lazy imports to avoid circular dependencies and to not load all the readers at start.
     # There is no sanity check on them, so they may fail at runtime during import.
 
-    match name:
+    match stream_type:
         case "iconart":
             from weathergen.readers_extra.data_reader_iconart import DataReaderIconArt
 
-            return ReaderEntry(cf.data_path_icon, DataReaderIconArt)
+            return DataReaderIconArt
         case "eobs":
             from weathergen.readers_extra.data_reader_eobs import DataReaderEObs
 
-            return ReaderEntry(cf.data_path_eobs, DataReaderEObs)
+            return DataReaderEObs
         case "iconesm":
             from weathergen.readers_extra.data_reader_icon_esm import DataReaderIconEsm
 
-            return ReaderEntry(cf.data_path_icon_esm, DataReaderIconEsm)
+            return DataReaderIconEsm
         case "cams":
             from weathergen.readers_extra.data_reader_cams import DataReaderCams
 
-            return ReaderEntry(cf.data_path_cams, DataReaderCams)
+            return DataReaderCams
         case _:
             return None
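A quick caller-side sketch of the new contract (the caller names below are hypothetical): `get_extra_reader` now returns the reader class itself rather than a `ReaderEntry`, leaving path resolution to the caller via `cf.data_paths`:

```python
# mirrors how multi_stream_data_sampler.py dispatches on the stream type
reader_cls = get_extra_reader("eobs")
if reader_cls is None:
    raise ValueError("Unsupported stream type 'eobs'")

# the module is only imported on lookup; instantiation stays with the caller
# reader = reader_cls(filename, ...)  # constructor arguments depend on the reader
```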
20 changes: 7 additions & 13 deletions src/weathergen/datasets/multi_stream_data_sampler.py
@@ -143,40 +143,34 @@ def __init__(
             match stream_info["type"]:
                 case "obs":
                     dataset = DataReaderObs
-                    datapath = cf.data_path_obs
                     # kwargs["end"] = end_date_padded  # TODO: implement the padding
                 case "anemoi":
                     dataset = DataReaderAnemoi
-                    datapath = cf.data_path_anemoi
                 case "fesom":
                     dataset = DataReaderFesom
-                    datapath = cf.data_path_fesom
                 case type_name:
-                    reader_entry = get_extra_reader(type_name, cf)
-                    if reader_entry is not None:
-                        dataset = reader_entry.constructor
-                        datapath = reader_entry.data_path
-                    else:
+                    dataset = get_extra_reader(type_name)
+                    if dataset is None:
                         msg = (
                             f"Unsupported stream type {stream_info['type']} "
                             f"for stream name '{stream_info['name']}'."
                         )
                         raise ValueError(msg)
 
-            datapath = pathlib.Path(datapath)
             fname = pathlib.Path(fname)
             # don't check if file exists since zarr stores might be directories
             if fname.exists():
                 # check if fname is a valid path to allow for simple overwriting
-                filename = fname
+                filenames = [fname]
             else:
-                filename = pathlib.Path(datapath) / fname
+                filenames = [pathlib.Path(path) / fname for path in cf.data_paths]
 
-            if not filename.exists():  # see above
+            if not any(filename.exists() for filename in filenames):  # see above
                 msg = (
                     f"Did not find input data for {stream_info['type']} "
-                    f"stream '{stream_info['name']}': {filename}."
+                    f"stream '{stream_info['name']}': {filenames}."
                 )
                 raise FileNotFoundError(msg)
 
+            filename = next(f for f in filenames if f.exists())  # first existing candidate
+
             ds_type = stream_info["type"]
             if is_root():
                 logger.info(
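To summarize the new lookup order in one place, here is a standalone sketch of the resolution logic above (a hypothetical helper, not part of the PR):

```python
import pathlib


def resolve(fname: str, data_paths: list[str]) -> pathlib.Path:
    """Mimic the lookup in MultiStreamDataSampler.__init__: an existing
    explicit path wins; otherwise fname is searched under each data_paths entry."""
    f = pathlib.Path(fname)
    candidates = [f] if f.exists() else [pathlib.Path(p) / f for p in data_paths]
    if not any(c.exists() for c in candidates):
        raise FileNotFoundError(f"{fname} not found in {data_paths}")
    # first existing candidate, i.e. data_paths acts as an ordered search path
    return next(c for c in candidates if c.exists())


# e.g. resolve("era5.zarr", ["/data/anemoi", "/data/obs"]) returns whichever of
# /data/anemoi/era5.zarr or /data/obs/era5.zarr exists first, in that order
```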