Skip to content
This repository was archived by the owner on Apr 15, 2022. It is now read-only.

Commit 6643e2a

Browse files
author
Ben Epstein
authored
Merge pull request #136 from splicemachine/DBAAS-5219
Dbaas 5219 - Pipeline feature aggregation support
2 parents dbfadbe + f7ea54d commit 6643e2a

File tree

6 files changed

+350
-39
lines changed

6 files changed

+350
-39
lines changed

requirements-docs.txt

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,5 @@
11
py4j==0.10.7.0
2-
pytest
32
mlflow==1.8.0
4-
pyyaml==5.4
5-
mleap==0.15.0
63
graphviz==0.13
74
requests
85
gorilla==0.3.0
@@ -11,9 +8,7 @@ pyspark-dist-explore==0.1.8
118
numpy==1.18.2
129
pandas==1.0.3
1310
scipy==1.4.1
14-
tensorflow==2.2.1
1511
pyspark
16-
h2o-pysparkling-2.4==3.28.1.2-1
12+
h2o-pysparkling-3.0==3.32.0.4-1
1713
sphinx-tabs
1814
IPython
19-
cloudpickle==1.6.0

splicemachine/features/feature_store.py

Lines changed: 248 additions & 31 deletions
Large diffs are not rendered by default.
Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
from .utils import AggWindow, FeatureAgg
2+
from .feature_aggregation import FeatureAggregation
Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
from typing import List
2+
3+
class FeatureAggregation:
4+
def __init__(self, column_name: str, agg_functions: List[str], agg_windows: List[str],
5+
feature_name_prefix: str = None, agg_default_value: float = None ):
6+
"""
7+
A class abstraction for defining aggregations that will generate a set of features for a feature set from a Pipeline
8+
9+
:param column_name: Name of the source column from a Source SQL statement
10+
:param agg_function: type of SQL aggregation (sum, min, max, avg, count etc). Available aggregations
11+
are in the splicemachine.features.pipelines.FeatureAgg class
12+
:param agg_windows: list of time windows over which to perform the aggregation ("1w","5d","3m")
13+
available windows: "s"econd, "m"inute, "d"ay, "w"eek, "mn"onth, "q"uarter, "y"ear
14+
:param agg_default_value: the default value in case of a null (no activity in the window)
15+
available windows: "s"econd, "m"ute, "d"ay, "w"eek, "mn"onth, "q"uarter, "y"ear
16+
setting this parameter would create 1 new feature for every agg_window+agg_function pair specified
17+
18+
The name of the features created through this FeatureAggregation will be: schema_table_sourcecol_aggfunc_aggwindow
19+
20+
:Example:
21+
.. code-block:: python
22+
23+
from splicemachine.features.pipelines import AggWindow, FeatureAgg, FeatureAggregation
24+
FeatureAggregation('revenue', [FeatureAgg.SUM, FeatureAgg.AVG],\
25+
[AggWindow.get_window(5, AggWindow.DAY), AggWindow.get_window(10, AggWindow.SECOND)], 0.0)
26+
27+
would yield:
28+
customer_rfm_revenue_wrate_sum_1d
29+
customer_rfm_revenue_wrate_sum_5w
30+
customer_rfm_revenue_wrate_mean_1d
31+
customer_rfm_revenue_wrate_mean_5w
32+
"""
33+
self.column_name=column_name
34+
self.agg_functions= agg_functions
35+
self.agg_windows = agg_windows
36+
self.agg_default_value = agg_default_value
37+
self.feature_name_prefix = feature_name_prefix
Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
class FeatureAgg:
2+
"""
3+
A class defining the valid Aggregation functions available to features in order to create FeatureAggregations
4+
"""
5+
SUM = 'sum'
6+
COUNT = 'count'
7+
AVG = 'avg'
8+
MEAN = 'avg'
9+
MIN = 'min'
10+
MAX = 'max'
11+
12+
@staticmethod
13+
def get_valid():
14+
return (FeatureAgg.SUM, FeatureAgg.COUNT, FeatureAgg.AVG, FeatureAgg.MIN, FeatureAgg.MAX)
15+
16+
class AggWindow:
17+
"""
18+
A class defining the valid window types available to aggregation functions for use in FeatureAggregations
19+
"""
20+
SECOND = 's'
21+
MINUTE = 'm'
22+
HOUR = 'h'
23+
DAY = 'd'
24+
WEEK = 'w'
25+
MONTH = 'mn'
26+
QUARTER = 'q'
27+
YEAR = 'y'
28+
29+
@staticmethod
30+
def get_valid():
31+
return (AggWindow.SECOND, AggWindow.MINUTE, AggWindow.HOUR, AggWindow.DAY,
32+
AggWindow.WEEK, AggWindow.MONTH, AggWindow.QUARTER, AggWindow.YEAR)
33+
34+
@staticmethod
35+
def get_window(length: int, window: str):
36+
"""
37+
A function to help in creating a valid window aggregation. Validates and returns the proper window aggregation
38+
syntax for use in FeatureAggregations
39+
40+
:param length: The length of time for the window
41+
:param window: The window type as defined in WindowAgg.get_valid()
42+
:return: (str) the proper window aggregation syntax
43+
44+
:Example:
45+
.. code-block:: python
46+
47+
48+
WindowAgg.get_window(5, WindowAgg.SECOND) -> '5s'
49+
WindowAgg.get_window(10, WindowAgg.MONTH) -> '10mn'
50+
"""
51+
assert window in AggWindow.get_valid(), f'The provided window {window} is not valid. ' \
52+
f'Use one of {AggWindow.get_valid()}'
53+
assert length > 0 and type(length) == int, f'Length must be a positive, nonzero integer, but got {length}'
54+
return f'{length}{window}'

splicemachine/features/utils/http_utils.py

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,8 +30,9 @@ class Endpoints:
3030
"""
3131
DEPLOYMENTS: str = "deployments"
3232
FEATURES: str = "features"
33+
FEATURE_DETAILS: str = "feature-details"
3334
FEATURE_SETS: str = "feature-sets"
34-
FEATURE_SET_DESCRIPTIONS: str = "feature-set-descriptions"
35+
FEATURE_SET_DETAILS: str = "feature-set-details"
3536
DEPLOY_FEATURE_SET: str = "deploy-feature-set"
3637
FEATURE_VECTOR: str = "feature-vector"
3738
FEATURE_VECTOR_SQL: str = "feature-vector-sql"
@@ -40,10 +41,15 @@ class Endpoints:
4041
TRAINING_SET_FROM_DEPLOYMENT: str = "training-set-from-deployment"
4142
TRAINING_SET_FROM_VIEW: str = "training-set-from-view"
4243
TRAINING_VIEWS: str = "training-views"
43-
TRAINING_VIEW_DESCRIPTIONS: str = "training-view-descriptions"
44+
TRAINING_VIEW_DETAILS: str = "training-view-details"
4445
TRAINING_VIEW_FEATURES: str = "training-view-features"
4546
TRAINING_VIEW_ID: str = "training-view-id"
4647
SUMMARY: str = "summary"
48+
SOURCE: str = "source"
49+
AGG_FEATURE_SET_FROM_SOURCE: str = 'agg-feature-set-from-source'
50+
BACKFILL_SQL: str = 'backfill-sql'
51+
BACKFILL_INTERVALS: str = 'backfill-intervals'
52+
PIPELINE_SQL: str = 'pipeline-sql'
4753

4854
def make_request(url: str, endpoint: str, method: str, auth: HTTPBasicAuth,
4955
params: Optional[Dict[str, Any]] = None,

0 commit comments

Comments
 (0)