From 591645c81e9b1fd296cc485a3aa847b791ae80d1 Mon Sep 17 00:00:00 2001 From: Ryan Eakman <6326532+eakmanrq@users.noreply.github.com> Date: Tue, 4 Feb 2025 10:08:18 -0800 Subject: [PATCH] feat: don't force db connect if using serverless (#3781) --- docs/integrations/engines/databricks.md | 10 +-- sqlmesh/__init__.py | 3 + sqlmesh/core/config/connection.py | 21 +++--- sqlmesh/core/engine_adapter/base.py | 10 ++- sqlmesh/core/engine_adapter/databricks.py | 86 ++++++++++++++--------- sqlmesh/utils/errors.py | 4 ++ 6 files changed, 85 insertions(+), 49 deletions(-) diff --git a/docs/integrations/engines/databricks.md b/docs/integrations/engines/databricks.md index c31a45261..32157bc60 100644 --- a/docs/integrations/engines/databricks.md +++ b/docs/integrations/engines/databricks.md @@ -14,9 +14,9 @@ SQLMesh connects to Databricks with the [Databricks SQL Connector](https://docs. The SQL Connector is bundled with SQLMesh and automatically installed when you include the `databricks` extra in the command `pip install "sqlmesh[databricks]"`. -The SQL Connector has all the functionality needed for SQLMesh to execute SQL models on Databricks and Python models locally (the default SQLMesh approach). +The SQL Connector has all the functionality needed for SQLMesh to execute SQL models on Databricks and Python models that do not return PySpark DataFrames. -The SQL Connector does not support Databricks Serverless Compute. If you require Serverless Compute then you must use the Databricks Connect library. +If you have Python models returning PySpark DataFrames, check out the [Databricks Connect](#databricks-connect-1) section. ### Databricks Connect @@ -229,7 +229,9 @@ If you want Databricks to process PySpark DataFrames in SQLMesh Python models, t SQLMesh **DOES NOT** include/bundle the Databricks Connect library. You must [install the version of Databricks Connect](https://docs.databricks.com/en/dev-tools/databricks-connect/python/install.html) that matches the Databricks Runtime used in your Databricks cluster. -SQLMesh's Databricks Connect implementation supports Databricks Runtime 13.0 or higher. If SQLMesh detects that you have Databricks Connect installed, then it will use it for all Python models (both Pandas and PySpark DataFrames). +If SQLMesh detects that you have Databricks Connect installed, then it will automatically configure the connection and use it for all Python models that return a Pandas or PySpark DataFrame. + +To have databricks-connect installed but ignored by SQLMesh, set `disable_databricks_connect` to `true` in the connection configuration. Databricks Connect can execute SQL and DataFrame operations on different clusters by setting the SQLMesh `databricks_connect_*` connection options. For example, these options could configure SQLMesh to run SQL on a [Databricks SQL Warehouse](https://docs.databricks.com/sql/admin/create-sql-warehouse.html) while still routing DataFrame operations to a normal Databricks Cluster. @@ -259,7 +261,7 @@ The only relevant SQLMesh configuration parameter is the optional `catalog` para | `databricks_connect_server_hostname` | Databricks Connect Only: Databricks Connect server hostname. Uses `server_hostname` if not set. | string | N | | `databricks_connect_access_token` | Databricks Connect Only: Databricks Connect access token. Uses `access_token` if not set. | string | N | | `databricks_connect_cluster_id` | Databricks Connect Only: Databricks Connect cluster ID. Uses `http_path` if not set. Cannot be a Databricks SQL Warehouse. 
| string | N |
-| `databricks_connect_use_serverless` | Databricks Connect Only: Use a serverless cluster for Databricks Connect. If using serverless then SQL connector is disabled since Serverless is not supported for SQL Connector | bool | N |
+| `databricks_connect_use_serverless` | Databricks Connect Only: Use a serverless cluster for Databricks Connect instead of `databricks_connect_cluster_id`. | bool | N |
 | `force_databricks_connect` | When running locally, force the use of Databricks Connect for all model operations (so don't use SQL Connector for SQL models) | bool | N |
 | `disable_databricks_connect` | When running locally, disable the use of Databricks Connect for all model operations (so use SQL Connector for all models) | bool | N |
 | `disable_spark_session` | Do not use SparkSession if it is available (like when running in a notebook). | bool | N |
diff --git a/sqlmesh/__init__.py b/sqlmesh/__init__.py
index 8a132fef0..4377c6d2d 100644
--- a/sqlmesh/__init__.py
+++ b/sqlmesh/__init__.py
@@ -141,6 +141,9 @@ def configure_logging(
     log_limit: int = c.DEFAULT_LOG_LIMIT,
     log_file_dir: t.Optional[t.Union[str, Path]] = None,
 ) -> None:
+    # Suppress noisy gRPC logs that are not useful to users
+    os.environ["GRPC_VERBOSITY"] = os.environ.get("GRPC_VERBOSITY", "NONE")
+
     logger = logging.getLogger()
 
     debug = force_debug or debug_mode_enabled()
diff --git a/sqlmesh/core/config/connection.py b/sqlmesh/core/config/connection.py
index 02b61b193..7ca1389b4 100644
--- a/sqlmesh/core/config/connection.py
+++ b/sqlmesh/core/config/connection.py
@@ -623,6 +623,12 @@ class DatabricksConnectionConfig(ConnectionConfig):
 
     @model_validator(mode="before")
     def _databricks_connect_validator(cls, data: t.Any) -> t.Any:
+        # SQLQueryContextLogger logs any SQL query that raises an error, even when the error is handled
+        # in a try/except block. Disabling it lets SQLMesh decide what should be surfaced to the user.
+        # Ex: SQLMesh describes a table to check whether it exists. That query is expected to fail when
+        # the table is missing, so the resulting error should not be shown to the user.
+ logging.getLogger("SQLQueryContextLogger").setLevel(logging.CRITICAL) + if not isinstance(data, dict): return data @@ -641,10 +647,6 @@ def _databricks_connect_validator(cls, data: t.Any) -> t.Any: data.get("auth_type"), ) - if databricks_connect_use_serverless: - data["force_databricks_connect"] = True - data["disable_databricks_connect"] = False - if (not server_hostname or not http_path or not access_token) and ( not databricks_connect_use_serverless and not auth_type ): @@ -666,11 +668,12 @@ def _databricks_connect_validator(cls, data: t.Any) -> t.Any: data["databricks_connect_access_token"] = access_token if not data.get("databricks_connect_server_hostname"): data["databricks_connect_server_hostname"] = f"https://{server_hostname}" - if not databricks_connect_use_serverless: - if not data.get("databricks_connect_cluster_id"): - if t.TYPE_CHECKING: - assert http_path is not None - data["databricks_connect_cluster_id"] = http_path.split("/")[-1] + if not databricks_connect_use_serverless and not data.get( + "databricks_connect_cluster_id" + ): + if t.TYPE_CHECKING: + assert http_path is not None + data["databricks_connect_cluster_id"] = http_path.split("/")[-1] if auth_type: from databricks.sql.auth.auth import AuthType diff --git a/sqlmesh/core/engine_adapter/base.py b/sqlmesh/core/engine_adapter/base.py index 527674008..9d3d81760 100644 --- a/sqlmesh/core/engine_adapter/base.py +++ b/sqlmesh/core/engine_adapter/base.py @@ -43,7 +43,11 @@ from sqlmesh.utils import columns_to_types_all_known, random_id from sqlmesh.utils.connection_pool import create_connection_pool from sqlmesh.utils.date import TimeLike, make_inclusive, to_time_column -from sqlmesh.utils.errors import SQLMeshError, UnsupportedCatalogOperationError +from sqlmesh.utils.errors import ( + SQLMeshError, + UnsupportedCatalogOperationError, + MissingDefaultCatalogError, +) from sqlmesh.utils.pandas import columns_to_types_from_df if t.TYPE_CHECKING: @@ -186,7 +190,9 @@ def default_catalog(self) -> t.Optional[str]: return None default_catalog = self._default_catalog or self.get_current_catalog() if not default_catalog: - raise SQLMeshError("Could not determine a default catalog despite it being supported.") + raise MissingDefaultCatalogError( + "Could not determine a default catalog despite it being supported." 
+ ) return default_catalog @property diff --git a/sqlmesh/core/engine_adapter/databricks.py b/sqlmesh/core/engine_adapter/databricks.py index 184e1e319..02608703d 100644 --- a/sqlmesh/core/engine_adapter/databricks.py +++ b/sqlmesh/core/engine_adapter/databricks.py @@ -1,7 +1,6 @@ from __future__ import annotations import logging -import os import typing as t import pandas as pd @@ -17,7 +16,7 @@ from sqlmesh.core.engine_adapter.spark import SparkEngineAdapter from sqlmesh.core.node import IntervalUnit from sqlmesh.core.schema_diff import SchemaDiffer -from sqlmesh.utils.errors import SQLMeshError +from sqlmesh.utils.errors import SQLMeshError, MissingDefaultCatalogError if t.TYPE_CHECKING: from sqlmesh.core._typing import SchemaName, TableName @@ -92,17 +91,6 @@ def _use_spark_session(self) -> bool: ) ) - @property - def use_serverless(self) -> bool: - from sqlmesh import RuntimeEnv - from sqlmesh.utils import str_to_bool - - if not self._use_spark_session: - return False - return ( - RuntimeEnv.get().is_databricks and str_to_bool(os.environ.get("IS_SERVERLESS", "False")) - ) or bool(self._extra_config["databricks_connect_use_serverless"]) - @property def is_spark_session_cursor(self) -> bool: from sqlmesh.engines.spark.db_api.spark_session import SparkSessionCursor @@ -124,12 +112,17 @@ def spark(self) -> PySparkSession: from databricks.connect import DatabricksSession if self._spark is None: + connect_kwargs = dict( + host=self._extra_config["databricks_connect_server_hostname"], + token=self._extra_config["databricks_connect_access_token"], + ) + if "databricks_connect_use_serverless" in self._extra_config: + connect_kwargs["serverless"] = True + else: + connect_kwargs["cluster_id"] = self._extra_config["databricks_connect_cluster_id"] + self._spark = ( - DatabricksSession.builder.remote( - host=self._extra_config["databricks_connect_server_hostname"], - token=self._extra_config["databricks_connect_access_token"], - cluster_id=self._extra_config["databricks_connect_cluster_id"], - ) + DatabricksSession.builder.remote(**connect_kwargs) .userAgent("sqlmesh") .getOrCreate() ) @@ -157,14 +150,8 @@ def _df_to_source_queries( def query_factory() -> Query: temp_table = self._get_temp_table(target_table or "spark", table_only=True) - if self.use_serverless: - # Global temp views are not supported on Databricks Serverless - # This also means we can't mix Python SQL Connection and DB Connect since they wouldn't - # share the same temp objects. 
- df.createOrReplaceTempView(temp_table.sql(dialect=self.dialect)) # type: ignore - else: - df.createOrReplaceGlobalTempView(temp_table.sql(dialect=self.dialect)) # type: ignore - temp_table.set("db", "global_temp") + df.createOrReplaceTempView(temp_table.sql(dialect=self.dialect)) + self._connection_pool.set_attribute("requires_spark_session_temp_objects", True) return exp.select(*self._casted_columns(columns_to_types)).from_(temp_table) if self._use_spark_session: @@ -199,28 +186,50 @@ def fetchdf( return df.toPandas() return df + def _execute( + self, + sql: str, + **kwargs: t.Any, + ) -> None: + if self._connection_pool.get_attribute("requires_spark_session_temp_objects"): + self._fetch_native_df(sql) + else: + super()._execute(sql, **kwargs) + + def _end_session(self) -> None: + """End the existing session.""" + self._connection_pool.set_attribute("requires_spark_session_temp_objects", False) + def get_current_catalog(self) -> t.Optional[str]: - # Update the Dataframe API if we have a spark session + pyspark_catalog = None + sql_connector_catalog = None if self._use_spark_session: from py4j.protocol import Py4JError from pyspark.errors.exceptions.connect import SparkConnectGrpcException try: # Note: Spark 3.4+ Only API - return self.spark.catalog.currentCatalog() + pyspark_catalog = self.spark.catalog.currentCatalog() except (Py4JError, SparkConnectGrpcException): pass - result = self.fetchone(exp.select(self.CURRENT_CATALOG_EXPRESSION)) - if result: - return result[0] - return None + if not self.is_spark_session_cursor: + result = self.fetchone(exp.select(self.CURRENT_CATALOG_EXPRESSION)) + sql_connector_catalog = result[0] if result else None + if ( + self._use_spark_session + and not self.is_spark_session_cursor + and pyspark_catalog != sql_connector_catalog + ): + raise SQLMeshError( + f"Current catalog mismatch between Databricks SQL Connector and Databricks-Connect: `{sql_connector_catalog}` != `{pyspark_catalog}`. Set `catalog` connection property to make them the same." + ) + return pyspark_catalog or sql_connector_catalog def set_current_catalog(self, catalog_name: str) -> None: # Since Databricks splits commands across the Dataframe API and the SQL Connector # (depending if databricks-connect is installed and a Dataframe is used) we need to ensure both - # are set to the same catalog since they maintain their default catalog seperately + # are set to the same catalog since they maintain their default catalog separately self.execute(exp.Use(this=exp.to_identifier(catalog_name), kind="CATALOG")) - # Update the Dataframe API is we have a spark session if self._use_spark_session: from py4j.protocol import Py4JError from pyspark.errors.exceptions.connect import SparkConnectGrpcException @@ -257,6 +266,15 @@ def clone_table( def wap_supported(self, table_name: TableName) -> bool: return False + @property + def default_catalog(self) -> t.Optional[str]: + try: + return super().default_catalog + except MissingDefaultCatalogError as e: + raise MissingDefaultCatalogError( + "Could not determine default catalog. Define the connection property `catalog` since it can't be inferred from your connection. 
See the SQLMesh Databricks documentation for details."
+            ) from e
+
     def _build_table_properties_exp(
         self,
         catalog_name: t.Optional[str] = None,
diff --git a/sqlmesh/utils/errors.py b/sqlmesh/utils/errors.py
index d0a3c3840..000dbd8dc 100644
--- a/sqlmesh/utils/errors.py
+++ b/sqlmesh/utils/errors.py
@@ -159,6 +159,10 @@ class PythonModelEvalError(SQLMeshError):
     pass
 
 
+class MissingDefaultCatalogError(SQLMeshError):
+    pass
+
+
 def raise_config_error(
     msg: str,
     location: t.Optional[str | Path] = None,
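
Reviewer note: below is a minimal sketch of a connection configured for serverless compute under this change. It assumes SQLMesh's Python configuration API (`Config`, `GatewayConfig`, `ModelDefaultsConfig`); the gateway name, hostname, warehouse path, token, and catalog are placeholders, and the same keys can be set in `config.yaml` instead.

```python
from sqlmesh.core.config import Config, GatewayConfig, ModelDefaultsConfig
from sqlmesh.core.config.connection import DatabricksConnectionConfig

# All credentials and resource identifiers below are placeholders.
config = Config(
    default_gateway="databricks",
    gateways={
        "databricks": GatewayConfig(
            connection=DatabricksConnectionConfig(
                server_hostname="xxx.cloud.databricks.com",
                http_path="/sql/1.0/warehouses/yyy",
                access_token="dapi-example-token",
                # With this change, serverless no longer implies force_databricks_connect:
                # SQL models keep using the SQL Connector, and only PySpark DataFrame
                # operations are routed through Databricks Connect serverless compute.
                databricks_connect_use_serverless=True,
                # Setting `catalog` avoids relying on catalog inference (see the
                # MissingDefaultCatalogError message added above).
                catalog="main",
            ),
        ),
    },
    model_defaults=ModelDefaultsConfig(dialect="databricks"),
)
```

Because the validator no longer sets `force_databricks_connect` when `databricks_connect_use_serverless` is enabled, SQL models continue to run through the SQL Connector, while Python models returning PySpark DataFrames go through Databricks Connect serverless compute.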