From e82eeca0523c2088af794b5e9e890f97b677e090 Mon Sep 17 00:00:00 2001
From: manu-sj
Date: Tue, 9 Jul 2024 08:33:51 +0200
Subject: [PATCH] reverting python.py unintentional changes during rebase

---
 python/hsfs/engine/python.py | 156 +++++++++++++++++++++++++++--------
 1 file changed, 120 insertions(+), 36 deletions(-)

diff --git a/python/hsfs/engine/python.py b/python/hsfs/engine/python.py
index 7089e74dde..61220141b3 100644
--- a/python/hsfs/engine/python.py
+++ b/python/hsfs/engine/python.py
@@ -57,7 +57,7 @@
     feature,
     feature_store,
     feature_view,
-    transformation_function_attached,
+    transformation_function,
     util,
 )
 from hsfs import storage_connector as sc
@@ -97,6 +97,19 @@
 if HAS_SQLALCHEMY:
     from sqlalchemy import sql
 
+
+PYARROW_EXTENSION_ENABLE = False
+try:
+    import pandas as pd
+    from packaging.version import Version
+
+    if Version(pd.__version__) > Version("2.0"):
+        PYARROW_EXTENSION_ENABLE = True
+    else:
+        PYARROW_EXTENSION_ENABLE = False
+except Exception:
+    PYARROW_EXTENSION_ENABLE = False  # Set to False if pandas or packaging cannot be imported
+
 # Decimal types are currently not supported
 _INT_TYPES = [pa.uint8(), pa.uint16(), pa.int8(), pa.int16(), pa.int32()]
 _BIG_INT_TYPES = [pa.uint32(), pa.int64()]
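For context on what this flag controls: on newer pandas, polars can hand its Arrow-backed columns over to pandas without copying. A minimal standalone sketch of the two conversion paths the flag selects between (illustrative only, not part of the patch; it assumes polars and pyarrow are installed):

import polars as pl

df = pl.DataFrame({"id": [1, 2, 3], "amount": [10.0, 20.5, 31.2]})

# With pyarrow extension arrays the resulting pandas columns are Arrow-backed
# (e.g. int64[pyarrow]) and the conversion avoids copying; this is the path
# taken when PYARROW_EXTENSION_ENABLE is True.
pdf_arrow = df.to_pandas(use_pyarrow_extension_array=True)

# Otherwise the data is copied into NumPy-backed columns (int64, float64).
pdf_numpy = df.to_pandas(use_pyarrow_extension_array=False)

print(pdf_arrow.dtypes)
print(pdf_numpy.dtypes)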
@@ -864,7 +877,22 @@ def get_training_data(
         query_obj: query.Query,
         read_options: Dict[str, Any],
         dataframe_type: str,
+        training_dataset_version: Optional[int] = None,
     ) -> Union[pd.DataFrame, pl.DataFrame]:
+        """
+        Function that creates a training dataset or retrieves one that has already been created.
+
+        # Arguments
+            training_dataset_obj `TrainingDataset`: The training dataset metadata object.
+            feature_view_obj `FeatureView`: The feature view object for which the training data is being created.
+            query_obj `Query`: The query object that contains the query used to create the feature view.
+            read_options `Dict[str, Any]`: Dictionary that can be used to specify extra parameters for reading data.
+            dataframe_type `str`: The type of dataframe returned.
+            training_dataset_version `Optional[int]`: Version of the training dataset to be retrieved.
+        # Raises
+            `ValueError`: If the training dataset statistics could not be retrieved.
+        """
+
         # dataframe_type of list and numpy are prevented here because statistics need to be computed from the returned dataframe.
         # The dataframe is converted into the required type in the function split_labels.
         if dataframe_type.lower() not in ["default", "polars", "pandas"]:
@@ -877,14 +905,20 @@ def get_training_data(
                 feature_view_obj,
                 read_options,
                 dataframe_type,
+                training_dataset_version,
             )
         else:
             df = query_obj.read(
                 read_options=read_options, dataframe_type=dataframe_type
             )
-            transformation_function_engine.TransformationFunctionEngine.populate_builtin_transformation_functions(
+            # if training_dataset_version is None:
+            transformation_function_engine.TransformationFunctionEngine.compute_and_set_feature_statistics(
                 training_dataset_obj, feature_view_obj, df
             )
+            # else:
+            #     transformation_function_engine.TransformationFunctionEngine.get_and_set_feature_statistics(
+            #         training_dataset_obj, feature_view_obj, training_dataset_version
+            #     )
             return self._apply_transformation_function(
                 training_dataset_obj.transformation_functions, df
             )
@@ -919,10 +953,21 @@ def _prepare_transform_split_df(
         feature_view_obj: feature_view.FeatureView,
         read_option: Dict[str, Any],
         dataframe_type: str,
+        training_dataset_version: Optional[int] = None,
     ) -> Dict[str, Union[pd.DataFrame, pl.DataFrame]]:
         """
         Split a df into slices defined by `splits`. `splits` is a `dict(str, int)`
         whose keys are the names of the splits and whose values are the split ratios.
+
+        # Arguments
+            query_obj `Query`: The query object that contains the query used to create the feature view.
+            training_dataset_obj `TrainingDataset`: The training dataset metadata object.
+            feature_view_obj `FeatureView`: The feature view object for which the training data is being created.
+            read_option `Dict[str, Any]`: Dictionary that can be used to specify extra parameters for reading data.
+            dataframe_type `str`: The type of dataframe returned.
+            training_dataset_version `Optional[int]`: Version of the training dataset to be retrieved.
+        # Raises
+            `ValueError`: If the training dataset statistics could not be retrieved.
         """
         if (
             training_dataset_obj.splits[0].split_type
@@ -955,15 +1000,19 @@ def _prepare_transform_split_df(
             training_dataset_obj,
         )
 
-        # apply transformations
-        # 1st parametrise transformation functions with dt split stats
-        transformation_function_engine.TransformationFunctionEngine.populate_builtin_transformation_functions(
+        # TODO: Statistics are currently always computed, since retrieval of in-memory training datasets is not consistent.
+        # if training_dataset_version is None:
+        transformation_function_engine.TransformationFunctionEngine.compute_and_set_feature_statistics(
             training_dataset_obj, feature_view_obj, result_dfs
         )
+        # else:
+        #     transformation_function_engine.TransformationFunctionEngine.get_and_set_feature_statistics(
+        #         training_dataset_obj, feature_view_obj, training_dataset_version
+        #     )
         # and then apply them
         for split_name in result_dfs:
             result_dfs[split_name] = self._apply_transformation_function(
-                training_dataset_obj.transformation_functions,
+                feature_view_obj.transformation_functions,
                 result_dfs.get(split_name),
             )
 
@@ -1103,8 +1152,24 @@ def write_training_dataset(
     def _return_dataframe_type(
         self, dataframe: Union[pd.DataFrame, pl.DataFrame], dataframe_type: str
     ) -> Union[pd.DataFrame, pl.DataFrame, np.ndarray, List[List[Any]]]:
-        if dataframe_type.lower() in ["default", "pandas", "polars"]:
+        """
+        Returns the dataframe converted to the requested type.
+
+        # Arguments
+            dataframe `Union[pd.DataFrame, pl.DataFrame]`: Input dataframe.
+            dataframe_type `str`: Type of dataframe to be returned.
+        # Returns
+            `Union[pd.DataFrame, pl.DataFrame, np.ndarray, List[List[Any]]]`: DataFrame of the required type.
+        """
+        if dataframe_type.lower() in ["default", "pandas"]:
             return dataframe
+        if dataframe_type.lower() == "polars":
+            if not (
+                isinstance(dataframe, pl.DataFrame) or isinstance(dataframe, pl.Series)
+            ):
+                return pl.from_pandas(dataframe)
+            else:
+                return dataframe
         if dataframe_type.lower() == "numpy":
             return dataframe.values
         if dataframe_type.lower() == "python":
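The new "polars" branch above only converts when handed a pandas object; `pl.from_pandas` performs that conversion, and an input that is already a polars frame or series is returned unchanged. A quick standalone illustration (assumes polars with pandas interop installed):

import pandas as pd
import polars as pl

pdf = pd.DataFrame({"f1": [1, 2], "f2": [0.5, 1.5]})

# pl.from_pandas is what the branch calls when the requested type is "polars"
# but the in-memory object is still a pandas DataFrame.
ldf = pl.from_pandas(pdf)
assert isinstance(ldf, pl.DataFrame)

# A pl.DataFrame or pl.Series input skips the conversion entirely.
print(ldf.shape)  # (2, 2)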
@@ -1182,41 +1247,60 @@ def add_file(self, file: Optional[str]) -> Optional[str]:
 
     def _apply_transformation_function(
         self,
-        transformation_functions: Dict[
-            str, transformation_function_attached.TransformationFunctionAttached
-        ],
+        transformation_functions: List[transformation_function.TransformationFunction],
         dataset: Union[pd.DataFrame, pl.DataFrame],
         inplace=True,
     ) -> Union[pd.DataFrame, pl.DataFrame]:
-        for (
-            feature_name,
-            transformation_fn,
-        ) in transformation_functions.items():
-            if isinstance(dataset, pl.DataFrame) or isinstance(
-                dataset, pl.dataframe.frame.DataFrame
-            ):
-                dataset = dataset.with_columns(
-                    pl.col(feature_name).map_elements(
-                        transformation_fn.transformation_fn
-                    )
-                )
+        """
+        Apply transformation functions to the dataframe.
+
+        # Arguments
+            transformation_functions `List[transformation_function.TransformationFunction]`: List of transformation functions.
+            dataset `Union[pd.DataFrame, pl.DataFrame]`: A pandas or polars dataframe.
+        # Returns
+            `DataFrame`: A pandas dataframe with the transformed data.
+        # Raises
+            `FeatureStoreException`: If any feature specified in a transformation function is not present in the feature view.
+        """
+        transformed_features = set()
+
+        if isinstance(dataset, pl.DataFrame) or isinstance(
+            dataset, pl.dataframe.frame.DataFrame
+        ):
+            # Convert the polars dataframe to pandas, because currently only pandas UDFs are supported as transformation functions.
+            if PYARROW_EXTENSION_ENABLE:
+                dataset = dataset.to_pandas(
+                    use_pyarrow_extension_array=True
+                )  # Zero-copy if the pyarrow extension can be used.
             else:
-                dataset = pd.DataFrame.copy(dataset)
-                dataset[feature_name] = dataset[feature_name].map(
-                    transformation_fn.transformation_fn
-                )
-            # The below functions is not required for Polars since polars does have object types like pandas
-            if not (
-                isinstance(dataset, pl.DataFrame)
-                or isinstance(dataset, pl.dataframe.frame.DataFrame)
-            ):
-                offline_type = Engine.convert_spark_type_to_offline_type(
-                    transformation_fn.output_type
-                )
-                dataset[feature_name] = Engine._cast_column_to_offline_type(
-                    dataset[feature_name], offline_type
+                dataset = dataset.to_pandas(use_pyarrow_extension_array=False)
+
+        for tf in transformation_functions:
+            hopsworks_udf = tf.hopsworks_udf
+            missing_features = set(hopsworks_udf.transformation_features) - set(
+                dataset.columns
+            )
+            if missing_features:
+                raise FeatureStoreException(
+                    f"Features {missing_features} specified in the transformation function '{hopsworks_udf.function_name}' are not present in the feature view. Please specify the required features correctly."
                 )
+            transformed_features.update(tf.hopsworks_udf.transformation_features)
+            dataset = pd.concat(
+                [
+                    dataset,
+                    tf.hopsworks_udf.get_udf()(
+                        *(
+                            [
+                                dataset[feature]
+                                for feature in tf.hopsworks_udf.transformation_features
+                            ]
+                        )
+                    ),
+                ],
+                axis=1,
+            )
+        dataset = dataset.drop(transformed_features, axis=1)
         return dataset
 
     @staticmethod
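The rewritten loop no longer maps one function over one column; each transformation function's UDF is called with the Series of its input features, its output is concatenated onto the frame, and the consumed input columns are dropped once at the end. A toy sketch of that concat-then-drop pattern, with a plain pandas function standing in for `hopsworks_udf.get_udf()` (the `add_one` transformation is hypothetical):

import pandas as pd


def add_one(feature: pd.Series) -> pd.Series:
    # Stand-in for a Hopsworks pandas UDF: one Series in, one named Series out.
    return (feature + 1).rename("add_one_feature")


df = pd.DataFrame({"feature": [1, 2, 3], "label": [0, 1, 0]})
transformed_features = {"feature"}  # raw inputs consumed by the UDF

# Same shape as the patched loop: append the UDF output as a new column,
# then drop the raw input features after all UDFs have run.
df = pd.concat([df, add_one(df["feature"])], axis=1)
df = df.drop(transformed_features, axis=1)
print(df.columns.tolist())  # ['label', 'add_one_feature']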