Skip to content

Commit

Permalink
correcting logs and fixing reading dataframes as pandas in spark engine
Browse files Browse the repository at this point in the history
  • Loading branch information
manu-sj committed Feb 19, 2024
1 parent 5b47c4d commit bb80120
Show file tree
Hide file tree
Showing 3 changed files with 27 additions and 7 deletions.
28 changes: 24 additions & 4 deletions python/hsfs/core/feature_view_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -516,17 +516,28 @@ def _read_dir_from_storage_connector(
)

df = self._drop_helper_columns(
df, feature_view_features, with_primary_keys, primary_keys, False
df,
feature_view_features,
with_primary_keys,
primary_keys,
False,
dataframe_type,
)
df = self._drop_helper_columns(
df, feature_view_features, with_event_time, event_time, False
df,
feature_view_features,
with_event_time,
event_time,
False,
dataframe_type,
)
df = self._drop_helper_columns(
df,
feature_view_features,
with_training_helper_columns,
training_helper_columns,
True,
dataframe_type,
)
return df

Expand All @@ -540,10 +551,19 @@ def _read_dir_from_storage_connector(
raise e

def _drop_helper_columns(
self, df, feature_view_features, with_columns, columns, training_helper
self,
df,
feature_view_features,
with_columns,
columns,
training_helper,
dataframe_type,
):
if not with_columns:
if engine.get_type().startswith("spark"):
if (
engine.get_type().startswith("spark")
and dataframe_type.lower() == "spark"
):
existing_cols = [field.name for field in df.schema.fields]
else:
existing_cols = df.columns
Expand Down
4 changes: 2 additions & 2 deletions python/hsfs/core/vector_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -301,7 +301,7 @@ def get_feature_vector(
return polars_df
else:
raise Exception(
"Unknown return type. Supported return types are 'list', 'pandas' and 'numpy'"
"Unknown return type. Supported return types are 'list', 'polars', 'pandas' and 'numpy'"
)

def get_feature_vectors(
Expand Down Expand Up @@ -351,7 +351,7 @@ def get_feature_vectors(
return polars_df
else:
raise Exception(
"Unknown return type. Supported return types are 'list', 'pandas' and 'numpy'"
"Unknown return type. Supported return types are 'list', 'polars', 'pandas' and 'numpy'"
)

def get_inference_helper(self, entry, return_type):
Expand Down
2 changes: 1 addition & 1 deletion python/hsfs/engine/spark.py
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,7 @@ def register_delta_temporary_table(
def _return_dataframe_type(self, dataframe, dataframe_type):
if dataframe_type.lower() in ["default", "spark"]:
return dataframe
if dataframe_type.lower() == "pandas":
if dataframe_type.lower() == "pandas" and isinstance(dataframe, DataFrame):
return dataframe.toPandas()
if dataframe_type.lower() == "numpy":
return dataframe.toPandas().values
Expand Down

0 comments on commit bb80120

Please sign in to comment.