Skip to content
This repository was archived by the owner on Feb 26, 2024. It is now read-only.

Commit 926e213

Browse files
added retry mechanism considering in flight requests
1 parent 12f9759 commit 926e213

File tree

2 files changed

+171
-35
lines changed

2 files changed

+171
-35
lines changed

src/chains/ethereum/ethereum/src/forking/handlers/ws-handler.ts

Lines changed: 92 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,11 @@ import { JsonRpcResponse, JsonRpcError } from "@ganache/utils";
99

1010
const { JSONRPC_PREFIX } = BaseHandler;
1111

12+
export type RetryConfiguration = {
13+
retryIntervalBaseInSeconds: number;
14+
retryCounter: number;
15+
};
16+
1217
export class WsHandler extends BaseHandler implements Handler {
1318
private open: Promise<unknown>;
1419
private connection: WebSocket;
@@ -20,45 +25,80 @@ export class WsHandler extends BaseHandler implements Handler {
2025
}>
2126
>();
2227

28+
// queue requests when connection is closed.
29+
private delayedRequestsQueue = [];
30+
// flag to identify if adhoc reconnection attempt.
31+
private adhocReconnectionRequest = false;
32+
2333
// retry configuration
24-
private retryIntervalBase: number = 2;
2534
private retryCounter: number = 3;
26-
private initialRetryCounter = this.retryCounter;
35+
private retryIntervalBaseInSeconds: number = 2;
36+
private initialRetryCounter: number;
2737
private retryTimeoutId: NodeJS.Timeout;
2838

29-
constructor(options: EthereumInternalOptions, abortSignal: AbortSignal) {
39+
// socket configuration
40+
private url: string;
41+
private origin: string;
42+
private logging: EthereumInternalOptions["logging"];
43+
44+
constructor(
45+
options: EthereumInternalOptions,
46+
abortSignal: AbortSignal,
47+
retryConfiguration?: RetryConfiguration | undefined
48+
) {
3049
super(options, abortSignal);
3150

3251
const {
3352
fork: { url, origin },
3453
logging
3554
} = options;
55+
this.url = url.toString();
56+
this.origin = origin;
57+
this.logging = logging;
58+
59+
// set retry configuration values
60+
if (retryConfiguration) {
61+
this.retryCounter = retryConfiguration.retryCounter;
62+
this.initialRetryCounter = retryConfiguration.retryIntervalBaseInSeconds;
63+
}
64+
this.initialRetryCounter = this.retryCounter;
3665

37-
this.open = this.connect(url.toString(), origin, logging);
38-
this.connection.onclose = () => {
66+
const onCloseEvent = () => {
3967
// try to connect again...
4068
// backoff and eventually fail
41-
if( this.retryCounter > 0 ) {
42-
clearTimeout( this.retryTimeoutId );
43-
this.retryTimeoutId = setTimeout( () => {
44-
this.reconnect(url.toString(), origin, logging);
45-
}, Math.pow( this.retryIntervalBase, this.initialRetryCounter - this.retryCounter ) * 1000 );
46-
this.retryCounter--;
47-
}
69+
// do not schedule reconnection for adhoc reconnection requests
70+
if (this.retryCounter === 0) {
71+
throw new Error("Connection to Infura has failed. Try again");
72+
} else {
73+
if (!this.adhocReconnectionRequest) {
74+
this.retryCounter--;
75+
clearTimeout(this.retryTimeoutId);
76+
this.retryTimeoutId = setTimeout(async () => {
77+
this.reconnect(this.url, this.origin, false);
78+
}, Math.pow(this.retryIntervalBaseInSeconds, this.initialRetryCounter - this.retryCounter) * 1000);
79+
}
80+
}
4881
};
82+
this.open = this.connect(this.url, this.origin, onCloseEvent);
4983
this.abortSignal.addEventListener("abort", () => {
5084
this.connection.onclose = null;
5185
this.connection.close(1000);
5286
});
53-
this.connection.onmessage = this.onMessage.bind(this);
5487
}
5588

5689
public async request<T>(
5790
method: string,
5891
params: unknown[],
5992
options = { disableCache: false }
6093
) {
61-
await this.open;
94+
try {
95+
await this.open;
96+
} catch (er) {
97+
this.logging.logger.log("Connection to Infura has failed");
98+
// skip the reconnection if connection is being made
99+
if (this.connection.readyState !== this.connection.CONNECTING)
100+
this.reconnect(this.url, this.origin, true);
101+
}
62102
if (this.abortSignal.aborted) return Promise.reject(new AbortError());
63103

64104
const key = JSON.stringify({ method, params });
@@ -76,7 +116,13 @@ export class WsHandler extends BaseHandler implements Handler {
76116
// Issue: https://github.com/trufflesuite/ganache/issues/3478
77117
this.inFlightRequests.set(messageId, deferred);
78118

79-
this.connection.send(`${JSONRPC_PREFIX}${messageId},${key.slice(1)}`);
119+
// if connection is alive send request else delay the request
120+
const data = `${JSONRPC_PREFIX}${messageId},${key.slice(1)}`;
121+
if (this.connection && this.connection.readyState === 1) {
122+
this.connection.send(data);
123+
} else {
124+
this.delayRequest(data);
125+
}
80126
return deferred.promise.finally(() => this.requestCache.delete(key));
81127
};
82128
return await this.queueRequest<T>(method, params, key, send, options);
@@ -100,11 +146,7 @@ export class WsHandler extends BaseHandler implements Handler {
100146
}
101147
}
102148

103-
private connect(
104-
url: string,
105-
origin: string,
106-
logging: EthereumInternalOptions["logging"]
107-
) {
149+
private connect(url: string, origin: string, onCloseEvent: any) {
108150
this.connection = new WebSocket(url, {
109151
origin,
110152
headers: this.headers
@@ -119,30 +161,45 @@ export class WsHandler extends BaseHandler implements Handler {
119161
// If you need to change this, you probably need to change our `onMessage`
120162
// handler too.
121163
this.connection.binaryType = "nodebuffer";
164+
this.connection.onclose = onCloseEvent;
165+
this.connection.onmessage = this.onMessage.bind(this);
122166
let open = new Promise((resolve, reject) => {
123167
this.connection.onopen = resolve;
124168
this.connection.onerror = reject;
125169
});
126-
open.then(
127-
() => {
128-
this.connection.onopen = null;
129-
this.connection.onerror = null;
130-
// reset the retry counter
131-
this.retryCounter = this.initialRetryCounter;
132-
},
133-
err => {
134-
logging.logger.log(err);
135-
}
136-
);
170+
open.then(() => {
171+
this.connection.onopen = null;
172+
this.connection.onerror = null;
173+
// reset the retry counter and any timeouts scheduled for retries
174+
this.retryCounter = this.initialRetryCounter;
175+
clearTimeout(this.retryTimeoutId);
176+
177+
this.adhocReconnectionRequest = false;
178+
// process delayed requests which were queued at the time of connection failure
179+
this.sendDelayedRequests();
180+
});
137181
return open;
138182
}
139183

140-
private reconnect (url: string,
184+
private reconnect(
185+
url: string,
141186
origin: string,
142-
logging: EthereumInternalOptions["logging"]) {
187+
adhocReconnectionRequest: boolean = false
188+
) {
189+
this.adhocReconnectionRequest = adhocReconnectionRequest;
143190
const onCloseEvent = this.connection.onclose;
144-
this.open = this.connect(url, origin, logging);
145-
this.connection.onclose = onCloseEvent;
191+
this.open = this.connect(url, origin, onCloseEvent);
192+
}
193+
194+
private delayRequest(request: any) {
195+
this.delayedRequestsQueue.push(request);
196+
}
197+
198+
private sendDelayedRequests() {
199+
while (this.delayedRequestsQueue.length > 0) {
200+
const request = this.delayedRequestsQueue.pop();
201+
this.connection.send(request);
202+
}
146203
}
147204

148205
public async close() {
Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,79 @@
1+
import assert from "assert";
2+
import AbortController from "abort-controller";
3+
import { WsHandler } from "../../../src/forking/handlers/ws-handler";
4+
import {
5+
EthereumOptionsConfig,
6+
EthereumProviderOptions
7+
} from "@ganache/ethereum-options";
8+
import WebSocket from "ws";
9+
10+
const createWebSocketServer = (port: number): WebSocket.Server => {
11+
let wsServer = new WebSocket.Server({ port });
12+
wsServer.on("connection", async ws => {
13+
ws.on("message", data => {
14+
const message = JSON.parse(data.toString());
15+
ws.send(
16+
Buffer.from(
17+
JSON.stringify({
18+
id: message.id,
19+
jsonrpc: "2.0",
20+
result: "0x0"
21+
}),
22+
"utf-8"
23+
)
24+
);
25+
if (message.method === "client-disconnect") {
26+
setTimeout(() => {
27+
ws.terminate();
28+
}, 10);
29+
}
30+
});
31+
});
32+
return wsServer;
33+
};
34+
35+
// create test server
36+
const URL = "ws://localhost:1001/";
37+
let wsServer: WebSocket.Server;
38+
let wsHandler: WsHandler;
39+
wsServer = createWebSocketServer(1001);
40+
41+
describe("ws-handler", function () {
42+
describe("retries", function () {
43+
before(() => {
44+
const providerOptions = EthereumOptionsConfig.normalize({
45+
fork: {
46+
url: URL,
47+
origin: "test"
48+
}
49+
} as EthereumProviderOptions);
50+
const abortController: AbortController = new AbortController();
51+
wsHandler = new WsHandler(providerOptions, abortController.signal, {
52+
retryCounter: 4,
53+
retryIntervalBaseInSeconds: 3
54+
});
55+
});
56+
57+
after(() => {
58+
wsHandler.close();
59+
wsServer.close();
60+
});
61+
62+
it("should attempt to reconnect the server when connection is terminated", async () => {
63+
// send a request to websocket server to get connection termination.
64+
await wsHandler.request<any>("client-disconnect", [], {
65+
disableCache: true
66+
});
67+
await new Promise(resolve => setTimeout(resolve, 100));
68+
69+
// send request after connection is terminated
70+
const retryPromise = wsHandler.request<any>("retry", [], {
71+
disableCache: true
72+
});
73+
74+
// assert the result
75+
const response = await retryPromise;
76+
assert.equal(response, "0x0");
77+
}).timeout(10000);
78+
});
79+
});

0 commit comments

Comments
 (0)