-
Notifications
You must be signed in to change notification settings - Fork 15
Add support for linting and scoring dbt seeds #110
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 14 commits
ac94d79
de07dec
eb311d8
7c4b924
eaf268a
ede7146
b5c10f0
b8b2b14
6834470
60415f0
654a9e6
67849ea
0446991
4219d0d
4616518
8c0bfe3
49947dd
79f130b
8eb8ba4
5f2fd94
7bbe99f
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -6,7 +6,10 @@ | |
| from collections import defaultdict | ||
| from dataclasses import dataclass, field | ||
| from pathlib import Path | ||
| from typing import Any, Iterable, Literal, TypeAlias, Union | ||
| from typing import TYPE_CHECKING, Any, Iterable, List, Literal, TypeAlias, Union | ||
|
|
||
| if TYPE_CHECKING: | ||
| from typing import Union | ||
|
|
||
| from dbt_score.dbt_utils import dbt_ls | ||
|
|
||
|
|
@@ -154,6 +157,10 @@ def _get_columns( | |
| ] | ||
|
|
||
|
|
||
| # Type annotation for parent references | ||
| ParentType = Union["Model", "Source", "Snapshot", "Seed"] | ||
|
|
||
|
|
||
| @dataclass | ||
| class Model(HasColumnsMixin): | ||
| """Represents a dbt model. | ||
|
|
@@ -205,7 +212,7 @@ class Model(HasColumnsMixin): | |
| tests: list[Test] = field(default_factory=list) | ||
| depends_on: dict[str, list[str]] = field(default_factory=dict) | ||
| constraints: list[Constraint] = field(default_factory=list) | ||
| parents: list[Union["Model", "Source", "Snapshot"]] = field(default_factory=list) | ||
| parents: List[ParentType] = field(default_factory=list) | ||
| _raw_values: dict[str, Any] = field(default_factory=dict) | ||
| _raw_test_values: list[dict[str, Any]] = field(default_factory=list) | ||
|
|
||
|
|
@@ -245,6 +252,7 @@ def from_node( | |
| Constraint.from_raw_values(constraint) | ||
| for constraint in node_values["constraints"] | ||
| ], | ||
| parents=[], # Will be populated later | ||
| _raw_values=node_values, | ||
| _raw_test_values=test_values, | ||
| ) | ||
|
|
@@ -443,7 +451,7 @@ class Snapshot(HasColumnsMixin): | |
| depends_on: dict[str, list[str]] = field(default_factory=dict) | ||
| strategy: str | None = None | ||
| unique_key: list[str] | None = None | ||
| parents: list[Union["Model", "Source", "Snapshot"]] = field(default_factory=list) | ||
| parents: List[ParentType] = field(default_factory=list) | ||
| _raw_values: dict[str, Any] = field(default_factory=dict) | ||
| _raw_test_values: list[dict[str, Any]] = field(default_factory=list) | ||
|
|
||
|
|
@@ -477,6 +485,7 @@ def from_node( | |
| .get("column_name") | ||
| ], | ||
| depends_on=node_values["depends_on"], | ||
| parents=[], # Will be populated later | ||
| _raw_values=node_values, | ||
| _raw_test_values=test_values, | ||
| ) | ||
|
|
@@ -486,11 +495,89 @@ def __hash__(self) -> int: | |
| return hash(self.unique_id) | ||
|
|
||
|
|
||
| Evaluable: TypeAlias = Model | Source | Snapshot | ||
@dataclass
class Seed(HasColumnsMixin):
    """A dbt seed, as described by its node in the manifest.

    Attributes:
        unique_id: Identifier of the seed, e.g. `seed.package.seed_name`.
        name: Name of the seed.
        relation_name: Relation name of the seed, e.g. `db.schema.seed_name`.
        description: Full description of the seed.
        original_file_path: Path of the seed file, e.g. `data/seed_name.csv`.
        config: Config of the seed.
        meta: Meta of the seed.
        columns: Columns of the seed.
        package_name: Name of the package the seed belongs to.
        database: Database name of the seed.
        schema: Schema name of the seed.
        alias: Alias of the seed.
        patch_path: Path of the seed's yml file, e.g. `seeds.yml`.
        tags: Tags attached to the seed.
        tests: Tests attached to the seed.
        _raw_values: Raw values of the seed (node) in the manifest.
        _raw_test_values: Raw test values of the seed (node) in the manifest.
    """

    unique_id: str
    name: str
    relation_name: str
    description: str
    original_file_path: str
    config: dict[str, Any]
    meta: dict[str, Any]
    columns: list[Column]
    package_name: str
    database: str
    schema: str
    alias: str | None = None
    patch_path: str | None = None
    tags: list[str] = field(default_factory=list)
    tests: list[Test] = field(default_factory=list)
    _raw_values: dict[str, Any] = field(default_factory=dict)
    _raw_test_values: list[dict[str, Any]] = field(default_factory=list)

    @classmethod
    def from_node(
        cls, node_values: dict[str, Any], test_values: list[dict[str, Any]]
    ) -> "Seed":
        """Build a seed from its manifest node and the tests that reference it.

        Args:
            node_values: The seed's node in the manifest.
            test_values: The raw test nodes associated with the seed.

        Returns:
            The seed; its `tests` hold only the tests whose kwargs carry no
            `column_name`.
        """

        def targets_column(raw_test: dict[str, Any]) -> bool:
            # A test naming a column in its kwargs is left out of `tests`
            # (presumably it is attached to that column by `_get_columns`).
            test_kwargs = raw_test.get("test_metadata", {}).get("kwargs", {})
            return bool(test_kwargs.get("column_name"))

        return cls(
            unique_id=node_values["unique_id"],
            name=node_values["name"],
            relation_name=node_values["relation_name"],
            description=node_values["description"],
            original_file_path=node_values["original_file_path"],
            config=node_values["config"],
            meta=node_values["meta"],
            columns=cls._get_columns(node_values, test_values),
            package_name=node_values["package_name"],
            database=node_values["database"],
            schema=node_values["schema"],
            alias=node_values["alias"],
            patch_path=node_values["patch_path"],
            tags=node_values["tags"],
            tests=[
                Test.from_node(raw_test)
                for raw_test in test_values
                if not targets_column(raw_test)
            ],
            _raw_values=node_values,
            _raw_test_values=test_values,
        )

    def __hash__(self) -> int:
        """Hash the seed by its `unique_id`."""
        return hash(self.unique_id)
|
|
||
|
|
||
| Evaluable: TypeAlias = Model | Source | Snapshot | Seed | ||
|
|
||
|
|
||
| class ManifestLoader: | ||
| """Load the models, sources, snapshots and tests from the manifest.""" | ||
| """Load the models, sources, snapshots, seeds and tests from the manifest.""" | ||
|
|
||
| def __init__(self, file_path: Path, select: Iterable[str] | None = None): | ||
| """Initialize the ManifestLoader. | ||
|
|
@@ -516,17 +603,21 @@ def __init__(self, file_path: Path, select: Iterable[str] | None = None): | |
| self.tests: dict[str, list[dict[str, Any]]] = defaultdict(list) | ||
| self.sources: dict[str, Source] = {} | ||
| self.snapshots: dict[str, Snapshot] = {} | ||
| self.seeds: dict[str, Seed] = {} | ||
|
|
||
| self._reindex_tests() | ||
| self._load_models() | ||
| self._load_sources() | ||
| self._load_snapshots() | ||
| self._load_seeds() | ||
| self._populate_parents() | ||
|
|
||
| if select: | ||
| self._filter_evaluables(select) | ||
|
|
||
| if (len(self.models) + len(self.sources) + len(self.snapshots)) == 0: | ||
| if ( | ||
| len(self.models) + len(self.sources) + len(self.snapshots) + len(self.seeds) | ||
| ) == 0: | ||
| logger.warning("Nothing to evaluate!") | ||
|
|
||
| def _load_models(self) -> None: | ||
|
|
@@ -550,6 +641,44 @@ def _load_snapshots(self) -> None: | |
| snapshot = Snapshot.from_node(node_values, self.tests.get(node_id, [])) | ||
| self.snapshots[node_id] = snapshot | ||
|
|
||
| def _load_seeds(self) -> None: | ||
| """Load the seeds from the manifest.""" | ||
| for node_id, node_values in self.raw_nodes.items(): | ||
| if node_values.get("resource_type") == "seed": | ||
| seed = Seed.from_node(node_values, self.tests.get(node_id, [])) | ||
| self.seeds[node_id] = seed | ||
|
|
||
| def _add_parent_if_exists( | ||
| self, node: Union[Model, Snapshot], parent_id: str | ||
| ) -> None: | ||
| """Add a parent reference to the node if the parent exists. | ||
|
|
||
| Args: | ||
| node: The model or snapshot to add parent to | ||
| parent_id: The ID of the potential parent node | ||
| """ | ||
| # Check each potential parent collection | ||
| if parent_id in self.models: | ||
| node.parents.append(self.models[parent_id]) | ||
| elif parent_id in self.sources: | ||
| node.parents.append(self.sources[parent_id]) | ||
| elif parent_id in self.snapshots: | ||
| node.parents.append(self.snapshots[parent_id]) | ||
| elif parent_id in self.seeds: | ||
| node.parents.append(self.seeds[parent_id]) | ||
|
|
||
| def _populate_parents(self) -> None: | ||
|
||
| """Populate models and snapshots with references to their parent objects.""" | ||
| # For models: add parent object references | ||
| for _, model in self.models.items(): | ||
| for node_id in model.depends_on.get("nodes", []): | ||
| self._add_parent_if_exists(model, node_id) | ||
|
|
||
| # For snapshots: add parent object references | ||
| for _, snapshot in self.snapshots.items(): | ||
| for node_id in snapshot.depends_on.get("nodes", []): | ||
| self._add_parent_if_exists(snapshot, node_id) | ||
|
|
||
| def _reindex_tests(self) -> None: | ||
| """Index tests based on their associated evaluable.""" | ||
| for node_values in self.raw_nodes.values(): | ||
|
|
@@ -566,17 +695,6 @@ def _reindex_tests(self) -> None: | |
| ): | ||
| self.tests[node_unique_id].append(node_values) | ||
|
|
||
| def _populate_parents(self) -> None: | ||
| """Populate `parents` for all models and snapshots.""" | ||
| for node in list(self.models.values()) + list(self.snapshots.values()): | ||
| for parent_id in node.depends_on.get("nodes", []): | ||
| if parent_id in self.models: | ||
| node.parents.append(self.models[parent_id]) | ||
| elif parent_id in self.snapshots: | ||
| node.parents.append(self.snapshots[parent_id]) | ||
| elif parent_id in self.sources: | ||
| node.parents.append(self.sources[parent_id]) | ||
|
|
||
| def _filter_evaluables(self, select: Iterable[str]) -> None: | ||
| """Filter evaluables like dbt's --select.""" | ||
| single_model_select = re.compile(r"[a-zA-Z0-9_]+") | ||
|
|
@@ -594,3 +712,56 @@ def _filter_evaluables(self, select: Iterable[str]) -> None: | |
| k: s for k, s in self.sources.items() if s.selector_name in selected | ||
| } | ||
| self.snapshots = {k: s for k, s in self.snapshots.items() if s.name in selected} | ||
| self.seeds = {k: s for k, s in self.seeds.items() if s.name in selected} | ||
|
|
||
| # Helper methods to find entities by name | ||
| def get_model_by_name(self, name: str) -> Model | None: | ||
| """Get a model by name.""" | ||
| for model in self.models.values(): | ||
| if model.name == name: | ||
| return model | ||
| return None | ||
|
|
||
| def get_source_by_name(self, name: str) -> Source | None: | ||
| """Get a source by name.""" | ||
| for source in self.sources.values(): | ||
| if source.name == name: | ||
| return source | ||
| return None | ||
|
|
||
| def get_source_by_selector_name(self, selector_name: str) -> Source | None: | ||
| """Get a source by selector name (source_name.table_name).""" | ||
| for source in self.sources.values(): | ||
| if source.selector_name == selector_name: | ||
| return source | ||
| return None | ||
|
|
||
| def get_snapshot_by_name(self, name: str) -> Snapshot | None: | ||
| """Get a snapshot by name.""" | ||
| for snapshot in self.snapshots.values(): | ||
| if snapshot.name == name: | ||
| return snapshot | ||
| return None | ||
|
|
||
| def get_seed_by_name(self, name: str) -> Seed | None: | ||
| """Get a seed by name.""" | ||
| for seed in self.seeds.values(): | ||
| if seed.name == name: | ||
| return seed | ||
| return None | ||
|
|
||
| def get_first_model(self) -> Model | None: | ||
| """Get the first model in the collection, if any.""" | ||
| return next(iter(self.models.values())) if self.models else None | ||
|
|
||
| def get_first_source(self) -> Source | None: | ||
| """Get the first source in the collection, if any.""" | ||
| return next(iter(self.sources.values())) if self.sources else None | ||
|
|
||
| def get_first_snapshot(self) -> Snapshot | None: | ||
| """Get the first snapshot in the collection, if any.""" | ||
| return next(iter(self.snapshots.values())) if self.snapshots else None | ||
|
|
||
| def get_first_seed(self) -> Seed | None: | ||
| """Get the first seed in the collection, if any.""" | ||
| return next(iter(self.seeds.values())) if self.seeds else None | ||
|
||
Uh oh!
There was an error while loading. Please reload this page.