@jennsun commented Dec 12, 2025

Custom Session implementation for short-term / long-term memory on Databricks, with connection pooling and token caching/rotation handled by LakebasePool.

You can implement your own session memory by creating a class that follows the Session protocol:
https://openai.github.io/openai-agents-python/sessions/#custom-session-implementations

More on Session protocol:
https://openai.github.io/openai-agents-python/ref/memory/session/#agents.memory.session.Session
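
For reference, the protocol boils down to a session_id plus four async methods. This is a paraphrased sketch based on the docs linked above, not a copy of the SDK source:

from typing import Protocol

from agents.items import TResponseInputItem

class Session(Protocol):
    session_id: str

    async def get_items(self, limit: int | None = None) -> list[TResponseInputItem]:
        """Retrieve the conversation history for this session."""
        ...

    async def add_items(self, items: list[TResponseInputItem]) -> None:
        """Append new items to the conversation history."""
        ...

    async def pop_item(self) -> TResponseInputItem | None:
        """Remove and return the most recent item, if any."""
        ...

    async def clear_session(self) -> None:
        """Delete all items stored for this session."""
        ...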

Before:

import logging
import threading
import time

from agents import Runner
from agents.extensions.memory import SQLAlchemySession
from sqlalchemy.ext.asyncio import AsyncEngine, create_async_engine

logger = logging.getLogger(__name__)

# Module-level cache: the engine and the timestamp of the cached token
_engine: AsyncEngine | None = None
_cache_ts: float | None = None
_cache_lock = threading.Lock()

def get_engine() -> AsyncEngine:
    """
    Get or create the async database engine.

    For Lakebase, this checks whether the cached token is still valid and
    recreates the engine if the token has expired.
    """
    global _engine, _cache_ts

    # Check if we need to recreate the engine due to token expiration
    if _engine is not None and _instance_name is not None:
        with _cache_lock:
            now = time.time()
            if _cache_ts and (now - _cache_ts) >= DEFAULT_TOKEN_CACHE_DURATION_SECONDS:
                # Token expired, dispose of the old engine
                logger.info("Lakebase token expired, recreating engine")
                _engine.sync_engine.dispose()
                _engine = None

    if _engine is None:
        _engine = create_async_engine(
            get_database_url(),  # builds a connection URL with a fresh token
            echo=False,  # Set to True for SQL debugging
            pool_pre_ping=True,
        )
    return _engine

# At the call site:
engine = get_engine()
session = SQLAlchemySession(
    session_id=resolved_thread_id,
    engine=engine,
    create_tables=create_tables,
)
result = await Runner.run(agent, user_message, session=session)

By using LakebasePool in our custom Session class implementation, we no longer need to create a SQLAlchemy engine as in the OpenAI Agents SDK example above; we can instead leverage the token caching and connection pooling logic that LakebasePool already provides. The Session class keeps a cached pool shared across Session instances, so new pools aren't created per session/user and the caller does not have to manage the pool.
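
As a rough illustration of that caching pattern, the pool can be memoized per Lakebase instance at module level. This is a hypothetical sketch: the LakebasePool import path and constructor signature shown here are assumptions, not the confirmed API.

import threading

# Assumed import path for LakebasePool; the real location may differ.
from databricks_openai.lakebase import LakebasePool

_pools: dict[str, LakebasePool] = {}
_pools_lock = threading.Lock()

def _get_pool(instance_name: str) -> LakebasePool:
    """Return the cached pool for a Lakebase instance, creating it once.

    Keying the cache by instance name means many LakebaseSession objects
    (one per user/thread) share a single pool, and token refresh stays
    inside LakebasePool instead of being re-implemented by each caller.
    """
    with _pools_lock:
        pool = _pools.get(instance_name)
        if pool is None:
            pool = LakebasePool(instance_name=instance_name)  # assumed signature
            _pools[instance_name] = pool
        return pool

With the pool cached inside the session class, usage reduces to: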

from databricks_openai.agents.session import LakebaseSession

session = LakebaseSession(
    session_id=resolved_thread_id,
    instance_name=LAKEBASE_INSTANCE_NAME,
)

result = await Runner.run(agent, user_message, session=session)

Local testing + app testing

Notebook with smoke tests, CRUD operations tests, and integration tests:
https://eng-ml-agent-platform.staging.cloud.databricks.com/editor/notebooks/1816672511960993?o=2850744067564480#command/8078768795088187

App: https://eng-ml-agent-platform.staging.cloud.databricks.com/apps/j-openai-stateful?o=2850744067564480


Ensuring agent messages are logged in the Lakebase instance: (screenshot)

Unit tests:
python -m pytest integrations/openai/tests/unit_tests/test_session.py -v
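
One of the CRUD tests might look roughly like this sketch (illustrative only, not copied from the test file; it assumes the pytest-asyncio plugin and a test Lakebase instance or a stubbed pool):

import pytest

from databricks_openai.agents.session import LakebaseSession

@pytest.mark.asyncio  # requires the pytest-asyncio plugin
async def test_session_crud():
    # Assumes "test-instance" resolves to a reachable (or stubbed) Lakebase pool.
    session = LakebaseSession(session_id="test-thread", instance_name="test-instance")

    items = [{"role": "user", "content": "hello"}]
    await session.add_items(items)
    assert await session.get_items() == items

    # pop_item removes and returns the most recent item
    assert await session.pop_item() == items[0]

    await session.clear_session()
    assert await session.get_items() == []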

@jennsun changed the title from "[draft] add LakebaseSession to databricks-openai for Session Protocol for Memory Management" to "Add LakebaseSession Class that follows the Session protocol for Memory Management to databricks-openai" on Dec 12, 2025
@jennsun changed the title from "Add LakebaseSession Class that follows the Session protocol for Memory Management to databricks-openai" to "databricks-openai: Add LakebaseSession Class that follows the Session protocol for Memory Management" on Dec 13, 2025