Skip to content
This repository was archived by the owner on Apr 15, 2022. It is now read-only.

Commit 48c0a2a

Browse files
author
Epstein
authored
Release 2.1.0 k8 (#56)
* Dbaas 3689 (#52) * DBAAS-3689: using potential IndexToString to try to get class labels for spark pipeline * more specific check on model * more specific check on model * edge case * escaping labels * more escaping * wrong escaping :/ * incorrect assumption of spark model * code cleanup and pass logic to scala (#53) * function cleanup and pass logic to scala * python to scala list * Dbaas 3804 (#55) * initial sklearn deploy code * more config for sklearn * support pandas df * typing * syntax * returning file_ext but shouldn't * fixing model insert for sklearn * signature object needs parameters * predict_args not predict_params * missing logic for prediction table * sql formatting * edge cases * elif to if * base case * more work around pipelines * more validation of sklearn_args * set comparison * sklearn_args cleanup * fix for pipeline model type function * need file ext
1 parent 22a52fa commit 48c0a2a

File tree

6 files changed

+410
-164
lines changed

6 files changed

+410
-164
lines changed

splicemachine/mlflow_support/constants.py

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
class DBLibraries():
77
MLeap = 'mleap'
88
H2OMOJO = 'h2omojo'
9+
SKLearn = 'sklearn'
910
SUPPORTED_LIBRARIES = [MLeap, H2OMOJO]
1011

1112
class H2OModelType(Enum): # Based off https://github.com/h2oai/h2o-3/blob/master/h2o-genmodel/src/main/java/hex/ModelCategory.java
@@ -24,6 +25,16 @@ class SparkModelType(Enum):
2425
CLUSTERING_WITH_PROB = 2
2526
CLUSTERING_WO_PROB = 3
2627

28+
class SklearnModelType(Enum):
29+
"""
30+
Model Types for SKLearn models
31+
Sklearn's model categories aren't as well defined, so we classify models by their return values
32+
"""
33+
POINT_PREDICTION_REG = 0
34+
POINT_PREDICTION_CLF = 1
35+
KEY_VALUE = 2
36+
37+
2738
class FileExtensions():
2839
"""
2940
Class containing names for

splicemachine/mlflow_support/mlflow_support.py

Lines changed: 52 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,6 @@
1010
from requests.auth import HTTPBasicAuth
1111
from mleap.pyspark import spark_support
1212
import pyspark
13-
from pyspark.ml.base import Estimator as SparkModel
1413
import sklearn
1514
from sklearn.base import BaseEstimator as ScikitModel
1615
from tensorflow import __version__ as tf_version
@@ -21,6 +20,8 @@
2120
from splicemachine.mlflow_support.utilities import *
2221
from splicemachine.spark.context import PySpliceContext
2322
from splicemachine.spark.constants import CONVERSIONS
23+
from pyspark.sql.dataframe import DataFrame as SparkDF
24+
from pandas.core.frame import DataFrame as PandasDF
2425

2526
_TESTING = env_vars.get("TESTING", False)
2627
_TRACKING_URL = get_pod_uri("mlflow", "5001", _TESTING)
@@ -277,9 +278,9 @@ def _log_model_params(pipeline_or_model):
277278
for param in verbose_parameters:
278279
try:
279280
value = float(verbose_parameters[param])
280-
mlflow.log_param('Hyperparameter- ' + param.split('-')[0], value)
281+
mlflow.log_param(param.split('-')[0], value)
281282
except:
282-
mlflow.log_param('Hyperparameter- ' + param.split('-')[0], verbose_parameters[param])
283+
mlflow.log_param(param.split('-')[0], verbose_parameters[param])
283284

284285

285286
@_mlflow_patch('timer')
@@ -292,12 +293,14 @@ def _timer(timer_name, param=True):
292293
:return:
293294
"""
294295
try:
296+
print(f'Starting Code Block {timer_name}...', end=' ')
295297
t0 = time.time()
296298
yield
297299
finally:
298300
t1 = time.time() - t0
299301
# Syntactic Sugar
300302
(mlflow.log_param if param else mlflow.log_metric)(timer_name, t1)
303+
print('Done.')
301304
print(
302305
f"Code Block {timer_name}:\nRan in {round(t1, 3)} secs\nRan in {round(t1 / 60, 3)} mins"
303306
)
@@ -320,7 +323,7 @@ def _download_artifact(name, local_path, run_id=None):
320323
file_ext = path.splitext(local_path)[1]
321324

322325
run_id = run_id or mlflow.active_run().info.run_uuid
323-
blob_data, f_etx = SparkUtils.retrieve_artifact_stream(mlflow._splice_context, run_id, name)
326+
blob_data, f_ext = SparkUtils.retrieve_artifact_stream(mlflow._splice_context, run_id, name)
324327

325328
if not file_ext: # If the user didn't provide the file (ie entered . as the local_path), fill it in for them
326329
local_path += f'/{name}.{f_etx}'
@@ -508,31 +511,50 @@ def _deploy_azure(endpoint_name, resource_group, workspace, run_id=None, region=
508511
return _initiate_job(request_payload, '/api/rest/initiate')
509512

510513
@_mlflow_patch('deploy_database')
511-
def _deploy_db(fittedPipe, df, db_schema_name, db_table_name, primary_key,
512-
run_id=None, classes=None, verbose=False, replace=False) -> None:
513-
"""
514-
Function to deploy a trained (Spark for now) model to the Database. This creates 2 tables: One with the features of the model, and one with the prediction and metadata.
514+
def _deploy_db(fittedModel,
515+
df,
516+
db_schema_name,
517+
db_table_name,
518+
primary_key,
519+
run_id: str=None,
520+
classes=None,
521+
sklearn_args={},
522+
verbose=False,
523+
replace=False) -> None:
524+
"""
525+
Function to deploy a trained (currently Spark, Sklearn or H2O) model to the Database.
526+
This creates 2 tables: One with the features of the model, and one with the prediction and metadata.
515527
They are linked with a column called MOMENT_ID
516528
517-
:param fittedPipe: (spark pipeline or model) The fitted pipeline to deploy
529+
:param fittedModel: (ML pipeline or model) The fitted pipeline to deploy
518530
:param df: (Spark DF) The dataframe used to train the model
519531
NOTE: this dataframe should NOT be transformed by the model. The columns in this df are the ones
520532
that will be used to create the table.
521533
:param db_schema_name: (str) the schema name to deploy to. If None, the currently set schema will be used.
522534
:param db_table_name: (str) the table name to deploy to. If none, the run_id will be used for the table name(s)
523535
:param primary_key: (List[Tuple[str, str]]) List of column + SQL datatype to use for the primary/composite key
524536
:param run_id: (str) The active run_id
525-
:param classes: List[str] The classes (prediction values) for the model being deployed.
526-
NOTE: If not supplied, the table will have column named c0,c1,c2 etc for each class
527-
:param verbose: bool Whether or not to print out the queries being created. Helpful for debugging
537+
:param classes: (List[str]) The classes (prediction labels) for the model being deployed.
538+
NOTE: If not supplied, the table will have default column names for each class
539+
:param sklearn_args: (dict{str: str}) Prediction options for sklearn models
540+
Available key value options:
541+
'predict_call': 'predict', 'predict_proba', or 'transform'
542+
- Determines the function call for the model
543+
If blank, predict will be used
544+
(or transform if model doesn't have predict)
545+
'predict_args': 'return_std' or 'return_cov' - For Bayesian and Gaussian models
546+
Only one can be specified
547+
If the model does not have the option specified, it will be ignored.
548+
:param verbose: (bool) Whether or not to print out the queries being created. Helpful for debugging
549+
:param replace: (bool) whether or not to replace a currently existing model. This param does not yet work
528550
529551
This function creates the following:
530552
* Table (default called DATA_{run_id}) where run_id is the run_id of the mlflow run associated to that model. This will have a column for each feature in the feature vector as well as a MOMENT_ID as primary key
531553
* Table (default called DATA_{run_id}_PREDS) That will have the columns:
532554
USER which is the current user who made the request
533555
EVAL_TIME which is the CURRENT_TIMESTAMP
534556
MOMENT_ID same as the DATA table to link predictions to rows in the table
535-
PREDICTION. The prediction of the model. If the :classes: param is not filled in, this will be c0,c1,c2 etc for classification models
557+
PREDICTION. The prediction of the model. If the :classes: param is not filled in, this will be default values for classification models
536558
A column for each class of the predictor with the value being the probability/confidence of the model if applicable
537559
* A trigger that runs on (after) insertion to the data table that runs an INSERT into the prediction table,
538560
calling the PREDICT function, passing in the row of data as well as the schema of the dataset, and the run_id of the model to run
@@ -541,10 +563,14 @@ def _deploy_db(fittedPipe, df, db_schema_name, db_table_name, primary_key,
541563
"""
542564
_check_for_splice_ctx()
543565
classes = classes if classes else []
566+
544567
run_id = run_id if run_id else mlflow.active_run().info.run_uuid
545568
db_table_name = db_table_name if db_table_name else f'data_{run_id}'
546569
schema_table_name = f'{db_schema_name}.{db_table_name}' if db_schema_name else db_table_name
547-
assert type(df) is pyspark.sql.dataframe.DataFrame, "Dataframe must be a PySpark dataframe!"
570+
assert type(df) in (SparkDF, PandasDF), "Dataframe must be a PySpark or Pandas dataframe!"
571+
572+
if type(df) == PandasDF:
573+
df = mlflow._splice_context.spark_session.createDataFrame(df)
548574

549575
feature_columns = df.columns
550576
# Get the datatype of each column in the dataframe
@@ -553,14 +579,13 @@ def _deploy_db(fittedPipe, df, db_schema_name, db_table_name, primary_key,
553579
# Make sure primary_key is valid format
554580
validate_primary_key(primary_key)
555581

556-
557-
# library = get_model_library(run_id)
558-
typ = str(type(fittedPipe))
559-
library = 'mleap' if 'pyspark' in typ else 'h2omojo' if 'h2o' in typ else None
582+
library = get_model_library(fittedModel)
560583
if library == DBLibraries.MLeap:
561-
modelType, classes = SparkUtils.prep_model_for_deployment(mlflow._splice_context, fittedPipe, df, classes, run_id)
584+
model_type, classes = SparkUtils.prep_model_for_deployment(mlflow._splice_context, fittedModel, df, classes, run_id)
562585
elif library == DBLibraries.H2OMOJO:
563-
modelType, classes = H2OUtils.prep_model_for_deployment(mlflow._splice_context, fittedPipe, classes, run_id)
586+
model_type, classes = H2OUtils.prep_model_for_deployment(mlflow._splice_context, fittedModel, classes, run_id)
587+
elif library == DBLibraries.SKLearn:
588+
model_type, classes = SKUtils.prep_model_for_deployment(mlflow._splice_context, fittedModel, classes, run_id, sklearn_args)
564589
else:
565590
raise SpliceMachineException('Model type is not supported for in DB Deployment!. '
566591
'Currently, model must be H2O or Spark.')
@@ -581,23 +606,24 @@ def _deploy_db(fittedPipe, df, db_schema_name, db_table_name, primary_key,
581606

582607
# Create table 2: DATA_PREDS
583608
print('Creating prediction table ...', end=' ')
584-
create_data_preds_table(mlflow._splice_context, run_id, schema_table_name, classes, primary_key, modelType, verbose)
609+
create_data_preds_table(mlflow._splice_context, run_id, schema_table_name, classes, primary_key, model_type, verbose)
585610
print('Done.')
586611

587612
# Create Trigger 1: model prediction
588613
print('Creating model prediction trigger ...', end=' ')
589-
if modelType == H2OModelType.KEY_VALUE_RETURN:
590-
create_vti_prediction_trigger(mlflow._splice_context, schema_table_name, run_id, feature_columns, schema_types, schema_str, primary_key, classes, verbose)
614+
if model_type in (H2OModelType.KEY_VALUE_RETURN, SklearnModelType.KEY_VALUE):
615+
create_vti_prediction_trigger(mlflow._splice_context, schema_table_name, run_id, feature_columns,
616+
schema_types, schema_str, primary_key, classes, model_type, sklearn_args, verbose)
591617
else:
592618
create_prediction_trigger(mlflow._splice_context, schema_table_name, run_id, feature_columns, schema_types,
593-
schema_str, primary_key, modelType, verbose)
619+
schema_str, primary_key, model_type, verbose)
594620
print('Done.')
595621

596-
if modelType in (SparkModelType.CLASSIFICATION, SparkModelType.CLUSTERING_WITH_PROB,
622+
if model_type in (SparkModelType.CLASSIFICATION, SparkModelType.CLUSTERING_WITH_PROB,
597623
H2OModelType.CLASSIFICATION):
598624
# Create Trigger 2: model parsing
599625
print('Creating parsing trigger ...', end=' ')
600-
create_parsing_trigger(mlflow._splice_context, schema_table_name, primary_key, run_id, classes, modelType, verbose)
626+
create_parsing_trigger(mlflow._splice_context, schema_table_name, primary_key, run_id, classes, model_type, verbose)
601627
print('Done.')
602628
except Exception as e:
603629
import traceback

0 commit comments

Comments
 (0)