Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions python/hsfs/builtin_transformations.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,26 +23,26 @@
feature_statistics = TransformationStatistics("feature")


@udf(float, drop=["feature"])
def min_max_scaler(feature: pd.Series, statistics=feature_statistics) -> pd.Series:
    """Scale `feature` into [0, 1] using the min/max computed from training statistics.

    NOTE(review): the diff view showed the stale `@udf(float)` decorator stacked on
    the new one; only the merged `@udf(float, drop=["feature"])` form is kept here.
    """
    f_min = statistics.feature.min
    f_max = statistics.feature.max
    return (feature - f_min) / (f_max - f_min)


@udf(float, drop=["feature"])
def standard_scaler(feature: pd.Series, statistics=feature_statistics) -> pd.Series:
    """Standardize `feature` to zero mean and unit variance using training statistics.

    NOTE(review): the diff view showed the stale `@udf(float)` decorator stacked on
    the new one; only the merged `@udf(float, drop=["feature"])` form is kept here.
    """
    return (feature - statistics.feature.mean) / statistics.feature.stddev


@udf(float, drop=["feature"])
def robust_scaler(feature: pd.Series, statistics=feature_statistics) -> pd.Series:
    """Scale `feature` by centring on the median and dividing by the IQR.

    `percentiles` is 0-indexed, so index 49 is the 50th percentile (median) and
    indices 24/74 are the 25th/75th percentiles (interquartile range).

    NOTE(review): the diff view showed the stale `@udf(float)` decorator stacked on
    the new one; only the merged `@udf(float, drop=["feature"])` form is kept here.
    """
    median = statistics.feature.percentiles[49]
    iqr = statistics.feature.percentiles[74] - statistics.feature.percentiles[24]
    return (feature - median) / iqr


@udf(int)
@udf(int, drop=["feature"])
def label_encoder(feature: pd.Series, statistics=feature_statistics) -> pd.Series:
unique_data = sorted(
[value for value in statistics.feature.extended_statistics["unique_values"]]
Expand All @@ -53,7 +53,7 @@ def label_encoder(feature: pd.Series, statistics=feature_statistics) -> pd.Serie
)


@udf(bool)
@udf(bool, drop=["feature"])
def one_hot_encoder(feature: pd.Series, statistics=feature_statistics) -> pd.Series:
unique_data = [
value for value in statistics.feature.extended_statistics["unique_values"]
Expand Down
16 changes: 13 additions & 3 deletions python/hsfs/core/feature_group_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,13 +51,17 @@ def save(
feature_group_instance.feature_store_id,
"featuregroups",
]
query_params = {
"expand": ["features", "expectationsuite", "transformationfunctions"]
}
headers = {"content-type": "application/json"}
feature_group_object = feature_group_instance.update_from_response_json(
_client._send_request(
"POST",
path_params,
headers=headers,
data=feature_group_instance.json(),
query_params=query_params,
),
)
return feature_group_object
Expand Down Expand Up @@ -93,7 +97,11 @@ def get(
"featuregroups",
name,
]
query_params = None if version is None else {"version": version}
query_params = {
"expand": ["features", "expectationsuite", "transformationfunctions"]
}
if version is not None:
query_params["version"] = version

fg_objs = []
# In principle unique names are enforced across fg type and this should therefore
Expand Down Expand Up @@ -157,8 +165,10 @@ def get_by_id(
"featuregroups",
feature_group_id,
]

fg_json = _client._send_request("GET", path_params)
query_params = {
"expand": ["features", "expectationsuite", "transformationfunctions"]
}
fg_json = _client._send_request("GET", path_params, query_params)
if (
fg_json["type"] == FeatureGroupApi.BACKEND_FG_STREAM
or fg_json["type"] == FeatureGroupApi.BACKEND_FG_BATCH
Expand Down
52 changes: 51 additions & 1 deletion python/hsfs/core/feature_group_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,9 @@
from __future__ import annotations

import warnings
from typing import List

from hsfs import engine, util
from hsfs import engine, feature, util
from hsfs import feature_group as fg
from hsfs.client import exceptions
from hsfs.core import delta_engine, feature_group_base_engine, hudi_engine
Expand All @@ -30,6 +31,40 @@ def __init__(self, feature_store_id):
# cache online feature store connector
self._online_conn = None

def _update_feature_group_schema_on_demand_transformations(
self, feature_group: fg.FeatureGroup, features: List[feature.Feature]
):
"""
Function to update feature group schema based on the on demand transformation available in the feature group.

# Arguments:
feature_group: fg.FeatureGroup. The feature group for which the schema has to be updated.
features: List[feature.Feature]. List of features currently in the feature group
# Returns:
Updated list of features. That has on-demand features and removes dropped features.
"""
if not feature_group.transformation_functions:
return features
else:
transformed_features = []
dropped_features = []
for tf in feature_group.transformation_functions:
transformed_features.append(
feature.Feature(
tf.hopsworks_udf.output_column_names[0],
tf.hopsworks_udf.return_types[0],
on_demand=True,
)
)
if tf.hopsworks_udf.dropped_features:
dropped_features.extend(tf.hopsworks_udf.dropped_features)
updated_schema = []

for feat in features:
if feat.name not in dropped_features:
updated_schema.append(feat)
return updated_schema + transformed_features

def save(
self,
feature_group,
Expand All @@ -40,6 +75,11 @@ def save(
dataframe_features = engine.get_instance().parse_schema_feature_group(
feature_dataframe, feature_group.time_travel_format
)
dataframe_features = (
self._update_feature_group_schema_on_demand_transformations(
feature_group=feature_group, features=dataframe_features
)
)
util.validate_embedding_feature_type(
feature_group.embedding_index, dataframe_features
)
Expand Down Expand Up @@ -90,6 +130,11 @@ def insert(
dataframe_features = engine.get_instance().parse_schema_feature_group(
feature_dataframe, feature_group.time_travel_format
)
dataframe_features = (
self._update_feature_group_schema_on_demand_transformations(
feature_group=feature_group, features=dataframe_features
)
)
util.validate_embedding_feature_type(
feature_group.embedding_index, dataframe_features
)
Expand Down Expand Up @@ -283,6 +328,11 @@ def insert_stream(
dataframe_features = engine.get_instance().parse_schema_feature_group(
dataframe, feature_group.time_travel_format
)
dataframe_features = (
self._update_feature_group_schema_on_demand_transformations(
feature_group=feature_group, features=dataframe_features
)
)
util.validate_embedding_feature_type(
feature_group.embedding_index, dataframe_features
)
Expand Down
40 changes: 11 additions & 29 deletions python/hsfs/core/feature_view_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

from typing import List, Optional, Union

from hsfs import client, feature_view, training_dataset, transformation_function
from hsfs import client, feature_view, training_dataset
from hsfs.client.exceptions import RestAPIError
from hsfs.constructor import query, serving_prepared_statement
from hsfs.core import explicit_provenance, job, training_dataset_job_conf
Expand Down Expand Up @@ -46,7 +46,6 @@ class FeatureViewApi:
_TRANSFORMED_lOG = "transformed"
_UNTRANSFORMED_LOG = "untransformed"


def __init__(self, feature_store_id: int) -> None:
self._feature_store_id = feature_store_id
self._client = client.get_instance()
Expand Down Expand Up @@ -214,28 +213,6 @@ def get_serving_prepared_statement(
self._client._send_request("GET", path, query_params, headers=headers)
)

def get_attached_transformation_fn(
    self, name: str, version: int
) -> List["transformation_function.TransformationFunction"]:
    """
    Get transformation functions attached to a feature view from the backend.

    # Arguments
        name `str`: Name of feature view.
        version `int`: Version of feature view.

    # Returns
        `List[TransformationFunction]` : List of transformation functions attached to the feature view.

    # Raises
        `RestAPIError`: If the feature view cannot be found from the backend.
        `ValueError`: If the feature group associated with the feature view cannot be found.
    """
    # Build the REST path from the cached base path segments plus the
    # name/version of the target feature view and the transformation endpoint.
    path = self._base_path + [name, self._VERSION, version, self._TRANSFORMATION]
    return transformation_function.TransformationFunction.from_response_json(
        self._client._send_request("GET", path)
    )

def create_training_dataset(
self,
name: str,
Expand Down Expand Up @@ -407,7 +384,8 @@ def get_models_provenance(
def enable_feature_logging(
self,
feature_view_name: str,
feature_view_version: int,):
feature_view_version: int,
):
_client = client.get_instance()
path_params = self._base_path + [
feature_view_name,
Expand All @@ -420,7 +398,8 @@ def enable_feature_logging(
def pause_feature_logging(
self,
feature_view_name: str,
feature_view_version: int,):
feature_view_version: int,
):
_client = client.get_instance()
path_params = self._base_path + [
feature_view_name,
Expand All @@ -434,7 +413,8 @@ def pause_feature_logging(
def resume_feature_logging(
self,
feature_view_name: str,
feature_view_version: int,):
feature_view_version: int,
):
_client = client.get_instance()
path_params = self._base_path + [
feature_view_name,
Expand All @@ -448,7 +428,8 @@ def resume_feature_logging(
def materialize_feature_logging(
self,
feature_view_name: str,
feature_view_version: int,):
feature_view_version: int,
):
_client = client.get_instance()
path_params = self._base_path + [
feature_view_name,
Expand All @@ -469,7 +450,8 @@ def materialize_feature_logging(
def get_feature_logging(
self,
feature_view_name: str,
feature_view_version: int,):
feature_view_version: int,
):
_client = client.get_instance()
path_params = self._base_path + [
feature_view_name,
Expand Down
Loading