Online Evaluation Schedule AOAI Evaluators Support (#3568)

* AOAI eval fixes

* Fix for evaluator issues and logger

* Bug fixes

* Remove debug logs

* check-style fix and logger changes

* Added logger to check query results

---------

Co-authored-by: Ankush Bhatia <[email protected]>
apeddauppari and Ankush Bhatia authored Nov 7, 2024
1 parent 8efee95 commit 3fcebcb
Showing 4 changed files with 52 additions and 11 deletions.
Changed file 1 of 4
@@ -8,6 +8,8 @@
from collections import defaultdict
import importlib
import sys
import shutil
import mlflow

from promptflow.client import load_flow
from azure.ai.evaluation import evaluate
@@ -30,6 +32,7 @@ def get_args():
default="./preprocessed_data_output.jsonl")
parser.add_argument("--evaluated_data", type=str, dest="evaluated_data", default="./evaluated_data_output.jsonl")
parser.add_argument("--evaluators", type=str, dest="evaluators")
parser.add_argument("--evaluator_name_id_map", type=str, dest="evaluator_name_id_map")
parser.add_argument("--sampling_rate", type=str, dest="sampling_rate", default="1")

args, _ = parser.parse_known_args()
@@ -101,6 +104,24 @@ def download_evaluators_and_update_local_path(evaluators):
return evaluators


def copy_evaluator_files(command_line_args):
"""Copy the mounted evaluator files to the relative paths to enable read/write."""
evaluators = json.loads(command_line_args["evaluators"])
evaluator_name_id_map = json.loads(command_line_args["evaluator_name_id_map"])
for evaluator_name, evaluator_id in evaluator_name_id_map.items():
dir_path = find_file_and_get_parent_dir(evaluator_id)
if dir_path:
shutil.copytree(dir_path, f"./{evaluator_name}")
logger.info(f"Copying {dir_path} to ./{evaluator_name}")
copied_dir = os.listdir(f"./{evaluator_name}")
logger.info(f"Directory ./{evaluator_name} now contains: {copied_dir}")
sys.path.append(os.path.abspath(f"./{evaluator_name}"))
evaluators[evaluator_name]["local_path"] = os.path.abspath(f"./{evaluator_name}")
else:
logger.info(f"Directory for evaluator {evaluator_name} not found.")
return evaluators


def load_evaluators(input_evaluators):
"""Initialize the evaluators using correct parameters and credentials for rai evaluators."""
loaded_evaluators, loaded_evaluator_configs = {}, {}
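
A note on the new copy_evaluator_files helper above: its core pattern is copying a (typically read-only) mounted evaluator folder into a writable relative path and appending that path to sys.path so the flow can be imported. A minimal standalone sketch of the same pattern, with an illustrative helper name that is not part of this repo:

import os
import shutil
import sys

def make_local_copy(mounted_dir: str, name: str) -> str:
    """Copy a mounted evaluator folder to ./<name> and make it importable."""
    dest = os.path.abspath(f"./{name}")
    shutil.copytree(mounted_dir, dest)  # raises if ./<name> already exists
    sys.path.append(dest)               # lets the copied flow package be imported by module name
    return dest

Note that shutil.copytree fails when the destination already exists; re-running in the same working directory would need cleanup first, or dirs_exist_ok=True (Python 3.8+).
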
@@ -112,16 +133,25 @@ def load_evaluators(input_evaluators):
init_params["credential"] = AzureMLOnBehalfOfCredential()
loaded_evaluators[evaluator_name] = flow(**init_params)
loaded_evaluator_configs[evaluator_name] = {"column_mapping": evaluator.get("DataMapping", {})}
logger.info(f"Loaded Evaluator: {flow}")
logger.info(f"Using Evaluator: {loaded_evaluators[evaluator_name]}")
logger.info(f"Loaded evaluator config: {loaded_evaluator_configs[evaluator_name]}")
return loaded_evaluators, loaded_evaluator_configs


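
The column_mapping built above from each evaluator's DataMapping follows the azure-ai-evaluation convention of binding evaluator inputs to dataset columns. A hedged illustration of what one such per-evaluator config entry might look like (the evaluator and column names are hypothetical):

# Hypothetical entry mirroring {"column_mapping": evaluator.get("DataMapping", {})} above.
evaluator_config = {
    "coherence": {
        "column_mapping": {
            "query": "${data.query}",        # read from the "query" column of the input JSONL
            "response": "${data.response}",  # read from the "response" column
        }
    }
}
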
def run_evaluation(command_line_args, evaluators, evaluator_configs):
"""Run the evaluation."""
# Todo: can we get only results back instead of the whole response?
logger.info(f"Running the evaluators: {list(evaluators.keys())}")
logger.info(f"With the evaluator config {evaluator_configs}")
results = evaluate(data=command_line_args["preprocessed_data"], evaluators=evaluators,
evaluator_config=evaluator_configs)
logger.info("Evaluation Completed")
logger.info("results here", results)
metrics = {}
for metric_name, metric_value in results["metrics"].items():
logger.info(f"Logging metric added with name {metric_name}, and value {metric_value}")
metrics[metric_name] = metric_value
mlflow.log_metrics(metrics)
logger.info("Evaluation Completed Successfully")
final_results = defaultdict(list)
for result in results["rows"]:
for evaluator_name in evaluators:
@@ -130,11 +160,20 @@ def run_evaluation(command_line_args, evaluators, evaluator_configs):
if len(filtered_result) == 1:
final_results[evaluator_name].append(filtered_result[list(filtered_result.keys())[0]])
else:
if len(filtered_result) == 0:
logger.warning(f"No output score generated for current evaluator {evaluator_name}")
logger.info(f"Found multiple results for {evaluator_name}. Adding as json string.")
final_results[evaluator_name].append(json.dumps(filtered_result))
final_results = pd.DataFrame(final_results)
logger.info(final_results)
final_results.to_json(command_line_args["evaluated_data"], orient="records", lines=True)
if results and results.get("rows"):
# Convert the results to a DataFrame
df = pd.DataFrame(results["rows"])

# Save the DataFrame as a JSONL file
df.to_json("instance_results.jsonl", orient="records", lines=True)
mlflow.log_artifact("instance_results.jsonl")


rai_evaluators = [
@@ -151,7 +190,8 @@ def run_evaluation(command_line_args, evaluators, evaluator_configs):
def run(args):
"""Entry point of model prediction script."""
evaluators = json.loads(args["evaluators"])
evaluators = download_evaluators_and_update_local_path(evaluators)
# evaluators = download_evaluators_and_update_local_path(evaluators)
evaluators = copy_evaluator_files(args)
evaluators, evaluator_configs = load_evaluators(evaluators)
run_evaluation(args, evaluators, evaluator_configs)

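The run_evaluation changes above log aggregate metrics with mlflow.log_metrics and persist per-row scores as a JSONL artifact. A minimal sketch of that flattening step, assuming a results dict shaped like the one returned by evaluate(), with "metrics" and "rows" keys (the example values are made up):

import mlflow
import pandas as pd

results = {
    "metrics": {"coherence.score": 4.2},                              # hypothetical aggregate metric
    "rows": [{"inputs.query": "hi", "outputs.coherence.score": 4}],   # hypothetical per-row result
}
mlflow.log_metrics(results["metrics"])                       # aggregate metrics on the active run
pd.DataFrame(results["rows"]).to_json(
    "instance_results.jsonl", orient="records", lines=True)  # one JSON object per line
mlflow.log_artifact("instance_results.jsonl")                # attach the per-row file to the run
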
Changed file 2 of 4
@@ -6,6 +6,7 @@
import preprocess
import evaluate
import postprocess
import mlflow


def get_args():
@@ -23,6 +24,7 @@ def get_args():
default="./preprocessed_data_output.jsonl")
parser.add_argument("--evaluated_data", type=str, dest="evaluated_data", default="./evaluated_data_output.jsonl")
parser.add_argument("--evaluators", type=str, dest="evaluators")
parser.add_argument("--evaluator_name_id_map", type=str, dest="evaluator_name_id_map")
parser.add_argument("--service_name", type=str, dest="service_name", default="evaluation.app")

args, _ = parser.parse_known_args()
@@ -40,4 +42,5 @@ def run():


if __name__ == "__main__":
run()
with mlflow.start_run() as _run:
run()
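
Wrapping run() in mlflow.start_run() gives the preprocess, evaluate, and postprocess stages a single active run, so the mlflow.log_metrics and mlflow.log_artifact calls made inside the evaluation step attach to that run. A small sketch of the pattern (the metric name and value are placeholders):

import mlflow

def pipeline_step():
    # Fluent-API calls inside the context manager log to the currently active run.
    mlflow.log_metric("rows_evaluated", 100)

if __name__ == "__main__":
    with mlflow.start_run() as run:
        pipeline_step()
        print(f"Logged to run {run.info.run_id}")
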
Changed file 3 of 4
@@ -12,7 +12,7 @@
from opentelemetry import _logs
from opentelemetry.trace.span import TraceFlags
from opentelemetry.sdk._logs import LoggerProvider
from opentelemetry.sdk._logs.export import BatchLogRecordProcessor, ConsoleLogExporter
from opentelemetry.sdk._logs.export import BatchLogRecordProcessor
from azure.monitor.opentelemetry.exporter import AzureMonitorLogExporter

import logging
@@ -41,7 +41,6 @@ def configure_logging(args) -> LoggerProvider:
logger.info("Configuring logging")
provider = LoggerProvider()
_logs.set_logger_provider(provider)
provider.add_log_record_processor(BatchLogRecordProcessor(ConsoleLogExporter()))
args["connection_string"] = None if args["connection_string"] == "" else args["connection_string"]
provider.add_log_record_processor(
BatchLogRecordProcessor(AzureMonitorLogExporter(connection_string=args["connection_string"])))
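
The change above removes the ConsoleLogExporter and keeps only the AzureMonitorLogExporter on the provider. For context, a hedged sketch of how a standard Python logger is typically bridged into such a provider via the OpenTelemetry LoggingHandler (the connection string below is an all-zero placeholder, not a real resource):

import logging
from opentelemetry import _logs
from opentelemetry.sdk._logs import LoggerProvider, LoggingHandler
from opentelemetry.sdk._logs.export import BatchLogRecordProcessor
from azure.monitor.opentelemetry.exporter import AzureMonitorLogExporter

provider = LoggerProvider()
_logs.set_logger_provider(provider)
provider.add_log_record_processor(BatchLogRecordProcessor(
    AzureMonitorLogExporter(connection_string="InstrumentationKey=00000000-0000-0000-0000-000000000000")))

handler = LoggingHandler(level=logging.INFO, logger_provider=provider)  # route stdlib records to the provider
log = logging.getLogger("example")
log.setLevel(logging.INFO)
log.addHandler(handler)
log.info("this record is exported through the Azure Monitor exporter")
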
@@ -103,10 +102,7 @@ def get_combined_data(preprocessed_data, evaluated_data, service_name):

def run(args):
"""Entry point of model prediction script."""
logger.info(
f"Sampling Rate: {args['sampling_rate']}, Connection String: {args['connection_string']}, "
f"Service Name: {args['service_name']}"
)
logger.info(f"Commandline args:> Service Name: {args['service_name']}")
provider = configure_logging(args)
data = get_combined_data(args["preprocessed_data"], args["evaluated_data"],
args["service_name"])
Changed file 4 of 4
@@ -67,6 +67,7 @@ def get_logs(client, resource_id: str, query: str, start_time: datetime, end_tim
raise Exception(f"Unable to parse query results. Unexpected number of tables: {len(data)}.")
table = data[0]
df = pd.DataFrame(data=table.rows, columns=table.columns)
logger.info(f"Query returned {len(df)} rows, {len(df.columns)} columns, and df.columns: {df.columns}")
return df
except Exception as e:
logger.info("something fatal happened")
@@ -76,8 +77,9 @@ def save_output(result, args):
def save_output(result, args):
"""Save output."""
try:
logger.info("Saving output.")
# Todo: One conversation will be split across multiple rows. how to combine them?
logger.info(f"Saving output to {args['preprocessed_data']}")
logger.info(f"First few rows of output: {result.head()}")
result.to_json(args["preprocessed_data"], orient="records", lines=True)
except Exception as e:
logger.info("Unable to save output.")
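For context on get_logs above, which converts a query result table into a pandas DataFrame, here is a hedged sketch of the surrounding query flow, assuming the client is an azure.monitor.query LogsQueryClient (the resource ID and KQL query are placeholders):

from datetime import datetime, timedelta, timezone

import pandas as pd
from azure.identity import DefaultAzureCredential
from azure.monitor.query import LogsQueryClient

client = LogsQueryClient(DefaultAzureCredential())
end_time = datetime.now(timezone.utc)
response = client.query_resource(
    "/subscriptions/<sub>/resourceGroups/<rg>/providers/Microsoft.Insights/components/<app>",  # placeholder
    "AppTraces | take 10",  # placeholder KQL query
    timespan=(end_time - timedelta(hours=1), end_time),
)
table = response.tables[0]  # same shape get_logs consumes: rows plus column names
df = pd.DataFrame(data=table.rows, columns=table.columns)
print(f"Query returned {len(df)} rows and {len(df.columns)} columns")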
