Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion settings.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,11 @@ lse_operator:
zmq_address: tcp://sim_local_tiled:5000
vector_save:
db_path: /data/vector_save/latent_vectors.db # Path to SQLite database for saving vectors
tiled_publisher: # Added new section for TiledResultsPublisher
tiled_publisher: # Settings for TiledResultsPublisher
root_segments:
- lse_live_results
local_image_publisher: # New section for TiledLocalImagePublisher
container_name: live_data_cache # Container for storing images

lse_reducer:
demo_mode: true
Expand Down
14 changes: 10 additions & 4 deletions src/arroyo_reduction/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@
from .publisher import LSEWSResultPublisher
from .redis_model_store import RedisModelStore
from .vector_save import VectorSavePublisher
from .tiled_results_publisher import TiledResultsPublisher # Add import
from .tiled_results_publisher import TiledResultsPublisher
from .tiled_local_image_publisher import TiledLocalImagePublisher # Updated import

settings = Dynaconf(
envvar_prefix="",
Expand Down Expand Up @@ -59,6 +60,11 @@ async def start() -> None:
tiled_publisher = TiledResultsPublisher.from_settings(app_settings.tiled_publisher)
asyncio.create_task(tiled_publisher.start())

# Initialize the new TiledLocalImagePublisher for caching raw images
# Using existing tiled_publisher settings for now, but could be updated to use a dedicated section
local_image_publisher = TiledLocalImagePublisher.from_settings(app_settings.local_image_publisher)
asyncio.create_task(local_image_publisher.start())

# Initialize Redis model store instead of direct Redis client
logger.info("Initializing Redis Model Store")
redis_model_store = RedisModelStore(host=REDIS_HOST, port=REDIS_PORT)
Expand Down Expand Up @@ -91,12 +97,12 @@ async def start() -> None:
operator = LatentSpaceOperator.from_settings(app_settings, settings.lse_reducer)
operator.add_publisher(ws_publisher)
operator.add_publisher(vector_save_publisher)
operator.add_publisher(tiled_publisher) # Add the Tiled publisher

listener = ZMQFrameListener.from_settings(app_settings.listener, operator)
operator.add_publisher(tiled_publisher)
operator.add_publisher(local_image_publisher) # Add the local image publisher

# Start the listener
logger.info("Starting to listen for messages from arroyo_sim")
listener = ZMQFrameListener.from_settings(app_settings.listener, operator)
await listener.start()

except Exception as e:
Expand Down
50 changes: 42 additions & 8 deletions src/arroyo_reduction/operator.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@

from .reducer import LatentSpaceReducer, Reducer
from .schemas import LatentSpaceEvent
from .redis_model_store import RedisModelStore # Import the RedisModelStore class
from .redis_model_store import RedisModelStore

logger = logging.getLogger("arroyo_reduction.operator")

Expand All @@ -32,13 +32,19 @@ def __init__(self, proxy_socket: zmq.Socket, reducer: Reducer):
except Exception as e:
logger.warning(f"Could not connect to Redis Model Store: {e}")
self.redis_model_store = None

# Dictionary to store mappings from remote to local Tiled URLs
self.local_image_urls = {}

async def process(self, message: SASMessage) -> None:
# logger.debug("message recvd")
if isinstance(message, Start):
logger.info("Received Start Message")
await self.publish(message)
elif isinstance(message, RawFrameEvent):
# Process the RawFrameEvent message normally
# Note: All publishers will automatically receive this message through self.publish
await self.publish(message)
result = await self.dispatch(message)
if result is not None: # Only publish if we got a valid result
await self.publish(result)
Expand Down Expand Up @@ -88,16 +94,26 @@ async def dispatch(self, message: RawFrameEvent) -> LatentSpaceEvent:
current_autoencoder = self.reducer.autoencoder_model_name
current_dimred = self.reducer.dimred_model_name

# Get the local Tiled URL if available
local_tiled_url = self.get_local_url(message.tiled_url)

# If we have a local URL, log it
if local_tiled_url:
logger.debug(f"Using local Tiled URL: {local_tiled_url} instead of {message.tiled_url}")

response = LatentSpaceEvent(
tiled_url=message.tiled_url,
# Use local URL if available, otherwise use the original
tiled_url=local_tiled_url or message.tiled_url,
# Store the original URL regardless
original_tiled_url=message.tiled_url,
feature_vector=feature_vector[0].tolist(),
index=message.frame_number,
autoencoder_model=current_autoencoder, # Add autoencoder model name
dimred_model=current_dimred, # Add dimension reduction model name
timestamp=start_time, # Add start timestamp
total_processing_time=total_processing_time, # Add total processing time
autoencoder_time=timing_info.get('autoencoder_time'), # Add autoencoder processing time
dimred_time=timing_info.get('dimred_time'), # Add dimension reduction processing time
autoencoder_model=current_autoencoder,
dimred_model=current_dimred,
timestamp=start_time,
total_processing_time=total_processing_time,
autoencoder_time=timing_info.get('autoencoder_time'),
dimred_time=timing_info.get('dimred_time'),
)
return response
except Exception as e:
Expand All @@ -122,6 +138,24 @@ async def dispatch_workers(self, message: RawFrameEvent) -> LatentSpaceEvent:
except Exception as e:
logger.error(f"Error sending message to broker {e}")

def get_local_url(self, original_url):
"""Get the local URL for an original Tiled URL, if available."""
# First check our cache
if original_url in self.local_image_urls:
return self.local_image_urls[original_url]

# If not in cache, try to get it from the TiledLocalImagePublisher
for publisher in self.publishers:
if hasattr(publisher, 'get_local_url_for'):
local_url = publisher.get_local_url_for(original_url)
if local_url:
# Cache the result for future use
self.local_image_urls[original_url] = local_url
return local_url

# If no local URL is available, return None
return None

@classmethod
def from_settings(cls, settings, reducer_settings=None):
# Connect to the ZMQ Router/Dealer as a client
Expand Down
1 change: 1 addition & 0 deletions src/arroyo_reduction/schemas.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ class Config:

class LatentSpaceEvent(Event):
tiled_url: str
original_tiled_url: str = None # Add this field to store the original URL
feature_vector: list[float]
index: int
autoencoder_model: str = None # Add autoencoder model name
Expand Down
Loading