
Commit

reverting python.py unintentional changes during rebase
manu-sj committed Jul 9, 2024
1 parent 1c8e997 commit e82eeca
Showing 1 changed file with 120 additions and 36 deletions.
python/hsfs/engine/python.py: 156 changes (120 additions, 36 deletions)
@@ -57,7 +57,7 @@
     feature,
     feature_store,
     feature_view,
-    transformation_function_attached,
+    transformation_function,
     util,
 )
 from hsfs import storage_connector as sc
@@ -97,6 +97,19 @@
 if HAS_SQLALCHEMY:
     from sqlalchemy import sql
 
+
+PYARROW_EXTENSION_ENABLE = False
+try:
+    import pandas as pd
+    from packaging.version import Version
+
+    if Version(pd.__version__) > Version("2.0"):
+        PYARROW_EXTENSION_ENABLE = True
+    else:
+        PYARROW_EXTENSION_ENABLE = False
+except Exception:
+    PYARROW_EXTENSION_ENABLE = False  # Set PYARROW_EXTENSION_ENABLE to False if pandas or packaging cannot be imported
+
 # Decimal types are currently not supported
 _INT_TYPES = [pa.uint8(), pa.uint16(), pa.int8(), pa.int16(), pa.int32()]
 _BIG_INT_TYPES = [pa.uint32(), pa.int64()]
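
The added `PYARROW_EXTENSION_ENABLE` flag gates whether polars-to-pandas conversion may use pyarrow-backed extension arrays, which is only available on pandas 2.x. A minimal sketch of the same gate in isolation, assuming pandas, polars, packaging, and pyarrow are installed (this block is illustrative and not part of the commit):

```python
# Minimal sketch of the version gate above; assumes pandas, polars,
# packaging, and pyarrow are installed. Illustrative, not commit code.
import pandas as pd
import polars as pl
from packaging.version import Version

PYARROW_EXTENSION_ENABLE = False
try:
    # pandas >= 2.0 can back columns with pyarrow extension arrays,
    # which lets polars -> pandas conversion avoid copying the data.
    if Version(pd.__version__) > Version("2.0"):
        PYARROW_EXTENSION_ENABLE = True
except Exception:
    PYARROW_EXTENSION_ENABLE = False

pl_df = pl.DataFrame({"amount": [1, 2, 3]})
pdf = pl_df.to_pandas(use_pyarrow_extension_array=PYARROW_EXTENSION_ENABLE)
print(pdf.dtypes)  # int64[pyarrow] when the gate is on, plain int64 otherwise
```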
@@ -864,7 +877,22 @@ def get_training_data(
         query_obj: query.Query,
         read_options: Dict[str, Any],
         dataframe_type: str,
+        training_dataset_version: int = None,
     ) -> Union[pd.DataFrame, pl.DataFrame]:
+        """
+        Function that creates the training dataset, or retrieves it if it has already been created.
+
+        # Arguments
+            training_dataset_obj `TrainingDataset`: The training dataset metadata object.
+            feature_view_obj `FeatureView`: The feature view object for which the training data is being created.
+            query_obj `Query`: The query object that contains the query used to create the feature view.
+            read_options `Dict[str, Any]`: Dictionary that can be used to specify extra parameters for reading data.
+            dataframe_type `str`: The type of dataframe returned.
+            training_dataset_version `int`: Version of the training data to be retrieved.
+        # Raises
+            `ValueError`: If the training dataset statistics could not be retrieved.
+        """
+
         # dataframe_type of list and numpy are prevented here because statistics need to be computed from the returned dataframe.
         # The dataframe is converted into the required type in the function split_labels.
         if dataframe_type.lower() not in ["default", "polars", "pandas"]:
@@ -877,14 +905,20 @@ def get_training_data(
                 feature_view_obj,
                 read_options,
                 dataframe_type,
+                training_dataset_version,
             )
         else:
             df = query_obj.read(
                 read_options=read_options, dataframe_type=dataframe_type
             )
-            transformation_function_engine.TransformationFunctionEngine.populate_builtin_transformation_functions(
+            # if training_dataset_version is None:
+            transformation_function_engine.TransformationFunctionEngine.compute_and_set_feature_statistics(
                 training_dataset_obj, feature_view_obj, df
             )
+            # else:
+            #     transformation_function_engine.TransformationFunctionEngine.get_and_set_feature_statistics(
+            #         training_dataset_obj, feature_view_obj, training_dataset_version
+            #     )
             return self._apply_transformation_function(
                 training_dataset_obj.transformation_functions, df
             )
@@ -919,10 +953,21 @@ def _prepare_transform_split_df(
         feature_view_obj: feature_view.FeatureView,
         read_option: Dict[str, Any],
         dataframe_type: str,
+        training_dataset_version: int = None,
     ) -> Dict[str, Union[pd.DataFrame, pl.DataFrame]]:
         """
         Split a df into slices defined by `splits`. `splits` is a `dict(str, int)` whose keys are the split names
         and whose values are the split ratios.
+
+        # Arguments
+            query_obj `Query`: The query object that contains the query used to create the feature view.
+            training_dataset_obj `TrainingDataset`: The training dataset metadata object.
+            feature_view_obj `FeatureView`: The feature view object for which the training data is being created.
+            read_options `Dict[str, Any]`: Dictionary that can be used to specify extra parameters for reading data.
+            dataframe_type `str`: The type of dataframe returned.
+            training_dataset_version `int`: Version of the training data to be retrieved.
+        # Raises
+            `ValueError`: If the training dataset statistics could not be retrieved.
         """
         if (
             training_dataset_obj.splits[0].split_type
@@ -955,15 +1000,19 @@ def _prepare_transform_split_df(
                 training_dataset_obj,
             )
 
-        # apply transformations
-        # 1st parametrise transformation functions with dt split stats
-        transformation_function_engine.TransformationFunctionEngine.populate_builtin_transformation_functions(
+        # TODO : Statistics are currently always computed, since the in-memory training dataset retrieved is not consistent
+        # if training_dataset_version is None:
+        transformation_function_engine.TransformationFunctionEngine.compute_and_set_feature_statistics(
             training_dataset_obj, feature_view_obj, result_dfs
         )
+        # else:
+        #     transformation_function_engine.TransformationFunctionEngine.get_and_set_feature_statistics(
+        #         training_dataset_obj, feature_view_obj, training_dataset_version
+        #     )
-        # and the apply them
         for split_name in result_dfs:
             result_dfs[split_name] = self._apply_transformation_function(
-                training_dataset_obj.transformation_functions,
+                feature_view_obj.transformation_functions,
                 result_dfs.get(split_name),
             )
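
Both statistics hunks (here and in get_training_data above) keep the same order: compute feature statistics from the data first, then apply the transformation functions. A self-contained pandas sketch of why that order matters for a statistics-dependent transformation such as min-max scaling; this illustrates the pattern only and is not the TransformationFunctionEngine API:

```python
# Illustration only: statistics are derived from the train split first, then
# the same parameters are used to transform every split, mirroring the
# compute-statistics-then-apply order in the hunks above.
import pandas as pd

splits = {
    "train": pd.DataFrame({"amount": [10.0, 20.0, 30.0]}),
    "test": pd.DataFrame({"amount": [15.0, 40.0]}),
}

# Parameterize the transformation with train-split statistics.
stats = {
    "min": splits["train"]["amount"].min(),
    "max": splits["train"]["amount"].max(),
}

def min_max(col: pd.Series) -> pd.Series:
    # Scale using the precomputed train statistics, not the split's own.
    return (col - stats["min"]) / (stats["max"] - stats["min"])

for name in splits:
    splits[name]["amount"] = min_max(splits[name]["amount"])

print(splits["test"])  # non-train splits may fall outside [0, 1]
```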

@@ -1103,8 +1152,24 @@ def write_training_dataset(
     def _return_dataframe_type(
         self, dataframe: Union[pd.DataFrame, pl.DataFrame], dataframe_type: str
     ) -> Union[pd.DataFrame, pl.DataFrame, np.ndarray, List[List[Any]]]:
-        if dataframe_type.lower() in ["default", "pandas", "polars"]:
+        """
+        Returns a dataframe of a particular type.
+
+        # Arguments
+            dataframe `Union[pd.DataFrame, pl.DataFrame]`: Input dataframe.
+            dataframe_type `str`: Type of dataframe to be returned.
+        # Returns
+            `Union[pd.DataFrame, pl.DataFrame, np.array, list]`: DataFrame of the required type.
+        """
+        if dataframe_type.lower() in ["default", "pandas"]:
             return dataframe
+        if dataframe_type.lower() == "polars":
+            if not (
+                isinstance(dataframe, pl.DataFrame) or isinstance(dataframe, pl.Series)
+            ):
+                return pl.from_pandas(dataframe)
+            else:
+                return dataframe
         if dataframe_type.lower() == "numpy":
             return dataframe.values
         if dataframe_type.lower() == "python":
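
The new polars branch converts only when the input is not already a polars frame or series. A standalone sketch of the whole dispatch follows; the body of the `"python"` branch is cut off in the hunk above, so the `values.tolist()` line is an assumption about what comes next:

```python
# Standalone sketch of _return_dataframe_type's dispatch; the "python"
# branch body (values.tolist()) is assumed, since the hunk cuts off there.
from typing import Any, List, Union

import numpy as np
import pandas as pd
import polars as pl

def return_dataframe_type(
    dataframe: Union[pd.DataFrame, pl.DataFrame], dataframe_type: str
) -> Union[pd.DataFrame, pl.DataFrame, np.ndarray, List[List[Any]]]:
    kind = dataframe_type.lower()
    if kind in ["default", "pandas"]:
        return dataframe
    if kind == "polars":
        # Convert only when the input is not already a polars frame/series.
        if not isinstance(dataframe, (pl.DataFrame, pl.Series)):
            return pl.from_pandas(dataframe)
        return dataframe
    if kind == "numpy":
        return dataframe.values
    if kind == "python":
        return dataframe.values.tolist()
    raise ValueError(f"Unknown dataframe type: {dataframe_type}")

print(type(return_dataframe_type(pd.DataFrame({"a": [1]}), "polars")))
```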
@@ -1182,41 +1247,60 @@ def add_file(self, file: Optional[str]) -> Optional[str]:
 
     def _apply_transformation_function(
         self,
-        transformation_functions: Dict[
-            str, transformation_function_attached.TransformationFunctionAttached
-        ],
+        transformation_functions: List[transformation_function.TransformationFunction],
         dataset: Union[pd.DataFrame, pl.DataFrame],
         inplace=True,
     ) -> Union[pd.DataFrame, pl.DataFrame]:
-        for (
-            feature_name,
-            transformation_fn,
-        ) in transformation_functions.items():
-            if isinstance(dataset, pl.DataFrame) or isinstance(
-                dataset, pl.dataframe.frame.DataFrame
-            ):
-                dataset = dataset.with_columns(
-                    pl.col(feature_name).map_elements(
-                        transformation_fn.transformation_fn
-                    )
-                )
+        """
+        Apply transformation functions to the dataframe.
+
+        # Arguments
+            transformation_functions `List[transformation_function.TransformationFunction]`: List of transformation functions.
+            dataset `Union[pd.DataFrame, pl.DataFrame]`: A pandas or polars dataframe.
+        # Returns
+            `DataFrame`: A pandas dataframe with the transformed data.
+        # Raises
+            `FeatureStoreException`: If any of the features mentioned in a transformation function are not present in the feature view.
+        """
+        transformed_features = set()
+
+        if isinstance(dataset, pl.DataFrame) or isinstance(
+            dataset, pl.dataframe.frame.DataFrame
+        ):
+            # Convert the polars dataframe to pandas, because only pandas UDFs are currently supported as transformation functions.
+            if PYARROW_EXTENSION_ENABLE:
+                dataset = dataset.to_pandas(
+                    use_pyarrow_extension_array=True
+                )  # Zero copy if the pyarrow extension can be used.
             else:
-                dataset = pd.DataFrame.copy(dataset)
-                dataset[feature_name] = dataset[feature_name].map(
-                    transformation_fn.transformation_fn
-                )
-            # The below functions is not required for Polars since polars does have object types like pandas
-            if not (
-                isinstance(dataset, pl.DataFrame)
-                or isinstance(dataset, pl.dataframe.frame.DataFrame)
-            ):
-                offline_type = Engine.convert_spark_type_to_offline_type(
-                    transformation_fn.output_type
-                )
-                dataset[feature_name] = Engine._cast_column_to_offline_type(
-                    dataset[feature_name], offline_type
-                )
+                dataset = dataset.to_pandas(use_pyarrow_extension_array=False)
+
+        for tf in transformation_functions:
+            hopsworks_udf = tf.hopsworks_udf
+            missing_features = set(hopsworks_udf.transformation_features) - set(
+                dataset.columns
+            )
+            if missing_features:
+                raise FeatureStoreException(
+                    f"Features {missing_features} specified in the transformation function '{hopsworks_udf.function_name}' are not present in the feature view. Please specify the required features correctly."
+                )
+
+            transformed_features.update(tf.hopsworks_udf.transformation_features)
+            dataset = pd.concat(
+                [
+                    dataset,
+                    tf.hopsworks_udf.get_udf()(
+                        *(
+                            [
+                                dataset[feature]
+                                for feature in tf.hopsworks_udf.transformation_features
+                            ]
+                        )
+                    ),
+                ],
+                axis=1,
+            )
+        dataset = dataset.drop(transformed_features, axis=1)
         return dataset
 
     @staticmethod
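
The rewritten loop validates each UDF's input features, appends the UDF's output column(s) with pd.concat, and drops all consumed source features once at the end. A pandas-only sketch of that concat-then-drop pattern follows; `plus_one` and its output column name are hypothetical stand-ins for `HopsworksUdf.get_udf()`:

```python
# Pandas-only sketch of the concat-then-drop pattern above; `plus_one` and
# its output column name are hypothetical, not Hopsworks' UDF machinery.
import pandas as pd

def plus_one(amount: pd.Series) -> pd.Series:
    # A transformation returns new column(s) derived from its input features.
    return (amount + 1).rename("plus_one_amount")

dataset = pd.DataFrame({"amount": [1, 2], "label": [0, 1]})
transformation_features = ["amount"]

# Validate that every input feature the UDF needs is present.
missing = set(transformation_features) - set(dataset.columns)
if missing:
    raise ValueError(f"Features {missing} are not present in the dataframe.")

# Append the transformed column(s), then drop the consumed source features,
# mirroring the pd.concat(...) / dataset.drop(...) calls in the hunk above.
transformed = plus_one(*[dataset[f] for f in transformation_features])
dataset = pd.concat([dataset, transformed], axis=1)
dataset = dataset.drop(transformation_features, axis=1)
print(dataset)  # columns: label, plus_one_amount
```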