Implement rule evaluation and scoring (#11)
Stitch together the manifest (collection of models) and the rule
registry (collection of rules) to evaluate every model against every
rule defined.

For every model, a score is computed. These scores are then aggregated
into the project score.

Add a human-readable formatter to display those results upon CLI
invocation.
matthieucan authored Apr 19, 2024
1 parent 133ec81 commit 1ced0af
Showing 22 changed files with 683 additions and 18 deletions.
3 changes: 3 additions & 0 deletions docs/reference/evaluation.md
@@ -0,0 +1,3 @@
# Evaluation

::: dbt_score.evaluation
3 changes: 3 additions & 0 deletions docs/reference/formatters/human_readable_formatter.md
@@ -0,0 +1,3 @@
# Human-readable formatter

::: dbt_score.formatters.human_readable_formatter
3 changes: 3 additions & 0 deletions docs/reference/formatters/index.md
@@ -0,0 +1,3 @@
# Formatters

::: dbt_score.formatters
3 changes: 3 additions & 0 deletions docs/reference/scoring.md
@@ -0,0 +1,3 @@
# Scoring

::: dbt_score.scoring
7 changes: 6 additions & 1 deletion mkdocs.yml
@@ -9,8 +9,13 @@ nav:
- Reference:
- reference/cli.md
- reference/exceptions.md
- reference/evaluation.md
- reference/models.md
- reference/rule.md
- reference/rule_registry.md
- reference/scoring.md
- Formatters:
- reference/formatters/index.md
- reference/formatters/human_readable_formatter.md
- Rules:
- reference/rules/generic.md
- reference/rule_registry.md
1 change: 1 addition & 0 deletions pyproject.toml
@@ -61,6 +61,7 @@ module = "tests.*"
disallow_untyped_defs = false
disallow_incomplete_defs = false
disallow_untyped_calls = false
warn_no_return = false

### Ruff ###

77 changes: 77 additions & 0 deletions src/dbt_score/evaluation.py
@@ -0,0 +1,77 @@
"""This module is responsible for evaluating rules."""

from __future__ import annotations

from typing import Type

from dbt_score.formatters import Formatter
from dbt_score.models import ManifestLoader, Model
from dbt_score.rule import Rule, RuleViolation
from dbt_score.rule_registry import RuleRegistry
from dbt_score.scoring import Scorer

# The results of a given model are stored in a dictionary, mapping rules to either:
# - None if there was no issue
# - A RuleViolation if a linting error was found
# - An Exception if the rule failed to run
ModelResultsType = dict[Type[Rule], None | RuleViolation | Exception]


class Evaluation:
"""Evaluate a set of rules on a set of nodes."""

def __init__(
self,
rule_registry: RuleRegistry,
manifest_loader: ManifestLoader,
formatter: Formatter,
scorer: Scorer,
) -> None:
"""Create an Evaluation object.
Args:
rule_registry: A rule registry to access rules.
manifest_loader: A manifest loader to access model metadata.
formatter: A formatter to display results.
scorer: A scorer to compute scores.
"""
self._rule_registry = rule_registry
self._manifest_loader = manifest_loader
self._formatter = formatter
self._scorer = scorer

# For each model, its results
self.results: dict[Model, ModelResultsType] = {}

# For each model, its computed score
self.scores: dict[Model, float] = {}

# The aggregated project score
self.project_score: float

def evaluate(self) -> None:
"""Evaluate all rules."""
# Instantiate all rules. In case they keep state across calls, this must be
# done only once.
rules = [rule_class() for rule_class in self._rule_registry.rules.values()]

for model in self._manifest_loader.models:
self.results[model] = {}
for rule in rules:
try:
result: RuleViolation | None = rule.evaluate(model)
except Exception as e:
self.results[model][rule.__class__] = e
else:
self.results[model][rule.__class__] = result

self.scores[model] = self._scorer.score_model(self.results[model])
self._formatter.model_evaluated(
model, self.results[model], self.scores[model]
)

# Compute score for project
self.project_score = self._scorer.score_aggregate_models(
list(self.scores.values())
)
self._formatter.project_evaluated(self.project_score)
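A hedged sketch of how the attributes populated here can be consumed, assuming an Evaluation instance named evaluation wired up the same way lint_dbt_project does further down in this commit; the violation/error split is only an illustration:

from dbt_score.rule import RuleViolation

# `evaluation` is assumed to already hold a rule registry, manifest loader,
# formatter and scorer, as in lint_dbt_project.
evaluation.evaluate()
for model, model_results in evaluation.results.items():
    violations = [r for r in model_results.values() if isinstance(r, RuleViolation)]
    errors = [
        r
        for r in model_results.values()
        if isinstance(r, Exception) and not isinstance(r, RuleViolation)
    ]
    print(model.name, evaluation.scores[model], len(violations), len(errors))
print(evaluation.project_score)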
26 changes: 26 additions & 0 deletions src/dbt_score/formatters/__init__.py
@@ -0,0 +1,26 @@
"""Formatters are used to output CLI results."""

from __future__ import annotations

import typing
from abc import ABC, abstractmethod

if typing.TYPE_CHECKING:
from dbt_score.evaluation import ModelResultsType
from dbt_score.models import Model


class Formatter(ABC):
"""Abstract class to define a formatter."""

@abstractmethod
def model_evaluated(
self, model: Model, results: ModelResultsType, score: float
) -> None:
"""Callback when a model has been evaluated."""
raise NotImplementedError

@abstractmethod
def project_evaluated(self, score: float) -> None:
"""Callback when a project has been evaluated."""
raise NotImplementedError
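Because Formatter is a small two-method ABC, alternative outputs are easy to sketch. A minimal hypothetical subclass (class name and output format are invented, not part of this commit):

from dbt_score.evaluation import ModelResultsType
from dbt_score.formatters import Formatter
from dbt_score.models import Model
from dbt_score.rule import RuleViolation


class ViolationCountFormatter(Formatter):
    """Hypothetical formatter that only reports the number of violations per model."""

    def model_evaluated(
        self, model: Model, results: ModelResultsType, score: float
    ) -> None:
        # Count only linting violations, not rules that raised exceptions.
        violations = sum(1 for r in results.values() if isinstance(r, RuleViolation))
        print(f"{model.name}: {violations} violation(s), score {round(score * 10, 1)}")

    def project_evaluated(self, score: float) -> None:
        print(f"Project score: {round(score * 10, 1)}")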
43 changes: 43 additions & 0 deletions src/dbt_score/formatters/human_readable_formatter.py
@@ -0,0 +1,43 @@
"""Human readable formatter."""


from dbt_score.evaluation import ModelResultsType
from dbt_score.formatters import Formatter
from dbt_score.models import Model
from dbt_score.rule import RuleViolation


class HumanReadableFormatter(Formatter):
"""Formatter for human-readable messages in the terminal."""

indent = " "
label_ok = "\033[1;32mOK \033[0m"
label_warning = "\033[1;33mWARN\033[0m"
label_error = "\033[1;31mERR \033[0m"

@staticmethod
def bold(text: str) -> str:
"""Return text in bold."""
return f"\033[1m{text}\033[0m"

def model_evaluated(
self, model: Model, results: ModelResultsType, score: float
) -> None:
"""Callback when a model has been evaluated."""
print(f"Model {self.bold(model.name)}")
for rule, result in results.items():
if result is None:
print(f"{self.indent}{self.label_ok} {rule.source()}")
elif isinstance(result, RuleViolation):
print(
f"{self.indent}{self.label_warning} "
f"({rule.severity.name.lower()}) {rule.source()}: {result.message}"
)
else:
print(f"{self.indent}{self.label_error} {rule.source()}: {result!s}")
print(f"Score: {self.bold(str(round(score * 10, 1)))}")
print()

def project_evaluated(self, score: float) -> None:
"""Callback when a project has been evaluated."""
print(f"Project score: {self.bold(str(round(score * 10, 1)))}")
23 changes: 23 additions & 0 deletions src/dbt_score/lint.py
@@ -2,8 +2,31 @@

from pathlib import Path

from dbt_score.evaluation import Evaluation
from dbt_score.formatters.human_readable_formatter import HumanReadableFormatter
from dbt_score.models import ManifestLoader
from dbt_score.rule_registry import RuleRegistry
from dbt_score.scoring import Scorer


def lint_dbt_project(manifest_path: Path) -> None:
"""Lint dbt manifest."""
if not manifest_path.exists():
raise FileNotFoundError(f"Manifest not found at {manifest_path}.")

rule_registry = RuleRegistry()
rule_registry.load_all()

manifest_loader = ManifestLoader(manifest_path)

formatter = HumanReadableFormatter()

scorer = Scorer()

evaluation = Evaluation(
rule_registry=rule_registry,
manifest_loader=manifest_loader,
formatter=formatter,
scorer=scorer,
)
evaluation.evaluate()
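A minimal usage sketch; the manifest path below is dbt's conventional target location and is an assumption, not something this commit dictates:

from pathlib import Path

from dbt_score.lint import lint_dbt_project

# Lint the project described by the compiled manifest.
lint_dbt_project(Path("target/manifest.json"))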
6 changes: 5 additions & 1 deletion src/dbt_score/models.py
@@ -213,6 +213,10 @@ def from_node(
_raw_test_values=test_values,
)

def __hash__(self) -> int:
"""Compute a unique hash for a model."""
return hash(self.unique_id)


class ManifestLoader:
"""Load the models and tests from the manifest."""
@@ -241,7 +245,7 @@ def _load_models(self) -> None:
def _reindex_tests(self) -> None:
"""Index tests based on their model id."""
for node_values in self.raw_nodes.values():
# Only include tests that are attached to a model.
# Only include tests that are attached to a model
if node_values.get("resource_type") == "test" and (
attached_node := node_values.get("attached_node")
):
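The new __hash__ is what lets Evaluation key its results and scores dictionaries by Model. A small sketch of that property (the manifest path is an assumption):

from pathlib import Path

from dbt_score.models import ManifestLoader, Model

loader = ManifestLoader(Path("target/manifest.json"))
# Models hashing on unique_id can now serve as dictionary keys.
scores: dict[Model, float] = {model: 0.0 for model in loader.models}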
18 changes: 15 additions & 3 deletions src/dbt_score/rule.py
@@ -42,8 +42,17 @@ def evaluate(self, model: Model) -> RuleViolation | None:
"""Evaluates the rule."""
raise NotImplementedError("Subclass must implement method `evaluate`.")

@classmethod
def source(cls) -> str:
"""Return the source of the rule, i.e. a fully qualified name."""
return f"{cls.__module__}.{cls.__name__}"

# Use @overload to have proper typing for both @rule and @rule(...).
def __hash__(self) -> int:
"""Compute a unique hash for a rule."""
return hash(self.source())


# Use @overload to have proper typing for both @rule and @rule(...)
# https://mypy.readthedocs.io/en/stable/generics.html#decorator-factories


@@ -88,7 +97,7 @@ def decorator_rule(
if func.__doc__ is None and description is None:
raise AttributeError("Rule must define `description` or `func.__doc__`.")

# Get description parameter, otherwise use the docstring.
# Get description parameter, otherwise use the docstring
rule_description = description or (
func.__doc__.split("\n")[0] if func.__doc__ else None
)
@@ -97,14 +106,17 @@ def wrapped_func(self: Rule, *args: Any, **kwargs: Any) -> RuleViolation | None:
"""Wrap func to add `self`."""
return func(*args, **kwargs)

# Create the rule class inheriting from Rule.
# Create the rule class inheriting from Rule
rule_class = type(
func.__name__,
(Rule,),
{
"description": rule_description,
"severity": severity,
"evaluate": wrapped_func,
# Forward origin of the decorated function
"__qualname__": func.__qualname__, # https://peps.python.org/pep-3155/
"__module__": func.__module__,
},
)

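To show what source() and the forwarded __qualname__/__module__ provide, a hedged sketch of a decorated rule; the checked attribute and message are illustrative, and RuleViolation is assumed to accept a message keyword, as the formatter's use of result.message suggests:

from dbt_score.models import Model
from dbt_score.rule import RuleViolation, rule


@rule
def model_has_description(model: Model) -> RuleViolation | None:
    """A model should have a description."""
    if not model.description:  # assumed Model attribute, for illustration only
        return RuleViolation(message="Model lacks a description.")
    return None

# The decorator turns the function into a Rule subclass that reports where it
# was defined, which is exactly what the human-readable formatter prints:
# model_has_description.source()  # e.g. "my_rules.model_has_description"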
9 changes: 6 additions & 3 deletions src/dbt_score/rule_registry.py
@@ -50,9 +50,12 @@ def _load(self, namespace_name: str) -> None:
for obj_name in dir(module):
obj = module.__dict__[obj_name]
if type(obj) is type and issubclass(obj, Rule):
if obj_name in self.rules:
raise DuplicatedRuleException(obj_name)
self._rules[obj_name] = obj
self._add_rule(obj_name, obj)

def _add_rule(self, name: str, rule: Type[Rule]) -> None:
if name in self.rules:
raise DuplicatedRuleException(name)
self._rules[name] = rule

def load_all(self) -> None:
"""Load all rules, core and third-party."""
55 changes: 55 additions & 0 deletions src/dbt_score/scoring.py
@@ -0,0 +1,55 @@
"""Module computing scores."""

from __future__ import annotations

import typing

if typing.TYPE_CHECKING:
from dbt_score.evaluation import ModelResultsType
from dbt_score.rule import RuleViolation, Severity


class Scorer:
"""Logic for computing scores."""

# This magic number comes from rule severity.
# Assuming a rule violation:
# - A low severity yields a score 2/3
# - A medium severity yields a score 1/3
# - A high severity yields a score 0/3
score_cardinality = 3

min_score = 0.0
max_score = 1.0

def score_model(self, model_results: ModelResultsType) -> float:
"""Compute the score of a given model."""
if len(model_results) == 0:
# No rule? No problem
return self.max_score
if any(
rule.severity == Severity.CRITICAL and isinstance(result, RuleViolation)
for rule, result in model_results.items()
):
# If there's a CRITICAL violation, the score is 0
return self.min_score
else:
# Otherwise, the score is the weighted average (by severity) of the results
return sum(
[
# The more severe the violation, the more points are lost
self.score_cardinality - rule.severity.value
if isinstance(result, RuleViolation) # Either 0/3, 1/3 or 2/3
else self.score_cardinality # 3/3
for rule, result in model_results.items()
]
) / (self.score_cardinality * len(model_results))

def score_aggregate_models(self, scores: list[float]) -> float:
"""Compute the score of a list of models."""
if 0.0 in scores:
# Any model with a CRITICAL violation makes the project score 0
return self.min_score
if len(scores) == 0:
return self.max_score
return sum(scores) / len(scores)
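A worked example of the weighting, assuming Severity values of 1, 2 and 3 for low, medium and high (which is what the 2/3, 1/3 and 0/3 comments above imply): a model with two passing rules and one violated medium-severity rule scores (3 + 3 + 1) / (3 × 3).

# Two passes (3 points each) and one medium-severity violation (3 - 2 = 1 point):
points = [3, 3, 3 - 2]
score = sum(points) / (3 * len(points))  # 7 / 9 ≈ 0.778
print(round(score * 10, 1))              # 7.8, as the human-readable formatter displays it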