This repository was archived by the owner on Apr 15, 2022. It is now read-only.

Dbaas 4183 (#76)
* model cols fix + better model checking for spark

* fix for issue 70 and 73

* fix in case user doesn't pass in model_cols

* py4j version

* better exception for serializetobundle
Ben Epstein authored Aug 10, 2020
1 parent 9faa3d1 commit 7b24863
Showing 4 changed files with 36 additions and 12 deletions.
4 changes: 2 additions & 2 deletions requirements.txt
@@ -1,4 +1,4 @@
-py4j==0.10.8.1
+py4j==0.10.7
pytest==5.1.3
mlflow==1.6.0
mleap==0.15.0
@@ -11,7 +11,7 @@ numpy==1.18.2
pandas==1.0.3
scipy==1.4.1
tensorflow==2.2.0
-pyspark==2.4.0
+pyspark
h2o-pysparkling-2.4==3.28.1.2-1
sphinx-tabs
IPython
1 change: 1 addition & 0 deletions splicemachine/mlflow_support/constants.py
@@ -71,3 +71,4 @@ class ModelStatuses():
deployed: str = 'DEPLOYED'
deleted: str = 'DELETED'
+SUPPORTED_STATUSES = [deployed, deleted]

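For context, a minimal sketch of how the new SUPPORTED_STATUSES list might be used. The validate_status helper is hypothetical; the module path, class, and attribute come from the diff above.

from splicemachine.mlflow_support.constants import ModelStatuses

def validate_status(status: str) -> str:
    # Hypothetical guard: reject anything other than the supported statuses ('DEPLOYED', 'DELETED')
    if status not in ModelStatuses.SUPPORTED_STATUSES:
        raise ValueError(f'Status must be one of {ModelStatuses.SUPPORTED_STATUSES}, got {status!r}')
    return status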
18 changes: 13 additions & 5 deletions splicemachine/mlflow_support/mlflow_support.py
@@ -641,8 +641,10 @@ def _deploy_db(db_schema_name,
Will ONLY be used if the table does not exist and a dataframe is passed in
:param model_cols: (List[str]) The columns from the table to use for the model. If None, all columns in the table
will be passed to the model. If specified, the columns will be passed to the model
-IN THAT ORDER. The columns passed here must exist in the table.
-:param classes: (List[str]) The classes (prediction labels) for the model being deployed.\n
+IN THAT ORDER. The columns passed here must exist in the table. If creating the
+table from a dataframe, the table will be created from the columns in the DF, not
+model_cols. model_cols is only used at prediction time.
+:param classes: (List[str]) The classes (prediction labels) for the model being deployed.
NOTE: If not supplied, the table will have default column names for each class
:param sklearn_args: (dict{str: str}) Prediction options for sklearn models: \n
* Available key value options: \n
@@ -703,6 +705,8 @@ def _deploy_db(db_schema_name,

schema_table_name = f'{db_schema_name}.{db_table_name}'

+# Feature columns are all of the columns of the table; model_cols are the subset of feature columns that are used
+# in predictions. schema_types contains the types of all columns in feature_columns
feature_columns, schema_types = get_feature_columns_and_types(mlflow._splice_context, df, create_model_table,
model_cols, schema_table_name)

@@ -725,7 +729,9 @@
# Create the schema of the table (we use this a few times)
schema_str = ''
for i in feature_columns:
-schema_str += f'\t{i} {CONVERSIONS[schema_types[str(i)]]},'
+spark_data_type = schema_types[str(i)]
+assert spark_data_type in CONVERSIONS, f'Type {spark_data_type} not supported for table creation. Remove column and try again'
+schema_str += f'\t{i} {CONVERSIONS[spark_data_type]},'

try:
# Create/Alter table 1: DATA
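To illustrate the new guard, here is a small self-contained sketch with hypothetical CONVERSIONS entries and column names (the real mapping is defined in the library's constants). An unmapped Spark type now fails the assert with a readable message instead of surfacing as a raw KeyError.

# Hypothetical subset of the CONVERSIONS map (Spark type name -> SQL column type)
CONVERSIONS = {'IntegerType': 'INTEGER', 'DoubleType': 'DOUBLE', 'StringType': 'VARCHAR(5000)'}
# Hypothetical column -> Spark type mapping, as produced by get_feature_columns_and_types
schema_types = {'AGE': 'IntegerType', 'INCOME': 'DoubleType'}

schema_str = ''
for col in schema_types:
    spark_data_type = schema_types[col]
    # A column with an unmapped type (e.g. 'VectorUDT') would now raise AssertionError here,
    # rather than a KeyError on the CONVERSIONS lookup below
    assert spark_data_type in CONVERSIONS, f'Type {spark_data_type} not supported for table creation. Remove column and try again'
    schema_str += f'\t{col} {CONVERSIONS[spark_data_type]},'
print(schema_str)  # '\tAGE INTEGER,\tINCOME DOUBLE,'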
@@ -739,11 +745,13 @@

# Create Trigger 1: model prediction
print('Creating model prediction trigger ...', end=' ')
+# If model_cols were passed in, we'll use them here. Otherwise, use all of the columns (stored in feature_columns)
+model_cols = model_cols or feature_columns
if model_type in (H2OModelType.KEY_VALUE, SklearnModelType.KEY_VALUE, KerasModelType.KEY_VALUE):
-create_vti_prediction_trigger(mlflow._splice_context, schema_table_name, run_id, feature_columns, schema_types,
+create_vti_prediction_trigger(mlflow._splice_context, schema_table_name, run_id, model_cols, schema_types,
schema_str, primary_key, classes, model_type, sklearn_args, pred_threshold, verbose)
else:
-create_prediction_trigger(mlflow._splice_context, schema_table_name, run_id, feature_columns, schema_types,
+create_prediction_trigger(mlflow._splice_context, schema_table_name, run_id, model_cols, schema_types,
schema_str, primary_key, model_type, verbose)
print('Done.')

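Putting the model_cols behavior together, a hypothetical deployment call might look like the sketch below. The parameter names are taken from the _deploy_db docstring in this diff; the exact public signature of deploy_db and the example values are assumptions.

# Hypothetical usage sketch; run_id and training_df are assumed to exist
mlflow.deploy_db(
    db_schema_name='RETAIL',
    db_table_name='CHURN_PREDICTIONS',
    run_id=run_id,
    create_model_table=True,
    df=training_df,                # the table is created from ALL columns of this dataframe
    model_cols=['AGE', 'INCOME'],  # only these columns are passed to the model, in this order, at prediction time
    classes=['NO_CHURN', 'CHURN']
)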
25 changes: 20 additions & 5 deletions splicemachine/mlflow_support/utilities.py
@@ -13,6 +13,8 @@
from pyspark.ml.base import Model as SparkModel
from pyspark.ml.feature import IndexToString
from pyspark.ml.wrapper import JavaModel
+from pyspark.ml import classification as spark_classification, regression as spark_regression, \
+    clustering as spark_clustering, recommendation as spark_recommendation
from pyspark.sql.types import StructType
from pyspark.sql.dataframe import DataFrame as SparkDF
from pandas.core.frame import DataFrame as PandasDF
@@ -452,6 +454,8 @@ def prep_model_for_deployment(splice_context: PySpliceContext,


class SparkUtils:
+MODEL_MODULES = [spark_classification.__name__, spark_recommendation.__name__, spark_clustering.__name__,
+                 spark_regression.__name__]
@staticmethod
def get_stages(pipeline: PipelineModel):
"""
@@ -531,9 +535,10 @@ def get_model_stage(pipeline: PipelineModel) -> SparkModel:
for i in SparkUtils.get_stages(pipeline):
# StandardScaler is also implemented as a base Model and JavaModel for some reason but that's not a model
# So we need to make sure the stage isn't a feature
-if isinstance(i, SparkModel) and isinstance(i, JavaModel) and 'feature' not in i.__module__:
+if getattr(i, '__module__', None) in SparkUtils.MODEL_MODULES:
return i
-raise AttributeError('Could not find model stage in Pipeline! Is this a fitted spark Pipeline?')
+raise AttributeError("It looks like you're trying to deploy a pipeline without a supported Spark Model. Supported Spark models "
+                     "are listed here: https://mleap-docs.combust.ml/core-concepts/transformers/support.html")

@staticmethod
def try_get_class_labels(pipeline: PipelineModel):
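The module-based check above replaces the isinstance heuristic: a stage counts as the model if it is defined in one of pyspark.ml's model modules. A standalone sketch of the same idea (the find_model_stage helper and its input are hypothetical):

from pyspark.ml import classification, clustering, recommendation, regression

# A stage is treated as the model stage if it comes from one of pyspark.ml's model modules
MODEL_MODULES = [classification.__name__, regression.__name__, clustering.__name__, recommendation.__name__]

def find_model_stage(stages):
    for stage in stages:
        # e.g. a fitted LogisticRegressionModel has __module__ == 'pyspark.ml.classification'
        if getattr(stage, '__module__', None) in MODEL_MODULES:
            return stage
    raise AttributeError('No supported Spark model stage found in the pipeline')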
@@ -707,6 +712,12 @@ def prep_model_for_deployment(splice_context: PySpliceContext,
:param classes:
:return:
"""

+# Check if the model is not a Pipeline. This occurs when a user logs a single model rather than a PipelineModel
+if not SparkUtils.is_spark_pipeline(fittedPipe):
+    print('You are deploying a single Spark Model. It will be deployed as a Pipeline with 1 stage. This will '
+          'not affect expected behavior or outcomes.')
+    fittedPipe = PipelineModel(stages=[fittedPipe])
# Get model type
model_type = SparkUtils.get_model_type(fittedPipe)
# See if the labels are in an IndexToString stage. Will either return List[str] or empty []
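In effect, a bare fitted model can now be handed to deployment and is wrapped into a one-stage PipelineModel. A minimal sketch of that wrapping (train_df is assumed to exist with 'features' and 'label' columns):

from pyspark.ml import PipelineModel
from pyspark.ml.classification import LogisticRegression

lr_model = LogisticRegression(featuresCol='features', labelCol='label').fit(train_df)  # a bare model, not a pipeline

# Equivalent to what the new code does internally before continuing with deployment
fitted_pipe = lr_model if isinstance(lr_model, PipelineModel) else PipelineModel(stages=[lr_model])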
@@ -892,7 +903,13 @@ def get_mleap_model(splice_context: PySpliceContext,
# Serialize the Spark model into Mleap format
if f'{run_id}.zip' in rbash('ls /tmp').read():
remove(f'/tmp/{run_id}.zip')
-fittedPipe.serializeToBundle(f"jar:file:///tmp/{run_id}.zip", df)

+try:
+    fittedPipe.serializeToBundle(f"jar:file:///tmp/{run_id}.zip", df)
+except Exception:
+    m = getattr(fittedPipe, '__class__', 'UnknownModel')
+    raise SpliceMachineException(f'It looks like your model type {m} is not supported. Supported models are listed '
+                                 f'here: https://mleap-docs.combust.ml/core-concepts/transformers/support.html') from None

jvm = splice_context.jvm
java_import(jvm, "com.splicemachine.mlrunner.FileRetriever")
@@ -1292,8 +1309,6 @@ def get_feature_columns_and_types(splice_ctx: PySpliceContext,
assert type(df) in (SparkDF, PandasDF), "Dataframe must be a PySpark or Pandas dataframe!"
if type(df) == PandasDF:
df = splice_ctx.spark_session.createDataFrame(df)
-if model_cols:
-    df = df.select(*model_cols)
feature_columns = df.columns
# Get the datatype of each column in the dataframe
schema_types = {str(i.name): re.sub("[0-9,()]", "", str(i.dataType)) for i in df.schema}
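For reference, the schema_types comprehension above strips type parameters so that parameterized Spark types can be matched against CONVERSIONS. A small self-contained sketch (hypothetical column names):

import re
from pyspark.sql.types import StructType, StructField, IntegerType, DecimalType

schema = StructType([StructField('AGE', IntegerType()), StructField('PRICE', DecimalType(10, 2))])
# 'DecimalType(10,2)' is reduced to 'DecimalType' so it can be looked up in CONVERSIONS
schema_types = {str(f.name): re.sub("[0-9,()]", "", str(f.dataType)) for f in schema}
# {'AGE': 'IntegerType', 'PRICE': 'DecimalType'}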
