Skip to content

Commit

Permalink
Added mlmonitoring directory in mlflow-ubuntu22.04-py312-cpu-inferenc…
Browse files Browse the repository at this point in the history
…e environment
  • Loading branch information
Tarun-Chevula committed Nov 7, 2024
1 parent 65acc57 commit f66eb9b
Show file tree
Hide file tree
Showing 20 changed files with 1,287 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
"""For init."""
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.

from .collector import Collector

__all__ = ["Collector"]
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
"""For collector."""
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.

from typing import Callable, Any

from .collector_json import JsonCollector
from .context import CorrelationContext


class Collector:
"""For collector class."""

def __init__(
self,
*,
name: str,
on_error: Callable[[Exception], Any] = None
):
"""For init."""
self._impl = JsonCollector(name=name, on_error=on_error)

def collect(
self,
data, # supported type: Union[pd.DataFrame]
correlation_context: CorrelationContext = None) -> CorrelationContext:
"""For collect."""
return self._impl.collect(data, correlation_context)
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
"""For Collector base."""
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.

import logging

from .init import init, is_sdk_ready
from .config import get_config
from .context import CorrelationContext, get_context_wrapper


class CollectorBase:
"""For CollectorBase."""

def __init__(
self,
model_version: str):
"""For init."""
if not is_sdk_ready():
init(model_version)

self.logger = logging.getLogger("mdc.collector")
self.config = get_config()

def _response(
self,
context: CorrelationContext,
success: bool,
message: str) -> CorrelationContext:
"""For response."""
if self.config.is_debug():
return get_context_wrapper(context, success, message)

return context
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
"""For collector json."""
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.

import sys
import random
from typing import Callable, Any

from .payload import PandasFrameData
from .payload.payload import build_payload
from .queue import get_queue

from .collector_base import CollectorBase
from .context import CorrelationContext, get_context

try:
import pandas as pd
except ImportError:
pass


def _build_log_data_by_type(data):
"""For build log data by type."""
if 'pandas' in sys.modules and isinstance(data, pd.DataFrame):
return PandasFrameData(data)

raise TypeError("data type (%s) not supported, "
"supported types: pandas.DataFrame"
% type(data).__name__)


def _raise_if_exception(e: Exception):
"""For raise if exception."""
raise e


class JsonCollector(CollectorBase):
"""For JsonCollector."""

def __init__(
self,
*,
name: str,
on_error: Callable[[Exception], Any] = None
):
"""For init."""
super().__init__("default")
self.name = name
if on_error:
self.on_error = on_error
else:
self.on_error = _raise_if_exception

self._validate_mdc_config()

def _validate_mdc_config(self):
"""For validate mdc config."""
if not self.name or len(self.name) <= 0:
# unexpected drop
msg = "collection name is not provided"
self.on_error(Exception(msg))
return False, msg

config = self.config
if config is None:
# unexpected drop
msg = "data collector is not initialized"
self.on_error(Exception(msg))
return False, msg

if not config.enabled():
# unexpected drop
msg = "custom logging is not enabled, drop the data"
self.on_error(Exception(msg))
return False, msg

if not config.collection_enabled(self.name):
# unexpected drop
msg = "collection {} is not enabled, drop the data".format(self.name)
self.on_error(Exception(msg))
return False, msg

return True, None

def collect(
self,
data, # supported type: Union[pd.DataFrame]
correlation_context: CorrelationContext = None) -> CorrelationContext:
"""For collect."""
if correlation_context is None:
correlation_context = get_context()

success, msg = self._validate_mdc_config()

if not success:
return self._response(correlation_context, False, msg)

config = self.config

percentage = config.collection_sample_rate_percentage(self.name)

if percentage < 100:
if percentage <= random.random() * 100.0:
# expected drop
self.logger.debug("sampling not hit, drop the data")
# TBD: send empty message to mdc to collect metrics of dropped messages?
return self._response(correlation_context, False, "dropped_sampling")

try:
# build payload and put into payload queue
log_data = _build_log_data_by_type(data)
except TypeError as e:
# unexpected drop
self.on_error(e)
return self._response(correlation_context, False, e.args[0])

payload = build_payload(
self.name,
data=log_data,
model_version=config.model_version(),
context=correlation_context)

success, msg = get_queue().enqueue(payload)

if not success:
# unexpected drop
self.on_error(Exception(msg))
return self._response(correlation_context, success, msg)
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
"""For init."""
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.

__version__ = "0.1.0"
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
"""For init."""
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.

from .config import init_config, teardown_config, is_debug, get_config

__all__ = ["init_config", "teardown_config", "is_debug", "get_config"]
Loading

0 comments on commit f66eb9b

Please sign in to comment.