Skip to content

psqlpy adapter #597

Open
Open
@sharkguto

Description

@sharkguto

Hi,

I have code an adapter for using psqlpy driver , as following code:

from databases.backends.postgres import PostgresBackend, ConnectionBackend
from databases.core import DatabaseURL
import psqlpy
from typing import Any, Dict, List, Optional
import asyncio


class PSQLPyConnectionBackend(ConnectionBackend):
    def __init__(self, pool: psqlpy.ConnectionPool):
        self._pool = pool
        self._connection = None
        self._lock = asyncio.Lock()

    async def __aenter__(self):
        await self.acquire()
        return self

    async def __aexit__(self, exc_type, exc, tb):
        await self.release()

    async def acquire(self) -> None:
        async with self._lock:
            if self._connection is None:
                self._connection = await self._pool.connection()

    async def release(self) -> None:
        async with self._lock:
            if self._connection is not None:
                self._connection = None  # Pool-managed, no explicit close needed

    async def fetch_all(self, query: str, values: Optional[Dict] = None) -> List[Any]:
        if self._connection is None:
            await self.acquire()
        query_str = str(query)
        params = values if isinstance(values, (dict, list, tuple)) else []
        cursor = await self._connection.execute(query_str, params)

        return cursor.result()

    async def fetch_one(
        self, query: str, values: Optional[Dict] = None
    ) -> Optional[Any]:
        if self._connection is None:
            await self.acquire()
        query_str = str(query)
        params = values if isinstance(values, (dict, list, tuple)) else []
        cursor = await self._connection.execute(query_str, params)

        return cursor.result()

    async def execute(self, query: str, values: Optional[Dict] = None) -> None:
        if self._connection is None:
            await self.acquire()
        await self._connection.execute(query, values or {})

    async def execute_many(self, query: str, values: List[Dict]) -> None:
        if self._connection is None:
            await self.acquire()
        for value in values:
            await self._connection.execute(query, value)

    async def iterate(self, query: str, values: Optional[Dict] = None) -> Any:
        if self._connection is None:
            await self.acquire()
        query_str = str(query)
        params = values if isinstance(values, (dict, list, tuple)) else []
        cursor = await self._connection.execute(query_str, params)
        async for row in cursor:
            yield dict(row)


class PSQLPyBackend(PostgresBackend):
    def __init__(self, database_url: DatabaseURL, **options: Any) -> None:
        super().__init__(database_url, **options)
        self._pool: Optional[psqlpy.ConnectionPool] = None
        self._url = database_url
        self._options = options

    async def connect(self) -> None:
        if self._pool is None:
            print("Connecting with PSQLPyBackend")
            self._pool = psqlpy.ConnectionPool(
                username=str(self._url.username or "postgres"),
                password=str(self._url.password or ""),
                host=str(self._url.hostname or "localhost"),
                port=int(self._url.port or 5432),
                db_name=str(self._url.database or ""),
                max_db_pool_size=self._options.get("max_size", 20),
            )

    async def disconnect(self) -> None:
        if self._pool is not None:
            print("Disconnecting PSQLPyBackend")
            await self._pool.close()
            self._pool = None

    def connection(self) -> PSQLPyConnectionBackend:
        if self._pool is None:
            raise RuntimeError("Database not connected. Call connect() first.")
        return PSQLPyConnectionBackend(self._pool)

how to use:

.....

Database.SUPPORTED_BACKENDS["psqlpy"] = "velejar.utils.psqlpy_adapter:PSQLPyBackend"

SQLALCHEMY_DATABASE_URI = f"psqlpy://{POSTGRES_USER}:{urllib.parse.quote_plus(POSTGRES_PASSWORD)}@{POSTGRES_SERVER}/{POSTGRES_DB}"

database = Database(
    SQLALCHEMY_DATABASE_URI,
    min_size=1,
    max_size=POSTGRES_POOL_SIZE,
)

It would be great if you guys implement this new driver: https://github.com/psqlpy-python/psqlpy

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions