diff --git a/assets/inference/environments/mlflow-ubuntu22.04-py312-cpu-inference/context/mlmonitoring/__init__.py b/assets/inference/environments/mlflow-ubuntu22.04-py312-cpu-inference/context/mlmonitoring/__init__.py
new file mode 100644
index 0000000000..40c0430a74
--- /dev/null
+++ b/assets/inference/environments/mlflow-ubuntu22.04-py312-cpu-inference/context/mlmonitoring/__init__.py
@@ -0,0 +1,7 @@
+"""Package init for mlmonitoring."""
+# Copyright (c) Microsoft Corporation.
+# Licensed under the MIT License.
+
+from .collector import Collector
+
+__all__ = ["Collector"]
diff --git a/assets/inference/environments/mlflow-ubuntu22.04-py312-cpu-inference/context/mlmonitoring/collector.py b/assets/inference/environments/mlflow-ubuntu22.04-py312-cpu-inference/context/mlmonitoring/collector.py
new file mode 100644
index 0000000000..2770ac9b63
--- /dev/null
+++ b/assets/inference/environments/mlflow-ubuntu22.04-py312-cpu-inference/context/mlmonitoring/collector.py
@@ -0,0 +1,28 @@
+"""Public Collector API."""
+# Copyright (c) Microsoft Corporation.
+# Licensed under the MIT License.
+
+from typing import Any, Callable, Optional
+
+from .collector_json import JsonCollector
+from .context import CorrelationContext
+
+
+class Collector:
+    """Collect inference data and forward it to the data collector backend."""
+
+    def __init__(
+            self,
+            *,
+            name: str,
+            on_error: Optional[Callable[[Exception], Any]] = None
+    ):
+        """Create a collector for the named collection."""
+        self._impl = JsonCollector(name=name, on_error=on_error)
+
+    def collect(
+            self,
+            data,  # supported type: Union[pd.DataFrame]
+            correlation_context: CorrelationContext = None) -> CorrelationContext:
+        """Collect data and return the (possibly wrapped) correlation context."""
+        return self._impl.collect(data, correlation_context)
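
[Illustrative usage sketch, not part of the diff. It assumes the package is importable as mlmonitoring and that data collection has been enabled (see config.py later in this diff); the collection name and callback are placeholders.]

import pandas as pd
from mlmonitoring import Collector

# one collector per logical collection; "model_inputs" is an illustrative name
inputs_collector = Collector(name="model_inputs",
                             on_error=lambda e: print("collect failed:", e))

def run(raw_data):
    df = pd.DataFrame(raw_data)
    # returns a CorrelationContext that can be reused across collect() calls
    ctx = inputs_collector.collect(df)
    return {"correlation_id": ctx.get_id()}
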
diff --git a/assets/inference/environments/mlflow-ubuntu22.04-py312-cpu-inference/context/mlmonitoring/collector_base.py b/assets/inference/environments/mlflow-ubuntu22.04-py312-cpu-inference/context/mlmonitoring/collector_base.py
new file mode 100644
index 0000000000..d0e830793a
--- /dev/null
+++ b/assets/inference/environments/mlflow-ubuntu22.04-py312-cpu-inference/context/mlmonitoring/collector_base.py
@@ -0,0 +1,34 @@
+"""Base class shared by collector implementations."""
+# Copyright (c) Microsoft Corporation.
+# Licensed under the MIT License.
+
+import logging
+
+from .init import init, is_sdk_ready
+from .config import get_config
+from .context import CorrelationContext, get_context_wrapper
+
+
+class CollectorBase:
+    """Initialize the SDK on first use and build collect() responses."""
+
+    def __init__(
+            self,
+            model_version: str):
+        """Initialize the SDK lazily, then cache logger and config."""
+        if not is_sdk_ready():
+            init(model_version)
+
+        self.logger = logging.getLogger("mdc.collector")
+        self.config = get_config()
+
+    def _response(
+            self,
+            context: CorrelationContext,
+            success: bool,
+            message: str) -> CorrelationContext:
+        """Wrap the context with success/message details in debug mode."""
+        if self.config.is_debug():
+            return get_context_wrapper(context, success, message)
+
+        return context
diff --git a/assets/inference/environments/mlflow-ubuntu22.04-py312-cpu-inference/context/mlmonitoring/collector_json.py b/assets/inference/environments/mlflow-ubuntu22.04-py312-cpu-inference/context/mlmonitoring/collector_json.py
new file mode 100644
index 0000000000..e230aa5717
--- /dev/null
+++ b/assets/inference/environments/mlflow-ubuntu22.04-py312-cpu-inference/context/mlmonitoring/collector_json.py
@@ -0,0 +1,128 @@
+"""JSON collector implementation."""
+# Copyright (c) Microsoft Corporation.
+# Licensed under the MIT License.
+
+import sys
+import random
+from typing import Any, Callable, Optional
+
+from .payload import PandasFrameData
+from .payload.payload import build_payload
+from .queue import get_queue
+
+from .collector_base import CollectorBase
+from .context import CorrelationContext, get_context
+
+try:
+    import pandas as pd
+except ImportError:
+    pass
+
+
+def _build_log_data_by_type(data):
+    """Wrap data in the LogData type matching its runtime type."""
+    if 'pandas' in sys.modules and isinstance(data, pd.DataFrame):
+        return PandasFrameData(data)
+
+    raise TypeError("data type (%s) not supported, "
+                    "supported types: pandas.DataFrame"
+                    % type(data).__name__)
+
+
+def _raise_if_exception(e: Exception):
+    """Default on_error handler: re-raise the exception."""
+    raise e
+
+
+class JsonCollector(CollectorBase):
+    """Collector that serializes data as JSON and enqueues it for sending."""
+
+    def __init__(
+            self,
+            *,
+            name: str,
+            on_error: Optional[Callable[[Exception], Any]] = None
+    ):
+        """Create the collector and validate the MDC configuration."""
+        super().__init__("default")
+        self.name = name
+        if on_error:
+            self.on_error = on_error
+        else:
+            self.on_error = _raise_if_exception
+
+        self._validate_mdc_config()
+
+    def _validate_mdc_config(self):
+        """Check that the collection name and MDC config allow collecting."""
+        if not self.name:
+            # unexpected drop
+            msg = "collection name is not provided"
+            self.on_error(Exception(msg))
+            return False, msg
+
+        config = self.config
+        if config is None:
+            # unexpected drop
+            msg = "data collector is not initialized"
+            self.on_error(Exception(msg))
+            return False, msg
+
+        if not config.enabled():
+            # unexpected drop
+            msg = "custom logging is not enabled, dropping the data"
+            self.on_error(Exception(msg))
+            return False, msg
+
+        if not config.collection_enabled(self.name):
+            # unexpected drop
+            msg = "collection {} is not enabled, dropping the data".format(self.name)
+            self.on_error(Exception(msg))
+            return False, msg
+
+        return True, None
+
+    def collect(
+            self,
+            data,  # supported type: Union[pd.DataFrame]
+            correlation_context: CorrelationContext = None) -> CorrelationContext:
+        """Validate, sample, build the payload and enqueue it."""
+        if correlation_context is None:
+            correlation_context = get_context()
+
+        success, msg = self._validate_mdc_config()
+
+        if not success:
+            return self._response(correlation_context, False, msg)
+
+        config = self.config
+
+        percentage = config.collection_sample_rate_percentage(self.name)
+
+        if percentage < 100:
+            if percentage <= random.random() * 100.0:
+                # expected drop
+                self.logger.debug("sampling not hit, drop the data")
+                # TBD: send empty message to mdc to collect metrics of dropped messages?
+                return self._response(correlation_context, False, "dropped_sampling")
+
+        try:
+            # build payload and put into payload queue
+            log_data = _build_log_data_by_type(data)
+        except TypeError as e:
+            # unexpected drop
+            self.on_error(e)
+            return self._response(correlation_context, False, e.args[0])
+
+        payload = build_payload(
+            self.name,
+            data=log_data,
+            model_version=config.model_version(),
+            context=correlation_context)
+
+        success, msg = get_queue().enqueue(payload)
+
+        if not success:
+            # unexpected drop
+            self.on_error(Exception(msg))
+        return self._response(correlation_context, success, msg)
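
[Illustrative sketch, not part of the diff: the sampling branch above keeps a record only when the sampled value falls below the configured rate. A self-contained mirror of that logic, with names of my own choosing:]

import random

def should_keep(percentage: float) -> bool:
    # mirrors JsonCollector.collect(): drop when percentage <= random()*100
    return not (percentage < 100 and percentage <= random.random() * 100.0)

# at 100% everything is kept; at 0% everything is dropped
assert should_keep(100.0)
assert not should_keep(0.0)
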
diff --git a/assets/inference/environments/mlflow-ubuntu22.04-py312-cpu-inference/context/mlmonitoring/common/__init__.py b/assets/inference/environments/mlflow-ubuntu22.04-py312-cpu-inference/context/mlmonitoring/common/__init__.py
new file mode 100644
index 0000000000..af30b4fff7
--- /dev/null
+++ b/assets/inference/environments/mlflow-ubuntu22.04-py312-cpu-inference/context/mlmonitoring/common/__init__.py
@@ -0,0 +1,5 @@
+"""Common constants shared across the package."""
+# Copyright (c) Microsoft Corporation.
+# Licensed under the MIT License.
+
+__version__ = "0.1.0"
diff --git a/assets/inference/environments/mlflow-ubuntu22.04-py312-cpu-inference/context/mlmonitoring/config/__init__.py b/assets/inference/environments/mlflow-ubuntu22.04-py312-cpu-inference/context/mlmonitoring/config/__init__.py
new file mode 100644
index 0000000000..e8f0698957
--- /dev/null
+++ b/assets/inference/environments/mlflow-ubuntu22.04-py312-cpu-inference/context/mlmonitoring/config/__init__.py
@@ -0,0 +1,7 @@
+"""Config subpackage exports."""
+# Copyright (c) Microsoft Corporation.
+# Licensed under the MIT License.
+
+from .config import init_config, teardown_config, is_debug, get_config
+
+__all__ = ["init_config", "teardown_config", "is_debug", "get_config"]
diff --git a/assets/inference/environments/mlflow-ubuntu22.04-py312-cpu-inference/context/mlmonitoring/config/config.py b/assets/inference/environments/mlflow-ubuntu22.04-py312-cpu-inference/context/mlmonitoring/config/config.py
new file mode 100644
index 0000000000..7cad6fbcf0
--- /dev/null
+++ b/assets/inference/environments/mlflow-ubuntu22.04-py312-cpu-inference/context/mlmonitoring/config/config.py
@@ -0,0 +1,215 @@
+"""Configuration for the data collector (MDC)."""
+# Copyright (c) Microsoft Corporation.
+# Licensed under the MIT License.
+
+# pylint: disable=global-statement
+import os
+import json
+
+from ..logger import is_debug, logger
+
+default_queue_capacity = 1500
+default_worker_count = 1
+default_sample_rate_percentage = 100
+
+
+class MdcConfig:
+    """Data collector settings loaded from a config file or environment variables."""
+
+    # pylint: disable=too-many-instance-attributes
+    def __init__(
+            self,
+            enabled=False,
+            host="127.0.0.1",
+            port=50011,
+            debug=False,
+            sample_rate_percentage=default_sample_rate_percentage,
+            model_version=None,
+            queue_capacity=default_queue_capacity,
+            worker_disabled=False,
+            worker_count=default_worker_count,
+            local_capture=False,
+            compact_format=False,
+    ):
+        """Initialize config values."""
+        self._debug = debug
+        self._enabled = enabled
+        self._sample_rate_percentage = sample_rate_percentage
+        self._host = host
+        self._port = port
+        self._model_version = model_version
+        # queue - max length
+        self._queue_capacity = queue_capacity
+        # worker - disabled for test purpose only
+        self._worker_disabled = worker_disabled
+        self._worker_count = worker_count
+        # payload sender
+        self._local_capture = local_capture
+        self._compact_format = compact_format
+        self._collections = {}
+
+    def is_debug(self):
+        """Return whether debug mode is on."""
+        return self._debug
+
+    def enabled(self):
+        """Return whether data collection is enabled."""
+        return self._enabled
+
+    def set_enabled(self, enabled):
+        """Set whether data collection is enabled."""
+        self._enabled = enabled
+
+    def compact_format(self):
+        """Return whether payloads use the compact (binary) format."""
+        return self._compact_format
+
+    def sample_rate_percentage(self):
+        """Return the global sample rate percentage."""
+        return self._sample_rate_percentage
+
+    def host(self):
+        """Return the MDC host."""
+        return self._host
+
+    def port(self):
+        """Return the MDC port."""
+        return self._port
+
+    def model_version(self):
+        """Return the model version."""
+        return self._model_version
+
+    def queue_capacity(self):
+        """Return the payload queue capacity."""
+        return self._queue_capacity
+
+    def worker_disabled(self):
+        """Return whether the sender worker is disabled."""
+        return self._worker_disabled
+
+    def worker_count(self):
+        """Return the number of sender worker threads."""
+        return self._worker_count
+
+    def local_capture(self):
+        """Return whether payloads are captured locally instead of sent."""
+        return self._local_capture
+
+    def add_collection(self, col_name, enabled=False, sample_rate_percentage=100):
+        """Register a collection with its enablement and sample rate."""
+        self._collections[col_name] = {
+            "enabled": enabled,
+            "sampleRatePercentage": sample_rate_percentage,
+        }
+
+    def collections(self):
+        """Return all registered collections."""
+        return self._collections
+
+    def collection_enabled(self, collection_name):
+        """Return whether the given collection is enabled."""
+        path = os.getenv("AZUREML_MDC_CONFIG_PATH")
+        if not path:
+            # for legacy settings, we depend on a global switch to see whether collections are enabled or not.
+            return self.enabled()
+
+        for n, c in self._collections.items():
+            if n == collection_name:
+                return c.get("enabled", False)
+
+        return False
+
+    def collection_sample_rate_percentage(self, collection_name):
+        """Return the sample rate percentage for the given collection."""
+        path = os.getenv("AZUREML_MDC_CONFIG_PATH")
+        if not path:
+            # for legacy settings, we take the global sample_rate_percentage.
+            return self.sample_rate_percentage()
+
+        for n, c in self._collections.items():
+            if n == collection_name:
+                return c.get("sampleRatePercentage", default_sample_rate_percentage)
+
+        return default_sample_rate_percentage
+
+
+def load_config(model_version=None):
+    """Load MdcConfig from the config file if present, else from environment variables."""
+    debug = is_debug()
+    path = os.getenv("AZUREML_MDC_CONFIG_PATH")
+
+    if path:
+        with open(path) as f:
+            cfg = json.load(f)
+        mdc_cfg = MdcConfig(
+            host=os.getenv("AZUREML_MDC_HOST", "127.0.0.1"),
+            port=int(os.getenv("AZUREML_MDC_PORT", "50011")),
+            debug=debug,
+            model_version=model_version,
+            local_capture=cfg.get("runMode", "cloud") == "local"
+        )
+
+        collection_cfg = cfg.get("collections", {})
+        custom_logging_enabled = False
+        for col_name, c in collection_cfg.items():
+            col_name_lower = col_name.lower()
+            if c.get("enabled", False):
+                mdc_cfg.add_collection(col_name_lower, True, c.get("sampleRatePercentage", 100))
+                if col_name_lower not in ('request', 'response'):
+                    custom_logging_enabled = True
+
+        mdc_cfg.set_enabled(custom_logging_enabled)
+
+        return mdc_cfg
+
+    enabled = os.getenv("AZUREML_MDC_ENABLED", "false")
+    if enabled.lower() == "true":
+        return MdcConfig(
+            enabled=True,
+            host=os.getenv("AZUREML_MDC_HOST", "127.0.0.1"),
+            port=int(os.getenv("AZUREML_MDC_PORT", "50011")),
+            debug=debug,
+            sample_rate_percentage=int(os.getenv("AZUREML_MDC_SAMPLE_RATE", str(default_sample_rate_percentage))),
+            queue_capacity=int(os.getenv("AZUREML_MDC_QUEUE_CAPACITY", str(default_queue_capacity))),
+            worker_disabled=os.getenv("AZUREML_MDC_WORKER_DISABLED", "false").lower() == "true",
+            worker_count=int(os.getenv("AZUREML_MDC_WORKER_COUNT", str(default_worker_count))),
+            compact_format=os.getenv("AZUREML_MDC_FORMAT_COMPACT", "false").lower() == "true",
+            local_capture=os.getenv("AZUREML_MDC_LOCAL_CAPTURE", "false").lower() == "true",
+            model_version=model_version,
+        )
+
+    return MdcConfig(enabled=False, debug=debug)
+
+
+mdc_config = None
+
+
+def init_config(model_version=None):
+    """Load the global config and log a summary of it."""
+    global mdc_config
+    mdc_config = load_config(model_version)
+
+    logger.info("mdc enabled: %r", mdc_config.enabled())
+    logger.info("mdc collections count %d", len(mdc_config.collections()))
+    for n, c in mdc_config.collections().items():
+        logger.info("mdc collection %s enabled: %r, sampleRatePercentage: %d",
+                    n,
+                    c.get("enabled", False),
+                    c.get("sampleRatePercentage", default_sample_rate_percentage))
+
+    if mdc_config.is_debug():
+        config_json = json.dumps(mdc_config.__dict__)
+        logger.debug("mdc config: %s", config_json)
+
+
+def teardown_config():
+    """Reset the global config."""
+    global mdc_config
+    mdc_config = None
+
+
+def get_config():
+    """Return the global config."""
+    global mdc_config
+    return mdc_config
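
[Illustrative sketch, not part of the diff: load_config reads a JSON file when AZUREML_MDC_CONFIG_PATH is set, otherwise the legacy AZUREML_MDC_* environment variables. Both shapes below are inferred from the parsing code above; the collection names are placeholders.]

import json
import os
import tempfile

# file-based config, using exactly the keys load_config() reads
cfg = {
    "runMode": "local",  # anything other than "local" means cloud mode
    "collections": {
        "model_inputs": {"enabled": True, "sampleRatePercentage": 50},
        "request": {"enabled": True},  # request/response alone do not enable custom logging
    },
}
path = os.path.join(tempfile.mkdtemp(), "mdc-config.json")
with open(path, "w") as f:
    json.dump(cfg, f)
os.environ["AZUREML_MDC_CONFIG_PATH"] = path

# legacy env-var config instead: unset the path and use the global switch
# os.environ["AZUREML_MDC_ENABLED"] = "true"
# os.environ["AZUREML_MDC_SAMPLE_RATE"] = "50"
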
diff --git a/assets/inference/environments/mlflow-ubuntu22.04-py312-cpu-inference/context/mlmonitoring/context/__init__.py b/assets/inference/environments/mlflow-ubuntu22.04-py312-cpu-inference/context/mlmonitoring/context/__init__.py
new file mode 100644
index 0000000000..8863baeb75
--- /dev/null
+++ b/assets/inference/environments/mlflow-ubuntu22.04-py312-cpu-inference/context/mlmonitoring/context/__init__.py
@@ -0,0 +1,7 @@
+"""Context subpackage exports."""
+# Copyright (c) Microsoft Corporation.
+# Licensed under the MIT License.
+
+from .context import CorrelationContext, BasicCorrelationContext, get_context, get_context_wrapper
+
+__all__ = ["CorrelationContext", "BasicCorrelationContext", "get_context", "get_context_wrapper"]
diff --git a/assets/inference/environments/mlflow-ubuntu22.04-py312-cpu-inference/context/mlmonitoring/context/context.py b/assets/inference/environments/mlflow-ubuntu22.04-py312-cpu-inference/context/mlmonitoring/context/context.py
new file mode 100644
index 0000000000..d08c90abda
--- /dev/null
+++ b/assets/inference/environments/mlflow-ubuntu22.04-py312-cpu-inference/context/mlmonitoring/context/context.py
@@ -0,0 +1,94 @@
+"""Correlation context passed through collect() calls."""
+# Copyright (c) Microsoft Corporation.
+# Licensed under the MIT License.
+
+import uuid
+import time
+
+
+class CorrelationContext:
+    """Base correlation context with an empty id."""
+
+    def __init__(self):
+        """Initialize the context."""
+        pass
+
+    @staticmethod
+    def get_id() -> str:
+        """Return the correlation id."""
+        return ""
+
+    def __str__(self):
+        """Return the correlation id as the string form."""
+        return self.get_id()
+
+    def __repr__(self):
+        """Return the string form."""
+        return self.__str__()
+
+
+class BasicCorrelationContext(CorrelationContext):
+    """Correlation context carrying an id, a timestamp and headers."""
+
+    # pylint: disable=redefined-builtin
+    def __init__(self, id: str = None, timestamp: int = None, headers=None):
+        """Initialize with the given id/timestamp/headers, defaulting to a fresh uuid and now."""
+        super().__init__()
+        self.id = id if id else str(uuid.uuid4())
+        self.timestamp = timestamp if timestamp else int(time.time())
+        self.headers = headers if headers else {}
+    # pylint: enable=redefined-builtin
+
+    def get_id(self) -> str:
+        """Return the correlation id."""
+        return self.id
+
+    def get_timestamp(self) -> int:
+        """Return the unix timestamp."""
+        return self.timestamp
+
+    def get_headers(self) -> dict:
+        """Return the headers."""
+        return self.headers
+
+
+class WrapperContext(CorrelationContext):
+    """Correlation context decorated with a success flag and message (debug mode)."""
+
+    def __init__(self, correlation_context: CorrelationContext, success: bool, message: str):
+        """Wrap an existing context with the collect() outcome."""
+        super().__init__()
+        self._context = correlation_context
+        self._success = success
+        self._message = message
+
+    def get_id(self) -> str:
+        """Return the wrapped context's id."""
+        return self._context.get_id()
+
+    def get_timestamp(self) -> int:
+        """Return the wrapped context's timestamp."""
+        return self._context.get_timestamp()
+
+    def get_headers(self) -> dict:
+        """Return the wrapped context's headers."""
+        return self._context.get_headers()
+
+    def is_success(self) -> bool:
+        """Return whether the collect() call succeeded."""
+        return self._success
+
+    def get_message(self) -> str:
+        """Return the outcome message."""
+        return self._message
+
+
+def get_context() -> CorrelationContext:
+    """Return a fresh basic correlation context."""
+    return BasicCorrelationContext()
+
+
+# test purpose
+def get_context_wrapper(context: CorrelationContext, success: bool, message: str) -> CorrelationContext:
+    """Wrap a context with success/message details."""
+    return WrapperContext(context, success, message)
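
[Illustrative sketch, not part of the diff: how a caller can supply its own correlation context instead of the generated default. The id and header values are placeholders.]

from mlmonitoring.context import BasicCorrelationContext, get_context

# default context: fresh uuid and current unix timestamp
ctx = get_context()

# or carry a known request id and headers across several collect() calls
ctx = BasicCorrelationContext(id="my-request-id", headers={"client": "demo"})
print(ctx.get_id(), ctx.get_timestamp(), ctx.get_headers())
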
diff --git a/assets/inference/environments/mlflow-ubuntu22.04-py312-cpu-inference/context/mlmonitoring/init.py b/assets/inference/environments/mlflow-ubuntu22.04-py312-cpu-inference/context/mlmonitoring/init.py
new file mode 100644
index 0000000000..a45c362b98
--- /dev/null
+++ b/assets/inference/environments/mlflow-ubuntu22.04-py312-cpu-inference/context/mlmonitoring/init.py
@@ -0,0 +1,70 @@
+"""SDK lifecycle: init and teardown of config, queue and worker."""
+# Copyright (c) Microsoft Corporation.
+# Licensed under the MIT License.
+
+# pylint: disable=global-statement
+from .logger import init_logging, logger
+from .config import init_config, teardown_config, get_config
+from .queue import init_queue, teardown_queue, get_queue
+from .worker import init_worker, teardown_worker
+
+sdk_ready = False
+
+
+def is_sdk_ready():
+    """Return whether the SDK has been initialized."""
+    global sdk_ready
+    return sdk_ready
+
+
+def init(model_version=None):
+    """Initialize logging, config, queue and worker; idempotent."""
+    global sdk_ready
+    if sdk_ready:
+        return
+
+    init_logging()
+
+    logger.info("init data collector")
+
+    # init configuration for MDC
+    init_config(model_version)
+
+    config = get_config()
+    if config.is_debug():
+        logger.warning("data collector debugging is enabled")
+
+    if not config.enabled():
+        logger.warning("data collector is not enabled")
+        return
+
+    # create default payload queue in memory
+    init_queue(config.queue_capacity())
+    logger.debug("data collector in-memory queue created")
+
+    # start worker thread for payload sending
+    queue = get_queue()
+    if not config.worker_disabled():
+        init_worker(queue, config)
+        logger.debug("data collector worker started")
+    else:
+        logger.warning("data collector worker is disabled")
+
+    # init done
+    logger.info("data collector ready")
+    sdk_ready = True
+
+
+def teardown(wait_for_flush=False):
+    """Stop the worker, drop the queue and reset config; idempotent."""
+    global sdk_ready
+    if not sdk_ready:
+        return
+
+    logger.debug("tear down data collector")
+    teardown_worker(wait_for_flush)
+    teardown_queue()
+    teardown_config()
+
+    # tear down done
+    sdk_ready = False
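
[Illustrative sketch, not part of the diff: the init/teardown lifecycle. Collectors call init() lazily, so explicit calls are optional; the env var uses the legacy switch described in config.py.]

import os
from mlmonitoring.init import init, teardown, is_sdk_ready

os.environ["AZUREML_MDC_ENABLED"] = "true"  # legacy global switch

init(model_version="1")        # safe to call more than once
assert is_sdk_ready()
# ... serve requests and collect() as needed ...
teardown(wait_for_flush=True)  # let the worker drain the queue before exit
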
diff --git a/assets/inference/environments/mlflow-ubuntu22.04-py312-cpu-inference/context/mlmonitoring/logger/__init__.py b/assets/inference/environments/mlflow-ubuntu22.04-py312-cpu-inference/context/mlmonitoring/logger/__init__.py
new file mode 100644
index 0000000000..451d285146
--- /dev/null
+++ b/assets/inference/environments/mlflow-ubuntu22.04-py312-cpu-inference/context/mlmonitoring/logger/__init__.py
@@ -0,0 +1,7 @@
+"""Logger subpackage exports."""
+# Copyright (c) Microsoft Corporation.
+# Licensed under the MIT License.
+
+from .logger import init_logging, is_debug, logger
+
+__all__ = ["init_logging", "is_debug", "logger"]
diff --git a/assets/inference/environments/mlflow-ubuntu22.04-py312-cpu-inference/context/mlmonitoring/logger/logger.py b/assets/inference/environments/mlflow-ubuntu22.04-py312-cpu-inference/context/mlmonitoring/logger/logger.py
new file mode 100644
index 0000000000..fab24b0d80
--- /dev/null
+++ b/assets/inference/environments/mlflow-ubuntu22.04-py312-cpu-inference/context/mlmonitoring/logger/logger.py
@@ -0,0 +1,150 @@
+"""Logging configuration for the data collector."""
+# Copyright (c) Microsoft Corporation.
+# Licensed under the MIT License.
+
+import os
+import sys
+from typing import Any, Dict
+import logging
+from logging.config import dictConfig
+
+logger = logging.getLogger("mdc")
+
+DEFAULT_SDK_LOGGING_CONFIG: Dict[str, Any] = dict(
+    version=1,
+    disable_existing_loggers=False,
+    loggers={
+        "mdc": {
+            "level": "INFO",
+            "propagate": False,
+            "handlers": ["console"]
+        },
+        "mdc.root": {
+            "level": "INFO",
+            "propagate": False,
+            "handlers": ["console"]
+        },
+        "mdc.collector": {
+            "level": "INFO",
+            "propagate": False,
+            "handlers": ["console"]
+        },
+        "mdc.worker": {
+            "level": "INFO",
+            "propagate": False,
+            "handlers": ["console"]
+        },
+        "local.capture": {
+            "level": "INFO",
+            "propagate": False,
+            "handlers": ["console"]
+        },
+        "mdc.sender": {
+            "level": "WARN",
+            "propagate": False,
+            "handlers": ["console"]
+        },
+        "mdc.error": {
+            "level": "ERROR",
+            "handlers": ["error_console"],
+            "propagate": False,
+            "qualname": "mdc.error",
+        },
+    },
+    handlers={
+        "console": {
+            "class": "logging.StreamHandler",
+            "formatter": "generic",
+            "stream": sys.stdout,
+        },
+        "error_console": {
+            "class": "logging.StreamHandler",
+            "formatter": "generic",
+            "stream": sys.stderr,
+        },
+    },
+    formatters={
+        "generic": {
+            "format": "%(asctime)s | %(name)s | %(levelname)s | %(message)s",
+            # "datefmt": "%Y-%m-%d %H:%M:%S",
+            "class": "logging.Formatter",
+        },
+    },
+)
+
+DEBUG_SDK_LOGGING_CONFIG: Dict[str, Any] = dict(
+    version=1,
+    disable_existing_loggers=False,
+    loggers={
+        "mdc": {
+            "level": "DEBUG",
+            "propagate": False,
+            "handlers": ["console"]
+        },
+        "mdc.root": {
+            "level": "DEBUG",
+            "propagate": False,
+            "handlers": ["console"]
+        },
+        "mdc.collector": {
+            "level": "DEBUG",
+            "propagate": False,
+            "handlers": ["console"]
+        },
+        "mdc.worker": {
+            "level": "DEBUG",
+            "propagate": False,
+            "handlers": ["console"]
+        },
+        "local.capture": {
+            "level": "DEBUG",
+            "propagate": False,
+            "handlers": ["console"]
+        },
+        "mdc.sender": {
+            "level": "INFO",
+            "propagate": False,
+            "handlers": ["console"]
+        },
+        "mdc.error": {
+            "level": "ERROR",
+            "handlers": ["error_console"],
+            "propagate": False,
+            "qualname": "mdc.error",
+        },
+    },
+    handlers={
+        "console": {
+            "class": "logging.StreamHandler",
+            "formatter": "generic",
+            "stream": sys.stdout,
+        },
+        "error_console": {
+            "class": "logging.StreamHandler",
+            "formatter": "generic",
+            "stream": sys.stderr,
+        },
+    },
+    formatters={
+        "generic": {
+            "format": "%(asctime)s | %(name)s | %(levelname)s | %(message)s",
+            # "datefmt": "%Y-%m-%d %H:%M:%S",
+            "class": "logging.Formatter",
+        },
+    },
+)
+
+
+def is_debug():
+    """Return whether AZUREML_MDC_DEBUG is set to true."""
+    return os.getenv("AZUREML_MDC_DEBUG", "false").lower() == "true"
+
+
+def init_logging():
+    """Apply the default or debug logging configuration."""
+    if is_debug():
+        log_config = DEBUG_SDK_LOGGING_CONFIG
+    else:
+        log_config = DEFAULT_SDK_LOGGING_CONFIG
+
+    dictConfig(log_config)
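
[Illustrative sketch, not part of the diff: flipping the debug switch above moves every mdc.* logger to DEBUG.]

import os
from mlmonitoring.logger import init_logging, is_debug, logger

os.environ["AZUREML_MDC_DEBUG"] = "true"
init_logging()
assert is_debug()
logger.debug("visible only when AZUREML_MDC_DEBUG=true")
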
diff --git a/assets/inference/environments/mlflow-ubuntu22.04-py312-cpu-inference/context/mlmonitoring/payload/__init__.py b/assets/inference/environments/mlflow-ubuntu22.04-py312-cpu-inference/context/mlmonitoring/payload/__init__.py
new file mode 100644
index 0000000000..5bc8e638c1
--- /dev/null
+++ b/assets/inference/environments/mlflow-ubuntu22.04-py312-cpu-inference/context/mlmonitoring/payload/__init__.py
@@ -0,0 +1,8 @@
+"""Payload subpackage exports."""
+# Copyright (c) Microsoft Corporation.
+# Licensed under the MIT License.
+
+from .payload import build_payload
+from .logdata import PandasFrameData
+
+__all__ = ["build_payload", "PandasFrameData"]
diff --git a/assets/inference/environments/mlflow-ubuntu22.04-py312-cpu-inference/context/mlmonitoring/payload/logdata.py b/assets/inference/environments/mlflow-ubuntu22.04-py312-cpu-inference/context/mlmonitoring/payload/logdata.py
new file mode 100644
index 0000000000..4ebacef460
--- /dev/null
+++ b/assets/inference/environments/mlflow-ubuntu22.04-py312-cpu-inference/context/mlmonitoring/payload/logdata.py
@@ -0,0 +1,45 @@
+"""Wrappers that serialize collected data for the payload."""
+# Copyright (c) Microsoft Corporation.
+# Licensed under the MIT License.
+
+
+def get_type_fullname(o):
+    """Return the fully qualified type name of an object."""
+    c = o.__class__
+    module = c.__module__
+    if module == 'builtins':
+        return "python." + c.__qualname__  # avoid outputs like 'builtins.str'
+    return module + '.' + c.__qualname__
+
+
+class LogData(dict):
+    """Base wrapper around the data to be collected."""
+
+    def __init__(self, data):
+        """Store the wrapped data."""
+        dict.__init__(self)
+        self._data = data
+
+    def type(self):
+        """Return the fully qualified type name of the wrapped data."""
+        return get_type_fullname(self._data)
+
+    def to_json(self):
+        """Serialize the data to a JSON string; implemented by subclasses."""
+        raise NotImplementedError
+
+    def to_bytes(self):
+        """Serialize the data to bytes; implemented by subclasses."""
+        raise NotImplementedError
+
+
+class PandasFrameData(LogData):
+    """LogData wrapper for pandas DataFrames."""
+
+    def to_json(self):
+        """Serialize the frame as a JSON array of records."""
+        return self._data.to_json(orient="records")
+
+    def to_bytes(self):
+        """Serialize the frame's string rendering as UTF-8 bytes."""
+        return bytes(self._data.to_string(), "utf-8")
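
[Illustrative sketch, not part of the diff: what the two PandasFrameData serializations produce for a tiny frame.]

import pandas as pd
from mlmonitoring.payload.logdata import PandasFrameData

frame = PandasFrameData(pd.DataFrame({"a": [1, 2]}))
print(frame.type())     # pandas.core.frame.DataFrame
print(frame.to_json())  # [{"a":1},{"a":2}]
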
diff --git a/assets/inference/environments/mlflow-ubuntu22.04-py312-cpu-inference/context/mlmonitoring/payload/payload.py b/assets/inference/environments/mlflow-ubuntu22.04-py312-cpu-inference/context/mlmonitoring/payload/payload.py
new file mode 100644
index 0000000000..9a301bd6f4
--- /dev/null
+++ b/assets/inference/environments/mlflow-ubuntu22.04-py312-cpu-inference/context/mlmonitoring/payload/payload.py
@@ -0,0 +1,134 @@
+"""Payload types sent to the data collector."""
+# Copyright (c) Microsoft Corporation.
+# Licensed under the MIT License.
+
+import time
+import uuid
+
+from .logdata import LogData
+from ..logger import logger
+from ..common import __version__
+from ..config import get_config
+
+
+class Payload:
+    """Base payload carrying data plus correlation metadata."""
+
+    def __init__(self, designation, data, model_version, timestamp, correlation_id, headers):
+        """Initialize the payload; data must be a LogData instance."""
+        self._id = str(uuid.uuid4())
+        self._designation = designation
+        self._model_version = model_version
+        self._agent = "azureml-ai-monitoring/" + __version__
+        self._contenttype = ""
+
+        if not isinstance(data, LogData):
+            raise TypeError("argument data (%s) must be one of LogData types."
+                            % type(data).__name__)
+
+        self._data = data
+        self._correlation_id = correlation_id
+        if timestamp is not None:
+            self._time = timestamp
+        else:
+            self._time = int(time.time())
+        self._headers = headers
+
+    def time(self):
+        """Return the payload timestamp."""
+        return self._time
+
+    def id(self):
+        """Return the payload id."""
+        return self._id
+
+    def designation(self):
+        """Return the collection designation."""
+        return self._designation
+
+    def model_version(self):
+        """Return the model version."""
+        return self._model_version
+
+    def agent(self):
+        """Return the agent string."""
+        return self._agent
+
+    def correlation_id(self):
+        """Return the correlation id."""
+        return self._correlation_id
+
+    def headers(self):
+        """Return the custom headers."""
+        return self._headers
+
+    def type(self):
+        """Return the wrapped data's type name."""
+        return self._data.type()
+
+    def contenttype(self):
+        """Return the payload content type."""
+        return self._contenttype
+
+    def content(self):
+        """Return the serialized content; implemented by subclasses."""
+        raise NotImplementedError
+
+
+class JsonPayload(Payload):
+    """Payload serialized as JSON."""
+
+    def __init__(self, designation, data, model_version=None, timestamp=None, correlation_id=None, headers=None):
+        """Initialize a JSON payload."""
+        if headers is None:
+            headers = {}
+        super().__init__(designation, data, model_version, timestamp, correlation_id, headers)
+        self._contenttype = "application/json"
+
+    def content(self):
+        """Return the data as a JSON string."""
+        return self._data.to_json()
+
+
+class CompactPayload(Payload):
+    """Payload serialized as raw bytes."""
+
+    def __init__(self, designation, data, model_version=None, timestamp=None, correlation_id=None, headers=None):
+        """Initialize a compact (binary) payload."""
+        if headers is None:
+            headers = {}
+        super().__init__(designation, data, model_version, timestamp, correlation_id, headers)
+        self._contenttype = "application/octet-stream"
+
+    def content(self):
+        """Return the data as bytes."""
+        return self._data.to_bytes()
+
+
+def build_payload(designation, data, model_version=None, context=None):
+    """Build a JsonPayload or CompactPayload depending on the configured format."""
+    logger.debug("building payload for collection %s, data type: %s", designation, type(data).__name__)
+    headers = {}
+    timestamp = None
+    correlation_id = None
+    if context is not None:
+        correlation_id = context.get_id()
+        timestamp = context.get_timestamp()
+        headers = context.get_headers()
+
+    config = get_config()
+    if config.compact_format():
+        return CompactPayload(
+            designation,
+            data,
+            model_version=model_version,
+            correlation_id=correlation_id,
+            timestamp=timestamp,
+            headers=headers)
+
+    return JsonPayload(
+        designation,
+        data,
+        model_version=model_version,
+        correlation_id=correlation_id,
+        timestamp=timestamp,
+        headers=headers)
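
[Illustrative sketch, not part of the diff: build_payload reads compact_format() from the global config, so init() is called here only to populate it; the collection name is a placeholder.]

import pandas as pd
from mlmonitoring.init import init
from mlmonitoring.context import get_context
from mlmonitoring.payload.logdata import PandasFrameData
from mlmonitoring.payload.payload import build_payload

init(model_version="1")  # populates the global config read by build_payload
payload = build_payload("model_inputs",
                        PandasFrameData(pd.DataFrame({"a": [1]})),
                        model_version="1",
                        context=get_context())
print(payload.contenttype(), payload.content())  # application/json [{"a":1}]
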
diff --git a/assets/inference/environments/mlflow-ubuntu22.04-py312-cpu-inference/context/mlmonitoring/queue/__init__.py b/assets/inference/environments/mlflow-ubuntu22.04-py312-cpu-inference/context/mlmonitoring/queue/__init__.py
new file mode 100644
index 0000000000..66696978bb
--- /dev/null
+++ b/assets/inference/environments/mlflow-ubuntu22.04-py312-cpu-inference/context/mlmonitoring/queue/__init__.py
@@ -0,0 +1,7 @@
+"""Queue subpackage exports."""
+# Copyright (c) Microsoft Corporation.
+# Licensed under the MIT License.
+
+from .queue import init_queue, teardown_queue, get_queue
+
+__all__ = ["init_queue", "teardown_queue", "get_queue"]
diff --git a/assets/inference/environments/mlflow-ubuntu22.04-py312-cpu-inference/context/mlmonitoring/queue/queue.py b/assets/inference/environments/mlflow-ubuntu22.04-py312-cpu-inference/context/mlmonitoring/queue/queue.py
new file mode 100644
index 0000000000..2a0558daa8
--- /dev/null
+++ b/assets/inference/environments/mlflow-ubuntu22.04-py312-cpu-inference/context/mlmonitoring/queue/queue.py
@@ -0,0 +1,96 @@
+"""Blocking in-memory payload queue."""
+# Copyright (c) Microsoft Corporation.
+# Licensed under the MIT License.
+
+# pylint: disable=global-statement
+from threading import Condition
+from ..logger import logger
+
+
+# A blocking queue
+class MdcQueue:
+    """Bounded blocking queue shared by collectors and worker threads."""
+
+    def __init__(self, capacity):
+        """Create an empty queue with the given capacity."""
+        self._max_length = capacity
+        self._count = 0
+        self._queue = []
+        self._closed = False
+        self._condition = Condition()
+
+    def capacity(self):
+        """Return the queue capacity."""
+        return self._max_length
+
+    def length(self):
+        """Return the current queue length."""
+        return self._count
+
+    def close(self):
+        """Close the queue and wake up all waiting consumers."""
+        self._condition.acquire()
+        self._closed = True
+        self._condition.notify_all()
+        self._condition.release()
+        logger.info("data collector queue closed")
+
+    def enqueue(self, payload):
+        """Append a payload; fails when the queue is closed or full."""
+        if payload is None:
+            return False, "payload is None"
+
+        self._condition.acquire()
+        if self._closed:
+            self._condition.release()
+            return False, "queue closed"
+
+        if self._count < self._max_length:
+            self._queue.append(payload)
+            self._count = self._count + 1
+            self._condition.notify_all()
+            self._condition.release()
+            return True, "accepted"
+
+        self._condition.release()
+        return (False,
+                "message queue is full because of too many requests, please reduce the number of requests.")
+
+    def dequeue(self):
+        """Pop the next payload, blocking until one arrives or the queue closes."""
+        self._condition.acquire()
+        while True:
+            if self._count > 0:
+                payload = self._queue.pop(0)
+                length = len(self._queue)
+                self._count = self._count - 1
+                self._condition.release()
+                return payload, length
+            if self._closed:
+                self._condition.release()
+                return None, 0
+            # wait for new item enqueue or queue closed
+            self._condition.wait()
+
+
+mdc_queue = None
+
+
+def init_queue(capacity):
+    """Create the global queue."""
+    global mdc_queue
+    logger.debug("init data collector queue, capacity: %d", capacity)
+    mdc_queue = MdcQueue(capacity)
+
+
+def teardown_queue():
+    """Drop the global queue."""
+    global mdc_queue
+    logger.debug("tear down data collector queue")
+    mdc_queue = None
+
+
+def get_queue():
+    """Return the global queue."""
+    global mdc_queue
+    return mdc_queue
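
[Illustrative sketch, not part of the diff: the queue's bounded/blocking behavior in isolation.]

from mlmonitoring.queue.queue import MdcQueue

q = MdcQueue(capacity=2)
print(q.enqueue({"n": 1}))  # (True, 'accepted')
print(q.enqueue({"n": 2}))  # (True, 'accepted')
print(q.enqueue({"n": 3}))  # (False, 'message queue is full ...') once capacity is reached
payload, remaining = q.dequeue()
q.close()                   # unblocks any dequeue() waiters with (None, 0)
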
diff --git a/assets/inference/environments/mlflow-ubuntu22.04-py312-cpu-inference/context/mlmonitoring/worker/__init__.py b/assets/inference/environments/mlflow-ubuntu22.04-py312-cpu-inference/context/mlmonitoring/worker/__init__.py
new file mode 100644
index 0000000000..2218576c14
--- /dev/null
+++ b/assets/inference/environments/mlflow-ubuntu22.04-py312-cpu-inference/context/mlmonitoring/worker/__init__.py
@@ -0,0 +1,7 @@
+"""Worker subpackage exports."""
+# Copyright (c) Microsoft Corporation.
+# Licensed under the MIT License.
+
+from .worker import init_worker, get_worker, teardown_worker
+
+__all__ = ["init_worker", "get_worker", "teardown_worker"]
diff --git a/assets/inference/environments/mlflow-ubuntu22.04-py312-cpu-inference/context/mlmonitoring/worker/sender.py b/assets/inference/environments/mlflow-ubuntu22.04-py312-cpu-inference/context/mlmonitoring/worker/sender.py
new file mode 100644
index 0000000000..281ce1039f
--- /dev/null
+++ b/assets/inference/environments/mlflow-ubuntu22.04-py312-cpu-inference/context/mlmonitoring/worker/sender.py
@@ -0,0 +1,135 @@
+"""Senders that deliver payloads to MDC or capture them locally."""
+# Copyright (c) Microsoft Corporation.
+# Licensed under the MIT License.
+
+import logging
+import time
+# pylint: disable=import-error
+import requests
+# pylint: enable=import-error
+
+
+def create_sender(config):
+    """Return a LocalCapture or MdcSender depending on the config."""
+    if config.local_capture():
+        return LocalCapture()
+
+    return MdcSender(config.host(), config.port())
+
+
+def _is_success(status_code):
+    """Return whether the HTTP status code is in the 2xx range."""
+    return 200 <= status_code < 300
+
+
+class MdcSender:
+    """Send payloads to the MDC endpoint over HTTP."""
+
+    def __init__(self, host, port):
+        """Store the endpoint location."""
+        self.logger = logging.getLogger("mdc.sender")
+        self.host = host
+        self.port = port
+        self.path = "/modeldata/log"
+
+    def send(self, payload):
+        """Build the ce-* headers and POST the payload to MDC."""
+        designation = payload.designation()
+        self.logger.debug("sending payload to mdc: %s@%s", designation, payload.id())
+
+        event_type = "azureml.inference.%s" % designation
+        headers = {
+            "Accept": "*/*",
+            "Content-Type": payload.contenttype(),
+            # MDC uses the trace id to track the request/payload
+            "trace-id": payload.id(),
+            # event attributes
+            "ce-time": str(payload.time()),
+            "ce-type": event_type,
+            "ce-collect-data-type": payload.type(),
+            "ce-agent": payload.agent(),
+        }
+        correlation_id = payload.correlation_id()
+        if correlation_id is not None:
+            headers["ce-x-request-id"] = correlation_id
+        model_version = payload.model_version()
+        if model_version is not None:
+            headers["ce-model-version"] = model_version
+
+        merged_headers = {}
+        # add prefix "ce-" on each header from payload
+        for key, value in payload.headers().items():
+            merged_headers["ce-%s" % key] = str(value)
+        # built-in headers win over payload headers on merge
+        for key in headers:
+            merged_headers[key] = headers[key]
+
+        content = payload.content()
+        if payload.contenttype() == "application/json":
+            return self._send_json(merged_headers, content)
+
+        return self._send_binary(merged_headers, content)
+
+    def _send_json(self, headers, json_data):
+        """POST a JSON payload."""
+        if not isinstance(json_data, str):
+            raise TypeError("json_data: str type expected")
+
+        try:
+            status, msg = self._post_request(self._build_url(), json_data, headers)
+            if not _is_success(status):
+                self.logger.error("failed to send json payload: %d, %s", status, msg)
+                return False, "%d: %s" % (status, msg)
+
+            return True, "%d: %s" % (status, msg)
+        except requests.exceptions.RequestException as ex:
+            self.logger.error("mdc request raised exception: %s", ex)
+            return False, str(ex)
+
+    def _send_binary(self, headers, binary_data):
+        """POST a binary payload."""
+        if not isinstance(binary_data, bytes):
+            raise TypeError("binary_data: bytes type expected, actual - %s" % type(binary_data).__name__)
+
+        # TODO: split payload if it is large?
+        try:
+            status, msg = self._post_request(self._build_url(), binary_data, headers)
+            if not _is_success(status):
+                self.logger.error("failed to send binary payload: %d, %s", status, msg)
+                return False, "%d: %s" % (status, msg)
+
+            return True, "%d: %s" % (status, msg)
+        except requests.exceptions.RequestException as ex:
+            self.logger.error("mdc request raised exception: %s", ex)
+            return False, str(ex)
+
+    def _build_url(self):
+        """Return the full MDC endpoint URL."""
+        return "http://%s:%d%s" % (self.host, self.port, self.path)
+
+    def _post_request(self, url, payload, headers=None):
+        """POST the payload and return (status_code, response_text)."""
+        self.logger.debug("posting request to %s", url)
+        self.logger.debug("request headers: %s", headers)
+        self.logger.debug("request payload(%s): %s", type(payload), payload)
+
+        r = requests.post(url, data=payload, headers=headers)
+        self.logger.debug("response: %d, %s", r.status_code, r.text)
+        return r.status_code, r.text
+
+
+class LocalCapture:
+    """Log payloads locally instead of sending them to MDC."""
+
+    def __init__(self):
+        """Create the local capture logger."""
+        self.logger = logging.getLogger("local.capture")
+
+    def send(self, payload):
+        """Log the payload's time, designation, correlation id and content."""
+        self.logger.info("%s | %s | %s | %s",
+                         time.strftime("%Y-%m-%d %H:%M:%S", time.gmtime(payload.time())),
+                         payload.designation(),
+                         payload.correlation_id(),
+                         payload.content())
+        return True, "done"
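
[Illustrative sketch, not part of the diff: the request shape MdcSender builds for a JSON payload. All values below are placeholders; host/port come from the config and the path is fixed.]

# the header set MdcSender.send() assembles for a JSON payload
headers = {
    "Accept": "*/*",
    "Content-Type": "application/json",
    "trace-id": "00000000-0000-0000-0000-000000000000",  # payload uuid
    "ce-time": "1700000000",
    "ce-type": "azureml.inference.model_inputs",
    "ce-collect-data-type": "pandas.core.frame.DataFrame",
    "ce-agent": "azureml-ai-monitoring/0.1.0",
}
url = "http://127.0.0.1:50011/modeldata/log"
print("POST", url, headers)  # body would be the JSON records, e.g. [{"a":1},{"a":2}]
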
diff --git a/assets/inference/environments/mlflow-ubuntu22.04-py312-cpu-inference/context/mlmonitoring/worker/worker.py b/assets/inference/environments/mlflow-ubuntu22.04-py312-cpu-inference/context/mlmonitoring/worker/worker.py
new file mode 100644
index 0000000000..2295765f5a
--- /dev/null
+++ b/assets/inference/environments/mlflow-ubuntu22.04-py312-cpu-inference/context/mlmonitoring/worker/worker.py
@@ -0,0 +1,103 @@
+"""Worker threads that drain the queue and send payloads."""
+# Copyright (c) Microsoft Corporation.
+# Licensed under the MIT License.
+
+# pylint: disable=global-statement
+import logging
+from threading import Thread
+import traceback
+
+from .sender import create_sender
+
+
+class MdcWorker:
+    """Run sender threads that consume payloads from the queue."""
+
+    def __init__(self, queue, config):
+        """Create the worker with its queue, config and sender."""
+        self.logger = logging.getLogger("mdc.worker")
+        self._queue = queue
+        self._config = config
+        self._stopped = True
+        self._threads = []
+        self._sender = create_sender(config)
+
+    def start(self):
+        """Start the configured number of daemon worker threads."""
+        self._stopped = False
+        # create worker threads
+        for index in range(self._config.worker_count()):
+            thread = Thread(target=self._thread_run, kwargs={'index': index}, daemon=True)
+            self._threads.append(thread)
+        # start all
+        for thread in self._threads:
+            thread.start()
+
+    def enqueue(self, payload):
+        """Enqueue a payload for sending."""
+        return self._queue.enqueue(payload)
+
+    def stop(self, wait_for_flush=False):
+        """Stop the threads, optionally letting them drain the queue first."""
+        if not wait_for_flush:
+            self._stopped = True
+
+        # must close the queue, or threads may keep waiting on an empty queue without exiting.
+        self._queue.close()
+
+        for thread in self._threads:
+            thread.join()
+
+        self._threads = []
+        self.logger.debug("data collector worker stopped")
+    def _thread_run(self, index):
+        """Thread body: dequeue payloads and send them until stopped."""
+        self.logger.debug("worker thread %d: start", index)
+        while not self._stopped:
+            payload, queue_len = self._queue.dequeue()
+            if payload is None:  # queue closed
+                break
+
+            # processing the payload
+            self.logger.debug("worker thread %d: sending payload %s@%s, queue length - %d",
+                              index, payload.designation(), payload.id(), queue_len)
+            try:
+                succeed, msg = self._sender.send(payload)
+                if not succeed:
+                    self.logger.error("worker thread %d: send payload %s@%s failed - %s",
+                                      index, payload.designation(), payload.id(), msg)
+                else:
+                    self.logger.debug("worker thread %d: send payload %s@%s succeeded - %s",
+                                      index, payload.designation(), payload.id(), msg)
+            except TypeError as err:
+                traceback.print_exc()
+                self.logger.error("worker thread %d: send payload %s@%s raised exception: %s",
+                                  index, payload.designation(), payload.id(), err)
+
+        self.logger.debug("worker thread %d: exit", index)
+
+
+mdc_worker = None
+
+
+def init_worker(queue, config):
+    """Create and start the global worker."""
+    global mdc_worker
+    mdc_worker = MdcWorker(queue, config)
+    # start worker threads
+    mdc_worker.start()
+
+
+def get_worker():
+    """Return the global worker."""
+    global mdc_worker
+    return mdc_worker
+
+
+def teardown_worker(wait_for_flush=False):
+    """Stop and drop the global worker."""
+    global mdc_worker
+    if mdc_worker is not None:
+        mdc_worker.stop(wait_for_flush)
+    mdc_worker = None
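
[Illustrative end-to-end sketch, not part of the diff. It assumes the package is importable as mlmonitoring and uses local capture so no MDC endpoint is needed; collection name and data are placeholders.]

import os
import pandas as pd

os.environ["AZUREML_MDC_ENABLED"] = "true"
os.environ["AZUREML_MDC_LOCAL_CAPTURE"] = "true"  # log payloads locally via LocalCapture

from mlmonitoring import Collector
from mlmonitoring.init import teardown

collector = Collector(name="model_inputs")
ctx = collector.collect(pd.DataFrame({"a": [1, 2]}))
print("collected:", ctx.get_id())
teardown(wait_for_flush=True)  # close the queue and join the worker threads
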