|
| 1 | +from lsprotocol.types import Range, Position |
| 2 | +import typing as t |
| 3 | + |
| 4 | +from sqlmesh.core.dialect import normalize_model_name |
| 5 | +from sqlmesh.core.model.definition import SqlModel |
| 6 | +from sqlmesh.lsp.context import LSPContext |
| 7 | +from sqlglot import exp |
| 8 | + |
| 9 | +from sqlmesh.utils.pydantic import PydanticModel |
| 10 | + |
| 11 | + |
class Reference(PydanticModel):
    """
    A link from a span of text to a file.

    Attributes:
        range (Range): The span of text that makes up the reference.
        uri (str): The uri of the file that the reference points to.
    """

    range: Range
    uri: str
| 15 | + |
| 16 | + |
def get_model_definitions_for_a_path(
    lint_context: LSPContext, document_uri: str
) -> t.List[Reference]:
    """
    Get the references to other models made by the SQL model at a given path.

    Only documents whose uri ends in ``.sql`` and whose first registered model
    is a ``SqlModel`` are inspected; every other case yields an empty list.

    Steps:
    - Look up the models registered for the document and load the first one
    - Find all table objects in its query using find_all exp.Table
    - Normalize each table name and keep those listed in the model's depends_on
    - Resolve each kept name to a model whose file exists on disk
    - Build a range covering the table name plus its catalog/db qualifier, if any

    :param lint_context: LSP context exposing the uri-to-models map and the loaded context
    :param document_uri: Uri of the document to inspect
    :return: One Reference per table in the query that resolves to an existing model file
    """
    # Only sql documents are supported
    if not document_uri.endswith(".sql"):
        return []

    # Get the model
    models = lint_context.map[document_uri]
    if not models:
        return []
    model = lint_context.context.get_model(model_or_snapshot=models[0], raise_if_missing=False)
    # isinstance is also False for None, so this covers a missing model
    if not isinstance(model, SqlModel):
        return []

    # Find all possible references
    tables = list(model.query.find_all(exp.Table))
    if not tables:
        return []

    # Read the file through a context manager so the handle is always closed, and
    # with an explicit encoding so the result does not depend on the platform default.
    with open(model._path, "r", encoding="utf-8") as file:
        read_file = file.readlines()

    # The dependencies do not change while iterating, so look them up once
    depends_on = model.depends_on

    references = []
    for table in tables:
        # Normalize the table reference
        reference_name = table.sql(dialect=model.dialect)
        normalized_reference_name = normalize_model_name(
            reference_name,
            default_catalog=lint_context.context.default_catalog,
            dialect=model.dialect,
        )
        if normalized_reference_name not in depends_on:
            continue

        # Get the referenced model uri
        referenced_model = lint_context.context.get_model(
            model_or_snapshot=normalized_reference_name, raise_if_missing=False
        )
        if referenced_model is None:
            continue
        referenced_model_path = referenced_model._path
        # Skip models whose path does not point to a file on disk
        if not referenced_model_path.is_file():
            continue
        referenced_model_uri = f"file://{referenced_model_path}"

        # Extract metadata for positioning
        table_meta = TokenPositionDetails.from_meta(table.this.meta)
        table_range = _range_from_token_position_details(table_meta, read_file)
        start_pos = table_range.start
        end_pos = table_range.end

        # If there's a catalog or database qualifier, start the range at the qualifier
        # (the catalog when present, otherwise the db) so the whole name is covered
        catalog_or_db = table.args.get("catalog") or table.args.get("db")
        if catalog_or_db is not None:
            catalog_or_db_meta = TokenPositionDetails.from_meta(catalog_or_db.meta)
            catalog_or_db_range = _range_from_token_position_details(catalog_or_db_meta, read_file)
            start_pos = catalog_or_db_range.start

        references.append(
            Reference(uri=referenced_model_uri, range=Range(start=start_pos, end=end_pos))
        )

    return references
| 97 | + |
| 98 | + |
class TokenPositionDetails(PydanticModel):
    """
    Position of a single token within the source code.

    Attributes:
        line (int): The line on which the token ends.
        col (int): The column at which the token ends.
        start (int): The index at which the token starts.
        end (int): The index at which the token ends.
    """

    line: int
    col: int
    start: int
    end: int

    @staticmethod
    def from_meta(meta: t.Dict[str, int]) -> "TokenPositionDetails":
        """
        Build the details from a metadata mapping.

        :param meta: Mapping holding the "line", "col", "start" and "end" entries
        :return: A TokenPositionDetails populated from those four entries
        """
        position_keys = ("line", "col", "start", "end")
        return TokenPositionDetails(**{key: meta[key] for key in position_keys})
| 123 | + |
| 124 | + |
def _range_from_token_position_details(
    token_position_details: TokenPositionDetails, read_file: t.List[str]
) -> Range:
    """
    Convert a TokenPositionDetails object to a Range object.

    The end of the range comes straight from the token's line and column. The start
    is found by stepping back from the end by the token's length, moving onto
    earlier lines when the token spans more than one line.

    :param token_position_details: Details about a token's position
    :param read_file: List of lines from the file, each still carrying its line
        terminator (as returned by ``readlines()``)
    :return: A Range object representing the token's position
    """
    # Convert from 1-indexed to 0-indexed for line only
    end_line_0 = token_position_details.line - 1
    end_col_0 = token_position_details.col

    # start and end are both inclusive indexes, hence the + 1
    token_length = token_position_details.end - token_position_details.start + 1

    # Assume the token sits on a single line, then correct below if it does not
    start_line_0 = end_line_0
    start_col_0 = end_col_0 - token_length

    # A negative column means the token started on an earlier line. Each line's
    # length already includes its newline character, so no extra character is
    # added per crossed line.
    while start_col_0 < 0 and start_line_0 > 0:
        start_line_0 -= 1
        start_col_0 += len(read_file[start_line_0])

    # Ensure we don't have negative values
    start_col_0 = max(0, start_col_0)
    return Range(
        start=Position(line=start_line_0, character=start_col_0),
        end=Position(line=end_line_0, character=end_col_0),
    )
0 commit comments