Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add union structure family #668

Draft
wants to merge 25 commits into
base: main
Choose a base branch
from
Draft

Conversation

danielballan
Copy link
Member

@danielballan danielballan commented Feb 26, 2024

This builds on commits from #661 and should be merged after it. [Update: #661 is in, and this has been rebased.]

Problem statement

This PR is designed to solve the same problem that the pandas BlockManager12 solves: presenting data in a flat namespace to the user, but enabling groups of items in that namespace to be transparently backed by shared data structures, for better performance.

For example, data coming from Bluesky includes data stored directly in the Event documents and large data written externally by detectors as arrays. The data in the Event documents is a great fit for tabular storage and transfer formats (e.g. Feather, Parquet, even CSV...). The externally-written data is not; it is better stored and transferred in large N-dimensional array formats like Zarr, HDF5, or a simple C-ordered buffer.

Users focused on science would like to smooth over these details. That is, we want to store and (often) move the data like this:

data
├── table
│   ├── motor_readback
│   ├── motor_setpoint
├── image_array

But offer a way to model it to the user in a flat namespace:

data
├── motor_readback
├── motor_setpoint
├── image_array

When writing (especially appending) the client will want to use the former view, so both views need to be available.

Solution

This PR adds a new structure family, union. The name is inspired by AwkwardArray UnionForm. It holds a heterogenous mixture of structures (e.g. tables and arrays). It enables the columns of the table and the arrays to be explored from a flat namespace. Name collisions are forbidden. But it also describes the underlying structures individually, enabling them to be read or written separately.

To the user, this behaves much like a Container structure, would:

In [2]: c['x']
Out[2]: <UnionClient {'A', 'B', 'C'}>

I can, for example, access fields by key and download data:

In [3]: c['x']['A']
Out[3]: <ArrayClient shape=(3,) chunks=((3,),) dtype=int64>

In [4]: c['x']['A'][:]
Out[4]: array([1, 2, 3])

In [5]: c['x']['C']
Out[5]: <ArrayClient shape=(5, 5) chunks=((5,), (5,)) dtype=float64>

In [6]: c['x']['C'][:]
Out[6]: 
array([[1., 1., 1., 1., 1.],
       [1., 1., 1., 1., 1.],
       [1., 1., 1., 1., 1.],
       [1., 1., 1., 1., 1.],
       [1., 1., 1., 1., 1.]])

Digging a little deeper, we can see a difference from Containers. The union shows that A and B are backed by a table (coincidentally named "table", could be anything) while C is standalone array.

In [8]: c['x'].contents  # The name `contents` is up for discussion...
Out[8]: <UnionContents {'table', 'C'}>

In [9]: c['x'].contents['table']
Out[9]: <DataFrameClient ['A', 'B']>

In [10]: c['x'].contents['C']
Out[10]: <ArrayClient shape=(5, 5) chunks=((5,), (5,)) dtype=float64>

In [11]: c['x'].contents['table'].read()
Out[11]: 
   A  B
0  1  4
1  2  5
2  3  6

In [12]: c['x'].contents['C'].read()
Out[12]: 
array([[1., 1., 1., 1., 1.],
       [1., 1., 1., 1., 1.],
       [1., 1., 1., 1., 1.],
       [1., 1., 1., 1., 1.],
       [1., 1., 1., 1., 1.]])

The structure of the union node reveals more detail; expand to view:

In [16]: from dataclasses import asdict

In [17]: asdict(c['x'].structure())
Out[17]: 
{'contents': [{'data_source_id': 1,
   'structure_family': 'table',
   'structure': {'arrow_schema': 'data:application/vnd.apache.arrow.file;base64,/////+gCAAAQAAAAAAAKAA4ABgAFAAgACgAAAAABBAAQAAAAAAAKAAwAAAAEAAgACgAAAEACAAAEAAAAAQAAAAwAAAAIAAwABAAIAAgAAAAIAAAAEAAAAAYAAABwYW5kYXMAAAkCAAB7ImluZGV4X2NvbHVtbnMiOiBbeyJraW5kIjogInJhbmdlIiwgIm5hbWUiOiBudWxsLCAic3RhcnQiOiAwLCAic3RvcCI6IDMsICJzdGVwIjogMX1dLCAiY29sdW1uX2luZGV4ZXMiOiBbeyJuYW1lIjogbnVsbCwgImZpZWxkX25hbWUiOiBudWxsLCAicGFuZGFzX3R5cGUiOiAidW5pY29kZSIsICJudW1weV90eXBlIjogIm9iamVjdCIsICJtZXRhZGF0YSI6IHsiZW5jb2RpbmciOiAiVVRGLTgifX1dLCAiY29sdW1ucyI6IFt7Im5hbWUiOiAiQSIsICJmaWVsZF9uYW1lIjogIkEiLCAicGFuZGFzX3R5cGUiOiAiaW50NjQiLCAibnVtcHlfdHlwZSI6ICJpbnQ2NCIsICJtZXRhZGF0YSI6IG51bGx9LCB7Im5hbWUiOiAiQiIsICJmaWVsZF9uYW1lIjogIkIiLCAicGFuZGFzX3R5cGUiOiAiaW50NjQiLCAibnVtcHlfdHlwZSI6ICJpbnQ2NCIsICJtZXRhZGF0YSI6IG51bGx9XSwgImNyZWF0b3IiOiB7ImxpYnJhcnkiOiAicHlhcnJvdyIsICJ2ZXJzaW9uIjogIjE0LjAuMiJ9LCAicGFuZGFzX3ZlcnNpb24iOiAiMi4wLjMifQAAAAIAAABEAAAABAAAANT///8AAAECEAAAABQAAAAEAAAAAAAAAAEAAABCAAAAxP///wAAAAFAAAAAEAAUAAgABgAHAAwAAAAQABAAAAAAAAECEAAAABwAAAAEAAAAAAAAAAEAAABBAAAACAAMAAgABwAIAAAAAAAAAUAAAAA=',
    'npartitions': 1,
    'columns': ['A', 'B'],
    'resizable': False},
   'name': 'table'},
  {'data_source_id': 2,
   'structure_family': 'array',
   'structure': {'data_type': {'endianness': 'little',
     'kind': 'f',
     'itemsize': 8},
    'chunks': [[5], [5]],
    'shape': [5, 5],
    'dims': None,
    'resizable': False},
   'name': 'C'}],
 'all_keys': ['A', 'B', 'C']}

Unlike container, the union structure always describes its full contents inline. It does not support paginating through its contents. It is not designed to scale beyond ~1000 fields.

This script shows how the union was constructed. Code like this will rarely be user-facing; envision it wrapped in a utility that consumes Bluesky documents and writes and registers the relevant data into Tiled.

import numpy
import pandas

from tiled.client import from_profile
from tiled.structures.array import ArrayStructure
from tiled.structures.data_source import DataSource
from tiled.structures.table import TableStructure

c = from_profile("local", api_key="secret")

df = pandas.DataFrame({"A": [1, 2, 3], "B": [4, 5, 6]})
arr = numpy.ones((5, 5))

s1 = TableStructure.from_pandas(df)
s2 = ArrayStructure.from_array(arr)
x = c.create_union(
    [
        DataSource(structure_family="table", structure=s1, name="table"),
        DataSource(structure_family="array", structure=s2, name="C"),
    ],
    key="x",
)
x.contents["table"].write(df)
x.contents["C"].write(arr)

The requests look like:

INFO:     127.0.0.1:59404 - "POST /api/v1/metadata/ HTTP/1.1" 200 OK
INFO:     127.0.0.1:59404 - "PUT /api/v1/table/full/x?data_source=table HTTP/1.1" 200 OK
INFO:     127.0.0.1:59404 - "PUT /api/v1/array/full/x?data_source=C HTTP/1.1" 200 OK

The query parameter ?data_source={name} is used to address a specific component backing the node.

Review of abstraction levels

  1. Everything is just a node and we blissfully ignore anything about data sources.
c['x']['A']
  1. We look at data source names denoting how fields are grouped in the underlying storage, but we still ignore everything about storage formats and other storage details.
c['x'].contents
c['x'].structure()
  1. We look at low-level storage details, encoded in DataSource and Asset.
c['x'].data_sources()

To Do

  • Implement read() on UnionClient (i.e. c['x']) itself, which could pull each data source in turn and return an xarray.Dataset.
  • Implement GET /union/full/{path} to enable bulk download. This will work similar to container export.

Footnotes

  1. https://uwekorn.com/2020/05/24/the-one-pandas-internal.html

  2. https://wesmckinney.com/blog/a-roadmap-for-rich-scientific-data-structures-in-python/

@padraic-shafer
Copy link
Contributor

I like the features of this PR.

I wonder whether we can consolidate some naming consistency around the behaviors of:

  • contents: c['x'].contents returns the names of the Union's data_sources
  • data_sources(): c['x'].data_sources() returns file-level details of the data_sources backing the Union's members
  • data_source: URL parameter containing the name of a Union member data_source, as in PUT /api/v1/table/full/x?data_source=table

On the other hand, this might already be as simple as it gets, and I just need a minute to get comfortable with the usage. :)

@padraic-shafer
Copy link
Contributor

  • contents: c['x'].contents returns the names of the Union's data_sources

This is perhaps a bit too reductive a statement, as really this contains structure and named sources and can iterate through the members.

@padraic-shafer
Copy link
Contributor

Even though the contents map 1:1 to a data_source, it might be cleaner to not use them interchangeably. I will refer to the contents here as parts for brevity and to avoid possible confusion with Container. I'm not attached to the name parts.

Then the above could be used something like this...maybe?

client['x'].parts   # <UnionParts {'table', 'C'}>
client['x'].data_sources()   # low-level storage details, encoded in DataSource and Asset
client['x']['C'].data_sources()   # one-element list, or would data_source() be better?

PUT /api/v1/array/full/x?part=C ...BODY  # Refer to the union member, rather than its data source

@danielballan
Copy link
Member Author

danielballan commented Feb 26, 2024

I like that suggestion very well, and I like the name part.

In the future (months away) I hazily foresee enabling Tiled to track multiple versions of data:

  • replicas stored "on prem" or in the cloud
  • copies with different file formats and/or chunking to make a range of use cases fast

This is why data_sources() is a one-element list, in anticipation of there being more than one someday, and wanting to leave room for that.

But, this also underlines why separate part and data_source could be important: they happen to be 1:1 today but may not always be.

@danielballan
Copy link
Member Author

I also like that giving a distinct name to this concept helps clarify which abstraction level you are operating at. Referring to a part moves you from (1) to (2) but until you mention a data_source you have not crossed into (3).

@danielballan
Copy link
Member Author

danielballan commented Feb 27, 2024

Rebased on main after merging #661. The renaming of data_source to part, in the places discussed has been done.

The to-dos...

Implement read() on UnionClient (i.e. c['x']) itself, which could pull each data source in turn and return an xarray.Dataset.
Implement GET /union/full/{path} to enable bulk download. This will work similar to container export.

I would at least like the validate this branch by connecting it to be a Bluesky document stream before merging.

are, I believe, strictly additive and could be done in separate PRs or in this PR.

Copy link
Contributor

@padraic-shafer padraic-shafer left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is going to be very useful.

I've noted a few comments/questions. Additionally, should this 404 in PUT /awkward/full be handled by passing StructureFamily.awkward to SecureEntry, as you've updated for the other routes?

tiled/tiled/server/router.py

Lines 1328 to 1338 in a407021

@router.put("/awkward/full/{path:path}")
async def put_awkward_full(
request: Request,
entry=SecureEntry(scopes=["write:data"]),
deserialization_registry=Depends(get_deserialization_registry),
):
body = await request.body()
if entry.structure_family != StructureFamily.awkward:
raise HTTPException(
status_code=404, detail="This route is not applicable to this node."
)

tiled/catalog/adapter.py Outdated Show resolved Hide resolved
tiled/catalog/adapter.py Show resolved Hide resolved
tiled/catalog/adapter.py Show resolved Hide resolved
tiled/server/links.py Show resolved Hide resolved
tiled/server/links.py Outdated Show resolved Hide resolved
Copy link
Contributor

@dylanmcreynolds dylanmcreynolds left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just a random thought on naming. What you're building sounds very close to a view in SQL terminology...something that acts like a flat table but is backed by querying a subset of fields one or more joined tables.

Copy link
Contributor

@dylanmcreynolds dylanmcreynolds left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And to expand on that thought...would it be useful to have a view rather than a union? If I know ahead of time that there will be cases during data analysis that will require 100 of the 1000 fields avaialble, maybe I could define the view with just those fields, and when pulling them out of tiled avoid marshalling the 900 unused fields.

@danielballan
Copy link
Member Author

Rethinking this in terms of a "view" is very interesting. Off the top of my head, I like that:

  • A "view" is a widely-recognized concept and may require less explanation.
  • This seems like it might address a separate issue noticed by @genematx, that in the current data model data and timestamps are allowed to be uneven in length. (And, in fact, in the middle of an update, they always will be.) If there were one table with data and timestamps "views" into subsets of it, that would fix the problem.

@danielballan
Copy link
Member Author

For example, maybe this is what a event stream could look like. Notice that:

  • The layout and URLs are backward compatible with what we have been doing. It simply adds a new key, __name_me__ (needs a good name...).
  • It ensures that data and timestamps are the same length because they are views on the same table.
  • It exposes the "real" structures to the client, but gives a flattened view of it too.
primary
├── data  # view
│   ├── time
│   ├── motor_readback
│   ├── motor_setpoint
│   └── image_array
├── timestamps  # view
│   ├── time
│   ├── motor_readback
│   ├── motor_setpoint
│   └── image_array
├── __name_me__
│   ├── event_table  # values stream inline in Event documents
│   │   ├── time
│   │   ├── data_motor_readback
│   │   ├── data_motor_setpoint
│   │   ├── timestamps_motor_readback
│   │   └──  timestamps_motor_setpoint
│   └── image_array  # externally-written array data

@danielballan
Copy link
Member Author

Either path we take, union or view, would be speculative. We have to add this and really try it to understand how it flies. There is some risk either way.

View, in addition to being a more widely recognized concept, could solve a broader category of problems for us. BlueskyRun is very nested, and this can mean a lot of clicks in the UI. I can imagine a view (or views) of the good parts, flat. Views could be combined with containers to create a nested-but-not-THAT-nested structure. (Dare I call it a "projection"?)

I think we should keep the specification light and focused on current requirements, but I can see a lot of useful scope in this direction. Maybe I’ll start by opening a parallel PR for comparison that builds on this branch but refactors union into view.

@danielballan
Copy link
Member Author

This is why we keep @dylanmcreynolds around. :-)

@dylanmcreynolds
Copy link
Contributor

Could views be a better place to put Databroker's projections and projectors? To sum up, projection are a way add a mapping to the start document. projectors are a python function that take a run, its projection and returns an xarray with datafields mapped as specified in the projection. The major idea was to have a simple way to create multiple views from the same run. One projection could be customized for a user interface, another could be customized for a particular ontology (like nexus).

If we took this view idea even further, the definition of the view could also include information about mapping to a desired ontology.

if (catalog_adapter.structure_family == StructureFamily.union) and len(
segments[i:]
) == 1:
# All the segments but the final segment, segments[-1], resolves
Copy link
Contributor

@padraic-shafer padraic-shafer Mar 3, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
# All the segments but the final segment, segments[-1], resolves
# All segments except the final segment, segments[-1],

@padraic-shafer
Copy link
Contributor

I think we should keep the specification light and focused on current requirements,

That sounds prudent.

... but I can see a lot of useful scope in this direction.

Do you envision that views might evolve to include keys from other nodes (ancestors, siblings, 2nd-cousin-once-removed), or is that something that should be firmly disallowed? I can imagine complications arising from access policy as well as latency/timeouts from trying to include too many keys.

@danielballan
Copy link
Member Author

danielballan commented Mar 3, 2024

I think there will be significant pressure to enable views that reach anywhere across the tree:

  • Mix raw data and analyzed
  • Experiment with alternative views outside of the main tree without making it "noisy"
  • Probably more….

The specification of a union node involves listing data sources directly. If the specification of a view involves instead referencing other nodes, I think that access control is manageable, and the scaling can be managed if we enforce reasonable limits on the number of nodes allowed in one view.

@padraic-shafer
Copy link
Contributor

padraic-shafer commented Mar 3, 2024

It ensures that data and timestamps are the same length because they are views on the same table.

More generally, would merged views only work if all parts have the same length (number of rows)?...and if so would it enforce that by:

  • rejecting data sources that don't meet that condition? --OR--
  • returning a table with number of rows equals to the shortest part (like python's zip())? --OR--
  • returning a table with number of rows equals to the longest part (like python's itertools.zip_longest())? Filler value could be None, numpy.nan, "", or probably a user-supplied value --OR--
  • any of the above, depending on a query parameter passed by the caller?

Or should views require a key to join upon, using merge behavior such as LEFT OUTER JOIN, RIGHT OUTER JOIN, FULL OUTER JOIN, INNER JOIN?

@padraic-shafer
Copy link
Contributor

Could views be a better place to put Databroker's projections and projectors?

So rather than injecting that info into the run documents, you're suggesting to instead let tiled handle that when the data gets accessed? That makes a lot of sense.

Run documents would be more "pure"--less coupled to how someone thought they should be viewed when they were recorded. When new views get dreamed up, they could be added to the tiled server config--restarting the server (or maybe registering the new view with the catalog) would allow that view to be applied to all data new and old.

@dylanmcreynolds
Copy link
Contributor

So rather than injecting that info into the run documents, you're suggesting to instead let tiled handle that when the data gets accessed? That makes a lot of sense.

Maybe. The projections schema was added to the run start document so that they could be the default projection for a particular run. If a newer version were available, the projector code could use it if asked to.

But I only know of one case where projection/projectors were used since they were developed four years ago. Maybe that's a sign? Perhaps the issue is they weren't needed much, perhaps they weren't advertised well, or perhaps the mechanism was too complicated.

I think there will be significant pressure to enable views that reach anywhere across the tree.

That's an interesting thought. I feel like that if it kept the scope in check, I'd be happy to say that a view was limited to objects of the same row/timestamp. We could call it RowView and if we decided we need something more flexible in the future, come up with a SuperView with extra powers?

@danielballan
Copy link
Member Author

danielballan commented Mar 4, 2024

To fit our present use case, we would need a view to look a lot like union, mixing tables and arrays in one namespace. There would be [edit: NO] special guarantees about the relation between the items in the namespace, nothing about their length or how to join them. (It goes without saying that the constitutive tables would each internally have the normal length guarantee that it is made of whole rows.)

I think the change from union is, the parts in a view would have their own canonical locations in the tree, as normal nodes. A view becomes an additional place to get (mixtures of…) nodes. Each views look a lot like a union would have, but their parts are pointers to first-class nodes, not to captive data sources. This enables us to separately build multiple views on the same data. And it avoids placing the canonical data in a weird structure that would require explanation (union).


As far as "projections" goes, I like that this presents clients with the result rather than instructions (in a novel descriptive language…) for rearranging a structure on the client side.

Yes, one can imagine constructing views dynamically through special adapters added to the server config—I think this is what @padraic-shafer’s last message envisions. For our present requirements I would start, though, by building them as static entities. The client specifies, via a HTTP request, "Add a view node to the database that combines array node X and columns A, B from table node Y into one namespace and present it to clients.

@padraic-shafer
Copy link
Contributor

The client specifies, via a HTTP request, "Add a view node to the database that combines array node X and columns A, B from table node Y into one namespace and present it to clients.

I think this Union PR didn't yet add the capability to read the combined result at once, right? So what do we think should happen when the outer dimension of array X differs from the length of table Y? (or equivalently when event_table has more rows than image_array has images in the earlier example?)

I might be quibbling about edge cases. But I wonder about how 'edge'y these cases are. We could of course enforce this when the view node (meaning the outer node, not the projection node) is created, and then wait to see if it runs into issues during testing.

@danielballan
Copy link
Member Author

That's an interesting thought. I feel like that if it kept the scope in check, I'd be happy to say that a view was limited to objects of the same row/timestamp. We could call it RowView and if we decided we need something more flexible in the future, come up with a SuperView with extra powers?

For our present requirements we can keep this pretty limited, and spin out further discussion on whether and how to expand it once we have something to play with.

danielballan added a commit to danielballan/tiled that referenced this pull request Mar 12, 2024
danielballan added a commit that referenced this pull request Mar 12, 2024
* Sort enum members.

* Move structure_family to the end, matching migration result.

* Sort

* Only set include_data_sources param if not default (false).

* Refactor link-writing into separate module.

* Refactor structure family check into dependency

* remove server.core.FULL_LINKS

* Clarify default data_sources for CatalogNodeAdapter

* Add structure_families to dependency check for put '/[node|table]/full'

* Create node uses structure_family of data_source

* Be robust against __doc__ being None.

* Use dask.dataframe.core, not new dask-expr.

---------

Co-authored-by: Padraic Shafer <[email protected]>
hyperrealist pushed a commit to hyperrealist/tiled that referenced this pull request Mar 18, 2024
* Sort enum members.

* Move structure_family to the end, matching migration result.

* Sort

* Only set include_data_sources param if not default (false).

* Refactor link-writing into separate module.

* Refactor structure family check into dependency

* remove server.core.FULL_LINKS

* Clarify default data_sources for CatalogNodeAdapter

* Add structure_families to dependency check for put '/[node|table]/full'

* Create node uses structure_family of data_source

* Be robust against __doc__ being None.

* Use dask.dataframe.core, not new dask-expr.

---------

Co-authored-by: Padraic Shafer <[email protected]>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants