Skip to content
Open
Show file tree
Hide file tree
Changes from 12 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions src/tetra_rp/cli/commands/resource.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,8 +85,8 @@ def clean_command(
task, description=f"Removing {resource.__class__.__name__}..."
)

# Remove resource (this will also clean up remotely if needed)
resource_manager.remove_resource(uid)
# Remove resource from tracking (does not delete remote resources)
resource_manager._remove_resource(uid)

progress.advance(task)
time.sleep(0.1) # Small delay for visual feedback
Expand Down
93 changes: 7 additions & 86 deletions src/tetra_rp/cli/commands/undeploy.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@

from ...core.resources.base import DeployableResource
from ...core.resources.resource_manager import ResourceManager
from ...core.api.runpod import RunpodGraphQLClient

console = Console()

Expand Down Expand Up @@ -130,47 +129,6 @@ def list_command():
)


async def _delete_endpoint(endpoint_id: str, resource_id: str, name: str) -> dict:
"""Delete an endpoint via RunPod API and remove from tracking.

Args:
endpoint_id: RunPod endpoint ID
resource_id: Local resource ID for tracking
name: Human-readable name

Returns:
Dict with success status and message
"""
try:
async with RunpodGraphQLClient() as client:
result = await client.delete_endpoint(endpoint_id)

if result.get("success"):
# Remove from tracking
manager = ResourceManager()
manager.remove_resource(resource_id)
return {
"success": True,
"name": name,
"endpoint_id": endpoint_id,
"message": f"Successfully deleted endpoint '{name}' ({endpoint_id})",
}
else:
return {
"success": False,
"name": name,
"endpoint_id": endpoint_id,
"message": f"Failed to delete endpoint '{name}' ({endpoint_id})",
}
except Exception as e:
return {
"success": False,
"name": name,
"endpoint_id": endpoint_id,
"message": f"Error deleting endpoint '{name}': {str(e)}",
}


def _cleanup_stale_endpoints(
resources: Dict[str, DeployableResource], manager: ResourceManager
) -> None:
Expand Down Expand Up @@ -219,7 +177,7 @@ def _cleanup_stale_endpoints(
removed_count = 0
for resource_id, resource in inactive:
try:
manager.remove_resource(resource_id)
manager._remove_resource(resource_id)
removed_count += 1
console.print(
f"[green]✓[/green] Removed [cyan]{resource.name}[/cyan] from tracking"
Expand Down Expand Up @@ -359,24 +317,11 @@ def _undeploy_by_name(name: str, resources: dict):
raise typer.Exit(0)

# Delete endpoints
manager = ResourceManager()
with console.status("Deleting endpoint(s)..."):
results = []
for resource_id, resource in matches:
endpoint_id = getattr(resource, "id", None)
if not endpoint_id:
results.append(
{
"success": False,
"name": resource.name,
"endpoint_id": "N/A",
"message": f"Skipped '{resource.name}': No endpoint ID found",
}
)
continue

result = asyncio.run(
_delete_endpoint(endpoint_id, resource_id, resource.name)
)
result = asyncio.run(manager.undeploy_resource(resource_id, resource.name))
results.append(result)

# Show results
Expand Down Expand Up @@ -437,24 +382,12 @@ def _undeploy_all(resources: dict):
raise typer.Exit(0)

# Delete all endpoints
manager = ResourceManager()
with console.status(f"Deleting {len(resources)} endpoint(s)..."):
results = []
for resource_id, resource in resources.items():
endpoint_id = getattr(resource, "id", None)
name = getattr(resource, "name", "N/A")

if not endpoint_id:
results.append(
{
"success": False,
"name": name,
"endpoint_id": "N/A",
"message": f"Skipped '{name}': No endpoint ID found",
}
)
continue

result = asyncio.run(_delete_endpoint(endpoint_id, resource_id, name))
result = asyncio.run(manager.undeploy_resource(resource_id, name))
results.append(result)

# Show results
Expand Down Expand Up @@ -534,24 +467,12 @@ def _interactive_undeploy(resources: dict):
raise typer.Exit(0)

# Delete selected endpoints
manager = ResourceManager()
with console.status(f"Deleting {len(selected_resources)} endpoint(s)..."):
results = []
for resource_id, resource in selected_resources:
endpoint_id = getattr(resource, "id", None)
name = getattr(resource, "name", "N/A")

if not endpoint_id:
results.append(
{
"success": False,
"name": name,
"endpoint_id": "N/A",
"message": f"Skipped '{name}': No endpoint ID found",
}
)
continue

result = asyncio.run(_delete_endpoint(endpoint_id, resource_id, name))
result = asyncio.run(manager.undeploy_resource(resource_id, name))
results.append(result)

# Show results
Expand Down
9 changes: 9 additions & 0 deletions src/tetra_rp/core/resources/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,3 +45,12 @@ def is_deployed(self) -> bool:
async def deploy(self) -> "DeployableResource":
"""Deploy the resource."""
raise NotImplementedError("Subclasses should implement this method.")

@abstractmethod
async def undeploy(self) -> bool:
"""Undeploy/delete the resource.

Returns:
True if successfully undeployed, False otherwise
"""
raise NotImplementedError("Subclasses should implement this method.")
15 changes: 15 additions & 0 deletions src/tetra_rp/core/resources/network_volume.py
Original file line number Diff line number Diff line change
Expand Up @@ -146,3 +146,18 @@ async def deploy(self) -> "DeployableResource":
except Exception as e:
log.error(f"{self} failed to deploy: {e}")
raise

async def undeploy(self) -> bool:
"""
Undeploy network volume.

Returns:
True if successfully undeployed, False otherwise

Raises:
NotImplementedError: NetworkVolume undeploy is not yet supported
"""
raise NotImplementedError(
f"{self.__class__.__name__} undeploy is not yet supported. "
"Network volumes must be manually deleted via RunPod UI or API."
)
88 changes: 79 additions & 9 deletions src/tetra_rp/core/resources/resource_manager.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import asyncio
import cloudpickle
import logging
from typing import Dict, List, Optional, Tuple
from typing import Any, Dict, List, Optional, Tuple
from pathlib import Path

from ..exceptions import RunpodAPIKeyError
Expand Down Expand Up @@ -64,14 +64,13 @@ def _save_resources(self) -> None:
log.error(f"Failed to save resources to {RESOURCE_STATE_FILE}: {e}")
raise

def add_resource(self, uid: str, resource: DeployableResource):
"""Add a resource to the manager."""
def _add_resource(self, uid: str, resource: DeployableResource):
"""Add a resource to the manager (protected method for internal use)."""
self._resources[uid] = resource
self._save_resources()

# function to check if resource still exists remotely, else remove it
def remove_resource(self, uid: str):
"""Remove a resource from the manager."""
def _remove_resource(self, uid: str):
"""Remove a resource from the manager (protected method for internal use)."""
if uid not in self._resources:
log.warning(f"Resource {uid} not found for removal")
return
Expand Down Expand Up @@ -126,11 +125,11 @@ async def get_or_deploy_resource(
if existing := self._resources.get(uid):
if not existing.is_deployed():
log.warning(f"{existing} is no longer valid, redeploying.")
self.remove_resource(uid)
self._remove_resource(uid)
# Don't recursive call - deploy directly within the lock
deployed_resource = await self._deploy_with_error_context(config)
log.info(f"URL: {deployed_resource.url}")
self.add_resource(uid, deployed_resource)
self._add_resource(uid, deployed_resource)
return deployed_resource

log.debug(f"{existing} exists, reusing.")
Expand All @@ -141,7 +140,7 @@ async def get_or_deploy_resource(
log.debug(f"Deploying new resource: {uid}")
deployed_resource = await self._deploy_with_error_context(config)
log.info(f"URL: {deployed_resource.url}")
self.add_resource(uid, deployed_resource)
self._add_resource(uid, deployed_resource)
return deployed_resource

def list_all_resources(self) -> Dict[str, DeployableResource]:
Expand All @@ -166,3 +165,74 @@ def find_resources_by_name(self, name: str) -> List[Tuple[str, DeployableResourc
if hasattr(resource, "name") and resource.name == name:
matches.append((uid, resource))
return matches

async def undeploy_resource(
self, resource_id: str, resource_name: Optional[str] = None
) -> Dict[str, Any]:
"""Undeploy a resource and remove from tracking.

This is the public interface for removing resources. It calls the resource's
undeploy() method (polymorphic) and removes from tracking on success.

Args:
resource_id: The resource ID to undeploy
resource_name: Optional human-readable name for error messages

Returns:
Dict with keys:
- success: bool indicating if undeploy succeeded
- name: resource name (if available)
- endpoint_id: resource endpoint ID (if available)
- message: status message
"""
resource = self._resources.get(resource_id)

if not resource:
return {
"success": False,
"name": resource_name or "Unknown",
"endpoint_id": "N/A",
"message": f"Resource {resource_id} not found in tracking",
}

# Get resource metadata for response
name = resource_name or getattr(resource, "name", "Unknown")
endpoint_id = getattr(resource, "id", "N/A")

try:
# Call polymorphic undeploy method
success = await resource.undeploy()

if success:
# Remove from tracking on successful undeploy
self._remove_resource(resource_id)
return {
"success": True,
"name": name,
"endpoint_id": endpoint_id,
"message": f"Successfully undeployed '{name}' ({endpoint_id})",
}
else:
return {
"success": False,
"name": name,
"endpoint_id": endpoint_id,
"message": f"Failed to undeploy '{name}' ({endpoint_id})",
}

except NotImplementedError as e:
# Resource type doesn't support undeploy yet
return {
"success": False,
"name": name,
"endpoint_id": endpoint_id,
"message": f"Cannot undeploy '{name}': {str(e)}",
}
except Exception as e:
# Unexpected error during undeploy
return {
"success": False,
"name": name,
"endpoint_id": endpoint_id,
"message": f"Error undeploying '{name}': {str(e)}",
}
27 changes: 27 additions & 0 deletions src/tetra_rp/core/resources/serverless.py
Original file line number Diff line number Diff line change
Expand Up @@ -268,6 +268,33 @@ async def deploy(self) -> "DeployableResource":
log.error(f"{self} failed to deploy: {e}")
raise

async def undeploy(self) -> bool:
"""
Undeploys (deletes) the serverless endpoint.

Returns:
True if successfully undeployed, False otherwise
"""
if not self.id:
log.warning(f"{self} has no endpoint ID, cannot undeploy")
return False

try:
async with RunpodGraphQLClient() as client:
result = await client.delete_endpoint(self.id)
success = result.get("success", False)

if success:
log.info(f"{self} successfully undeployed")
else:
log.error(f"{self} failed to undeploy")

return success

except Exception as e:
log.error(f"{self} failed to undeploy: {e}")
return False

async def run_sync(self, payload: Dict[str, Any]) -> "JobOutput":
"""
Executes a serverless endpoint request with the payload.
Expand Down
Loading
Loading