From 99ec58a444d33970405e2b37b87d6e8bf989a7cc Mon Sep 17 00:00:00 2001 From: Leo Schick Date: Mon, 20 Feb 2023 16:31:46 +0100 Subject: [PATCH 1/2] add package deltalake integration --- mara_storage/integrations/deltalake.py | 110 +++++++++++++++++++++++++ 1 file changed, 110 insertions(+) create mode 100644 mara_storage/integrations/deltalake.py diff --git a/mara_storage/integrations/deltalake.py b/mara_storage/integrations/deltalake.py new file mode 100644 index 0000000..8ac4b80 --- /dev/null +++ b/mara_storage/integrations/deltalake.py @@ -0,0 +1,110 @@ +""" +Helper functions interacting with module deltalake. +""" +from functools import singledispatch +from pathlib import Path +from typing import Self, Union, Optional, Dict + +import deltalake + +from mara_storage import storages + + +@singledispatch +def deltalake_build_uri(storage, path: str): + """ + Creates a URI path for deltalake + """ + raise NotImplementedError(f'Please implement deltalake_build_uri for type "{storage.__class__.__name__}"') + + +@deltalake_build_uri.register(str) +def __(storage: str, path: str): + return deltalake_build_uri(storages.storage(storage), path=path) + + +@deltalake_build_uri.register(storages.AzureStorage) +def __(storage: storages.AzureStorage, path: str): + return f'abfs://{storage.container_name}/{path}' + + +@deltalake_build_uri.register(storages.GoogleCloudStorage) +def __(storage: storages.AzureStorage, path: str): + return f'gs://{storage.bucket_name}/{path}' + + + +@singledispatch +def deltalake_storage_options(storage): + """ + Returns the storage options required for the deltalake module. + + See as well: + https://docs.rs/object_store/latest/object_store/azure/enum.AzureConfigKey.html#variants + + Args: + storage: The storage as alias or class. + """ + raise NotImplementedError(f'Please implement deltalake_storage_options for type "{storage.__class__.__name__}"') + + +@deltalake_storage_options.register(str) +def __(storage: str): + return deltalake_storage_options(storages.storage(storage)) + + +@deltalake_storage_options.register(storages.AzureStorage) +def __(storage: storages.AzureStorage): + return { + # See https://docs.rs/object_store/latest/object_store/azure/enum.AzureConfigKey.html#variants + 'account_name': storage.account_name, + 'account_key': storage.account_key, + 'sas_key': storage.sas + } + + +@deltalake_storage_options.register(storages.GoogleCloudStorage) +def __(storage: storages.GoogleCloudStorage): + return { + # See https://docs.rs/object_store/latest/object_store/gcp/enum.GoogleConfigKey.html#variants + 'bucket_name': storage.bucket_name, + 'service_account': storage.service_account_file + } + + +class DeltaTable(deltalake.DeltaTable): + def __new__(cls: type[Self], path: str = None, version: Optional[int] = None, storage_options: Optional[Dict[str, str]] = None, + without_files: bool = False, table_uri: Union[str, Path] = None, storage: Union[str, storages.Storage] = None, **kargs) -> Self: + """ + Create the Delta Table from a path with an optional version. + Multiple StorageBackends are currently supported: AWS S3, Azure Data Lake Storage Gen2, Google Cloud Storage (GCS) and local URI. + Depending on the storage backend used, you could provide options values using the ``storage_options`` parameter. + :param path: the path of the DeltaTable. When you want to use URI, use parameter `table_uri`. + :param version: version of the DeltaTable + :param storage_options: a dictionary of the options to use for the storage backend + :param without_files: If True, will load table without tracking files. + Some append-only applications might have no need of tracking any files. So, the + DeltaTable will be loaded with a significant memory reduction. + :param table_uri: the path of the DeltaTable. If set, parameter `path` is ignored. + :param storage: the mara storage as alias or class + """ + + if table_uri is None: + if storage: + table_uri = deltalake_build_uri(storage, path) + elif path: + table_uri = path + else: + raise ValueError('You either have to ') + + if storage_options is None: + storage_options = {} + + if storage: + storage_options = deltalake_storage_options(storage).update(storage_options) + + return DeltaTable( + table_uri=table_uri, + version=version, + storage_options=storage_options, + without_files=without_files) From 62a1d5ebef2bf6ca9ac34ece02c1e358c5e24430 Mon Sep 17 00:00:00 2001 From: Leo Schick Date: Wed, 22 Feb 2023 18:12:01 +0100 Subject: [PATCH 2/2] add mara_storage.integrations module --- mara_storage/integrations/__init__.py | 0 1 file changed, 0 insertions(+), 0 deletions(-) create mode 100644 mara_storage/integrations/__init__.py diff --git a/mara_storage/integrations/__init__.py b/mara_storage/integrations/__init__.py new file mode 100644 index 0000000..e69de29