@@ -15,6 +15,7 @@
 import h2o
 import pyspark

+from splicemachine.mlflow_support.constants import *
 from splicemachine.mlflow_support.utilities import *
 from splicemachine.spark.context import PySpliceContext
 from splicemachine.spark.constants import CONVERSIONS
@@ -67,7 +68,7 @@ def _check_for_splice_ctx():
     spark operations to take place
     """

-    if not getattr(mlflow, '_splice_context'):
+    if not hasattr(mlflow, '_splice_context'):
         raise SpliceMachineException(
             "You must run `mlflow.register_splice_context(py_splice_context)` before "
             "you can run this mlflow operation!"
@@ -117,8 +118,8 @@ def _lm(key, value):
 @_mlflow_patch('log_model')
 def _log_model(model, name='model'):
     """
-    Log a fitted spark pipeline or model
-    :param model: (PipelineModel or Model) is the fitted Spark Model/Pipeline to store
+    Log a fitted spark pipeline/model or H2O model
+    :param model: (PipelineModel or Model) is the fitted Spark Model/Pipeline or H2O model to store
         with the current run
     :param name: (str) the run relative name to store the model under
     """
@@ -319,7 +320,7 @@ def _load_model(run_id=None, name='model'):
         with open('/tmp/model', 'wb') as file:
             file.write(model_blob)
         model = h2o.load_model('/tmp/model')
-        rmtree('/tmp/model')
+        remove('/tmp/model')
         return model


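`/tmp/model` here is a single file written from the model blob, so `shutil.rmtree`, which only removes directories, was the wrong cleanup call. A small sketch of the failure mode the fix addresses (the path is illustrative):

import os
from shutil import rmtree

with open('/tmp/model_blob_demo', 'wb') as f:
    f.write(b'\x00\x01')  # a plain file, as in _load_model above

try:
    rmtree('/tmp/model_blob_demo')     # raises: rmtree expects a directory
except NotADirectoryError:
    os.remove('/tmp/model_blob_demo')  # correct call for a single file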
@@ -361,7 +362,7 @@ def _initiate_job(payload, endpoint):
     :param endpoint: (str) REST endpoint to target
     :return: (str) Response text from request
     """
-    if not getattr(mlflow, '_basic_auth'):
+    if not hasattr(mlflow, '_basic_auth'):
         raise Exception(
             "You have not logged into MLManager director."
             " Please run mlflow.login_director(username, password)"
@@ -473,7 +474,6 @@ def _deploy_azure(endpoint_name, resource_group, workspace, run_id=None, region=
     }
     return _initiate_job(request_payload, '/api/rest/initiate')

-
 @_mlflow_patch('deploy_database')
 def _deploy_db(fittedPipe, df, db_schema_name, db_table_name, primary_key,
                run_id=None, classes=None, verbose=False, replace=False) -> None:
@@ -511,6 +511,7 @@ def _deploy_db(fittedPipe, df, db_schema_name, db_table_name, primary_key,
     run_id = run_id if run_id else mlflow.active_run().info.run_uuid
     db_table_name = db_table_name if db_table_name else f'data_{run_id}'
     schema_table_name = f'{db_schema_name}.{db_table_name}' if db_schema_name else db_table_name
+    assert type(df) is pyspark.sql.dataframe.DataFrame, "Dataframe must be a PySpark dataframe!"

     feature_columns = df.columns
     # Get the datatype of each column in the dataframe
@@ -519,37 +520,25 @@ def _deploy_db(fittedPipe, df, db_schema_name, db_table_name, primary_key,
     # Make sure primary_key is valid format
     validate_primary_key(primary_key)

-    # Get model type
-    modelType = SparkUtils.get_model_type(fittedPipe)

-    print(f'Deploying model {run_id} to table {schema_table_name}')
-
-    if classes:
-        if modelType not in (SparkModelType.CLASSIFICATION, SparkModelType.CLUSTERING_WITH_PROB):
-            print('Prediction labels found but model is not type Classification. Removing labels')
-            classes = None
-        else:
-            # handling spaces in class names
-            classes = [c.replace(' ', '_') for c in classes]
-            print(
-                f'Prediction labels found. Using {classes} as labels for predictions {list(range(0, len(classes)))} respectively')
+    # library = get_model_library(run_id)
+    typ = str(type(fittedPipe))
+    library = 'mleap' if 'pyspark' in typ else 'h2omojo' if 'h2o' in typ else None
+    if library == DBLibraries.MLeap:
+        modelType, classes = SparkUtils.prep_model_for_deployment(mlflow._splice_context, fittedPipe, df, classes, run_id)
+    elif library == DBLibraries.H2OMOJO:
+        modelType, classes = H2OUtils.prep_model_for_deployment(mlflow._splice_context, fittedPipe, classes, run_id)
     else:
-        if modelType in (SparkModelType.CLASSIFICATION, SparkModelType.CLUSTERING_WITH_PROB):
-            # Add a column for each class of the prediction to output the probability of the prediction
-            classes = [f'C{i}' for i in range(SparkUtils.get_num_classes(fittedPipe))]
-
-    # See if the df passed in has already been transformed.
-    # If not, transform it
-    if 'prediction' not in df.columns:
-        df = fittedPipe.transform(df)
-    # Get the Mleap model and insert it into the MODELS table
-    mleap_model = get_mleap_model(mlflow._splice_context, fittedPipe, df, run_id)
-    insert_mleap_model(mlflow._splice_context, run_id, mleap_model)
+        raise SpliceMachineException('Model type is not supported for in-DB deployment! '
+                                     'Currently, the model must be H2O or Spark.')
+
+
+    print(f'Deploying model {run_id} to table {schema_table_name}')

     # Create the schema of the table (we use this a few times)
     schema_str = ''
     for i in feature_columns:
-        schema_str += f'\t{i} {CONVERSIONS[schema_types[str(i)]]},\n'
+        schema_str += f'\t{i} {CONVERSIONS[schema_types[str(i)]]},'

     try:
         # Create table 1: DATA
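
The library detection above relies on `str(type(obj))` embedding the defining module path, for example `<class 'pyspark.ml.pipeline.PipelineModel'>`, so a substring test distinguishes Spark from H2O models without importing either framework at check time. A standalone sketch of that dispatch:

def detect_library(fitted_model):
    """Return the in-DB deployment backend for a fitted model, or None."""
    typ = str(type(fitted_model))  # class repr includes the module path
    return 'mleap' if 'pyspark' in typ else 'h2omojo' if 'h2o' in typ else None

The commented-out `get_model_library(run_id)` line suggests a registry-based lookup may eventually replace this substring match.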
@@ -562,17 +551,20 @@ def _deploy_db(fittedPipe, df, db_schema_name, db_table_name, primary_key,
         create_data_preds_table(mlflow._splice_context, run_id, schema_table_name, classes, primary_key, modelType, verbose)
         print('Done.')

-        # Create Trigger 1: ( model prediction)
+        # Create Trigger 1: model prediction
         print('Creating model prediction trigger ...', end=' ')
-        create_prediction_trigger(mlflow._splice_context, schema_table_name, run_id, feature_columns, schema_types,
-                                  schema_str,
-                                  primary_key, modelType, verbose)
+        if modelType == H2OModelType.KEY_VALUE_RETURN:
+            create_vti_prediction_trigger(mlflow._splice_context, schema_table_name, run_id, feature_columns, schema_types, schema_str, primary_key, classes, verbose)
+        else:
+            create_prediction_trigger(mlflow._splice_context, schema_table_name, run_id, feature_columns, schema_types,
+                                      schema_str, primary_key, modelType, verbose)
         print('Done.')

-        if modelType in (SparkModelType.CLASSIFICATION, SparkModelType.CLUSTERING_WITH_PROB):
+        if modelType in (SparkModelType.CLASSIFICATION, SparkModelType.CLUSTERING_WITH_PROB,
+                         H2OModelType.CLASSIFICATION):
             # Create Trigger 2: model parsing
             print('Creating parsing trigger ...', end=' ')
-            create_parsing_trigger(mlflow._splice_context, schema_table_name, primary_key, run_id, classes, verbose)
+            create_parsing_trigger(mlflow._splice_context, schema_table_name, primary_key, run_id, classes, modelType, verbose)
             print('Done.')
     except Exception as e:
         import traceback
@@ -581,6 +573,7 @@ def _deploy_db(fittedPipe, df, db_schema_name, db_table_name, primary_key,
         if not verbose:
             print('For more insight into the SQL statement that generated this error, rerun with verbose=True')
         traceback.print_exc()
+        raise SpliceMachineException('Model deployment failed.')

     print('Model Deployed.')

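End to end, the deployment path this diff reworks might be driven as below. The schema, table, class labels, primary key format (assumed here to be (column, SQL type) pairs), and the fitted pipeline are all illustrative assumptions, not values from this PR:

from splicemachine.mlflow_support import *
from splicemachine.spark.context import PySpliceContext

splice = PySpliceContext(spark)  # spark: an active SparkSession
mlflow.register_splice_context(splice)

with mlflow.start_run():
    mlflow.log_model(fitted_pipeline, name='model')
    mlflow.deploy_database(fitted_pipeline, df, 'SPLICE', 'PREDS_TABLE',
                           primary_key=[('RECORD_ID', 'INT')],
                           classes=['class_a', 'class_b'])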