Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion message_ix/util/ixmp4.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import ixmp
import ixmp.backend
from ixmp.util.ixmp4 import is_ixmp4backend


Expand Down
99 changes: 67 additions & 32 deletions message_ix/util/scenario_setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,6 @@
from typing import TYPE_CHECKING, Any, Literal, TypeVar, Union, cast

import pandas as pd

if TYPE_CHECKING:
from ixmp4 import Run
from ixmp4.core import IndexSet, Parameter, Table

from message_ix.core import Scenario

from ixmp import Platform
from ixmp.util.ixmp4 import is_ixmp4backend

Expand All @@ -18,6 +11,13 @@
DEFAULT_TABLE_DATA,
)

if TYPE_CHECKING:
from ixmp4 import Run
from ixmp4.core import IndexSet, Parameter, Table
from ixmp4.data.backend.base import Backend

from message_ix.core import Scenario

log = logging.getLogger(__name__)


Expand Down Expand Up @@ -75,31 +75,43 @@

def add_default_data(scenario: "Scenario") -> None:
"""Add default data expected in a MESSAGEix Scenario."""
if not is_ixmp4backend(scenario.platform._backend):
backend = scenario.platform._backend

if not is_ixmp4backend(backend):
return

# Get the Run associated with the Scenario
run = cast("Run", scenario.platform._backend.index[scenario])
run = cast("Run", backend.index[scenario])

ixmp4_backend = backend._backend

# Add IndexSet data
for indexset_data_info in DEFAULT_INDEXSET_DATA:
indexset = run.optimization.indexsets.get(name=indexset_data_info.name)
indexset = ixmp4_backend.optimization.indexsets.get(
run_id=run.id, name=indexset_data_info.name
)

# Only add default data if they are missing
# NOTE this works because all DEFAULT_INDEXSET_DATA items have str-type data
if missing := [
item for item in indexset_data_info.data if item not in indexset.data
]:
indexset.add(data=list(missing))
ixmp4_backend.optimization.indexsets.add_data(
id=indexset.id, data=list(missing)
)

# Add Table data
for table_data_info in DEFAULT_TABLE_DATA:
table = run.optimization.tables.get(name=table_data_info.name)
table = ixmp4_backend.optimization.tables.get(
run_id=run.id, name=table_data_info.name
)

# Only add default data if they are missing
# NOTE this works because all DEFAULT_TABLE_DATA items have just one row
if not pd.DataFrame(table_data_info.data).isin(table.data).all(axis=None):
table.add(data=table_data_info.data)
ixmp4_backend.optimization.tables.add_data(
id=table.id, data=table_data_info.data
)

# Add Parameter data
for parameter_data_info in DEFAULT_PARAMETER_DATA:
Expand All @@ -108,12 +120,16 @@ def add_default_data(scenario: "Scenario") -> None:
# NOTE parameter_data_info.data *must* contain a 'unit' column
check_existence_of_units(platform=scenario.platform, data=parameter_df)

parameter = run.optimization.parameters.get(name=parameter_data_info.name)
parameter = ixmp4_backend.optimization.parameters.get(
run_id=run.id, name=parameter_data_info.name
)

# Only add default data if they are missing
# NOTE this works because all DEFAULT_PARAMETER_DATA items have just one row
if not parameter_df.isin(parameter.data).all(axis=None):
parameter.add(data=parameter_data_info.data)
ixmp4_backend.optimization.parameters.add_data(
id=parameter.id, data=parameter_data_info.data
)


# TODO Should this really be a ValueError?
Expand Down Expand Up @@ -144,7 +160,9 @@ def ensure_required_indexsets_have_data(scenario: "Scenario") -> None:


def _maybe_add_to_table(
table: "Table", data: Union[dict[str, Any], pd.DataFrame]
table: "Table",
data: Union[dict[str, Any], pd.DataFrame],
backend: "Backend",
) -> None:
"""Add (parts of) `data` to `table` if they are missing."""
# NOTE This function doesn't handle empty data as internally, this won't happen
Expand All @@ -157,7 +175,7 @@ def _maybe_add_to_table(
new_data = data[~data.isin(table.data).all(axis=1)]

# Add new rows to table data
table.add(data=new_data)
backend.optimization.tables.add_data(id=table.id, data=new_data)


def compose_dimension_map(
Expand Down Expand Up @@ -242,44 +260,55 @@ def _find_all_descendants(parent: T) -> list[T]:
new_map_df = data.merge(pd.DataFrame(descendant_data), how="outer")

# Add new rows to map_{dimension} data
_maybe_add_to_table(table=map_parameter, data=new_map_df)
_maybe_add_to_table(
table=map_parameter,
data=new_map_df,
backend=scenario.platform._backend._backend,
)


def _maybe_add_single_item_to_indexset(
indexset: "IndexSet", data: Union[float, int, str]
indexset: "IndexSet", data: Union[float, int, str], backend: "Backend"
) -> None:
"""Add `data` to `indexset` if it is missing."""
if data not in list(indexset.data):
indexset.add(data=data)
backend.optimization.indexsets.add_data(id=indexset.id, data=data)


def _maybe_add_list_to_indexset(
indexset: "IndexSet", data: Union[list[float], list[int], list[str]]
indexset: "IndexSet",
data: Union[list[float], list[int], list[str]],
backend: "Backend",
) -> None:
"""Add missing parts of `data` to `indexset`."""
# NOTE missing will always only have one type, but how to tell mypy?
# NOTE mypy recognizes missing as set[float | str]. If int indexsets mysteriously
# turn to float indexsets, look here
if missing := set(data) - set(indexset.data):
indexset.add(list(missing)) # type: ignore[arg-type]
backend.optimization.indexsets.add_data(id=indexset.id, data=list(missing)) # type: ignore[arg-type]


def _maybe_add_to_indexset(
indexset: "IndexSet",
data: Union[float, int, str, list[float], list[int], list[str]],
backend: "Backend",
) -> None:
"""Add (parts of) `data` to `indexset` if they are missing."""
# NOTE This function doesn't handle empty data as internally, this won't happen
if not isinstance(data, list):
_maybe_add_single_item_to_indexset(indexset=indexset, data=data)
_maybe_add_single_item_to_indexset(
indexset=indexset, data=data, backend=backend
)
else:
_maybe_add_list_to_indexset(indexset=indexset, data=data)
_maybe_add_list_to_indexset(indexset=indexset, data=data, backend=backend)


# NOTE this could be combined with `_maybe_add_to_table()`, but that function would be
# slower than necessary (though likely not by much). Is the maintenance effort worth it?
def _maybe_add_to_parameter(
parameter: "Parameter", data: Union[dict[str, Any], pd.DataFrame]
parameter: "Parameter",
data: Union[dict[str, Any], pd.DataFrame],
backend: "Backend",
) -> None:
"""Add (parts of) `data` to `parameter` if they are missing."""
# NOTE This function doesn't handle empty data as internally, this won't happen
Expand All @@ -299,7 +328,7 @@ def _maybe_add_to_parameter(
)

# Add new rows to table data
parameter.add(data=new_data)
backend.optimization.parameters.add_data(id=parameter.id, data=new_data)


def compose_maps(scenario: "Scenario") -> None:
Expand All @@ -324,16 +353,18 @@ def compose_period_map(scenario: "Scenario") -> None:
This covers `assignPeriodMaps()` from ixmp_source.
"""
if not is_ixmp4backend(scenario.platform._backend):
backend = scenario.platform._backend
if not is_ixmp4backend(backend):
return

# Get the Run associated with the Scenario
run = cast("Run", scenario.platform._backend.index[scenario])
run = cast("Run", backend.index[scenario])
ixmp4_backend = backend._backend

# TODO Included here in ixmp_source; this should likely move to add_default_data
# Add one default item to 'type_year'
type_year = run.optimization.indexsets.get(name="type_year")
_maybe_add_to_indexset(indexset=type_year, data="cumulative")
_maybe_add_to_indexset(indexset=type_year, data="cumulative", backend=ixmp4_backend)

cat_year = run.optimization.tables.get(name="cat_year")
cat_year_df = pd.DataFrame(cat_year.data)
Expand Down Expand Up @@ -363,17 +394,20 @@ def compose_period_map(scenario: "Scenario") -> None:
# Ensure that years are sorted
sorted_years = sorted(years)
if years != sorted_years:
year.remove(data=years)
year.add(data=sorted_years)
ixmp4_backend.optimization.indexsets.remove_data(id=year.id, data=years)
ixmp4_backend.optimization.indexsets.add_data(id=year.id, data=sorted_years)

# Store years within the model horizon
for y in sorted_years:
if first_model_year is None or first_model_year <= y:
y_str = str(y)
_maybe_add_to_indexset(indexset=type_year, data=y_str)
_maybe_add_to_indexset(
indexset=type_year, data=y_str, backend=ixmp4_backend
)
_maybe_add_to_table(
table=cat_year,
data={"type_year": ["cumulative", y_str], "year": [y_str, y_str]},
backend=ixmp4_backend,
)

# Initialize duration_period with this data
Expand All @@ -394,6 +428,7 @@ def compose_period_map(scenario: "Scenario") -> None:
"values": durations,
"units": ["y"] * len(sorted_years),
},
backend=ixmp4_backend,
)


Expand Down
Loading