76 changes: 74 additions & 2 deletions .dockerignore
@@ -1,8 +1,80 @@
# Environment and secrets
.env
.env.*
.secrets
.venv
venv/
ENV/

# Data and models (IMPORTANT - this saves GBs!)
data/
data**
cache/
*code-workspace*
**cache**
persist/
models/
results/
basic_auth.ini

# Python artifacts
__pycache__/
*.py[cod]
*$py.class
*.so
*.pyc
.Python
build/
dist/
*.egg-info/
*.egg
.pytest_cache/
.coverage
.cache

# Notebooks and test files
**.ipynb
**.pkl
test.py
.ipynb_checkpoints/

# IDE and editor
.vscode/
.vscode-server/
*code-workspace*
.idea/
.spyderproject/
.spyproject/
.ropeproject/

# Logs and databases
*.log
.egg-info
*.db
*.sqlite3
db.sqlite3-journal
pip-log.txt

# Git
.git/
.gitignore
.github/

# Docker files (no need to copy these into image)
Dockerfile*
docker-compose*.yml
.dockerignore

# Documentation
docs/
*.md
!README.md

# OS files
.DS_Store
*.swp
*.swo

# Other
workspace/
.dotnet/
.gnupg/
.ssh/
2 changes: 1 addition & 1 deletion Dockerfile
@@ -6,6 +6,6 @@ COPY . /app/work
RUN pip install --upgrade pip
RUN pip install .[lse]

-ENV HOME /app/work
+ENV HOME=/app/work

CMD ["python", "frontend.py"]
5 changes: 3 additions & 2 deletions docker-compose.yml
@@ -32,7 +32,7 @@ services:

tiled:
# see the file ./tiled/deploy/config.yml for detailed configuration of tiled
-image: ghcr.io/bluesky/tiled:v0.1.0a118
+image: ghcr.io/bluesky/tiled:main
container_name: tiled-server
ports:
- "127.0.0.1:8000:8000"
@@ -131,6 +131,7 @@ services:
LIVE_TILED_API_KEY: '${LIVE_TILED_API_KEY}'
RESULTS_TILED_URI: '${RESULTS_TILED_URI}'
RESULTS_TILED_API_KEY: '${RESULTS_TILED_API_KEY}'
+TILED_REPLAY_PREFIX: 'beamlines/bl931/processed' # NEW: Add this line
# Prefect
PREFECT_API_URL: '${PREFECT_API_URL}'
FLOW_NAME: '${FLOW_NAME}'
@@ -151,7 +152,7 @@ services:
MODE: "development"
USER: ${USER}
# Live mode
-WEBSOCKET_URL: ${WEBSOCKET_URL:-ws://localhost:8765/lse}
+WEBSOCKET_URL: ${WEBSOCKET_URL:-ws://localhost:8765/lse_operator}
# MLflow
MLFLOW_TRACKING_URI: '${MLFLOW_TRACKING_URI}'
MLFLOW_TRACKING_USERNAME: '${MLFLOW_TRACKING_USERNAME}'
5 changes: 4 additions & 1 deletion pyproject.toml
@@ -77,5 +77,8 @@ arroyo = [
"torchvision==0.17.2",
"transformers==4.47.1",
"umap-learn",
"joblib==1.4.2"
"joblib==1.4.2",
"mlflow==2.22.0",
"mlex_utils[all]@git+https://github.com/mlexchange/mlex_utils.git",
"numpy<2.0.0"
]
2 changes: 2 additions & 0 deletions settings.yaml
@@ -16,8 +16,10 @@ lse_operator:
tiled_publisher: # Added new section for TiledResultsPublisher
root_segments:
- lse_live_results
+tiled_prefix: beamlines/bl931/processed # Prefix path for feature vectors
local_image_publisher: # New section for TiledLocalImagePublisher
container_name: live_data_cache # Container for storing images
+tiled_prefix: beamlines/bl931/processed # Prefix path for XPS images

lse_reducer:
demo_mode: true
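For orientation, a minimal sketch of how these settings might be read at startup. The nesting under lse_operator follows the hunk header above, but the loading code itself is illustrative and not taken from this repository:

    # Illustrative only: load settings.yaml and pull out the tiled_publisher section.
    import yaml

    with open("settings.yaml") as f:
        settings = yaml.safe_load(f)

    publisher_settings = settings["lse_operator"]["tiled_publisher"]
    print(publisher_settings["root_segments"])  # ["lse_live_results"]
    print(publisher_settings["tiled_prefix"])   # "beamlines/bl931/processed"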
16 changes: 14 additions & 2 deletions src/arroyo_reduction/operator.py
@@ -38,7 +38,19 @@ async def process(self, message: SASMessage) -> None:
logger.info("Received Start Message")
await self.publish(message)
elif isinstance(message, RawFrameEvent):
await self.publish(message)
# NEW: Check if models are selected before publishing RawFrameEvent
if self.redis_model_store is not None:
autoencoder_model = self.redis_model_store.get_autoencoder_model()
dimred_model = self.redis_model_store.get_dimred_model()

if not autoencoder_model or not dimred_model:
logger.info(f"In offline mode - skipping write image {message.frame_number}")
else:
await self.publish(message)
else:
# If redis not available, publish anyway (default behavior)
await self.publish(message)

result = await self.dispatch(message)
if result is not None: # Only publish if we got a valid result
await self.publish(result)
@@ -73,7 +85,7 @@ async def dispatch(self, message: RawFrameEvent) -> LatentSpaceEvent:
                self._flush_sent = True
                logger.info("Sent flush signal when entering offline mode")

-           logger.info(f"In offline mode - skipping frame {message.frame_number}")
+           logger.info(f"In offline mode - skipping dispatch frame {message.frame_number}")
            return None
        else:
            # NEW: Reset flush flag when back in live mode
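The gating added to process() boils down to a single predicate. A minimal sketch with a stand-in model store (FakeModelStore and should_publish_raw_frame are illustrative names; only the two getter names mirror the calls in the diff above):

    # Illustrative only: the publish decision for a RawFrameEvent, factored out.
    from typing import Optional

    class FakeModelStore:
        """Stand-in for the Redis-backed model store used in process()."""
        def __init__(self, autoencoder=None, dimred=None):
            self._autoencoder = autoencoder
            self._dimred = dimred

        def get_autoencoder_model(self):
            return self._autoencoder

        def get_dimred_model(self):
            return self._dimred

    def should_publish_raw_frame(store: Optional[FakeModelStore]) -> bool:
        if store is None:
            return True  # no Redis available: keep the default publish-everything behavior
        # Publish only when both models are selected (live mode)
        return bool(store.get_autoencoder_model() and store.get_dimred_model())

    assert should_publish_raw_frame(None) is True
    assert should_publish_raw_frame(FakeModelStore()) is False
    assert should_publish_raw_frame(FakeModelStore("ae-v1", "umap-v2")) is True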
2 changes: 1 addition & 1 deletion src/arroyo_reduction/publisher.py
@@ -24,7 +24,7 @@ class LSEWSResultPublisher(Publisher):
    connected_clients = set()
    current_start_message = None

-   def __init__(self, host: str = "localhost", port: int = 8765, path="/lse"):
+   def __init__(self, host: str = "localhost", port: int = 8765, path="/lse_operator"):

        super().__init__()
        self.host = host
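Any client that subscribed to the old /lse path must switch to /lse_operator. A minimal connection sketch using the websockets package (the handler body is illustrative; only the host, port, and path come from the code above):

    # Illustrative only: subscribe to the renamed websocket endpoint.
    import asyncio
    import websockets

    async def listen():
        async with websockets.connect("ws://localhost:8765/lse_operator") as ws:
            async for message in ws:
                print("received:", message[:80])

    asyncio.run(listen())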
91 changes: 68 additions & 23 deletions src/arroyo_reduction/tiled_results_publisher.py
@@ -32,10 +32,11 @@
class TiledResultsPublisher(Publisher):
    """Publisher that saves latent space vectors to a Tiled server."""

-   def __init__(self, tiled_uri=None, tiled_api_key=None, root_segments=None):
+   def __init__(self, tiled_uri=None, tiled_api_key=None, root_segments=None, tiled_prefix=None):
        super().__init__()
        self.tiled_uri = tiled_uri or RESULTS_TILED_URI
        self.tiled_api_key = tiled_api_key or RESULTS_TILED_API_KEY
+       self.tiled_prefix = tiled_prefix  # NEW: Add prefix support
Member commented:

How do tiled_prefix and root_segments differ?

@xiaoyachong (Contributor Author) replied on Oct 28, 2025:

Example tiled_prefix: /beamlines/bl931/processed/
Example root_segments: lse_live_results

The Tiled file structure looks like this:

{tiled_base_uri}/{tiled_prefix}/
│
└── {root_segments}/
    └── {USER}/
        └── {daily_run_YYYY-MM-DD}/
            └── {experiment_name}/
                └── {UUID}/
                    ├── feature_vectors (table for saving feature vector results)
                    └── xps_averaged_heatmaps (3D array for saving heatmaps)

Member replied:

Thanks for the map, that helps. I see now.

I wonder whether we should remove {USER}. Multiple users may want to browse the results, and the date seems more significant than who was taking the data.

To keep a single collection from growing too large, what do you think of creating subfolders by year, month, then day?

@xiaoyachong (Contributor Author) replied on Oct 29, 2025:

Thanks for the suggestion.

The {USER} component is currently used as a restriction in experiment_replay, ensuring that users can only replay their own experiments.
@taxe10 I’m not sure whether we should remove this — perhaps Tanny can share her thoughts.

Regarding the {daily_run_YYYY-MM-DD} container node, we could split it into three hierarchical layers:

{tiled_base_uri}/{tiled_prefix}/
│
└── {root_segments}/
    └── {USER}/
        └── {YYYY}/                    
            └── {MM}/                 
                └── {DD}/             
                    └── {experiment_name}/
                        └── {UUID}/
                            ├── feature_vectors (table for saving feature vector results)
                            └── xps_averaged_heatmaps (3D array for saving heatmaps)

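To make the proposal concrete, a sketch of how the full node path would be assembled from these pieces (pure string handling; the YYYY/MM/DD split follows the layout proposed above, and the user, experiment, and UUID values are illustrative placeholders):

    # Illustrative only: assemble the Tiled node path from prefix, root segments, and run metadata.
    from datetime import date

    def build_node_segments(tiled_prefix, root_segments, user, experiment_name, uuid, run_date):
        segments = [s for s in tiled_prefix.split("/") if s]  # drop empties from leading/trailing "/"
        segments += list(root_segments)
        segments += [user, f"{run_date:%Y}", f"{run_date:%m}", f"{run_date:%d}", experiment_name, uuid]
        return segments

    segments = build_node_segments("/beamlines/bl931/processed/", ["lse_live_results"],
                                   "user1", "experiment_1", "abc123", date(2025, 10, 29))
    print("/".join(segments))
    # beamlines/bl931/processed/lse_live_results/user1/2025/10/29/experiment_1/abc123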
        self.root_segments = root_segments or ["lse_live_results"]
        self.user = USER  # Get user from environment
        self.client = None
@@ -72,8 +73,22 @@ def _start_sync(self):
        try:
            self.client = from_uri(self.tiled_uri, api_key=self.tiled_api_key)

+           # NEW: Navigate to the prefix first if specified, creating it if it doesn't exist
+           container = self.client
+           if self.tiled_prefix:
+               prefix_segments = self.tiled_prefix.split('/')
+               for segment in prefix_segments:
+                   if segment:  # Skip empty strings
+                       if segment in container:
+                           logger.info(f"Using existing prefix container: {segment}")
+                           container = container[segment]
+                       else:
+                           # Create the prefix path if it doesn't exist
+                           logger.info(f"Creating prefix container: {segment}")
+                           container = container.create_container(segment)
+
            # Navigate to the root container and create the hierarchy
-           self._setup_containers_sync()
+           self._setup_containers_sync(container)

            # List all existing tables in the daily container
            if self.daily_container is not None:
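As an aside, the prefix loop above is an idempotent get-or-create walk. The same pattern as a standalone helper (from_uri, the membership test, indexing, and create_container are the Tiled client calls already used in this hunk; the helper name, URI, and API key are assumptions):

    # Illustrative only: idempotent creation of a nested container path with the Tiled client.
    from tiled.client import from_uri

    def ensure_path(client, path):
        node = client
        for segment in (s for s in path.split("/") if s):
            node = node[segment] if segment in node else node.create_container(segment)
        return node

    client = from_uri("http://localhost:8000", api_key="secret")  # assumed local dev server
    processed = ensure_path(client, "beamlines/bl931/processed")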
@@ -90,7 +105,8 @@
                    logger.info(f"Examples of existing UUIDs: {', '.join(examples)}")

            logger.info(f"Connected to Tiled server at {self.tiled_uri}")
-           logger.info(f"Using container path: {'/'.join(self.root_segments)}/{self.user}/{DAILY_RUN_ID}")
+           prefix_path = f"{self.tiled_prefix}/" if self.tiled_prefix else ""
+           logger.info(f"Using container path: {prefix_path}{'/'.join(self.root_segments)}/{self.user}/{DAILY_RUN_ID}")
        except Exception as e:
            logger.error(f"Error in _start_sync: {e}")
            import traceback
@@ -114,11 +130,13 @@ def _extract_uuid_from_url(self, url):
            logger.debug(f"No UUID found in URL, using default: {self.default_table_name}")
            return self.default_table_name

-   def _setup_containers_sync(self):
+   def _setup_containers_sync(self, starting_container=None):
        """Set up the container structure with USER level (synchronous version)."""
        try:
-           # Navigate through root_segments
-           container = self.client
+           # NEW: Start from provided container or client
+           container = starting_container if starting_container is not None else self.client
+
+           # Navigate through root_segments (these we can create)
            for segment in self.root_segments:
                if segment in container:
                    logger.info(f"Using existing container: {segment}")
@@ -232,19 +250,29 @@ def _publish_sync(self, message):
            # Get experiment container
            experiment_container = self._get_experiment_container(experiment_name)

-           # Check if this UUID already exists
+           # NEW: Check if UUID container exists (not UUID/feature_vectors table)
            if uuid in experiment_container:
-               logger.debug(f"Skipping vector for existing UUID: {uuid}")
-               return None
+               uuid_container = experiment_container[uuid]
+               # Check if feature_vectors table exists inside UUID container
+               if "feature_vectors" in uuid_container:
+                   logger.debug(f"Skipping vector for existing UUID: {uuid}")
+                   return None

            # Check if this is a new UUID
            uuid_to_write = None

            if self.current_uuid is not None and uuid != self.current_uuid and self.current_uuid in self.uuid_dataframes:
-               # We have a new UUID, so write the data for the previous UUID (if it's not an existing UUID)
-               if self.current_uuid not in experiment_container and not self.uuid_dataframes[self.current_uuid].empty:
-                   logger.info(f"New UUID detected, marking previous UUID for writing: {self.current_uuid}")
-                   uuid_to_write = self.current_uuid
+               # We have a new UUID, so write the data for the previous UUID
+               if not self.uuid_dataframes[self.current_uuid].empty:
+                   # Check if the previous UUID's feature_vectors already exists
+                   prev_uuid_container = experiment_container.get(self.current_uuid)
+                   should_write = True
+                   if prev_uuid_container and "feature_vectors" in prev_uuid_container:
+                       should_write = False
+
+                   if should_write:
+                       logger.info(f"New UUID detected, marking previous UUID for writing: {self.current_uuid}")
+                       uuid_to_write = self.current_uuid

            # Update current UUID
            self.current_uuid = uuid
@@ -300,10 +328,12 @@ def _write_table_to_tiled_sync(self, table_key):
            # Get experiment container instead of using daily_container
            experiment_container = self._get_experiment_container(self.current_experiment_name)

-           # Check if this UUID already exists
+           # NEW: Check if UUID container exists, and if feature_vectors table exists inside it
            if table_key in experiment_container:
-               logger.info(f"Skipping write for existing UUID: {table_key}")
-               return
+               uuid_container = experiment_container[table_key]
+               if "feature_vectors" in uuid_container:
+                   logger.info(f"Skipping write for existing UUID: {table_key} (feature_vectors already exists)")
+                   return

            # Get the DataFrame for this UUID
            df = self.uuid_dataframes.get(table_key)
@@ -312,19 +342,25 @@
                return

            # Log DataFrame info for debugging
-           logger.info(f"Writing {len(df)} vectors to new table '{table_key}' in {self.user}/{DAILY_RUN_ID}/{self.current_experiment_name}")
+           logger.info(f"Writing {len(df)} vectors to new table '{table_key}/feature_vectors' in {self.user}/{DAILY_RUN_ID}/{self.current_experiment_name}")

            # Check if DataFrame is empty
            if df.empty:
                logger.warning(f"DataFrame for {table_key} is empty, nothing to write")
                return

-           # Simply write the DataFrame to Tiled
+           # NEW: Create UUID container if it doesn't exist
+           if table_key not in experiment_container:
+               logger.info(f"Creating UUID container: {table_key}")
+               experiment_container.create_container(table_key)
+
+           uuid_container = experiment_container[table_key]
+
+           # Write the DataFrame as "feature_vectors" inside the UUID container
            try:
-               # Use write_dataframe with the UUID as the key
-               experiment_container.write_dataframe(df, key=table_key)
+               uuid_container.write_dataframe(df, key="feature_vectors")

-               logger.info(f"Successfully wrote {len(df)} vectors to '{table_key}'")
+               logger.info(f"Successfully wrote {len(df)} vectors to '{table_key}/feature_vectors'")

                # Add this UUID to our set of existing UUIDs
                self.existing_uuids.add(table_key)
@@ -373,10 +409,16 @@ def _stop_sync(self):

        # Check if the current UUID needs writing
        if (self.current_uuid is not None and
-               self.current_uuid not in experiment_container and
                self.current_uuid in self.uuid_dataframes and
                not self.uuid_dataframes[self.current_uuid].empty):

+           # Check if UUID container and feature_vectors table already exist
+           if self.current_uuid in experiment_container:
+               uuid_container = experiment_container[self.current_uuid]
+               if "feature_vectors" in uuid_container:
+                   logger.info(f"UUID {self.current_uuid} already has feature_vectors, skipping write")
+                   return None
+
            return self.current_uuid

        return None
@@ -389,4 +431,7 @@ def _stop_sync(self):
    @classmethod
    def from_settings(cls, settings):
        """Create a TiledResultsPublisher from settings."""
-       return cls(root_segments=settings.get("root_segments"))
+       return cls(
+           root_segments=settings.get("root_segments"),
+           tiled_prefix=settings.get("tiled_prefix")  # NEW: Pass prefix from settings
+       )
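Putting this together with the settings.yaml change above, a usage sketch (the import path is hypothetical; only from_settings and the two keys appear in this PR):

    # Illustrative only: construct the publisher from the tiled_publisher settings section.
    from arroyo_reduction.tiled_results_publisher import TiledResultsPublisher  # hypothetical import path

    tiled_publisher_settings = {
        "root_segments": ["lse_live_results"],
        "tiled_prefix": "beamlines/bl931/processed",
    }
    publisher = TiledResultsPublisher.from_settings(tiled_publisher_settings)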