Skip to content

Commit

Permalink
WIP Add basic dbt objects and rule definitions
Browse files Browse the repository at this point in the history
  • Loading branch information
jochemvandooren committed Mar 8, 2024
1 parent cba523e commit c243c5b
Show file tree
Hide file tree
Showing 5 changed files with 285 additions and 0 deletions.
125 changes: 125 additions & 0 deletions src/dbt_score/manifest.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
from dataclasses import dataclass, field
from typing import Any, List


@dataclass
class Constraint:
"""Constraint for a column in a model."""

type: str
expression: str
name: str


@dataclass
class Test:
"""Test for a column or model."""

name: str
type: str
tags: list[str] = field(default_factory=list)


@dataclass
class Column:
"""Represents a column in a model."""

name: str
description: str
constraints: List[Constraint]
tests: List[Test] = field(default_factory=list)


@dataclass
class Model:
"""Represents a dbt model."""

id: str
name: str
description: str
file_path: str
config: dict[str, Any]
meta: dict[str, Any]
columns: dict[str, Column]
tests: list[Test] = field(default_factory=list)

@classmethod
def from_node(cls, node_values: dict[str, Any]) -> "Model":
"""Create a model object from a node in the manifest."""
columns = {
name: Column(
name=values.get("name"),
description=values.get("description"),
constraints=[
Constraint(
name=constraint.get("name"),
type=constraint.get("type"),
expression=constraint.get("expression"),
)
for constraint in values.get("constraints", [])
],
)
for name, values in node_values.get("columns", {}).items()
}

model = cls(
id=node_values["unique_id"],
file_path=node_values["patch_path"],
config=node_values.get("config", {}),
name=node_values["name"],
description=node_values.get("description", ""),
meta=node_values.get("meta", {}),
columns=columns,
)

return model


class ManifestLoader:
"""Load the models and tests from the manifest."""

def __init__(self, raw_manifest: dict[str, Any]):
self.raw_manifest = raw_manifest
self.raw_nodes = raw_manifest.get("nodes", {})
self.models: dict[str, Model] = {}
self.tests: dict[str, Test] = {}

# Load models first so the tests can be attached to them later.
self.load_models()
self.load_tests()

def load_models(self) -> None:
"""Load the models from the manifest."""
for node_values in self.raw_nodes.values():
if node_values.get("resource_type") == "model":
model = Model.from_node(node_values)
self.models[model.id] = model

def load_tests(self) -> None:
"""Load the tests from the manifest and attach them to the right object."""
for node_values in self.raw_nodes.values():
# Only include tests that are attached to a model.
if node_values.get("resource_type") == "test" and node_values.get(
"attached_node"
):
model = self.models.get(node_values.get("attached_node"))

if not model:
raise ValueError(
f"Model {node_values.get('attached_node')}"
f"not found, while tests are attached to it."
)

test = Test(
name=node_values.get("name"),
type=node_values.get("test_metadata").get("name"),
tags=node_values.get("tags"),
)
column_name = (
node_values.get("test_metadata").get("kwargs").get("column_name")
)

if column_name: # Test is a column-level test.
model.columns[column_name].tests.append(test)
else:
model.tests.append(test)
47 changes: 47 additions & 0 deletions src/dbt_score/rule.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
import functools
import logging
from dataclasses import dataclass
from enum import Enum
from typing import Any, Callable

from dbt_score.manifest import Model

logging.basicConfig()
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)


class Severity(Enum):
"""The severity/weight of a rule."""

LOW = 1
MEDIUM = 2
HIGH = 3
CRITICAL = 4


@dataclass
class RuleViolation:
"""The violation of a rule."""

message: str | None = None


def rule(
description: str,
hint: str,
severity: Severity = Severity.MEDIUM,
) -> Callable[[Callable[[Model], RuleViolation | None]], Callable[..., None]]:
"""Rule decorator."""

def decorator_rule(
func: Callable[[Model], RuleViolation | None],
) -> Callable[..., None]:
@functools.wraps(func)
def wrapper_rule(*args: Any, **kwargs: Any) -> Any:
logger.debug("Executing `%s` with severity: %s.", func.__name__, severity)
return func(*args, **kwargs)

return wrapper_rule

return decorator_rule
Empty file added src/dbt_score/rules/__init__.py
Empty file.
93 changes: 93 additions & 0 deletions src/dbt_score/rules/example_rules.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
"""All general rules."""

from ..manifest import Model
from ..rule import RuleViolation, Severity, rule


@rule(
description="A model should have an owner defined.",
hint="Define the owner of the model in the meta section.",
severity=Severity.HIGH,
)
def has_owner(model: Model) -> RuleViolation | None:
"""A model should have an owner defined."""
if "owner" not in model.meta:
return RuleViolation()

return None


@rule(description="A model should have a primary key defined.", hint="Some hint.")
def has_primary_key(model: Model) -> RuleViolation | None:
"""A model should have a primary key defined, unless it's a view."""
if not model.config.get("materialized") == "picnic_view":
has_pk = False
for column in model.columns.values():
if "primary_key" in [constraint.type for constraint in column.constraints]:
has_pk = True
break

if not has_pk:
return RuleViolation()

return None


@rule(
description="Primary key columns should have a uniqueness test defined.",
hint="Some hint.",
)
def primary_key_has_uniqueness_test(model: Model) -> RuleViolation | None:
"""Primary key columns should have a uniqueness test defined."""
columns_with_pk = []
if not model.config.get("materialized") == "picnic_view":
for column_name, column in model.columns.items():
if "primary_key" in [constraint.type for constraint in column.constraints]:
columns_with_pk.append(column_name)

tests = (
model.columns[columns_with_pk[0]].tests
if len(columns_with_pk) == 1
else model.tests
)

if columns_with_pk and "unique" not in [test.type for test in tests]:
return RuleViolation()

return None


@rule(
description="All columns of a model should have a description.", hint="Some hint."
)
def columns_have_description(model: Model) -> RuleViolation | None:
"""All columns of a model should have a description."""
invalid_columns = [
column_name
for column_name, column in model.columns.items()
if not column.description
]
if invalid_columns:
return RuleViolation(
message=f"The following columns lack a description: "
f"{', '.join(invalid_columns)}."
)

return None


@rule(description="A model should have at least one test defined.", hint="Some hint.")
def has_test(model: Model) -> RuleViolation | None:
"""A model should have at least one model-level and one column-level test.
This does not include singular tests, which are tests defined in a separate .sql
file and not linked to the model in the metadata.
"""
column_tests = []
for column in model.columns.values():
column_tests.extend(column.tests)

if len(model.tests) == 0 or len(column_tests) == 0:
return RuleViolation()

return None
20 changes: 20 additions & 0 deletions src/dbt_score/utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
"""Utility functions."""

import json
from pathlib import Path
from typing import Any


class JsonOpenError(RuntimeError):
"""Raised when there is an error opening a JSON file."""

pass


def get_json(json_filename: str) -> Any:
"""Get JSON from a file."""
try:
file_content = Path(json_filename).read_text(encoding="utf-8")
return json.loads(file_content)
except Exception as e:
raise JsonOpenError(f"Error opening {json_filename}.") from e

0 comments on commit c243c5b

Please sign in to comment.