diff --git a/assets/evaluation_on_cloud/environments/evaluations-built-in/context/online_eval/evaluate.py b/assets/evaluation_on_cloud/environments/evaluations-built-in/context/online_eval/evaluate.py
index ed999ac4b6..cb229408d3 100644
--- a/assets/evaluation_on_cloud/environments/evaluations-built-in/context/online_eval/evaluate.py
+++ b/assets/evaluation_on_cloud/environments/evaluations-built-in/context/online_eval/evaluate.py
@@ -8,6 +8,8 @@
 from collections import defaultdict
 import importlib
 import sys
+import shutil
+import mlflow
 
 from promptflow.client import load_flow
 from azure.ai.evaluation import evaluate
@@ -30,6 +32,7 @@ def get_args():
                         default="./preprocessed_data_output.jsonl")
     parser.add_argument("--evaluated_data", type=str, dest="evaluated_data", default="./evaluated_data_output.jsonl")
     parser.add_argument("--evaluators", type=str, dest="evaluators")
+    parser.add_argument("--evaluator_name_id_map", type=str, dest="evaluator_name_id_map")
     parser.add_argument("--sampling_rate", type=str, dest="sampling_rate", default="1")
 
     args, _ = parser.parse_known_args()
@@ -101,6 +104,24 @@ def download_evaluators_and_update_local_path(evaluators):
     return evaluators
 
 
+def copy_evaluator_files(command_line_args):
+    """Copy the mounted evaluator files to the relative paths to enable read/write."""
+    evaluators = json.loads(command_line_args["evaluators"])
+    evaluator_name_id_map = json.loads(command_line_args["evaluator_name_id_map"])
+    for evaluator_name, evaluator_id in evaluator_name_id_map.items():
+        dir_path = find_file_and_get_parent_dir(evaluator_id)
+        if dir_path:
+            shutil.copytree(dir_path, f"./{evaluator_name}")
+            logger.info(f"Copying {dir_path} to ./{evaluator_name}")
+            copied_dir = os.listdir(f"./{evaluator_name}")
+            logger.info(f"Directory ./{evaluator_name} now contains: {copied_dir}")
+            sys.path.append(os.path.abspath(f"./{evaluator_name}"))
+            evaluators[evaluator_name]["local_path"] = os.path.abspath(f"./{evaluator_name}")
+        else:
+            logger.info(f"Directory for evaluator {evaluator_name} not found.")
+    return evaluators
+
+
 def load_evaluators(input_evaluators):
     """Initialize the evaluators using correct parameters and credentials for rai evaluators."""
     loaded_evaluators, loaded_evaluator_configs = {}, {}
@@ -112,16 +133,25 @@
             init_params["credential"] = AzureMLOnBehalfOfCredential()
         loaded_evaluators[evaluator_name] = flow(**init_params)
         loaded_evaluator_configs[evaluator_name] = {"column_mapping": evaluator.get("DataMapping", {})}
+        logger.info(f"Loaded Evaluator: {flow}")
+        logger.info(f"Using Evaluator: {loaded_evaluators[evaluator_name]}")
+        logger.info(f"Loaded evaluator config: {loaded_evaluator_configs[evaluator_name]}")
     return loaded_evaluators, loaded_evaluator_configs
 
 
 def run_evaluation(command_line_args, evaluators, evaluator_configs):
     """Run the evaluation."""
     # Todo: can we get only results back instead of the whole response?
+    logger.info(f"Running the evaluators: {list(evaluators.keys())}")
+    logger.info(f"With the evaluator config {evaluator_configs}")
     results = evaluate(data=command_line_args["preprocessed_data"], evaluators=evaluators,
                        evaluator_config=evaluator_configs)
-    logger.info("Evaluation Completed")
-    logger.info("results here", results)
+    metrics = {}
+    for metric_name, metric_value in results["metrics"].items():
+        logger.info(f"Logging metric added with name {metric_name}, and value {metric_value}")
+        metrics[metric_name] = metric_value
+    mlflow.log_metrics(metrics)
+    logger.info("Evaluation Completed Successfully")
     final_results = defaultdict(list)
     for result in results["rows"]:
         for evaluator_name in evaluators:
@@ -130,11 +160,20 @@ def run_evaluation(command_line_args, evaluators, evaluator_configs):
             if len(filtered_result) == 1:
                 final_results[evaluator_name].append(filtered_result[list(filtered_result.keys())[0]])
             else:
+                if len(filtered_result) == 0:
+                    logger.warning(f"No output score generated for current evaluator {evaluator_name}")
                 logger.info(f"Found multiple results for {evaluator_name}. Adding as json string.")
                 final_results[evaluator_name].append(json.dumps(filtered_result))
     final_results = pd.DataFrame(final_results)
     logger.info(final_results)
     final_results.to_json(command_line_args["evaluated_data"], orient="records", lines=True)
+    if results and results.get("rows"):
+        # Convert the results to a DataFrame
+        df = pd.DataFrame(results["rows"])
+
+        # Save the DataFrame as a JSONL file
+        df.to_json("instance_results.jsonl", orient="records", lines=True)
+        mlflow.log_artifact("instance_results.jsonl")
 
 
 rai_evaluators = [
@@ -151,7 +190,8 @@ def run_evaluation(command_line_args, evaluators, evaluator_configs):
 def run(args):
     """Entry point of model prediction script."""
     evaluators = json.loads(args["evaluators"])
-    evaluators = download_evaluators_and_update_local_path(evaluators)
+    # evaluators = download_evaluators_and_update_local_path(evaluators)
+    evaluators = copy_evaluator_files(args)
     evaluators, evaluator_configs = load_evaluators(evaluators)
     run_evaluation(args, evaluators, evaluator_configs)
 
diff --git a/assets/evaluation_on_cloud/environments/evaluations-built-in/context/online_eval/evaluate_online.py b/assets/evaluation_on_cloud/environments/evaluations-built-in/context/online_eval/evaluate_online.py
index 54277c4ef1..dfb69ce07b 100644
--- a/assets/evaluation_on_cloud/environments/evaluations-built-in/context/online_eval/evaluate_online.py
+++ b/assets/evaluation_on_cloud/environments/evaluations-built-in/context/online_eval/evaluate_online.py
@@ -6,6 +6,7 @@
 import preprocess
 import evaluate
 import postprocess
+import mlflow
 
 
 def get_args():
@@ -23,6 +24,7 @@ def get_args():
                         default="./preprocessed_data_output.jsonl")
     parser.add_argument("--evaluated_data", type=str, dest="evaluated_data", default="./evaluated_data_output.jsonl")
     parser.add_argument("--evaluators", type=str, dest="evaluators")
+    parser.add_argument("--evaluator_name_id_map", type=str, dest="evaluator_name_id_map")
     parser.add_argument("--service_name", type=str, dest="service_name", default="evaluation.app")
 
     args, _ = parser.parse_known_args()
@@ -40,4 +42,5 @@
 
 
 if __name__ == "__main__":
-    run()
+    with mlflow.start_run() as _run:
+        run()
diff --git a/assets/evaluation_on_cloud/environments/evaluations-built-in/context/online_eval/postprocess.py b/assets/evaluation_on_cloud/environments/evaluations-built-in/context/online_eval/postprocess.py
index 8213bcb7d3..ac279a42ac 100644
--- a/assets/evaluation_on_cloud/environments/evaluations-built-in/context/online_eval/postprocess.py
+++ b/assets/evaluation_on_cloud/environments/evaluations-built-in/context/online_eval/postprocess.py
@@ -12,7 +12,7 @@
 from opentelemetry import _logs
 from opentelemetry.trace.span import TraceFlags
 from opentelemetry.sdk._logs import LoggerProvider
-from opentelemetry.sdk._logs.export import BatchLogRecordProcessor, ConsoleLogExporter
+from opentelemetry.sdk._logs.export import BatchLogRecordProcessor
 from azure.monitor.opentelemetry.exporter import AzureMonitorLogExporter
 
 import logging
@@ -41,7 +41,6 @@ def configure_logging(args) -> LoggerProvider:
     logger.info("Configuring logging")
     provider = LoggerProvider()
     _logs.set_logger_provider(provider)
-    provider.add_log_record_processor(BatchLogRecordProcessor(ConsoleLogExporter()))
     args["connection_string"] = None if args["connection_string"] == "" else args["connection_string"]
     provider.add_log_record_processor(
         BatchLogRecordProcessor(AzureMonitorLogExporter(connection_string=args["connection_string"])))
@@ -103,10 +102,7 @@ def get_combined_data(preprocessed_data, evaluated_data, service_name):
 
 def run(args):
     """Entry point of model prediction script."""
-    logger.info(
-        f"Sampling Rate: {args['sampling_rate']}, Connection String: {args['connection_string']}, "
-        f"Service Name: {args['service_name']}"
-    )
+    logger.info(f"Commandline args:> Service Name: {args['service_name']}")
     provider = configure_logging(args)
     data = get_combined_data(args["preprocessed_data"], args["evaluated_data"], args["service_name"])
diff --git a/assets/evaluation_on_cloud/environments/evaluations-built-in/context/online_eval/preprocess.py b/assets/evaluation_on_cloud/environments/evaluations-built-in/context/online_eval/preprocess.py
index 041ee1fdb0..a203aa1b51 100644
--- a/assets/evaluation_on_cloud/environments/evaluations-built-in/context/online_eval/preprocess.py
+++ b/assets/evaluation_on_cloud/environments/evaluations-built-in/context/online_eval/preprocess.py
@@ -67,6 +67,7 @@ def get_logs(client, resource_id: str, query: str, start_time: datetime, end_tim
             raise Exception(f"Unable to parse query results. Unexpected number of tables: {len(data)}.")
         table = data[0]
         df = pd.DataFrame(data=table.rows, columns=table.columns)
+        logger.info(f"Query returned {len(df)} rows, {len(df.columns)} columns, and df.columns: {df.columns}")
         return df
     except Exception as e:
         logger.info("something fatal happened")
@@ -76,8 +77,9 @@ def get_logs(client, resource_id: str, query: str, start_time: datetime, end_tim
 def save_output(result, args):
     """Save output."""
     try:
-        logger.info("Saving output.")
         # Todo: One conversation will be split across multiple rows. how to combine them?
+        logger.info(f"Saving output to {args['preprocessed_data']}")
+        logger.info(f"First few rows of output: {result.head()}")
         result.to_json(args["preprocessed_data"], orient="records", lines=True)
     except Exception as e:
         logger.info("Unable to save output.")
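
Note for reviewers: the MLflow calls introduced above (the start_run wrapper in evaluate_online.py, log_metrics and log_artifact in evaluate.py) follow the standard MLflow pattern sketched below. This is a minimal illustration only; the metric names, row contents, and file name are placeholders, not values taken from the pipeline.

import mlflow
import pandas as pd

# Stand-ins for what evaluate() returns in the real script: results["metrics"]
# (aggregate scores) and results["rows"] (per-instance scores). Values are made up.
example_metrics = {"relevance.mean_score": 4.2, "fluency.mean_score": 4.8}
example_rows = [{"inputs.query": "hi", "outputs.relevance.score": 5}]

with mlflow.start_run():                     # evaluate_online.py wraps run() like this
    mlflow.log_metrics(example_metrics)      # aggregate scores become run metrics

    # Per-instance results are written to JSONL and attached as a run artifact.
    df = pd.DataFrame(example_rows)
    df.to_json("instance_results.jsonl", orient="records", lines=True)
    mlflow.log_artifact("instance_results.jsonl")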