Skip to content

Commit

Permalink
fsspec integration (#63)
Browse files Browse the repository at this point in the history
* Initial fsspec integration

* Add _cat_ranges

* Add failing test

* Add pipe_file

* Add test fixtures

* Simple file override

* slightly more

* lint

* Update obstore/python/obstore/fsspec.py

Co-authored-by: Kyle Barron <[email protected]>

* add conftest

* tests and docstrings

* make fs not cachable

* start/end

* in cat also

* Try mixed ranges

* Allow None ranges

* overwrite test

* fix

* revive subclass

* xfails

* xfails didn't stick

* give reason

* Update obstore/python/obstore/fsspec.py

* Update obstore/python/obstore/fsspec.py

* update for signature

* Update obstore/python/obstore/fsspec.py

---------

Co-authored-by: Kyle Barron <[email protected]>
Co-authored-by: Kyle Barron <[email protected]>
  • Loading branch information
3 people authored Nov 13, 2024
1 parent f34ba9b commit 6ec315f
Show file tree
Hide file tree
Showing 6 changed files with 412 additions and 63 deletions.
179 changes: 179 additions & 0 deletions obstore/python/obstore/fsspec.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,179 @@
"""Fsspec integration.
The underlying `object_store` Rust crate [cautions](https://docs.rs/object_store/latest/object_store/#why-not-a-filesystem-interface) against relying too strongly on stateful filesystem representations of object stores:

> The ObjectStore interface is designed to mirror the APIs of object stores and not filesystems, and thus has stateless APIs instead of cursor based interfaces such as Read or Seek available in filesystems.
>
> This design provides the following advantages:
>
> - All operations are atomic, and readers cannot observe partial and/or failed writes
> - Methods map directly to object store APIs, providing both efficiency and predictability
> - Abstracts away filesystem and operating system specific quirks, ensuring portability
> - Allows for functionality not native to filesystems, such as operation preconditions and atomic multipart uploads

Where possible, implementations should use the underlying `obstore` APIs
directly. Only where this is not possible should users fall back to this fsspec
integration.
"""

from __future__ import annotations

import asyncio
from collections import defaultdict
from typing import Any, Coroutine, Dict, List, Tuple

import fsspec.asyn
import fsspec.spec

import obstore as obs


class AsyncFsspecStore(fsspec.asyn.AsyncFileSystem):
    """An fsspec implementation based on an obstore Store.

    Every filesystem operation is forwarded to the matching `obstore`
    function, called with the store supplied at construction time.
    """

    # Opt out of fsspec's filesystem instance caching.
    cachable = False

    def __init__(
        self,
        store: obs.store.ObjectStore,
        *args,
        asynchronous: bool = False,
        loop=None,
        batch_size: int | None = None,
    ):
        """Construct a new AsyncFsspecStore.

        Args:
            store: a configured instance of one of the store classes in
                `obstore.store`.
            asynchronous: is this instance meant to be called using the async
                API? This should only be set to true when running within a
                coroutine.
            loop: since both fsspec/python and tokio/rust may be using loops,
                this should be kept `None` for now, and will not be used.
            batch_size: some operations on many files will batch their
                requests; if you are seeing timeouts, you may want to set this
                number smaller than the defaults, which are determined in
                `fsspec.asyn._get_batch_size`.
        """
        self.store = store
        super().__init__(
            *args, asynchronous=asynchronous, loop=loop, batch_size=batch_size
        )

    async def _rm_file(self, path, **kwargs):
        """Delete the object at `path`."""
        return await obs.delete_async(self.store, path)

    async def _cp_file(self, path1, path2, **kwargs):
        """Copy the object at `path1` to `path2` within the same store."""
        return await obs.copy_async(self.store, path1, path2)

    async def _pipe_file(self, path, value, **kwargs):
        """Write `value` as the full content of the object at `path`."""
        return await obs.put_async(self.store, path, value)

    async def _cat_file(self, path, start=None, end=None, **kwargs):
        """Return the content of `path`, optionally limited to `[start, end)`.

        With neither bound given the whole object is fetched. If only one
        bound is given, a missing `start` is treated as the beginning of the
        object and a missing `end` as the end of the object.
        """
        if start is None and end is None:
            resp = await obs.get_async(self.store, path)
            return await resp.bytes_async()

        if start is None:
            start = 0
        if end is None:
            # The range request is issued with concrete integer offsets, so
            # resolve an open-ended range by looking up the object's size.
            head = await obs.head_async(self.store, path)
            end = head["size"]

        return await obs.get_range_async(self.store, path, start=start, end=end)

    async def _cat_ranges(
        self,
        paths: List[str],
        starts: List[int] | int,
        ends: List[int] | int,
        max_gap=None,
        batch_size=None,
        on_error="return",
        **kwargs,
    ):
        """Fetch one byte range per entry of `paths`.

        Args:
            paths: object paths; the same path may appear several times.
            starts: start offset for each path, or a single offset applied to
                every path.
            ends: end offset for each path, or a single offset applied to
                every path.
            max_gap: accepted for signature compatibility; not used here.
            batch_size: accepted for signature compatibility; not used here.
            on_error: accepted for signature compatibility; not used here.

        Returns:
            A list of `bytes`, one per requested range, in the order of
            `paths`.

        Raises:
            ValueError: if `paths`, `starts` and `ends` differ in length.
        """
        if isinstance(starts, int):
            starts = [starts] * len(paths)
        if isinstance(ends, int):
            ends = [ends] * len(paths)
        if not len(paths) == len(starts) == len(ends):
            raise ValueError(
                "paths, starts and ends must all have the same length; got "
                f"{len(paths)}, {len(starts)} and {len(ends)}"
            )

        # Group the ranges by path so that each object gets a single
        # multi-range request. The original position is kept so the results
        # can be put back in the caller's order.
        per_file_requests: Dict[str, List[Tuple[int, int, int]]] = defaultdict(list)
        for idx, (path, start, end) in enumerate(zip(paths, starts, ends)):
            per_file_requests[path].append((start, end, idx))

        futs: List[Coroutine[Any, Any, List[bytes]]] = [
            obs.get_ranges_async(
                self.store,
                path,
                starts=[file_range[0] for file_range in ranges],
                ends=[file_range[1] for file_range in ranges],
            )
            for path, ranges in per_file_requests.items()
        ]
        result = await asyncio.gather(*futs)

        output_buffers: List[bytes] = [b""] * len(paths)
        # `gather` returns results in the order the requests were created,
        # which is the iteration order of `per_file_requests`.
        for ranges, buffers in zip(per_file_requests.values(), result):
            for (_, _, initial_index), buffer in zip(ranges, buffers):
                output_buffers[initial_index] = buffer.as_bytes()

        return output_buffers

    async def _put_file(self, lpath, rpath, **kwargs):
        """Upload the local file `lpath` to the object at `rpath`."""
        with open(lpath, "rb") as f:
            await obs.put_async(self.store, rpath, f)

    async def _get_file(self, rpath, lpath, **kwargs):
        """Download the object at `rpath` into the local file `lpath`."""
        # Issue the request before opening the destination so that a failed
        # request does not create or truncate the local file.
        resp = await obs.get_async(self.store, rpath)
        with open(lpath, "wb") as f:
            async for buffer in resp.stream():
                f.write(buffer)

    async def _info(self, path, **kwargs):
        """Return a metadata dict for the object at `path`.

        A path reported with a trailing `/` is labelled a directory; anything
        else is labelled a file.
        """
        head = await obs.head_async(self.store, path)
        return {
            # Required of `info`: (?)
            "name": head["path"],
            "size": head["size"],
            "type": "directory" if head["path"].endswith("/") else "file",
            # Implementation-specific keys
            "e_tag": head["e_tag"],
            "last_modified": head["last_modified"],
            "version": head["version"],
        }

    async def _ls(self, path, detail=True, **kwargs):
        """List the objects and common prefixes directly under `path`.

        With `detail=True` a list of dicts is returned, objects first and
        then prefixes (reported as zero-size directories). Otherwise a sorted
        list of names is returned.
        """
        result = await obs.list_with_delimiter_async(self.store, path)
        objects = result["objects"]
        prefs = result["common_prefixes"]
        if not detail:
            return sorted([obj["path"] for obj in objects] + prefs)

        files = [
            {
                "name": obj["path"],
                "size": obj["size"],
                "type": "file",
                "e_tag": obj["e_tag"],
            }
            for obj in objects
        ]
        directories = [
            {"name": prefix, "size": 0, "type": "directory"} for prefix in prefs
        ]
        return files + directories

    def _open(self, path, mode="rb", **kwargs):
        """Return raw bytes-mode file-like from the file-system"""
        return BufferedFileSimple(self, path, mode, **kwargs)


class BufferedFileSimple(fsspec.spec.AbstractBufferedFile):
    """Read-only file object whose reads each issue a `cat_file` call.

    Only binary read mode (`"rb"`) is supported.
    """

    def __init__(self, fs, path, mode="rb", **kwargs):
        """Open `path` on `fs` for reading.

        Raises:
            ValueError: if `mode` is anything other than `"rb"`.
        """
        if mode != "rb":
            raise ValueError("Only 'rb' mode is currently supported")
        super().__init__(fs, path, mode, **kwargs)

    def read(self, length: int = -1):
        """Return bytes from the remote file.

        Args:
            length: if positive, returns up to this many bytes; if negative
                or `None`, returns all remaining bytes.

        Returns:
            The bytes read, or `b""` when nothing remains to be read or
            `length` is 0. The file position advances only up to the end of
            the file.
        """
        if length is None or length < 0:
            end = self.size
        else:
            # Never request, or move the position, beyond the end of the file.
            end = min(self.loc + length, self.size)

        if end <= self.loc:
            # Nothing left to read; skip the request rather than asking the
            # store for an empty or out-of-bounds range.
            return b""

        data = self.fs.cat_file(self.path, self.loc, end)
        self.loc = end
        return data
6 changes: 6 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ dev-dependencies = [
"arro3-core>=0.4.2",
"black>=24.10.0",
"boto3>=1.35.38",
"fsspec>=2024.10.0",
"griffe-inherited-docstrings>=1.0.1",
"ipykernel>=6.29.5",
"maturin>=1.7.4",
Expand All @@ -21,6 +22,7 @@ dev-dependencies = [
"moto[s3,server]>=5.0.18",
"pandas>=2.2.3",
"pip>=24.2",
"pyarrow>=17.0.0",
"pytest-asyncio>=0.24.0",
"pytest>=8.3.3",
]
Expand All @@ -41,3 +43,7 @@ select = [
"F401", # Allow unused imports in __init__.py files
"F403", # unable to detect undefined names
]

[tool.pytest.ini_options]
addopts = "-v"
testpaths = ["tests"]
53 changes: 53 additions & 0 deletions tests/conftest.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
import boto3
import pytest
import urllib3
from botocore import UNSIGNED
from botocore.client import Config
from moto.moto_server.threaded_moto_server import ThreadedMotoServer

from obstore.store import S3Store

# Bucket that the `s3` fixture creates and that the `s3_store` fixture points at.
TEST_BUCKET_NAME = "test"


# See docs here: https://docs.getmoto.org/en/latest/docs/server_mode.html
@pytest.fixture()
def moto_server_uri():
    """Fixture to run a mocked AWS server for testing.

    Yields the base URI (`http://host:port`) of the running server and stops
    the server on teardown.
    """
    # Note: pass `port=0` to get a random free port.
    server = ThreadedMotoServer(ip_address="localhost", port=0)
    server.start()
    # From here on the server is running, so make sure it is stopped even if
    # discovering its address fails before the fixture can yield.
    try:
        if hasattr(server, "get_host_and_port"):
            host, port = server.get_host_and_port()
        else:
            # Fall back to the private server object when the helper method
            # is not available on this `ThreadedMotoServer`.
            host, port = server._server.server_address
        yield f"http://{host}:{port}"
    finally:
        server.stop()


@pytest.fixture()
def s3(moto_server_uri: str):
    """Create the test bucket with one object on the mocked server.

    Yields the server URI. On teardown the mocked server's state is reset
    through its `/moto-api/reset` endpoint.
    """
    client = boto3.client(
        "s3",
        config=Config(signature_version=UNSIGNED),
        region_name="us-east-1",
        endpoint_url=moto_server_uri,
    )
    client.create_bucket(Bucket=TEST_BUCKET_NAME, ACL="public-read")
    client.put_object(Bucket=TEST_BUCKET_NAME, Key="afile", Body=b"hello world")
    yield moto_server_uri
    # `urllib3.request` is only a callable in urllib3 2.x (in 1.x it is a
    # submodule); `PoolManager.request` is available in both.
    urllib3.PoolManager().request("POST", f"{moto_server_uri}/moto-api/reset")


@pytest.fixture()
def s3_store(s3):
    """Return an `S3Store` for the test bucket, using the URI from `s3` as endpoint."""
    bucket_url = f"s3://{TEST_BUCKET_NAME}/"
    store_config = {
        "AWS_ENDPOINT_URL": s3,
        "AWS_REGION": "us-east-1",
        "AWS_SKIP_SIGNATURE": "True",
        "AWS_ALLOW_HTTP": "true",
    }
    return S3Store.from_url(bucket_url, config=store_config)
65 changes: 4 additions & 61 deletions tests/store/test_s3.py
Original file line number Diff line number Diff line change
@@ -1,74 +1,17 @@
import boto3
import pytest
from botocore import UNSIGNED
from botocore.client import Config
from moto.moto_server.threaded_moto_server import ThreadedMotoServer

import obstore as obs
from obstore.store import S3Store

TEST_BUCKET_NAME = "test"


# See docs here: https://docs.getmoto.org/en/latest/docs/server_mode.html
@pytest.fixture(scope="module")
def moto_server_uri():
"""Fixture to run a mocked AWS server for testing."""
# Note: pass `port=0` to get a random free port.
server = ThreadedMotoServer(ip_address="localhost", port=0)
server.start()
host, port = server.get_host_and_port()
uri = f"http://{host}:{port}"
yield uri
server.stop()


@pytest.fixture()
def s3(moto_server_uri: str):
client = boto3.client(
"s3",
config=Config(signature_version=UNSIGNED),
region_name="us-east-1",
endpoint_url=moto_server_uri,
)
client.create_bucket(Bucket=TEST_BUCKET_NAME, ACL="public-read")
client.put_object(Bucket=TEST_BUCKET_NAME, Key="afile", Body=b"hello world")
return moto_server_uri


# @pytest.fixture(autouse=True)
# def reset_s3_fixture(moto_server_uri):
# import requests

# # We reuse the MotoServer for all tests
# # But we do want a clean state for every test
# try:
# requests.post(f"{moto_server_uri}/moto-api/reset")
# except:
# pass


@pytest.fixture()
def store(s3):
return S3Store.from_url(
f"s3://{TEST_BUCKET_NAME}/",
config={
"AWS_ENDPOINT_URL": s3,
"AWS_REGION": "us-east-1",
"AWS_SKIP_SIGNATURE": "True",
"AWS_ALLOW_HTTP": "true",
},
)


@pytest.mark.asyncio
async def test_list_async(store: S3Store):
list_result = await obs.list(store).collect_async()
async def test_list_async(s3_store: S3Store):
list_result = await obs.list(s3_store).collect_async()
assert any("afile" in x["path"] for x in list_result)


@pytest.mark.asyncio
async def test_get_async(store: S3Store):
resp = await obs.get_async(store, "afile")
async def test_get_async(s3_store: S3Store):
resp = await obs.get_async(s3_store, "afile")
buf = await resp.bytes_async()
assert buf == b"hello world"
Loading

0 comments on commit 6ec315f

Please sign in to comment.