Skip to content
This repository was archived by the owner on Apr 15, 2022. It is now read-only.

Commit

Permalink
Add support for run_name parameter (#24)
Browse files Browse the repository at this point in the history
Removed duplicate code and added support for specifying a run_name for a new run instead of using a randomly generated string
  • Loading branch information
Epstein authored Oct 2, 2019
1 parent 3025af8 commit 68e63b2
Showing 1 changed file with 16 additions and 51 deletions.
67 changes: 16 additions & 51 deletions splicemachine/ml/management.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,18 +27,17 @@ def get_pod_uri(pod, port, pod_count=0, testing=False):
raise KeyError(
"Uh Oh! MLFLOW_URL variable was not found... are you running in the Cloud service?")

===

from mlflow.exceptions import MlflowException

def _get_user():
"""
Get the current logged in user to
Jupyter
:return: (str) name of the logged in user
"""
try:
uname = env_vars['JUPYTERHUB_USER']
uname = env_vars.get('JUPYTERHUB_USER') or env_vars['USER']
return uname
except KeyError:
raise Exception("Could not determine current running user. Running MLManager outside of Splice Machine Cloud Jupyter "
Expand All @@ -59,7 +58,6 @@ def _readable_pipeline_stage(pipeline_stage):
def _get_stages(pipeline):
"""
Extract the stages from a fit or unfit pipeline
:param pipeline: a fit or unfit Spark pipeline
:return: stages list
"""
Expand Down Expand Up @@ -200,7 +198,7 @@ def wrapped(self, *args, **kwargs):
raise Exception("Please either use set_active_experiment or create_experiment "
"to set an active experiment before running this function")
elif not self.active_run:
raise Exception("Please either use set_active_run or create_run to set an active "
raise Exception("Please either use set_active_run or start_run to set an active "
"run before running this function")
else:
return func(self, *args, **kwargs)
Expand Down Expand Up @@ -264,41 +262,11 @@ def create_experiment(self, experiment_name, reset=False):
self.active_experiment = self.get_experiment_by_name(experiment_name)
print("Set experiment id=" + str(experiment_id) + " to the active experiment")

def create_experiment(self, experiment_name, reset=False):
    """
    Create a new experiment. If the experiment already exists, it
    will be set as the active experiment. If the experiment doesn't
    exist, it will be created and set to active. If ``reset`` is True
    (please use with caution), the runs within the existing
    experiment will be deleted.

    :param experiment_name: (str) the name of the experiment to create
    :param reset: (bool) whether or not to delete the existing
        experiment's runs before reusing it
    """
    experiment = self.get_experiment_by_name(experiment_name)
    if experiment:
        print("Experiment " + experiment_name + " already exists... setting to active experiment")
        self.active_experiment = experiment
        # FIX: was `experiment.id` — the mlflow Experiment entity and the
        # rest of this class use `experiment_id` (see the reset branch below)
        print("Active experiment has id " + str(experiment.experiment_id))
        if reset:
            print("Keyword argument \"reset\" was set to True. Overwriting experiment and its associated runs...")
            experiment_id = self.active_experiment.experiment_id
            associated_runs = self.list_run_infos(experiment_id)
            for run in associated_runs:
                print("Deleting run with UUID " + run.run_uuid)
                # FIX: was `manager.delete_run(...)`, which referenced an
                # undefined global; delete through this instance instead
                self.delete_run(run.run_uuid)
            print("Successfully overwrote experiment")
    else:
        experiment_id = super(MLManager, self).create_experiment(experiment_name)
        print("Created experiment w/ id=" + str(experiment_id))
        self.set_active_experiment(experiment_id)


def set_active_experiment(self, experiment_name):
"""
Set the active experiment of which all new runs will be created under
Does not apply to already created runs
:param experiment_name: either an integer (experiment id) or a string (experiment name)
"""

Expand All @@ -315,7 +283,7 @@ def set_active_run(self, run_id):
"""
self.active_run = self.get_run(run_id)

def start_run(self, tags=None, experiment_id=None):
def start_run(self, tags=None, run_name=None, experiment_id=None, nested=False):
"""
Create a new run in the active experiment and set it to be active
:param tags: a dictionary containing metadata about the current run.
Expand All @@ -327,7 +295,7 @@ def start_run(self, tags=None, experiment_id=None):
:param run_name: an optional name for the run to show up in the MLFlow UI
:param experiment_id: if this is specified, the experiment id of this
will override the active run.
:param nested: Controls whether the run is nested in a parent run. True creates a nested run
"""
if experiment_id:
new_run_exp_id = experiment_id
Expand All @@ -336,14 +304,21 @@ def start_run(self, tags=None, experiment_id=None):
new_run_exp_id = self.active_experiment.experiment_id
else:
new_run_exp_id = 0
self.set_active_experiment(new_run_exp_id)
try:
self.set_active_experiment(new_run_exp_id)
except MlflowException:
raise MlflowException("There are no experiments available yet. Please create an experiment before starting a run")

if not tags:
tags = {}

tags['mlflow.user'] = _get_user()

self.active_run = super(MLManager, self).create_run(new_run_exp_id, tags=tags)
if run_name:
manager.set_tag('mlflow.runName',run_name)
print(f'Setting {run_name} to active run')


def get_run(self, run_id):
"""
Expand Down Expand Up @@ -396,9 +371,11 @@ def set_tag(self, *args, **kwargs):
@check_active
def set_tags(self, tags):
    """
    Log a collection of tags against the active run.

    :param tags: either a list of (tag, value) tuples, which are
        logged in order, or a dict mapping tag names to values
    """
    # normalize the dict form to an ordered list of (key, value) pairs
    # so both input shapes take the same code path
    if isinstance(tags, dict):
        tags = list(tags.items())
    for tag in tags:
        self.set_tag(*tag)

Expand Down Expand Up @@ -480,7 +457,6 @@ def _is_spark_model(spark_object):
is a model, it will return True, if it is a
pipeline model is will return False.
Otherwise, it will throw an exception
:param spark_object: (Model) Spark object to check
:return: (bool) whether or not the object is a model
:exception: (Exception) throws an error if it is not either
Expand Down Expand Up @@ -535,11 +511,9 @@ def log_pipeline_stages(self, pipeline):
"""
Log the human-friendly names of each stage in
a Spark pipeline.
*Warning*: With a big pipeline, this could result in
a lot of parameters in MLFlow. It is probably best
to log them yourself, so you can ensure useful tracking
:param pipeline: the fitted/unfit pipeline object
"""

Expand All @@ -551,7 +525,6 @@ def log_pipeline_stages(self, pipeline):
def _find_first_input_by_output(dictionary, value):
"""
Find the first input column for a given column
:param dictionary: dictionary to search
:param value: column
:return: None if not found, otherwise first column
Expand All @@ -566,7 +539,6 @@ def log_feature_transformations(self, unfit_pipeline):
"""
Log the preprocessing transformation sequence
for every feature in the UNFITTED Spark pipeline
:param unfit_pipeline: UNFITTED spark pipeline!!
"""
transformations = defaultdict(lambda: [[], None]) # transformations, outputColumn
Expand Down Expand Up @@ -596,7 +568,6 @@ def start_timer(self, timer_name):
Start a given timer with the specified
timer name, which will be logged when the
run is stopped
:param timer_name: the name to call the timer (will appear in MLFlow UI)
"""
self.timer_name = timer_name
Expand Down Expand Up @@ -626,10 +597,8 @@ def log_evaluator_metrics(self, splice_evaluator):
"""
Takes an Splice evaluator and logs
all of the associated metrics with it
:param splice_evaluator: a Splice evaluator (from
splicemachine.ml.utilities package in pysplice)
:return: retrieved metrics dict
"""
results = splice_evaluator.get_results('dict')
Expand Down Expand Up @@ -705,7 +674,6 @@ def download_artifact(self, name, local_path, run_id=None):
Download the artifact at the given
run id (active default) + name
to the local path
:param name: (str) artifact name to load
(with respect to the run)
:param local_path: (str) local path to download the
Expand Down Expand Up @@ -745,7 +713,6 @@ def login_director(self, username, password):
"""
Login to MLmanager Director so we can
submit jobs
:param username: (str) database username
:param password: (str) database password
"""
Expand All @@ -756,7 +723,6 @@ def login_director(self, username, password):
def _initiate_job(self, payload, endpoint):
"""
Send a job to the initiation endpoint
:param payload: (dict) JSON payload for POST request
:param endpoint: (str) REST endpoint to target
:return: (str) Response text from request
Expand Down Expand Up @@ -868,7 +834,6 @@ def deploy_azure(self, endpoint_name, resource_group, workspace, run_id=None, re
cpu_cores=0.1, allocated_ram=0.5, model_name=None):
"""
Deploy a given run to AzureML.
:param endpoint_name: (str) the name of the endpoint in AzureML when deployed to
Azure Container Services. Must be unique.
:param resource_group: (str) Azure Resource Group for model. Automatically created if
Expand Down

0 comments on commit 68e63b2

Please sign in to comment.