Skip to content

Real time data analytics ingestion with C++ / kafka / clickhouse to learn more about finance basics

License

Notifications You must be signed in to change notification settings

Hichemchir/market_data_flow

Repository files navigation

Real-Time Financial Data Analytics Pipeline

High-performance real-time market data pipeline demonstrating modern data engineering practices with C++, Python, Apache Kafka, and ClickHouse. Ingests live stock market trades from Finnhub WebSocket API and processes them into a columnar time-series database for analytics.

Architecture

Finnhub WebSocket → C++ Ingestor → Kafka → Python Processor → ClickHouse
                        (IXWebSocket)    (Topic: stock_trades)   (Materialized Views)

Components

  1. C++ Ingestor (ingestor-cpp/)

    • WebSocket client connecting to Finnhub real-time stock trades API
    • High-performance message producer using librdkafka
    • Snappy compression for network efficiency
    • Handles 8 major symbols: AAPL, MSFT, GOOGL, TSLA, NVDA, AMZN, META, SPY
  2. Apache Kafka

    • Message broker with topic stock_trades
    • 3 partitions for scalability
    • Snappy compression enabled
  3. Python Processor (processor-python/)

    • Kafka consumer with batch processing (100 trades/batch)
    • Type-safe data models using Pydantic
    • Structured logging with structlog
    • Resilient error handling and graceful shutdown
  4. ClickHouse

    • Columnar database optimized for analytical queries
    • Materialized views for real-time OHLCV aggregation (1-minute candles)
    • Daily statistics aggregation
    • Sub-second query performance on millions of trades

Prerequisites

Quick Start

1. Environment Setup

Create a .env file in the project root:

# Finnhub API Configuration
FINNHUB_API_KEY=your_api_key_here

# Kafka Configuration (optional, defaults provided)
KAFKA_BOOTSTRAP_SERVERS=localhost:9092

# Processor Configuration (optional)
PROCESSOR_CLICKHOUSE_HOST=localhost
PROCESSOR_CLICKHOUSE_PORT=8123
PROCESSOR_CLICKHOUSE_DATABASE=stock_data
PROCESSOR_CLICKHOUSE_TABLE=trades
PROCESSOR_BATCH_SIZE=100

2. Infrastructure Startup

# Start Kafka, Zookeeper, ClickHouse, and Kafka UI
docker-compose up -d

# Verify all services are running
docker-compose ps

ClickHouse schema is automatically initialized on startup from clickhouse-config/clickhouse-schema-stocks.sql.

3. Build and Run C++ Ingestor

Local build (macOS/Linux):

cd ingestor-cpp

# Install librdkafka (macOS)
brew install librdkafka

# Build
mkdir -p build && cd build
cmake ..
make

# Run
./ingestor

Docker:

docker-compose up finnhub-ingestor

4. Run Python Processor

cd processor-python

# Create virtual environment and install dependencies
python -m venv venv
source venv/bin/activate  
pip install -e ".[dev]"

# Run processor
python -m src.main

Data Verification

ClickHouse Queries

Connect to ClickHouse CLI:

docker exec -it clickhouse clickhouse-client

Example analytical queries:

-- Select database
USE stock_data;

-- Count total trades
SELECT count() FROM trades;

-- Trades by symbol
SELECT symbol, count() as trade_count, avg(price) as avg_price
FROM trades
GROUP BY symbol
ORDER BY trade_count DESC;

-- 1-minute OHLCV candles
SELECT *
FROM ohlcv_1m
WHERE symbol = 'AAPL'
ORDER BY period_start DESC
LIMIT 10;

-- Daily statistics
SELECT *
FROM daily_stats
WHERE symbol IN ('AAPL', 'TSLA', 'NVDA')
ORDER BY date DESC, symbol;

Kafka Verification

Access Kafka UI at http://localhost:8080 to monitor:

  • Topic: stock_trades
  • Message throughput
  • Consumer lag
  • Partition distribution

Project Structure

market_data_flow/
├── ingestor-cpp/           # C++ WebSocket to Kafka ingestor
│   ├── src/
│   │   └── main.cpp        # Main ingestor logic
│   ├── CMakeLists.txt      # Build configuration
│   └── Dockerfile
├── processor-python/       # Python Kafka to ClickHouse processor
│   ├── src/
│   │   ├── main.py         # Entry point
│   │   ├── kafka_consumer.py
│   │   ├── clickhouse_writer.py
│   │   └── models.py       # Pydantic data models
│   └── pyproject.toml
├── clickhouse-config/
│   └── clickhouse-schema-stocks.sql  # Database schema
└── docker-compose.yml      # Infrastructure orchestration

Performance Characteristics

  • Latency: Sub-100ms from WebSocket message to ClickHouse insert
  • Throughput: Handles 1000+ trades/second
  • Batch Processing: 100 trades per ClickHouse insert for efficiency
  • Compression: Snappy compression reduces Kafka bandwidth by ~40%
  • Query Performance: Sub-second aggregations on millions of trades

About

Real time data analytics ingestion with C++ / kafka / clickhouse to learn more about finance basics

Topics

Resources

License

Stars

Watchers

Forks

Releases

No releases published

Packages

No packages published