diff --git a/CHANGELOG.md b/CHANGELOG.md
index c5ee12597..64c110d7b 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -6,6 +6,10 @@ Write the date in place of the "Unreleased" in the case a new version is release
 
 ## Unreleased
 
+### Added
+
+- A script to remove the 'streams' node as part of updating BlueskyRun catalogs.
+
 ### Fixed
 
 - Column names in `TableStructure` are explicitly converted to strings.
diff --git a/scripts/remove-streams/README.md b/scripts/remove-streams/README.md
new file mode 100644
index 000000000..7352f1d97
--- /dev/null
+++ b/scripts/remove-streams/README.md
@@ -0,0 +1,127 @@
+The script in `main.sql` is designed to migrate a catalog of Bluesky runs created with TiledWriter versions
+prior to 1.14.5 to the more recent structure. Specifically, the dedicated 'streams' namespace (container) has been
+abolished in favor of placing the individual stream containers (e.g. 'primary') directly under the BlueskyRun
+root node.
+
+The script identifies all nodes named 'streams' that are direct descendants of a node with the `BlueskyRun` spec and
+whose children all carry the `BlueskyEventStream` spec (or that have no children at all). The children of each such
+'streams' node are moved one level up in the hierarchy, and the emptied 'streams' nodes are then deleted.
+
+One can test the script as follows.
+
+1. Start a local PostgreSQL instance with podman (the helper scripts in this directory use the `docker` CLI).
+
+```sh
+podman run --rm --name tiled-test-postgres -p 5432:5432 -e POSTGRES_PASSWORD=secret -d docker.io/postgres:16
+```
+
+2. Initialize an empty catalog and populate it with example data.
+
+```sh
+./setup.sh
+```
+
+3. Run the processing script, grafting the children of 'streams' nodes onto their parents.
+
+```sh
+psql postgresql://postgres:secret@localhost:5432/catalog -f main.sql
+```
+
+Given the original database:
+
+```
+catalog=# select id, key, parent,specs from nodes;
+ id | key | parent | specs
+----+---------+--------+--------------------------------------------------------------------------------------------
+ 0 | | | []
+ 1 | runs | 0 | [{"name": "CatalogOfBlueskyRuns", "version": "3.0"}]
+ 2 | run_1 | 1 | [{"name": "BlueskyRun", "version": "3.0"}]
+ 3 | streams | 2 | []
+ 4 | primary | 3 | [{"name": "BlueskyEventStream", "version": "3.0"}, {"name": "composite", "version": null}]
+ 5 | arr1 | 4 | []
+ 6 | arr2 | 4 | []
+(7 rows)
+
+catalog=# select * from nodes_closure;
+ ancestor | descendant | depth
+----------+------------+-------
+ 0 | 0 | 0
+ 1 | 1 | 0
+ 0 | 1 | 1
+ 2 | 2 | 0
+ 1 | 2 | 1
+ 0 | 2 | 2
+ 3 | 3 | 0
+ 2 | 3 | 1
+ 1 | 3 | 2
+ 0 | 3 | 3
+ 4 | 4 | 0
+ 3 | 4 | 1
+ 2 | 4 | 2
+ 1 | 4 | 3
+ 0 | 4 | 4
+ 5 | 5 | 0
+ 4 | 5 | 1
+ 3 | 5 | 2
+ 2 | 5 | 3
+ 1 | 5 | 4
+ 0 | 5 | 5
+ 6 | 6 | 0
+ 4 | 6 | 1
+ 3 | 6 | 2
+ 2 | 6 | 3
+ 1 | 6 | 4
+ 0 | 6 | 5
+(27 rows)
+```
+
+The resulting nodes and closure table should look like this:
+```
+catalog=# select id, key, parent,specs from nodes;
+ id | key | parent | specs
+----+---------+--------+--------------------------------------------------------------------------------------------
+ 0 | | | []
+ 1 | runs | 0 | [{"name": "CatalogOfBlueskyRuns", "version": "3.0"}]
+ 2 | run_1 | 1 | [{"name": "BlueskyRun", "version": "3.0"}]
+ 5 | arr1 | 4 | []
+ 6 | arr2 | 4 | []
+ 4 | primary | 2 | [{"name": "BlueskyEventStream", "version": "3.0"}, {"name": "composite", "version": null}]
+(6 rows)
+
+catalog=# select * from nodes_closure;
+ ancestor | descendant | depth
+----------+------------+-------
+ 0 | 0 | 0
+ 1 | 1 | 0
+ 0 | 1 | 1
+ 2 | 2 | 0
+ 1 | 2 | 1
+ 0 | 2 | 2
+ 4 | 4 | 0
+ 5 | 5 | 0
+ 4 | 5 | 1
+ 6 | 6 
| 0 + 4 | 6 | 1 + 0 | 5 | 4 + 1 | 5 | 3 + 2 | 5 | 2 + 0 | 4 | 3 + 1 | 4 | 2 + 2 | 4 | 1 + 0 | 6 | 4 + 1 | 6 | 3 + 2 | 6 | 2 +(20 rows) +``` + +4. Review; try to read and write new data. + +```bash +pixi run python review.py +``` + +5. Clean up. + +```sh +./clean.sh +``` diff --git a/scripts/remove-streams/clean.sh b/scripts/remove-streams/clean.sh new file mode 100755 index 000000000..27ba00836 --- /dev/null +++ b/scripts/remove-streams/clean.sh @@ -0,0 +1,4 @@ +POSTGRESQL_URI=postgresql://postgres:secret@localhost:5432 +CONTAINER_ID=$(docker ps --format "{{.ID}} {{.Image}}" | grep postgres | awk '{print $1}' | head -n1) +docker exec -i ${CONTAINER_ID} psql ${POSTGRESQL_URI} -U postgres < clean.sql +rm -rf /tmp/tiled-catalog-data diff --git a/scripts/remove-streams/clean.sql b/scripts/remove-streams/clean.sql new file mode 100644 index 000000000..2437a3c16 --- /dev/null +++ b/scripts/remove-streams/clean.sql @@ -0,0 +1,2 @@ +drop database catalog; +drop database storage; diff --git a/scripts/remove-streams/main.sql b/scripts/remove-streams/main.sql new file mode 100644 index 000000000..ca0a23eea --- /dev/null +++ b/scripts/remove-streams/main.sql @@ -0,0 +1,156 @@ +-- Tree Migration Script: Remove 'streams' namespaced nodes from BlueskyRuns + +-- Move children of every node named 'streams' up to the parent of that 'streams' node, +-- update closure table accordingly, and then delete the 'streams' nodes. +-- Run on PostgreSQL. Test on a copy first. + +BEGIN; + +-- ============================================================================= +-- (1) Identify the 'streams' nodes that belong to BlueskyRun parents and whose children are all BlueskyEventStream +-- ============================================================================= + +WITH candidate_streams AS ( + SELECT + s.id AS stream_id, + s.parent AS parent_id, + p.id AS parent_node_id, + p.specs AS parent_specs + FROM nodes s + JOIN nodes p ON s.parent = p.id + WHERE s.key = 'streams' + AND EXISTS ( + SELECT 1 + FROM jsonb_array_elements(p.specs) AS spec + WHERE spec->>'name' = 'BlueskyRun' + ) +), +streams_with_good_children AS ( + SELECT + cs.stream_id AS stream_id, + cs.parent_id AS parent_id + FROM candidate_streams cs + WHERE NOT EXISTS ( + SELECT 1 + FROM nodes c + WHERE c.parent = cs.stream_id + AND NOT EXISTS ( + SELECT 1 + FROM jsonb_array_elements(c.specs) AS spec + WHERE spec->>'name' = 'BlueskyEventStream' + ) + ) +) +-- From here on, use only these filtered 'streams' nodes. +-- Store them into a temp table for re-use in multiple statements. +SELECT * INTO TEMP TABLE selected_streams FROM streams_with_good_children; + +-- Debugging check: see which streams we’re about to process (Optional; comment out for production) +DO $$ +BEGIN + RAISE NOTICE 'Number of selected ''streams'' nodes to be removed: %', + (SELECT COUNT(*) FROM selected_streams); +END$$; + + +-- ============================================================ +-- (1b) Rename the selected 'streams' nodes to avoid transient key conflicts. +-- ============================================================ + +UPDATE nodes +SET key = '_streams_to_be_deleted' +WHERE id IN (SELECT stream_id FROM selected_streams); + + +-- ============================================================================= +-- (2) Conflict detection — prevent name collisions under new parents. 
+-- ============================================================================= + +DO $$ +DECLARE + conflict RECORD; +BEGIN + FOR conflict IN + SELECT c.id AS child_id, c.key AS child_key, s.parent_id + FROM selected_streams s + JOIN nodes c ON c.parent = s.stream_id + JOIN nodes existing + ON existing.parent IS NOT DISTINCT FROM s.parent_id + AND existing.key = c.key + AND existing.id <> c.id + LOOP + RAISE EXCEPTION + 'Aborting: moving child "%" (id=%) would conflict with existing node of same key under parent id=%', + conflict.child_key, conflict.child_id, conflict.parent_id; + END LOOP; + RAISE NOTICE 'No name conflicts detected. Proceeding with migration.'; +END$$; + + +-- ==================================================================== +-- 3) Reparent children: set each child.parent = stream.parent +-- ==================================================================== + +DO $$ +DECLARE moved_count integer; +BEGIN + UPDATE nodes n + SET parent = s.parent_id + FROM selected_streams s + WHERE n.parent = s.stream_id; + + GET DIAGNOSTICS moved_count = ROW_COUNT; + RAISE NOTICE 'Reparented % node(s) (children of selected streams).', moved_count; +END$$; + +-- ==================================================================== +-- 4) Build the set of all descendants of the streams nodes (including the stream node itself). +-- We'll use this set to adjust closure rows. +-- ==================================================================== + +CREATE TEMP TABLE descendants_to_move AS +SELECT DISTINCT c.descendant +FROM selected_streams s +JOIN nodes_closure c ON c.ancestor = s.stream_id; + +DO $$ +DECLARE dcount integer; +BEGIN + SELECT COUNT(*) INTO dcount FROM descendants_to_move; + RAISE NOTICE 'Total distinct descendant node(s) to move = %.', dcount; +END$$; + +-- ==================================================================== +-- 5) Adjust the closure table: for any (ancestor, descendant) where +-- descendant IN the set of descendants AND ancestor NOT IN the +-- set of descendants, reduce depth by 1. +-- ==================================================================== +DO $$ +DECLARE adjust_count integer; +BEGIN + UPDATE nodes_closure nc + SET depth = nc.depth - 1 + FROM descendants_to_move d + WHERE nc.descendant = d.descendant + AND nc.ancestor NOT IN (SELECT descendant FROM descendants_to_move) + AND nc.depth > 0; -- defensive: only decrement positive depths + + GET DIAGNOSTICS adjust_count = ROW_COUNT; + RAISE NOTICE 'Adjusted depth (decremented by 1) for % closure rows.', adjust_count; +END$$; + +-- ==================================================================== +-- 6) Delete the selected stream nodes. Their nodes_closure rows that +-- reference them will be removed by ON DELETE CASCADE on the FK. 
+-- ==================================================================== +DO $$ +DECLARE del_count integer; +BEGIN + DELETE FROM nodes + WHERE id IN (SELECT stream_id FROM selected_streams); + + GET DIAGNOSTICS del_count = ROW_COUNT; + RAISE NOTICE 'Deleted % stream node(s).', del_count; +END$$; + +COMMIT; diff --git a/scripts/remove-streams/pixi.toml b/scripts/remove-streams/pixi.toml new file mode 100644 index 000000000..f52b0c569 --- /dev/null +++ b/scripts/remove-streams/pixi.toml @@ -0,0 +1,23 @@ +[workspace] +authors = ["Daniel Allan & Eugene M "] +channels = ["conda-forge"] +name = "remove-streams" +platforms = ["osx-64", "linux-64"] +version = "0.1.0" + +[tasks] +# Clean up the storage location; proceed if no files are found +clean = {cmd = "/bin/rm -rf storage/* || true"} +# Start tiled server from a config file +serve = {cmd = "tiled serve config config.yml", depends-on=["clean"]} + +[dependencies] +python = ">=3.12,<3.13" +ipython = ">=9.4.0,<10" +ophyd = ">=1.10.7,<2" + +[pypi-dependencies] +sparse = ">=0.17.0, <0.18" +tiled = { version = ">=0.1.0, <0.2", extras = ["all"] } +bluesky-tiled-plugins = ">=2.0.0b68, <3" +bluesky = "==1.14.5,<2" diff --git a/scripts/remove-streams/populate.py b/scripts/remove-streams/populate.py new file mode 100644 index 000000000..8df04fada --- /dev/null +++ b/scripts/remove-streams/populate.py @@ -0,0 +1,131 @@ +import logging +import sys +from pathlib import Path + +import bluesky.plans as bp +import h5py +import numpy as np +from bluesky import RunEngine +from bluesky.callbacks.tiled_writer import TiledWriter +from ophyd.sim import det, hw + +from tiled.catalog import from_uri +from tiled.client import Context, from_context +from tiled.server.app import build_app +from tiled.structures.core import Spec, StructureFamily +from tiled.structures.data_source import Asset, DataSource, Management + +# Create and setup a logger +logger = logging.getLogger() +logger.setLevel(logging.INFO) +handler = logging.StreamHandler(sys.stdout) +logger.addHandler(handler) + +# Initialize the catalog +catalog = from_uri( + "postgresql://postgres:secret@localhost:5432/catalog", + writable_storage={ + "filesystem": "file://localhost/tmp/tiled-catalog-data", + "sql": "postgresql://postgres:secret@localhost:5432/storage", + }, +) +logger.info(f"Initialized Tiled catalog {catalog}") + +# Create some external HDF5 files to reference +hdf5_data_sources = [] +for i in range(3): + file_path = Path(f"/tmp/tiled-catalog-data/test_{i}.h5") + with h5py.File(file_path, "w") as file: + z = file.create_group("z") + y = z.create_group("y") + y.create_dataset("x", data=np.array([1, 2, 3])) + asset = Asset( + data_uri=f"file://localhost/{file_path}", + is_directory=False, + parameter="data_uris", + num=0, + ) + data_source = DataSource( + mimetype="application/x-hdf5", + assets=[asset], + structure_family=StructureFamily.container, + structure=None, + parameters={"dataset": "z/y"}, + management=Management.external, + ) + hdf5_data_sources.append(data_source) + +# A simple example with a single 'streams' node to be deleted +with Context.from_app(build_app(catalog)) as context: + client = from_context(context) + primary = ( + client.create_container( + "runs", specs=[Spec("CatalogOfBlueskyRuns", version="3.0")] + ) + .create_container("run_1", specs=[Spec("BlueskyRun", version="3.0")]) + .create_container("streams") + .create_container( + "primary", + specs=[Spec("BlueskyEventStream", version="3.0"), Spec("composite")], + ) + ) + primary.write_array(np.random.randn(3, 4), key="arr1") + 
primary.write_array(np.random.randn(3, 4), key="arr2")
+
+
+# Add more data
+with Context.from_app(build_app(catalog)) as context:
+    RE = RunEngine()
+    client = from_context(context)
+    runs_node = client["runs"]
+    tw = TiledWriter(runs_node)
+    RE.subscribe(tw)
+
+    # 1. Some data from Bluesky (only these 6 'streams' nodes should be deleted)
+    for i in range(3):
+        logger.info(f"Starting iteration {i}")
+        # Internal Data Collection #####
+        (uid,) = RE(bp.count([det], 3))
+
+        # External Data Collection #####
+        Path("/tmp/tiled-catalog-data").mkdir(parents=True, exist_ok=True)
+        (uid,) = RE(bp.count([hw(save_path="/tmp/tiled-catalog-data").img], 3))
+
+    # 2. Add a stream node called "streams" -- should not be deleted
+    stream_called_streams = runs_node[uid]["streams"].create_container(
+        "streams", specs=[Spec("BlueskyEventStream", version="3.0")]
+    )
+    stream_called_streams.write_array(np.random.randn(3, 4), key="arr1")
+
+    # 3. Create a BlueskyRun with an empty streams node -- should be deleted
+    empty_run = runs_node.create_container(
+        "empty_run", specs=[Spec("BlueskyRun", version="3.0")]
+    )
+    empty_run.create_container("streams")
+
+    # 4. Create a BlueskyRun whose 'streams' node contains an array (not an event stream) -- should not be deleted
+    non_empty_run = runs_node.create_container(
+        "non_empty_run", specs=[Spec("BlueskyRun", version="3.0")]
+    )
+    non_empty_run.create_container("streams").write_array(
+        np.random.randn(3, 4), key="arr1"
+    )
+
+    # 5. Some other hierarchical data -- should not be deleted
+    a = client.create_container("streams")
+    b = a.create_container("b")
+    c = b.create_container("streams")
+    d = c.write_array([1, 2, 3], key="d")
+    a.update_metadata({"color": "blue"})
+
+    # 6. External HDF5 files
+    a.new(
+        structure_family=StructureFamily.container,
+        data_sources=[hdf5_data_sources[0]],
+        key="streams",
+    )
+    a.new(
+        structure_family=StructureFamily.container,
+        data_sources=[hdf5_data_sources[1]],
+        key="hdf5_1",
+    )
diff --git a/scripts/remove-streams/review.py b/scripts/remove-streams/review.py
new file mode 100644
index 000000000..4a5fec3da
--- /dev/null
+++ b/scripts/remove-streams/review.py
@@ -0,0 +1,50 @@
+import logging
+import sys
+
+import bluesky.plans as bp
+from bluesky import RunEngine
+from bluesky.callbacks.tiled_writer import TiledWriter
+from ophyd.sim import hw
+
+from tiled.catalog import from_uri
+from tiled.client import Context, from_context
+from tiled.server.app import build_app
+
+# Create and setup a logger
+logger = logging.getLogger()
+logger.setLevel(logging.INFO)
+handler = logging.StreamHandler(sys.stdout)
+logger.addHandler(handler)
+
+catalog_dst = from_uri(
+    "postgresql://postgres:secret@localhost:5432/catalog",
+    writable_storage={
+        "filesystem": "file://localhost/tmp/tiled-catalog-data",
+        "sql": "postgresql://postgres:secret@localhost:5432/storage",
+    },
+)
+app = build_app(catalog_dst)
+
+
+def recursive_read(client):
+    for name, child in client.items():
+        logger.info(f"Reading node: {name}")
+        if child.structure_family == "container":
+            recursive_read(child)
+        else:
+            result = child.read()
+            logger.info(f"> {result}")
+
+
+with Context.from_app(app) as context:
+    client = from_context(context)
+    recursive_read(client)
+
+    # Write some data
+    RE = RunEngine()
+    tw = TiledWriter(client["runs"])
+    RE.subscribe(tw)
+    (uid,) = RE(bp.count([hw(save_path="/tmp/tiled-catalog-data").img], 3))
+
+context = Context.from_app(app)
+client = from_context(context)
diff --git a/scripts/remove-streams/run.sh b/scripts/remove-streams/run.sh
new file mode 100755
index 000000000..607c95d8a
--- /dev/null
+++ b/scripts/remove-streams/run.sh
@@ -0,0 +1,3 @@
+POSTGRESQL_URI=postgresql://postgres:secret@localhost:5432
+CONTAINER_ID=$(docker ps --format "{{.ID}} {{.Image}}" | grep postgres | awk '{print $1}' | head -n1)
+docker exec -i ${CONTAINER_ID} psql ${POSTGRESQL_URI}/catalog -U postgres < main.sql
diff --git a/scripts/remove-streams/setup.sh b/scripts/remove-streams/setup.sh
new file mode 100755
index 000000000..d33599132
--- /dev/null
+++ b/scripts/remove-streams/setup.sh
@@ -0,0 +1,6 @@
+POSTGRESQL_URI=postgresql://postgres:secret@localhost:5432
+CONTAINER_ID=$(docker ps --format "{{.ID}} {{.Image}}" | grep postgres | awk '{print $1}' | head -n1)
+docker exec -i ${CONTAINER_ID} psql ${POSTGRESQL_URI} -U postgres < setup.sql
+pixi run tiled catalog init ${POSTGRESQL_URI}/catalog
+mkdir -p /tmp/tiled-catalog-data
+pixi run python populate.py
diff --git a/scripts/remove-streams/setup.sql b/scripts/remove-streams/setup.sql
new file mode 100644
index 000000000..4ab920dd1
--- /dev/null
+++ b/scripts/remove-streams/setup.sql
@@ -0,0 +1,2 @@
+CREATE DATABASE catalog;
+CREATE DATABASE storage;
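After running `main.sql`, a quick sanity check can be done directly in `psql`. The queries below are not part of this change; they are a minimal sketch that relies only on the `nodes` and `nodes_closure` columns shown in the README above and on the same selection criteria as `main.sql`. Both queries should return 0 on a successfully migrated catalog.

```sql
-- Sketch of a post-migration sanity check (not part of the scripts above).
-- 1) No 'streams' node matching the removal criteria should remain: a direct child
--    of a BlueskyRun whose children are all BlueskyEventStream (or absent).
SELECT count(*) AS eligible_streams_left
FROM nodes s
JOIN nodes p ON s.parent = p.id
WHERE s.key = 'streams'
  AND EXISTS (
    SELECT 1 FROM jsonb_array_elements(p.specs) AS spec
    WHERE spec->>'name' = 'BlueskyRun'
  )
  AND NOT EXISTS (
    SELECT 1 FROM nodes c
    WHERE c.parent = s.id
      AND NOT EXISTS (
        SELECT 1 FROM jsonb_array_elements(c.specs) AS spec
        WHERE spec->>'name' = 'BlueskyEventStream'
      )
  );

-- 2) The closure table should still agree with the parent pointers: every node
--    with a parent needs a depth-1 closure row pointing at that parent.
SELECT count(*) AS broken_parent_links
FROM nodes n
LEFT JOIN nodes_closure c
  ON c.descendant = n.id AND c.ancestor = n.parent AND c.depth = 1
WHERE n.parent IS NOT NULL
  AND c.descendant IS NULL;
```

If either count is non-zero, the offending rows can be inspected by replacing `count(*)` with `*` in the corresponding query.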