Skip to content

Commit

Permalink
Refactors pulled from bluesky#668 (bluesky#686)
Browse files Browse the repository at this point in the history
* Sort enum members.

* Move structure_family to the end, matching migration result.

* Sort

* Only set include_data_sources param if not default (false).

* Refactor link-writing into separate module.

* Refactor structure family check into dependency

* remove server.core.FULL_LINKS

* Clarify default data_sources for CatalogNodeAdapter

* Add structure_families to dependency check for put '/[node|table]/full'

* Create node uses structure_family of data_source

* Be robust against __doc__ being None.

* Use dask.dataframe.core, not new dask-expr.

---------

Co-authored-by: Padraic Shafer <[email protected]>
  • Loading branch information
2 people authored and hyperrealist committed Mar 18, 2024
1 parent cd69a84 commit 7c875b8
Show file tree
Hide file tree
Showing 12 changed files with 174 additions and 157 deletions.
7 changes: 3 additions & 4 deletions tiled/adapters/csv.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,9 @@ def read_csv(
)


read_csv.__doc__ = (
"""
read_csv.__doc__ = """
This wraps dask.dataframe.read_csv. Original docstring:
"""
+ dask.dataframe.read_csv.__doc__
""" + (
dask.dataframe.read_csv.__doc__ or ""
)
12 changes: 7 additions & 5 deletions tiled/catalog/adapter.py
Original file line number Diff line number Diff line change
Expand Up @@ -321,7 +321,7 @@ async def __aiter__(self):

@property
def data_sources(self):
return [DataSource.from_orm(ds) for ds in self.node.data_sources or []]
return [DataSource.from_orm(ds) for ds in (self.node.data_sources or [])]

async def asset_by_id(self, asset_id):
statement = (
Expand Down Expand Up @@ -597,12 +597,14 @@ async def create_node(
if data_source.management != Management.external:
if structure_family == StructureFamily.container:
raise NotImplementedError(structure_family)
data_source.mimetype = DEFAULT_CREATION_MIMETYPE[structure_family]
data_source.mimetype = DEFAULT_CREATION_MIMETYPE[
data_source.structure_family
]
data_source.parameters = {}
data_uri = str(self.context.writable_storage) + "".join(
f"/{quote_plus(segment)}" for segment in (self.segments + [key])
)
init_storage = DEFAULT_INIT_STORAGE[structure_family]
init_storage = DEFAULT_INIT_STORAGE[data_source.structure_family]
assets = await ensure_awaitable(
init_storage, data_uri, data_source.structure
)
Expand Down Expand Up @@ -1307,9 +1309,9 @@ def specs_array_to_json(specs):


STRUCTURES = {
StructureFamily.container: CatalogContainerAdapter,
StructureFamily.array: CatalogArrayAdapter,
StructureFamily.awkward: CatalogAwkwardAdapter,
StructureFamily.table: CatalogTableAdapter,
StructureFamily.container: CatalogContainerAdapter,
StructureFamily.sparse: CatalogSparseAdapter,
StructureFamily.table: CatalogTableAdapter,
}
2 changes: 1 addition & 1 deletion tiled/catalog/orm.py
Original file line number Diff line number Diff line change
Expand Up @@ -291,7 +291,6 @@ class DataSource(Timestamped, Base):
node_id = Column(
Integer, ForeignKey("nodes.id", ondelete="CASCADE"), nullable=False
)
structure_family = Column(Enum(StructureFamily), nullable=False)
structure_id = Column(
Unicode(32), ForeignKey("structures.id", ondelete="CASCADE"), nullable=True
)
Expand All @@ -301,6 +300,7 @@ class DataSource(Timestamped, Base):
parameters = Column(JSONVariant, nullable=True)
# This relates to the mutability of the data.
management = Column(Enum(Management), nullable=False)
structure_family = Column(Enum(StructureFamily), nullable=False)

# many-to-one relationship to Structure
structure: Mapped["Structure"] = relationship(
Expand Down
5 changes: 4 additions & 1 deletion tiled/client/constructors.py
Original file line number Diff line number Diff line change
Expand Up @@ -150,11 +150,14 @@ def from_context(
and (context.http_client.auth is None)
):
context.authenticate()
params = {}
if include_data_sources:
params["include_data_sources"] = True
content = handle_error(
context.http_client.get(
item_uri,
headers={"Accept": MSGPACK_MIME_TYPE},
params={"include_data_sources": include_data_sources},
params=params,
)
).json()
else:
Expand Down
33 changes: 19 additions & 14 deletions tiled/client/container.py
Original file line number Diff line number Diff line change
Expand Up @@ -249,16 +249,18 @@ def __getitem__(self, keys, _ignore_inlined_contents=False):
# Lookup this key *within the search results* of this Node.
key, *tail = keys
tail = tuple(tail) # list -> tuple
params = {
**_queries_to_params(KeyLookup(key)),
**self._queries_as_params,
**self._sorting_params,
}
if self._include_data_sources:
params["include_data_sources"] = True
content = handle_error(
self.context.http_client.get(
self.item["links"]["search"],
headers={"Accept": MSGPACK_MIME_TYPE},
params={
"include_data_sources": self._include_data_sources,
**_queries_to_params(KeyLookup(key)),
**self._queries_as_params,
**self._sorting_params,
},
params=params,
)
).json()
self._cached_len = (
Expand Down Expand Up @@ -305,13 +307,14 @@ def __getitem__(self, keys, _ignore_inlined_contents=False):
self_link = self.item["links"]["self"]
if self_link.endswith("/"):
self_link = self_link[:-1]
params = {}
if self._include_data_sources:
params["include_data_sources"] = True
content = handle_error(
self.context.http_client.get(
self_link + "".join(f"/{key}" for key in keys[i:]),
headers={"Accept": MSGPACK_MIME_TYPE},
params={
"include_data_sources": self._include_data_sources
},
params=params,
)
).json()
except ClientError as err:
Expand Down Expand Up @@ -413,15 +416,17 @@ def _items_slice(self, start, stop, direction, _ignore_inlined_contents=False):
next_page_url = f"{self.item['links']['search']}?page[offset]={start}"
item_counter = itertools.count(start)
while next_page_url is not None:
params = {
**self._queries_as_params,
**sorting_params,
}
if self._include_data_sources:
params["include_data_sources"] = True
content = handle_error(
self.context.http_client.get(
next_page_url,
headers={"Accept": MSGPACK_MIME_TYPE},
params={
"include_data_sources": self._include_data_sources,
**self._queries_as_params,
**sorting_params,
},
params=params,
)
).json()
self._cached_len = (
Expand Down
4 changes: 2 additions & 2 deletions tiled/client/dataframe.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import dask
import dask.dataframe
import dask.dataframe.core

from ..serialization.table import deserialize_arrow, serialize_arrow
from ..utils import APACHE_ARROW_FILE_MIME_TYPE, UNCHANGED
Expand Down Expand Up @@ -162,7 +162,7 @@ def read(self, columns=None):

if columns is not None:
meta = meta[columns]
ddf = dask.dataframe.DataFrame(
ddf = dask.dataframe.core.DataFrame(
dask_tasks,
name=name,
meta=meta,
Expand Down
54 changes: 15 additions & 39 deletions tiled/server/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
)
from . import schemas
from .etag import tokenize
from .links import links_for_node
from .utils import record_timing

del queries
Expand Down Expand Up @@ -404,6 +405,7 @@ async def construct_resource(
depth=0,
):
path_str = "/".join(path_parts)
id_ = path_parts[-1] if path_parts else ""
attributes = {"ancestors": path_parts[:-1]}
if include_data_sources and hasattr(entry, "data_sources"):
attributes["data_sources"] = entry.data_sources
Expand Down Expand Up @@ -488,15 +490,16 @@ async def construct_resource(
for key, direction in entry.sorting
]
d = {
"id": path_parts[-1] if path_parts else "",
"id": id_,
"attributes": schemas.NodeAttributes(**attributes),
}
if not omit_links:
d["links"] = {
"self": f"{base_url}/metadata/{path_str}",
"search": f"{base_url}/search/{path_str}",
"full": f"{base_url}/container/full/{path_str}",
}
d["links"] = links_for_node(
entry.structure_family,
entry.structure(),
base_url,
path_str,
)

resource = schemas.Resource[
schemas.NodeAttributes, schemas.ContainerLinks, schemas.ContainerMeta
Expand All @@ -510,34 +513,16 @@ async def construct_resource(
entry.structure_family
]
links.update(
{
link: template.format(base_url=base_url, path=path_str)
for link, template in FULL_LINKS[entry.structure_family].items()
}
links_for_node(
entry.structure_family,
entry.structure(),
base_url,
path_str,
)
)
structure = asdict(entry.structure())
if schemas.EntryFields.structure_family in fields:
attributes["structure_family"] = entry.structure_family
if entry.structure_family == StructureFamily.sparse:
shape = structure.get("shape")
block_template = ",".join(f"{{{index}}}" for index in range(len(shape)))
links[
"block"
] = f"{base_url}/array/block/{path_str}?block={block_template}"
elif entry.structure_family == StructureFamily.array:
shape = structure.get("shape")
block_template = ",".join(
f"{{index_{index}}}" for index in range(len(shape))
)
links[
"block"
] = f"{base_url}/array/block/{path_str}?block={block_template}"
elif entry.structure_family == StructureFamily.table:
links[
"partition"
] = f"{base_url}/table/partition/{path_str}?partition={{index}}"
elif entry.structure_family == StructureFamily.awkward:
links["buffers"] = f"{base_url}/awkward/buffers/{path_str}"
if schemas.EntryFields.structure in fields:
attributes["structure"] = structure
else:
Expand Down Expand Up @@ -719,15 +704,6 @@ class WrongTypeForRoute(Exception):
pass


FULL_LINKS = {
StructureFamily.array: {"full": "{base_url}/array/full/{path}"},
StructureFamily.awkward: {"full": "{base_url}/awkward/full/{path}"},
StructureFamily.container: {"full": "{base_url}/container/full/{path}"},
StructureFamily.table: {"full": "{base_url}/table/full/{path}"},
StructureFamily.sparse: {"full": "{base_url}/array/full/{path}"},
}


def asdict(dc):
"Compat for converting dataclass or pydantic.BaseModel to dict."
if dc is None:
Expand Down
16 changes: 14 additions & 2 deletions tiled/server/dependencies.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ def get_root_tree():
)


def SecureEntry(scopes):
def SecureEntry(scopes, structure_families=None):
async def inner(
path: str,
request: Request,
Expand Down Expand Up @@ -116,7 +116,19 @@ async def inner(
)
except NoEntry:
raise HTTPException(status_code=404, detail=f"No such entry: {path_parts}")
return entry
# Fast path for the common successful case
if (structure_families is None) or (
entry.structure_family in structure_families
):
return entry
raise HTTPException(
status_code=404,
detail=(
f"The node at {path} has structure family {entry.structure_family} "
"and this endpoint is compatible with structure families "
f"{structure_families}"
),
)

return Security(inner, scopes=scopes)

Expand Down
53 changes: 53 additions & 0 deletions tiled/server/links.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
"""
Generate the 'links' section of the response JSON.
The links vary by structure family.
"""
from ..structures.core import StructureFamily


def links_for_node(structure_family, structure, base_url, path_str):
    """Generate the 'links' dict for a node; contents vary by structure family.

    Parameters
    ----------
    structure_family : StructureFamily
        Selects the family-specific link builder from LINKS_BY_STRUCTURE_FAMILY.
    structure :
        The node's structure object; some builders read attributes from it
        (e.g. ``shape`` for arrays).
    base_url : str
        Server base URL, prepended to every generated link.
    path_str : str
        Slash-joined path of the node within the tree.
    """
    # (Removed a dead `links = {}` that was immediately overwritten.)
    links = LINKS_BY_STRUCTURE_FAMILY[structure_family](
        structure_family, structure, base_url, path_str
    )
    # Every structure family gets a 'self' link to its metadata route.
    links["self"] = f"{base_url}/metadata/{path_str}"
    return links


def links_for_array(structure_family, structure, base_url, path_str):
    """Build 'block' and 'full' links for an array (also used for sparse)."""
    ndim = len(structure.shape)
    # One positional placeholder per dimension, e.g. "{0},{1},{2}".
    block_template = ",".join(f"{{{axis}}}" for axis in range(ndim))
    return {
        "block": f"{base_url}/array/block/{path_str}?block={block_template}",
        "full": f"{base_url}/array/full/{path_str}",
    }


def links_for_awkward(structure_family, structure, base_url, path_str):
    """Build 'buffers' and 'full' links for an awkward-array node."""
    prefix = f"{base_url}/awkward"
    return {
        "buffers": f"{prefix}/buffers/{path_str}",
        "full": f"{prefix}/full/{path_str}",
    }


def links_for_container(structure_family, structure, base_url, path_str):
    """Build 'full' and 'search' links for a container node."""
    return {
        "full": f"{base_url}/container/full/{path_str}",
        "search": f"{base_url}/search/{path_str}",
    }


def links_for_table(structure_family, structure, base_url, path_str):
    """Build 'partition' and 'full' links for a table node."""
    # '{index}' is left as a literal template for the client to fill in.
    partition_url = f"{base_url}/table/partition/{path_str}?partition={{index}}"
    return {
        "partition": partition_url,
        "full": f"{base_url}/table/full/{path_str}",
    }


# Dispatch table: maps each StructureFamily to the builder that generates its
# links. Every builder shares the signature
# (structure_family, structure, base_url, path_str) -> dict.
LINKS_BY_STRUCTURE_FAMILY = {
    StructureFamily.array: links_for_array,
    StructureFamily.awkward: links_for_awkward,
    StructureFamily.container: links_for_container,
    StructureFamily.sparse: links_for_array,  # sparse and array links are the same
    StructureFamily.table: links_for_table,
}
Loading

0 comments on commit 7c875b8

Please sign in to comment.