Skip to content

Commit

Permalink
fix: start failures edge cases (#106)
Browse files Browse the repository at this point in the history
* chore: bump deno version

* chore: benchmark on sepolia

* fix: oracle demo error handling

* fix: memory usage test

* fix: improve http connect logic

* fix: improve http server robustness

* fix: proxy provider log

* fix: runner ports and startup time tracking

* chore: add changeset

* chore: bump version
  • Loading branch information
goums authored Jul 10, 2024
1 parent abed75f commit ff1ab2e
Show file tree
Hide file tree
Showing 13 changed files with 110 additions and 64 deletions.
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
# @gelatonetwork/web3-functions-sdk

## 2.4.4

### Patch Changes

- a0f32bf: fix: start failures edge cases

## 2.4.2

### Patch Changes
Expand Down
4 changes: 2 additions & 2 deletions package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@gelatonetwork/web3-functions-sdk",
"version": "2.4.3",
"version": "2.4.4",
"description": "Gelato Automate Web3 Functions sdk",
"repository": {
"type": "git",
Expand Down Expand Up @@ -124,7 +124,7 @@
"body-parser": "^1.20.1",
"colors": "^1.4.0",
"deep-object-diff": "^1.1.9",
"deno-bin": "^1.36.0",
"deno-bin": "^1.44.4",
"dockerode": "^3.3.4",
"dotenv": "^16.0.3",
"esbuild": "^0.17.4",
Expand Down
2 changes: 1 addition & 1 deletion src/lib/binaries/benchmark.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ const web3FunctionPath =
process.argv[3] ??
path.join(process.cwd(), "src", "web3-functions", "index.ts");
let operation: Web3FunctionOperation = "onRun";
let chainId = 5;
let chainId = 11155111;
let runtime: "docker" | "thread" = "thread";
let debug = false;
let showLogs = false;
Expand Down
52 changes: 34 additions & 18 deletions src/lib/net/Web3FunctionHttpClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,14 +28,18 @@ export class Web3FunctionHttpClient extends EventEmitter {
const end = performance.now() + timeout;
let statusOk = false;
let lastErrMsg = "";
let nbTries = 0;
let connectTimeout = 1000;
while (!statusOk && !this._isStopped && performance.now() < end) {
nbTries++;
try {
const status = await new Promise<number>(async (resolve, reject) => {
const requestAbortController = new AbortController();
const timeoutId = setTimeout(() => {
connectTimeout += 100; // gradually increase the timeout for each retry
requestAbortController.abort();
reject(new Error("Timeout"));
}, 100);
reject(new Error(`Timeout after ${nbTries} tries`));
}, connectTimeout);
try {
const { statusCode } = await request(
`${this._host}:${this._port}/${this._mountPath}`,
Expand All @@ -54,7 +58,7 @@ export class Web3FunctionHttpClient extends EventEmitter {
statusOk = status === 200;
this._log(`Connected to Web3FunctionHttpServer socket!`);
} catch (err) {
let errMsg = `${err.message} `;
const errMsg = `${err.message} `;

lastErrMsg = errMsg;
await delay(retryInterval);
Expand All @@ -81,22 +85,34 @@ export class Web3FunctionHttpClient extends EventEmitter {

private async _send(event: Web3FunctionEvent) {
let res;

try {
const { body } = await request(
`${this._host}:${this._port}/${this._mountPath}`,
{
method: "POST",
headers: { "content-type": "application/json" },
body: JSON.stringify(event),
dispatcher: new Agent({ pipelining: 0 }),
let retry = 0;
const maxRetry = 2;
do {
try {
const { body } = await request(
`${this._host}:${this._port}/${this._mountPath}`,
{
method: "POST",
headers: { "content-type": "application/json" },
body: JSON.stringify(event),
dispatcher: new Agent({ pipelining: 0 }),
}
);
res = body;
} catch (err) {
const errMsg = err.toString();
if (retry >= maxRetry) {
throw new Error(`Web3FunctionHttpClient request error: ${errMsg}`);
} else {
retry++;
this._log(
`Web3FunctionHttpClient _send retry#${retry} request error: ${errMsg}`
);
await delay(100);
}
);
res = body;
} catch (err) {
const errMsg = err.toString();
throw new Error(`Web3FunctionHttpClient request error: ${errMsg}`);
}
}
} while (!res);

try {
const event = (await res.json()) as Web3FunctionEvent;
this._log(`Received Web3FunctionEvent: ${event.action}`);
Expand Down
35 changes: 23 additions & 12 deletions src/lib/net/Web3FunctionHttpServer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,21 +23,32 @@ export class Web3FunctionHttpServer {
this._log(`Listening on http://${conns.addr.hostname}:${conns.addr.port}`);

for await (const conn of conns) {
// eslint-disable-next-line @typescript-eslint/no-empty-function
let connectionReleaseResolver = () => {
// Intentionally left empty to use as variable
};
this._waitConnectionReleased = new Promise((resolve) => {
connectionReleaseResolver = () => {
resolve();
try {
// eslint-disable-next-line @typescript-eslint/no-empty-function
let connectionReleaseResolver = () => {
// Intentionally left empty to use as variable
};
});
this._waitConnectionReleased = new Promise((resolve) => {
connectionReleaseResolver = () => {
resolve();
};
});

for await (const e of Deno.serveHttp(conn)) {
const res = await this._onRequest(e.request, mountPath);
await e.respondWith(res);
for await (const e of Deno.serveHttp(conn)) {
try {
const res = await this._onRequest(e.request, mountPath);
await e.respondWith(res);
} catch (err) {
this._log(`Request Error: ${err.message}`);
await e.respondWith(
new Response(`Internal error: ${err.message}`, { status: 500 })
);
}
}
connectionReleaseResolver();
} catch (err) {
this._log(`Connection Error: ${err.message}`);
}
connectionReleaseResolver();
}
}

Expand Down
3 changes: 1 addition & 2 deletions src/lib/provider/Web3FunctionProxyProvider.ts
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@ export class Web3FunctionProxyProvider {
}

public async start(port = 3000): Promise<void> {
this._proxyUrl = `${this._host}:${port}/${this._mountPath}`;
await new Promise<void>((resolve) => {
this._server = this._app.listen(port, () => {
this._log(`Listening on: ${this._proxyUrl}`);
Expand All @@ -136,8 +137,6 @@ export class Web3FunctionProxyProvider {
`/${this._mountPath}/:chainId`,
this._requestHandler.bind(this)
);

this._proxyUrl = `${this._host}:${port}/${this._mountPath}`;
}

public getNbRpcCalls(): { total: number; throttled: number } {
Expand Down
20 changes: 13 additions & 7 deletions src/lib/runtime/Web3FunctionRunner.ts
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ export class Web3FunctionRunner {
private _httpProxy?: Web3FunctionHttpProxy;
private _client?: Web3FunctionHttpClient;
private _sandbox?: Web3FunctionAbstractSandbox;
private _startupTime = 0;
private _execTimeoutId?: NodeJS.Timeout;
private _memoryIntervalId?: NodeJS.Timeout;
// eslint-disable-next-line @typescript-eslint/no-empty-function
Expand Down Expand Up @@ -208,6 +209,7 @@ export class Web3FunctionRunner {
const logs: string[] = this._sandbox?.getLogs() ?? [];
const duration = (performance.now() - start) / 1000;
const memory = this._memory / 1024 / 1024;
const startup = Number(this._startupTime.toFixed());
const rpcCalls = this._proxyProvider?.getNbRpcCalls() ?? {
total: 0,
throttled: 0,
Expand All @@ -219,6 +221,7 @@ export class Web3FunctionRunner {
upload: 0,
};

this._log(`Startup time=${startup}ms ${startup > 1000 ? "(SLOW)" : ""}`);
this._log(`Runtime duration=${duration.toFixed(2)}s`);
this._log(`Runtime memory=${memory.toFixed(2)}mb`);
this._log(`Runtime rpc calls=${JSON.stringify(rpcCalls)}`);
Expand Down Expand Up @@ -375,16 +378,17 @@ export class Web3FunctionRunner {
options.requestLimit,
this._debug
);
const httpProxyPort = await Web3FunctionNetHelper.getAvailablePort(
this._portsOccupied
);
const httpProxyPort =
options.httpProxyPort ??
(await Web3FunctionNetHelper.getAvailablePort(this._portsOccupied));
this._httpProxy.start(httpProxyPort);

const mountPath = randomUUID();
const serverPort =
options.serverPort ??
(await Web3FunctionNetHelper.getAvailablePort(this._portsOccupied));

const start = performance.now();
try {
this._log(`Starting sandbox: ${script}`);
await this._sandbox.start(
Expand All @@ -411,10 +415,11 @@ export class Web3FunctionRunner {
multiChainProviderConfig,
this._debug
);
const proxyProviderPort = await Web3FunctionNetHelper.getAvailablePort(
this._portsOccupied
);
await this._proxyProvider.start(proxyProviderPort);

const rpcProxyPort =
options.rpcProxyPort ??
(await Web3FunctionNetHelper.getAvailablePort(this._portsOccupied));
await this._proxyProvider.start(rpcProxyPort);
context.rpcProviderUrl = this._proxyProvider.getProxyUrl();

// Override gelatoArgs according to schema version
Expand All @@ -436,6 +441,7 @@ export class Web3FunctionRunner {
this._client.connect(START_TIMEOUT),
this._sandbox?.waitForProcessEnd(), // Early exit if sandbox is crashed
]);
this._startupTime = performance.now() - start;
} catch (err) {
this._log(`Fail to connect to Web3Function ${err.message}`);
throw new Error(
Expand Down
16 changes: 11 additions & 5 deletions src/lib/runtime/Web3FunctionRunnerPool.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ export class Web3FunctionRunnerPool {

public async init() {
this._tcpPortsAvailable = await Web3FunctionNetHelper.getAvailablePorts(
this._poolSize + 10
(this._poolSize + 5) * 3 // 3 ports per concurrent runner + 5 extra
);
}

Expand All @@ -35,22 +35,28 @@ export class Web3FunctionRunnerPool {
return new Promise((resolve, reject) => {
this._queuedRunners.push(async (): Promise<void> => {
this._activeRunners = this._activeRunners + 1;
const port = this._tcpPortsAvailable.shift();
const port1 = this._tcpPortsAvailable.shift();
const port2 = this._tcpPortsAvailable.shift();
const port3 = this._tcpPortsAvailable.shift();
try {
this._log(
`Starting Web3FunctionRunner, active=${this._activeRunners} port=${port}`
`Starting Web3FunctionRunner, active=${this._activeRunners} ports=${port1},${port2},${port3}`
);
const runner = new Web3FunctionRunner(
this._debug,
this._tcpPortsAvailable
);
payload.options.serverPort = port;
payload.options.serverPort = port1;
payload.options.httpProxyPort = port2;
payload.options.rpcProxyPort = port3;
const exec = await runner.run(operation, payload);
resolve(exec);
} catch (err) {
reject(err);
} finally {
if (port) this._tcpPortsAvailable.push(port);
if (port1) this._tcpPortsAvailable.push(port1);
if (port2) this._tcpPortsAvailable.push(port2);
if (port3) this._tcpPortsAvailable.push(port3);
this._activeRunners = this._activeRunners - 1;
}
});
Expand Down
2 changes: 1 addition & 1 deletion src/lib/runtime/sandbox/__test__/exceed_memory_usage.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
const arr: string[] = [];
while (arr.length < 1_000_000) {
while (arr.length < 1_000_000_000) {
arr.push(`Do we have access to infinite memory?`);
}
2 changes: 2 additions & 0 deletions src/lib/runtime/types/Web3FunctionRunnerPayload.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ export interface Web3FunctionRunnerOptions {
storageLimit: number;
runtime: "thread" | "docker";
showLogs: boolean;
httpProxyPort?: number;
rpcProxyPort?: number;
serverPort?: number;
blacklistedHosts?: string[];
}
Expand Down
19 changes: 10 additions & 9 deletions src/web3-functions/oracle/index.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
import { Contract } from "@ethersproject/contracts";
import {
Web3Function,
Web3FunctionContext,
} from "@gelatonetwork/web3-functions-sdk";
import { Contract } from "@ethersproject/contracts";
import ky from "ky"; // we recommend using ky as axios doesn't support fetch by default

const ORACLE_ABI = [
Expand All @@ -25,18 +25,19 @@ Web3Function.onRun(async (context: Web3FunctionContext) => {
oracle = new Contract(oracleAddress, ORACLE_ABI, provider);
lastUpdated = parseInt(await oracle.lastUpdated());
console.log(`Last oracle update: ${lastUpdated}`);

// Check if it's ready for a new update
const nextUpdateTime = lastUpdated + 300; // 5 min
const timestamp = (await provider.getBlock("latest")).timestamp;
console.log(`Next oracle update: ${nextUpdateTime}`);
if (timestamp < nextUpdateTime) {
return { canExec: false, message: `Time not elapsed` };
}
} catch (err) {
console.log(`Error: ${err.message}`);
return { canExec: false, message: `Rpc call failed` };
}

// Check if it's ready for a new update
const nextUpdateTime = lastUpdated + 300; // 5 min
const timestamp = (await provider.getBlock("latest")).timestamp;
console.log(`Next oracle update: ${nextUpdateTime}`);
if (timestamp < nextUpdateTime) {
return { canExec: false, message: `Time not elapsed` };
}

// Get current price on coingecko
const currency = userArgs.currency as string;
let price = 0;
Expand Down
2 changes: 1 addition & 1 deletion src/web3-functions/oracle/userArgs.json
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
{
"oracle": "0x6a3c82330164822A8a39C7C0224D20DB35DD030a",
"oracle": "0x71B9B0F6C999CBbB0FeF9c92B80D54e4973214da",
"currency": "ethereum"
}
11 changes: 5 additions & 6 deletions yarn.lock
Original file line number Diff line number Diff line change
Expand Up @@ -2956,13 +2956,12 @@ delayed-stream@~1.0.0:
resolved "https://registry.yarnpkg.com/delayed-stream/-/delayed-stream-1.0.0.tgz#df3ae199acadfb7d440aaae0b29e2272b24ec619"
integrity sha512-ZySD7Nf91aLB0RxL4KGrKHBXl7Eds1DAmEdcoVawXnLD7SDhpNgtuII2aAkg7a7QS41jxPSZ17p4VdGnMHk3MQ==

deno-bin@^1.36.0:
version "1.36.0"
resolved "https://registry.yarnpkg.com/deno-bin/-/deno-bin-1.36.0.tgz#35f61891438ef5d012dde04fc342032ecc9c56f7"
integrity sha512-gJlrpILuliLlBTxM1Fo/304KI2lU/Nr/g0KZw6T7xhfG01BKTwUA+1amTg/m+8/4y2QnKwQC5tdHSuA9UllJJw==
deno-bin@^1.44.4:
version "1.44.4"
resolved "https://registry.yarnpkg.com/deno-bin/-/deno-bin-1.44.4.tgz#065cdb214790def562d63898e45df26519a23da4"
integrity sha512-IJWKr7o4Qh7iB9yhNfvJURPsfifPqTcrmg45J95oS2vvDv3OdRCf8SNramL3cqcsYyLeQeTRzNj2QgmjH14oyw==
dependencies:
adm-zip "^0.5.4"
follow-redirects "^1.10.0"

[email protected]:
version "2.0.0"
Expand Down Expand Up @@ -3666,7 +3665,7 @@ flatted@^3.1.0:
resolved "https://registry.yarnpkg.com/flatted/-/flatted-3.2.7.tgz#609f39207cb614b89d0765b477cb2d437fbf9787"
integrity sha512-5nqDSxl8nn5BSNxyR3n4I6eDmbolI6WT+QqR547RwxQapgjQBmtktdP+HTBb/a/zLsbzERTONyUB5pefh5TtjQ==

follow-redirects@^1.10.0, follow-redirects@^1.12.1:
follow-redirects@^1.12.1:
version "1.15.2"
resolved "https://registry.yarnpkg.com/follow-redirects/-/follow-redirects-1.15.2.tgz#b460864144ba63f2681096f274c4e57026da2c13"
integrity sha512-VQLG33o04KaQ8uYi2tVNbdrWp1QWxNNea+nmIB4EVM28v0hmP17z7aG1+wAkNzVq4KeXTq3221ye5qTJP91JwA==
Expand Down

0 comments on commit ff1ab2e

Please sign in to comment.