diff --git a/.env.example b/.env.example index 3c63d969..36eb0c54 100644 --- a/.env.example +++ b/.env.example @@ -69,8 +69,8 @@ AML_CLUSTER_MAX_NODES_SCORING = '4' AML_CLUSTER_MIN_NODES_SCORING = '0' AML_CLUSTER_PRIORITY_SCORING = 'lowpriority' AML_REBUILD_ENVIRONMENT_SCORING = 'true' -BATCHSCORE_SCRIPT_PATH = 'scoring/parallel_batchscore.py' -BATCHSCORE_COPY_SCRIPT_PATH = 'scoring/parallel_batchscore_copyoutput.py' +BATCHSCORE_SCRIPT_PATH = 'diabetes_regression/scoring/parallel_batchscore.py' +BATCHSCORE_COPY_SCRIPT_PATH = 'diabetes_regression/scoring/parallel_batchscore_copyoutput.py' SCORING_DATASTORE_INPUT_CONTAINER = 'input' @@ -78,4 +78,7 @@ SCORING_DATASTORE_INPUT_FILENAME = 'diabetes_scoring_input.csv' SCORING_DATASTORE_OUTPUT_CONTAINER = 'output' SCORING_DATASTORE_OUTPUT_FILENAME = 'diabetes_scoring_output.csv' SCORING_DATASET_NAME = 'diabetes_scoring_ds' -SCORING_PIPELINE_NAME = 'diabetes-scoring-pipeline' \ No newline at end of file +SCORING_PIPELINE_NAME = 'diabetes-scoring-pipeline' + +# Observability +APP_INSIGHTS_CONNECTION_STRING = '' \ No newline at end of file diff --git a/.gitignore b/.gitignore index 3ab04e2f..6902cfd3 100644 --- a/.gitignore +++ b/.gitignore @@ -109,3 +109,6 @@ condaenv.* .mypy_cache/ .DS_Store + +#pycharm +.idea \ No newline at end of file diff --git a/.pipelines/diabetes_regression-batchscoring-ci.yml b/.pipelines/diabetes_regression-batchscoring-ci.yml index 1392fddb..7ee42d17 100644 --- a/.pipelines/diabetes_regression-batchscoring-ci.yml +++ b/.pipelines/diabetes_regression-batchscoring-ci.yml @@ -68,6 +68,8 @@ stages: python -m ml_service.pipelines.diabetes_regression_build_parallel_batchscore_pipeline env: SCORING_DATASTORE_ACCESS_KEY: $(SCORING_DATASTORE_ACCESS_KEY) + APP_INSIGHTS_CONNECTION_STRING: $(APP_INSIGHTS_CONNECTION_STRING) + - job: "Run_Batch_Score_Pipeline" displayName: "Run Batch Scoring Pipeline" diff --git a/.pipelines/diabetes_regression-cd.yml b/.pipelines/diabetes_regression-cd.yml index 8dd35e47..7e43098c 100644 --- a/.pipelines/diabetes_regression-cd.yml +++ b/.pipelines/diabetes_regression-cd.yml @@ -55,7 +55,7 @@ stages: inputs: azureSubscription: '$(WORKSPACE_SVC_CONNECTION)' scriptLocation: inlineScript - workingDirectory: $(Build.SourcesDirectory)/$(SOURCES_DIR_TRAIN)/scoring + workingDirectory: $(Build.SourcesDirectory)/$(SOURCES_DIR_TRAIN)/diabetes_regression/scoring inlineScript: | set -e # fail on error @@ -101,7 +101,7 @@ stages: inputs: azureSubscription: '$(WORKSPACE_SVC_CONNECTION)' scriptLocation: inlineScript - workingDirectory: $(Build.SourcesDirectory)/$(SOURCES_DIR_TRAIN)/scoring + workingDirectory: $(Build.SourcesDirectory)/$(SOURCES_DIR_TRAIN)/diabetes_regression/scoring inlineScript: | set -e # fail on error @@ -111,6 +111,8 @@ stages: --dc deployment_config_aks.yml \ -g $(RESOURCE_GROUP) --workspace-name $(WORKSPACE_NAME) \ --overwrite -v + env: + APP_INSIGHTS_CONNECTION_STRING: $(APP_INSIGHTS_CONNECTION_STRING) - task: AzureCLI@1 displayName: 'Smoke test' inputs: @@ -120,6 +122,8 @@ stages: set -e # fail on error export SUBSCRIPTION_ID=$(az account show --query id -o tsv) python -m ml_service.util.smoke_test_scoring_service --type AKS --service "$(AKS_DEPLOYMENT_NAME)" + env: + APP_INSIGHTS_CONNECTION_STRING: $(APP_INSIGHTS_CONNECTION_STRING) - stage: 'Deploy_Webapp' displayName: 'Deploy to Webapp' @@ -138,8 +142,8 @@ stages: - template: diabetes_regression-package-model-template.yml parameters: modelId: $(MODEL_NAME):$(get_model.MODEL_VERSION) - scoringScriptPath: 
'$(Build.SourcesDirectory)/$(SOURCES_DIR_TRAIN)/scoring/score.py' - condaFilePath: '$(Build.SourcesDirectory)/$(SOURCES_DIR_TRAIN)/conda_dependencies.yml' + scoringScriptPath: '$(Build.SourcesDirectory)/$(SOURCES_DIR_TRAIN)/diabetes_regression/scoring/score.py' + condaFilePath: '$(Build.SourcesDirectory)/$(SOURCES_DIR_TRAIN)/diabetes_regression/conda_dependencies.yml' - script: echo $(IMAGE_LOCATION) >image_location.txt displayName: "Write image location file" - task: AzureWebAppContainer@1 @@ -159,3 +163,5 @@ stages: set -e # fail on error export SUBSCRIPTION_ID=$(az account show --query id -o tsv) python -m ml_service.util.smoke_test_scoring_service --type Webapp --service "$(WebAppDeploy.AppServiceApplicationUrl)/score" + env: + APP_INSIGHTS_CONNECTION_STRING: $(APP_INSIGHTS_CONNECTION_STRING) diff --git a/.pipelines/diabetes_regression-ci.yml b/.pipelines/diabetes_regression-ci.yml index 5a539af0..f7e30e94 100644 --- a/.pipelines/diabetes_regression-ci.yml +++ b/.pipelines/diabetes_regression-ci.yml @@ -45,6 +45,8 @@ stages: # Invoke the Python building and publishing a training pipeline python -m ml_service.pipelines.diabetes_regression_build_train_pipeline displayName: 'Publish Azure Machine Learning Pipeline' + env: + APP_INSIGHTS_CONNECTION_STRING: $(APP_INSIGHTS_CONNECTION_STRING) - stage: 'Trigger_AML_Pipeline' displayName: 'Train and evaluate model' @@ -70,6 +72,8 @@ stages: # Set AMLPIPELINEID variable for next AML Pipeline task in next job AMLPIPELINEID="$(cat pipeline_id.txt)" echo "##vso[task.setvariable variable=AMLPIPELINEID;isOutput=true]$AMLPIPELINEID" + env: + APP_INSIGHTS_CONNECTION_STRING: $(APP_INSIGHTS_CONNECTION_STRING) name: 'getpipelineid' displayName: 'Get Pipeline ID' - job: "Run_ML_Pipeline" @@ -87,6 +91,8 @@ stages: PipelineId: '$(AMLPIPELINE_ID)' ExperimentName: '$(EXPERIMENT_NAME)' PipelineParameters: '"ParameterAssignments": {"model_name": "$(MODEL_NAME)"}, "tags": {"BuildId": "$(Build.BuildId)", "BuildUri": "$(BUILD_URI)"}, "StepTags": {"BuildId": "$(Build.BuildId)", "BuildUri": "$(BUILD_URI)"}' + env: + APP_INSIGHTS_CONNECTION_STRING: $(APP_INSIGHTS_CONNECTION_STRING) - job: "Training_Run_Report" dependsOn: "Run_ML_Pipeline" condition: always() diff --git a/.pipelines/diabetes_regression-get-model-id-artifact-template.yml b/.pipelines/diabetes_regression-get-model-id-artifact-template.yml index b9e61306..d1fc8a2c 100644 --- a/.pipelines/diabetes_regression-get-model-id-artifact-template.yml +++ b/.pipelines/diabetes_regression-get-model-id-artifact-template.yml @@ -27,6 +27,8 @@ steps: runId: '${{ parameters.artifactBuildId }}' runBranch: '$(Build.SourceBranch)' path: $(Build.SourcesDirectory)/bin + env: + APP_INSIGHTS_CONNECTION_STRING: $(APP_INSIGHTS_CONNECTION_STRING) - task: Bash@3 name: get_model displayName: Parse Json for Model Name and Version diff --git a/.pipelines/diabetes_regression-publish-model-artifact-template.yml b/.pipelines/diabetes_regression-publish-model-artifact-template.yml index 00e45105..f796dfce 100644 --- a/.pipelines/diabetes_regression-publish-model-artifact-template.yml +++ b/.pipelines/diabetes_regression-publish-model-artifact-template.yml @@ -25,5 +25,7 @@ steps: echo $FOUND_MODEL >model.json name: 'getversion' displayName: "Determine if evaluation succeeded and new model is registered (CLI)" + env: + APP_INSIGHTS_CONNECTION_STRING: $(APP_INSIGHTS_CONNECTION_STRING) - publish: model.json artifact: model diff --git a/.pipelines/diabetes_regression-variables-template.yml 
b/.pipelines/diabetes_regression-variables-template.yml index 502753fb..2f5857e6 100644 --- a/.pipelines/diabetes_regression-variables-template.yml +++ b/.pipelines/diabetes_regression-variables-template.yml @@ -3,19 +3,19 @@ variables: # Source Config # The directory containing the scripts for training, evaluating, and registering the model - name: SOURCES_DIR_TRAIN - value: diabetes_regression + value: . # The path to the model training script under SOURCES_DIR_TRAIN - name: TRAIN_SCRIPT_PATH - value: training/train_aml.py + value: diabetes_regression/training/train_aml.py # The path to the model evaluation script under SOURCES_DIR_TRAIN - name: EVALUATE_SCRIPT_PATH - value: evaluate/evaluate_model.py + value: diabetes_regression/evaluate/evaluate_model.py # The path to the model registration script under SOURCES_DIR_TRAIN - name: REGISTER_SCRIPT_PATH - value: register/register_model.py + value: diabetes_regression/register/register_model.py # The path to the model scoring script relative to SOURCES_DIR_TRAIN - name: SCORE_SCRIPT - value: scoring/score.py + value: diabetes_regression/scoring/score.py # Azure ML Variables @@ -66,8 +66,8 @@ variables: # value: "true" # Flag to allow rebuilding the AML Environment after it was built for the first time. This enables dependency updates from conda_dependencies.yaml. - # - name: AML_REBUILD_ENVIRONMENT - # value: "false" +# - name: AML_REBUILD_ENVIRONMENT +# value: "true" # Variables below are used for controlling various aspects of batch scoring - name: USE_GPU_FOR_SCORING @@ -95,9 +95,9 @@ variables: value: lowpriority # The path to the batch scoring script relative to SOURCES_DIR_TRAIN - name: BATCHSCORE_SCRIPT_PATH - value: scoring/parallel_batchscore.py + value: diabetes_regression/scoring/parallel_batchscore.py - name: BATCHSCORE_COPY_SCRIPT_PATH - value: scoring/parallel_batchscore_copyoutput.py + value: diabetes_regression/scoring/parallel_batchscore_copyoutput.py # Flag to allow rebuilding the AML Environment after it was built for the first time. # This enables dependency updates from the conda dependencies yaml for scoring activities. 
- name: AML_REBUILD_ENVIRONMENT_SCORING @@ -126,4 +126,7 @@ variables: # Scoring pipeline name - name: SCORING_PIPELINE_NAME value: "diabetes-scoring-pipeline" - \ No newline at end of file + + #Observability + - name: APP_INSIGHTS_CONNECTION_STRING + value: "" diff --git a/diabetes_regression/ci_dependencies.yml b/diabetes_regression/ci_dependencies.yml index c54f3e32..d77fcd83 100644 --- a/diabetes_regression/ci_dependencies.yml +++ b/diabetes_regression/ci_dependencies.yml @@ -27,3 +27,6 @@ dependencies: - flake8==3.7.* - flake8_formatter_junit_xml==0.0.* - azure-cli==2.3.* + - opencensus==0.7.7 + - opencensus-context==0.1.1 + - opencensus-ext-azure==1.0.2 diff --git a/diabetes_regression/conda_dependencies.yml b/diabetes_regression/conda_dependencies.yml index 57f2b999..e420fd94 100644 --- a/diabetes_regression/conda_dependencies.yml +++ b/diabetes_regression/conda_dependencies.yml @@ -37,3 +37,12 @@ dependencies: # MLOps with R - azure-storage-blob + + # Observability + - opencensus==0.7.7 + - opencensus-context==0.1.1 + - opencensus-ext-azure==1.0.2 + - python-dotenv==0.10.3 + + # Data Classes + - dataclasses diff --git a/diabetes_regression/conda_dependencies_scorecopy.yml b/diabetes_regression/conda_dependencies_scorecopy.yml index dffafd08..876bf278 100644 --- a/diabetes_regression/conda_dependencies_scorecopy.yml +++ b/diabetes_regression/conda_dependencies_scorecopy.yml @@ -29,3 +29,12 @@ dependencies: # Score copying deps - azure-storage-blob + + # Observability + - opencensus==0.7.7 + - opencensus-context==0.1.1 + - opencensus-ext-azure==1.0.2 + - python-dotenv==0.10.3 + + # Data Classes + - dataclasses diff --git a/diabetes_regression/conda_dependencies_scoring.yml b/diabetes_regression/conda_dependencies_scoring.yml index 60c45c44..fca0405e 100644 --- a/diabetes_regression/conda_dependencies_scoring.yml +++ b/diabetes_regression/conda_dependencies_scoring.yml @@ -30,3 +30,12 @@ dependencies: # Scoring deps - scikit-learn - pandas + + # Observability + - opencensus==0.7.7 + - opencensus-context==0.1.1 + - opencensus-ext-azure==1.0.2 + - python-dotenv==0.10.3 + + # Data Classes + - dataclasses diff --git a/diabetes_regression/evaluate/evaluate_model.py b/diabetes_regression/evaluate/evaluate_model.py index 5a69addb..b7bfab65 100644 --- a/diabetes_regression/evaluate/evaluate_model.py +++ b/diabetes_regression/evaluate/evaluate_model.py @@ -26,7 +26,11 @@ from azureml.core import Run import argparse import traceback -from util.model_helper import get_model +from diabetes_regression.util.model_helper import get_model +from utils.logger.logger_interface import Severity +from utils.logger.observability import Observability + +observability = Observability() run = Run.get_context() @@ -42,7 +46,7 @@ # load_dotenv() # sources_dir = os.environ.get("SOURCES_DIR_TRAIN") # if (sources_dir is None): -# sources_dir = 'diabetes_regression' +# sources_dir = '.' 
# path_to_util = os.path.join(".", sources_dir, "util") # sys.path.append(os.path.abspath(path_to_util)) # NOQA: E402 # from model_helper import get_model @@ -89,7 +93,8 @@ parser.add_argument( "--allow_run_cancel", type=str, - help="Set this to false to avoid evaluation step from cancelling run after an unsuccessful evaluation", # NOQA: E501 + help="Set this to false to avoid evaluation step from cancelling " + "run after an unsuccessful evaluation", default="true", ) @@ -109,10 +114,10 @@ tag_name = 'experiment_name' model = get_model( - model_name=model_name, - tag_name=tag_name, - tag_value=exp.name, - aml_workspace=ws) + model_name=model_name, + tag_name=tag_name, + tag_value=exp.name, + aml_workspace=ws) if (model is not None): production_model_mse = 10000 @@ -120,12 +125,12 @@ production_model_mse = float(model.tags[metric_eval]) new_model_mse = float(run.parent.get_metrics().get(metric_eval)) if (production_model_mse is None or new_model_mse is None): - print("Unable to find", metric_eval, "metrics, " - "exiting evaluation") - if((allow_run_cancel).lower() == 'true'): + observability.log("Unable to find" + + metric_eval + "metrics, exiting evaluation") + if ((allow_run_cancel).lower() == 'true'): run.parent.cancel() else: - print( + observability.log( "Current Production model mse: {}, " "New trained model mse: {}".format( production_model_mse, new_model_mse @@ -133,18 +138,23 @@ ) if (new_model_mse < production_model_mse): - print("New trained model performs better, " - "thus it should be registered") + observability.log("New trained model performs better, " + "thus it should be registered") else: - print("New trained model metric is worse than or equal to " - "production model so skipping model registration.") - if((allow_run_cancel).lower() == 'true'): + observability.log("New trained model metric is worse " + "than or equal to " + "production model so skipping " + "model registration.") + if ((allow_run_cancel).lower() == 'true'): run.parent.cancel() else: - print("This is the first model, " - "thus it should be registered") + observability.log("This is the first model, " + "thus it should be registered") except Exception: traceback.print_exc(limit=None, file=None, chain=True) - print("Something went wrong trying to evaluate. Exiting.") + observability.log( + description="Something went wrong trying to evaluate. 
Exiting.", + severity=Severity.ERROR) + raise diff --git a/diabetes_regression/register/register_model.py b/diabetes_regression/register/register_model.py index bca55a83..452ee896 100644 --- a/diabetes_regression/register/register_model.py +++ b/diabetes_regression/register/register_model.py @@ -31,10 +31,13 @@ import joblib from azureml.core import Run, Experiment, Workspace, Dataset from azureml.core.model import Model as AMLModel +from utils.logger.logger_interface import Severity +from utils.logger.observability import Observability +observability = Observability() -def main(): +def main(): run = Run.get_context() if (run.id.startswith('OfflineRun')): from dotenv import load_dotenv @@ -87,15 +90,17 @@ def main(): model_name = args.model_name model_path = args.step_input - print("Getting registration parameters") + observability.log("Getting registration parameters") # Load the registration parameters from the parameters file - with open("parameters.json") as f: + with open("diabetes_regression/parameters.json") as f: pars = json.load(f) try: register_args = pars["registration"] except KeyError: - print("Could not load registration values from file") + observability.log( + description="Could not load registration values from file", + severity=Severity.ERROR) register_args = {"tags": []} model_tags = {} @@ -104,10 +109,12 @@ def main(): mtag = run.parent.get_metrics()[tag] model_tags[tag] = mtag except KeyError: - print(f"Could not find {tag} metric on parent run.") + observability.log( + description=f"Could not find {tag} metric on parent run.", + severity=Severity.ERROR) # load the model - print("Loading model from " + model_path) + observability.log("Loading model from " + model_path) model_file = os.path.join(model_path, model_name) model = joblib.load(model_file) parent_tags = run.parent.get_tags() @@ -115,14 +122,18 @@ def main(): build_id = parent_tags["BuildId"] except KeyError: build_id = None - print("BuildId tag not found on parent run.") - print(f"Tags present: {parent_tags}") + observability.log("BuildId tag not found on parent run.", + severity=Severity.ERROR) + observability.log(description=f"Tags present: {parent_tags}", + severity=Severity.ERROR) try: build_uri = parent_tags["BuildUri"] except KeyError: build_uri = None - print("BuildUri tag not found on parent run.") - print(f"Tags present: {parent_tags}") + observability.log(description="BuildUri tag not found on parent run.", + severity=Severity.ERROR) + observability.log(description=f"Tags present: {parent_tags}", + severity=Severity.ERROR) if (model is not None): dataset_id = parent_tags["dataset_id"] @@ -154,7 +165,7 @@ def main(): build_id, build_uri) else: - print("Model not found. Skipping model registration.") + observability.log("Model not found. 
Skipping model registration.") sys.exit(0) @@ -163,21 +174,21 @@ def model_already_registered(model_name, exp, run_id): if len(model_list) >= 1: e = ("Model name:", model_name, "in workspace", exp.workspace, "with run_id ", run_id, "is already registered.") - print(e) + observability.log(description=e, severity=Severity.ERROR) raise Exception(e) else: - print("Model is not registered for this run.") + observability.log("Model is not registered for this run.") def register_aml_model( - model_path, - model_name, - model_tags, - exp, - run_id, - dataset_id, - build_id: str = 'none', - build_uri=None + model_path, + model_name, + model_tags, + exp, + run_id, + dataset_id, + build_id: str = 'none', + build_uri=None ): try: tagsValue = {"area": "diabetes_regression", @@ -198,7 +209,7 @@ def register_aml_model( datasets=[('training data', Dataset.get_by_id(exp.workspace, dataset_id))]) os.chdir("..") - print( + observability.log( "Model registered: {} \nModel Description: {} " "\nModel Version: {}".format( model.name, model.description, model.version @@ -206,7 +217,8 @@ def register_aml_model( ) except Exception: traceback.print_exc(limit=None, file=None, chain=True) - print("Model registration failed") + observability.log("Model registration failed", + severity=Severity.ERROR) raise diff --git a/diabetes_regression/scoring/parallel_batchscore.py b/diabetes_regression/scoring/parallel_batchscore.py index cd42c79c..d47a6bfb 100644 --- a/diabetes_regression/scoring/parallel_batchscore.py +++ b/diabetes_regression/scoring/parallel_batchscore.py @@ -29,8 +29,12 @@ import joblib import sys from typing import List -from util.model_helper import get_model +from diabetes_regression.util.model_helper import get_model from azureml.core import Model +from utils.logger.logger_interface import Severity +from utils.logger.observability import Observability + +observability = Observability() model = None @@ -68,7 +72,7 @@ def parse_args() -> List[str]: model_version = ( None if len(model_version_param) < 1 - or len(model_version_param[0][1].strip()) == 0 # NOQA: E501 + or len(model_version_param[0][1].strip()) == 0 # NOQA: E501 else model_version_param[0][1] ) @@ -80,7 +84,7 @@ def parse_args() -> List[str]: model_tag_name = ( None if len(model_tag_name_param) < 1 - or len(model_tag_name_param[0][1].strip()) == 0 # NOQA: E501 + or len(model_tag_name_param[0][1].strip()) == 0 # NOQA: E501 else model_tag_name_param[0][1] ) @@ -105,7 +109,7 @@ def init(): line arguments and get the right model to use for scoring. 
""" try: - print("Initializing batch scoring script...") + observability.log("Initializing batch scoring script...") # Get the model using name/version/tags filter model_filter = parse_args() @@ -120,9 +124,10 @@ def init(): modelpath = Model.get_model_path( model_name=amlmodel.name, version=amlmodel.version) model = joblib.load(modelpath) - print("Loaded model {}".format(model_filter[0])) + observability.log("Loaded model {}".format(model_filter[0])) except Exception as ex: - print("Error: {}".format(ex)) + observability.log(description="Error: {}".format(ex), + severity=Severity.ERROR) def run(mini_batch: pd.DataFrame) -> pd.DataFrame: @@ -154,4 +159,4 @@ def run(mini_batch: pd.DataFrame) -> pd.DataFrame: ) except Exception as ex: - print(ex) + observability.log(description=ex, severity=Severity.ERROR) diff --git a/diabetes_regression/scoring/parallel_batchscore_copyoutput.py b/diabetes_regression/scoring/parallel_batchscore_copyoutput.py index cc4af42c..64f04bfd 100644 --- a/diabetes_regression/scoring/parallel_batchscore_copyoutput.py +++ b/diabetes_regression/scoring/parallel_batchscore_copyoutput.py @@ -28,6 +28,9 @@ from datetime import datetime, date, timezone import argparse import os +from utils.logger.observability import Observability + +observability = Observability() def parse_args(): @@ -42,7 +45,7 @@ def parse_args(): def copy_output(args): - print("Output : {}".format(args.output_path)) + observability.log("Output : {}".format(args.output_path)) accounturl = "https://{}.blob.core.windows.net".format( args.scoring_datastore @@ -86,6 +89,6 @@ def copy_output(args): or args.output_path is None or args.output_path.strip() == "" ): - print("Missing parameters") + observability.log("Missing parameters") else: copy_output(args) diff --git a/diabetes_regression/scoring/score.py b/diabetes_regression/scoring/score.py index 4acd5c8d..16e43690 100644 --- a/diabetes_regression/scoring/score.py +++ b/diabetes_regression/scoring/score.py @@ -31,6 +31,9 @@ import input_schema, output_schema from inference_schema.parameter_types.numpy_parameter_type \ import NumpyParameterType +from utils.logger.observability import Observability + +observability = Observability() def init(): @@ -70,13 +73,13 @@ def run(data, request_headers): # The HTTP 'traceparent' header may be set by the caller to implement # distributed tracing (per the W3C Trace Context proposed specification) # and can be used to correlate the request to external systems. 
- print(('{{"RequestId":"{0}", ' - '"TraceParent":"{1}", ' - '"NumberOfPredictions":{2}}}' - ).format( - request_headers.get("X-Ms-Request-Id", ""), - request_headers.get("Traceparent", ""), - len(result) + observability.log(('{{"RequestId":"{0}", ' + '"TraceParent":"{1}", ' + '"NumberOfPredictions":{2}}}' + ).format( + request_headers.get("X-Ms-Request-Id", ""), + request_headers.get("Traceparent", ""), + len(result) )) return {"result": result.tolist()} @@ -87,4 +90,4 @@ def run(data, request_headers): init() test_row = '{"data":[[1,2,3,4,5,6,7,8,9,10],[10,9,8,7,6,5,4,3,2,1]]}' prediction = run(test_row, {}) - print("Test result: ", prediction) + observability.log("Test result: ", prediction) diff --git a/diabetes_regression/training/train.py b/diabetes_regression/training/train.py index 22258042..03a531dd 100644 --- a/diabetes_regression/training/train.py +++ b/diabetes_regression/training/train.py @@ -29,6 +29,9 @@ from sklearn.linear_model import Ridge from sklearn.metrics import mean_squared_error from sklearn.model_selection import train_test_split +from utils.logger.observability import Observability + +observability = Observability() # Split the dataframe into test and train data @@ -59,7 +62,7 @@ def get_model_metrics(model, data): def main(): - print("Running train.py") + observability.log("Running train.py") # Define training parameters ridge_args = {"alpha": 0.5} @@ -77,7 +80,7 @@ def main(): # Log the metrics for the model metrics = get_model_metrics(model, data) for (k, v) in metrics.items(): - print(f"{k}: {v}") + observability.log_metric(name=k, value=v) if __name__ == '__main__': diff --git a/diabetes_regression/training/train_aml.py b/diabetes_regression/training/train_aml.py index 9303198b..ab08f49e 100644 --- a/diabetes_regression/training/train_aml.py +++ b/diabetes_regression/training/train_aml.py @@ -29,14 +29,19 @@ import argparse import joblib import json -from train import split_data, train_model, get_model_metrics +from diabetes_regression.training.train import split_data, \ + train_model, get_model_metrics +from utils.logger.logger_interface import Severity +from utils.logger.observability import Observability + +observability = Observability() def register_dataset( - aml_workspace: Workspace, - dataset_name: str, - datastore_name: str, - file_path: str + aml_workspace: Workspace, + dataset_name: str, + datastore_name: str, + file_path: str ) -> Dataset: datastore = Datastore.get(aml_workspace, datastore_name) dataset = Dataset.Tabular.from_delimited_files(path=(datastore, file_path)) @@ -48,7 +53,7 @@ def register_dataset( def main(): - print("Running train_aml.py") + observability.log("Running train_aml.py") parser = argparse.ArgumentParser("train") parser.add_argument( @@ -93,12 +98,12 @@ def main(): args = parser.parse_args() - print("Argument [model_name]: %s" % args.model_name) - print("Argument [step_output]: %s" % args.step_output) - print("Argument [dataset_version]: %s" % args.dataset_version) - print("Argument [data_file_path]: %s" % args.data_file_path) - print("Argument [caller_run_id]: %s" % args.caller_run_id) - print("Argument [dataset_name]: %s" % args.dataset_name) + observability.log("Argument [model_name]: %s" % args.model_name) + observability.log("Argument [step_output]: %s" % args.step_output) + observability.log("Argument [dataset_version]: %s" % args.dataset_version) + observability.log("Argument [data_file_path]: %s" % args.data_file_path) + observability.log("Argument [caller_run_id]: %s" % args.caller_run_id) + 
observability.log("Argument [dataset_name]: %s" % args.dataset_name) model_name = args.model_name step_output_path = args.step_output @@ -108,19 +113,21 @@ def main(): run = Run.get_context() - print("Getting training parameters") + observability.log("Getting training parameters") # Load the training parameters from the parameters file - with open("parameters.json") as f: + with open("diabetes_regression/parameters.json") as f: pars = json.load(f) try: train_args = pars["training"] except KeyError: - print("Could not load training values from file") + observability.log( + description="Could not load training values from file", + severity=Severity.ERROR) train_args = {} # Log the training parameters - print(f"Parameters: {train_args}") + observability.log(f"Parameters: {train_args}") for (k, v) in train_args.items(): run.log(k, v) run.parent.log(k, v) @@ -136,7 +143,7 @@ def main(): data_file_path) else: e = ("No dataset provided") - print(e) + observability.log(description=e, severity=Severity.ERROR) raise Exception(e) # Link dataset to the step run so it is trackable in the UI @@ -153,8 +160,8 @@ def main(): # Evaluate and log the metrics returned from the train function metrics = get_model_metrics(model, data) for (k, v) in metrics.items(): - run.log(k, v) - run.parent.log(k, v) + observability.log_metric(name=k, value=v) + observability.log_metric(name=k, value=v, log_parent=True) # Pass model file to next step os.makedirs(step_output_path, exist_ok=True) @@ -167,7 +174,7 @@ def main(): joblib.dump(value=model, filename=output_path) run.tag("run_type", value="train") - print(f"tags now present for run: {run.tags}") + observability.log(f"tags now present for run: {run.tags}") run.complete() diff --git a/diabetes_regression/util/model_helper.py b/diabetes_regression/util/model_helper.py index 0fd20ef0..d175930a 100644 --- a/diabetes_regression/util/model_helper.py +++ b/diabetes_regression/util/model_helper.py @@ -4,6 +4,10 @@ from azureml.core import Run from azureml.core import Workspace from azureml.core.model import Model as AMLModel +from utils.logger.logger_interface import Severity +from utils.logger.observability import Observability + +observability = Observability() def get_current_workspace() -> Workspace: @@ -44,17 +48,19 @@ def get_model( None. """ if aml_workspace is None: - print("No workspace defined - using current experiment workspace.") + observability.log("No workspace defined - " + "using current experiment workspace.") aml_workspace = get_current_workspace() tags = None if tag_name is not None or tag_value is not None: # Both a name and value must be specified to use tags. 
if tag_name is None or tag_value is None: - raise ValueError( - "model_tag_name and model_tag_value should both be supplied" - + "or excluded" # NOQA: E501 - ) + + error = "model_tag_name and model_tag_value should " \ + "both be supplied or excluded" + observability.log(description=error, severity=Severity.ERROR) + raise ValueError(error) tags = [[tag_name, tag_value]] model = None @@ -74,6 +80,8 @@ def get_model( if len(models) == 1: model = models[0] elif len(models) > 1: - raise Exception("Expected only one model") + error = "Expected only one model" + observability.log(description=error, severity=Severity.ERROR) + raise Exception(error) return model diff --git a/ml_service/pipelines/diabetes_regression_build_parallel_batchscore_pipeline.py b/ml_service/pipelines/diabetes_regression_build_parallel_batchscore_pipeline.py index ac3d3407..f72874c1 100644 --- a/ml_service/pipelines/diabetes_regression_build_parallel_batchscore_pipeline.py +++ b/ml_service/pipelines/diabetes_regression_build_parallel_batchscore_pipeline.py @@ -41,9 +41,14 @@ from azureml.pipeline.steps import PythonScriptStep from typing import Tuple +from utils.logger.logger_interface import Severity +from utils.logger.observability import Observability + +observability = Observability() + def get_or_create_datastore( - datastorename: str, ws: Workspace, env: Env, input: bool = True + datastorename: str, ws: Workspace, env: Env, input: bool = True ) -> Datastore: """ Obtains a datastore with matching name. Creates it if none exists. @@ -59,7 +64,9 @@ def get_or_create_datastore( :raises: ValueError """ if datastorename is None: - raise ValueError("Datastore name is required.") + error = "Datastore name is required." + observability.log(description=error, severity=Severity.ERROR) + raise ValueError(error) containername = ( env.scoring_datastore_input_container @@ -73,8 +80,8 @@ def get_or_create_datastore( # the datastore is not registered but we have all details to register it elif ( - env.scoring_datastore_access_key is not None - and containername is not None # NOQA: E501 + env.scoring_datastore_access_key is not None + and containername is not None # NOQA: E501 ): # NOQA:E501 datastore = Datastore.register_azure_blob_container( @@ -85,11 +92,10 @@ def get_or_create_datastore( container_name=containername, ) else: - raise ValueError( - "No existing datastore named {} nor was enough information supplied to create one.".format( # NOQA: E501 - datastorename - ) - ) + error = "No existing datastore named {} nor was enough " \ + "information supplied to create one.".format(datastorename) + observability.log(description=error, severity=Severity.ERROR) + raise ValueError(error) return datastore @@ -145,12 +151,11 @@ def get_fallback_input_dataset(ws: Workspace, env: Env) -> Dataset: if not os.path.exists(env.scoring_datastore_input_filename): error_message = ( - "Could not find CSV dataset for scoring at {}. " - + "No alternate data store location was provided either.".format( - env.scoring_datastore_input_filename - ) # NOQA: E501 + "Could not find CSV dataset for scoring at {}. " + + "No alternate data store location was provided either." 
+ .format(env.scoring_datastore_input_filename) ) - + observability.log(description=error_message, severity=Severity.ERROR) raise FileNotFoundError(error_message) # upload the input data to the workspace default datastore @@ -162,16 +167,18 @@ def get_fallback_input_dataset(ws: Workspace, env: Env) -> Dataset: ) scoringinputds = ( - Dataset.Tabular.from_delimited_files(scoreinputdataref) - .register(ws, env.scoring_dataset_name, create_new_version=True) - .as_named_input(env.scoring_dataset_name) + Dataset.Tabular.from_delimited_files(scoreinputdataref).register( + ws, + env.scoring_dataset_name, + create_new_version=True).as_named_input( + env.scoring_dataset_name) ) return scoringinputds def get_output_location( - ws: Workspace, env: Env, outputdatastore: Datastore = None + ws: Workspace, env: Env, outputdatastore: Datastore = None ) -> PipelineData: """ Returns a Datastore wrapped as a PipelineData instance suitable @@ -200,7 +207,7 @@ def get_output_location( def get_inputds_outputloc( - ws: Workspace, env: Env + ws: Workspace, env: Env ) -> Tuple[Dataset, PipelineData]: # NOQA: E501 """ Prepare the input and output for the scoring step. Input is a tabular @@ -234,7 +241,7 @@ def get_inputds_outputloc( def get_run_configs( - ws: Workspace, computetarget: ComputeTarget, env: Env + ws: Workspace, computetarget: ComputeTarget, env: Env ) -> Tuple[ParallelRunConfig, RunConfiguration]: """ Creates the necessary run configurations required by the @@ -281,13 +288,13 @@ def get_run_configs( def get_scoring_pipeline( - scoring_dataset: Dataset, - output_loc: PipelineData, - score_run_config: ParallelRunConfig, - copy_run_config: RunConfiguration, - computetarget: ComputeTarget, - ws: Workspace, - env: Env, + scoring_dataset: Dataset, + output_loc: PipelineData, + score_run_config: ParallelRunConfig, + copy_run_config: RunConfiguration, + computetarget: ComputeTarget, + ws: Workspace, + env: Env, ) -> Pipeline: """ Creates the scoring pipeline. 
@@ -418,9 +425,9 @@ def build_batchscore_pipeline(): pipeline_id_string = "##vso[task.setvariable variable=pipeline_id;isOutput=true]{}".format( # NOQA: E501 published_pipeline.id ) - print(pipeline_id_string) + observability.log(pipeline_id_string) except Exception as e: - print(e) + observability.log(description=e, severity=Severity.ERROR) exit(1) diff --git a/ml_service/pipelines/diabetes_regression_build_train_pipeline.py b/ml_service/pipelines/diabetes_regression_build_train_pipeline.py index 03937186..8d7dba6f 100644 --- a/ml_service/pipelines/diabetes_regression_build_train_pipeline.py +++ b/ml_service/pipelines/diabetes_regression_build_train_pipeline.py @@ -9,6 +9,11 @@ from ml_service.util.manage_environment import get_environment import os +from utils.logger.logger_interface import Severity +from utils.logger.observability import Observability + +observability = Observability() + def main(): e = Env() @@ -18,14 +23,14 @@ def main(): subscription_id=e.subscription_id, resource_group=e.resource_group, ) - print("get_workspace:") - print(aml_workspace) + observability.log("get_workspace:") + observability.log(aml_workspace) # Get Azure machine learning cluster aml_compute = get_compute(aml_workspace, e.compute_name, e.vm_size) if aml_compute is not None: - print("aml_compute:") - print(aml_compute) + observability.log("aml_compute:") + observability.log(aml_compute) # Create a reusable Azure ML environment environment = get_environment( @@ -44,6 +49,9 @@ def main(): run_config.environment.environment_variables[ "DATASTORE_NAME" ] = datastore_name # NOQA: E501 + run_config.environment.environment_variables[ + "APP_INSIGHTS_CONNECTION_STRING" + ] = e.app_insights_connection_string model_name_param = PipelineParameter(name="model_name", default_value=e.model_name) # NOQA: E501 dataset_version_param = PipelineParameter( @@ -68,10 +76,12 @@ def main(): file_name = "diabetes.csv" if not os.path.exists(file_name): - raise Exception( - 'Could not find CSV dataset at "%s". If you have bootstrapped your project, you will need to provide a CSV.' # NOQA: E501 - % file_name - ) # NOQA: E501 + error = ( + 'Could not find CSV dataset at "%s". ' + 'If you have bootstrapped your project, you will need to provide a CSV.' # NOQA: E501 + % file_name) + observability.log(description=error, severity=Severity.ERROR) + raise Exception(error) # NOQA: E501 # Upload file to default datastore in workspace datatstore = Datastore.get(aml_workspace, datastore_name) @@ -124,7 +134,7 @@ def main(): runconfig=run_config, allow_reuse=True, ) - print("Step Train created") + observability.log("Step Train created") evaluate_step = PythonScriptStep( name="Evaluate Model ", @@ -140,7 +150,7 @@ def main(): runconfig=run_config, allow_reuse=False, ) - print("Step Evaluate created") + observability.log("Step Evaluate created") register_step = PythonScriptStep( name="Register Model ", @@ -152,15 +162,16 @@ def main(): runconfig=run_config, allow_reuse=False, ) - print("Step Register created") + observability.log("Step Register created") # Check run_evaluation flag to include or exclude evaluation step. 
if (e.run_evaluation).lower() == "true": - print("Include evaluation step before register step.") + observability.log("Include evaluation step before register step.") evaluate_step.run_after(train_step) register_step.run_after(evaluate_step) steps = [train_step, evaluate_step, register_step] else: - print("Exclude evaluation step and directly run register step.") + observability.log("Exclude evaluation step and " + "directly run register step.") register_step.run_after(train_step) steps = [train_step, register_step] @@ -172,8 +183,8 @@ def main(): description="Model training/retraining pipeline", version=e.build_id, ) - print(f"Published pipeline: {published_pipeline.name}") - print(f"for build {published_pipeline.version}") + observability.log(f"Published pipeline: {published_pipeline.name}") + observability.log(f"for build {published_pipeline.version}") if __name__ == "__main__": diff --git a/ml_service/pipelines/diabetes_regression_build_train_pipeline_with_r.py b/ml_service/pipelines/diabetes_regression_build_train_pipeline_with_r.py index 254f22eb..626ff1ca 100644 --- a/ml_service/pipelines/diabetes_regression_build_train_pipeline_with_r.py +++ b/ml_service/pipelines/diabetes_regression_build_train_pipeline_with_r.py @@ -6,6 +6,10 @@ from ml_service.util.env_variables import Env from ml_service.util.manage_environment import get_environment +from utils.logger.observability import Observability + +observability = Observability() + def main(): e = Env() @@ -15,14 +19,14 @@ def main(): subscription_id=e.subscription_id, resource_group=e.resource_group, ) - print("get_workspace:") - print(aml_workspace) + observability.log("get_workspace:") + observability.log(aml_workspace) # Get Azure machine learning cluster aml_compute = get_compute(aml_workspace, e.compute_name, e.vm_size) if aml_compute is not None: - print("aml_compute:") - print(aml_compute) + observability.log("aml_compute:") + observability.log(aml_compute) # Create a reusable Azure ML environment # Make sure to include `r-essentials' @@ -44,7 +48,7 @@ def main(): runconfig=run_config, allow_reuse=False, ) - print("Step Train created") + observability.log("Step Train created") steps = [train_step] @@ -55,8 +59,8 @@ def main(): description="Model training/retraining pipeline", version=e.build_id, ) - print(f"Published pipeline: {published_pipeline.name}") - print(f"for build {published_pipeline.version}") + observability.log(f"Published pipeline: {published_pipeline.name}") + observability.log(f"for build {published_pipeline.version}") if __name__ == "__main__": diff --git a/ml_service/pipelines/diabetes_regression_build_train_pipeline_with_r_on_dbricks.py b/ml_service/pipelines/diabetes_regression_build_train_pipeline_with_r_on_dbricks.py index ae607b3b..00b09853 100644 --- a/ml_service/pipelines/diabetes_regression_build_train_pipeline_with_r_on_dbricks.py +++ b/ml_service/pipelines/diabetes_regression_build_train_pipeline_with_r_on_dbricks.py @@ -4,6 +4,10 @@ from azureml.pipeline.steps import DatabricksStep from ml_service.util.env_variables import Env +from utils.logger.observability import Observability + +observability = Observability() + def main(): e = Env() @@ -13,8 +17,8 @@ def main(): subscription_id=e.subscription_id, resource_group=e.resource_group ) - print("get_workspace:") - print(aml_workspace) + observability.log("get_workspace:") + observability.log(aml_workspace) # Get Azure machine learning cluster aml_compute = get_compute( @@ -22,8 +26,8 @@ def main(): e.compute_name, e.vm_size) if aml_compute is not None: 
- print("aml_compute:") - print(aml_compute) + observability.log("aml_compute:") + observability.log(aml_compute) train_step = DatabricksStep( name="DBPythonInLocalMachine", @@ -36,7 +40,7 @@ def main(): allow_reuse=False ) - print("Step Train created") + observability.log("Step Train created") steps = [train_step] @@ -47,8 +51,8 @@ def main(): description="Model training/retraining pipeline", version=e.build_id ) - print(f'Published pipeline: {published_pipeline.name}') - print(f'for build {published_pipeline.version}') + observability.log(f'Published pipeline: {published_pipeline.name}') + observability.log(f'for build {published_pipeline.version}') if __name__ == '__main__': diff --git a/ml_service/pipelines/diabetes_regression_verify_train_pipeline.py b/ml_service/pipelines/diabetes_regression_verify_train_pipeline.py new file mode 100644 index 00000000..e69de29b diff --git a/ml_service/pipelines/run_parallel_batchscore_pipeline.py b/ml_service/pipelines/run_parallel_batchscore_pipeline.py index c046eb9c..5af6b3f9 100644 --- a/ml_service/pipelines/run_parallel_batchscore_pipeline.py +++ b/ml_service/pipelines/run_parallel_batchscore_pipeline.py @@ -30,6 +30,11 @@ from azureml.pipeline.core import PublishedPipeline import argparse +from utils.logger.logger_interface import Severity +from utils.logger.observability import Observability + +observability = Observability() + def parse_args(): parser = argparse.ArgumentParser() @@ -47,9 +52,10 @@ def get_pipeline(pipeline_id, ws: Workspace, env: Env): ] # noqa E501 if scoringpipelinelist.count == 0: - raise Exception( - "No pipeline found matching name:{}".format(env.scoring_pipeline_name) # NOQA: E501 - ) + error = "No pipeline found matching name:{}".\ + format(env.scoring_pipeline_name) + observability.log(description=error, severity=Severity.ERROR) + raise Exception(error) else: # latest published scoringpipeline = scoringpipelinelist[0] @@ -127,7 +133,7 @@ def run_batchscore_pipeline(): copy_output(list(run.get_steps())[0].id, env) except Exception as ex: - print("Error: {}".format(ex)) + observability.log(description=ex, severity=Severity.ERROR) if __name__ == "__main__": diff --git a/ml_service/pipelines/run_train_pipeline.py b/ml_service/pipelines/run_train_pipeline.py index b68b9a15..457a9d81 100644 --- a/ml_service/pipelines/run_train_pipeline.py +++ b/ml_service/pipelines/run_train_pipeline.py @@ -3,9 +3,13 @@ import argparse from ml_service.util.env_variables import Env +from utils.logger.logger_interface import Severity +from utils.logger.observability import Observability + +observability = Observability() -def main(): +def main(): parser = argparse.ArgumentParser("register") parser.add_argument( "--output_pipeline_id_file", @@ -38,22 +42,28 @@ def main(): if p.version == e.build_id: matched_pipes.append(p) - if(len(matched_pipes) > 1): + if (len(matched_pipes) > 1): published_pipeline = None - raise Exception(f"Multiple active pipelines are published for build {e.build_id}.") # NOQA: E501 - elif(len(matched_pipes) == 0): + error = f"Multiple active pipelines are published for build {e.build_id}." 
# NOQA: E501 + observability.log(description=error, severity=Severity.ERROR) + raise Exception(error) + elif (len(matched_pipes) == 0): published_pipeline = None - raise KeyError(f"Unable to find a published pipeline for this build {e.build_id}") # NOQA: E501 + error = f"Unable to find a published pipeline for this build " \ + f"{e.build_id}" + observability.log(description=error, severity=Severity.ERROR) + raise KeyError(error) # NOQA: E501 else: published_pipeline = matched_pipes[0] - print("published pipeline id is", published_pipeline.id) + observability.log("published pipeline id is" + + str(published_pipeline.id)) # Save the Pipeline ID for other AzDO jobs after script is complete if args.output_pipeline_id_file is not None: with open(args.output_pipeline_id_file, "w") as out_file: out_file.write(published_pipeline.id) - if(args.skip_train_execution is False): + if (args.skip_train_execution is False): pipeline_parameters = {"model_name": e.model_name} tags = {"BuildId": e.build_id} if (e.build_uri is not None): @@ -66,7 +76,7 @@ def main(): tags=tags, pipeline_parameters=pipeline_parameters) - print("Pipeline run initiated ", run.id) + observability.log("Pipeline run initiated " + str(run.id)) if __name__ == "__main__": diff --git a/ml_service/util/attach_compute.py b/ml_service/util/attach_compute.py index ad9668db..bf0f61e2 100644 --- a/ml_service/util/attach_compute.py +++ b/ml_service/util/attach_compute.py @@ -1,17 +1,21 @@ - from azureml.core import Workspace from azureml.core.compute import AmlCompute from azureml.core.compute import ComputeTarget from azureml.exceptions import ComputeTargetException from ml_service.util.env_variables import Env +from utils.logger.logger_interface import Severity +from utils.logger.observability import Observability + +observability = Observability() + def get_compute(workspace: Workspace, compute_name: str, vm_size: str, for_batch_scoring: bool = False): # NOQA E501 try: if compute_name in workspace.compute_targets: compute_target = workspace.compute_targets[compute_name] if compute_target and type(compute_target) is AmlCompute: - print("Found existing compute target " + compute_name + " so using it.") # NOQA + observability.log("Found existing compute target " + compute_name + " so using it.") # NOQA else: e = Env() compute_config = AmlCompute.provisioning_configuration( @@ -33,6 +37,7 @@ def get_compute(workspace: Workspace, compute_name: str, vm_size: str, for_batch ) return compute_target except ComputeTargetException as ex: - print(ex) - print("An error occurred trying to provision compute.") + observability.log(ex, severity=Severity.ERROR) + observability.log("An error occurred trying to provision compute.", + severity=Severity.ERROR) exit(1) diff --git a/ml_service/util/create_scoring_image.py b/ml_service/util/create_scoring_image.py index 378cb3b4..82e1d37b 100644 --- a/ml_service/util/create_scoring_image.py +++ b/ml_service/util/create_scoring_image.py @@ -6,6 +6,11 @@ import shutil from ml_service.util.env_variables import Env +from utils.logger.logger_interface import Severity +from utils.logger.observability import Observability + +observability = Observability() + e = Env() # Get Azure machine learning workspace @@ -27,7 +32,7 @@ model = Model(ws, name=e.model_name, version=e.model_version) sources_dir = e.sources_directory_train if (sources_dir is None): - sources_dir = 'diabetes_regression' + sources_dir = '.' 
score_script = os.path.join(".", sources_dir, e.score_script) score_file = os.path.basename(score_script) path_to_scoring = os.path.dirname(score_script) @@ -43,17 +48,21 @@ package = Model.package(ws, [model], inference_config) package.wait_for_creation(show_output=True) # Display the package location/ACR path -print(package.location) +observability.log(package.location) os.chdir(cwd) if package.state != "Succeeded": - raise Exception("Image creation status: {package.creation_state}") + error = "Image creation status: {package.creation_state}" + observability.log(description=error, severity=Severity.ERROR) + raise Exception(error) -print("Package stored at {} with build log {}".format(package.location, package.package_build_log_uri)) # NOQA: E501 +observability.log( + "Package stored at {} with build log {}".format(package.location, package.package_build_log_uri)) # NOQA: E501 # Save the Image Location for other AzDO jobs after script is complete if args.output_image_location_file is not None: - print("Writing image location to %s" % args.output_image_location_file) + observability.log("Writing image location to %s" + % args.output_image_location_file) with open(args.output_image_location_file, "w") as out_file: out_file.write(str(package.location)) diff --git a/ml_service/util/env_variables.py b/ml_service/util/env_variables.py index 753c152d..9252de75 100644 --- a/ml_service/util/env_variables.py +++ b/ml_service/util/env_variables.py @@ -124,3 +124,6 @@ class Env: batchscore_copy_script_path: Optional[str] = os.environ.get( "BATCHSCORE_COPY_SCRIPT_PATH" ) # NOQA: E501 + app_insights_connection_string: Optional[str] = os.environ.get( + "APP_INSIGHTS_CONNECTION_STRING" + ) # NOQA: E501 diff --git a/ml_service/util/manage_environment.py b/ml_service/util/manage_environment.py index 54c5a72f..ed4b8547 100644 --- a/ml_service/util/manage_environment.py +++ b/ml_service/util/manage_environment.py @@ -1,17 +1,21 @@ - import os from azureml.core import Workspace, Environment from ml_service.util.env_variables import Env from azureml.core.runconfig import DEFAULT_CPU_IMAGE, DEFAULT_GPU_IMAGE +from utils.logger.logger_interface import Severity +from utils.logger.observability import Observability + +observability = Observability() + def get_environment( - workspace: Workspace, - environment_name: str, - conda_dependencies_file: str, - create_new: bool = False, - enable_docker: bool = None, - use_gpu: bool = False + workspace: Workspace, + environment_name: str, + conda_dependencies_file: str, + create_new: bool = False, + enable_docker: bool = None, + use_gpu: bool = False ): try: e = Env() @@ -24,7 +28,7 @@ def get_environment( if restored_environment is None or create_new: new_env = Environment.from_conda_specification( environment_name, - os.path.join(e.sources_directory_train, conda_dependencies_file), # NOQA: E501 + os.path.join(e.sources_directory_train, "diabetes_regression/" + conda_dependencies_file), # NOQA: E501 ) # NOQA: E501 restored_environment = new_env if enable_docker is not None: @@ -33,8 +37,8 @@ def get_environment( restored_environment.register(workspace) if restored_environment is not None: - print(restored_environment) + observability.log(restored_environment) return restored_environment except Exception as e: - print(e) + observability.log(description=e, severity=Severity.ERROR) exit(1) diff --git a/ml_service/util/smoke_test_scoring_service.py b/ml_service/util/smoke_test_scoring_service.py index 0fa34b1e..8307a865 100644 --- 
a/ml_service/util/smoke_test_scoring_service.py +++ b/ml_service/util/smoke_test_scoring_service.py @@ -6,6 +6,9 @@ from ml_service.util.env_variables import Env import secrets +from utils.logger.observability import Observability + +observability = Observability() input = {"data": [[1, 2, 3, 4, 5, 6, 7, 8, 9, 10], [10, 9, 8, 7, 6, 5, 4, 3, 2, 1]]} @@ -18,7 +21,7 @@ def call_web_service(e, service_type, service_name): subscription_id=e.subscription_id, resource_group=e.resource_group ) - print("Fetching service") + observability.log("Fetching service") headers = {} if service_type == "ACI": service = AciWebservice(aml_workspace, service_name) @@ -27,8 +30,8 @@ def call_web_service(e, service_type, service_name): if service.auth_enabled: service_keys = service.get_keys() headers['Authorization'] = 'Bearer ' + service_keys[0] - print("Testing service") - print(". url: %s" % service.scoring_uri) + observability.log("Testing service") + observability.log(". url: %s" % service.scoring_uri) output = call_web_app(service.scoring_uri, headers) return output @@ -51,8 +54,8 @@ def call_web_app(url, headers): except requests.exceptions.HTTPError as e: if i == retries - 1: raise e - print(e) - print("Retrying...") + observability.log(e) + observability.log("Retrying...") time.sleep(1) @@ -80,11 +83,11 @@ def main(): output = call_web_app(args.service, {}) else: output = call_web_service(e, args.type, args.service) - print("Verifying service output") + observability.log("Verifying service output") assert "result" in output assert len(output["result"]) == output_len - print("Smoke test successful.") + observability.log("Smoke test successful.") if __name__ == '__main__': diff --git a/utils/__init__.py b/utils/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/utils/images/Metrics.png b/utils/images/Metrics.png new file mode 100644 index 00000000..88b7cf0b Binary files /dev/null and b/utils/images/Metrics.png differ diff --git a/utils/images/logs-aml.png b/utils/images/logs-aml.png new file mode 100644 index 00000000..24b6ef8d Binary files /dev/null and b/utils/images/logs-aml.png differ diff --git a/utils/images/logs-appinsights.png b/utils/images/logs-appinsights.png new file mode 100644 index 00000000..c06f0d9c Binary files /dev/null and b/utils/images/logs-appinsights.png differ diff --git a/utils/images/metrics-appinsights.png b/utils/images/metrics-appinsights.png new file mode 100644 index 00000000..c7a6ba15 Binary files /dev/null and b/utils/images/metrics-appinsights.png differ diff --git a/utils/logger/__init__.py b/utils/logger/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/utils/logger/app_insights_logger.py b/utils/logger/app_insights_logger.py new file mode 100644 index 00000000..f0d799c2 --- /dev/null +++ b/utils/logger/app_insights_logger.py @@ -0,0 +1,130 @@ +import logging +import time +import uuid + +from opencensus.ext.azure import metrics_exporter +from opencensus.ext.azure.log_exporter import AzureLogHandler + + +from opencensus.stats import aggregation as aggregation_module +from opencensus.stats import measure as measure_module +from opencensus.stats import stats as stats_module +from opencensus.stats import view as view_module +from opencensus.tags import tag_map as tag_map_module + +from ml_service.util.env_variables import Env +from utils.logger.logger_interface import ( + LoggerInterface, + ObservabilityAbstract, + Severity, +) + + +class AppInsightsLogger(LoggerInterface, ObservabilityAbstract): + def __init__(self, run, 
export_interval): + self.env = Env() + # initializes log exporter + handler = AzureLogHandler( + connection_string=self.env.app_insights_connection_string, + logging_sampling_rate=1.0, + ) + handler.add_telemetry_processor(self.callback_function) + self.run_id = self.get_run_id(run) + self.logger = logging.getLogger(__name__) + self.logger.addHandler(handler) + # initializes metric exporter + self.export_interval = export_interval + exporter = metrics_exporter.new_metrics_exporter( + enable_standard_metrics=False, + export_interval=self.export_interval, + connection_string=self.env.app_insights_connection_string, + ) + exporter.add_telemetry_processor(self.callback_function) + stats_module.stats.view_manager.register_exporter(exporter) + + def log_metric( + self, name="", value="", description="", log_parent=False, + ): + """ + Sends a custom metric to appInsights + :param name: name of the metric + :param value: value of the metric + :param description: description of the metric + :param log_parent: not being used for this logger + :return: + """ + measurement_map = \ + stats_module.stats.stats_recorder.new_measurement_map() + tag_map = tag_map_module.TagMap() + + measure = measure_module.MeasureFloat(name, description) + self.set_view(name, description, measure) + measurement_map.measure_float_put(measure, value) + measurement_map.record(tag_map) + # Default export interval is every 15.0s + # Your application should run for at least this amount + # of time so the exporter will meet this interval + # Sleep can fulfill this https://pypi.org/project/opencensus-ext-azure/ + time.sleep(self.export_interval) + + def log(self, description="", severity=Severity.WARNING): + """ + Sends the logs to App Insights + :param description: log description + :param severity: log severity + :return: + """ + + if severity == self.severity.DEBUG: + self.logger.debug(description) + elif severity == self.severity.INFO: + self.logger.info(description) + elif severity == self.severity.WARNING: + self.logger.warning(description) + elif severity == self.severity.ERROR: + self.logger.error(description) + elif severity == self.severity.CRITICAL: + self.logger.critical(description) + + def get_run_id(self, run): + """ + gets the correlation ID by the in following order: + - If the script is running in an Online run Context of AML --> run_id + - If the script is running where a build_id + environment variable is set --> build_id + - Else --> generate a unique id + :param run: + :return: correlation_id + """ + run_id = str(uuid.uuid1()) + if not run.id.startswith(self.OFFLINE_RUN): + run_id = run.id + elif self.env.build_id: + run_id = self.env.build_id + return run_id + + @staticmethod + def set_view(metric, description, measure): + """ + Sets the view for the custom metric + """ + prompt_view = view_module.View( + metric, + description, + [], + measure, + aggregation_module.LastValueAggregation() + ) + stats_module.stats.view_manager.register_view(prompt_view) + + def callback_function(self, envelope): + """ + Attaches a correlation_id as a custom + dimension to the exporter just before + sending the logs/metrics + :param envelope: + :return: Always return True + (if False, it does not export metrics/logs) + """ + envelope.data.baseData.properties[self.CORRELATION_ID] = self.run_id + return True diff --git a/utils/logger/azure_ml_logger.py b/utils/logger/azure_ml_logger.py new file mode 100644 index 00000000..64b8d22e --- /dev/null +++ b/utils/logger/azure_ml_logger.py @@ -0,0 +1,49 @@ +import datetime +import time + 
+from utils.logger.logger_interface import (
+    LoggerInterface,
+    ObservabilityAbstract,
+    Severity,
+)
+
+
+class AzureMlLogger(LoggerInterface, ObservabilityAbstract):
+    def __init__(self, run=None):
+        self.run = run
+
+    def log_metric(self, name, value, description, log_parent):
+        """Log a metric value to the run with the given name.
+        :param log_parent: mark True if you want to log to the parent Run
+        :param name: The name of the metric.
+        :type name: str
+        :param value: The value to be posted to the service.
+        :type value:
+        :param description: An optional metric description.
+        :type description: str
+        """
+        if name != "" and value != "":
+            self.run.log(
+                name, value, description
+            ) if log_parent is False \
+                else self.run.parent.log(name, value, description)
+
+    def log(self, description="", severity=Severity.WARNING):
+        """
+        Sends the logs to AML (experiments -> logs/outputs)
+        :param description: log description
+        :param severity: log severity
+        :return:
+        """
+
+        time_stamp = datetime.datetime.fromtimestamp(time.time()).strftime(
+            "%Y-%m-%d %H:%M:%S"
+        )
+        callee = self.get_callee(
+            2
+        )  # to get the script that is calling Observability
+        print(
+            "{}, [{}], {}:{}".format(
+                time_stamp, self.severity_map[severity], callee, description
+            )
+        )
diff --git a/utils/logger/logger_interface.py b/utils/logger/logger_interface.py
new file mode 100644
index 00000000..f7402962
--- /dev/null
+++ b/utils/logger/logger_interface.py
@@ -0,0 +1,41 @@
+import inspect
+
+
+class Severity:
+    DEBUG = 10
+    INFO = 20
+    WARNING = 30
+    ERROR = 40
+    CRITICAL = 50
+
+
+class LoggerInterface:
+    def log_metric(self, name, value, description, log_parent):
+        pass
+
+    def log(self, description, severity):
+        pass
+
+
+class ObservabilityAbstract:
+    OFFLINE_RUN = "OfflineRun"
+    CORRELATION_ID = "correlation_id"
+    severity = Severity()
+    severity_map = {10: "DEBUG", 20: "INFO",
+                    30: "WARNING", 40: "ERROR", 50: "CRITICAL"}
+
+    @staticmethod
+    def get_callee(stack_level):
+        """
+        Gets the calling script's location in [file_name:line_number] format
+        :param stack_level:
+        :return: string of [file_name:line_number]
+        """
+        try:
+            stack = inspect.stack()
+            file_name = stack[stack_level + 1].filename.split("/")[-1]
+            line_number = stack[stack_level + 1].lineno
+            return "{}:{}".format(file_name, line_number)
+        except IndexError:
+            print("Index error, failed to log to AzureML")
+            return ""
diff --git a/utils/logger/observability.py b/utils/logger/observability.py
new file mode 100644
index 00000000..9a86c25b
--- /dev/null
+++ b/utils/logger/observability.py
@@ -0,0 +1,81 @@
+from azureml.core import Run
+
+from ml_service.util.env_variables import Env
+from utils.logger.app_insights_logger import AppInsightsLogger
+from utils.logger.azure_ml_logger import AzureMlLogger
+from utils.logger.logger_interface import (
+    ObservabilityAbstract,
+    LoggerInterface,
+    Severity,
+)
+
+
+class Loggers(ObservabilityAbstract):
+    def __init__(self, export_interval) -> None:
+        self.loggers = []
+        self.register_loggers(export_interval)
+
+    def add(self, logger) -> None:
+        self.loggers.append(logger)
+
+    def get_loggers_string(self) -> str:
+        return ", ".join([type(x).__name__ for x in self.loggers])
+
+    def register_loggers(self, export_interval):
+        """
+        This method is responsible for creating loggers/tracers
+        and adding them to the list of loggers
+        Notes:
+        - If the context of the Run object is offline,
+        we do not create an AzureMlLogger instance
+        - If APP_INSIGHTS_CONNECTION_STRING is not set as an
+        environment variable, we do not create an
+        AppInsightsLogger instance
+        """
+        run = Run.get_context()
+        if not run.id.startswith(self.OFFLINE_RUN):
+            self.loggers.append(AzureMlLogger(run))
+        if Env().app_insights_connection_string:
+            self.loggers.append(AppInsightsLogger(run, export_interval))
+
+
+class Observability(LoggerInterface):
+    def __init__(self, export_interval=15) -> None:
+        self._loggers = Loggers(export_interval)
+
+    def log_metric(
+        self, name="", value="", description="", log_parent=False,
+    ):
+        """
+        This method sends the metric to all registered loggers
+        :param name: metric name
+        :param value: metric value
+        :param description: description of the metric
+        :param log_parent: (AML only) send the metric to run.parent
+        :return:
+        """
+        for logger in self._loggers.loggers:
+            logger.log_metric(name, value, description, log_parent)
+
+    def log(self, description="", severity=Severity.WARNING):
+        """
+        This method sends the log to all registered loggers
+        :param description: actual log description to be sent
+        :param severity: log severity
+        :return:
+        """
+        for logger in self._loggers.loggers:
+            logger.log(description, severity)
+
+    def get_logger(self, logger_class):
+        """
+        This method iterates over the loggers and
+        returns the registered logger with the same type as the provided one.
+        The returned reference can be used whenever
+        the built-in functions of a specific logger are required
+        :param logger_class:
+        :return: a logger instance
+        """
+        for logger in self._loggers.loggers:
+            if type(logger) is type(logger_class):
+                return logger
diff --git a/utils/observability-usage.md b/utils/observability-usage.md
new file mode 100644
index 00000000..9e92e96a
--- /dev/null
+++ b/utils/observability-usage.md
@@ -0,0 +1,130 @@
+## Observability Usage
+
+To use the logger in your project:
+
+**1. Dependencies:**
+
+Observability requires the following dependencies. Make sure you add them to your project before use:
+```yaml
+  - opencensus==0.7.7
+  - opencensus-context==0.1.1
+  - opencensus-ext-azure==1.0.2
+```
+Additionally, AppInsightsLogger needs a connection to Application Insights, so make sure you add the connection string to the environment variables of the running environment:
+```python
+os.environ["APP_INSIGHTS_CONNECTION_STRING"] = "_____"
+```
+
+
+**2. Import Observability**
+
+Import the Observability class in your script and create an object of type Observability.
+The Observability object checks the context of the
+[Run](https://docs.microsoft.com/en-us/python/api/azureml-core/azureml.core.run(class)?view=azure-ml-py) object and, if
+ the context is **Online**, it adds an **AzureMlLogger** to the list of loggers.
+If the Run context is offline, only an **AppInsightsLogger** is added to the list of loggers.
+```python
+from utils.logger.observability import Observability
+
+observability = Observability()
+```
+
+**3. Log your message/metrics**
+
+
+**3.1 Log metrics:**
+Below is the usage of the **log_metric** method.
+```python
+observability.log_metric(name="alpha", value="2.1", description="value of alpha")
+observability.log_metric(name="alpha", value="2.1", description="value of alpha", log_parent=True)
+```
+Upon a call to _log_metric_:
+- If the context of the Run class is online, metrics will be sent to Azure ML.
+- Metrics will be sent to AppInsights as custom metrics.
+Please note that the metrics exporter sends the metrics to AppInsights every **export_interval** seconds,
+based on their last recorded value. The default export_interval is 15 seconds.
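+
+Because the AppInsights logger sleeps for one export interval inside _log_metric_ (so the exporter has time to flush), a short-lived script may prefer a smaller interval. A minimal sketch, passing the interval to the constructor (the metric name and value below are purely illustrative):
+```python
+from utils.logger.observability import Observability
+
+# Flush metrics every 5 seconds instead of the default 15.
+observability = Observability(export_interval=5)
+observability.log_metric(name="alpha", value="2.1", description="value of alpha")
+```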
+
+
+**3.2 Log messages:**
+Below is the usage of the **log** method.
+
+
+```python
+observability.log(description="value of alpha", severity=Severity.WARNING)
+```
+Upon a call to _log_:
+- If the context of the Run class is online, logs will be sent to Azure ML.
+- Logs will be sent to AppInsights.
+
+
+
+**4. Check the logs/metrics**
+
+Observability sends the logs/metrics to Azure ML (in an online context) and to AppInsights.
+
+**Logs/Metrics Correlation:**
+
+The correlation_id is used to map a specific Run/Build to the logs/metrics sent to the different resources.
+The correlation_id is created and attached as a custom dimension by the telemetry processor, determined in the following order:
+
+1. If the logging is happening in an **Online Run Context**,
+the correlationId is the **RunId**
+
+1. If the logging is happening in an **Offline Run Context**,
+the correlationId is the **buildId** of the Pipeline
+(fetched from an environment variable)
+
+1. If the logging is happening in an Offline Run Context and there
+is no BuildId set in the environment variables, the logs/metrics will
+be associated with a generated **unique identifier**.
+
+**Check Metrics in Azure ML:**
+
+To check the metrics in Azure ML, navigate to the [Azure ML portal](https://ml.azure.com/), find your desired experiment,
+ and check the logged metrics in the Metrics tab.
+ ![Metrics](images/Metrics.png)
+
+**Check Metrics in AppInsights:**
+
+To check the metrics in Application Insights, navigate to the Azure Portal and open your Application Insights service.
+Click on the Logs tab and select customMetrics. You may use the queries below to retrieve your metrics.
+The following lists all the custom metrics:
+```sql
+customMetrics
+```
+To narrow your search to a specific run, provide the correlation_id:
+
+```sql
+customMetrics
+| where customDimensions.correlation_id contains "e56b31b7-513f-4c34-9158-c2e1b28a5aaf"
+```
+![metrics-appInsights](images/metrics-appinsights.png)
+
+**Check logs in Azure ML:**
+
+Logs are sent to Azure ML only in an online context.
+You can check them by logging in to the [Azure ML portal](https://ml.azure.com/).
+Then click on the desired experiment, select the specific step,
+click on the logs/outputs tab, and check your logs.
+Logs are written in the following format:
+```text
+timeStamp, [severity], callee_file_name:line_number:description
+```
+![logs-aml](images/logs-aml.png)
+
+**Check logs in Application Insights:**
+
+To check the logs in Application Insights, navigate to the Azure Portal and open your Application Insights service.
+Click on the Logs tab and select traces. You may use the queries below to retrieve your logs.
+The following lists all the traces:
+```sql
+traces
+```
+To narrow your search to a specific run, provide the correlation_id:
+
+```sql
+traces
+| where customDimensions.correlation_id contains "e56b31b7-513f-4c34-9158-c2e1b28a5aaf"
+```
+![logs-appInsights](images/logs-appinsights.png)
+
diff --git a/utils/tests/__init__.py b/utils/tests/__init__.py
new file mode 100644
index 00000000..e69de29b
diff --git a/utils/tests/test_app_insights_logger.py b/utils/tests/test_app_insights_logger.py
new file mode 100644
index 00000000..5f54fb6a
--- /dev/null
+++ b/utils/tests/test_app_insights_logger.py
@@ -0,0 +1,72 @@
+import logging
+import unittest
+from unittest.mock import patch
+
+from utils.logger.app_insights_logger import AppInsightsLogger
+
+
+class RealAppInsightsLogger(AppInsightsLogger):
+    def __init__(self):
+        self.logger = logging.getLogger(__name__)
+        self.env = MockEnv("")
+
+
+class MockRun:
+    def __init__(self, run_id):
+        self.id = run_id
+
+
+class MockEnv:
+    def __init__(self, run_id):
+        self.build_id = run_id
+
+
+class TestObservability(unittest.TestCase):
+    @patch("utils.logger.app_insights_logger.AppInsightsLogger")
+    def setUp(cls, mock_app_insights_logger):
+        cls.real_app_insights_logger = RealAppInsightsLogger()
+        cls.mock_app_insights_logger = mock_app_insights_logger
+
+    def test_get_run_id_having_online_context(self):
+        expected = "FOO"
+
+        response = self.real_app_insights_logger.get_run_id(MockRun("FOO"))
+
+        self.assertEqual(expected, response)
+
+    def test_get_run_id_having_offline_context_using_build_id(self):
+        self.real_app_insights_logger.env.build_id = expected = "FOO"
+
+        response = self.real_app_insights_logger.\
+            get_run_id(MockRun("OfflineRun"))
+
+        self.assertEqual(expected, response)
+
+    def test_get_run_id_having_offline_context_using_uuid(self):
+        self.real_app_insights_logger.env.build_id = ""
+
+        response = self.real_app_insights_logger.\
+            get_run_id(MockRun("OfflineRun"))
+
+        self.assertIsNotNone(response)
+
+    def test_log_called_with_parameters(self):
+        self.mock_app_insights_logger.log("FOO", "BAZ")
+
+        self.mock_app_insights_logger.log.assert_called_with("FOO", "BAZ")
+
+    def test_log_metric_called_with_parameters(self):
+        self.mock_app_insights_logger.log_metric("FOO", "BAZ", "BAR", False)
+
+        self.mock_app_insights_logger.log_metric.assert_called_with(
+            "FOO", "BAZ", "BAR", False
+        )
+
+    def test_set_view_is_called_with_parameters(self):
+        self.mock_app_insights_logger.set_view("FOO", "BAR", "BAZ")
+        self.mock_app_insights_logger.set_view.\
+            assert_called_with("FOO", "BAR", "BAZ")
+
+
+if __name__ == "__main__":
+    unittest.main()
diff --git a/utils/tests/test_azure_ml_logger.py b/utils/tests/test_azure_ml_logger.py
new file mode 100644
index 00000000..babdd62f
--- /dev/null
+++ b/utils/tests/test_azure_ml_logger.py
@@ -0,0 +1,32 @@
+import unittest
+from unittest.mock import patch
+
+from utils.logger.azure_ml_logger import AzureMlLogger
+
+
+class TestObservability(unittest.TestCase):
+    @patch("utils.logger.azure_ml_logger.AzureMlLogger")
+    def setUp(cls, mock_azure_ml_logger):
+        cls.azure_ml_logger = mock_azure_ml_logger
+
+    def test_log_called_with_parameters(self):
+        self.azure_ml_logger.log("FOO", "BAZ")
+
+        self.azure_ml_logger.log.assert_called_with("FOO", "BAZ")
+
+    def test_log_metric_called_with_parameters(self):
+        self.azure_ml_logger.log_metric("FOO", "BAZ", "BAR")
+
+        self.azure_ml_logger.log_metric.assert_called_with("FOO", "BAZ", "BAR")
+
+ def test_get_callee_returns_callee_file_with_line_number(self): + azure_ml_logger = AzureMlLogger() + expected = "test_azure_ml_logger.py:26" + + response = azure_ml_logger.get_callee(0) + + self.assertEqual(expected, response) + + +if __name__ == "__main__": + unittest.main() diff --git a/utils/tests/test_observability.py b/utils/tests/test_observability.py new file mode 100644 index 00000000..17129d59 --- /dev/null +++ b/utils/tests/test_observability.py @@ -0,0 +1,55 @@ +import unittest +from unittest.mock import patch + +from utils.logger.observability import Observability + + +class ObservabilityMock(Observability): + @patch("utils.logger.app_insights_logger.AppInsightsLogger") + @patch("utils.logger.azure_ml_logger.AzureMlLogger") + @patch("utils.logger.observability.Loggers") + def __init__(self, mock_loggers, mock_aml_logger, mock_app_insight_logger): + mock_loggers.loggers = [mock_aml_logger, mock_app_insight_logger] + self._loggers = mock_loggers + + +class TestObservability(unittest.TestCase): + @patch("utils.logger.observability.Observability") + def setUp(cls, mock_observability): + cls.observability = mock_observability + + def test_log_metric_called_with_parameters(self): + self.observability.log_metric("FOO", "BAZ", "BAR") + + self.observability.log_metric.assert_called_with("FOO", "BAZ", "BAR") + + def test_log_called_with_parameters(self): + self.observability.log("FOO", "BAZ") + + self.observability.log.assert_called_with("FOO", "BAZ") + + def test_log_metric_is_being_called_by_all_loggers(self): + self.observability = ObservabilityMock() + + self.observability.log_metric("FOO", "BAZ", "BAR") + + self.observability._loggers.loggers[0].log_metric.assert_called_with( + "FOO", "BAZ", "BAR", False + ) + self.observability._loggers.loggers[1].log_metric.assert_called_with( + "FOO", "BAZ", "BAR", False + ) + + def test_log_is_being_called_by_all_loggers(self): + self.observability = ObservabilityMock() + + self.observability.log("FOO", "BAZ") + + self.observability._loggers.loggers[0].\ + log.assert_called_with("FOO", "BAZ") + self.observability._loggers.loggers[1].\ + log.assert_called_with("FOO", "BAZ") + + +if __name__ == "__main__": + unittest.main()