Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 35 additions & 0 deletions autogpt_platform/backend/.env.default
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,41 @@ DATABASE_URL="postgresql://${DB_USER}:${DB_PASS}@${DB_HOST}:${DB_PORT}/${DB_NAME
DIRECT_URL="postgresql://${DB_USER}:${DB_PASS}@${DB_HOST}:${DB_PORT}/${DB_NAME}?schema=${DB_SCHEMA}&connect_timeout=${DB_CONNECT_TIMEOUT}"
PRISMA_SCHEMA="postgres/schema.prisma"

# SQLAlchemy Configuration (for gradual migration from Prisma)
# Set to true to enable SQLAlchemy alongside Prisma (both ORMs coexist during migration)
ENABLE_SQLALCHEMY=false

# Connection Pool Configuration
# IMPORTANT: With 6 backend processes, total connections = 6 × (POOL_SIZE + MAX_OVERFLOW)
# Must stay under PostgreSQL max_connections (default: 100)
#
# Environment-specific recommendations:
# Development: POOL_SIZE=2-3, MAX_OVERFLOW=1-2 (lightweight, fast startup)
# Test/CI: POOL_SIZE=2, MAX_OVERFLOW=1 (minimal resources, parallel test safety)
# Production: POOL_SIZE=10-20, MAX_OVERFLOW=5-10 (handle real traffic and bursts)
#
# Default values below are suitable for production use:
SQLALCHEMY_POOL_SIZE=10
SQLALCHEMY_MAX_OVERFLOW=5

# Timeout Configuration
# POOL_TIMEOUT: How long to wait for an available connection from the pool (when all connections busy)
# CONNECT_TIMEOUT: How long to wait when establishing a NEW connection to PostgreSQL
#
# Environment-specific recommendations:
# Development: POOL_TIMEOUT=10-30s, CONNECT_TIMEOUT=5-10s
# Test/CI: POOL_TIMEOUT=5-10s, CONNECT_TIMEOUT=5-10s (fail fast)
# Production: POOL_TIMEOUT=30s, CONNECT_TIMEOUT=10-15s
#
# Default values below are suitable for production use:
SQLALCHEMY_POOL_TIMEOUT=30
SQLALCHEMY_CONNECT_TIMEOUT=10

# SQL Query Logging
# Set to true to log ALL SQL statements (very verbose, useful for debugging)
# Should always be false in production
SQLALCHEMY_ECHO=false

## ===== REQUIRED SERVICE CREDENTIALS ===== ##
# Redis Configuration
REDIS_HOST=localhost
Expand Down
233 changes: 233 additions & 0 deletions autogpt_platform/backend/backend/data/sqlalchemy.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,233 @@
"""
SQLAlchemy infrastructure for AutoGPT Platform.

This module provides:
1. Async engine creation with connection pooling
2. Session factory for dependency injection
3. Database lifecycle management
"""

import logging
import re
from contextlib import asynccontextmanager
from typing import AsyncGenerator

from sqlalchemy.ext.asyncio import (
AsyncEngine,
AsyncSession,
async_sessionmaker,
create_async_engine,
)

from backend.util.settings import Config

logger = logging.getLogger(__name__)

# ============================================================================
# CONFIGURATION
# ============================================================================


def get_database_url() -> str:
    """
    Extract the database URL from settings and convert it to async format.

    Prisma URL:  postgresql://user:pass@host:port/db?schema=platform&connect_timeout=60
    Async URL:   postgresql+asyncpg://user:pass@host:port/db

    Query parameters (schema, connect_timeout, ...) are stripped here and
    applied through ``connect_args`` in :func:`create_engine` instead.

    Returns:
        The asyncpg-compatible URL without query parameters.

    Raises:
        ValueError: If the configured database URL is empty or does not start
            with ``postgresql://``. This makes misconfiguration fail fast with
            a clear message at startup, instead of surfacing as an unhandled
            ``sqlalchemy.exc.ArgumentError`` deep inside create_async_engine().
    """
    prisma_url = Config().database_url

    # Fail fast: with ENABLE_SQLALCHEMY=true and no DATABASE_URL set, the
    # config value defaults to an empty string, which would otherwise crash
    # engine creation with an opaque ArgumentError during app startup.
    if not prisma_url:
        raise ValueError(
            "DATABASE_URL is not set but SQLAlchemy is enabled; "
            "set DATABASE_URL or disable ENABLE_SQLALCHEMY."
        )
    if not prisma_url.startswith("postgresql://"):
        raise ValueError(
            "DATABASE_URL must start with 'postgresql://' to be converted "
            "to the asyncpg driver URL."
        )

    # Swap the driver prefix only at the start of the URL (str.replace would
    # rewrite the substring anywhere it appeared, e.g. inside a password).
    async_url = "postgresql+asyncpg://" + prisma_url[len("postgresql://") :]

    # Remove ALL query parameters (schema, connect_timeout, etc.);
    # they are handled through connect_args when the engine is created.
    async_url = re.sub(r"\?.*$", "", async_url)

    return async_url
Comment on lines +39 to +49
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bug: Empty DATABASE_URL with enable_sqlalchemy=true causes unhandled ArgumentError during startup.
Severity: HIGH | Confidence: High

🔍 Detailed Analysis

When enable_sqlalchemy=true is set and the DATABASE_URL environment variable is not provided, the Config().database_url defaults to an empty string. The get_database_url() function then attempts to create an engine with this empty URL, causing SQLAlchemy to raise an ArgumentError. This ArgumentError is not caught by the existing exception handlers in database.py or rest_api.py, leading to an unhandled exception and application crash during startup.

💡 Suggested Fix

Validate that database_url is not empty before calling create_async_engine(), or add ArgumentError to the list of caught exceptions, or set a sensible fallback for database_url.

🤖 Prompt for AI Agent
Review the code at the location below. A potential bug has been identified by an AI
agent.
Verify if this is a real issue. If it is, propose a fix; if not, explain why it's not
valid.

Location: autogpt_platform/backend/backend/data/sqlalchemy.py#L31-L49

Potential issue: When `enable_sqlalchemy=true` is set and the `DATABASE_URL` environment
variable is not provided, the `Config().database_url` defaults to an empty string. The
`get_database_url()` function then attempts to create an engine with this empty URL,
causing SQLAlchemy to raise an `ArgumentError`. This `ArgumentError` is not caught by
the existing exception handlers in `database.py` or `rest_api.py`, leading to an
unhandled exception and application crash during startup.

Did we get this right? 👍 / 👎 to inform future reviews.
Reference_id: 2841823



def get_database_schema() -> str:
    """
    Pull the schema name out of the DATABASE_URL query string.

    Falls back to 'platform' (matching the Prisma configuration) when no
    ``schema=`` query parameter is present.
    """
    url = Config().database_url
    found = re.search(r"schema=(\w+)", url)
    if found is None:
        return "platform"
    return found.group(1)


# ============================================================================
# ENGINE CREATION
# ============================================================================


def create_engine() -> AsyncEngine:
    """
    Build the async SQLAlchemy engine with its connection pool.

    Call this exactly once per process at startup; the resulting engine is
    long-lived and safe to share across threads/tasks.

    Pool settings (all read from config):
      - pool_size:     persistent connections kept open (default: 10)
      - max_overflow:  extra connections allowed under burst load (default: 5)
      - pool_timeout:  seconds to wait for a free connection (default: 30)
      - pool_pre_ping: validate connections before handing them out, which
        guards against stale/dropped connections

    Maximum simultaneous connections = pool_size + max_overflow.
    """
    cfg = Config()
    schema = get_database_schema()

    engine = create_async_engine(
        get_database_url(),
        # Connection pool configuration
        pool_size=cfg.sqlalchemy_pool_size,
        max_overflow=cfg.sqlalchemy_max_overflow,
        pool_timeout=cfg.sqlalchemy_pool_timeout,
        pool_pre_ping=True,
        # Async configuration
        echo=cfg.sqlalchemy_echo,  # verbose SQL logging; keep off in prod
        future=True,  # SQLAlchemy 2.0-style API
        # Arguments forwarded to the asyncpg driver
        connect_args={
            "server_settings": {
                "search_path": schema,  # e.g. the 'platform' schema
            },
            "timeout": cfg.sqlalchemy_connect_timeout,
        },
    )

    # Lazy %-style args so formatting only happens if the level is enabled.
    logger.info(
        "SQLAlchemy engine created: pool_size=%s, max_overflow=%s, schema=%s",
        cfg.sqlalchemy_pool_size,
        cfg.sqlalchemy_max_overflow,
        schema,
    )

    return engine


# ============================================================================
# SESSION FACTORY
# ============================================================================


def create_session_factory(engine: AsyncEngine) -> async_sessionmaker[AsyncSession]:
    """
    Build a factory that produces configured AsyncSession instances.

    Configure the factory once at startup, then call it on demand; each
    session it creates represents a single database transaction.

    Args:
        engine: Async engine whose connection pool backs the sessions.

    Returns:
        An async_sessionmaker producing AsyncSession objects with explicit
        transaction control (no autoflush/autocommit) whose ORM objects
        remain usable after commit (expire_on_commit=False).
    """
    session_options = {
        "bind": engine,
        "class_": AsyncSession,
        "expire_on_commit": False,  # keep objects readable after commit
        "autoflush": False,  # flush only when explicitly requested
        "autocommit": False,  # transactions are managed explicitly
    }
    return async_sessionmaker(**session_options)


# ============================================================================
# DEPENDENCY INJECTION FOR FASTAPI
# ============================================================================

# Global references (set during app startup)
_engine: AsyncEngine | None = None
_session_factory: async_sessionmaker[AsyncSession] | None = None


def initialize(engine: AsyncEngine) -> None:
    """
    Store the process-wide engine and build the session factory from it.

    Invoked once from the FastAPI lifespan startup hook.

    Args:
        engine: Async engine to bind this process's sessions to.
    """
    global _engine, _session_factory
    _session_factory = create_session_factory(engine)
    _engine = engine
    logger.info("SQLAlchemy session factory initialized")


@asynccontextmanager
async def get_session() -> AsyncGenerator[AsyncSession, None]:
    """
    Async context manager yielding a database session from the global factory.

    NOTE: because this function is wrapped in ``@asynccontextmanager`` it must
    be used as ``async with get_session() as session:``. Passing it directly
    to FastAPI's ``Depends`` would hand FastAPI a context-manager object, not
    a session — expose a thin async-generator wrapper for route dependencies
    if that usage is needed.

    Usage:
        async with get_session() as session:
            result = await session.execute(select(User).where(User.id == user_id))
            return result.scalar_one_or_none()

    Lifecycle:
        1. A session is created (borrows a connection from the pool)
        2. The caller's block runs with the session
        3. On normal exit, pending changes are committed
        4. On exception, the transaction is rolled back and the error re-raised
        5. The session is closed by the ``async with`` below, returning the
           connection to the pool in every case (success or error)

    Raises:
        RuntimeError: If initialize() has not been called in this process.
    """
    if _session_factory is None:
        raise RuntimeError(
            "SQLAlchemy not initialized. Call initialize() in lifespan context."
        )

    # `async with _session_factory() as session` closes the session on exit,
    # so the previous explicit `finally: await session.close()` was a
    # redundant double-close and has been removed.
    async with _session_factory() as session:
        try:
            yield session  # hand the session to the caller's block
            # Caller's block completed without error: commit pending changes.
            await session.commit()
        except Exception:
            # Roll back the transaction before propagating the error.
            await session.rollback()
            raise


async def dispose() -> None:
    """
    Shut down the engine and release every pooled connection.

    Invoked from the FastAPI lifespan shutdown hook. Safe to call when the
    engine was never initialized — it simply does nothing.
    """
    global _engine, _session_factory

    # Guard clause: nothing to tear down if startup never ran.
    if _engine is None:
        return

    logger.info("Disposing SQLAlchemy engine...")
    await _engine.dispose()
    _session_factory = None
    _engine = None
    logger.info("SQLAlchemy engine disposed")
Loading
Loading