Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
76 changes: 74 additions & 2 deletions .dockerignore
Original file line number Diff line number Diff line change
@@ -1,8 +1,80 @@
# Environment and secrets
.env
.env.*
.secrets
.venv
venv/
ENV/

# Data and models (IMPORTANT - this saves GBs!)
data/
data**
cache/
*code-workspace*
**cache**
persist/
models/
results/
basic_auth.ini

# Python artifacts
__pycache__/
*.py[cod]
*$py.class
*.so
*.pyc
.Python
build/
dist/
*.egg-info/
*.egg
.pytest_cache/
.coverage
.cache

# Notebooks and test files
**.ipynb
**.pkl
test.py
.ipynb_checkpoints/

# IDE and editor
.vscode/
.vscode-server/
*code-workspace*
.idea/
.spyderproject/
.spyproject/
.ropeproject/

# Logs and databases
*.log
.egg-info
*.db
*.sqlite3
db.sqlite3-journal
pip-log.txt

# Git
.git/
.gitignore
.github/

# Docker files (no need to copy these into image)
Dockerfile*
docker-compose*.yml
.dockerignore

# Documentation
docs/
*.md
!README.md

# OS files
.DS_Store
*.swp
*.swo

# Other
workspace/
.dotnet/
.gnupg/
.ssh/
2 changes: 1 addition & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,6 @@ COPY . /app/work
RUN pip install --upgrade pip
RUN pip install .[lse]

ENV HOME /app/work
ENV HOME=/app/work

CMD ["python", "frontend.py"]
5 changes: 3 additions & 2 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ services:

tiled:
# see the file ./tiled/deploy/config.yml for detailed configuration of tiled
image: ghcr.io/bluesky/tiled:v0.1.0a118
image: ghcr.io/bluesky/tiled:0.1.0-b13
container_name: tiled-server
ports:
- "127.0.0.1:8000:8000"
Expand Down Expand Up @@ -131,6 +131,7 @@ services:
LIVE_TILED_API_KEY: '${LIVE_TILED_API_KEY}'
RESULTS_TILED_URI: '${RESULTS_TILED_URI}'
RESULTS_TILED_API_KEY: '${RESULTS_TILED_API_KEY}'
TILED_REPLAY_PREFIX: 'beamlines/bl931/processed' # NEW: Add this line
# Prefect
PREFECT_API_URL: '${PREFECT_API_URL}'
FLOW_NAME: '${FLOW_NAME}'
Expand All @@ -151,7 +152,7 @@ services:
MODE: "development"
USER: ${USER}
# Live mode
WEBSOCKET_URL: ${WEBSOCKET_URL:-ws://localhost:8765/lse}
WEBSOCKET_URL: ${WEBSOCKET_URL:-ws://localhost:8765/lse_operator}
# MLflow
MLFLOW_TRACKING_URI: '${MLFLOW_TRACKING_URI}'
MLFLOW_TRACKING_USERNAME: '${MLFLOW_TRACKING_USERNAME}'
Expand Down
5 changes: 4 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -77,5 +77,8 @@ arroyo = [
"torchvision==0.17.2",
"transformers==4.47.1",
"umap-learn",
"joblib==1.4.2"
"joblib==1.4.2",
"mlflow==2.22.0",
"mlex_utils[all]@git+https://github.com/mlexchange/mlex_utils.git",
"numpy<2.0.0"
]
2 changes: 2 additions & 0 deletions settings.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,10 @@ lse_operator:
tiled_publisher: # Added new section for TiledResultsPublisher
root_segments:
- lse_live_results
tiled_prefix: beamlines/bl931/processed # Prefix path for feature vectors
local_image_publisher: # New section for TiledLocalImagePublisher
container_name: live_data_cache # Container for storing images
tiled_prefix: beamlines/bl931/processed # Prefix path for XPS images

lse_reducer:
demo_mode: true
Expand Down
44 changes: 39 additions & 5 deletions src/arroyo_reduction/operator.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,13 +32,46 @@ def __init__(self, reducer: Reducer):
logger.warning(f"Could not connect to Redis Model Store: {e}")
self.redis_model_store = None

def _check_models_selected(self):
"""
Synchronous helper to check if models are selected in Redis.
This will be called via asyncio.to_thread() to avoid blocking the event loop.

Returns:
tuple: (autoencoder_model, dimred_model) or (None, None) if not available
"""
if self.redis_model_store is None:
return (None, None)

try:
autoencoder_model = self.redis_model_store.get_autoencoder_model()
dimred_model = self.redis_model_store.get_dimred_model()
return (autoencoder_model, dimred_model)
except Exception as e:
logger.error(f"Error checking model selection in Redis: {e}")
return (None, None)

async def process(self, message: SASMessage) -> None:
# logger.debug("message recvd")
if isinstance(message, Start):
logger.info("Received Start Message")
await self.publish(message)
elif isinstance(message, RawFrameEvent):
await self.publish(message)
# NEW: Check if models are selected before publishing RawFrameEvent
if self.redis_model_store is not None:
# Run Redis check in thread pool to avoid blocking event loop
autoencoder_model, dimred_model = await asyncio.to_thread(
self._check_models_selected
)

if not autoencoder_model or not dimred_model:
logger.info(f"In offline mode - skipping write image {message.frame_number}")
else:
await self.publish(message)
else:
# If redis not available, publish anyway (default behavior)
await self.publish(message)

result = await self.dispatch(message)
if result is not None: # Only publish if we got a valid result
await self.publish(result)
Expand All @@ -53,9 +86,10 @@ async def dispatch(self, message: RawFrameEvent) -> LatentSpaceEvent:
try:
# Use the RedisModelStore instead of direct Redis client
if self.redis_model_store is not None:
# Check if processing is disabled (by checking if models are set)
autoencoder_model = self.redis_model_store.get_autoencoder_model()
dimred_model = self.redis_model_store.get_dimred_model()
# Run Redis check in thread pool to avoid blocking event loop
autoencoder_model, dimred_model = await asyncio.to_thread(
self._check_models_selected
)

if not autoencoder_model or not dimred_model:
# NEW: Send flush only once when entering offline mode
Expand All @@ -73,7 +107,7 @@ async def dispatch(self, message: RawFrameEvent) -> LatentSpaceEvent:
self._flush_sent = True
logger.info("Sent flush signal when entering offline mode")

logger.info(f"In offline mode - skipping frame {message.frame_number}")
logger.info(f"In offline mode - skipping dispatch frame {message.frame_number}")
return None
else:
# NEW: Reset flush flag when back in live mode
Expand Down
2 changes: 1 addition & 1 deletion src/arroyo_reduction/publisher.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ class LSEWSResultPublisher(Publisher):
connected_clients = set()
current_start_message = None

def __init__(self, host: str = "localhost", port: int = 8765, path="/lse"):
def __init__(self, host: str = "localhost", port: int = 8765, path="/lse_operator"):

super().__init__()
self.host = host
Expand Down
Loading