41 commits
8ec4ec8 - WebSocket notifications backend (kcze, Nov 1, 2025)
f56a3c8 - Frontend types (kcze, Nov 1, 2025)
f7e57ef - Feedback (kcze, Nov 5, 2025)
e38d1c5 - Update types (kcze, Nov 5, 2025)
61c65a5 - Redis notification bus (kcze, Nov 6, 2025)
9e4e150 - Merge branch 'kpczerwinski/secrt-1724-websocket-based-notifications' … (kcze, Nov 6, 2025)
e019233 - Confetti on ws notification (kcze, Nov 7, 2025)
59d652c - Scroll completed task into view (kcze, Nov 7, 2025)
16164d5 - Complete rerun task on Run again (kcze, Nov 7, 2025)
49a1c50 - Merge branch 'dev' into kpczerwinski/secrt-1724-websocket-based-notif… (kcze, Nov 7, 2025)
4fade1c - Merge branch 'kpczerwinski/secrt-1724-websocket-based-notifications' … (kcze, Nov 7, 2025)
614d957 - Merge branch 'dev' into kpczerwinski/secrt-1727-onboarding-notifications (kcze, Nov 9, 2025)
f520469 - Update endpoints (kcze, Nov 9, 2025)
3d48e49 - Backend `SCHEDULE_AGENT` (kcze, Nov 9, 2025)
0560e3b - Add `resolveResponse` helper function (kcze, Nov 9, 2025)
4477864 - Use `postV2AddMarketplaceAgent` (kcze, Nov 9, 2025)
613940b - Use `getV1OnboardingState` (kcze, Nov 9, 2025)
e7915ea - Use `patchV1UpdateOnboardingState` (kcze, Nov 9, 2025)
55b55f9 - Use `postV1CompleteOnboardingStep` (kcze, Nov 10, 2025)
bb9fed5 - Update provider (kcze, Nov 10, 2025)
c41e1cd - Use `getV1RecommendedOnboardingAgents` (kcze, Nov 10, 2025)
96067cd - Use `getV1IsOnboardingEnabled` (kcze, Nov 10, 2025)
688c784 - Backend increment runs logic (kcze, Nov 11, 2025)
6e58fa7 - Backend `GET_RESULTS` (kcze, Nov 11, 2025)
c185f28 - Backend `MARKETPLACE_ADD_AGENT` (kcze, Nov 11, 2025)
0f51a8e - Backend `BUILDER_SAVE_AGENT` (kcze, Nov 12, 2025)
f938e4d - Track source of execution (kcze, Nov 12, 2025)
7f3efd9 - Backend `RE_RUN_AGENT` (kcze, Nov 13, 2025)
201cf9c - Merge branch 'dev' into kpczerwinski/secrt-1726-move-completion-backend (kcze, Nov 13, 2025)
85f0429 - Merge branch 'dev' into kpczerwinski/secrt-1726-move-completion-backend (kcze, Nov 13, 2025)
7ab1660 - Format (kcze, Nov 13, 2025)
1e731c0 - Increment runs for scheduled agents (kcze, Nov 14, 2025)
154135f - Fixes (kcze, Nov 17, 2025)
64ef19f - Merge branch 'dev' into kpczerwinski/secrt-1726-move-completion-backend (kcze, Nov 17, 2025)
05866d0 - Fix (kcze, Nov 17, 2025)
3ceec80 - Merge branch 'dev' into kpczerwinski/secrt-1726-move-completion-backend (kcze, Nov 18, 2025)
ab3d325 - Fix openapi schema (kcze, Nov 19, 2025)
49e8afc - Remove duplicated call (kcze, Nov 20, 2025)
8c4899b - Merge branch 'dev' into kpczerwinski/secrt-1726-move-completion-backend (kcze, Nov 20, 2025)
db1abb1 - Format (kcze, Nov 20, 2025)
63384a8 - Move `UserOnboarding` to `model.py` (kcze, Nov 20, 2025)
7 changes: 6 additions & 1 deletion autogpt_platform/backend/backend/data/notification_bus.py
@@ -2,7 +2,7 @@

from typing import AsyncGenerator

from pydantic import BaseModel
from pydantic import BaseModel, field_serializer

from backend.data.event_bus import AsyncRedisEventBus
from backend.server.model import NotificationPayload
@@ -15,6 +15,11 @@ class NotificationEvent(BaseModel):
user_id: str
payload: NotificationPayload

@field_serializer("payload")
def serialize_payload(self, payload: NotificationPayload):
"""Ensure extra fields survive Redis serialization."""
return payload.model_dump()
Comment on lines +18 to +21 (Contributor Author):

This ensures that all fields of NotificationPayload subclasses (in this case OnboardingNotificationPayload) are serialized.
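A minimal standalone sketch (added for illustration, not part of the diff) of the pydantic v2 behavior the serializer works around: a field annotated with the base model serializes only the base fields by default, so subclass-only fields like `step` would be dropped. The class names below are simplified stand-ins for NotificationPayload / OnboardingNotificationPayload / NotificationEvent.

```python
from pydantic import BaseModel, field_serializer


class Payload(BaseModel):
    type: str
    event: str


class OnboardingPayload(Payload):
    step: str


class Event(BaseModel):
    payload: Payload  # annotated with the base class, like NotificationEvent.payload

    @field_serializer("payload")
    def serialize_payload(self, payload: Payload):
        # Dump from the concrete instance so subclass-only fields survive.
        return payload.model_dump()


event = Event(
    payload=OnboardingPayload(type="onboarding", event="step_completed", step="WELCOME")
)
print(event.model_dump())  # includes "step"; without the serializer it would be dropped
```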



class AsyncRedisNotificationEventBus(AsyncRedisEventBus[NotificationEvent]):
Model = NotificationEvent # type: ignore
166 changes: 131 additions & 35 deletions autogpt_platform/backend/backend/data/onboarding.py
@@ -1,24 +1,28 @@
import re
from datetime import datetime
from typing import Any, Optional
from datetime import datetime, timedelta, timezone
from typing import Any, Literal, Optional
from zoneinfo import ZoneInfo

import prisma
import pydantic
from prisma.enums import OnboardingStep
from prisma.models import UserOnboarding
from prisma.types import UserOnboardingCreateInput, UserOnboardingUpdateInput

from backend.data import execution as execution_db
from backend.data.block import get_blocks
from backend.data.credit import get_user_credit_model
from backend.data.model import CredentialsMetaInput
from backend.data.notification_bus import (
AsyncRedisNotificationEventBus,
NotificationEvent,
)
from backend.data.user import get_user_by_id
from backend.server.model import OnboardingNotificationPayload
from backend.server.v2.store.model import StoreAgentDetails
from backend.util.cache import cached
from backend.util.json import SafeJson
from backend.util.timezone_utils import get_user_timezone_or_utc

# Mapping from user reason id to categories to search for when choosing agent to show
REASON_MAPPING: dict[str, list[str]] = {
@@ -31,9 +35,20 @@
POINTS_AGENT_COUNT = 50 # Number of agents to calculate points for
MIN_AGENT_COUNT = 2 # Minimum number of marketplace agents to enable onboarding

FrontendOnboardingStep = Literal[
OnboardingStep.WELCOME,
OnboardingStep.USAGE_REASON,
OnboardingStep.INTEGRATIONS,
OnboardingStep.AGENT_CHOICE,
OnboardingStep.AGENT_NEW_RUN,
OnboardingStep.AGENT_INPUT,
OnboardingStep.CONGRATS,
OnboardingStep.MARKETPLACE_VISIT,
OnboardingStep.BUILDER_OPEN,
]
Comment on lines +38 to +48 (Contributor Author):

This is a subset of OnboardingStep containing only the steps that are allowed to be completed from the frontend.
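A small illustration (added, not part of the diff) of what the Literal subset enforces. `CompleteStepRequest` is a hypothetical request model; `FrontendOnboardingStep` and the step values come from this file, and the import path assumes the module layout in this PR.

```python
import pydantic
from prisma.enums import OnboardingStep

from backend.data.onboarding import FrontendOnboardingStep


class CompleteStepRequest(pydantic.BaseModel):
    step: FrontendOnboardingStep


CompleteStepRequest(step=OnboardingStep.WELCOME)     # accepted: frontend-completable step
CompleteStepRequest(step=OnboardingStep.RUN_AGENTS)  # raises ValidationError: backend-only milestone
```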



class UserOnboardingUpdate(pydantic.BaseModel):
completedSteps: Optional[list[OnboardingStep]] = None
walletShown: Optional[bool] = None
notified: Optional[list[OnboardingStep]] = None
usageReason: Optional[str] = None
@@ -42,9 +57,6 @@ class UserOnboardingUpdate(pydantic.BaseModel):
selectedStoreListingVersionId: Optional[str] = None
agentInput: Optional[dict[str, Any]] = None
onboardingAgentExecutionId: Optional[str] = None
agentRuns: Optional[int] = None
lastRunAt: Optional[datetime] = None
consecutiveRunDays: Optional[int] = None


async def get_user_onboarding(user_id: str):
@@ -83,14 +95,6 @@ async def reset_user_onboarding(user_id: str):
async def update_user_onboarding(user_id: str, data: UserOnboardingUpdate):
update: UserOnboardingUpdateInput = {}
onboarding = await get_user_onboarding(user_id)
if data.completedSteps is not None:
Contributor Author:

Step completion now has a dedicated endpoint.

update["completedSteps"] = list(
set(data.completedSteps + onboarding.completedSteps)
)
for step in data.completedSteps:
if step not in onboarding.completedSteps:
await _reward_user(user_id, onboarding, step)
await _send_onboarding_notification(user_id, step)
if data.walletShown:
update["walletShown"] = data.walletShown
if data.notified is not None:
@@ -107,12 +111,6 @@ async def update_user_onboarding(user_id: str, data: UserOnboardingUpdate):
update["agentInput"] = SafeJson(data.agentInput)
if data.onboardingAgentExecutionId is not None:
update["onboardingAgentExecutionId"] = data.onboardingAgentExecutionId
if data.agentRuns is not None and data.agentRuns > onboarding.agentRuns:
update["agentRuns"] = data.agentRuns
if data.lastRunAt is not None:
update["lastRunAt"] = data.lastRunAt
if data.consecutiveRunDays is not None:
update["consecutiveRunDays"] = data.consecutiveRunDays

return await UserOnboarding.prisma().upsert(
where={"userId": user_id},
@@ -161,14 +159,12 @@ async def _reward_user(user_id: str, onboarding: UserOnboarding, step: Onboardin
if step in onboarding.rewardedFor:
return

onboarding.rewardedFor.append(step)
user_credit_model = await get_user_credit_model(user_id)
await user_credit_model.onboarding_reward(user_id, reward, step)
await UserOnboarding.prisma().update(
where={"userId": user_id},
data={
"completedSteps": list(set(onboarding.completedSteps + [step])),
"rewardedFor": onboarding.rewardedFor,
"rewardedFor": list(set(onboarding.rewardedFor + [step])),
},
)

@@ -177,31 +173,53 @@ async def complete_onboarding_step(user_id: str, step: OnboardingStep):
"""
Completes the specified onboarding step for the user if not already completed.
"""

onboarding = await get_user_onboarding(user_id)
if step not in onboarding.completedSteps:
await update_user_onboarding(
user_id,
UserOnboardingUpdate(completedSteps=onboarding.completedSteps + [step]),
onboarding = await get_user_onboarding(user_id)
await UserOnboarding.prisma().update(
where={"userId": user_id},
data={
"completedSteps": list(set(onboarding.completedSteps + [step])),
},
)
await _reward_user(user_id, onboarding, step)
await _send_onboarding_notification(user_id, step)


async def _send_onboarding_notification(user_id: str, step: OnboardingStep):
async def _send_onboarding_notification(
user_id: str, step: OnboardingStep, event: str = "step_completed"
):
"""
Sends an onboarding notification to the user for the specified step.
Sends an onboarding notification to the user.
"""
payload = OnboardingNotificationPayload(
type="onboarding",
event="step_completed",
step=step.value,
event=event,
step=step,
)
await AsyncRedisNotificationEventBus().publish(
NotificationEvent(user_id=user_id, payload=payload)
)


def clean_and_split(text: str) -> list[str]:
async def complete_re_run_agent(user_id: str, graph_id: str) -> None:
"""
Complete RE_RUN_AGENT step when a user runs a graph they've run before.
Keeps overhead low by only counting executions if the step is still pending.
"""
onboarding = await get_user_onboarding(user_id)
if OnboardingStep.RE_RUN_AGENT in onboarding.completedSteps:
return

# Includes current execution, so count > 1 means there was at least one prior run.
previous_exec_count = await execution_db.get_graph_executions_count(
user_id=user_id, graph_id=graph_id
)
if previous_exec_count > 1:
await complete_onboarding_step(user_id, OnboardingStep.RE_RUN_AGENT)


def _clean_and_split(text: str) -> list[str]:
"""
Removes all special characters from a string, truncates it to 100 characters,
and splits it by whitespace and commas.
@@ -224,7 +242,7 @@ def clean_and_split(text: str) -> list[str]:
return words


def calculate_points(
def _calculate_points(
agent, categories: list[str], custom: list[str], integrations: list[str]
) -> int:
"""
@@ -282,13 +300,91 @@ def get_credentials_blocks() -> dict[str, str]:
CREDENTIALS_FIELDS: dict[str, str] = get_credentials_blocks()


def _normalize_datetime(value: datetime | None) -> datetime | None:
Contributor Author:

The following functions calculate the user's agent-run streak: it increases by one for each consecutive day with at least one run and resets when a day is missed. Times are evaluated in the user's timezone (a usage sketch follows these helpers below).

if value is None:
return None
if value.tzinfo is None:
return value.replace(tzinfo=timezone.utc)
return value.astimezone(timezone.utc)


def _calculate_consecutive_run_days(
last_run_at: datetime | None, current_consecutive_days: int, user_timezone: str
) -> tuple[datetime, int]:
tz = ZoneInfo(user_timezone)
local_now = datetime.now(tz)
normalized_last_run = _normalize_datetime(last_run_at)

if normalized_last_run is None:
return local_now.astimezone(timezone.utc), 1

last_run_local = normalized_last_run.astimezone(tz)
last_run_date = last_run_local.date()
today = local_now.date()

if last_run_date == today:
return local_now.astimezone(timezone.utc), current_consecutive_days

if last_run_date == today - timedelta(days=1):
return local_now.astimezone(timezone.utc), current_consecutive_days + 1

return local_now.astimezone(timezone.utc), 1
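A short usage sketch (added for illustration, not part of the diff) of how the streak helper behaves. The timezone is fixed to UTC and offsets are whole days to keep the example deterministic.

```python
from datetime import datetime, timedelta, timezone

now = datetime.now(timezone.utc)

# Last run was yesterday: the streak continues.
_, days = _calculate_consecutive_run_days(now - timedelta(days=1), 2, "UTC")
assert days == 3

# Already ran today: the streak is unchanged.
_, days = _calculate_consecutive_run_days(now, 3, "UTC")
assert days == 3

# Last run was three days ago: the streak resets.
_, days = _calculate_consecutive_run_days(now - timedelta(days=3), 7, "UTC")
assert days == 1
```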


def _get_run_milestone_steps(
new_run_count: int, consecutive_days: int
) -> list[OnboardingStep]:
milestones: list[OnboardingStep] = []
if new_run_count >= 10:
milestones.append(OnboardingStep.RUN_AGENTS)
if new_run_count >= 100:
milestones.append(OnboardingStep.RUN_AGENTS_100)
if consecutive_days >= 3:
milestones.append(OnboardingStep.RUN_3_DAYS)
if consecutive_days >= 14:
milestones.append(OnboardingStep.RUN_14_DAYS)
return milestones


async def _get_user_timezone(user_id: str) -> str:
user = await get_user_by_id(user_id)
return get_user_timezone_or_utc(user.timezone if user else None)


async def increment_runs(user_id: str):
Contributor:

I don't know how careful we are trying to be with this, but it's safer to just increment the integer field atomically than to read it and write it back; see "Updating Atomic Fields" in https://prisma-client-py.readthedocs.io/en/v0.1.0/reference/operations/#integer-fields_1
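A minimal sketch of the suggested atomic update (added here for illustration; not what the diff currently does), using the `UserOnboarding` model and `user_id` from this function:

```python
# Prisma Client Python supports atomic updates on integer fields, which avoids
# the read-then-write race on agentRuns under concurrent executions.
updated = await UserOnboarding.prisma().update(
    where={"userId": user_id},
    data={"agentRuns": {"increment": 1}},
)
# `updated` is None only if no UserOnboarding row exists; otherwise
# updated.agentRuns already reflects the incremented count for milestone checks.
```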

"""
Increment a user's run counters and trigger any onboarding milestones.
"""
user_timezone = await _get_user_timezone(user_id)
onboarding = await get_user_onboarding(user_id)
new_run_count = onboarding.agentRuns + 1
last_run_at, consecutive_run_days = _calculate_consecutive_run_days(
onboarding.lastRunAt, onboarding.consecutiveRunDays, user_timezone
)

await UserOnboarding.prisma().update(
where={"userId": user_id},
data={
"agentRuns": new_run_count,
"lastRunAt": last_run_at,
"consecutiveRunDays": consecutive_run_days,
},
)

milestones = _get_run_milestone_steps(new_run_count, consecutive_run_days)
new_steps = [step for step in milestones if step not in onboarding.completedSteps]

for step in new_steps:
await complete_onboarding_step(user_id, step)


async def get_recommended_agents(user_id: str) -> list[StoreAgentDetails]:
user_onboarding = await get_user_onboarding(user_id)
categories = REASON_MAPPING.get(user_onboarding.usageReason or "", [])

where_clause: dict[str, Any] = {}

custom = clean_and_split((user_onboarding.usageReason or "").lower())
custom = _clean_and_split((user_onboarding.usageReason or "").lower())

if categories:
where_clause["OR"] = [
@@ -336,7 +432,7 @@ async def get_recommended_agents(user_id: str) -> list[StoreAgentDetails]:
# Calculate points for the first X agents and choose the top 2
agent_points = []
for agent in storeAgents[:POINTS_AGENT_COUNT]:
points = calculate_points(
points = _calculate_points(
agent, categories, custom, user_onboarding.integrations
)
agent_points.append((agent, points))
2 changes: 2 additions & 0 deletions autogpt_platform/backend/backend/executor/scheduler.py
@@ -26,6 +26,7 @@
from backend.data.block import BlockInput
from backend.data.execution import GraphExecutionWithNodes
from backend.data.model import CredentialsMetaInput
from backend.data.onboarding import increment_runs
from backend.executor import utils as execution_utils
from backend.monitoring import (
NotificationJobArgs,
@@ -153,6 +154,7 @@ async def _execute_graph(**kwargs):
inputs=args.input_data,
graph_credentials_inputs=args.input_credentials,
)
await increment_runs(args.user_id)
elapsed = asyncio.get_event_loop().time() - start_time
logger.info(
f"Graph execution started with ID {graph_exec.id} for graph {args.graph_id} "
@@ -33,7 +33,11 @@
OAuth2Credentials,
UserIntegrations,
)
from backend.data.onboarding import OnboardingStep, complete_onboarding_step
from backend.data.onboarding import (
OnboardingStep,
complete_onboarding_step,
increment_runs,
)
from backend.data.user import get_user_integrations
from backend.executor.utils import add_graph_execution
from backend.integrations.ayrshare import AyrshareClient, SocialPlatform
@@ -377,6 +381,7 @@ async def webhook_ingress_generic(
return

await complete_onboarding_step(user_id, OnboardingStep.TRIGGER_WEBHOOK)
await increment_runs(user_id)

# Execute all triggers concurrently for better performance
tasks = []
12 changes: 10 additions & 2 deletions autogpt_platform/backend/backend/server/model.py
@@ -1,7 +1,8 @@
import enum
from typing import Any, Optional
from typing import Any, Literal, Optional

import pydantic
from prisma.enums import OnboardingStep

from backend.data.api_key import APIKeyInfo, APIKeyPermission
from backend.data.graph import Graph
@@ -35,8 +36,13 @@ class WSSubscribeGraphExecutionsRequest(pydantic.BaseModel):
graph_id: str


GraphCreationSource = Literal["builder", "upload"]
GraphExecutionSource = Literal["builder", "library", "onboarding"]
Comment on lines +39 to +40 (Contributor Author):

We need the sources to know whether an onboarding step should be completed; for example, a run-from-library step shouldn't be completed when the run comes from the onboarding agent or from the builder.
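A hypothetical sketch (not from the diff) of how a creation source might gate step completion; the helper name is invented, while `BUILDER_SAVE_AGENT` is one of the backend steps added in this PR and the real call sites live elsewhere in the changeset.

```python
def should_complete_builder_save(source: GraphCreationSource | None) -> bool:
    # Only saves made from the builder count toward BUILDER_SAVE_AGENT;
    # agent uploads do not.
    return source == "builder"
```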



class CreateGraph(pydantic.BaseModel):
graph: Graph
source: GraphCreationSource | None = None


class CreateAPIKeyRequest(pydantic.BaseModel):
@@ -83,6 +89,8 @@ class NotificationPayload(pydantic.BaseModel):
type: str
event: str

model_config = pydantic.ConfigDict(extra="allow")


class OnboardingNotificationPayload(NotificationPayload):
step: str
step: OnboardingStep