diff --git a/.gitignore b/.gitignore index 583c17ae..6cb67205 100644 --- a/.gitignore +++ b/.gitignore @@ -97,3 +97,5 @@ dbt-tut dev/ .python-version *_project/ +.user.yml +chdb_state diff --git a/CHANGELOG.md b/CHANGELOG.md index fe3d4536..84440aa5 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,7 @@ +### Release [1.8.5], 2024-11-22 +### Improvement +* Added an adapter for chDB in dbt-clickhouse [#369](https://github.com/ClickHouse/dbt-clickhouse/pull/369) and updated documentation for the new feature. + ### Release [1.8.4], 2024-09-17 ### Improvement * The S3 help macro now support a `role_arn` parameter as an alternative way to provide authentication for S3 based models. Thanks to diff --git a/README.md b/README.md index 99da7ff6..479cac70 100644 --- a/README.md +++ b/README.md @@ -68,7 +68,7 @@ your_profile_name: schema: [default] # ClickHouse database for dbt models # optional - driver: [http] # http or native. If not set this will be autodetermined based on port setting + driver: [http] # http, native or chdb. If not set this will be autodetermined based on port setting host: [localhost] port: [8123] # If not set, defaults to 8123, 8443, 9000, 9440 depending on the secure and driver settings user: [default] # User for all database operations @@ -92,6 +92,10 @@ your_profile_name: # Native (clickhouse-driver) connection settings sync_request_timeout: [5] # Timeout for server ping compress_block_size: [1048576] # Compression block size if compression is enabled + + # chDB (clikchouse-driver) path settings + chdb_state_dir: [] # The path where chdb will leave it's states + chdb_dump_dir: [] # The path where dbt-clickhouse will find the dumps to initialise chdb ``` diff --git a/dbt/adapters/clickhouse/chdbclient.py b/dbt/adapters/clickhouse/chdbclient.py new file mode 100644 index 00000000..f42054e4 --- /dev/null +++ b/dbt/adapters/clickhouse/chdbclient.py @@ -0,0 +1,181 @@ +import json +from pathlib import Path +from typing import List + +import pkg_resources +from chdb import ChdbError, session +from chdb.dbapi import converters +from dbt_common.exceptions import DbtDatabaseError + +from dbt.adapters.clickhouse import ClickHouseColumn, ClickHouseCredentials +from dbt.adapters.clickhouse.dbclient import ChClientWrapper +from dbt.adapters.clickhouse.logger import logger + +try: + driver_version = pkg_resources.get_distribution("chdb").version +except pkg_resources.ResolutionError: + driver_version = "unknown" + + +class ChDBClient(ChClientWrapper): + def query(self, sql, **kwargs): + # TODO: we might need to preprocess `sql` + try: + result = self._client.query(sql, "JSON", **kwargs) + result = CHDBResult(result=result) + result.read() + return result + except CHDBResultError as ex: + raise DbtDatabaseError( + f"reading result from chdb query using json failed: {str(ex).strip()}" + ) from ex + except ChdbError as ex: + raise DbtDatabaseError(f"chdb query failed with exception: {str(ex).strip()}") from ex + except Exception as ex: + raise DbtDatabaseError(str(ex).strip()) from ex + + def command(self, sql, **kwargs): + try: + result = self._client.query(sql, **kwargs) + if result.has_error(): + raise DbtDatabaseError(str(result.error_message.strip())) + elif result.size() == 0: + return True + else: + result = int(result.data()) + return result + except Exception as ex: + raise DbtDatabaseError(f"chdb command failed with exception: {str(ex).strip()}") from ex + + def columns_in_query(self, sql: str, **kwargs) -> List[ClickHouseColumn]: + try: + query_result = self._client.query( + f"SELECT * FROM ( \n" f"{sql} \n" f") LIMIT 0", + **kwargs, + ) + return [ + ClickHouseColumn.create(name, ch_type.name) + for name, ch_type in zip(query_result.column_names, query_result.column_types) + ] + except ChdbError as ex: + raise DbtDatabaseError( + f"chdb columns_in_query failed with exception: {str(ex).strip()}" + ) from ex + except Exception as ex: + raise DbtDatabaseError(str(ex).strip()) from ex + + def get_ch_setting(self, setting_name): + try: + result = self._client.query( + f"SELECT value, readonly FROM system.settings WHERE name = '{setting_name}'", + "JSON", + ) + if result.has_error(): + raise DbtDatabaseError(str(result.error_message.strip())) + else: + result = json.loads(result.data()) + result = result["data"][0] + return (result["value"], int(result["readonly"])) if result else (None, 0) + except Exception as ex: + logger.warning("Unexpected error retrieving ClickHouse server setting", ex) + return None + + def close(self): + pass + # self._client.cleanup() + + def _create_client(self, credentials: ClickHouseCredentials): + # We want to append the path below to target_dir to have relative paths implementation in the configuration + chdb_state_dir = Path(credentials.chdb_state_dir) + + if not chdb_state_dir.exists(): + logger.debug(f"Provided chdb_state_dir doesn't exist: {chdb_state_dir}") + chdb_state_dir.mkdir(parents=True, exist_ok=True) + + session_dir = chdb_state_dir / f"{self._conn_settings['session_id']}" + logger.info(f"Provided session_dir: {session_dir}") + client = session.Session(path=session_dir.as_posix()) + + # We want to append the path below to target_dir to have relative paths implementation in the configuration + chdb_dump_dir = Path.cwd() / credentials.chdb_dump_dir + chdb_dump_files = list(chdb_dump_dir.glob("**/*.sql")) + if len(chdb_dump_files) == 0: + logger.warning(f"Provided chdb_dump_files is empty: {chdb_dump_files}") + return + + for chdb_dump_file in chdb_dump_files: + sql_content = chdb_dump_file.read_text() + try: + client.query(sql_content) + except ChdbError as ex: + raise DbtDatabaseError( + f"client creation failed with exception: {str(ex).strip()}" + ) from ex + return client + + def _set_client_database(self): + pass + + def _server_version(self): + return self._client.query("select version()").data().strip().replace('"', "") + + +class CHDBResultError(Exception): + pass + + +# TODO: This is from https://github.com/chdb-io/chdb/blob/e326128df44248b187b4f421bf6a5c796791b2dc/chdb/dbapi/connections.py#L175C1-L217C70 +# We might want to use the dbApi instead +class CHDBResult: + def __init__(self, result): + """ + :type connection: Connection + """ + self.result = result + self.affected_rows = 0 + self.insert_id = None + self.warning_count = 0 + self.message = None + self.field_count = 0 + self.description = None + self.rows = None + self.has_next = None + self.result_set = None + self.column_names = None + + def read(self): + # Handle empty responses (for instance from CREATE TABLE) + if self.result is None: + return + + if self.result.has_error(): + raise CHDBResultError(str(self.result.error_message.strip())) + + try: + data = json.loads(self.result.data()) + except Exception as error: + raise CHDBResultError("Unexpected error when loading query result in JSON") from error + + try: + self.field_count = len(data["meta"]) + description = [] + column_names = [] + for meta in data["meta"]: + fields = [meta["name"], meta["type"]] + column_names.append(meta["name"]) + description.append(tuple(fields)) + self.description = tuple(description) + self.column_names = column_names + rows = [] + for line in data["data"]: + row = [] + for i in range(self.field_count): + column_data = converters.convert_column_data( + self.description[i][1], line[self.description[i][0]] + ) + row.append(column_data) + rows.append(tuple(row)) + self.rows = tuple(rows) + self.result_set = tuple(rows) + except Exception as error: + raise CHDBResultError("Read return data err") from error diff --git a/dbt/adapters/clickhouse/credentials.py b/dbt/adapters/clickhouse/credentials.py index 47d04b8a..16865878 100644 --- a/dbt/adapters/clickhouse/credentials.py +++ b/dbt/adapters/clickhouse/credentials.py @@ -36,6 +36,8 @@ class ClickHouseCredentials(Credentials): local_db_prefix: str = '' allow_automatic_deduplication: bool = False tcp_keepalive: Union[bool, tuple[int, int, int], list[int]] = False + chdb_state_dir: str = "" + chdb_dump_dir: str = "" @property def type(self): diff --git a/dbt/adapters/clickhouse/dbclient.py b/dbt/adapters/clickhouse/dbclient.py index ba252d03..a45c14b0 100644 --- a/dbt/adapters/clickhouse/dbclient.py +++ b/dbt/adapters/clickhouse/dbclient.py @@ -42,8 +42,10 @@ def get_db_client(credentials: ClickHouseCredentials): elif driver == 'native': if not port: port = 9440 if credentials.secure else 9000 + elif driver == "chdb": + logger.debug(f"using chdb driver with {credentials}") else: - raise FailedToConnectError(f'Unrecognized ClickHouse driver {driver}') + raise FailedToConnectError(f"Unrecognized ClickHouse driver {driver}") credentials.driver = driver credentials.port = port @@ -56,8 +58,20 @@ def get_db_client(credentials: ClickHouseCredentials): return ChNativeClient(credentials) except ImportError as ex: raise FailedToConnectError( - 'Native adapter required but package clickhouse-driver is not installed' + "Native adapter required but package clickhouse-driver is not installed" ) from ex + elif driver == "chdb": + try: + import chdb + + from dbt.adapters.clickhouse.chdbclient import ChDBClient + + return ChDBClient(credentials) + except ImportError as ex: + raise FailedToConnectError( + "chDB adapter required but package chdb is not installed" + ) from ex + try: import clickhouse_connect # noqa diff --git a/dbt/adapters/clickhouse/logger.py b/dbt/adapters/clickhouse/logger.py index 5ea74245..c2781e69 100644 --- a/dbt/adapters/clickhouse/logger.py +++ b/dbt/adapters/clickhouse/logger.py @@ -1,3 +1,3 @@ from dbt.adapters.events.logging import AdapterLogger -logger = AdapterLogger('dbt_clickhouse') +logger = AdapterLogger("dbt_clickhouse") diff --git a/dbt/adapters/clickhouse/nativeclient.py b/dbt/adapters/clickhouse/nativeclient.py index d0d7fdd5..bf1c492f 100644 --- a/dbt/adapters/clickhouse/nativeclient.py +++ b/dbt/adapters/clickhouse/nativeclient.py @@ -12,9 +12,9 @@ from dbt.adapters.clickhouse.logger import logger try: - driver_version = pkg_resources.get_distribution('clickhouse-driver').version + driver_version = pkg_resources.get_distribution("clickhouse-driver").version except pkg_resources.ResolutionError: - driver_version = 'unknown' + driver_version = "unknown" class ChNativeClient(ChClientWrapper): @@ -48,7 +48,7 @@ def get_ch_setting(self, setting_name): f"SELECT value, readonly FROM system.settings WHERE name = '{setting_name}'" ) except clickhouse_driver.errors.Error as ex: - logger.warn('Unexpected error retrieving ClickHouse server setting', ex) + logger.warning("Unexpected error retrieving ClickHouse server setting", ex) return None return (result[0][0], result[0][1]) if result else (None, 0) @@ -88,7 +88,7 @@ def _set_client_database(self): def _server_version(self): server_info = self._client.connection.server_info return ( - f'{server_info.version_major}.{server_info.version_minor}.{server_info.version_patch}' + f"{server_info.version_major}.{server_info.version_minor}.{server_info.version_patch}" ) diff --git a/dbt/adapters/clickhouse/relation.py b/dbt/adapters/clickhouse/relation.py index 8ad3b39a..b85967a2 100644 --- a/dbt/adapters/clickhouse/relation.py +++ b/dbt/adapters/clickhouse/relation.py @@ -7,6 +7,7 @@ from dbt_common.exceptions import DbtRuntimeError from dbt_common.utils import deep_merge +from dbt.adapters.clickhouse.logger import logger from dbt.adapters.clickhouse.query import quote_identifier NODE_TYPE_SOURCE = 'source' @@ -127,6 +128,11 @@ def create_from( relation_config.config.get('engine') if relation_config.config.get('engine') else '' ) can_on_cluster = cls.get_on_cluster(cluster, materialized, engine) + if quoting.credentials.driver == "chdb": + logger.debug("Driver is chDB, forcing engine to be MergeTree") + engine = "MergeTree" + relation_config.config.engine = engine + can_on_cluster = False return cls.create( database='', diff --git a/examples/taxis/README.md b/examples/taxis/README.md index c158f7c6..5fde24db 100644 --- a/examples/taxis/README.md +++ b/examples/taxis/README.md @@ -32,7 +32,7 @@ CREATE TABLE taxis.trips ( ) ENGINE = MergeTree ORDER BY trip_id; - + SET input_format_skip_unknown_fields = 1; INSERT INTO taxis.trips @@ -62,26 +62,25 @@ FROM s3( ## Create a dbt profile entry -Use the following profile to create the associated dbt profile in the dbt_profiles.yml in ~/.dbt +Use the following profile to create the associated dbt profile in the dbt_profiles.yml in ~/.dbt + ```yml taxis: outputs: - dev: type: clickhouse - threads: 4 + threads: 4 host: localhost - port: 8123 + port: 8123 user: dbt_test password: dbt_password use_lw_deletes: true - schema: taxis_dbt + schema: taxis_dbt target: dev - ``` ## Run the model -`dbt run` in this directory should execute the model. Each run will create a somewhat larger dataset (by adding +`dbt run` in this directory should execute the model. Each run will create a somewhat larger dataset (by adding additional random trip_ids). diff --git a/examples/taxis/dump/taxi.sql b/examples/taxis/dump/taxi.sql new file mode 100644 index 00000000..9aa5aab6 --- /dev/null +++ b/examples/taxis/dump/taxi.sql @@ -0,0 +1,23 @@ +CREATE DATABASE taxis; + +CREATE TABLE taxis.trips ( + trip_id UInt32, + pickup_datetime DateTime, + dropoff_datetime DateTime, + pickup_longitude Nullable(Float64), + pickup_latitude Nullable(Float64), + dropoff_longitude Nullable(Float64), + dropoff_latitude Nullable(Float64), + passenger_count UInt8, + trip_distance Float32, + fare_amount Float32, + extra Float32, + tip_amount Float32, + tolls_amount Float32, + total_amount Float32, + payment_type LowCardinality(String), + pickup_ntaname LowCardinality(String), + dropoff_ntaname LowCardinality(String) +) +ENGINE = MergeTree +ORDER BY trip_id; diff --git a/examples/taxis/models/schema.yml b/examples/taxis/models/schema.yml index 5f493087..59dc5c44 100644 --- a/examples/taxis/models/schema.yml +++ b/examples/taxis/models/schema.yml @@ -4,14 +4,14 @@ models: - name: trips_inc description: NY Taxi dataset from S3 config: - materialized: incremental + materialized: table order_by: rand_trip_id unique_key: rand_trip_id - - name: trips_rand - description: Random indexes to apply to incremental materialization - config: - materialized: incremental - order_by: date_time - uniq_id: date_time - incremental_strategy: append + # - name: trips_rand + # description: Random indexes to apply to incremental materialization + # config: + # materialized: incremental + # order_by: date_time + # uniq_id: date_time + # incremental_strategy: append diff --git a/examples/taxis/models/trips_inc.sql b/examples/taxis/models/trips_inc.sql index 9b03c086..a1618b25 100644 --- a/examples/taxis/models/trips_inc.sql +++ b/examples/taxis/models/trips_inc.sql @@ -1,8 +1,5 @@ -with (select start, end from {{ ref('trips_rand') }} ORDER BY date_time DESC LIMIT 1) as range, -(select count() from {{ ref('trips_rand') }}) as run_num - -select rand() as rand_trip_id, * EXCEPT trip_id, run_num, trip_id as orig_id from {{ source('taxis_source', 'trips') }} - LEFT JOIN numbers(24) as sysnum ON 1 = 1 - where bitAnd(orig_id, 1023) between range.1 and range.2 - - +select + rand() as rand_trip_id, + *, + trip_id as orig_id +from {{ source('taxis_source', 'trips') }} diff --git a/examples/taxis/models/trips_rand.sql b/examples/taxis/models/trips_rand.sql deleted file mode 100644 index 0aa900db..00000000 --- a/examples/taxis/models/trips_rand.sql +++ /dev/null @@ -1 +0,0 @@ -SELECT now64() as date_time, rand() % 512 as start, rand() % (1023 - start) + start as end \ No newline at end of file diff --git a/examples/taxis/profiles.yml b/examples/taxis/profiles.yml new file mode 100644 index 00000000..e492c3eb --- /dev/null +++ b/examples/taxis/profiles.yml @@ -0,0 +1,21 @@ +taxis: + outputs: + dev: + type: clickhouse + threads: 4 + host: localhost + port: 8123 + user: dbt_test + password: dbt_password + use_lw_deletes: true + schema: taxis_dbt + + chdb-dev: + type: clickhouse + driver: chdb + chdb_state_dir: "chdb_state" + chdb_dump_dir: "examples/taxis/dump" + threads: 1 + schema: taxis_dbt + + target: dev diff --git a/pyproject.toml b/pyproject.toml index 842526f6..7f630c9a 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -2,7 +2,7 @@ line-length = 100 skip-string-normalization = true target-version = ['py310', 'py311', 'py312'] -exclude = '(\.eggs|\.git|\.mypy_cache|\.venv|venv|env|_build|build|build|dist|)' +exclude = '(\.eggs|\.git|\.mypy_cache|\.venv|venv|env|_build|build|build|dist)' [tool.isort] line_length = 100 diff --git a/setup.py b/setup.py index 43b2151e..a1d2be65 100644 --- a/setup.py +++ b/setup.py @@ -40,37 +40,38 @@ def _dbt_clickhouse_version(): version=package_version, description=description, long_description=long_description, - long_description_content_type='text/markdown', - author='ClickHouse Inc.', - author_email='guy@clickhouse.com', - url='https://github.com/ClickHouse/dbt-clickhouse', - license='MIT', - packages=find_namespace_packages(include=['dbt', 'dbt.*']), + long_description_content_type="text/markdown", + author="ClickHouse Inc.", + author_email="guy@clickhouse.com", + url="https://github.com/ClickHouse/dbt-clickhouse", + license="MIT", + packages=find_namespace_packages(include=["dbt", "dbt.*"]), package_data={ - 'dbt': [ - 'include/clickhouse/dbt_project.yml', - 'include/clickhouse/macros/*.sql', - 'include/clickhouse/macros/**/*.sql', + "dbt": [ + "include/clickhouse/dbt_project.yml", + "include/clickhouse/macros/*.sql", + "include/clickhouse/macros/**/*.sql", ] }, install_requires=[ - f'dbt-core~={dbt_version}', - 'clickhouse-connect>=0.6.22', - 'clickhouse-driver>=0.2.6', - 'setuptools>=0.69', + f"dbt-core~={dbt_version}", + "clickhouse-connect>=0.6.22", + "clickhouse-driver>=0.2.6", + "chdb>=2.0.4", + "setuptools>=0.69", ], python_requires=">=3.8", - platforms='any', + platforms="any", classifiers=[ - 'Development Status :: 5 - Production/Stable', - 'License :: OSI Approved :: Apache Software License', - 'Operating System :: Microsoft :: Windows', - 'Operating System :: MacOS :: MacOS X', - 'Operating System :: POSIX :: Linux', - 'Programming Language :: Python :: 3.8', - 'Programming Language :: Python :: 3.9', - 'Programming Language :: Python :: 3.10', - 'Programming Language :: Python :: 3.11', - 'Programming Language :: Python :: 3.12', + "Development Status :: 5 - Production/Stable", + "License :: OSI Approved :: Apache Software License", + "Operating System :: Microsoft :: Windows", + "Operating System :: MacOS :: MacOS X", + "Operating System :: POSIX :: Linux", + "Programming Language :: Python :: 3.8", + "Programming Language :: Python :: 3.9", + "Programming Language :: Python :: 3.10", + "Programming Language :: Python :: 3.11", + "Programming Language :: Python :: 3.12", ], ) diff --git a/tests/integration/adapter/chdb/test_chdb_materialization.py b/tests/integration/adapter/chdb/test_chdb_materialization.py new file mode 100644 index 00000000..ca8f7d62 --- /dev/null +++ b/tests/integration/adapter/chdb/test_chdb_materialization.py @@ -0,0 +1,65 @@ +from pathlib import Path + +import pytest +from dbt.tests.util import run_dbt + +amount_from_taxis_sql = """ +{{ + config( + materialized = "table" + ) +}} + +select sum(total_amount) as total_cost from taxis.trips + +""" + +amount_from_taxis_schema = """ +version: 2 + +models: + - name: amount_from_taxis + description: "sum of taxis trips amounts" + columns: + - name: total_cost + description: "sum of taxis trips amounts" + +""" + + +class TestChdbMaterialization: + @pytest.fixture(scope="class") + def profiles_config_update(self): + return { + "test": { + "outputs": { + "chdb-test": { + "type": "clickhouse", + "driver": "chdb", + "chdb_state_dir": "chdb_state", + # This is a temporary measure to make sure it works for everyone running the test. + # Linked to the improvement needed on dbt/adapters/clickhouse/chdbclient.py L99 + "chdb_dump_dir": str( + Path(__file__).resolve().parent.parent.parent.parent.parent + / "examples" + / "taxis" + / "dump" + ), + "threads": 1, + "schema": "taxis_dbt", + } + }, + "target": "chdb-test", + } + } + + @pytest.fixture(scope="class") + def models(self): + return { + "schema.yml": amount_from_taxis_schema, + "amount_from_taxis.sql": amount_from_taxis_sql, + } + + def test_chdb_base(self, project): + res = run_dbt() + assert len(res) > 0