CSV Sensor Data Processing Service

A Python service for processing CSV sensor data with custom-built cloud service mocks.


🚀 Quick Start

# Install dependencies (no virtual environment; see "How to Run" below for venv setup)
pip install -r requirements.txt

# Option 1: Run the service without UI
python run.py

# Option 2: Run the service with Streamlit UI
python run_all.py

Access:

  • API: http://localhost:8000
  • Streamlit UI: http://localhost:8501

Upload a file:

curl -X POST "http://localhost:8000/upload" -F "file=@sample_data/sensor_data.csv"

🏛️ High-Level Architecture

System Overview

┌─────────────────────────────────────────────────────────────────────┐
│                         Client Layer                                 │
│  ┌──────────────────┐              ┌──────────────────┐            │
│  │  HTTP Client     │              │  Streamlit UI    │            │
│  │  (curl/browser)  │              │  (Port 8501)     │            │
│  └────────┬─────────┘              └────────┬─────────┘            │
└───────────┼────────────────────────────────┼──────────────────────┘
            │                                 │
            └─────────────────┬───────────────┘
                              │ HTTP/REST
                              ▼
┌─────────────────────────────────────────────────────────────────────┐
│                      API Layer (FastAPI)                             │
│  ┌───────────────────────────────────────────────────────────────┐  │
│  │  POST /upload    │  GET /results/{id}  │  GET /status/{id}   │  │
│  └───────────────────────────────────────────────────────────────┘  │
│                              │                                        │
│                              ▼                                        │
│  ┌───────────────────────────────────────────────────────────────┐  │
│  │               FileHandler (Async Orchestration)               │  │
│  │  • Concurrent file processing (asyncio)                       │  │
│  │  • Max 10 concurrent tasks (configurable)                     │  │
│  │                                                               │  │
│  └───────────────────────────────────────────────────────────────┘  │
└───────────────────────────┬──────────────┬──────────────────────────┘
                            │              │
            ┌───────────────┘              └───────────────┐
            │                                               │
            ▼                                               ▼
┌──────────────────────────┐                 ┌──────────────────────────┐
│   Business Logic Layer   │                 │   Persistence Layer      │
│  ┌────────────────────┐  │                 │  ┌────────────────────┐  │
│  │  DataProcessor     │  │                 │  │  BlobStorage       │  │
│  │  • CSV parsing     │  │                 │  │  (S3 wrapper)      │  │
│  │  • Validation      │  │                 │  │                    │  │
│  │  • Aggregation     │  │                 │  │  MetadataStorage   │  │
│  │  • Error handling  │  │                 │  │  (DynamoDB wrap)   │  │
│  └────────────────────┘  │                 │  └────────────────────┘  │
└──────────────────────────┘                 └─────────┬────────────────┘
                                                       │
                                                       │ (sync interface)
                                                       │
                                                       ▼
                            ┌──────────────────────────────────────┐
                            │       Mock Layer                     │
                            │  ┌────────────────────────────────┐  │
                            │  │  MockS3Client (in-memory)      │  │
                            │  │  • Thread-safe storage         │  │
                            │  │  • Exponential backoff retry   │  │
                            │  │                                │  │
                            │  │  MockDynamoDB (in-memory)      │  │
                            │  │  • Thread-safe storage         │  │
                            │  │  • Exponential backoff retry   │  │
                            │  └────────────────────────────────┘  │
                            └──────────────────────────────────────┘

Architecture Principles

1. Layered Architecture:

  • API Layer: HTTP interface, request validation, response formatting
  • Orchestration Layer: Async task management, concurrency control
  • Business Logic: CSV processing, validation, aggregation
  • Persistence Layer: Abstract storage interfaces (Blob & Metadata storage)
  • Mock/AWS Layer: Mocking objects simulating cloud services

2. Async Orchestration with Sync Foundations:

  • Top layer (FileHandler): Async/await for concurrent I/O operations (simulated I/O with mocks)
  • Middle layers: Sync methods (matches real AWS boto3 behavior)
  • Mocks: In-memory sync operations (microsecond latency, non-blocking)

3. Separation of Concerns:

  • FileHandler: Orchestration only
  • DataProcessor: Pure business logic (per file)
  • Storage wrappers: Abstract cloud provider details
  • Mocks: Development without cloud provider dependencies

4. Production-Ready Design:

  • Thread-safe singletons for shared resources
  • Graceful error handling at every layer
  • Exponential backoff for transient failures
  • Comprehensive logging with UTC timestamps in log files

🎯 Key Features

  • Custom cloud mocks (S3, DynamoDB) - no external libraries
  • Concurrent processing (asyncio, configurable limit)
  • Robust error handling (multi-layer with retry)
  • Type safety (Pydantic, interfaces, exception hierarchy)
  • Production-ready (thread-safe, singleton pattern, logging)
  • Comprehensive unit tests
  • Docker support
  • Web UI (Streamlit)

📋 How to Run

Running the Mocks

The mocks run automatically - no configuration needed:

  • Custom S3 mock initializes on first use (singleton pattern)
  • Custom DynamoDB mock initializes on first use (singleton pattern)
  • Both use in-memory storage (thread-safe)
  • Data resets when service restarts (no persistence)

With Virtual Environment (Recommended)

python -m venv venv
venv\Scripts\activate  # Windows
source venv/bin/activate  # Mac/Linux
pip install -r requirements.txt
python run.py

With Docker

Docker Compose provides containerized deployment for both the API and UI services.

Quick Start:

# Start both API and UI services
docker-compose -f build/docker-compose.yml up

# Start in background (detached mode)
docker-compose -f build/docker-compose.yml up -d

# Stop all services
docker-compose -f build/docker-compose.yml down

Service Options:

# Start API only
docker-compose -f build/docker-compose.yml up api

# Start UI only (requires API to be running)
docker-compose -f build/docker-compose.yml up ui

# View logs
docker-compose -f build/docker-compose.yml logs -f

What's Included:

  • API service: FastAPI backend on http://localhost:8000
  • UI service: Streamlit UI on http://localhost:8501
  • Networking: Services communicate via internal sensor-network bridge
  • Health checks: API service includes health endpoint monitoring
  • Auto-restart: Services restart automatically unless explicitly stopped
  • Environment variables: All configuration externalized for easy customization

Environment Variables: Docker Compose automatically passes environment variables to containers. You can customize:

  • PORT - API server port (default: 8000)
  • UI_PORT - Streamlit UI port (default: 8501)
  • DEBUG - Enable debug mode (default: false)
  • MAX_CONCURRENT_TASKS - Concurrency limit (default: 10)
  • LOG_LEVEL - Logging verbosity (default: INFO)

🏗️ Architecture Decisions

1. Docker Health Checks

Decision: Docker Compose performs automatic health checks on the API service

Implementation:

healthcheck:
  test: ["CMD", "curl", "-f", "http://localhost:8000/health"]
  interval: 30s
  timeout: 10s
  retries: 3
  start_period: 40s

Behavior:

  • Docker daemon executes health check every 30 seconds from within the container
  • Requests originate from 127.0.0.1 (container localhost)
  • Container marked "Healthy" once a check succeeds, and "Unhealthy" after 3 consecutive failures
  • UI service waits for API health status before starting (depends_on: service_healthy)
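The UI's wait-for-healthy behavior corresponds to a `depends_on` condition in the compose file; a minimal sketch (service names assumed to match the actual compose file):

```yaml
services:
  ui:
    depends_on:
      api:
        condition: service_healthy   # wait until the API health check passes
```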

Benefits:

  • Ensures API is ready before dependent services start
  • Automatic container restart on health check failures
  • Zero-downtime deployments with orchestration tools

2. Custom Mock Implementation (No External Libraries)

Decision: Built the S3 and DynamoDB mock infrastructure from scratch, with no external mocking libraries

3. DynamoDB for Metadata Storage

Decision: AWS DynamoDB (mocked) for results/metadata

Rationale:

  • Cloud-managed
  • Fast key-value lookups by file_id
  • Scalable, serverless
  • Production-ready

4. Asyncio for Concurrency

Decision: Asyncio with async/await

Rationale:

  • I/O-bound workload (S3 mock, DynamoDB mock, HTTP)
  • Efficient (single process, event loop)
  • FastAPI native support
  • Handles 100+ concurrent uploads
  • Memory efficient (~1-2KB per task vs ~1-8MB per thread)

Workload Analysis:

  • 95%+ I/O-bound operations: S3 mock upload/download, DynamoDB mock read/write, HTTP requests
  • ~5% CPU work: CSV parsing and validation (~1-7ms per file, benchmarked at 1.3ms for 1000 rows)
  • Performance validation: Load tested with 500 files at 50 concurrent requests - all processed successfully

Trade-offs:

  • ✅ Optimal for I/O operations (95%+ waiting time)
  • ✅ Low memory footprint (1000 uploads ≈ 100MB)
  • ✅ High scalability (event loop > thread pool)
  • ✅ Single-threaded event loop eliminates context switching overhead
  • ⚠️ Sync mocks with time.sleep() block event loop during retries (rare scenario)
  • ⚠️ Single CPU core (not an issue for I/O-bound work)

Alternatives Considered:

Threading:

  • ❌ Rejected: 10-80x memory overhead (1-8GB for 1000 uploads vs 100MB)
  • ❌ Python GIL prevents true parallelism for CPU work
  • ❌ Only beneficial if CSV processing >100ms per file (actual: <7ms)
  • ❌ Context switching overhead would slow down I/O operations

Conclusion: Asyncio provides optimal balance of throughput, memory efficiency, and simplicity for this I/O-bound workload. FastAPI's native asyncio support enables seamless integration without mixing concurrency paradigms.
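The bounded-concurrency pattern described above can be sketched with asyncio.Semaphore; names here are illustrative, not the service's actual FileHandler code:

```python
import asyncio

MAX_CONCURRENT_TASKS = 10  # mirrors the documented default limit


async def process_file(sem, file_id):
    async with sem:              # at most MAX_CONCURRENT_TASKS run this block at once
        await asyncio.sleep(0)   # stand-in for mock S3/DynamoDB I/O
        return file_id


async def process_all(file_ids):
    sem = asyncio.Semaphore(MAX_CONCURRENT_TASKS)
    return await asyncio.gather(*(process_file(sem, f) for f in file_ids))


results = asyncio.run(process_all(range(25)))
print(len(results))  # 25
```

All 25 tasks are scheduled at once, but the semaphore ensures only 10 hold the "processing" slot simultaneously, which is what keeps resource usage bounded under load.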

5. Exponential Backoff in Mock Layer

Decision: Retry logic in mock classes (not wrapper classes)

Rationale:

  • Real cloud SDKs (boto3) have built-in retry
  • Mocks simulate realistic cloud behavior
  • Handles transient failures automatically

Configuration:

  • Max retries: 3
  • Initial delay: 2.0s
  • Exponential base: 2.0
  • Max delay: 10.0s
  • Retry delays: 2s, 4s, 8s
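Under these settings, a retry decorator might look like the following sketch (the demo uses a zero initial delay so it runs instantly; with the documented configuration the waits would be 2s, 4s, 8s):

```python
import functools
import time


def retry_with_backoff(max_retries=3, initial_delay=2.0, base=2.0, max_delay=10.0):
    """Retry a function with exponential backoff, capped at max_delay."""
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            delay = initial_delay
            for attempt in range(max_retries + 1):
                try:
                    return func(*args, **kwargs)
                except Exception:
                    if attempt == max_retries:
                        raise                      # out of retries: surface the error
                    time.sleep(min(delay, max_delay))
                    delay *= base                  # 2s -> 4s -> 8s with the defaults
        return wrapper
    return decorator


calls = []


@retry_with_backoff(max_retries=3, initial_delay=0.0)  # zero delay keeps the demo fast
def flaky():
    calls.append(1)
    if len(calls) < 3:
        raise RuntimeError("transient failure")
    return "ok"


print(flaky())  # succeeds on the third attempt
```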

6. Exception Hierarchy

Decision: Two-level hierarchy with generic base classes, examples:

BlobStorageException → S3Error → NoSuchBucket
MetadataStorageException → DynamoDBError → ResourceNotFoundException

Rationale:

  • Wrapper layer uses generic exceptions (implementation-agnostic)
  • Mock layer uses specific exceptions (S3Error, DynamoDBError)
  • Easy to add new implementations (GCS, Azure, MongoDB)
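The hierarchy above maps directly onto Python exception classes; the catch-site below shows why the generic base is useful for implementation-agnostic callers:

```python
# Two-level hierarchy: generic base -> service-specific -> concrete error
class BlobStorageException(Exception): ...
class S3Error(BlobStorageException): ...
class NoSuchBucket(S3Error): ...


class MetadataStorageException(Exception): ...
class DynamoDBError(MetadataStorageException): ...
class ResourceNotFoundException(DynamoDBError): ...


# A wrapper-layer caller catches the generic base, so swapping S3 for
# GCS or Azure blob storage would not change this code at all.
try:
    raise NoSuchBucket("bucket does not exist")
except BlobStorageException as exc:
    print(type(exc).__name__)  # NoSuchBucket
```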

7. Pydantic for Validation

Decision: Pydantic models for CSV validation

Rationale:

  • Declarative, type-safe
  • Automatic type conversion
  • Clear error messages
  • Industry best practice
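A minimal sketch of declarative row validation with Pydantic; the field names are illustrative and may differ from the service's actual schema:

```python
from pydantic import BaseModel, ValidationError


class SensorReading(BaseModel):
    sensor_id: str
    timestamp: str
    value: float   # "25.5" from the CSV is coerced to 25.5 automatically


row = {"sensor_id": "sensor_001", "timestamp": "2025-12-07T10:30:00Z", "value": "25.5"}
print(SensorReading(**row).value)  # 25.5

try:
    SensorReading(sensor_id="sensor_001", timestamp="2025-12-07T10:30:00Z",
                  value="not-a-number")
except ValidationError as exc:
    # len(exc.errors()) works in both Pydantic v1 and v2
    print("invalid row:", len(exc.errors()), "error(s)")
```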

📡 API Endpoints

POST /upload

Upload CSV file, returns file_id immediately (202 Accepted)

Request:

curl -X POST "http://localhost:8000/upload" -F "file=@sample_data/sensor_data.csv"

Response (202 Accepted):

{
  "file_id": "abc123...",
  "status": "pending",
  "message": "File uploaded successfully and queued for processing"
}

GET /results/{file_id}

Get processing results and metadata

Request:

curl "http://localhost:8000/results/abc123..."

Response (200 OK):

{
  "file_id": "abc123...",
  "status": "processed",
  "filename": "sensor_data.csv",
  "upload_time": "2025-12-07T10:30:00Z",
  "processing_time": "2025-12-07T10:30:05Z",
  "total_rows": 1000,
  "valid_rows": 980,
  "invalid_rows": 20,
  "aggregated_data": {
    "sensor_001": {"avg": 25.5, "min": 20.0, "max": 30.0, "count": 100},
    "sensor_002": {"avg": 18.2, "min": 15.0, "max": 22.0, "count": 98}
  }
}
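The per-sensor aggregates in the response above (avg/min/max/count) amount to a simple group-by; this is an illustrative sketch, not the service's actual DataProcessor code:

```python
from collections import defaultdict


def aggregate(readings):
    """Group (sensor_id, value) pairs and compute per-sensor statistics."""
    groups = defaultdict(list)
    for sensor_id, value in readings:
        groups[sensor_id].append(value)
    return {
        sid: {"avg": sum(vals) / len(vals), "min": min(vals),
              "max": max(vals), "count": len(vals)}
        for sid, vals in groups.items()
    }


stats = aggregate([("sensor_001", 20.0), ("sensor_001", 30.0), ("sensor_002", 18.2)])
print(stats["sensor_001"])  # {'avg': 25.0, 'min': 20.0, 'max': 30.0, 'count': 2}
```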

GET /status/{file_id}

Get processing status only

Request:

curl "http://localhost:8000/status/abc123..."

Response (200 OK):

{
  "file_id": "abc123...",
  "status": "processing"
}

Status values: pending, processing, processed, partial, failed


🛡️ Error Handling

Multi-layer approach:

  1. Pydantic validation - Type safety
  2. Row-level - Single row failure doesn't crash file
  3. Exponential backoff - Retries transient errors (3 attempts)
  4. Graceful degradation - Partial results stored
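Layer 2 (row-level isolation) can be sketched as follows; the function names are illustrative:

```python
def process_rows(rows, validate):
    """Validate each row independently so one bad row never fails the whole file."""
    valid, invalid = [], []
    for i, row in enumerate(rows):
        try:
            valid.append(validate(row))
        except ValueError as exc:
            invalid.append((i, str(exc)))   # record the failure, keep going
    return valid, invalid


def validate(row):
    sensor_id, value = row
    if not sensor_id:
        raise ValueError("missing sensor_id")
    return (sensor_id, float(value))        # float() raises ValueError on bad data


valid, invalid = process_rows(
    [("sensor_001", "25.5"), ("", "10"), ("sensor_002", "oops")], validate)
print(len(valid), len(invalid))  # 1 2
```

Counting valid and invalid rows this way is also what makes the `partial` status and the `valid_rows`/`invalid_rows` fields in the results response possible.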


📁 Project Structure

app/
├── cloud_services_mock/     # Custom S3 & DynamoDB mocks
├── persistence/             # Storage wrappers with interfaces
├── services/                # Business logic (CSV processing)
├── utils/                   # Validators, retry decorator, logger
├── main.py                  # FastAPI application
├── constants.py             # Constants and enums
├── exceptions.py            # Exception hierarchy
├── models.py                # Models and schemas
└── config.py                # Configuration

sample_data/                 # Sample CSV files
ui/                          # Streamlit Web UI
tests/                       # Unit tests
build/                       # Docker files
load_test.py                 # Load testing client
run.py                       # Run API service
run_all.py                   # Run API + UI services
requirements.txt             # Python dependencies
README.md                    # This documentation

Note: The mock implementations include additional AWS SDK methods (list_objects_v2, scan, delete_table, list_tables, delete_file, delete_metadata) beyond current requirements. These are intentionally implemented for API completeness, future extensibility, and production parity with real AWS services.


🧪 Unit Tests

The project includes comprehensive unit tests using Python's unittest framework.

Test Coverage:

  • test_validators.py - CSV row validation (5 tests)
  • test_data_processor.py - CSV processing and aggregation (4 tests including stress test with 1000 rows)
  • test_storage.py - Mock S3 and DynamoDB operations (6 tests)
  • test_file_handler.py - Async file upload and processing (5 tests)

Run all tests:

python -m unittest discover tests -v

Run specific test file:

python -m unittest tests.test_validators
python -m unittest tests.test_data_processor
python -m unittest tests.test_file_handler

Total: 20 unit tests covering validators, data processing, storage mocks, and async workflows.


🔥 Load Testing

The project includes a load testing client to simulate high concurrency and stress test the service. Used for benchmarking and validating performance under load.

Load Test Tool:

# Basic load test: 50 files, 10 concurrent requests
python load_test.py --files 50 --concurrency 10

# Stress test: 100 large files, 20 concurrent requests
python load_test.py --files 100 --concurrency 20 --size large

# Test with error handling: include invalid rows
python load_test.py --files 30 --concurrency 10 --errors

# Wait for processing completion and show results
python load_test.py --files 50 --concurrency 15 --wait

# Extreme stress test: 500 small files
python load_test.py --files 500 --concurrency 50 --size small

Options:

  • --files N - Number of files to upload (default: 50)
  • --concurrency N - Max concurrent requests (default: 10)
  • --size - CSV file size: small (50 rows), medium (200 rows), large (1000 rows)
  • --errors - Include files with invalid rows for error testing
  • --wait - Wait for all files to be processed and show results
  • --api-url URL - Custom API URL (default: http://localhost:8000)

Metrics Provided:

  • Upload rate (files/sec)
  • Success/failure counts
  • Response times (min/avg/max)
  • Processing wait times (if --wait used)

📝 Implementation Notes

Python Version

  • Requires Python 3.12+
  • Async/await syntax with modern asyncio patterns

Mocked Objects Design

  • Sync mocks with async orchestration: The mock classes (S3, DynamoDB) use synchronous methods wrapped with exponential backoff retry decorator, while the orchestration layer (FileHandler) uses asyncio. This design intentionally mirrors real AWS SDK behavior where boto3 clients are synchronous but applications orchestrate them asynchronously.
  • Why mocks are synchronous: Real AWS SDK (boto3) provides synchronous APIs. Keeping mocks synchronous ensures:
    • Realistic simulation of production behavior
    • Easy migration path (swap mocks with real boto3 clients)
    • In-memory operations are non-blocking (microseconds, no real I/O)
  • Production migration: When deploying with real AWS services, wrap boto3 calls with asyncio.to_thread() or use aioboto3 (async wrapper) to maintain async orchestration while calling synchronous boto3 methods
  • Thread-safe in-memory storage: All mocks use threading.Lock for concurrent access protection
  • Realistic error simulation: Mocks raise proper AWS exceptions to test error handling paths
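The asyncio.to_thread migration path mentioned above can be sketched as follows; put_object_sync is a hypothetical stand-in for a synchronous boto3 call:

```python
import asyncio


def put_object_sync(bucket, key, body):
    # Stand-in for a blocking boto3 call (hypothetical return shape)
    return {"bucket": bucket, "key": key, "size": len(body)}


async def upload(bucket, key, body):
    # Run the sync client call in a worker thread so the event loop stays free
    return await asyncio.to_thread(put_object_sync, bucket, key, body)


result = asyncio.run(upload("sensor-data", "file.csv", b"a,b\n1,2\n"))
print(result["size"])  # 8
```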

Thread-Safety

  • Singleton pattern with locks: All shared objects (MockS3Client, MockDynamoDBResource, BlobStorage, MetadataStorage, FileHandler) implement thread-safe singleton initialization using threading.Lock
  • Current deployment: Single uvicorn worker with asyncio (single-threaded) - locks not strictly needed but included for production readiness
  • Future-proof design: Ready for multi-worker deployments (Gunicorn with threads, ThreadPoolExecutor) without code changes
  • Defensive programming: Prevents subtle bugs if deployment configuration changes
  • Minimal overhead: Lock acquisition is fast (~nanoseconds) when uncontended
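The singleton initialization described above can be sketched with double-checked locking; the class name is taken from the document, but the body is illustrative:

```python
import threading


class MockS3Client:
    _instance = None
    _lock = threading.Lock()

    def __new__(cls):
        if cls._instance is None:          # fast path: no lock once initialized
            with cls._lock:                # double-checked locking
                if cls._instance is None:
                    cls._instance = super().__new__(cls)
                    cls._instance._storage = {}   # shared in-memory store
        return cls._instance


a, b = MockS3Client(), MockS3Client()
print(a is b)  # True: every caller gets the same instance
```

The second `is None` check inside the lock is what prevents two threads that raced past the first check from each creating an instance.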

Logging

  • UTC timestamps: All logs use UTC time for consistency across deployments
  • Timestamped log files: Format app_YYYY-MM-DD_HH-MM-SS.log in logs/ directory
  • Method-level tracking: Test logs include method names for debugging

Concurrency Configuration

  • Default limit: 10 concurrent tasks (configurable in config.py)
  • Rationale: Prevents resource exhaustion while maintaining throughput
  • Scalability: Can be increased for production based on infrastructure capacity

Port Configuration

  • Centralized in app/config.py: All port numbers defined as constants
  • FastAPI backend: api_port = 8000 (configurable via environment variable PORT)
  • Streamlit UI: ui_port = 8501 (configurable via environment variable UI_PORT)
  • Consistency: All scripts (run.py, run_all.py, ui/app.py) use config constants
  • Override via .env file:
    # .env file
    PORT=9001
    UI_PORT=9501

CORS (Cross-Origin Resource Sharing)

  • Why needed: Streamlit UI (port 8501) and FastAPI backend (port 8000) run on different origins
  • Browser security: Browsers block cross-origin requests by default; CORS middleware allows them
  • Development config: allowed_origins = ["*"] permits all origins for easy testing
  • Production recommendation: Restrict to specific domains:
    allowed_origins = [
        "http://localhost:8501",      # Local UI
        "https://yourdomain.com",     # Production UI
    ]
  • Docker support: Wildcard allows communication between containers with different hostnames

Sample Data

  • sample_data/sensor_data.csv - Valid data for testing
  • sample_data/sensor_data_with_errors.csv - Mixed valid/invalid rows for error handling testing
