This is a distributed multi-agent system for document processing that follows an event-driven architecture pattern. The system processes images through a complete OCR → Index → Retrieve → Report pipeline, where independent agents communicate exclusively via Kafka message queues. Each agent specializes in a specific task: OCR extraction, document indexing, information retrieval, and report generation with self-reflection capabilities.
The system implements a decentralized microservices architecture where agents operate independently and communicate through Kafka topics. This design ensures loose coupling, scalability, and fault tolerance.
Event-Driven Communication
- All inter-agent communication happens via Kafka message queues
- Each agent subscribes to specific topics and publishes results to downstream topics
- Messages follow a standardized schema with event type, document ID, trace ID, and payload, as sketched below
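Since Pydantic is already part of the stack, the message envelope might look like the following minimal sketch. The field names, defaults, and example values are assumptions for illustration, not the project's actual schema:

```python
from typing import Any
from uuid import uuid4

from pydantic import BaseModel, Field


class AgentMessage(BaseModel):
    """Envelope shared by all agents; field names are illustrative."""
    event_type: str        # e.g. "ocr.completed" or "rag.query"
    document_id: str       # stable ID for the source document
    trace_id: str = Field(default_factory=lambda: str(uuid4()))  # follows one request across agents
    payload: dict[str, Any] = {}  # stage-specific data


msg = AgentMessage(
    event_type="ocr.completed",
    document_id="doc-42",
    payload={"text": "Invoice #1001 ..."},
)
print(msg.model_dump_json())  # Pydantic v2; use msg.json() on v1
```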
Agent Pipeline Structure
- OCR Agent (agents/ocr_agent.py) - Extracts text from images using OpenCV and Tesseract (see the sketch below)
- Index Agent (agents/index_agent.py) - Stores extracted text in ChromaDB vector database
- Retrieval Agent (agents/retrieval_agent.py) - Performs semantic search across indexed documents
- Report Agent (agents/report_agent.py) - Generates structured reports using AutoGen's writer-critic pattern
Image Upload → OCR → Vector Indexing → Query Processing → Report Generation
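A minimal sketch of the OCR stage with OpenCV and pytesseract; the grayscale-plus-Otsu preprocessing is an assumed choice, not necessarily what agents/ocr_agent.py actually does:

```python
import cv2
import pytesseract


def extract_text(image_path: str) -> str:
    """Illustrative OCR step: grayscale + Otsu threshold, then Tesseract."""
    image = cv2.imread(image_path)
    if image is None:
        raise FileNotFoundError(image_path)
    gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
    # Binarization often helps Tesseract on noisy scans (assumed preprocessing)
    _, binary = cv2.threshold(gray, 0, 255, cv2.THRESH_BINARY + cv2.THRESH_OTSU)
    return pytesseract.image_to_string(binary)
```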
Topic-Based Messaging
- ingest.images - Image processing requests
- ocr.text - Extracted text results
- rag.query - Retrieval queries
- rag.answer - Search results with context
- report.final - Generated reports
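In practice each agent is a consume → process → publish loop over these topics. Here is a hedged skeleton for the retrieval agent using the Confluent Kafka client; the broker address, consumer group name, and empty placeholder result are assumptions:

```python
import json

from confluent_kafka import Consumer, Producer

BROKER = "localhost:9092"  # assumed; the real address comes from deployment config

consumer = Consumer({
    "bootstrap.servers": BROKER,
    "group.id": "retrieval-agent",   # one consumer group per agent
    "auto.offset.reset": "earliest",
})
producer = Producer({"bootstrap.servers": BROKER})
consumer.subscribe(["rag.query"])

while True:
    msg = consumer.poll(timeout=1.0)
    if msg is None or msg.error():
        continue
    event = json.loads(msg.value())
    # Placeholder for the real semantic search over ChromaDB
    answer = {**event, "event_type": "rag.answer", "payload": {"hits": []}}
    producer.produce("rag.answer", json.dumps(answer).encode("utf-8"))
    producer.flush()
```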
Vector Database (ChromaDB)
- Persistent storage for document embeddings
- Uses SentenceTransformer embeddings (all-MiniLM-L6-v2 model)
- Cosine similarity search for semantic retrieval
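A short sketch of how indexing and retrieval could fit together with these pieces; the collection name and persistence path are assumptions:

```python
import chromadb
from sentence_transformers import SentenceTransformer

model = SentenceTransformer("all-MiniLM-L6-v2")
client = chromadb.PersistentClient(path="./chroma_data")  # assumed persistence dir
collection = client.get_or_create_collection(
    name="documents",                    # assumed collection name
    metadata={"hnsw:space": "cosine"},   # cosine similarity, as noted above
)

# Index one extracted document
text = "Invoice #1001: total due $250 by March 1."
collection.add(
    ids=["doc-42"],
    documents=[text],
    embeddings=[model.encode(text).tolist()],
)

# Semantic retrieval
query_vec = model.encode("When is the invoice due?").tolist()
hits = collection.query(query_embeddings=[query_vec], n_results=3)
print(hits["documents"])
```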
File System Storage
- Image uploads stored in local filesystem
- ChromaDB persistence directory for vector data
Flask Web Application (web_app.py)
- RESTful API endpoints for file upload and processing
- Real-time processing status updates
- Integration bridge to Kafka system (kafka_web_bridge.py)
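The upload endpoint might look roughly like the sketch below; the route, form field name, and the commented-out publish_image helper are all hypothetical:

```python
import os
from uuid import uuid4

from flask import Flask, jsonify, request
from werkzeug.utils import secure_filename

UPLOAD_DIR = "uploads"  # assumed storage location for images
os.makedirs(UPLOAD_DIR, exist_ok=True)
app = Flask(__name__)


@app.route("/upload", methods=["POST"])  # route name is illustrative
def upload():
    file = request.files.get("image")
    if file is None or file.filename == "":
        return jsonify({"error": "no image supplied"}), 400
    doc_id = str(uuid4())
    path = os.path.join(UPLOAD_DIR, f"{doc_id}_{secure_filename(file.filename)}")
    file.save(path)
    # Hand off to the Kafka bridge; publish_image is a hypothetical helper:
    # from kafka_web_bridge import publish_image; publish_image(doc_id, path)
    return jsonify({"document_id": doc_id, "status": "queued"}), 202
```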
Multi-Level Testing
- Integration tests (integration_test.py) - End-to-end pipeline validation
- Mock Kafka implementation (mock_kafka.py) - Testing without Docker dependencies
- Comprehensive test suite (test_suite.py) - Component and communication testing
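A mock broker for tests can be as small as an in-memory topic map. This sketch only illustrates the idea; the actual interface in mock_kafka.py may differ:

```python
import queue
from collections import defaultdict


class MockKafka:
    """In-memory stand-in mirroring just enough of produce/consume for tests."""

    def __init__(self):
        self.topics = defaultdict(queue.Queue)  # topic name -> FIFO queue

    def produce(self, topic: str, value: bytes) -> None:
        self.topics[topic].put(value)

    def poll(self, topic: str, timeout: float = 1.0):
        try:
            return self.topics[topic].get(timeout=timeout)
        except queue.Empty:
            return None


broker = MockKafka()
broker.produce("ocr.text", b'{"document_id": "doc-42"}')
assert broker.poll("ocr.text") is not None
```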
AutoGen Multi-Agent Framework
- Writer-critic self-reflection loop for report quality improvement
- GPT-5 model integration for natural language generation
- Structured prompt engineering for enterprise report generation
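A hedged sketch of that loop with the classic pyautogen API; the prompts, turn limit, and termination rule are assumptions, and the exact API surface varies across AutoGen versions:

```python
import os

import autogen

llm_config = {"config_list": [{
    "model": "gpt-5",  # model name as stated in this document
    "api_key": os.environ["OPENAI_API_KEY"],
}]}

writer = autogen.AssistantAgent(
    name="writer",
    llm_config=llm_config,
    system_message="Draft a structured enterprise report from the retrieved context.",
)
critic = autogen.AssistantAgent(
    name="critic",
    llm_config=llm_config,
    system_message="Review the draft for accuracy and structure; reply APPROVED when satisfied.",
    is_termination_msg=lambda m: "APPROVED" in (m.get("content") or ""),
)

# Writer drafts, critic critiques, writer revises, until approval or the turn cap
critic.initiate_chat(
    writer,
    message="Write a report answering: what does document doc-42 say about payment terms?",
    max_turns=4,  # bounded self-reflection loop
)
```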
External Dependencies
- Apache Kafka - Primary message broker for agent communication
- Confluent Kafka Python Client - Kafka producer/consumer implementation
- Docker Compose - Kafka cluster orchestration (production deployment)
- OpenCV (cv2) - Image preprocessing and manipulation
- Tesseract OCR (pytesseract) - Optical character recognition engine
- Pillow (PIL) - Python image processing library
- ChromaDB - Vector database for document storage and similarity search
- SentenceTransformers - Embedding model for semantic text representation
- Transformers Library - Neural network models for text processing
- AutoGen - Multi-agent conversation framework
- OpenAI API - GPT-5 language model for report generation
- OpenAI Python Client - API integration library
- Flask - Python web framework for API endpoints
- Flask-CORS - Cross-origin resource sharing support
- Werkzeug - WSGI web application utilities
- Python-dotenv - Environment variable management
- Pydantic - Data validation and serialization
- UUID - Unique identifier generation for document tracking
- Mock implementations - Fallback systems for testing without full infrastructure
- File-based queues - Alternative to Kafka for development environments
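As a rough illustration of that file-based alternative, each topic could map to a directory of JSON files consumed oldest-first; the queues/<topic>/<timestamp>.json layout is an assumption:

```python
import json
import time
from pathlib import Path

QUEUE_ROOT = Path("queues")  # assumed layout: queues/<topic>/<timestamp>.json


def publish(topic: str, event: dict) -> None:
    topic_dir = QUEUE_ROOT / topic
    topic_dir.mkdir(parents=True, exist_ok=True)
    (topic_dir / f"{time.time_ns()}.json").write_text(json.dumps(event))


def consume(topic: str):
    """Yield events oldest-first, deleting each file once read."""
    topic_dir = QUEUE_ROOT / topic
    if not topic_dir.exists():
        return
    for path in sorted(topic_dir.glob("*.json")):
        yield json.loads(path.read_text())
        path.unlink()


publish("ingest.images", {"document_id": "doc-42", "path": "uploads/doc-42.png"})
for event in consume("ingest.images"):
    print(event)
```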