Skip to content
This repository was archived by the owner on Apr 15, 2022. It is now read-only.

Commit aef15c3

Browse files
committed
Merge branch 'master' into spark2
2 parents 66c5db3 + 63ea350 commit aef15c3

File tree

4 files changed

+62
-21
lines changed

4 files changed

+62
-21
lines changed

splicemachine/features/feature_store.py

Lines changed: 43 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -298,7 +298,8 @@ def get_training_set_from_view(self, training_view: str, features: Union[List[Fe
298298
r = make_request(self._FS_URL, Endpoints.TRAINING_SET_FROM_VIEW, RequestType.POST, self._basic_auth, { "view": training_view },
299299
{ "features": features, "start_time": start_time, "end_time": end_time })
300300
sql = r["sql"]
301-
tvw = r["training_view"]
301+
tvw = TrainingView(**r["training_view"])
302+
features = [Feature(**f) for f in r["features"]]
302303

303304
# Link this to mlflow for model deployment
304305
if self.mlflow_ctx and not return_sql:
@@ -443,8 +444,8 @@ def describe_feature_sets(self) -> None:
443444

444445
print('Available feature sets')
445446
for desc in r:
446-
fset = FeatureSet(**desc["feature_set"])
447-
features = [Feature(**feature) for feature in desc["features"]]
447+
features = [Feature(**feature) for feature in desc.pop('features')]
448+
fset = FeatureSet(**desc)
448449
print('-' * 23)
449450
self._feature_set_describe(fset, features)
450451

@@ -466,8 +467,8 @@ def describe_feature_set(self, schema_name: str, table_name: str) -> None:
466467
if not descs: raise SpliceMachineException(
467468
f"Feature Set {schema_name}.{table_name} not found. Check name and try again.")
468469
desc = descs[0]
469-
fset = FeatureSet(**desc["feature_set"])
470-
features = [Feature(**feature) for feature in desc["features"]]
470+
features = [Feature(**feature) for feature in desc.pop("features")]
471+
fset = FeatureSet(**desc)
471472
self._feature_set_describe(fset, features)
472473

473474
def _feature_set_describe(self, fset: FeatureSet, features: List[Feature]):
@@ -487,8 +488,8 @@ def describe_training_views(self) -> None:
487488

488489
print('Available training views')
489490
for desc in r:
490-
tcx = TrainingView(**desc["training_view"])
491-
features = [Feature(**f) for f in desc["features"]]
491+
features = [Feature(**f) for f in desc.pop('features')]
492+
tcx = TrainingView(**desc)
492493
print('-' * 23)
493494
self._training_view_describe(tcx, features)
494495

@@ -504,8 +505,8 @@ def describe_training_view(self, training_view: str) -> None:
504505
descs = r
505506
if not descs: raise SpliceMachineException(f"Training view {training_view} not found. Check name and try again.")
506507
desc = descs[0]
507-
tcx = TrainingView(**desc['training_view'])
508-
feats = [Feature(**f) for f in desc['features']]
508+
feats = [Feature(**f) for f in desc.pop('features')]
509+
tcx = TrainingView(**desc)
509510
self._training_view_describe(tcx, feats)
510511

511512
def _training_view_describe(self, tcx: TrainingView, feats: List[Feature]):
@@ -533,16 +534,19 @@ def get_training_set_from_deployment(self, schema_name: str, table_name: str):
533534

534535
r = make_request(self._FS_URL, Endpoints.TRAINING_SET_FROM_DEPLOYMENT, RequestType.GET, self._basic_auth,
535536
{ "schema": schema_name, "table": table_name })
536-
metadata = r["metadata"]
537537

538-
sql = r["sql"]
539-
features = metadata['FEATURES'].split(',')
540-
tv_name = metadata['NAME']
541-
start_time = metadata['TRAINING_SET_START_TS']
542-
end_time = metadata['TRAINING_SET_END_TS']
538+
metadata = r['metadata']
539+
sql = r['sql']
540+
541+
tv_name = metadata['name']
542+
start_time = metadata['training_set_start_ts']
543+
end_time = metadata['training_set_end_ts']
544+
545+
tv = TrainingView(**r['training_view']) if 'training_view' in r else None
546+
features = [Feature(**f) for f in r['features']]
543547

544548
if self.mlflow_ctx:
545-
self.link_training_set_to_mlflow(features, start_time, end_time, tv_name)
549+
self.link_training_set_to_mlflow(features, start_time, end_time, tv)
546550
return self.splice_ctx.df(sql)
547551

548552
def remove_feature(self, name: str):
@@ -556,6 +560,28 @@ def remove_feature(self, name: str):
556560
"""
557561
make_request(self._FS_URL, Endpoints.FEATURES, RequestType.DELETE, self._basic_auth, { "name": name })
558562

563+
def get_deployments(self, schema_name: str = None, table_name: str = None, training_set: str = None):
564+
"""
565+
Returns a list of all (or specified) available deployments
566+
:param schema_name: model schema name
567+
:param table_name: model table name
568+
:param training_set: training set name
569+
:return: List[Deployment] the list of Deployments as dicts
570+
"""
571+
return make_request(self._FS_URL, Endpoints.DEPLOYMENTS, RequestType.GET, self._basic_auth,
572+
{ 'schema': schema_name, 'table': table_name, 'name': training_set })
573+
574+
def get_training_set_features(self, training_set: str = None):
575+
"""
576+
Returns a list of all features from an available Training Set, as well as details about that Training Set
577+
:param training_set: training set name
578+
:return: TrainingSet as dict
579+
"""
580+
r = make_request(self._FS_URL, Endpoints.TRAINING_SET_FEATURES, RequestType.GET, self._basic_auth,
581+
{ 'name': training_set })
582+
r['features'] = [Feature(**f) for f in r['features']]
583+
return r
584+
559585
def _retrieve_model_data_sets(self, schema_name: str, table_name: str):
560586
"""
561587
Returns the training set dataframe and model table dataframe for a given deployed model.
@@ -790,6 +816,7 @@ def link_training_set_to_mlflow(self, features: Union[List[Feature], List[str]],
790816

791817
self.mlflow_ctx._active_training_set: TrainingSet = ts
792818
ts._register_metadata(self.mlflow_ctx)
819+
793820

794821
def set_feature_store_url(self, url: str):
795822
self._FS_URL = url

splicemachine/features/training_set.py

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,8 @@ def _register_metadata(self, mlflow_ctx):
4848
"Training Set was logged to the current active run. If you call "
4949
"fs.get_training_set or fs.get_training_set_from_view before starting an "
5050
"mlflow run, all following runs will assume that Training Set to be the "
51-
"active Training Set, and will log the Training Set as metadata. For more "
52-
"information, refer to the documentation. If you'd like to use a new "
53-
"Training Set, end the current run, call one of the mentioned functions, "
54-
"and start your new run.") from None
51+
"active Training Set (until the next call to either of those functions), "
52+
"and will log the Training Set as metadata. For more information, "
53+
"refer to the documentation. If you'd like to use a new Training Set, "
54+
"end the current run, call one of the mentioned functions, and start "
55+
"your new run. Or, call mlflow.remove_active_training_set()") from None

splicemachine/features/utils/http_utils.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,13 +28,15 @@ class Endpoints:
2828
"""
2929
Enum for Feature Store Endpoints
3030
"""
31+
DEPLOYMENTS: str = "deployments"
3132
FEATURES: str = "features"
3233
FEATURE_SETS: str = "feature-sets"
3334
FEATURE_SET_DESCRIPTIONS: str = "feature-set-descriptions"
3435
DEPLOY_FEATURE_SET: str = "deploy-feature-set"
3536
FEATURE_VECTOR: str = "feature-vector"
3637
FEATURE_VECTOR_SQL: str = "feature-vector-sql"
3738
TRAINING_SETS: str = "training-sets"
39+
TRAINING_SET_FEATURES: str = "training-set-features"
3840
TRAINING_SET_FROM_DEPLOYMENT: str = "training-set-from-deployment"
3941
TRAINING_SET_FROM_VIEW: str = "training-set-from-view"
4042
TRAINING_VIEWS: str = "training-views"

splicemachine/mlflow_support/mlflow_support.py

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -439,6 +439,16 @@ def _start_run(run_id=None, tags=None, experiment_id=None, run_name=None, nested
439439

440440
return SpliceActiveRun(active_run)
441441

442+
@_mlflow_patch('remove_active_training_set')
443+
def _remove_active_training_set():
444+
"""
445+
Removes the active training set from mlflow. This function deletes mlflows active training set (retrieved from
446+
the feature store), which will in turn stop the automated logging of features to the active mlflow run. To recreate
447+
an active training set, call fs.get_training_set or fs.get_training_set_from_view in the Feature Store.
448+
"""
449+
if hasattr(mlflow,'_active_training_set'):
450+
del mlflow._active_training_set
451+
442452

443453
@_mlflow_patch('log_pipeline_stages')
444454
def _log_pipeline_stages(pipeline):
@@ -1003,7 +1013,8 @@ def apply_patches():
10031013
targets = [_register_feature_store, _register_splice_context, _lp, _lm, _timer, _log_artifact, _log_feature_transformations,
10041014
_log_model_params, _log_pipeline_stages, _log_model, _load_model, _download_artifact,
10051015
_start_run, _current_run_id, _current_exp_id, _deploy_aws, _deploy_azure, _deploy_db, _login_director,
1006-
_get_run_ids_by_name, _get_deployed_models, _deploy_kubernetes, _fetch_logs, _watch_job, _end_run, _set_mlflow_uri]
1016+
_get_run_ids_by_name, _get_deployed_models, _deploy_kubernetes, _fetch_logs, _watch_job, _end_run,
1017+
_set_mlflow_uri, _remove_active_training_set]
10071018

10081019
for target in targets:
10091020
gorilla.apply(gorilla.Patch(mlflow, target.__name__.lstrip('_'), target, settings=_GORILLA_SETTINGS))

0 commit comments

Comments
 (0)