Multiple DB pattern #603
-
|
Hi everyone. I'm looking to implement some of the best practices using advanced-alchemy repos/services. Currently, I'm using a single SQLAlchemyAsyncConfig with a custom RoutingSession to select the db_session based on a `__bind_key__` in the model. When the DI of the route for the db_session is called, it pulls the correct engine. What's the right way to do this if I'm using DI of a service/repo instead of a db_session? (minimal example)
from dataclasses import dataclass, field
from sqlalchemy.ext.asyncio import AsyncEngine, create_async_engine
from sqlalchemy.orm import Session, Mapped, mapped_column
from advanced_alchemy.base import IdentityBase
from advanced_alchemy.mixins import UniqueMixin
from advanced_alchemy.extensions.litestar import (
SQLAlchemyAsyncConfig,
async_autocommit_before_send_handler,
)
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker
from sqlalchemy import String
from litestar.contrib.sqlalchemy.plugins import SQLAlchemyPlugin
from litestar.config.app import AppConfig
from litestar.plugins import InitPluginProtocol
from litestar import Litestar, Controller, get
#Settings
@dataclass
class DatabaseSettings:
    """Database connection settings and lazily created async engines.

    The engine mapping is built the first time it is requested and then
    cached on this instance, keyed by database name.
    """

    ...  # placeholder kept from the original example
    CONN_STRING_BASE: str = field(default="mssql+aioodbc://sa:password@host/{}")
    _engine_instance: dict[str, AsyncEngine] | None = None
    """SQLAlchemy engine instance generated from settings."""

    @property
    def engines(self) -> dict[str, AsyncEngine]:
        """Return the engine mapping, creating it on first access."""
        return self.get_engines()

    def get_engines(self) -> dict[str, AsyncEngine]:
        """Build (once) and return the mapping of database name to engine.

        Returns:
            A dict with one engine per name in ``_dbs`` (created from
            ``CONN_STRING_BASE``) plus the ``cache_db`` and ``login_db``
            SQLite engines.
        """
        if self._engine_instance is None:
            # One string per entry: adjacent literals without a separating
            # comma are concatenated into a single name ("db1db2").
            _dbs = [
                "db1",
                "db2",
            ]
            engines = {
                db: create_async_engine(self.CONN_STRING_BASE.format(db))
                for db in _dbs
            }
            engines["cache_db"] = create_async_engine("sqlite+aiosqlite:///cache.db")
            engines["login_db"] = create_async_engine("sqlite+aiosqlite:///login.db")
            self._engine_instance = engines
        return self._engine_instance
@dataclass
class Settings:
    """Top-level settings container holding the database settings."""

    # A fresh DatabaseSettings is created per Settings instance.
    db: DatabaseSettings = field(default_factory=DatabaseSettings)
from functools import lru_cache


@lru_cache(maxsize=1)
def get_settings() -> Settings:
    """Return the shared application settings.

    The result is memoized so every caller receives the same ``Settings``
    object. Without this, each call would build a new ``DatabaseSettings``
    and therefore a new set of engines whenever ``.db.engines`` is read.

    Returns:
        The single cached ``Settings`` instance.
    """
    return Settings()
# db models
class BaseDb1(IdentityBase):
    """Abstract base for models whose ``__bind_key__`` is ``"db1"``."""

    __abstract__ = True
    __bind_key__ = "db1"
class Table1(BaseDb1, UniqueMixin):
    """Model mapped to ``table1``; inherits its bind key from ``BaseDb1``."""

    __tablename__ = "table1"

    data: Mapped[str] = mapped_column(String())
class BaseDb2(IdentityBase):
    """Abstract base for models whose ``__bind_key__`` is ``"db2"``."""

    __abstract__ = True
    __bind_key__ = "db2"
class Table2(BaseDb2, UniqueMixin):
    """Model mapped to ``table2``.

    Inherits from ``BaseDb2`` so that its ``__bind_key__`` is ``"db2"``;
    deriving from ``BaseDb1`` would route this table to the "db1" engine.
    """

    __tablename__ = "table2"

    data: Mapped[str] = mapped_column(String())
# config
class RoutingSession(Session):
    """Session that picks its engine from the mapped class's ``__bind_key__``."""

    def get_bind(self, mapper=None, clause=None):
        """Return the sync engine for ``mapper``'s bind key, or ``None``.

        Args:
            mapper: Mapper of the entity being operated on, if any.
            clause: Accepted for signature compatibility; not used here.
        """
        # Without a mapper there is no bind key to look up.
        if not mapper:
            return None
        bind_key = mapper.class_.__bind_key__
        return get_settings().db.engines[bind_key].sync_engine
# Async session factory whose underlying sync session class is RoutingSession,
# so bind resolution goes through RoutingSession.get_bind.
session_factory = async_sessionmaker(sync_session_class=RoutingSession, expire_on_commit=False)
# Plugin configuration: uses the factory above and the autocommit
# before-send handler; create_all is disabled.
alchemy_configs = SQLAlchemyAsyncConfig(
engine_instance=get_settings().db.engines["cache_db"], # Never gets called right now
session_maker=session_factory,
before_send_handler=async_autocommit_before_send_handler,
create_all=False,
)
#controller1
class Controller1(Controller):
    # Example controller: both handlers receive an injected AsyncSession.
    # Handler bodies are left as placeholders in this minimal example.

    @get("/data1")
    async def get_data1(self, db_session: AsyncSession) -> str:
        ...

    @get("/data2")
    async def get_data2(self, db_session: AsyncSession) -> str:
        ...
# plugins
# Plugin instance built from alchemy_configs; registered in ApplicationCore.on_app_init.
alchemy = SQLAlchemyPlugin(config=alchemy_configs)
#core
class ApplicationCore(InitPluginProtocol):
    """Init plugin that registers the app's route handlers and plugins."""

    def on_app_init(self, app_config: AppConfig) -> AppConfig:
        """Add ``Controller1`` and the SQLAlchemy plugin to the app config.

        Args:
            app_config: The configuration being assembled for the app.

        Returns:
            The same ``app_config`` object after it has been extended.
        """
        app_config.route_handlers.extend(
            [
                Controller1,
            ]
        )
        app_config.plugins.extend(
            [
                alchemy,
            ]
        )
        # The hook is declared to return the config; falling off the end
        # would hand ``None`` back to the caller instead.
        return app_config
# asgi
...
def create_app():
return Litestar(plugins=[ApplicationCore()])
Any help would be hugely appreciated. Thanks |
Beta Was this translation helpful? Give feedback.
Replies: 2 comments
-
|
Ended up creating a lib to reuse for the multitude of controllers. Can someone gut-check this? Is there a better way more in line with AA's patterns? from typing import AsyncGenerator, TypeVar
from sqlalchemy.ext.asyncio import AsyncSession
from advanced_alchemy.service import SQLAlchemyAsyncRepositoryService
from app.config.base import get_settings
# Type variable bound to the repository-service base class, so the provider
# factory below keeps the concrete service type in its annotations.
T = TypeVar('T', bound=SQLAlchemyAsyncRepositoryService)
# Settings are resolved once when this module is imported and reused by
# every provider created below.
settings = get_settings()
def create_db_service_provider(service_class: type[T], db_name: str):
    """Build a dependency provider yielding ``service_class`` for one database.

    Args:
        service_class: Repository-service class to instantiate.
        db_name: Key into ``settings.db.engines`` selecting the engine.

    Returns:
        An async generator function that yields a service whose session is
        bound to a connection from the selected engine.
    """

    async def provider() -> AsyncGenerator[T, None]:
        db_engine = settings.db.engines[db_name]
        # Entered left to right and exited in reverse, exactly as if nested:
        # connection/transaction -> session -> service.
        async with (
            db_engine.begin() as connection,
            AsyncSession(bind=connection, expire_on_commit=False) as db_session,
            service_class.new(session=db_session) as service,
        ):
            yield service

    return provider
Beta Was this translation helpful? Give feedback.
-
|
Figured it out. The key was to just use the binds parameter in AsyncSessionConfig: # config
-class RoutingSession(Session):
- def get_bind(self, mapper=None, clause=None):
- if mapper:
- return get_settings().db.engines[mapper.class_.__bind_key__].sync_engine
- return None
-
-session_factory = async_sessionmaker(sync_session_class=RoutingSession, expire_on_commit=False)
+session_config = AsyncSessionConfig(
+ expire_on_commit=False,
+ autoflush=False,
+ binds=settings.db.binds, # type: ignore[arg-type]
+)
alchemy_configs = SQLAlchemyAsyncConfig(
engine_instance=get_settings().db.engines["cache_db"], # Never gets called right now
- session_maker=session_factory,
+ session_config=session_config,
before_send_handler=async_autocommit_before_send_handler,
create_all=False,
) |
Beta Was this translation helpful? Give feedback.
Figured it out. The key was to just use the binds parameter in AsyncSessionConfig: