Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/tetra_rp/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
ServerlessEndpoint,
runpod,
NetworkVolume,
FlashApp,
)


Expand All @@ -40,4 +41,5 @@
"ServerlessEndpoint",
"runpod",
"NetworkVolume",
"FlashApp",
]
154 changes: 154 additions & 0 deletions src/tetra_rp/build/gpu_workers/strategies/base.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,154 @@
"""
Base deployment strategy interface with Pydantic models.

Defines the abstract interface that all deployment strategies must implement.
"""

from abc import ABC, abstractmethod
from enum import Enum
from typing import Any, Dict, List, Optional

from pydantic import BaseModel, Field, ConfigDict


class StrategyType(str, Enum):
"""Available deployment strategy types."""

IMAGE = "image" # Full Docker image build with baked code
TARBALL = "tarball" # Tarball download at runtime


class DeploymentArtifact(BaseModel):
"""
Artifact produced by deployment strategy.

This represents what gets deployed (image reference, tarball path, etc.)
"""

model_config = ConfigDict(arbitrary_types_allowed=True)

strategy_type: StrategyType
artifact_reference: str = Field(
..., description="Main artifact reference (image name, tarball key, etc.)"
)
metadata: Dict[str, Any] = Field(
default_factory=dict, description="Strategy-specific metadata"
)
size_bytes: Optional[int] = Field(None, description="Artifact size in bytes")
code_hash: Optional[str] = Field(None, description="Hash of deployed code")


class DeployedResource(BaseModel):
"""
Result of deploying a resource to a platform.

Contains information about the deployed resource.
"""

model_config = ConfigDict(arbitrary_types_allowed=True)

success: bool
resource_id: Optional[str] = None
endpoint_url: Optional[str] = None
artifact: DeploymentArtifact
message: str
metadata: Dict[str, Any] = Field(default_factory=dict)


class DeploymentConfig(BaseModel):
"""
Base configuration for deployment strategies.

Contains common configuration shared across all strategies.
"""

model_config = ConfigDict(arbitrary_types_allowed=True)

strategy_type: StrategyType
base_image: str = Field(
default="runpod/worker-v1-tetra:latest",
description="Base Docker image for GPU workers",
)
dependencies: List[str] = Field(
default_factory=list, description="Python dependencies"
)
env_vars: Dict[str, str] = Field(
default_factory=dict, description="Environment variables"
)
platform: str = Field(default="linux/amd64", description="Docker platform")


class DeploymentStrategy(ABC):
"""
Abstract base class for deployment strategies.

All deployment strategies must implement this interface.

Strategy Pattern:
- ImageBuildStrategy: Bakes code into Docker image
- TarballStrategy: Creates tarball, downloads at runtime
- Future: S3Strategy, GitStrategy, HybridStrategy, etc.
"""

def __init__(self, config: DeploymentConfig):
"""
Initialize strategy with configuration.

Args:
config: Deployment configuration
"""
self.config = config

@abstractmethod
def _validate_config(self) -> None:
"""
Validate strategy-specific configuration.

Raises:
ValueError: If configuration is invalid
"""
pass

@abstractmethod
async def prepare_deployment(
self, func_or_class: Any, name: str
) -> DeploymentArtifact:
"""
Prepare deployment artifact (image, tarball, etc.).

Args:
func_or_class: Python function or class to deploy
name: Name for the deployment

Returns:
DeploymentArtifact with artifact reference and metadata

Raises:
RuntimeError: If preparation fails
"""
pass

@abstractmethod
async def apply_to_resource(
self, resource_config: Any, artifact: DeploymentArtifact
) -> Any:
"""
Apply deployment artifact to resource configuration.

Modifies resource_config in place to use the deployment artifact.

Args:
resource_config: Resource configuration to modify
artifact: Deployment artifact to apply

Returns:
Modified resource configuration
"""
pass

def get_strategy_type(self) -> StrategyType:
"""Get the strategy type."""
return self.config.strategy_type

def __repr__(self) -> str:
return f"{self.__class__.__name__}(strategy={self.config.strategy_type.value})"
30 changes: 16 additions & 14 deletions src/tetra_rp/cli/commands/build.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import subprocess
import sys
import tarfile
import asyncio
from pathlib import Path

import typer
Expand All @@ -14,6 +15,8 @@
from rich.table import Table

from ..utils.ignore import get_file_tree, load_ignore_patterns
from ..utils.app import discover_flash_project
from tetra_rp.core.resources.app import FlashApp

console = Console()

Expand All @@ -31,6 +34,11 @@ def build_command(
output_name: str | None = typer.Option(
None, "--output", "-o", help="Custom archive name (default: archive.tar.gz)"
),
local_only: bool = typer.Option(
False,
"--local-only",
help="Only store build tarball locally and dont upload to Runpod",
),
):
"""
Build Flash application for deployment.
Expand Down Expand Up @@ -151,6 +159,11 @@ def build_command(
# Success summary
_display_build_summary(archive_path, app_name, len(files), len(requirements))

if local_only:
return

asyncio.run(upload_build(app_name, archive_path))

except KeyboardInterrupt:
console.print("\n[yellow]Build cancelled by user[/yellow]")
raise typer.Exit(1)
Expand All @@ -162,20 +175,9 @@ def build_command(
raise typer.Exit(1)


def discover_flash_project() -> tuple[Path, str]:
"""
Discover Flash project directory and app name.

Returns:
Tuple of (project_dir, app_name)

Raises:
typer.Exit: If not in a Flash project directory
"""
project_dir = Path.cwd()
app_name = project_dir.name

return project_dir, app_name
async def upload_build(app_name: str, build_path: str | Path):
app = await FlashApp.from_name(app_name)
await app.upload_build(build_path)


def validate_project_structure(project_dir: Path) -> bool:
Expand Down
94 changes: 94 additions & 0 deletions src/tetra_rp/cli/commands/deploy.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,12 @@
from rich.console import Console
from rich.table import Table
from rich.panel import Panel
from rich.progress import Progress, SpinnerColumn, TextColumn
import questionary
import asyncio
from typing import Optional

from ..utils.app import discover_flash_project
from ..utils.deployment import (
get_deployment_environments,
create_deployment_environment,
Expand All @@ -15,8 +19,98 @@
get_environment_info,
)

from tetra_rp.core.resources import FlashApp

console = Console()

deploy_app = typer.Typer(
short_help="Deploy flash application code to managed environments on Runpod"
)


@deploy_app.callback(invoke_without_command=True)
def deploy_callback(
ctx: typer.Context,
app_name: str = typer.Option(
None, "--app-name", "-a", help="Flash app name to deploy a build for"
),
env_name: str = typer.Option(
"dev",
"--env-name",
"-e",
help="Flash env name to deploy a build to. If not provided, default to 'dev'",
),
build_id: str = typer.Option(
None,
"--build-id",
"-b",
help="Flash build id to deploy to env. If not provided, default to most recently uploaded build",
),
):
if ctx.invoked_subcommand is None:
deploy_build_sync(app_name, env_name, build_id)


def deploy_build_sync(app_name: str, env_name: str, build_id: str):
"""
Convenience wrapper to run async methods from cli
"""
asyncio.run(deploy_build(app_name, env_name, build_id))


async def deploy_build(
app_name: Optional[str] = "",
env_name: Optional[str] = "dev",
build_id: Optional[str] = None,
):
target_env = env_name or "dev"
progress_columns = [
SpinnerColumn(),
TextColumn("[progress.description]{task.description}"),
]

with Progress(*progress_columns, console=console) as progress:
task = progress.add_task("Preparing deployment", start=True)

if not app_name:
progress.update(task, description="Discovering Flash project...")
_, app_name = discover_flash_project()
progress.update(task, description=f"Detected Flash app '{app_name}'")

progress.update(task, description=f"Fetching Flash app '{app_name}'...")
app = await FlashApp.from_name(app_name)

if not build_id:
progress.update(task, description="Fetching available builds...")
builds = await app.list_builds()
if not builds:
progress.update(task, description="No builds found for app")
progress.stop_task(task)
raise ValueError("No builds found for app. Run 'flash build' first.")
build_id = builds[0]["id"]
progress.update(task, description=f"Using latest build {build_id}")

progress.update(
task,
description=f"Promoting build {build_id} to environment '{target_env}'...",
)
result = await app.deploy_build_to_environment(
build_id, environment_name=target_env
)
progress.update(
task,
description=f"[green]✓ Promoted build {build_id} to '{target_env}'",
)
progress.stop_task(task)

panel_content = (
f"Promoted build [bold]{build_id}[/bold] to environment [bold]{target_env}[/bold]\n"
f"Flash app: [bold]{app_name}[/bold]"
)
console.print(Panel(panel_content, title="Deployment Promotion", expand=False))

return result


def list_command():
"""Show available deployment environments."""
Expand Down
13 changes: 13 additions & 0 deletions src/tetra_rp/cli/commands/init.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
from rich.console import Console
from rich.panel import Panel
from rich.table import Table
from tetra_rp.core.resources.app import FlashApp
import asyncio

from ..utils.skeleton import create_project_skeleton
from ..utils.conda import (
Expand All @@ -28,6 +30,11 @@
]


async def init_app(app_name: str):
app = await FlashApp.create(app_name)
await app.create_environment("dev")


def init_command(
project_name: str = typer.Argument(..., help="Project name"),
no_env: bool = typer.Option(
Expand All @@ -36,6 +43,9 @@ def init_command(
force: bool = typer.Option(
False, "--force", "-f", help="Overwrite existing directory"
),
local_only: bool = typer.Option(
False, "--local-only", help="Do not create remote app on Runpod"
),
):
"""Create new Flash project with Flash Server and GPU workers."""

Expand Down Expand Up @@ -95,6 +105,9 @@ def init_command(
"You can manually install dependencies: pip install -r requirements.txt"
)

if not local_only:
asyncio.run(init_app(project_name))

# Success output
panel_content = (
f"Flash project '[bold]{project_name}[/bold]' created successfully!\n\n"
Expand Down
Loading
Loading