diff --git a/autogpt_platform/backend/backend/TEST_DATA_README.md b/autogpt_platform/backend/backend/TEST_DATA_README.md
new file mode 100644
index 000000000000..ef818df25030
--- /dev/null
+++ b/autogpt_platform/backend/backend/TEST_DATA_README.md
@@ -0,0 +1,150 @@
+# Test Data Scripts
+
+This directory contains scripts for creating and updating test data in the AutoGPT Platform database, specifically designed to test the materialized views behind the store functionality.
+
+## Scripts
+
+### test_data_creator.py
+Creates a comprehensive set of test data including:
+- Users with profiles
+- Agent graphs, nodes, and executions
+- Store listings with multiple versions
+- Reviews and ratings
+- Library agents
+- Integration webhooks
+- Onboarding data
+- Credit transactions
+
+**Image/Video Domains Used:**
+- Images: `picsum.photos` (for all image URLs)
+- Videos: `youtube.com` (for store listing videos)
+
+### test_data_updater.py
+Updates existing test data to simulate real-world changes:
+- Adds new agent graph executions
+- Creates new store listing reviews
+- Updates store listing versions
+- Adds credit transactions
+- Refreshes materialized views
+
+### check_db.py
+Tests and verifies materialized view functionality:
+- Checks pg_cron job status (for automatic refresh)
+- Displays current materialized view counts
+- Adds test data (executions and reviews)
+- Creates store listings if none exist
+- Manually refreshes materialized views
+- Compares before/after counts to verify updates
+- Provides a summary of test results
+
+## Materialized Views
+
+The scripts test three key database views:
+
+1. **mv_agent_run_counts**: Tracks execution counts by agent
+2. **mv_review_stats**: Tracks review statistics (count, average rating) by store listing
+3. **StoreAgent**: A regular view that combines store listing data with execution counts and ratings for display
+
+The materialized views (mv_agent_run_counts and mv_review_stats) are automatically refreshed every 15 minutes via pg_cron, or can be manually refreshed using the `refresh_store_materialized_views()` function.
+
+## Usage
+
+### Prerequisites
+
+1. Ensure the database is running:
+```bash
+docker compose up -d
+# or for test database:
+docker compose -f docker-compose.test.yaml --env-file ../.env up -d
+```
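+
+Optionally, verify that the database is reachable before continuing. A minimal connectivity check (a sketch using the same Prisma client the scripts use; it assumes `DATABASE_URL` is configured as described under Configuration below):
+
+```python
+import asyncio
+
+from prisma import Prisma
+
+
+async def check_connection() -> None:
+    db = Prisma()
+    await db.connect()  # fails fast if the database is unreachable
+    print(await db.user.count(), "users in database")
+    await db.disconnect()
+
+
+asyncio.run(check_connection())
+```
+
+2. 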
Run database migrations: +```bash +poetry run prisma migrate deploy +``` + +### Running the Scripts + +#### Option 1: Use the helper script (from backend directory) +```bash +poetry run python run_test_data.py +``` + +#### Option 2: Run individually +```bash +# From backend/test directory: +# Create initial test data +poetry run python test_data_creator.py + +# Update data to test materialized view changes +poetry run python test_data_updater.py + +# From backend directory: +# Test materialized views functionality +poetry run python check_db.py + +# Check store data status +poetry run python check_store_data.py +``` + +#### Option 3: Use the shell script (from backend directory) +```bash +./run_test_data_scripts.sh +``` + +### Manual Materialized View Refresh + +To manually refresh the materialized views: +```sql +SELECT refresh_store_materialized_views(); +``` + +## Configuration + +The scripts use the database configuration from your `.env` file: +- `DATABASE_URL`: PostgreSQL connection string +- Database should have the platform schema + +## Data Generation Limits + +Configured in `test_data_creator.py`: +- 100 users +- 100 agent blocks +- 1-5 graphs per user +- 2-5 nodes per graph +- 1-5 presets per user +- 1-10 library agents per user +- 1-20 executions per graph +- 1-5 reviews per store listing version + +## Notes + +- All image URLs use `picsum.photos` for consistency with Next.js image configuration +- The scripts create realistic relationships between entities +- Materialized views are refreshed at the end of each script +- Data is designed to test both happy paths and edge cases + +## Troubleshooting + +### Reviews and StoreAgent view showing 0 + +If `check_db.py` shows that reviews remain at 0 and StoreAgent view shows 0 store agents: + +1. **No store listings exist**: The script will automatically create test store listings if none exist +2. **No approved versions**: Store listings need approved versions to appear in the StoreAgent view +3. **Check with `check_store_data.py`**: This script provides detailed information about: + - Total store listings + - Store listing versions by status + - Existing reviews + - StoreAgent view contents + - Agent graph executions + +### pg_cron not installed + +The warning "pg_cron extension is not installed" is normal in local development environments. The materialized views can still be refreshed manually using the `refresh_store_materialized_views()` function, which all scripts do automatically. + +### Common Issues + +- **Type errors with None values**: Fixed in the latest version of check_db.py by using `or 0` for nullable numeric fields +- **Missing relations**: Ensure you're using the correct field names (e.g., `StoreListing` not `storeListing` in includes) +- **Column name mismatches**: The database uses camelCase for column names (e.g., `agentGraphId` not `agent_graph_id`) \ No newline at end of file diff --git a/autogpt_platform/backend/backend/check_db.py b/autogpt_platform/backend/backend/check_db.py new file mode 100644 index 000000000000..591c519f8461 --- /dev/null +++ b/autogpt_platform/backend/backend/check_db.py @@ -0,0 +1,359 @@ +import asyncio +import random +from datetime import datetime + +from faker import Faker +from prisma import Prisma + +faker = Faker() + + +async def check_cron_job(db): + """Check if the pg_cron job for refreshing materialized views exists.""" + print("\n1. 
Checking pg_cron job...")
+    print("-" * 40)
+
+    try:
+        # Check if the pg_cron extension is installed
+        extension_check = await db.query_raw(
+            "SELECT COUNT(*) as count FROM pg_extension WHERE extname = 'pg_cron'"
+        )
+        if extension_check[0]["count"] == 0:
+            print("⚠️ pg_cron extension is not installed")
+            return False
+
+        # Check if the refresh job exists. The migration names the job per
+        # schema ('refresh-store-views-<schema>'), so match on the prefix.
+        job_check = await db.query_raw(
+            """
+            SELECT jobname, schedule, command
+            FROM cron.job
+            WHERE jobname LIKE 'refresh-store-views%'
+            """
+        )
+
+        if job_check:
+            job = job_check[0]
+            print("✅ pg_cron job found:")
+            print(f"   Name: {job['jobname']}")
+            print(f"   Schedule: {job['schedule']} (every 15 minutes)")
+            print(f"   Command: {job['command']}")
+            return True
+        else:
+            print("⚠️ No pg_cron job matching 'refresh-store-views' found")
+            return False
+
+    except Exception as e:
+        print(f"❌ Error checking pg_cron: {e}")
+        return False
+
+
+async def get_materialized_view_counts(db):
+    """Get current counts from materialized views."""
+    print("\n2. Getting current materialized view data...")
+    print("-" * 40)
+
+    # Get counts from mv_agent_run_counts
+    agent_runs = await db.query_raw(
+        """
+        SELECT COUNT(*) as total_agents,
+               SUM(run_count) as total_runs,
+               MAX(run_count) as max_runs,
+               MIN(run_count) as min_runs
+        FROM mv_agent_run_counts
+        """
+    )
+
+    # Get counts from mv_review_stats
+    review_stats = await db.query_raw(
+        """
+        SELECT COUNT(*) as total_listings,
+               SUM(review_count) as total_reviews,
+               AVG(avg_rating) as overall_avg_rating
+        FROM mv_review_stats
+        """
+    )
+
+    # Get sample data from StoreAgent view
+    store_agents = await db.query_raw(
+        """
+        SELECT COUNT(*) as total_store_agents,
+               AVG(runs) as avg_runs,
+               AVG(rating) as avg_rating
+        FROM "StoreAgent"
+        """
+    )
+
+    agent_run_data = agent_runs[0] if agent_runs else {}
+    review_data = review_stats[0] if review_stats else {}
+    store_data = store_agents[0] if store_agents else {}
+
+    print("📊 mv_agent_run_counts:")
+    print(f"   Total agents: {agent_run_data.get('total_agents', 0)}")
+    print(f"   Total runs: {agent_run_data.get('total_runs', 0)}")
+    print(f"   Max runs per agent: {agent_run_data.get('max_runs', 0)}")
+    print(f"   Min runs per agent: {agent_run_data.get('min_runs', 0)}")
+
+    print("\n📊 mv_review_stats:")
+    print(f"   Total listings: {review_data.get('total_listings', 0)}")
+    print(f"   Total reviews: {review_data.get('total_reviews', 0)}")
+    print(f"   Overall avg rating: {review_data.get('overall_avg_rating') or 0:.2f}")
+
+    print("\n📊 StoreAgent view:")
+    print(f"   Total store agents: {store_data.get('total_store_agents', 0)}")
+    print(f"   Average runs: {store_data.get('avg_runs') or 0:.2f}")
+    print(f"   Average rating: {store_data.get('avg_rating') or 0:.2f}")
+
+    return {
+        "agent_runs": agent_run_data,
+        "reviews": review_data,
+        "store_agents": store_data,
+    }
+
+
+async def add_test_data(db):
+    """Add some test data to verify materialized view updates."""
+    print("\n3. Adding test data...")
+    print("-" * 40)
+
+    # Get some existing data
+    users = await db.user.find_many(take=5)
+    graphs = await db.agentgraph.find_many(take=5)
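+
+    # add_test_data expects test_data_creator.py to have seeded users and graphs;
+    # a small sample is enough, since any new rows change the view counts.
+    if not users or not graphs:
+        print("❌ No existing users or graphs found. Run test_data_creator.py first.")
+        return False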
+
+    # Add new executions
+    print("Adding new agent graph executions...")
+    new_executions = 0
+    for graph in graphs:
+        for _ in range(random.randint(2, 5)):
+            await db.agentgraphexecution.create(
+                data={
+                    "agentGraphId": graph.id,
+                    "agentGraphVersion": graph.version,
+                    "userId": random.choice(users).id,
+                    "executionStatus": "COMPLETED",
+                    "startedAt": datetime.now(),
+                }
+            )
+            new_executions += 1
+
+    print(f"✅ Added {new_executions} new executions")
+
+    # Check if we need to create store listings first
+    store_versions = await db.storelistingversion.find_many(
+        where={"submissionStatus": "APPROVED"}, take=5
+    )
+
+    if not store_versions:
+        print("\nNo approved store listings found. Creating test store listings...")
+
+        # Create store listings for existing agent graphs
+        for i, graph in enumerate(graphs[:3]):  # Create up to 3 store listings
+            # Create a store listing
+            listing = await db.storelisting.create(
+                data={
+                    "slug": f"test-agent-{graph.id[:8]}",
+                    "agentGraphId": graph.id,
+                    "agentGraphVersion": graph.version,
+                    "hasApprovedVersion": True,
+                    "owningUserId": graph.userId,
+                }
+            )
+
+            # Create an approved version
+            version = await db.storelistingversion.create(
+                data={
+                    "storeListingId": listing.id,
+                    "agentGraphId": graph.id,
+                    "agentGraphVersion": graph.version,
+                    "name": f"Test Agent {i+1}",
+                    "subHeading": faker.catch_phrase(),
+                    "description": faker.paragraph(nb_sentences=5),
+                    # Keep image URLs on picsum.photos, matching
+                    # test_data_creator.py and the Next.js image config
+                    "imageUrls": [
+                        f"https://picsum.photos/seed/{random.randint(1, 1000)}/600/400"
+                    ],
+                    "categories": ["productivity", "automation"],
+                    "submissionStatus": "APPROVED",
+                    "submittedAt": datetime.now(),
+                }
+            )
+
+            # Update listing with active version
+            await db.storelisting.update(
+                where={"id": listing.id}, data={"activeVersionId": version.id}
+            )
+
+        print("✅ Created test store listings")
+
+        # Re-fetch approved versions
+        store_versions = await db.storelistingversion.find_many(
+            where={"submissionStatus": "APPROVED"}, take=5
+        )
+
+    # Add new reviews
+    print("\nAdding new store listing reviews...")
+    new_reviews = 0
+    for version in store_versions:
+        # Find users who haven't reviewed this version
+        existing_reviews = await db.storelistingreview.find_many(
+            where={"storeListingVersionId": version.id}
+        )
+        reviewed_user_ids = {r.reviewByUserId for r in existing_reviews}
+        available_users = [u for u in users if u.id not in reviewed_user_ids]
+
+        if available_users:
+            user = random.choice(available_users)
+            await db.storelistingreview.create(
+                data={
+                    "storeListingVersionId": version.id,
+                    "reviewByUserId": user.id,
+                    "score": random.randint(3, 5),
+                    "comments": faker.text(max_nb_chars=100),
+                }
+            )
+            new_reviews += 1
+
+    print(f"✅ Added {new_reviews} new reviews")
+
+    return True
+
+
+async def refresh_materialized_views(db):
+    """Manually refresh the materialized views."""
+    print("\n4. Manually refreshing materialized views...")
+    print("-" * 40)
+
+    try:
+        await db.execute_raw("SELECT refresh_store_materialized_views();")
+        print("✅ Materialized views refreshed successfully")
+        return True
+    except Exception as e:
+        print(f"❌ Error refreshing views: {e}")
+        return False
+
+
+async def compare_counts(before, after):
+    """Compare counts before and after refresh."""
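+    # Totals come from get_materialized_view_counts(); run and review counts
+    # should rise by the amounts added in step 3 once the views are refreshed.
+    print("\n5. 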
Comparing counts before and after refresh...") + print("-" * 40) + + # Compare agent runs + print("🔍 Agent run changes:") + before_runs = before["agent_runs"].get("total_runs") or 0 + after_runs = after["agent_runs"].get("total_runs") or 0 + print( + f" Total runs: {before_runs} → {after_runs} " f"(+{after_runs - before_runs})" + ) + + # Compare reviews + print("\n🔍 Review changes:") + before_reviews = before["reviews"].get("total_reviews") or 0 + after_reviews = after["reviews"].get("total_reviews") or 0 + print( + f" Total reviews: {before_reviews} → {after_reviews} " + f"(+{after_reviews - before_reviews})" + ) + + # Compare store agents + print("\n🔍 StoreAgent view changes:") + before_avg_runs = before["store_agents"].get("avg_runs", 0) or 0 + after_avg_runs = after["store_agents"].get("avg_runs", 0) or 0 + print( + f" Average runs: {before_avg_runs:.2f} → {after_avg_runs:.2f} " + f"(+{after_avg_runs - before_avg_runs:.2f})" + ) + + # Verify changes occurred + runs_changed = (after["agent_runs"].get("total_runs") or 0) > ( + before["agent_runs"].get("total_runs") or 0 + ) + reviews_changed = (after["reviews"].get("total_reviews") or 0) > ( + before["reviews"].get("total_reviews") or 0 + ) + + if runs_changed and reviews_changed: + print("\n✅ Materialized views are updating correctly!") + return True + else: + print("\n⚠️ Some materialized views may not have updated:") + if not runs_changed: + print(" - Agent run counts did not increase") + if not reviews_changed: + print(" - Review counts did not increase") + return False + + +async def main(): + db = Prisma() + await db.connect() + + print("=" * 60) + print("Materialized Views Test") + print("=" * 60) + + try: + # Check if data exists + user_count = await db.user.count() + if user_count == 0: + print("❌ No data in database. Please run test_data_creator.py first.") + await db.disconnect() + return + + # 1. Check cron job + cron_exists = await check_cron_job(db) + + # 2. Get initial counts + counts_before = await get_materialized_view_counts(db) + + # 3. Add test data + data_added = await add_test_data(db) + refresh_success = False + + if data_added: + # Wait a moment for data to be committed + print("\nWaiting for data to be committed...") + await asyncio.sleep(2) + + # 4. Manually refresh views + refresh_success = await refresh_materialized_views(db) + + if refresh_success: + # 5. Get counts after refresh + counts_after = await get_materialized_view_counts(db) + + # 6. Compare results + await compare_counts(counts_before, counts_after) + + # Summary + print("\n" + "=" * 60) + print("Test Summary") + print("=" * 60) + print(f"✓ pg_cron job exists: {'Yes' if cron_exists else 'No'}") + print(f"✓ Test data added: {'Yes' if data_added else 'No'}") + print(f"✓ Manual refresh worked: {'Yes' if refresh_success else 'No'}") + print( + f"✓ Views updated correctly: {'Yes' if data_added and refresh_success else 'Cannot verify'}" + ) + + if cron_exists: + print( + "\n💡 The materialized views will also refresh automatically every 15 minutes via pg_cron." + ) + else: + print( + "\n⚠️ Automatic refresh is not configured. Views must be refreshed manually." 
+        )
+
+    except Exception as e:
+        print(f"\n❌ Test failed with error: {e}")
+        import traceback
+
+        traceback.print_exc()
+
+    await db.disconnect()
+
+
+if __name__ == "__main__":
+    asyncio.run(main())
diff --git a/autogpt_platform/backend/backend/check_store_data.py b/autogpt_platform/backend/backend/check_store_data.py
new file mode 100644
index 000000000000..10aa6507baf8
--- /dev/null
+++ b/autogpt_platform/backend/backend/check_store_data.py
@@ -0,0 +1,159 @@
+#!/usr/bin/env python3
+"""Check store-related data in the database."""
+
+import asyncio
+
+from prisma import Prisma
+
+
+async def check_store_data(db):
+    """Check what store data exists in the database."""
+
+    print("============================================================")
+    print("Store Data Check")
+    print("============================================================")
+
+    # Check store listings
+    print("\n1. Store Listings:")
+    print("-" * 40)
+    listings = await db.storelisting.find_many()
+    print(f"Total store listings: {len(listings)}")
+
+    if listings:
+        for listing in listings[:5]:
+            # Display names and submission statuses live on StoreListingVersion,
+            # so only listing-level fields are printed here
+            print(f"\nListing ID: {listing.id}")
+            print(f"  Slug: {listing.slug}")
+            print(f"  Has approved version: {listing.hasApprovedVersion}")
+
+    # Check store listing versions
+    print("\n\n2. Store Listing Versions:")
+    print("-" * 40)
+    versions = await db.storelistingversion.find_many(include={"StoreListing": True})
+    print(f"Total store listing versions: {len(versions)}")
+
+    # Group by submission status
+    status_counts = {}
+    for version in versions:
+        status = version.submissionStatus
+        status_counts[status] = status_counts.get(status, 0) + 1
+
+    print("\nVersions by status:")
+    for status, count in status_counts.items():
+        print(f"  {status}: {count}")
+
+    # Show approved versions
+    approved_versions = [v for v in versions if v.submissionStatus == "APPROVED"]
+    print(f"\nApproved versions: {len(approved_versions)}")
+    if approved_versions:
+        for version in approved_versions[:5]:
+            print(f"\n  Version ID: {version.id}")
+            print(f"    Name: {version.name}")
+            print(f"    Listing slug: {version.StoreListing.slug}")
+            print(f"    Version: {version.version}")
+
+    # Check store listing reviews
+    print("\n\n3. Store Listing Reviews:")
+    print("-" * 40)
+    reviews = await db.storelistingreview.find_many(
+        include={"StoreListingVersion": {"include": {"StoreListing": True}}}
+    )
+    print(f"Total reviews: {len(reviews)}")
+
+    if reviews:
+        # Calculate average rating
+        total_score = sum(r.score for r in reviews)
+        avg_score = total_score / len(reviews) if reviews else 0
+        print(f"Average rating: {avg_score:.2f}")
+
+        # Show sample reviews
+        print("\nSample reviews:")
+        for review in reviews[:3]:
+            print(f"\n  Review for: {review.StoreListingVersion.name}")
+            print(f"    Score: {review.score}")
+            print(f"    Comments: {review.comments[:100]}...")
+
+    # Check StoreAgent view data
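+    # StoreAgent only includes non-deleted listings with APPROVED versions, so
+    # zero rows here usually means no approved store listing versions exist.
+    print("\n\n4. 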
StoreAgent View Data:") + print("-" * 40) + + # Query the StoreAgent view + query = """ + SELECT + sa.listing_id, + sa.slug, + sa.agent_name, + sa.description, + sa.featured, + sa.runs, + sa.rating, + sa.creator_username, + sa.categories, + sa.updated_at + FROM "StoreAgent" sa + LIMIT 10; + """ + + store_agents = await db.query_raw(query) + print(f"Total store agents in view: {len(store_agents)}") + + if store_agents: + for agent in store_agents[:5]: + print(f"\nStore Agent: {agent['agent_name']}") + print(f" Slug: {agent['slug']}") + print(f" Runs: {agent['runs']}") + print(f" Rating: {agent['rating']}") + print(f" Creator: {agent['creator_username']}") + + # Check the underlying data that should populate StoreAgent + print("\n\n5. Data that should populate StoreAgent view:") + print("-" * 40) + + # Check for any APPROVED store listing versions + query = """ + SELECT COUNT(*) as count + FROM "StoreListingVersion" + WHERE "submissionStatus" = 'APPROVED' + """ + + result = await db.query_raw(query) + approved_count = result[0]["count"] if result else 0 + print(f"Approved store listing versions: {approved_count}") + + # Check for store listings with hasApprovedVersion = true + query = """ + SELECT COUNT(*) as count + FROM "StoreListing" + WHERE "hasApprovedVersion" = true AND "isDeleted" = false + """ + + result = await db.query_raw(query) + has_approved_count = result[0]["count"] if result else 0 + print(f"Store listings with approved versions: {has_approved_count}") + + # Check agent graph executions + query = """ + SELECT COUNT(DISTINCT "agentGraphId") as unique_agents, + COUNT(*) as total_executions + FROM "AgentGraphExecution" + """ + + result = await db.query_raw(query) + if result: + print("\nAgent Graph Executions:") + print(f" Unique agents with executions: {result[0]['unique_agents']}") + print(f" Total executions: {result[0]['total_executions']}") + + +async def main(): + """Main function.""" + db = Prisma() + await db.connect() + + try: + await check_store_data(db) + finally: + await db.disconnect() + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/autogpt_platform/backend/clean_test_db.py b/autogpt_platform/backend/clean_test_db.py new file mode 100644 index 000000000000..da155c6f75fc --- /dev/null +++ b/autogpt_platform/backend/clean_test_db.py @@ -0,0 +1,101 @@ +#!/usr/bin/env python3 +""" +Clean the test database by removing all data while preserving the schema. + +Usage: + poetry run python clean_test_db.py [--yes] + +Options: + --yes Skip confirmation prompt +""" + +import asyncio +import sys + +from prisma import Prisma + + +async def main(): + db = Prisma() + await db.connect() + + print("=" * 60) + print("Cleaning Test Database") + print("=" * 60) + print() + + # Get initial counts + user_count = await db.user.count() + agent_count = await db.agentgraph.count() + + print(f"Current data: {user_count} users, {agent_count} agent graphs") + + if user_count == 0 and agent_count == 0: + print("Database is already clean!") + await db.disconnect() + return + + # Check for --yes flag + skip_confirm = "--yes" in sys.argv + + if not skip_confirm: + response = input("\nDo you want to clean all data? 
(yes/no): ")
+        if response.lower() != "yes":
+            print("Aborted.")
+            await db.disconnect()
+            return
+
+    print("\nCleaning database...")
+
+    # Delete in reverse order of dependencies
+    tables = [
+        ("UserNotificationBatch", db.usernotificationbatch),
+        ("NotificationEvent", db.notificationevent),
+        ("CreditRefundRequest", db.creditrefundrequest),
+        ("StoreListingReview", db.storelistingreview),
+        ("StoreListingVersion", db.storelistingversion),
+        ("StoreListing", db.storelisting),
+        ("AgentNodeExecutionInputOutput", db.agentnodeexecutioninputoutput),
+        ("AgentNodeExecution", db.agentnodeexecution),
+        ("AgentGraphExecution", db.agentgraphexecution),
+        ("AgentNodeLink", db.agentnodelink),
+        ("LibraryAgent", db.libraryagent),
+        ("AgentPreset", db.agentpreset),
+        ("IntegrationWebhook", db.integrationwebhook),
+        ("AgentNode", db.agentnode),
+        ("AgentGraph", db.agentgraph),
+        ("AgentBlock", db.agentblock),
+        ("APIKey", db.apikey),
+        ("CreditTransaction", db.credittransaction),
+        ("AnalyticsMetrics", db.analyticsmetrics),
+        ("AnalyticsDetails", db.analyticsdetails),
+        ("Profile", db.profile),
+        ("UserOnboarding", db.useronboarding),
+        ("User", db.user),
+    ]
+
+    for table_name, table in tables:
+        try:
+            count = await table.count()
+            if count > 0:
+                await table.delete_many()
+                print(f"✓ Deleted {count} records from {table_name}")
+        except Exception as e:
+            print(f"⚠ Error cleaning {table_name}: {e}")
+
+    # Refresh materialized views (they should be empty now)
+    try:
+        await db.execute_raw("SELECT refresh_store_materialized_views();")
+        print("\n✓ Refreshed materialized views")
+    except Exception as e:
+        print(f"\n⚠ Could not refresh materialized views: {e}")
+
+    await db.disconnect()
+
+    print("\n" + "=" * 60)
+    print("Database cleaned successfully!")
+    print("=" * 60)
+
+
+if __name__ == "__main__":
+    asyncio.run(main())
diff --git a/autogpt_platform/backend/docker-compose.test.yaml b/autogpt_platform/backend/docker-compose.test.yaml
index 5f2aa19eb64a..259d52c49748 100644
--- a/autogpt_platform/backend/docker-compose.test.yaml
+++ b/autogpt_platform/backend/docker-compose.test.yaml
@@ -1,37 +1,60 @@
+networks:
+  app-network:
+    name: app-network
+  shared-network:
+    name: shared-network
+
+x-agpt-services:
+  &agpt-services
+  networks:
+    - app-network
+    - shared-network
+
+x-supabase-services:
+  &supabase-services
+  networks:
+    - app-network
+    - shared-network
+
 volumes:
+  supabase-config:
   clamav-data:
+
 services:
-  postgres-test:
-    image: ankane/pgvector:latest
-    environment:
-      - POSTGRES_USER=${DB_USER:-postgres}
-      - POSTGRES_PASSWORD=${DB_PASS:-postgres}
-      - POSTGRES_DB=${DB_NAME:-postgres}
-      - POSTGRES_PORT=${DB_PORT:-5432}
-    healthcheck:
-      test: pg_isready -U $$POSTGRES_USER -d $$POSTGRES_DB
-      interval: 10s
-      timeout: 5s
-      retries: 5
+
+  db:
+    <<: *supabase-services
+    extends:
+      file: ../db/docker/docker-compose.yml
+      service: db
     ports:
-      - "${DB_PORT:-5432}:5432"
-    networks:
-      - app-network-test
-  redis-test:
+      - ${POSTGRES_PORT}:5432 # We don't use Supavisor locally, so we expose the db directly.
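+  # db (and vector below) extend the shared Supabase compose file at
+  # ../db/docker/docker-compose.yml, which supplies their image, environment,
+  # and healthcheck; POSTGRES_PORT is read from ../.env via --env-file.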
+ + vector: + <<: *supabase-services + extends: + file: ../db/docker/docker-compose.yml + service: vector + + redis: + <<: *agpt-services image: redis:latest command: redis-server --requirepass password ports: - "6379:6379" - networks: - - app-network-test healthcheck: test: ["CMD", "redis-cli", "ping"] interval: 10s timeout: 5s retries: 5 - rabbitmq-test: + + rabbitmq: + <<: *agpt-services image: rabbitmq:management - container_name: rabbitmq-test + container_name: rabbitmq healthcheck: test: rabbitmq-diagnostics -q ping interval: 30s @@ -40,7 +63,7 @@ services: start_period: 10s environment: - RABBITMQ_DEFAULT_USER=rabbitmq_user_default - - RABBITMQ_DEFAULT_PASS=k0VMxyIJF9S35f3x2uaw5IWAl6Y536O7 # CHANGE THIS TO A RANDOM PASSWORD IN PRODUCTION -- everywhere lol + - RABBITMQ_DEFAULT_PASS=k0VMxyIJF9S35f3x2uaw5IWAl6Y536O7 ports: - "5672:5672" - "15672:15672" diff --git a/autogpt_platform/backend/migrations/20250604130249_optimise_store_agent_and_creator_views/migration.sql b/autogpt_platform/backend/migrations/20250604130249_optimise_store_agent_and_creator_views/migration.sql new file mode 100644 index 000000000000..78c6a1e7890a --- /dev/null +++ b/autogpt_platform/backend/migrations/20250604130249_optimise_store_agent_and_creator_views/migration.sql @@ -0,0 +1,254 @@ +-- This migration creates materialized views for performance optimization +-- +-- IMPORTANT: For production environments, pg_cron is REQUIRED for automatic refresh +-- Prerequisites for production: +-- 1. pg_cron extension must be installed: CREATE EXTENSION pg_cron; +-- 2. pg_cron must be configured in postgresql.conf: +-- shared_preload_libraries = 'pg_cron' +-- cron.database_name = 'your_database_name' +-- +-- For development environments without pg_cron: +-- The migration will succeed but you must manually refresh views with: +-- SELECT refresh_store_materialized_views(); + +-- Check if pg_cron extension is installed and set a flag +DO $$ +DECLARE + has_pg_cron BOOLEAN; +BEGIN + SELECT EXISTS (SELECT 1 FROM pg_extension WHERE extname = 'pg_cron') INTO has_pg_cron; + + IF NOT has_pg_cron THEN + RAISE WARNING 'pg_cron extension is not installed!'; + RAISE WARNING 'Materialized views will be created but WILL NOT refresh automatically.'; + RAISE WARNING 'For production use, install pg_cron with: CREATE EXTENSION pg_cron;'; + RAISE WARNING 'For development, manually refresh with: SELECT refresh_store_materialized_views();'; + + -- For production deployments, uncomment the following line to make pg_cron mandatory: + -- RAISE EXCEPTION 'pg_cron is required for production deployments'; + END IF; + + -- Store the flag for later use in the migration + PERFORM set_config('migration.has_pg_cron', has_pg_cron::text, false); +END +$$; + +-- CreateIndex +-- Optimized: Only include owningUserId in index columns since isDeleted and hasApprovedVersion are in WHERE clause +CREATE INDEX IF NOT EXISTS "idx_store_listing_approved" ON "StoreListing"("owningUserId") WHERE "isDeleted" = false AND "hasApprovedVersion" = true; + +-- CreateIndex +-- Optimized: Only include storeListingId since submissionStatus is in WHERE clause +CREATE INDEX IF NOT EXISTS "idx_store_listing_version_status" ON "StoreListingVersion"("storeListingId") WHERE "submissionStatus" = 'APPROVED'; + +-- CreateIndex +CREATE INDEX IF NOT EXISTS "idx_slv_categories_gin" ON "StoreListingVersion" USING GIN ("categories") WHERE "submissionStatus" = 'APPROVED'; + +-- CreateIndex +CREATE INDEX IF NOT EXISTS "idx_slv_agent" ON "StoreListingVersion"("agentGraphId", "agentGraphVersion") 
WHERE "submissionStatus" = 'APPROVED'; + +-- CreateIndex +CREATE INDEX IF NOT EXISTS "idx_store_listing_review_version" ON "StoreListingReview"("storeListingVersionId"); + +-- CreateIndex +CREATE INDEX IF NOT EXISTS "idx_agent_graph_execution_agent" ON "AgentGraphExecution"("agentGraphId"); + +-- CreateIndex +CREATE INDEX IF NOT EXISTS "idx_profile_user" ON "Profile"("userId"); + +-- Additional performance indexes +CREATE INDEX IF NOT EXISTS "idx_store_listing_version_approved_listing" ON "StoreListingVersion"("storeListingId", "version") WHERE "submissionStatus" = 'APPROVED'; + +-- Create materialized view for agent run counts +CREATE MATERIALIZED VIEW IF NOT EXISTS "mv_agent_run_counts" AS +SELECT + "agentGraphId", + COUNT(*) AS run_count +FROM "AgentGraphExecution" +GROUP BY "agentGraphId"; + +-- CreateIndex +CREATE UNIQUE INDEX IF NOT EXISTS "idx_mv_agent_run_counts" ON "mv_agent_run_counts"("agentGraphId"); + +-- Create materialized view for review statistics +CREATE MATERIALIZED VIEW IF NOT EXISTS "mv_review_stats" AS +SELECT + sl.id AS "storeListingId", + COUNT(sr.id) AS review_count, + AVG(sr.score::numeric) AS avg_rating +FROM "StoreListing" sl +JOIN "StoreListingVersion" slv ON slv."storeListingId" = sl.id +LEFT JOIN "StoreListingReview" sr ON sr."storeListingVersionId" = slv.id +WHERE sl."isDeleted" = false + AND slv."submissionStatus" = 'APPROVED' +GROUP BY sl.id; + +-- CreateIndex +CREATE UNIQUE INDEX IF NOT EXISTS "idx_mv_review_stats" ON "mv_review_stats"("storeListingId"); + +-- DropForeignKey (if any exist on the views) +-- None needed as views don't have foreign keys + +-- DropView +DROP VIEW IF EXISTS "Creator"; + +-- DropView +DROP VIEW IF EXISTS "StoreAgent"; + +-- CreateView +CREATE OR REPLACE VIEW "StoreAgent" AS +WITH agent_versions AS ( + SELECT + "storeListingId", + array_agg(DISTINCT version::text ORDER BY version::text) AS versions + FROM "StoreListingVersion" + WHERE "submissionStatus" = 'APPROVED' + GROUP BY "storeListingId" +) +SELECT + sl.id AS listing_id, + slv.id AS "storeListingVersionId", + slv."createdAt" AS updated_at, + sl.slug, + COALESCE(slv.name, '') AS agent_name, + slv."videoUrl" AS agent_video, + COALESCE(slv."imageUrls", ARRAY[]::text[]) AS agent_image, + slv."isFeatured" AS featured, + p.username AS creator_username, + p."avatarUrl" AS creator_avatar, + slv."subHeading" AS sub_heading, + slv.description, + slv.categories, + COALESCE(ar.run_count, 0::bigint) AS runs, + COALESCE(rs.avg_rating, 0.0)::double precision AS rating, + COALESCE(av.versions, ARRAY[slv.version::text]) AS versions +FROM "StoreListing" sl +INNER JOIN "StoreListingVersion" slv + ON slv."storeListingId" = sl.id + AND slv."submissionStatus" = 'APPROVED' +JOIN "AgentGraph" a + ON slv."agentGraphId" = a.id + AND slv."agentGraphVersion" = a.version +LEFT JOIN "Profile" p + ON sl."owningUserId" = p."userId" +LEFT JOIN "mv_review_stats" rs + ON sl.id = rs."storeListingId" +LEFT JOIN "mv_agent_run_counts" ar + ON a.id = ar."agentGraphId" +LEFT JOIN agent_versions av + ON sl.id = av."storeListingId" +WHERE sl."isDeleted" = false + AND sl."hasApprovedVersion" = true; + +-- CreateView +CREATE OR REPLACE VIEW "Creator" AS +WITH creator_listings AS ( + SELECT + sl."owningUserId", + sl.id AS listing_id, + slv."agentGraphId", + slv.categories, + sr.score, + ar.run_count + FROM "StoreListing" sl + INNER JOIN "StoreListingVersion" slv + ON slv."storeListingId" = sl.id + AND slv."submissionStatus" = 'APPROVED' + LEFT JOIN "StoreListingReview" sr + ON sr."storeListingVersionId" = slv.id + 
LEFT JOIN "mv_agent_run_counts" ar + ON ar."agentGraphId" = slv."agentGraphId" + WHERE sl."isDeleted" = false + AND sl."hasApprovedVersion" = true +), +creator_stats AS ( + SELECT + cl."owningUserId", + COUNT(DISTINCT cl.listing_id) AS num_agents, + AVG(COALESCE(cl.score, 0)::numeric) AS agent_rating, + SUM(DISTINCT COALESCE(cl.run_count, 0)) AS agent_runs, + array_agg(DISTINCT cat ORDER BY cat) FILTER (WHERE cat IS NOT NULL) AS all_categories + FROM creator_listings cl + LEFT JOIN LATERAL unnest(COALESCE(cl.categories, ARRAY[]::text[])) AS cat ON true + GROUP BY cl."owningUserId" +) +SELECT + p.username, + p.name, + p."avatarUrl" AS avatar_url, + p.description, + cs.all_categories AS top_categories, + p.links, + p."isFeatured" AS is_featured, + COALESCE(cs.num_agents, 0::bigint) AS num_agents, + COALESCE(cs.agent_rating, 0.0) AS agent_rating, + COALESCE(cs.agent_runs, 0::numeric) AS agent_runs +FROM "Profile" p +LEFT JOIN creator_stats cs ON cs."owningUserId" = p."userId"; + +-- Create refresh function that works with the current schema +CREATE OR REPLACE FUNCTION refresh_store_materialized_views() +RETURNS void +LANGUAGE plpgsql +AS $$ +DECLARE + current_schema_name text; +BEGIN + -- Get the current schema + current_schema_name := current_schema(); + + -- Use CONCURRENTLY for better performance during refresh + EXECUTE format('REFRESH MATERIALIZED VIEW CONCURRENTLY %I."mv_agent_run_counts"', current_schema_name); + EXECUTE format('REFRESH MATERIALIZED VIEW CONCURRENTLY %I."mv_review_stats"', current_schema_name); + RAISE NOTICE 'Materialized views refreshed in schema % at %', current_schema_name, NOW(); +EXCEPTION + WHEN OTHERS THEN + -- Fallback to non-concurrent refresh if concurrent fails + EXECUTE format('REFRESH MATERIALIZED VIEW %I."mv_agent_run_counts"', current_schema_name); + EXECUTE format('REFRESH MATERIALIZED VIEW %I."mv_review_stats"', current_schema_name); + RAISE NOTICE 'Materialized views refreshed (non-concurrent) in schema % at % due to: %', current_schema_name, NOW(), SQLERRM; +END; +$$; + +-- Initial refresh of materialized views +SELECT refresh_store_materialized_views(); + +-- Schedule automatic refresh every 15 minutes (only if pg_cron is available) +DO $$ +DECLARE + has_pg_cron BOOLEAN; + current_schema_name text; + job_name text; +BEGIN + -- Get the flag we set earlier + has_pg_cron := current_setting('migration.has_pg_cron', true)::boolean; + + -- Get current schema name + current_schema_name := current_schema(); + + -- Create a unique job name for this schema + job_name := format('refresh-store-views-%s', current_schema_name); + + IF has_pg_cron THEN + -- Try to unschedule existing job (ignore errors if it doesn't exist) + BEGIN + PERFORM cron.unschedule(job_name); + EXCEPTION WHEN OTHERS THEN + -- Job doesn't exist, that's fine + NULL; + END; + + -- Schedule the refresh job with schema-specific command + PERFORM cron.schedule( + job_name, + '*/15 * * * *', + format('SELECT %I.refresh_store_materialized_views();', current_schema_name) + ); + RAISE NOTICE 'Scheduled automatic refresh of materialized views every 15 minutes for schema %', current_schema_name; + ELSE + RAISE WARNING '⚠️ Automatic refresh NOT configured - pg_cron is not available'; + RAISE WARNING '⚠️ You must manually refresh views with: SELECT refresh_store_materialized_views();'; + RAISE WARNING '⚠️ Or install pg_cron for automatic refresh in production'; + END IF; +END; +$$; \ No newline at end of file diff --git 
a/autogpt_platform/backend/migrations/20250604130249_optimise_store_agent_and_creator_views/rollback.sql b/autogpt_platform/backend/migrations/20250604130249_optimise_store_agent_and_creator_views/rollback.sql
new file mode 100644
index 000000000000..4584dbe2d05e
--- /dev/null
+++ b/autogpt_platform/backend/migrations/20250604130249_optimise_store_agent_and_creator_views/rollback.sql
@@ -0,0 +1,155 @@
+-- Unschedule cron jobs (if any exist)
+DO $$
+BEGIN
+    IF EXISTS (SELECT 1 FROM pg_extension WHERE extname = 'pg_cron') THEN
+        -- The migration schedules one job per schema, named
+        -- 'refresh-store-views-<schema>', so unschedule by prefix.
+        PERFORM cron.unschedule(jobid)
+        FROM cron.job
+        WHERE jobname LIKE 'refresh-store-views%';
+        RAISE NOTICE 'Unscheduled automatic refresh of materialized views';
+    END IF;
+EXCEPTION
+    WHEN OTHERS THEN
+        RAISE NOTICE 'Could not unschedule cron job (may not exist): %', SQLERRM;
+END;
+$$;
+
+-- DropView
+DROP VIEW IF EXISTS "Creator";
+
+-- DropView
+DROP VIEW IF EXISTS "StoreAgent";
+
+-- CreateView (restore original StoreAgent)
+CREATE VIEW "StoreAgent" AS
+WITH reviewstats AS (
+    SELECT sl_1.id AS "storeListingId",
+           count(sr.id) AS review_count,
+           avg(sr.score::numeric) AS avg_rating
+    FROM "StoreListing" sl_1
+    JOIN "StoreListingVersion" slv_1
+      ON slv_1."storeListingId" = sl_1.id
+    JOIN "StoreListingReview" sr
+      ON sr."storeListingVersionId" = slv_1.id
+    WHERE sl_1."isDeleted" = false
+    GROUP BY sl_1.id
+), agentruns AS (
+    SELECT "AgentGraphExecution"."agentGraphId",
+           count(*) AS run_count
+    FROM "AgentGraphExecution"
+    GROUP BY "AgentGraphExecution"."agentGraphId"
+)
+SELECT sl.id AS listing_id,
+       slv.id AS "storeListingVersionId",
+       slv."createdAt" AS updated_at,
+       sl.slug,
+       COALESCE(slv.name, '') AS agent_name,
+       slv."videoUrl" AS agent_video,
+       COALESCE(slv."imageUrls", ARRAY[]::text[]) AS agent_image,
+       slv."isFeatured" AS featured,
+       p.username AS creator_username,
+       p."avatarUrl" AS creator_avatar,
+       slv."subHeading" AS sub_heading,
+       slv.description,
+       slv.categories,
+       COALESCE(ar.run_count, 0::bigint) AS runs,
+       COALESCE(rs.avg_rating, 0.0)::double precision AS rating,
+       array_agg(DISTINCT slv.version::text) AS versions
+  FROM "StoreListing" sl
+  JOIN "StoreListingVersion" slv
+    ON slv."storeListingId" = sl.id
+  JOIN "AgentGraph" a
+    ON slv."agentGraphId" = a.id
+   AND slv."agentGraphVersion" = a.version
+  LEFT JOIN "Profile" p
+    ON sl."owningUserId" = p."userId"
+  LEFT JOIN reviewstats rs
+    ON sl.id = rs."storeListingId"
+  LEFT JOIN agentruns ar
+    ON a.id = ar."agentGraphId"
+ WHERE sl."isDeleted" = false
+   AND sl."hasApprovedVersion" = true
+   AND slv."submissionStatus" = 'APPROVED'
+ GROUP BY sl.id, slv.id, sl.slug, slv."createdAt", slv.name, slv."videoUrl",
+          slv."imageUrls", slv."isFeatured", p.username, p."avatarUrl",
+          slv."subHeading", slv.description, slv.categories, ar.run_count,
+          rs.avg_rating;
+
+-- CreateView (restore original Creator)
+CREATE VIEW "Creator" AS
+WITH agentstats AS (
+    SELECT p_1.username,
+           count(DISTINCT sl.id) AS num_agents,
+           avg(COALESCE(sr.score, 0)::numeric) AS agent_rating,
+           sum(COALESCE(age.run_count, 0::bigint)) AS agent_runs
+    FROM "Profile" p_1
+    LEFT JOIN "StoreListing" sl
+      ON sl."owningUserId" = p_1."userId"
+    LEFT JOIN "StoreListingVersion" slv
+      ON slv."storeListingId" = sl.id
+    LEFT JOIN "StoreListingReview" sr
+      ON sr."storeListingVersionId" = slv.id
+    LEFT JOIN (
+        SELECT "AgentGraphExecution"."agentGraphId",
+               count(*) AS run_count
+        FROM "AgentGraphExecution"
+        GROUP BY "AgentGraphExecution"."agentGraphId"
+    ) age ON age."agentGraphId" = slv."agentGraphId"
+    WHERE sl."isDeleted" = false
+      AND sl."hasApprovedVersion" = true
+      AND 
slv."submissionStatus" = 'APPROVED' + GROUP BY p_1.username +) +SELECT p.username, + p.name, + p."avatarUrl" AS avatar_url, + p.description, + array_agg(DISTINCT cats.c) FILTER (WHERE cats.c IS NOT NULL) AS top_categories, + p.links, + p."isFeatured" AS is_featured, + COALESCE(ast.num_agents, 0::bigint) AS num_agents, + COALESCE(ast.agent_rating, 0.0) AS agent_rating, + COALESCE(ast.agent_runs, 0::numeric) AS agent_runs + FROM "Profile" p + LEFT JOIN agentstats ast + ON ast.username = p.username + LEFT JOIN LATERAL ( + SELECT unnest(slv.categories) AS c + FROM "StoreListing" sl + JOIN "StoreListingVersion" slv + ON slv."storeListingId" = sl.id + WHERE sl."owningUserId" = p."userId" + AND sl."isDeleted" = false + AND sl."hasApprovedVersion" = true + AND slv."submissionStatus" = 'APPROVED' + ) cats ON true + GROUP BY p.username, p.name, p."avatarUrl", p.description, p.links, + p."isFeatured", ast.num_agents, ast.agent_rating, ast.agent_runs; + +-- Drop function +DROP FUNCTION IF EXISTS platform.refresh_store_materialized_views(); + +-- Drop materialized views +DROP MATERIALIZED VIEW IF EXISTS "mv_review_stats"; +DROP MATERIALIZED VIEW IF EXISTS "mv_agent_run_counts"; + +-- DropIndex +DROP INDEX IF EXISTS "idx_profile_user"; + +-- DropIndex +DROP INDEX IF EXISTS "idx_agent_graph_execution_agent"; + +-- DropIndex +DROP INDEX IF EXISTS "idx_store_listing_review_version"; + +-- DropIndex +DROP INDEX IF EXISTS "idx_slv_agent"; + +-- DropIndex +DROP INDEX IF EXISTS "idx_slv_categories_gin"; + +-- DropIndex +DROP INDEX IF EXISTS "idx_store_listing_version_status"; + +-- DropIndex +DROP INDEX IF EXISTS "idx_store_listing_approved"; + +-- DropIndex +DROP INDEX IF EXISTS "idx_store_listing_version_approved_listing"; \ No newline at end of file diff --git a/autogpt_platform/backend/run_test_data.py b/autogpt_platform/backend/run_test_data.py new file mode 100644 index 000000000000..b37660729227 --- /dev/null +++ b/autogpt_platform/backend/run_test_data.py @@ -0,0 +1,110 @@ +#!/usr/bin/env python3 +""" +Run test data creation and update scripts in sequence. + +Usage: + poetry run python run_test_data.py +""" + +import asyncio +import subprocess +import sys +from pathlib import Path + + +def run_command(cmd: list[str], cwd: Path | None = None) -> bool: + """Run a command and return True if successful.""" + try: + result = subprocess.run( + cmd, check=True, capture_output=True, text=True, cwd=cwd + ) + if result.stdout: + print(result.stdout) + return True + except subprocess.CalledProcessError as e: + print(f"Error running command: {' '.join(cmd)}") + print(f"Error: {e.stderr}") + return False + + +async def main(): + """Main function to run test data scripts.""" + print("=" * 60) + print("Running Test Data Scripts for AutoGPT Platform") + print("=" * 60) + print() + + # Get the backend directory + backend_dir = Path(__file__).parent + test_dir = backend_dir / "test" + + # Check if we're in the right directory + if not (backend_dir / "pyproject.toml").exists(): + print("ERROR: This script must be run from the backend directory") + sys.exit(1) + + print("1. Checking database connection...") + print("-" * 40) + + # Import here to ensure proper environment setup + try: + from prisma import Prisma + + db = Prisma() + await db.connect() + print("✓ Database connection successful") + await db.disconnect() + except Exception as e: + print(f"✗ Database connection failed: {e}") + print("\nPlease ensure:") + print("1. The database services are running (docker compose up -d)") + print("2. 
The DATABASE_URL in .env is correct") + print("3. Migrations have been run (poetry run prisma migrate deploy)") + sys.exit(1) + + print() + print("2. Running test data creator...") + print("-" * 40) + + # Run test_data_creator.py + if run_command(["poetry", "run", "python", "test_data_creator.py"], cwd=test_dir): + print() + print("✅ Test data created successfully!") + + print() + print("3. Running test data updater...") + print("-" * 40) + + # Run test_data_updater.py + if run_command( + ["poetry", "run", "python", "test_data_updater.py"], cwd=test_dir + ): + print() + print("✅ Test data updated successfully!") + else: + print() + print("❌ Test data updater failed!") + sys.exit(1) + else: + print() + print("❌ Test data creator failed!") + sys.exit(1) + + print() + print("=" * 60) + print("Test data setup completed successfully!") + print("=" * 60) + print() + print("The materialized views have been populated with test data:") + print("- mv_agent_run_counts: Agent execution statistics") + print("- mv_review_stats: Store listing review statistics") + print() + print("You can now:") + print("1. Run tests: poetry run test") + print("2. Start the backend: poetry run serve") + print("3. View data in the database") + print() + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/autogpt_platform/backend/run_tests.py b/autogpt_platform/backend/run_tests.py index 84af1d681396..e6aa189d50f6 100644 --- a/autogpt_platform/backend/run_tests.py +++ b/autogpt_platform/backend/run_tests.py @@ -13,8 +13,10 @@ def wait_for_postgres(max_retries=5, delay=5): "compose", "-f", "docker-compose.test.yaml", + "--env-file", + "../.env", "exec", - "postgres-test", + "db", "pg_isready", "-U", "postgres", @@ -51,6 +53,8 @@ def test(): "compose", "-f", "docker-compose.test.yaml", + "--env-file", + "../.env", "up", "-d", ] @@ -74,11 +78,20 @@ def test(): # to their development database, running tests would wipe their local data! 
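+    # The same ../.env file is passed to docker compose above via --env-file,
+    # so the values parsed below match what the test containers started with.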
     test_env = os.environ.copy()
 
-    # Use environment variables if set, otherwise use defaults that match docker-compose.test.yaml
-    db_user = os.getenv("DB_USER", "postgres")
-    db_pass = os.getenv("DB_PASS", "postgres")
-    db_name = os.getenv("DB_NAME", "postgres")
-    db_port = os.getenv("DB_PORT", "5432")
+    # Load database configuration from .env file
+    dotenv_path = os.path.join(os.path.dirname(__file__), "../.env")
+    if os.path.exists(dotenv_path):
+        with open(dotenv_path) as f:
+            for line in f:
+                stripped = line.strip()
+                # Skip blanks, comments, and anything that is not KEY=VALUE
+                if stripped and not stripped.startswith("#") and "=" in stripped:
+                    key, value = stripped.split("=", 1)
+                    os.environ[key] = value
+
+    # Get database config from environment (now populated from .env)
+    db_user = os.getenv("POSTGRES_USER", "postgres")
+    db_pass = os.getenv("POSTGRES_PASSWORD", "postgres")
+    db_name = os.getenv("POSTGRES_DB", "postgres")
+    db_port = os.getenv("POSTGRES_PORT", "5432")
 
     # Construct the test database URL - this ensures we're always pointing to the test container
     test_env["DATABASE_URL"] = (
diff --git a/autogpt_platform/backend/schema.prisma b/autogpt_platform/backend/schema.prisma
index 3a8573adebca..7e8b53bdc769 100644
--- a/autogpt_platform/backend/schema.prisma
+++ b/autogpt_platform/backend/schema.prisma
@@ -599,7 +599,23 @@ view Creator {
   agent_runs    Int
   is_featured   Boolean
 
-  // Index or unique are not applied to views
+  // Note: Prisma doesn't support indexes on views, but the following indexes exist in the database:
+  //
+  // Optimized indexes (partial indexes to reduce size and improve performance):
+  // - idx_profile_user on Profile(userId)
+  // - idx_store_listing_approved on StoreListing(owningUserId) WHERE isDeleted = false AND hasApprovedVersion = true
+  // - idx_store_listing_version_status on StoreListingVersion(storeListingId) WHERE submissionStatus = 'APPROVED'
+  // - idx_slv_categories_gin - GIN index on StoreListingVersion(categories) WHERE submissionStatus = 'APPROVED'
+  // - idx_slv_agent on StoreListingVersion(agentGraphId, agentGraphVersion) WHERE submissionStatus = 'APPROVED'
+  // - idx_store_listing_review_version on StoreListingReview(storeListingVersionId)
+  // - idx_store_listing_version_approved_listing on StoreListingVersion(storeListingId, version) WHERE submissionStatus = 'APPROVED'
+  // - idx_agent_graph_execution_agent on AgentGraphExecution(agentGraphId)
+  //
+  // Materialized views used (refreshed every 15 minutes via pg_cron):
+  // - mv_agent_run_counts - Pre-aggregated agent execution counts by agentGraphId
+  // - mv_review_stats - Pre-aggregated review statistics (count, avg rating) by storeListingId
+  //
+  // Query strategy: Uses CTEs to efficiently aggregate creator statistics leveraging materialized views
 }
 
 view StoreAgent {
@@ -622,7 +638,30 @@ view StoreAgent {
   rating        Float
   versions      String[]
 
-  // Index or unique are not applied to views
+  // Note: Prisma doesn't support indexes on views, but the following indexes exist in the database:
+  //
+  // Optimized indexes (partial indexes to reduce size and improve performance):
+  // - idx_store_listing_approved on StoreListing(owningUserId) WHERE isDeleted = false AND hasApprovedVersion = true
+  // - idx_store_listing_version_status on StoreListingVersion(storeListingId) WHERE submissionStatus = 'APPROVED'
+  // - idx_slv_categories_gin - GIN index on StoreListingVersion(categories) WHERE submissionStatus = 'APPROVED' for array searches
+  // - idx_slv_agent on StoreListingVersion(agentGraphId, agentGraphVersion) WHERE submissionStatus = 'APPROVED'
+  // - idx_store_listing_review_version on 
StoreListingReview(storeListingVersionId) + // - idx_store_listing_version_approved_listing on StoreListingVersion(storeListingId, version) WHERE submissionStatus = 'APPROVED' + // - idx_agent_graph_execution_agent on AgentGraphExecution(agentGraphId) + // - idx_profile_user on Profile(userId) + // + // Additional indexes from earlier migrations: + // - StoreListing_agentId_owningUserId_idx + // - StoreListing_isDeleted_isApproved_idx (replaced by idx_store_listing_approved) + // - StoreListing_isDeleted_idx + // - StoreListing_agentId_key (unique on agentGraphId) + // - StoreListingVersion_agentId_agentVersion_isDeleted_idx + // + // Materialized views used (refreshed every 15 minutes via pg_cron): + // - mv_agent_run_counts - Pre-aggregated agent execution counts by agentGraphId + // - mv_review_stats - Pre-aggregated review statistics (count, avg rating) by storeListingId + // + // Query strategy: Uses CTE for version aggregation and joins with materialized views for performance } view StoreSubmission { @@ -649,6 +688,33 @@ view StoreSubmission { // Index or unique are not applied to views } +// Note: This is actually a MATERIALIZED VIEW in the database +// Refreshed automatically every 15 minutes via pg_cron (with fallback to manual refresh) +view mv_agent_run_counts { + agentGraphId String @unique + run_count Int + + // Pre-aggregated count of AgentGraphExecution records by agentGraphId + // Used by StoreAgent and Creator views for performance optimization + // Unique index created automatically on agentGraphId for fast lookups + // Refresh uses CONCURRENTLY to avoid blocking reads +} + +// Note: This is actually a MATERIALIZED VIEW in the database +// Refreshed automatically every 15 minutes via pg_cron (with fallback to manual refresh) +view mv_review_stats { + storeListingId String @unique + review_count Int + avg_rating Float + + // Pre-aggregated review statistics from StoreListingReview + // Includes count of reviews and average rating per StoreListing + // Only includes approved versions (submissionStatus = 'APPROVED') and non-deleted listings + // Used by StoreAgent view for performance optimization + // Unique index created automatically on storeListingId for fast lookups + // Refresh uses CONCURRENTLY to avoid blocking reads +} + model StoreListing { id String @id @default(uuid()) createdAt DateTime @default(now()) diff --git a/autogpt_platform/backend/backend/util/test_data_creator.py b/autogpt_platform/backend/test/test_data_creator.py similarity index 71% rename from autogpt_platform/backend/backend/util/test_data_creator.py rename to autogpt_platform/backend/test/test_data_creator.py index f3e4c4561405..7901ac7574ce 100644 --- a/autogpt_platform/backend/backend/util/test_data_creator.py +++ b/autogpt_platform/backend/test/test_data_creator.py @@ -1,3 +1,21 @@ +""" +Test Data Creator for AutoGPT Platform + +This script creates test data for the AutoGPT platform database. + +Image/Video URL Domains Used: +- Images: picsum.photos (for all image URLs - avatars, store listing images, etc.) 
+- Videos: youtube.com (for store listing video URLs) + +Add these domains to your Next.js config: +```javascript +// next.config.js +images: { + domains: ['picsum.photos'], +} +``` +""" + import asyncio import random from datetime import datetime @@ -14,6 +32,7 @@ AnalyticsMetricsCreateInput, APIKeyCreateInput, CreditTransactionCreateInput, + IntegrationWebhookCreateInput, ProfileCreateInput, StoreListingReviewCreateInput, UserCreateInput, @@ -53,10 +72,26 @@ def get_image(): - url = faker.image_url() - while "placekitten.com" in url: - url = faker.image_url() - return url + """Generate a consistent image URL using picsum.photos service.""" + width = random.choice([200, 300, 400, 500, 600, 800]) + height = random.choice([200, 300, 400, 500, 600, 800]) + # Use a random seed to get different images + seed = random.randint(1, 1000) + return f"https://picsum.photos/seed/{seed}/{width}/{height}" + + +def get_video_url(): + """Generate a consistent video URL using a placeholder service.""" + # Using YouTube as a consistent source for video URLs + video_ids = [ + "dQw4w9WgXcQ", # Example video IDs + "9bZkp7q19f0", + "kJQP7kiw5Fk", + "RgKAFK5djSk", + "L_jWHffIx5E", + ] + video_id = random.choice(video_ids) + return f"https://www.youtube.com/watch?v={video_id}" async def main(): @@ -147,12 +182,27 @@ async def main(): ) agent_presets.append(preset) - # Insert UserAgents - user_agents = [] - print(f"Inserting {NUM_USERS * MAX_AGENTS_PER_USER} user agents") + # Insert Profiles first (before LibraryAgents) + profiles = [] + print(f"Inserting {NUM_USERS} profiles") for user in users: - num_agents = random.randint(MIN_AGENTS_PER_USER, MAX_AGENTS_PER_USER) + profile = await db.profile.create( + data=ProfileCreateInput( + userId=user.id, + name=user.name or faker.name(), + username=faker.unique.user_name(), + description=faker.text(), + links=[faker.url() for _ in range(3)], + avatarUrl=get_image(), + ) + ) + profiles.append(profile) + # Insert LibraryAgents + library_agents = [] + print("Inserting library agents") + for user in users: + num_agents = random.randint(MIN_AGENTS_PER_USER, MAX_AGENTS_PER_USER) # Get a shuffled list of graphs to ensure uniqueness per user available_graphs = agent_graphs.copy() random.shuffle(available_graphs) @@ -162,18 +212,27 @@ async def main(): for i in range(num_agents): graph = available_graphs[i] # Use unique graph for each library agent - user_agent = await db.libraryagent.create( + + # Get creator profile for this graph's owner + creator_profile = next( + (p for p in profiles if p.userId == graph.userId), None + ) + + library_agent = await db.libraryagent.create( data={ "userId": user.id, "agentGraphId": graph.id, "agentGraphVersion": graph.version, + "creatorId": creator_profile.id if creator_profile else None, + "imageUrl": get_image() if random.random() < 0.5 else None, + "useGraphIsActiveVersion": random.choice([True, False]), "isFavorite": random.choice([True, False]), "isCreatedByUser": random.choice([True, False]), "isArchived": random.choice([True, False]), "isDeleted": random.choice([True, False]), } ) - user_agents.append(user_agent) + library_agents.append(library_agent) # Insert AgentGraphExecutions agent_graph_executions = [] @@ -325,25 +384,9 @@ async def main(): ) ) - # Insert Profiles - profiles = [] - print(f"Inserting {NUM_USERS} profiles") - for user in users: - profile = await db.profile.create( - data=ProfileCreateInput( - userId=user.id, - name=user.name or faker.name(), - username=faker.unique.user_name(), - description=faker.text(), - 
links=[faker.url() for _ in range(3)], - avatarUrl=get_image(), - ) - ) - profiles.append(profile) - # Insert StoreListings store_listings = [] - print(f"Inserting {NUM_USERS} store listings") + print("Inserting store listings") for graph in agent_graphs: user = random.choice(users) slug = faker.slug() @@ -360,7 +403,7 @@ async def main(): # Insert StoreListingVersions store_listing_versions = [] - print(f"Inserting {NUM_USERS} store listing versions") + print("Inserting store listing versions") for listing in store_listings: graph = [g for g in agent_graphs if g.id == listing.agentGraphId][0] version = await db.storelistingversion.create( @@ -369,7 +412,7 @@ async def main(): "agentGraphVersion": graph.version, "name": graph.name or faker.sentence(nb_words=3), "subHeading": faker.sentence(), - "videoUrl": faker.url(), + "videoUrl": get_video_url() if random.random() < 0.3 else None, "imageUrls": [get_image() for _ in range(3)], "description": faker.text(), "categories": [faker.word() for _ in range(3)], @@ -388,7 +431,7 @@ async def main(): store_listing_versions.append(version) # Insert StoreListingReviews - print(f"Inserting {NUM_USERS * MAX_REVIEWS_PER_VERSION} store listing reviews") + print("Inserting store listing reviews") for version in store_listing_versions: # Create a copy of users list and shuffle it to avoid duplicates available_reviewers = users.copy() @@ -411,26 +454,92 @@ async def main(): ) ) - # Update StoreListingVersions with submission status (StoreListingSubmissions table no longer exists) - print(f"Updating {NUM_USERS} store listing versions with submission status") - for version in store_listing_versions: - reviewer = random.choice(users) - status: prisma.enums.SubmissionStatus = random.choice( - [ - prisma.enums.SubmissionStatus.PENDING, - prisma.enums.SubmissionStatus.APPROVED, - prisma.enums.SubmissionStatus.REJECTED, - ] - ) - await db.storelistingversion.update( - where={"id": version.id}, - data={ - "submissionStatus": status, - "Reviewer": {"connect": {"id": reviewer.id}}, - "reviewComments": faker.text(), - "reviewedAt": datetime.now(), - }, - ) + # Insert UserOnboarding for some users + print("Inserting user onboarding data") + for user in random.sample( + users, k=int(NUM_USERS * 0.7) + ): # 70% of users have onboarding data + completed_steps = [] + possible_steps = list(prisma.enums.OnboardingStep) + # Randomly complete some steps + if random.random() < 0.8: + num_steps = random.randint(1, len(possible_steps)) + completed_steps = random.sample(possible_steps, k=num_steps) + + try: + await db.useronboarding.create( + data={ + "userId": user.id, + "completedSteps": completed_steps, + "notificationDot": random.choice([True, False]), + "notified": ( + random.sample(completed_steps, k=min(3, len(completed_steps))) + if completed_steps + else [] + ), + "rewardedFor": ( + random.sample(completed_steps, k=min(2, len(completed_steps))) + if completed_steps + else [] + ), + "usageReason": ( + random.choice(["personal", "business", "research", "learning"]) + if random.random() < 0.7 + else None + ), + "integrations": random.sample( + ["github", "google", "discord", "slack"], k=random.randint(0, 2) + ), + "otherIntegrations": ( + faker.word() if random.random() < 0.2 else None + ), + "selectedStoreListingVersionId": ( + random.choice(store_listing_versions).id + if store_listing_versions and random.random() < 0.5 + else None + ), + "agentInput": ( + Json({"test": "data"}) if random.random() < 0.3 else None + ), + "onboardingAgentExecutionId": ( + 
+
+    # Insert IntegrationWebhooks for some users
+    print("Inserting integration webhooks")
+    for user in random.sample(
+        users, k=int(NUM_USERS * 0.3)
+    ):  # 30% of users have webhooks
+        for _ in range(random.randint(1, 3)):
+            await db.integrationwebhook.create(
+                data=IntegrationWebhookCreateInput(
+                    userId=user.id,
+                    provider=random.choice(["github", "slack", "discord"]),
+                    credentialsId=str(faker.uuid4()),
+                    webhookType=random.choice(["repo", "channel", "server"]),
+                    resource=faker.slug(),
+                    events=[
+                        random.choice(["created", "updated", "deleted"])
+                        for _ in range(random.randint(1, 3))
+                    ],
+                    config=prisma.Json({"url": faker.url()}),
+                    secret=str(faker.sha256()),
+                    providerWebhookId=str(faker.uuid4()),
+                )
+            )
 
     # Insert APIKeys
     print(f"Inserting {NUM_USERS} api keys")
@@ -451,7 +560,12 @@
         )
     )
 
+    # Refresh materialized views
+    print("Refreshing materialized views...")
+    await db.execute_raw("SELECT refresh_store_materialized_views();")
+
     await db.disconnect()
+    print("Test data creation completed successfully!")
 
 
 if __name__ == "__main__":
diff --git a/autogpt_platform/backend/test/test_data_updater.py b/autogpt_platform/backend/test/test_data_updater.py
new file mode 100755
index 000000000000..561a5da21a6b
--- /dev/null
+++ b/autogpt_platform/backend/test/test_data_updater.py
@@ -0,0 +1,323 @@
+#!/usr/bin/env python3
+"""
+Test Data Updater for Store Materialized Views
+
+This script updates existing test data to trigger changes in the materialized views:
+- mv_agent_run_counts: Updated by creating new AgentGraphExecution records
+- mv_review_stats: Updated by creating new StoreListingReview records
+
+Run this after test_data_creator.py to test that materialized views update correctly.
+"""
+
+import asyncio
+import random
+from datetime import datetime, timedelta
+
+import prisma.enums
+from faker import Faker
+from prisma import Json, Prisma
+
+faker = Faker()
+
+
+async def main():
+    db = Prisma()
+    await db.connect()
+
+    print("Starting test data updates for materialized views...")
+    print("=" * 60)
+
+    # Get existing data
+    users = await db.user.find_many(take=50)
+    agent_graphs = await db.agentgraph.find_many(where={"isActive": True}, take=50)
+    store_listings = await db.storelisting.find_many(
+        where={"hasApprovedVersion": True}, include={"Versions": True}, take=30
+    )
+    agent_nodes = await db.agentnode.find_many(take=100)
+
+    if not all([users, agent_graphs, store_listings]):
+        print(
+            "ERROR: Not enough test data found. Please run test_data_creator.py first."
+        )
+        await db.disconnect()
+        return
+
+    print(
+        f"Found {len(users)} users, {len(agent_graphs)} graphs, {len(store_listings)} store listings"
+    )
+    print()
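+    # NOTE: mv_agent_run_counts is keyed by agentGraphId with a run_count
+    # column (names taken from the step-7 queries below). If the view ever
+    # looks stale, a live aggregate can be compared against it, e.g.:
+    #     live = await db.query_raw(
+    #         'SELECT "agentGraphId", COUNT(*) AS run_count '
+    #         'FROM "AgentGraphExecution" GROUP BY "agentGraphId"'
+    #     )
+    # A sketch only; the view's real definition lives in the SQL migrations.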
+
+    # 1. Add new AgentGraphExecutions to update mv_agent_run_counts
+    print("1. Adding new agent graph executions...")
+    print("-" * 40)
+
+    new_executions_count = 0
+    execution_data = []
+
+    for graph in random.sample(agent_graphs, min(20, len(agent_graphs))):
+        # Add 5-15 new executions per selected graph
+        num_new_executions = random.randint(5, 15)
+        for _ in range(num_new_executions):
+            user = random.choice(users)
+            execution_data.append(
+                {
+                    "agentGraphId": graph.id,
+                    "agentGraphVersion": graph.version,
+                    "userId": user.id,
+                    "executionStatus": random.choice(
+                        [
+                            prisma.enums.AgentExecutionStatus.COMPLETED,
+                            prisma.enums.AgentExecutionStatus.FAILED,
+                            prisma.enums.AgentExecutionStatus.RUNNING,
+                        ]
+                    ),
+                    "startedAt": faker.date_time_between(
+                        start_date="-7d", end_date="now"
+                    ),
+                    "stats": Json(
+                        {
+                            "duration": random.randint(100, 5000),
+                            "blocks_executed": random.randint(1, 10),
+                        }
+                    ),
+                }
+            )
+            new_executions_count += 1
+
+    # Batch create executions
+    await db.agentgraphexecution.create_many(data=execution_data)
+    print(f"✓ Created {new_executions_count} new executions")
+
+    # Get the created executions for node executions
+    recent_executions = await db.agentgraphexecution.find_many(
+        take=new_executions_count, order={"createdAt": "desc"}
+    )
+
+    # 2. Add corresponding AgentNodeExecutions
+    print("\n2. Adding agent node executions...")
+    print("-" * 40)
+
+    node_execution_data = []
+    for execution in recent_executions:
+        # Get nodes for this graph
+        graph_nodes = [
+            n for n in agent_nodes if n.agentGraphId == execution.agentGraphId
+        ]
+        if graph_nodes:
+            for node in random.sample(graph_nodes, min(3, len(graph_nodes))):
+                node_execution_data.append(
+                    {
+                        "agentGraphExecutionId": execution.id,
+                        "agentNodeId": node.id,
+                        "executionStatus": execution.executionStatus,
+                        "addedTime": datetime.now(),
+                        "startedTime": datetime.now()
+                        - timedelta(minutes=random.randint(1, 10)),
+                        "endedTime": (
+                            datetime.now()
+                            if execution.executionStatus
+                            == prisma.enums.AgentExecutionStatus.COMPLETED
+                            else None
+                        ),
+                    }
+                )
+
+    await db.agentnodeexecution.create_many(data=node_execution_data)
+    print(f"✓ Created {len(node_execution_data)} node executions")
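+    # NOTE: mv_review_stats exposes storeListingId, review_count and
+    # avg_rating (see step 7 below), so conceptually it aggregates
+    # something like:
+    #     SELECT v."storeListingId", COUNT(*) AS review_count,
+    #            AVG(r.score) AS avg_rating
+    #     FROM "StoreListingReview" r
+    #     JOIN "StoreListingVersion" v ON v.id = r."storeListingVersionId"
+    #     GROUP BY v."storeListingId";
+    # An assumption for orientation; the real definition is in the migrations.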
+
+    # 3. Add new StoreListingReviews to update mv_review_stats
+    print("\n3. Adding new store listing reviews...")
+    print("-" * 40)
+
+    new_reviews_count = 0
+
+    for listing in store_listings:
+        if not listing.Versions:
+            continue
+
+        # Get approved versions
+        approved_versions = [
+            v
+            for v in listing.Versions
+            if v.submissionStatus == prisma.enums.SubmissionStatus.APPROVED
+        ]
+        if not approved_versions:
+            continue
+
+        # Pick a version to add reviews to
+        version = random.choice(approved_versions)
+
+        # Get existing reviews for this version to avoid duplicates
+        existing_reviews = await db.storelistingreview.find_many(
+            where={"storeListingVersionId": version.id}
+        )
+        existing_reviewer_ids = {r.reviewByUserId for r in existing_reviews}
+
+        # Find users who haven't reviewed this version yet
+        available_reviewers = [u for u in users if u.id not in existing_reviewer_ids]
+
+        if available_reviewers:
+            # Add 2-5 new reviews
+            num_new_reviews = min(random.randint(2, 5), len(available_reviewers))
+            selected_reviewers = random.sample(available_reviewers, num_new_reviews)
+
+            for reviewer in selected_reviewers:
+                # Bias towards positive reviews (4-5 stars)
+                score = random.choices([1, 2, 3, 4, 5], weights=[5, 10, 20, 40, 25])[0]
+
+                await db.storelistingreview.create(
+                    data={
+                        "storeListingVersionId": version.id,
+                        "reviewByUserId": reviewer.id,
+                        "score": score,
+                        "comments": (
+                            faker.text(max_nb_chars=200)
+                            if random.random() < 0.7
+                            else None
+                        ),
+                    }
+                )
+                new_reviews_count += 1
+
+    print(f"✓ Created {new_reviews_count} new reviews")
+
+    # 4. Update some store listing versions (change categories, featured status)
+    print("\n4. Updating store listing versions...")
+    print("-" * 40)
+
+    updates_count = 0
+    for listing in random.sample(store_listings, min(10, len(store_listings))):
+        if listing.Versions:
+            version = random.choice(listing.Versions)
+            if version.submissionStatus == prisma.enums.SubmissionStatus.APPROVED:
+                # Toggle featured status or update categories
+                new_categories = random.sample(
+                    [
+                        "productivity",
+                        "ai",
+                        "automation",
+                        "data",
+                        "social",
+                        "marketing",
+                        "development",
+                        "analytics",
+                    ],
+                    k=random.randint(2, 4),
+                )
+
+                await db.storelistingversion.update(
+                    where={"id": version.id},
+                    data={
+                        "isFeatured": (
+                            not version.isFeatured
+                            if random.random() < 0.3
+                            else version.isFeatured
+                        ),
+                        "categories": new_categories,
+                        "updatedAt": datetime.now(),
+                    },
+                )
+                updates_count += 1
+
+    print(f"✓ Updated {updates_count} store listing versions")
+
+    # 5. Create some new credit transactions
+    print("\n5. Adding credit transactions...")
+    print("-" * 40)
+
+    transaction_count = 0
+    for user in random.sample(users, min(30, len(users))):
+        # Add 1-3 transactions per user
+        for _ in range(random.randint(1, 3)):
+            transaction_type = random.choice(
+                [
+                    prisma.enums.CreditTransactionType.USAGE,
+                    prisma.enums.CreditTransactionType.TOP_UP,
+                    prisma.enums.CreditTransactionType.GRANT,
+                ]
+            )
+
+            amount = (
+                random.randint(10, 500)
+                if transaction_type == prisma.enums.CreditTransactionType.TOP_UP
+                else -random.randint(1, 50)
+            )
+
+            await db.credittransaction.create(
+                data={
+                    "userId": user.id,
+                    "amount": amount,
+                    "type": transaction_type,
+                    "metadata": Json(
+                        {
+                            "source": "test_updater",
+                            "timestamp": datetime.now().isoformat(),
+                        }
+                    ),
+                }
+            )
+            transaction_count += 1
+
+    print(f"✓ Created {transaction_count} credit transactions")
+
+    # 6. Refresh materialized views
+    print("\n6. Refreshing materialized views...")
+    print("-" * 40)
+
+    try:
+        await db.execute_raw("SELECT refresh_store_materialized_views();")
+        print("✓ Materialized views refreshed successfully")
+    except Exception as e:
+        print(f"⚠ Warning: Could not refresh materialized views: {e}")
+        print(
+            "  You may need to refresh them manually with: SELECT refresh_store_materialized_views();"
+        )
+
+    # 7. Verify the updates
+    print("\n7. Verifying updates...")
+    print("-" * 40)
+
+    # Check agent run counts
+    run_counts = await db.query_raw(
+        "SELECT COUNT(*) as view_count FROM mv_agent_run_counts"
+    )
+    print(f"✓ mv_agent_run_counts has {run_counts[0]['view_count']} entries")
+
+    # Check review stats
+    review_stats = await db.query_raw(
+        "SELECT COUNT(*) as view_count FROM mv_review_stats"
+    )
+    print(f"✓ mv_review_stats has {review_stats[0]['view_count']} entries")
+
+    # Sample some data from the views
+    print("\nSample data from materialized views:")
+
+    sample_runs = await db.query_raw(
+        "SELECT * FROM mv_agent_run_counts ORDER BY run_count DESC LIMIT 5"
+    )
+    print("\nTop 5 agents by run count:")
+    for row in sample_runs:
+        print(f"  - Agent {row['agentGraphId'][:8]}...: {row['run_count']} runs")
+
+    sample_reviews = await db.query_raw(
+        "SELECT * FROM mv_review_stats ORDER BY avg_rating DESC NULLS LAST LIMIT 5"
+    )
+    print("\nTop 5 store listings by rating:")
+    for row in sample_reviews:
+        avg_rating = row["avg_rating"] if row["avg_rating"] is not None else 0.0
+        print(
+            f"  - Listing {row['storeListingId'][:8]}...: {avg_rating:.2f} ⭐ ({row['review_count']} reviews)"
+        )
+
+    await db.disconnect()
+
+    print("\n" + "=" * 60)
+    print("Test data update completed successfully!")
+    print("The materialized views should now reflect the updated data.")
+    print(
+        "\nTo manually refresh views, run: SELECT refresh_store_materialized_views();"
+    )
+
+
+if __name__ == "__main__":
+    asyncio.run(main())
diff --git a/autogpt_platform/frontend/next.config.mjs b/autogpt_platform/frontend/next.config.mjs
index d0d798f77fb0..b7f2f0b3fba7 100644
--- a/autogpt_platform/frontend/next.config.mjs
+++ b/autogpt_platform/frontend/next.config.mjs
@@ -11,8 +11,6 @@ const nextConfig = {
       "ideogram.ai", // for generated images
       "picsum.photos", // for placeholder images
-      "dummyimage.com", // for placeholder images
-      "placekitten.com", // for placeholder images
     ],
   },
   output: "standalone",
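After applying the diff, a minimal standalone check (a sketch reusing the same Prisma raw-query calls and view names as the scripts above) can confirm that both materialized views are populated:

```python
import asyncio

from prisma import Prisma


async def smoke_check() -> None:
    """Print row counts for the two store materialized views."""
    db = Prisma()
    await db.connect()
    for view in ("mv_agent_run_counts", "mv_review_stats"):
        # View names come from the scripts above; COUNT(*) is aliased so the
        # result dict has a predictable key.
        rows = await db.query_raw(f"SELECT COUNT(*) AS n FROM {view}")
        print(f"{view}: {rows[0]['n']} rows")
    await db.disconnect()


if __name__ == "__main__":
    asyncio.run(smoke_check())
```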