Feat: Introduce ExtendedGcsFileSystem for Zonal Bucket gRPC Read Path #707
base: main
Changes from 58 commits
isort configuration:

```diff
@@ -1,2 +1,3 @@
 [settings]
 profile = black
+known_third_party = aiohttp,click,decorator,fsspec,fuse,google,google_auth_oauthlib,pytest,requests,setuptools
```
New file (@@ -0,0 +1,228 @@):

```python
import logging
from enum import Enum

from fsspec import asyn
from google.api_core import exceptions as api_exceptions
from google.api_core import gapic_v1
from google.api_core.client_info import ClientInfo
from google.cloud import storage_control_v2
from google.cloud.storage._experimental.asyncio.async_grpc_client import AsyncGrpcClient

from . import __version__ as version
from . import zb_hns_utils
from .core import GCSFile, GCSFileSystem
from .zonal_file import ZonalFile

logger = logging.getLogger("gcsfs")

USER_AGENT = "python-gcsfs"


class BucketType(Enum):
    ZONAL_HIERARCHICAL = "ZONAL_HIERARCHICAL"
    HIERARCHICAL = "HIERARCHICAL"
    NON_HIERARCHICAL = "NON_HIERARCHICAL"
    UNKNOWN = "UNKNOWN"


gcs_file_types = {
    BucketType.ZONAL_HIERARCHICAL: ZonalFile,
    BucketType.NON_HIERARCHICAL: GCSFile,
    BucketType.HIERARCHICAL: GCSFile,
    BucketType.UNKNOWN: GCSFile,
}


class GCSFileSystemAdapter(GCSFileSystem):
    """
    Used for all bucket types when ``experimental_zb_hns_support`` is set
    to true. GCSFileSystemAdapter is a subclass of GCSFileSystem that adds
    specialized logic to support Zonal and Hierarchical buckets.
    """

    def __init__(self, *args, **kwargs):
        kwargs.pop("experimental_zb_hns_support", None)
        super().__init__(*args, **kwargs)
        self.grpc_client = None
        self._storage_control_client = None
        # Initialize the gRPC and storage-control clients used for
        # Hierarchical and Zonal bucket operations.
        self.grpc_client = asyn.sync(self.loop, self._create_grpc_client)
        self._storage_control_client = asyn.sync(
            self.loop, self._create_control_plane_client
        )
        self._storage_layout_cache = {}

    async def _create_grpc_client(self):
        if self.grpc_client is None:
            return AsyncGrpcClient(
                client_info=ClientInfo(user_agent=f"{USER_AGENT}/{version}"),
            ).grpc_client
        else:
            return self.grpc_client

    async def _create_control_plane_client(self):
        # Initialize the storage control-plane client for bucket
        # metadata operations.
        client_info = gapic_v1.client_info.ClientInfo(
            user_agent=f"{USER_AGENT}/{version}"
        )
        return storage_control_v2.StorageControlAsyncClient(
            credentials=self.credentials.credentials, client_info=client_info
        )

    async def _get_bucket_type(self, bucket):
        if bucket in self._storage_layout_cache:
            return self._storage_layout_cache[bucket]
        try:
            # storageLayout resource name for this bucket
            bucket_name_value = f"projects/_/buckets/{bucket}/storageLayout"
            # Request the storage layout to determine the bucket type
            response = await self._storage_control_client.get_storage_layout(
                name=bucket_name_value
            )

            if response.location_type == "zone":
                return BucketType.ZONAL_HIERARCHICAL
            else:
                # This should be updated to include HNS in the future
                return BucketType.NON_HIERARCHICAL
        except api_exceptions.NotFound:
            logger.error(f"Bucket {bucket} not found or you lack permissions.")
            return BucketType.UNKNOWN
        except Exception as e:
            logger.error(
                f"Could not determine bucket type for bucket name {bucket}: {e}"
            )
            # Default to UNKNOWN
            return BucketType.UNKNOWN

    _sync_get_bucket_type = asyn.sync_wrapper(_get_bucket_type)
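    # Note (illustrative, added for this write-up, not part of the diff):
    # get_storage_layout reports location_type == "zone" only for zonal
    # buckets; regional and multi-regional buckets report other values and
    # currently map to NON_HIERARCHICAL via the branch above.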

    def _open(
        self,
        path,
        mode="rb",
        block_size=None,
        cache_options=None,
        acl=None,
        consistency=None,
        metadata=None,
        autocommit=True,
        fixed_key_metadata=None,
        generation=None,
        **kwargs,
    ):
        """
        Open a file.
        """
        bucket, _, _ = self.split_path(path)
        bucket_type = self._sync_get_bucket_type(bucket)
        self._storage_layout_cache[bucket] = bucket_type
        return gcs_file_types[bucket_type](
            self,
            path,
            mode,
            block_size,
            cache_options=cache_options,
            consistency=consistency,
            metadata=metadata,
            acl=acl,
            autocommit=autocommit,
            fixed_key_metadata=fixed_key_metadata,
            generation=generation,
            **kwargs,
        )

    # Replacement for _process_limits that returns the new (offset, length)
    # parameters needed by the multi-range downloader (MRD).
    async def _process_limits_to_offset_and_length(self, path, start, end):
        """
        Calculate the read offset and length from start and end parameters.

        Args:
            path (str): The path to the file.
            start (int | None): The starting byte position.
            end (int | None): The ending byte position.

        Returns:
            tuple: A tuple containing (offset, length).

        Raises:
            ValueError: If the calculated range is invalid.
        """
        size = None

        if start is None:
            offset = 0
        elif start < 0:
            size = size or (await self._info(path))["size"]
            offset = size + start
        else:
            offset = start

        if end is None:
            size = size or (await self._info(path))["size"]
            effective_end = size
        elif end < 0:
            size = size or (await self._info(path))["size"]
            effective_end = size + end
        else:
            effective_end = end

        size = size or (await self._info(path))["size"]
        if offset < 0:
            raise ValueError(f"Calculated start offset ({offset}) cannot be negative.")
        if effective_end < offset:
            raise ValueError(
                f"Calculated end position ({effective_end}) cannot be before start offset ({offset})."
            )
        elif effective_end == offset:
            length = 0  # Handle zero-length slice
        elif effective_end > size:
            length = max(0, size - offset)  # Clamp and ensure non-negative
        else:
            length = effective_end - offset  # Normal case

        return offset, length

    sync_process_limits_to_offset_and_length = asyn.sync_wrapper(
        _process_limits_to_offset_and_length
    )
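    # Worked examples (illustrative, added for this write-up): for a
    # 100-byte object, the computation above yields:
    #   start=10,   end=50   -> offset=10, length=40
    #   start=-10,  end=None -> offset=90, length=10
    #   start=None, end=-20  -> offset=0,  length=80
    #   start=50,   end=5000 -> offset=50, length=50 (clamped to object size)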

    async def _is_zonal_bucket(self, bucket):
        bucket_type = await self._get_bucket_type(bucket)
        self._storage_layout_cache[bucket] = bucket_type
        return bucket_type == BucketType.ZONAL_HIERARCHICAL

    async def _cat_file(self, path, start=None, end=None, **kwargs):
        """
        Fetch a file's contents as bytes.
        """
        mrd = kwargs.pop("mrd", None)
        mrd_created = False

        # A new MRD is required when the read is done directly by the
        # GCSFileSystem class without creating a GCSFile object first.
        if mrd is None:
            bucket, object_name, generation = self.split_path(path)
            # Fall back to the default implementation if not a zonal bucket
            if not await self._is_zonal_bucket(bucket):
                return await super()._cat_file(path, start=start, end=end, **kwargs)

            mrd = await zb_hns_utils.create_mrd(
                self.grpc_client, bucket, object_name, generation
            )
            mrd_created = True

        offset, length = await self._process_limits_to_offset_and_length(
            path, start, end
        )
        try:
            return await zb_hns_utils.download_range(
                offset=offset, length=length, mrd=mrd
            )
        finally:
            # Explicitly clean up the MRD if we created it here
            if mrd_created:
                await mrd.close()
```
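For context, a minimal usage sketch (hypothetical bucket and object names; assumes, per this PR, that passing `experimental_zb_hns_support=True` selects the adapter class):

```python
import fsspec

# Opt in to the experimental Zonal/HNS support introduced by this PR.
fs = fsspec.filesystem("gcs", experimental_zb_hns_support=True)

# _open() consults the cached storage layout: zonal buckets get a ZonalFile,
# everything else falls back to the regular GCSFile/HTTP path.
with fs.open("my-zonal-bucket/data.bin", "rb") as f:  # hypothetical path
    header = f.read(16)

# Ranged read without opening a file first; start/end are converted to
# (offset, length) by _process_limits_to_offset_and_length and, for zonal
# buckets, served over gRPC via a multi-range downloader (MRD).
chunk = fs.cat_file("my-zonal-bucket/data.bin", start=10, end=50)
```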
Test fixture changes:

```diff
@@ -2,6 +2,8 @@
 import shlex
 import subprocess
 import time
+from contextlib import nullcontext
+from unittest.mock import patch
 
 import fsspec
 import pytest
```
```diff
@@ -91,10 +93,9 @@ def docker_gcs():
 def gcs_factory(docker_gcs):
     params["endpoint_url"] = docker_gcs
 
-    def factory(default_location=None):
+    def factory(**kwargs):
         GCSFileSystem.clear_instance_cache()
-        params["default_location"] = default_location
-        return fsspec.filesystem("gcs", **params)
+        return fsspec.filesystem("gcs", **params, **kwargs)
 
     return factory
```
```diff
@@ -125,6 +126,48 @@ def gcs(gcs_factory, populate=True):
         pass
 
 
+@pytest.fixture
+def gcs_adapter(gcs_factory, populate=True):
+    # Check if we are running against a real GCS endpoint
+    is_real_gcs = (
+        os.environ.get("STORAGE_EMULATOR_HOST") == "https://storage.googleapis.com"
+    )
+
+    patch_manager = (
+        patch("google.auth.default", return_value=(None, "fake-project"))
+        if not is_real_gcs
+        else nullcontext()
+    )
+
+    with patch_manager:
+        gcs_adapter = gcs_factory(experimental_zb_hns_support=True)
+    try:
+        # Only create/delete/populate the bucket if we are NOT using the real GCS endpoint
+        if not is_real_gcs:
+            try:
+                gcs_adapter.rm(TEST_BUCKET, recursive=True)
+            except FileNotFoundError:
+                pass
+            try:
+                gcs_adapter.mkdir(TEST_BUCKET)
+            except Exception:
+                pass
+            if populate:
+                gcs_adapter.pipe(
+                    {TEST_BUCKET + "/" + k: v for k, v in allfiles.items()}
+                )
+            gcs_adapter.invalidate_cache()
+        yield gcs_adapter
+    finally:
+        try:
+            # Only remove the bucket/contents if we are NOT using the real GCS
+            if not is_real_gcs:
+                gcs_adapter.rm(gcs_adapter.find(TEST_BUCKET), recursive=True)
+                gcs_adapter.rm(TEST_BUCKET)
+        except Exception:
+            pass
```
Suggested change (review comment):

```diff
-                gcs_adapter.rm(gcs_adapter.find(TEST_BUCKET), recursive=True)
-                gcs_adapter.rm(TEST_BUCKET)
+                gcs_adapter.rm(TEST_BUCKET, recursive=True)
```
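A hypothetical test exercising the new fixture could look like the sketch below (`TEST_BUCKET` and the populated file names come from the existing suite; the specific file and assertions are illustrative):

```python
def test_adapter_ranged_cat(gcs_adapter):
    # Ranged reads through the adapter should agree with Python slicing,
    # regardless of whether the zonal gRPC path or the HTTP fallback is used.
    path = TEST_BUCKET + "/2014-01-01.csv"  # assumed to exist via allfiles
    data = gcs_adapter.cat_file(path)
    assert gcs_adapter.cat_file(path, start=1, end=10) == data[1:10]
    assert gcs_adapter.cat_file(path, start=-5) == data[-5:]
```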