-
Notifications
You must be signed in to change notification settings - Fork 162
Feat: Introduce ExtendedGcsFileSystem for Zonal Bucket gRPC Read Path #707
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Open
ankitaluthra1
wants to merge
65
commits into
fsspec:main
Choose a base branch
from
ankitaluthra1:main
base: main
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
Open
Changes from 62 commits
Commits
Show all changes
65 commits
Select commit
Hold shift + click to select a range
ecbec0e
adds logic for handling requests from new adapter is bucket type os s…
ankitaluthra1 d506a60
adds logic for hnadling requests from new adapter is bucket type is z…
ankitaluthra1 9a70933
refactor bucket type enum to include value for HNS bucket type | Fixe…
ankitaluthra1 eb209f5
adds feature toggle behind which experimental feature support would b…
ankitaluthra1 0b606b1
updates tests to include experimental feature toggle
ankitaluthra1 409c86e
minor fix
ankitaluthra1 8a4ca85
moves mrd creation inside core.py
ankitaluthra1 c5a9eae
moves mrd creation to separate GCSFile
ankitaluthra1 e95495f
minor comment
ankitaluthra1 95440d5
Extend gcsfs to create new filesystem
suni72 dfcbb7f
Extend gcsfs and gcsfile to override methods. download happens synchr…
suni72 f3d1031
Merge pull request from ankitaluthra1/lankita-poc-zonal
ankitaluthra1 20b36fc
fixes new test in test_core.py
ankitaluthra1 c9f569a
refactors/renames GCS Filesystem Adapter
ankitaluthra1 e5fc935
Override _cat_file to handle other read methods of gcsfs
suni72 50d30b3
creates grpc client inside GCSHNSFilesystem
ankitaluthra1 cbf00b3
refactors to reuse grpc client in cat_ranges
ankitaluthra1 a202579
Move classmethods in ZonalFile to a util file
suni72 5064a97
Implement logic to process limits as offset and length
suni72 ac276f6
Add fallback logic for non zonal buckets
suni72 dbecb12
Add unit tests for zb_hns_utils
suni72 37a495b
Add test for GCSFSAdapter with read_block
suni72 cba7d27
Update _get_storage_layout to use a single return statement
suni72 4ff9fe4
Updated gcs_adapter.open to pass on correct default values to GCSFile
suni72 389a4b0
Move logic for handing 0 length in MRD to zb_hns_utils
suni72 133b4fa
Add comments for clarity
suni72 31b2a2f
Merge pull request #4 from ankitaluthra1/zb-features
suni72 e36b7ed
Updated zonal file to only create mrd for read mode.
suni72 25cd0ef
Updated gcs_adapter fixture to not setup bucket when real gcs endpoin…
suni72 8425464
Updated gcs_adapter.open to pass on correct default values to GCSFile
suni72 75fecce
Move logic for handing 0 length in MRD to zb_hns_utils
suni72 2b6af9c
Updated zonal file to only create mrd for read mode.
suni72 b648df4
Updated gcs_adapter fixture to not setup bucket when real gcs endpoin…
suni72 f71f4e8
Merge branch 'zb-ft-2' into zb-features
suni72 1c99137
Update test_read_block_zb to use subtests to avoid frequent setup run
suni72 1099375
fix: Optimizes info() and exists() methods
Mahalaxmibejugam d834b07
fix: Optimizes info() and exists() methods
Mahalaxmibejugam bfd513f
fixes lint errors
ankitaluthra1 efabe35
fixes comments
ankitaluthra1 2ed3cc6
fixes lint errors
ankitaluthra1 957d7b5
adds grpc and google-iam dependency
ankitaluthra1 cd222cb
Fix missing argument in open
suni72 17618d5
Fix: Raise NotImplementedError for modes other than read in Zonal bucket
suni72 064c286
Add ClientInfo in AsyncGrpcClient
suni72 8b2a8d9
refactors storage layout to use sdk control client
ankitaluthra1 b1f0117
fixes lint errors
ankitaluthra1 b254a14
Merge branch 'internal-main' into zb-features
ankitaluthra1 7983528
Merge pull request #5 from ankitaluthra1/zb-features
ankitaluthra1 cdf574a
Merge pull request from ankitaluthra1/internal-main
ankitaluthra1 b411bef
Merge branch 'fsspec:main' into main
ankitaluthra1 c47b53f
fixes lint errors
ankitaluthra1 e37d0fb
fixes lint errors
ankitaluthra1 11af000
fixes conda install error
ankitaluthra1 c4cf777
mocks fake test credentials for grpc client
ankitaluthra1 5714012
fixes conflicting lint rules black and isort
ankitaluthra1 fccd43a
adds missing pytest package in conda install
ankitaluthra1 11dd722
refactor get bucket type
ankitaluthra1 a2b5077
Implement Zonal Read Stream Cleanup (#7)
suni72 0c3df8e
adds GCSFS_EXPERIMENTAL_ZB_HNS_SUPPORT as env variable instead of kwargs
ankitaluthra1 0e9c2ab
removes timeout from pytest fixture coming as mark has no effect on t…
ankitaluthra1 c3ad9b2
Refactor: Rename GcsFileSystemAdapter to ExtendedGcsFileSystem & Fix …
suni72 a8945c2
Replaces __new__ with conditional import in init
ankitaluthra1 774b53d
Merge branch 'main' into main
ankitaluthra1 90d0cf4
fixes lint errors
ankitaluthra1 8672c28
simplified logic in cleanup_gcs for unit tests
suni72 File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,2 +1,3 @@ | ||
| [settings] | ||
| profile = black | ||
| known_third_party = aiohttp,click,decorator,fsspec,fuse,google,google_auth_oauthlib,pytest,requests,setuptools |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,228 @@ | ||
| import logging | ||
| from enum import Enum | ||
|
|
||
| from fsspec import asyn | ||
| from google.api_core import exceptions as api_exceptions | ||
| from google.api_core import gapic_v1 | ||
| from google.api_core.client_info import ClientInfo | ||
| from google.cloud import storage_control_v2 | ||
| from google.cloud.storage._experimental.asyncio.async_grpc_client import AsyncGrpcClient | ||
|
|
||
| from gcsfs import __version__ as version | ||
| from gcsfs import zb_hns_utils | ||
| from gcsfs.core import GCSFile, GCSFileSystem | ||
| from gcsfs.zonal_file import ZonalFile | ||
|
|
||
| logger = logging.getLogger("gcsfs") | ||
|
|
||
| USER_AGENT = "python-gcsfs" | ||
|
|
||
|
|
||
| class BucketType(Enum): | ||
| ZONAL_HIERARCHICAL = "ZONAL_HIERARCHICAL" | ||
| HIERARCHICAL = "HIERARCHICAL" | ||
| NON_HIERARCHICAL = "NON_HIERARCHICAL" | ||
| UNKNOWN = "UNKNOWN" | ||
|
|
||
|
|
||
| gcs_file_types = { | ||
| BucketType.ZONAL_HIERARCHICAL: ZonalFile, | ||
| BucketType.NON_HIERARCHICAL: GCSFile, | ||
| BucketType.HIERARCHICAL: GCSFile, | ||
| BucketType.UNKNOWN: GCSFile, | ||
| } | ||
|
|
||
|
|
||
| class ExtendedGcsFileSystem(GCSFileSystem): | ||
| """ | ||
| This class will be used when GCSFS_EXPERIMENTAL_ZB_HNS_SUPPORT env variable is set to true. | ||
| ExtendedGcsFileSystem is a subclass of GCSFileSystem that adds new logic for bucket types | ||
| including zonal and hierarchical. For buckets without special properties, it forwards requests | ||
| to the parent class GCSFileSystem for default processing. | ||
| """ | ||
|
|
||
| def __init__(self, *args, **kwargs): | ||
| super().__init__(*args, **kwargs) | ||
| self.grpc_client = None | ||
| self.storage_control_client = None | ||
| # initializing grpc and storage control client for Hierarchical and | ||
| # zonal bucket operations | ||
| self.grpc_client = asyn.sync(self.loop, self._create_grpc_client) | ||
| self._storage_control_client = asyn.sync( | ||
| self.loop, self._create_control_plane_client | ||
| ) | ||
| self._storage_layout_cache = {} | ||
|
|
||
| async def _create_grpc_client(self): | ||
| if self.grpc_client is None: | ||
| return AsyncGrpcClient( | ||
| client_info=ClientInfo(user_agent=f"{USER_AGENT}/{version}"), | ||
| ).grpc_client | ||
| else: | ||
| return self.grpc_client | ||
|
|
||
| async def _create_control_plane_client(self): | ||
| # Initialize the storage control plane client for bucket | ||
| # metadata operations | ||
| client_info = gapic_v1.client_info.ClientInfo( | ||
| user_agent=f"{USER_AGENT}/{version}" | ||
| ) | ||
| return storage_control_v2.StorageControlAsyncClient( | ||
| credentials=self.credentials.credentials, client_info=client_info | ||
| ) | ||
|
|
||
| async def _get_bucket_type(self, bucket): | ||
| if bucket in self._storage_layout_cache: | ||
| return self._storage_layout_cache[bucket] | ||
| try: | ||
|
|
||
| # Bucket name details | ||
| bucket_name_value = f"projects/_/buckets/{bucket}/storageLayout" | ||
| # Make the request to get bucket type | ||
| response = await self._storage_control_client.get_storage_layout( | ||
| name=bucket_name_value | ||
| ) | ||
|
|
||
| if response.location_type == "zone": | ||
| return BucketType.ZONAL_HIERARCHICAL | ||
| else: | ||
| # This should be updated to include HNS in the future | ||
| return BucketType.NON_HIERARCHICAL | ||
| except api_exceptions.NotFound: | ||
| print(f"Error: Bucket {bucket} not found or you lack permissions.") | ||
| return BucketType.UNKNOWN | ||
| except Exception as e: | ||
| logger.error( | ||
| f"Could not determine bucket type for bucket name {bucket}: {e}" | ||
| ) | ||
| # Default to UNKNOWN | ||
| return BucketType.UNKNOWN | ||
|
|
||
| _sync_get_bucket_type = asyn.sync_wrapper(_get_bucket_type) | ||
|
|
||
| def _open( | ||
| self, | ||
| path, | ||
| mode="rb", | ||
| block_size=None, | ||
| cache_options=None, | ||
| acl=None, | ||
| consistency=None, | ||
| metadata=None, | ||
| autocommit=True, | ||
| fixed_key_metadata=None, | ||
| generation=None, | ||
| **kwargs, | ||
| ): | ||
| """ | ||
| Open a file. | ||
| """ | ||
| bucket, _, _ = self.split_path(path) | ||
| bucket_type = self._sync_get_bucket_type(bucket) | ||
| self._storage_layout_cache[bucket] = bucket_type | ||
| return gcs_file_types[bucket_type]( | ||
| self, | ||
| path, | ||
| mode, | ||
| block_size, | ||
| cache_options=cache_options, | ||
| consistency=consistency, | ||
| metadata=metadata, | ||
| acl=acl, | ||
| autocommit=autocommit, | ||
| fixed_key_metadata=fixed_key_metadata, | ||
| generation=generation, | ||
| **kwargs, | ||
| ) | ||
|
|
||
| # Replacement method for _process_limits to support new params (offset and length) for MRD. | ||
| async def _process_limits_to_offset_and_length(self, path, start, end): | ||
| """ | ||
| Calculates the read offset and length from start and end parameters. | ||
|
|
||
| Args: | ||
| path (str): The path to the file. | ||
| start (int | None): The starting byte position. | ||
| end (int | None): The ending byte position. | ||
|
|
||
| Returns: | ||
| tuple: A tuple containing (offset, length). | ||
|
|
||
| Raises: | ||
| ValueError: If the calculated range is invalid. | ||
| """ | ||
| size = None | ||
|
|
||
| if start is None: | ||
| offset = 0 | ||
| elif start < 0: | ||
| size = size or (await self._info(path))["size"] | ||
| offset = size + start | ||
| else: | ||
| offset = start | ||
|
|
||
| if end is None: | ||
| size = size or (await self._info(path))["size"] | ||
| effective_end = size | ||
| elif end < 0: | ||
| size = size or (await self._info(path))["size"] | ||
| effective_end = size + end | ||
| else: | ||
| effective_end = end | ||
|
|
||
| size = size or (await self._info(path))["size"] | ||
| if offset < 0: | ||
| raise ValueError(f"Calculated start offset ({offset}) cannot be negative.") | ||
| if effective_end < offset: | ||
| raise ValueError( | ||
| f"Calculated end position ({effective_end}) cannot be before start offset ({offset})." | ||
| ) | ||
| elif effective_end == offset: | ||
| length = 0 # Handle zero-length slice | ||
| elif effective_end > size: | ||
| length = max(0, size - offset) # Clamp and ensure non-negative | ||
| else: | ||
| length = effective_end - offset # Normal case | ||
|
|
||
| return offset, length | ||
|
|
||
| sync_process_limits_to_offset_and_length = asyn.sync_wrapper( | ||
| _process_limits_to_offset_and_length | ||
| ) | ||
|
|
||
| async def _is_zonal_bucket(self, bucket): | ||
| bucket_type = await self._get_bucket_type(bucket) | ||
| self._storage_layout_cache[bucket] = bucket_type | ||
| return bucket_type == BucketType.ZONAL_HIERARCHICAL | ||
|
|
||
| async def _cat_file(self, path, start=None, end=None, **kwargs): | ||
| """ | ||
| Fetch a file's contents as bytes. | ||
| """ | ||
| mrd = kwargs.pop("mrd", None) | ||
| mrd_created = False | ||
|
|
||
| # A new MRD is required when read is done directly by the | ||
| # GCSFilesystem class without creating a GCSFile object first. | ||
| if mrd is None: | ||
| bucket, object_name, generation = self.split_path(path) | ||
| # Fall back to default implementation if not a zonal bucket | ||
| if not await self._is_zonal_bucket(bucket): | ||
| return await super()._cat_file(path, start=start, end=end, **kwargs) | ||
|
|
||
| mrd = await zb_hns_utils.create_mrd( | ||
| self.grpc_client, bucket, object_name, generation | ||
| ) | ||
| mrd_created = True | ||
|
|
||
| offset, length = await self._process_limits_to_offset_and_length( | ||
| path, start, end | ||
| ) | ||
| try: | ||
| return await zb_hns_utils.download_range( | ||
| offset=offset, length=length, mrd=mrd | ||
| ) | ||
| finally: | ||
| # Explicit cleanup if we created the MRD and it has a close method | ||
| if mrd_created: | ||
| await mrd.close() |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.