Merged
36 commits
6c109f6
refactor: single responsibility for initializers
raymondwjang Feb 28, 2025
95ee5aa
feat: init real cnmf-e related objects
raymondwjang Feb 28, 2025
5e97cbb
doc: update package docs
raymondwjang Mar 1, 2025
a3d45cb
docs: update blueprint
raymondwjang Mar 3, 2025
a3841d8
refactor: rename spatial -> footprint, temporal -> traces
raymondwjang Mar 3, 2025
ddbce0f
debug: axis name handling
raymondwjang Mar 3, 2025
0bac7ad
debug: buffered initialization for plugging into a streaming format
raymondwjang Mar 3, 2025
7fed9f1
feat: use a runner instead of decorator
raymondwjang Mar 5, 2025
cf37214
feat: consolidate type decl/def for runners, transformers, components…
raymondwjang Mar 5, 2025
e3eee80
chore: remove currently unused files
raymondwjang Mar 5, 2025
52ea280
refactor: rename folders
raymondwjang Mar 5, 2025
ee741aa
feat: centralize and define footprint/s, trace/s types
raymondwjang Mar 5, 2025
88fa059
tests: path changes
raymondwjang Mar 5, 2025
0160f42
refactor: rename to storemanager
raymondwjang Mar 5, 2025
0f63378
💀: refactoring design complete. missed the commit moment and now all …
raymondwjang Mar 6, 2025
f5be35c
feat: make create_many method for batch component generation
raymondwjang Mar 6, 2025
7f13cce
refactor: align component type keys with the actual class names
raymondwjang Mar 6, 2025
e387848
refactor: build collect method for the initialization steps (no updat…
raymondwjang Mar 6, 2025
4cc60e3
debug: remove deprecated type usage
raymondwjang Mar 6, 2025
76af81a
debug: remove deprecated type usage. fix typos
raymondwjang Mar 6, 2025
0333457
chore: restructure test folder
raymondwjang Mar 6, 2025
a52a920
test: test_runner_initialization passing
raymondwjang Mar 6, 2025
478afbb
tests: test_runner_dependency_resolution passing
raymondwjang Mar 6, 2025
1d90fa3
tests: test_cyclic_dependency_detection passing
raymondwjang Mar 6, 2025
2d66661
tests: test_runner.py passes
raymondwjang Mar 6, 2025
639965f
tests: test_pipe_config.py passes
raymondwjang Mar 6, 2025
4314952
tests: test_types.py passing
raymondwjang Mar 6, 2025
8543ffa
tests: test_footprints.py passing
raymondwjang Mar 6, 2025
713a195
tests: test_traces.py passes
raymondwjang Mar 6, 2025
59e524b
tests: test_meta.py passes
raymondwjang Mar 6, 2025
11699e1
tests: test_registry.py passing
raymondwjang Mar 6, 2025
1f0ef17
tests: test_footprints.py enhanced
raymondwjang Mar 6, 2025
6e0f3cf
tests: test_meta.py changed to class based
raymondwjang Mar 6, 2025
278004d
tests: test_types.py changed to class based
raymondwjang Mar 6, 2025
dd77dca
tests: test_traces.py changed to class based
raymondwjang Mar 7, 2025
e34d9a1
tests: prepare to overhaul 4 test modules
raymondwjang Mar 7, 2025
201 changes: 201 additions & 0 deletions docs/features.md
@@ -0,0 +1,201 @@
# Introduction

Building a streaming CNMF package, starting with the module / class designs.
This package should be able to accomplish the following:

1. Function like an ML pipeline (ideally inheriting from scikit-learn classes).
2. Support streaming operations. (Scikit-learn class instances are lightweight, which helps, but the library is
   primarily designed for batch one-shot fit / transform, so operations like `partial_fit` may have to be
   custom written.)
3. Possibly process a few frames at a time instead of frame by frame.
4. Output / save visualizations and matrix data in real time at all stages of processing.
5. Store parameters and hyperparameters, and expose them so that the user can view / modify them while processing.
6. Update parameters in real time as new data streams in.
7. Retroactively propagate the learned / updated parameters to the earlier data to refine the results.
   This may have to be done post-processing.
8. Plug smoothly into the batch-processing side of the package.
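Goals 1 and 2 can be sketched as a minimal `partial_fit`-style estimator. Everything below is hypothetical scaffolding (the class name and the running-mean "model" stand in for real CNMF state), shown only to illustrate how batch `fit` and incremental `partial_fit` can share one code path:

```python
import numpy as np

class StreamingEstimator:
    """Hypothetical sketch of the sklearn-style streaming interface this
    package aims for. The 'model' here is just a running mean image."""

    def __init__(self):
        self.mean_frame_ = None
        self.n_seen_ = 0

    def partial_fit(self, frames: np.ndarray) -> "StreamingEstimator":
        # frames: (n_frames, height, width) chunk pulled from the stream
        for frame in frames:
            self.n_seen_ += 1
            if self.mean_frame_ is None:
                self.mean_frame_ = frame.astype(float)
            else:
                # incremental mean update: m += (x - m) / n
                self.mean_frame_ += (frame - self.mean_frame_) / self.n_seen_
        return self

    def fit(self, frames: np.ndarray) -> "StreamingEstimator":
        # batch fit is just one big partial_fit, so both paths share state
        return self.partial_fit(frames)
```

Because `fit` delegates to `partial_fit`, the same object can be used one-shot on an archive or incrementally on a live stream.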

## Streaming

```python
from dataclasses import dataclass
from typing import Any, Dict

import numpy as np
from river import compose, base

@dataclass
class ProcessedFrame:
    """Container for frame processing results"""
    footprints: np.ndarray
    fluorescence_traces: np.ndarray
    spikes: np.ndarray
    timestamp: float

class StreamState:
    """Container for maintaining stream state"""
    def __init__(self):
        self.pixel_stats = None
        self.component_stats = None
        self.residual_buffer = None
        self.num_components = 0
        self.overlapping_groups = None

    def update(self, result: Dict[str, Any]):
        """Update state with latest processing results"""
        self.pixel_stats = result.get('pixel_stats', self.pixel_stats)
        self.component_stats = result.get('component_stats', self.component_stats)
        self.residual_buffer = result.get('residual_buffer', self.residual_buffer)
        self.num_components = result.get('num_components', self.num_components)
        self.overlapping_groups = result.get('overlapping_groups', self.overlapping_groups)

class CompleteStreamProcessor:
    def __init__(self):
        # Initialize component manager
        self.component_manager = ComponentManager()

        # Initialize preprocessor and motion stabilizer
        self.preprocessor = compose.Pipeline(
            ('preprocessor', PreprocessorTransformer()),
            ('motion_stabilizer', MotionStabilizerTransformer()),
        )

        # Initialize components
        self.initializers = compose.Pipeline(
            ('footprints', FootprintsInitializer(
                params=FootprintsInitializerParams()
            )),
            ('traces', TracesInitializer(
                params=TracesInitializerParams()
            )),
            ('pixel_stats', PixelStatsTransformer(
                params=PixelStatsParams()
            ))
        )

        # Create processing pipeline using River's compose
        self.pipeline = compose.Pipeline(
            ('trace_updater', TraceUpdaterTransformer()),
            ('deconvolver', DeconvolverTransformer()),
            ('component_detector', ComponentDetectorTransformer()),
            ('stats_tracker', StatsTrackerTransformer()),
            ('footprint_tracker', FootprintTrackerTransformer())
        )

        # Initialize state containers
        self.state = StreamState()

    def process_video_stream(self, video_stream):
        """Process video stream using River's incremental learning approach"""
        for frame, timestamp in video_stream:
            # Preprocess frame
            frame = self.preprocessor.learn_transform_one({
                'frame': frame,
                'timestamp': timestamp
            })

            if not (self.initializers['footprints'].is_initialized and
                    self.initializers['traces'].is_initialized and ...):
                # Still in initialization phase
                self.component_manager = self.initializers.learn_transform_one(
                    self.component_manager,
                    frame
                )

                # Skip the update phase until all initializers are ready
                continue

            # Update and transform pipeline with new frame
            result = self.pipeline.learn_transform_one({
                'timestamp': timestamp,
                'state': self.state,
                'frame': frame,
                'component_manager': self.component_manager
            })

            # Update state
            self.state.update(result)

            yield ProcessedFrame(
                footprints=result['component_manager'].footprints,
                fluorescence_traces=result['component_manager'].traces,
                spikes=result['component_manager'].spikes,
                timestamp=timestamp
            )
```

## Example usage

```python
import xarray as xr

# Example usage:
def process_video(initial_frames: xr.DataArray, streaming_frames):
    # initial_frames would seed the initialization phase (not shown here)
    processor = CompleteStreamProcessor()

    # Process streaming data
    for result in processor.process_video_stream(streaming_frames):
        # Handle results (visualize, save, etc.)
        pass
```
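Goal 3 (processing a few frames at a time rather than one by one) can be served by a small buffering generator sitting between the video source and `process_video_stream`. The helper below is a hypothetical sketch, not part of the package:

```python
def buffered_frames(stream, buffer_size: int = 5):
    """Group a stream of (frame, timestamp) pairs into small chunks so
    downstream steps can amortize per-call overhead over several frames."""
    buffer = []
    for item in stream:
        buffer.append(item)
        if len(buffer) == buffer_size:
            yield buffer
            buffer = []
    if buffer:  # flush the final, possibly short, chunk
        yield buffer
```

Each yielded chunk could then be fed through the pipeline as a mini-batch, which is also where a custom `partial_fit` would naturally hook in.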

## Example of transformer implementation

```python
class StreamComponent(base.Transformer):
    """Base class for all stream processing components"""
    def learn_one(self, frame: Any) -> 'StreamComponent':
        return self

    def transform_one(self, frame: Any) -> Any:
        raise NotImplementedError

class PreprocessorTransformer(StreamComponent):
    def transform_one(self, X: Dict[str, Any]) -> Dict[str, Any]:
        frame = X['frame']
        processed_frame = self.process_frame(frame)
        return {**X, 'frame': processed_frame}

    def process_frame(self, frame):
        # Implementation of frame preprocessing
        pass

class MotionStabilizerTransformer(StreamComponent):
    def transform_one(self, X: Dict[str, Any]) -> Dict[str, Any]:
        frame = X['frame']
        stabilized = self.process_frame(frame)
        return {**X, 'frame': stabilized}

    def process_frame(self, frame):
        # Implementation of motion stabilization
        pass

class DeconvolverTransformer(StreamComponent):
    def transform_one(self, X: Dict[str, Any]) -> Dict[str, Any]:
        traces = X['fluorescence_traces']
        fluorescence_traces, spiking_traces = self.deconvolve_traces(
            traces, gamma=0.95, lambda_=0.01, min_spike_size=0.1
        )
        return {
            **X,
            'fluorescence_traces': fluorescence_traces,
            'spiking_traces': spiking_traces
        }

    def deconvolve_traces(self, traces, gamma, lambda_, min_spike_size):
        # Implementation of trace deconvolution
        pass
```
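The key convention above is that every transformer takes the shared dict, rewrites or adds keys, and passes the rest through via `{**X, ...}`. Stripped of river, that contract is just function composition over a dict; the toy steps below are hypothetical stand-ins for the transformers, kept dependency-free for illustration:

```python
from typing import Any, Callable, Dict, List

def run_chain(steps: List[Callable[[Dict[str, Any]], Dict[str, Any]]],
              x: Dict[str, Any]) -> Dict[str, Any]:
    """Minimal stand-in for what compose.Pipeline does per frame:
    each step receives the dict, transforms it, and passes it on."""
    for step in steps:
        x = step(x)
    return x

# two toy steps mirroring the transformer contract (illustrative only)
def normalize(x: Dict[str, Any]) -> Dict[str, Any]:
    # rewrite 'frame' in place, keep every other key untouched
    return {**x, "frame": [v / 255.0 for v in x["frame"]]}

def threshold(x: Dict[str, Any]) -> Dict[str, Any]:
    # add a new key derived from the (already normalized) frame
    return {**x, "mask": [v > 0.5 for v in x["frame"]]}
```

Because unknown keys flow through untouched, transformers can be reordered or inserted without changing their neighbours, which is what makes the `{**X, ...}` convention worth enforcing.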

## Monitoring

```python
import time

import psutil
from river import metrics


class MonitoredStreamProcessor(CompleteStreamProcessor):
    def __init__(self):
        super().__init__()
        # sketch: assumes a rolling statistic exposing an update() method
        self.processing_time = metrics.Rolling(window_size=100)
        self.memory_usage = metrics.Rolling(window_size=100)

    def process_video_stream(self, video_stream):
        for frame, timestamp in video_stream:
            start_time = time.time()

            # Delegate the single (frame, timestamp) pair to the parent pipeline
            for result in super().process_video_stream([(frame, timestamp)]):
                # Update metrics
                self.processing_time.update(time.time() - start_time)
                self.memory_usage.update(psutil.Process().memory_info().rss)

                yield result
```
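If one prefers not to lean on river for the rolling metrics above, the same behaviour can be sketched with a `collections.deque`. The `RollingMean` class below is hypothetical, not part of river or this package:

```python
import collections

class RollingMean:
    """Mean over the last window_size updates, as a stand-in for the
    rolling metrics used by MonitoredStreamProcessor."""
    def __init__(self, window_size: int = 100):
        # deque with maxlen silently drops the oldest value when full
        self.window = collections.deque(maxlen=window_size)

    def update(self, value: float) -> None:
        self.window.append(value)

    def get(self) -> float:
        return sum(self.window) / len(self.window) if self.window else 0.0
```

A rolling window is preferable to a cumulative mean here because per-frame latency drifts as components accumulate, and a bounded window tracks the current regime rather than the whole run.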
113 changes: 0 additions & 113 deletions docs/package_structure.md

This file was deleted.
