Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,10 @@ Write the date in place of the "Unreleased" in the case a new version is release

## Unreleased

### Added

- A script to remove the 'streams' node as part of the updating BlueskyRun catalogs.

### Fixed

- Column names in `TableStructure` are explicitly converted to strings.
Expand Down
127 changes: 127 additions & 0 deletions scripts/remove-streams/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
The script in `main.sql` is designed to migrate a catalog of Bluesky runs created with TiledWriter of versions
prior to 1.14.5 to the more recent structure. Specifically, the dedicated 'streams' namespace (container) has been
abolished in favor of placing the individual streams containers (e.g. 'primary') directly under the BlueskyRun
root node.

The scripts identifies all nodes named 'streams' that are direct descendant of a node with `BlueskyRun` spec and
that have children of `BlueskyEventStream` specs (or no children at all). The children of the 'streams' node are
moved in the hierarchy one level up and the emptied 'streams' node are deleted.

One can test the script as follows.

1. Start local PG instance with podman.

```sh
podman run --rm --name tiled-test-postgres -p 5432:5432 -e POSTGRES_PASSWORD=secret -d docker.io/postgres:16
```

2. Initialize and empty catalog and populate it with example data.

```sh
./setup.sh
```

3. Run the processing script, grafting the children of 'streams' nodes onto their parents.

```sh
psql postgresql://postgres:secret@localhost:5432/dst -f main.sql
```

Given the original database:

```
catalog=# select id, key, parent,specs from nodes;
id | key | parent | specs
----+---------+--------+--------------------------------------------------------------------------------------------
0 | | | []
1 | runs | 0 | [{"name": "CatalogOfBlueskyRuns", "version": "3.0"}]
2 | run_1 | 1 | [{"name": "BlueskyRun", "version": "3.0"}]
3 | streams | 2 | []
4 | primary | 3 | [{"name": "BlueskyEventStream", "version": "3.0"}, {"name": "composite", "version": null}]
5 | arr1 | 4 | []
6 | arr2 | 4 | []
(7 rows)

catalog=# select * from nodes_closure;
ancestor | descendant | depth
----------+------------+-------
0 | 0 | 0
1 | 1 | 0
0 | 1 | 1
2 | 2 | 0
1 | 2 | 1
0 | 2 | 2
3 | 3 | 0
2 | 3 | 1
1 | 3 | 2
0 | 3 | 3
4 | 4 | 0
3 | 4 | 1
2 | 4 | 2
1 | 4 | 3
0 | 4 | 4
5 | 5 | 0
4 | 5 | 1
3 | 5 | 2
2 | 5 | 3
1 | 5 | 4
0 | 5 | 5
6 | 6 | 0
4 | 6 | 1
3 | 6 | 2
2 | 6 | 3
1 | 6 | 4
0 | 6 | 5
(27 rows)
```

The resulting nodes and closure table should look like this:
```
catalog=# select id, key, parent,specs from nodes;
id | key | parent | specs
----+---------+--------+--------------------------------------------------------------------------------------------
0 | | | []
1 | runs | 0 | [{"name": "CatalogOfBlueskyRuns", "version": "3.0"}]
2 | run_1 | 1 | [{"name": "BlueskyRun", "version": "3.0"}]
5 | arr1 | 4 | []
6 | arr2 | 4 | []
4 | primary | 2 | [{"name": "BlueskyEventStream", "version": "3.0"}, {"name": "composite", "version": null}]
(6 rows)

catalog=# select * from nodes_closure;
ancestor | descendant | depth
----------+------------+-------
0 | 0 | 0
1 | 1 | 0
0 | 1 | 1
2 | 2 | 0
1 | 2 | 1
0 | 2 | 2
4 | 4 | 0
5 | 5 | 0
4 | 5 | 1
6 | 6 | 0
4 | 6 | 1
0 | 5 | 4
1 | 5 | 3
2 | 5 | 2
0 | 4 | 3
1 | 4 | 2
2 | 4 | 1
0 | 6 | 4
1 | 6 | 3
2 | 6 | 2
(20 rows)
```

4. Review; try to read and write new data.

```bash
pixi run python review.py
```

5. Clean up.

```sh
./clean.sh
```
4 changes: 4 additions & 0 deletions scripts/remove-streams/clean.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
POSTGRESQL_URI=postgresql://postgres:secret@localhost:5432
CONTAINER_ID=$(docker ps --format "{{.ID}} {{.Image}}" | grep postgres | awk '{print $1}' | head -n1)
docker exec -i ${CONTAINER_ID} psql ${POSTGRESQL_URI} -U postgres < clean.sql
rm -rf /tmp/tiled-catalog-data
2 changes: 2 additions & 0 deletions scripts/remove-streams/clean.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
drop database catalog;
drop database storage;
156 changes: 156 additions & 0 deletions scripts/remove-streams/main.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,156 @@
-- Tree Migration Script: Remove 'streams' namespaced nodes from BlueskyRuns

-- Move children of every node named 'streams' up to the parent of that 'streams' node,
-- update closure table accordingly, and then delete the 'streams' nodes.
-- Run on PostgreSQL. Test on a copy first.

BEGIN;

-- =============================================================================
-- (1) Identify the 'streams' nodes that belong to BlueskyRun parents and whose children are all BlueskyEventStream
-- =============================================================================

WITH candidate_streams AS (
SELECT
s.id AS stream_id,
s.parent AS parent_id,
p.id AS parent_node_id,
p.specs AS parent_specs
FROM nodes s
JOIN nodes p ON s.parent = p.id
WHERE s.key = 'streams'
AND EXISTS (
SELECT 1
FROM jsonb_array_elements(p.specs) AS spec
WHERE spec->>'name' = 'BlueskyRun'
)
),
streams_with_good_children AS (
SELECT
cs.stream_id AS stream_id,
cs.parent_id AS parent_id
FROM candidate_streams cs
WHERE NOT EXISTS (
SELECT 1
FROM nodes c
WHERE c.parent = cs.stream_id
AND NOT EXISTS (
SELECT 1
FROM jsonb_array_elements(c.specs) AS spec
WHERE spec->>'name' = 'BlueskyEventStream'
)
)
)
-- From here on, use only these filtered 'streams' nodes.
-- Store them into a temp table for re-use in multiple statements.
SELECT * INTO TEMP TABLE selected_streams FROM streams_with_good_children;

-- Debugging check: see which streams we’re about to process (Optional; comment out for production)
DO $$
BEGIN
RAISE NOTICE 'Number of selected ''streams'' nodes to be removed: %',
(SELECT COUNT(*) FROM selected_streams);
END$$;


-- ============================================================
-- (1b) Rename the selected 'streams' nodes to avoid transient key conflicts.
-- ============================================================

UPDATE nodes
SET key = '_streams_to_be_deleted'
WHERE id IN (SELECT stream_id FROM selected_streams);


-- =============================================================================
-- (2) Conflict detection — prevent name collisions under new parents.
-- =============================================================================

DO $$
DECLARE
conflict RECORD;
BEGIN
FOR conflict IN
SELECT c.id AS child_id, c.key AS child_key, s.parent_id
FROM selected_streams s
JOIN nodes c ON c.parent = s.stream_id
JOIN nodes existing
ON existing.parent IS NOT DISTINCT FROM s.parent_id
AND existing.key = c.key
AND existing.id <> c.id
LOOP
RAISE EXCEPTION
'Aborting: moving child "%" (id=%) would conflict with existing node of same key under parent id=%',
conflict.child_key, conflict.child_id, conflict.parent_id;
END LOOP;
RAISE NOTICE 'No name conflicts detected. Proceeding with migration.';
END$$;


-- ====================================================================
-- 3) Reparent children: set each child.parent = stream.parent
-- ====================================================================

DO $$
DECLARE moved_count integer;
BEGIN
UPDATE nodes n
SET parent = s.parent_id
FROM selected_streams s
WHERE n.parent = s.stream_id;

GET DIAGNOSTICS moved_count = ROW_COUNT;
RAISE NOTICE 'Reparented % node(s) (children of selected streams).', moved_count;
END$$;

-- ====================================================================
-- 4) Build the set of all descendants of the streams nodes (including the stream node itself).
-- We'll use this set to adjust closure rows.
-- ====================================================================

CREATE TEMP TABLE descendants_to_move AS
SELECT DISTINCT c.descendant
FROM selected_streams s
JOIN nodes_closure c ON c.ancestor = s.stream_id;

DO $$
DECLARE dcount integer;
BEGIN
SELECT COUNT(*) INTO dcount FROM descendants_to_move;
RAISE NOTICE 'Total distinct descendant node(s) to move = %.', dcount;
END$$;

-- ====================================================================
-- 5) Adjust the closure table: for any (ancestor, descendant) where
-- descendant IN the set of descendants AND ancestor NOT IN the
-- set of descendants, reduce depth by 1.
-- ====================================================================
DO $$
DECLARE adjust_count integer;
BEGIN
UPDATE nodes_closure nc
SET depth = nc.depth - 1
FROM descendants_to_move d
WHERE nc.descendant = d.descendant
AND nc.ancestor NOT IN (SELECT descendant FROM descendants_to_move)
AND nc.depth > 0; -- defensive: only decrement positive depths

GET DIAGNOSTICS adjust_count = ROW_COUNT;
RAISE NOTICE 'Adjusted depth (decremented by 1) for % closure rows.', adjust_count;
END$$;

-- ====================================================================
-- 6) Delete the selected stream nodes. Their nodes_closure rows that
-- reference them will be removed by ON DELETE CASCADE on the FK.
-- ====================================================================
DO $$
DECLARE del_count integer;
BEGIN
DELETE FROM nodes
WHERE id IN (SELECT stream_id FROM selected_streams);

GET DIAGNOSTICS del_count = ROW_COUNT;
RAISE NOTICE 'Deleted % stream node(s).', del_count;
END$$;

COMMIT;
23 changes: 23 additions & 0 deletions scripts/remove-streams/pixi.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
[workspace]
authors = ["Daniel Allan & Eugene M <[email protected]>"]
channels = ["conda-forge"]
name = "remove-streams"
platforms = ["osx-64", "linux-64"]
version = "0.1.0"

[tasks]
# Clean up the storage location; proceed if no files are found
clean = {cmd = "/bin/rm -rf storage/* || true"}
# Start tiled server from a config file
serve = {cmd = "tiled serve config config.yml", depends-on=["clean"]}

[dependencies]
python = ">=3.12,<3.13"
ipython = ">=9.4.0,<10"
ophyd = ">=1.10.7,<2"

[pypi-dependencies]
sparse = ">=0.17.0, <0.18"
tiled = { version = ">=0.1.0, <0.2", extras = ["all"] }
bluesky-tiled-plugins = ">=2.0.0b68, <3"
bluesky = "==1.14.5,<2"
Loading