Skip to content

Commit

Permalink
feat: do not set time as dataframe index in TSLong
Browse files Browse the repository at this point in the history
  • Loading branch information
martibosch committed Oct 1, 2024
1 parent b8a9b28 commit 91b6017
Show file tree
Hide file tree
Showing 8 changed files with 25 additions and 15 deletions.
2 changes: 2 additions & 0 deletions tstore/archive/ts/utility.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,11 +43,13 @@ def get_time_filters(
start_op = ">"
if inclusive in ["both", "left"]:
start_op += "="
# TODO: avoid hardcoding "time" here?
filters.append(("time", start_op, start_time))
if end_time is not None:
end_op = "<"
if inclusive in ["both", "right"]:
end_op += "="
# TODO: avoid hardcoding "time" here?
filters.append(("time", end_op, end_time))

if filters:
Expand Down
9 changes: 9 additions & 0 deletions tstore/backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,8 +88,17 @@ def change_backend(
pa.ChunkedArray: _change_series_backend_from_pyarrow,
}

no_index_types = [DaskDataFrame, PandasDataFrame]
no_index_kwargs = {
"polars": {"include_index": False},
"pyarrow": {"preserve_index": False},
}
for supported_type, change_backend_function in change_backend_functions.items():
if isinstance(obj, supported_type):
if index_var is None and supported_type in no_index_types and new_backend in no_index_kwargs:
backend_kwargs = backend_kwargs.copy()
backend_kwargs.update(no_index_kwargs[new_backend])

new_obj = change_backend_function(
obj,
new_backend=new_backend.replace("geopandas", "pandas"),
Expand Down
6 changes: 3 additions & 3 deletions tstore/tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -333,7 +333,7 @@ def pandas_long_dataframe(helpers) -> pd.DataFrame:
df_list = []

for store_id in store_ids:
df = helpers.create_dask_dataframe().compute()
df = helpers.create_dask_dataframe().compute().reset_index()
df[ID_VAR] = store_id
df[STATIC_VAR1] = chr(64 + store_id) # A, B, C, D
df[STATIC_VAR2] = float(store_id) # 1.0, 2.0, 3.0, 4.0
Expand All @@ -346,14 +346,14 @@ def pandas_long_dataframe(helpers) -> pd.DataFrame:
@pytest.fixture()
def polars_long_dataframe(pandas_long_dataframe: pd.DataFrame) -> pl.DataFrame:
"""Create a long Polars DataFrame."""
df_pl = pl.from_pandas(pandas_long_dataframe, include_index=True)
df_pl = pl.from_pandas(pandas_long_dataframe)
return df_pl


@pytest.fixture()
def pyarrow_long_dataframe(pandas_long_dataframe: pd.DataFrame) -> pa.Table:
"""Create a long Pyarrow Table."""
df_pa = pa.Table.from_pandas(pandas_long_dataframe, preserve_index=True)
df_pa = pa.Table.from_pandas(pandas_long_dataframe, preserve_index=False)
return df_pa


Expand Down
1 change: 1 addition & 0 deletions tstore/tests/test_ts.py
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,7 @@ def test_polars(
) -> None:
"""Test on a Polars TS object."""
filepath = str(tmp_path / "test.parquet")

ts = TS(polars_dataframe)
ts.to_disk(filepath)
ts_loaded = TS.from_disk(filepath, partitions=[], backend="polars")
Expand Down
4 changes: 3 additions & 1 deletion tstore/tsdf/ts_class.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,9 @@ def from_disk(
**kwargs,
)

df = change_backend(df, new_backend=backend)
# unlike long data frames (where time values are usually not unique), a single time series can sensibly use time as its index
# TODO: avoid hardcoding "time" here?
df = change_backend(df, new_backend=backend, index_var="time")

# Create the TS object
return TS(df)
Expand Down
3 changes: 1 addition & 2 deletions tstore/tsdf/tsdf.py
Original file line number Diff line number Diff line change
Expand Up @@ -215,13 +215,12 @@ def _to_tslong_dask(self) -> "TSLongDask":

df = None
tstore_ids = self._obj[self._tstore_id_var].unique()

long_rows = [self._get_long_rows(tstore_id) for tstore_id in tstore_ids]
df = dd.concat(long_rows)
time_var = df.index.name

return TSLongDask(
df,
df.reset_index(),
id_var=self._tstore_id_var,
time_var=time_var,
ts_vars=self._tstore_ts_vars,
Expand Down
5 changes: 2 additions & 3 deletions tstore/tslong/dask.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,9 +102,8 @@ def _get_ts_array(self, variables: list[str]) -> TSArray:
def _get_ts(self, tstore_id: str, variables: list[str]) -> TS:
"""Create a TS object for a given tstore_id and a set of variables."""
df = self._obj
df = df[df[self._tstore_id_var] == tstore_id]
df = df[variables]
return TS(df)
# filter df by id and select only provided variables (and time)
return TS(df[df[self._tstore_id_var] == tstore_id][[*variables, self._tstore_time_var]])

def _get_static_values(self) -> dict[str, list]:
"""Retrieve the static values."""
Expand Down
10 changes: 4 additions & 6 deletions tstore/tslong/tslong.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
change_backend,
get_column_names,
get_dataframe_index,
re_set_dataframe_index,
)
from tstore.tswrapper.tswrapper import TSWrapper

Expand All @@ -35,7 +34,6 @@ def __init__(
ts_vars: Union[dict[str, list[str]], list[str], None] = None,
static_vars: Optional[list[str]] = None,
geometry: Optional[GeoPandasDataFrame] = None,
ensure_time_index: bool = True,
) -> None:
"""Wrap a long-form timeseries DataFrame as a TSLong object.
Expand Down Expand Up @@ -78,8 +76,8 @@ def __init__(
if geometry is not None:
geometry = cast_column_to_large_string(geometry, id_var)

if ensure_time_index:
df = re_set_dataframe_index(df, index_var=time_var)
# if ensure_time_index:
# df = re_set_dataframe_index(df, index_var=time_var)

super().__init__(df)

Expand All @@ -103,7 +101,8 @@ def __new__(cls, *args, **kwargs) -> "TSLong":

def change_backend(self, new_backend: Backend) -> "TSLong":
"""Return a new wrapper with the dataframe converted to a different backend."""
new_df = change_backend(self._obj, new_backend, index_var=self._tstore_time_var)
# new_df = change_backend(self._obj, new_backend, index_var=self._tstore_time_var)
new_df = change_backend(self._obj, new_backend)
return self._rewrap(new_df)

@staticmethod
Expand Down Expand Up @@ -240,7 +239,6 @@ def _check_ts_vars(
ValueError: If the `ts_vars` argument contains repeated or unavailable column names.
"""
available_cols = set(get_column_names(df)) - {id_var, time_var} - set(static_vars)

requested_cols = set()
for cols in ts_vars.values():
new_cols = set(cols)
Expand Down

0 comments on commit 91b6017

Please sign in to comment.