Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FSTORE-1411] On-Demand Transformation Functions #1371

Merged
merged 4 commits into from
Jul 12, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions python/hsfs/builtin_transformations.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,26 +23,26 @@
feature_statistics = TransformationStatistics("feature")


@udf(float)
@udf(float, drop=["feature"])
def min_max_scaler(feature: pd.Series, statistics=feature_statistics) -> pd.Series:
    """Min-max scale `feature` into [0, 1] using the stored min/max statistics.

    # Arguments
        feature: Input feature values to scale.
        statistics: Precomputed `TransformationStatistics` for `feature`.
    """
    # (x - min) / (max - min); NOTE(review): divides by zero when max == min —
    # presumably a constant feature is rejected upstream, confirm.
    return (feature - statistics.feature.min) / (
        statistics.feature.max - statistics.feature.min
    )


@udf(float)
@udf(float, drop=["feature"])
def standard_scaler(feature: pd.Series, statistics=feature_statistics) -> pd.Series:
    """Standardize `feature` (subtract mean, divide by standard deviation).

    # Arguments
        feature: Input feature values to scale.
        statistics: Precomputed `TransformationStatistics` for `feature`.
    """
    # NOTE(review): stddev == 0 would divide by zero — presumably guarded
    # upstream for constant features; confirm.
    return (feature - statistics.feature.mean) / statistics.feature.stddev


@udf(float)
@udf(float, drop=["feature"])
def robust_scaler(feature: pd.Series, statistics=feature_statistics) -> pd.Series:
    """Scale `feature` by centering on the median and dividing by the IQR.

    Indexing into `percentiles` is 0-based, so [49], [24] and [74] are
    presumably the 50th, 25th and 75th percentiles — TODO confirm the
    layout of the stored percentile list (100 entries assumed).

    # Arguments
        feature: Input feature values to scale.
        statistics: Precomputed `TransformationStatistics` for `feature`.
    """
    return (feature - statistics.feature.percentiles[49]) / (
        statistics.feature.percentiles[74] - statistics.feature.percentiles[24]
    )


@udf(int)
@udf(int, drop=["feature"])
def label_encoder(feature: pd.Series, statistics=feature_statistics) -> pd.Series:
unique_data = sorted(
[value for value in statistics.feature.extended_statistics["unique_values"]]
Expand All @@ -53,7 +53,7 @@ def label_encoder(feature: pd.Series, statistics=feature_statistics) -> pd.Serie
)


@udf(bool)
@udf(bool, drop=["feature"])
def one_hot_encoder(feature: pd.Series, statistics=feature_statistics) -> pd.Series:
unique_data = [
value for value in statistics.feature.extended_statistics["unique_values"]
Expand Down
16 changes: 13 additions & 3 deletions python/hsfs/core/feature_group_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,13 +51,17 @@ def save(
feature_group_instance.feature_store_id,
"featuregroups",
]
query_params = {
"expand": ["features", "expectationsuite", "transformationfunctions"]
}
headers = {"content-type": "application/json"}
feature_group_object = feature_group_instance.update_from_response_json(
_client._send_request(
"POST",
path_params,
headers=headers,
data=feature_group_instance.json(),
query_params=query_params,
),
)
return feature_group_object
Expand Down Expand Up @@ -93,7 +97,11 @@ def get(
"featuregroups",
name,
]
query_params = None if version is None else {"version": version}
query_params = {
"expand": ["features", "expectationsuite", "transformationfunctions"]
}
if version is not None:
query_params["version"] = version

fg_objs = []
# In principle unique names are enforced across fg type and this should therefore
Expand Down Expand Up @@ -157,8 +165,10 @@ def get_by_id(
"featuregroups",
feature_group_id,
]

fg_json = _client._send_request("GET", path_params)
query_params = {
"expand": ["features", "expectationsuite", "transformationfunctions"]
}
fg_json = _client._send_request("GET", path_params, query_params)
if (
fg_json["type"] == FeatureGroupApi.BACKEND_FG_STREAM
or fg_json["type"] == FeatureGroupApi.BACKEND_FG_BATCH
Expand Down
8 changes: 6 additions & 2 deletions python/hsfs/core/feature_group_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,9 @@ def insert(
validation_options: dict = None,
):
dataframe_features = engine.get_instance().parse_schema_feature_group(
feature_dataframe, feature_group.time_travel_format
feature_dataframe,
feature_group.time_travel_format,
feature_group.transformation_functions,
)
util.validate_embedding_feature_type(
feature_group.embedding_index, dataframe_features
Expand Down Expand Up @@ -281,7 +283,9 @@ def insert_stream(
)

dataframe_features = engine.get_instance().parse_schema_feature_group(
dataframe, feature_group.time_travel_format
dataframe,
feature_group.time_travel_format,
feature_group.transformation_functions,
)
util.validate_embedding_feature_type(
feature_group.embedding_index, dataframe_features
Expand Down
40 changes: 11 additions & 29 deletions python/hsfs/core/feature_view_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

from typing import List, Optional, Union

from hsfs import client, feature_view, training_dataset, transformation_function
from hsfs import client, feature_view, training_dataset
from hsfs.client.exceptions import RestAPIError
from hsfs.constructor import query, serving_prepared_statement
from hsfs.core import explicit_provenance, job, training_dataset_job_conf
Expand Down Expand Up @@ -46,7 +46,6 @@ class FeatureViewApi:
_TRANSFORMED_lOG = "transformed"
_UNTRANSFORMED_LOG = "untransformed"


def __init__(self, feature_store_id: int) -> None:
self._feature_store_id = feature_store_id
self._client = client.get_instance()
Expand Down Expand Up @@ -214,28 +213,6 @@ def get_serving_prepared_statement(
self._client._send_request("GET", path, query_params, headers=headers)
)

def get_attached_transformation_fn(
self, name: str, version: int
) -> List["transformation_function.TransformationFunction"]:
"""
Get transformation functions attached to a feature view from the backend

# Arguments
name `str`: Name of feature view.
version `int`: Version of feature view.

# Returns
`List[TransformationFunction]` : List of transformation functions attached to the feature view.

# Raises
`RestAPIError`: If the feature view cannot be found from the backend.
`ValueError`: If the feature group associated with the feature view cannot be found.
"""
path = self._base_path + [name, self._VERSION, version, self._TRANSFORMATION]
return transformation_function.TransformationFunction.from_response_json(
self._client._send_request("GET", path)
)

def create_training_dataset(
self,
name: str,
Expand Down Expand Up @@ -407,7 +384,8 @@ def get_models_provenance(
def enable_feature_logging(
self,
feature_view_name: str,
feature_view_version: int,):
feature_view_version: int,
):
_client = client.get_instance()
path_params = self._base_path + [
feature_view_name,
Expand All @@ -420,7 +398,8 @@ def enable_feature_logging(
def pause_feature_logging(
self,
feature_view_name: str,
feature_view_version: int,):
feature_view_version: int,
):
_client = client.get_instance()
path_params = self._base_path + [
feature_view_name,
Expand All @@ -434,7 +413,8 @@ def pause_feature_logging(
def resume_feature_logging(
self,
feature_view_name: str,
feature_view_version: int,):
feature_view_version: int,
):
_client = client.get_instance()
path_params = self._base_path + [
feature_view_name,
Expand All @@ -448,7 +428,8 @@ def resume_feature_logging(
def materialize_feature_logging(
self,
feature_view_name: str,
feature_view_version: int,):
feature_view_version: int,
):
_client = client.get_instance()
path_params = self._base_path + [
feature_view_name,
Expand All @@ -469,7 +450,8 @@ def materialize_feature_logging(
def get_feature_logging(
self,
feature_view_name: str,
feature_view_version: int,):
feature_view_version: int,
):
_client = client.get_instance()
path_params = self._base_path + [
feature_view_name,
Expand Down
122 changes: 57 additions & 65 deletions python/hsfs/core/feature_view_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
feature_group,
feature_view,
training_dataset_feature,
transformation_function,
util,
)
from hsfs.client import exceptions
Expand Down Expand Up @@ -271,28 +270,6 @@ def get_batch_query_string(
return fs_query.pit_query
return fs_query.query

def get_attached_transformation_fn(
self, name: str, version: int
) -> List[transformation_function.TransformationFunction]:
"""
Get transformation functions attached to a feature view from the backend

# Arguments
name `str`: Name of feature view.
version `int`: Version of feature view.

# Returns
`List[TransformationFunction]` : List of transformation functions attached to the feature view.

# Raises
`RestAPIError`: If the feature view cannot be found from the backend.
`ValueError`: If the feature group associated with the feature view cannot be found.
"""
transformation_functions = (
self._feature_view_api.get_attached_transformation_fn(name, version)
)
return transformation_functions

def create_training_dataset(
self,
feature_view_obj,
Expand Down Expand Up @@ -820,11 +797,6 @@ def get_batch_data(
else:
return feature_dataframe

def transform_batch_data(self, features, transformation_functions):
return engine.get_instance()._apply_transformation_function(
transformation_functions, dataset=features, inplace=False
)

def add_tag(
self, feature_view_obj, name: str, value, training_dataset_version=None
):
Expand Down Expand Up @@ -996,7 +968,16 @@ def _get_logging_fg(self, fv, transformed):
else:
return feature_logging.untransformed_features

def log_features(self, fv, features, prediction=None, transformed=False, write_options=None, training_dataset_version=None, hsml_model=None):
def log_features(
self,
fv,
features,
prediction=None,
transformed=False,
write_options=None,
training_dataset_version=None,
hsml_model=None,
):
default_write_options = {
"start_offline_materialization": False,
}
Expand All @@ -1017,29 +998,41 @@ def log_features(self, fv, features, prediction=None, transformed=False, write_o
)
return fg.insert(df, write_options=default_write_options)

def read_feature_logs(self, fv,
start_time: Optional[
Union[str, int, datetime, datetime.date]] = None,
end_time: Optional[
Union[str, int, datetime, datetime.date]] = None,
filter: Optional[Union[Filter, Logic]]=None,
transformed: Optional[bool]=False,
training_dataset_version=None,
hsml_model=None,
):
def read_feature_logs(
self,
fv,
start_time: Optional[Union[str, int, datetime, datetime.date]] = None,
end_time: Optional[Union[str, int, datetime, datetime.date]] = None,
filter: Optional[Union[Filter, Logic]] = None,
transformed: Optional[bool] = False,
training_dataset_version=None,
hsml_model=None,
):
fg = self._get_logging_fg(fv, transformed)
fv_feat_name_map = self._get_fv_feature_name_map(fv)
query = fg.select_all()
if start_time:
query = query.filter(fg.get_feature(FeatureViewEngine._LOG_TIME) >= start_time)
query = query.filter(
fg.get_feature(FeatureViewEngine._LOG_TIME) >= start_time
)
if end_time:
query = query.filter(fg.get_feature(FeatureViewEngine._LOG_TIME) <= end_time)
query = query.filter(
fg.get_feature(FeatureViewEngine._LOG_TIME) <= end_time
)
if training_dataset_version:
query = query.filter(fg.get_feature(FeatureViewEngine._LOG_TD_VERSION) == training_dataset_version)
query = query.filter(
fg.get_feature(FeatureViewEngine._LOG_TD_VERSION)
== training_dataset_version
)
if hsml_model:
query = query.filter(fg.get_feature(FeatureViewEngine._HSML_MODEL) == self.get_hsml_model_value(hsml_model))
query = query.filter(
fg.get_feature(FeatureViewEngine._HSML_MODEL)
== self.get_hsml_model_value(hsml_model)
)
if filter:
query = query.filter(self._convert_to_log_fg_filter(fg, fv, filter, fv_feat_name_map))
query = query.filter(
self._convert_to_log_fg_filter(fg, fv, filter, fv_feat_name_map)
)
df = query.read()
df = df.drop(["log_id", FeatureViewEngine._LOG_TIME], axis=1)
return df
Expand All @@ -1062,9 +1055,12 @@ def _convert_to_log_fg_filter(self, fg, fv, filter, fv_feat_name_map):
)
elif isinstance(filter, Filter):
fv_feature_name = fv_feat_name_map.get(
f"{filter.feature.feature_group_id}_{filter.feature.name}")
f"{filter.feature.feature_group_id}_{filter.feature.name}"
)
if fv_feature_name is None:
raise FeatureStoreException("Filter feature {filter.feature.name} does not exist in feature view feature.")
raise FeatureStoreException(
"Filter feature {filter.feature.name} does not exist in feature view feature."
)
return Filter(
fg.get_feature(filter.feature.name),
filter.condition,
Expand All @@ -1076,32 +1072,30 @@ def _convert_to_log_fg_filter(self, fg, fv, filter, fv_feat_name_map):
def _get_fv_feature_name_map(self, fv) -> Dict[str, str]:
result_dict = {}
for td_feature in fv.features:
fg_feature_key = f"{td_feature.feature_group.id}_{td_feature.feature_group_feature_name}"
fg_feature_key = (
f"{td_feature.feature_group.id}_{td_feature.feature_group_feature_name}"
)
result_dict[fg_feature_key] = td_feature.name
return result_dict

def get_log_timeline(self, fv,
wallclock_time: Optional[
Union[str, int, datetime, datetime.date]] = None,
limit: Optional[int] = None,
transformed: Optional[bool]=False,
) -> Dict[str, Dict[str, str]]:
def get_log_timeline(
self,
fv,
wallclock_time: Optional[Union[str, int, datetime, datetime.date]] = None,
limit: Optional[int] = None,
transformed: Optional[bool] = False,
) -> Dict[str, Dict[str, str]]:
fg = self._get_logging_fg(fv, transformed)
return fg.commit_details(wallclock_time=wallclock_time, limit=limit)

def pause_logging(self, fv):
self._feature_view_api.pause_feature_logging(
fv.name, fv.version
)
self._feature_view_api.pause_feature_logging(fv.name, fv.version)

def resume_logging(self, fv):
self._feature_view_api.resume_feature_logging(
fv.name, fv.version
)
self._feature_view_api.resume_feature_logging(fv.name, fv.version)

def materialize_feature_logs(self, fv, wait):
jobs = self._feature_view_api.materialize_feature_logging(
fv.name, fv.version
)
jobs = self._feature_view_api.materialize_feature_logging(fv.name, fv.version)
if wait:
for job in jobs:
try:
Expand All @@ -1111,6 +1105,4 @@ def materialize_feature_logs(self, fv, wait):
return jobs

def delete_feature_logs(self, fv, transformed):
self._feature_view_api.delete_feature_logs(
fv.name, fv.version, transformed
)
self._feature_view_api.delete_feature_logs(fv.name, fv.version, transformed)
Loading
Loading