Skip to content

Commit 8fccf2e

Browse files
authored
fix(platform/builder): Add heartbeat mechanism (#8665)
* add heartbeat mechanism * formatting data * import List * another import fix * wip * formatting adn linting
1 parent 1f34f78 commit 8fccf2e

File tree

3 files changed

+73
-9
lines changed

3 files changed

+73
-9
lines changed

autogpt_platform/backend/backend/server/model.py

+6-5
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
import enum
2-
import typing
2+
from typing import Any, List, Optional, Union
33

44
import pydantic
55

@@ -12,11 +12,12 @@ class Methods(enum.Enum):
1212
UNSUBSCRIBE = "unsubscribe"
1313
EXECUTION_EVENT = "execution_event"
1414
ERROR = "error"
15+
HEARTBEAT = "heartbeat"
1516

1617

1718
class WsMessage(pydantic.BaseModel):
1819
method: Methods
19-
data: typing.Dict[str, typing.Any] | list[typing.Any] | None = None
20+
data: Optional[Union[dict[str, Any], list[Any], str]] = None
2021
success: bool | None = None
2122
channel: str | None = None
2223
error: str | None = None
@@ -40,8 +41,8 @@ class CreateGraph(pydantic.BaseModel):
4041

4142
class CreateAPIKeyRequest(pydantic.BaseModel):
4243
name: str
43-
permissions: typing.List[APIKeyPermission]
44-
description: typing.Optional[str] = None
44+
permissions: List[APIKeyPermission]
45+
description: Optional[str] = None
4546

4647

4748
class CreateAPIKeyResponse(pydantic.BaseModel):
@@ -54,4 +55,4 @@ class SetGraphActiveVersion(pydantic.BaseModel):
5455

5556

5657
class UpdatePermissionsRequest(pydantic.BaseModel):
57-
permissions: typing.List[APIKeyPermission]
58+
permissions: List[APIKeyPermission]

autogpt_platform/backend/backend/server/ws_api.py

+7
Original file line numberDiff line numberDiff line change
@@ -138,6 +138,13 @@ async def websocket_router(
138138
while True:
139139
data = await websocket.receive_text()
140140
message = WsMessage.model_validate_json(data)
141+
142+
if message.method == Methods.HEARTBEAT:
143+
await websocket.send_json(
144+
{"method": Methods.HEARTBEAT.value, "data": "pong", "success": True}
145+
)
146+
continue
147+
141148
if message.method == Methods.SUBSCRIBE:
142149
await handle_subscribe(websocket, manager, message)
143150

autogpt_platform/frontend/src/lib/autogpt-server-api/baseClient.ts

+60-4
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,10 @@ export default class BaseAutoGPTServerAPI {
2828
private wsConnecting: Promise<void> | null = null;
2929
private wsMessageHandlers: Record<string, Set<(data: any) => void>> = {};
3030
private supabaseClient: SupabaseClient | null = null;
31+
heartbeatInterval: number | null = null;
32+
readonly HEARTBEAT_INTERVAL = 30000; // 30 seconds
33+
readonly HEARTBEAT_TIMEOUT = 10000; // 10 seconds
34+
heartbeatTimeoutId: number | null = null;
3135

3236
constructor(
3337
baseUrl: string = process.env.NEXT_PUBLIC_AGPT_SERVER_URL ||
@@ -324,34 +328,84 @@ export default class BaseAutoGPTServerAPI {
324328
}
325329
}
326330

331+
startHeartbeat() {
332+
this.stopHeartbeat();
333+
this.heartbeatInterval = window.setInterval(() => {
334+
if (this.webSocket?.readyState === WebSocket.OPEN) {
335+
this.webSocket.send(
336+
JSON.stringify({
337+
method: "heartbeat",
338+
data: "ping",
339+
success: true,
340+
}),
341+
);
342+
343+
this.heartbeatTimeoutId = window.setTimeout(() => {
344+
console.log("Heartbeat timeout - reconnecting");
345+
this.webSocket?.close();
346+
this.connectWebSocket();
347+
}, this.HEARTBEAT_TIMEOUT);
348+
}
349+
}, this.HEARTBEAT_INTERVAL);
350+
}
351+
352+
stopHeartbeat() {
353+
if (this.heartbeatInterval) {
354+
clearInterval(this.heartbeatInterval);
355+
this.heartbeatInterval = null;
356+
}
357+
if (this.heartbeatTimeoutId) {
358+
clearTimeout(this.heartbeatTimeoutId);
359+
this.heartbeatTimeoutId = null;
360+
}
361+
}
362+
363+
handleHeartbeatResponse() {
364+
if (this.heartbeatTimeoutId) {
365+
clearTimeout(this.heartbeatTimeoutId);
366+
this.heartbeatTimeoutId = null;
367+
}
368+
}
369+
327370
async connectWebSocket(): Promise<void> {
328371
this.wsConnecting ??= new Promise(async (resolve, reject) => {
329372
try {
330373
const token =
331374
(await this.supabaseClient?.auth.getSession())?.data.session
332375
?.access_token || "";
333-
334376
const wsUrlWithToken = `${this.wsUrl}?token=${token}`;
335377
this.webSocket = new WebSocket(wsUrlWithToken);
336378

337379
this.webSocket.onopen = () => {
338-
console.debug("WebSocket connection established");
380+
console.log("WebSocket connection established");
381+
this.startHeartbeat(); // Start heartbeat when connection opens
339382
resolve();
340383
};
341384

342385
this.webSocket.onclose = (event) => {
343-
console.debug("WebSocket connection closed", event);
386+
console.log("WebSocket connection closed", event);
387+
this.stopHeartbeat(); // Stop heartbeat when connection closes
344388
this.webSocket = null;
389+
// Attempt to reconnect after a delay
390+
setTimeout(() => this.connectWebSocket(), 1000);
345391
};
346392

347393
this.webSocket.onerror = (error) => {
348394
console.error("WebSocket error:", error);
395+
this.stopHeartbeat(); // Stop heartbeat on error
349396
reject(error);
350397
};
351398

352399
this.webSocket.onmessage = (event) => {
353400
const message: WebsocketMessage = JSON.parse(event.data);
354-
if (message.method == "execution_event") {
401+
402+
// Handle heartbeat response
403+
if (message.method === "heartbeat" && message.data === "pong") {
404+
this.handleHeartbeatResponse();
405+
return;
406+
}
407+
408+
if (message.method === "execution_event") {
355409
message.data = parseNodeExecutionResultTimestamps(message.data);
356410
}
357411
this.wsMessageHandlers[message.method]?.forEach((handler) =>
@@ -367,6 +421,7 @@ export default class BaseAutoGPTServerAPI {
367421
}
368422

369423
disconnectWebSocket() {
424+
this.stopHeartbeat(); // Stop heartbeat when disconnecting
370425
if (this.webSocket && this.webSocket.readyState === WebSocket.OPEN) {
371426
this.webSocket.close();
372427
}
@@ -423,6 +478,7 @@ type GraphCreateRequestBody =
423478
type WebsocketMessageTypeMap = {
424479
subscribe: { graph_id: string };
425480
execution_event: NodeExecutionResult;
481+
heartbeat: "ping" | "pong";
426482
};
427483

428484
type WebsocketMessage = {

0 commit comments

Comments
 (0)