Skip to content

Commit 8c5520e

Browse files
authored
Handle HyperSync logs query errors (#502)
* Handle HyperSync logs query errors * Remove only
1 parent f3409fb commit 8c5520e

File tree

12 files changed

+365
-404
lines changed

12 files changed

+365
-404
lines changed

codegenerator/cli/templates/static/codegen/src/ErrorHandling.res

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
type t = {logger: Pino.t, exn: exn, msg: option<string>}
22

33
let prettifyExn = exn => {
4-
switch exn {
4+
switch exn->Js.Exn.anyToExnInternal {
55
| Js.Exn.Error(e) => e->(Utils.magic: Js.Exn.t => exn)
66
| exn => exn
77
}

codegenerator/cli/templates/static/codegen/src/eventFetching/Source.res

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ type blockRangeFetchResponse = {
2222

2323
type getItemsRetry =
2424
| WithSuggestedToBlock({toBlock: int})
25-
| WithBackoff({backoffMillis: int})
25+
| WithBackoff({message: string, backoffMillis: int})
2626

2727
type getItemsError =
2828
| UnsupportedSelection({message: string})
@@ -52,6 +52,7 @@ type t = {
5252
~currentBlockHeight: int,
5353
~partitionId: string,
5454
~selection: FetchState.selection,
55+
~retry: int,
5556
~logger: Pino.t,
5657
) => promise<blockRangeFetchResponse>,
5758
}

codegenerator/cli/templates/static/codegen/src/eventFetching/SourceManager.res

Lines changed: 59 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,11 @@ type t = {
1919

2020
let getActiveSource = sourceManager => sourceManager.activeSource
2121

22-
let makeGetHeightRetryInterval = (~initialRetryInterval, ~backoffMultiplicative, ~maxRetryInterval) => {
22+
let makeGetHeightRetryInterval = (
23+
~initialRetryInterval,
24+
~backoffMultiplicative,
25+
~maxRetryInterval,
26+
) => {
2327
(~retry: int) => {
2428
let backoff = if retry === 0 {
2529
1
@@ -35,7 +39,11 @@ let make = (
3539
~maxPartitionConcurrency,
3640
~newBlockFallbackStallTimeout=20_000,
3741
~stalledPollingInterval=5_000,
38-
~getHeightRetryInterval=makeGetHeightRetryInterval(~initialRetryInterval=1000, ~backoffMultiplicative=2, ~maxRetryInterval=60_000),
42+
~getHeightRetryInterval=makeGetHeightRetryInterval(
43+
~initialRetryInterval=1000,
44+
~backoffMultiplicative=2,
45+
~maxRetryInterval=60_000,
46+
),
3947
) => {
4048
let initialActiveSource = switch sources->Js.Array2.find(source => source.sourceFor === Sync) {
4149
| None => Js.Exn.raiseError("Invalid configuration, no data-source for historical sync provided")
@@ -244,6 +252,10 @@ let getNextSyncSource = (
244252
sourceManager,
245253
// This is needed to include the Fallback source to rotation
246254
~initialSource,
255+
// After multiple failures start returning fallback sources as well
256+
// But don't try it when main sync sources fail because of invalid configuration
257+
// note: The logic might be changed in the future
258+
~attemptFallbacks=false,
247259
) => {
248260
let before = []
249261
let after = []
@@ -256,7 +268,7 @@ let getNextSyncSource = (
256268
} else if (
257269
switch source.sourceFor {
258270
| Sync => true
259-
| Fallback => source === initialSource
271+
| Fallback => attemptFallbacks || source === initialSource
260272
}
261273
) {
262274
(hasActive.contents ? after : before)->Array.push(source)
@@ -291,11 +303,13 @@ let executeQuery = async (sourceManager: t, ~query: FetchState.query, ~currentBl
291303
},
292304
)
293305
let responseRef = ref(None)
306+
let retryRef = ref(0)
294307
let initialSource = sourceManager.activeSource
295308

296309
while responseRef.contents->Option.isNone {
297310
let source = sourceManager.activeSource
298311
let toBlock = toBlockRef.contents
312+
let retry = retryRef.contents
299313

300314
let logger = Logging.createChild(
301315
~params={
@@ -306,6 +320,7 @@ let executeQuery = async (sourceManager: t, ~query: FetchState.query, ~currentBl
306320
"fromBlock": query.fromBlock,
307321
"toBlock": toBlock,
308322
"addresses": addresses,
323+
"retry": retry,
309324
},
310325
)
311326

@@ -317,6 +332,7 @@ let executeQuery = async (sourceManager: t, ~query: FetchState.query, ~currentBl
317332
~partitionId=query.partitionId,
318333
~currentBlockHeight,
319334
~selection=query.selection,
335+
~retry,
320336
~logger,
321337
)
322338
logger->Logging.childTrace({
@@ -339,7 +355,10 @@ let executeQuery = async (sourceManager: t, ~query: FetchState.query, ~currentBl
339355
let notAlreadyDeleted = sourceManager.sources->Utils.Set.delete(source)
340356

341357
if nextSource === source {
342-
exn->ErrorHandling.mkLogAndRaise(~logger, ~msg="The indexer doesn't have data-sources which can continue fetching. Please, check the error logs or reach out to the Envio team.")
358+
exn->ErrorHandling.mkLogAndRaise(
359+
~logger,
360+
~msg="The indexer doesn't have data-sources which can continue fetching. Please, check the error logs or reach out to the Envio team.",
361+
)
343362
} else {
344363
// In case there are multiple partitions
345364
// failing at the same time. Log only once
@@ -361,33 +380,60 @@ let executeQuery = async (sourceManager: t, ~query: FetchState.query, ~currentBl
361380
"source": nextSource.name,
362381
})
363382
sourceManager.activeSource = nextSource
383+
retryRef := 0
364384
}
365385
}
366386
| FailedGettingItems({attemptedToBlock, retry: WithSuggestedToBlock({toBlock})}) =>
367387
logger->Logging.childTrace({
368-
"msg": "Failed getting data for the block range. Retrying with the suggested block range from response.",
388+
"msg": "Failed getting data for the block range. Immediately retrying with the suggested block range from response.",
369389
"toBlock": attemptedToBlock,
370390
"suggestedToBlock": toBlock,
371391
})
372392
toBlockRef := Some(toBlock)
373-
| FailedGettingItems({exn, attemptedToBlock, retry: WithBackoff({backoffMillis})}) =>
374-
let nextSource = sourceManager->getNextSyncSource(~initialSource)
375-
let hasAnotherSyncSource = nextSource !== source
376-
logger->Logging.childTrace({
377-
"msg": `Failed getting data for the block range. Will try smaller block range for the next attempt.`,
393+
retryRef := 0
394+
| FailedGettingItems({exn, attemptedToBlock, retry: WithBackoff({message, backoffMillis})}) =>
395+
// Starting from the 11th failure (retry=10)
396+
// include fallback sources for switch
397+
// (previously it would consider only sync sources or the initial one)
398+
// This is a little bit tricky to find the right number,
399+
// because meaning between RPC and HyperSync is different for the error
400+
// but since Fallback was initially designed to be used only for height check
401+
// just keep the value high
402+
let attemptFallbacks = retry >= 10
403+
404+
let nextSource = switch retry {
405+
// Don't attempt a switch on first two failure
406+
| 0 | 1 => source
407+
| _ =>
408+
// Then try to switch every second failure
409+
if retry->mod(2) === 0 {
410+
sourceManager->getNextSyncSource(~initialSource, ~attemptFallbacks)
411+
} else {
412+
source
413+
}
414+
}
415+
416+
// Start displaying warnings after 4 failures
417+
let log = retry >= 4 ? Logging.childWarn : Logging.childTrace
418+
logger->log({
419+
"msg": message,
378420
"toBlock": attemptedToBlock,
379421
"backOffMilliseconds": backoffMillis,
380-
"err": exn,
422+
"retry": retry,
423+
"err": exn->ErrorHandling.prettifyExn,
381424
})
382-
if hasAnotherSyncSource {
425+
426+
let shouldSwitch = nextSource !== source
427+
if shouldSwitch {
383428
logger->Logging.childInfo({
384429
"msg": "Switching to another data-source",
385430
"source": nextSource.name,
386431
})
387432
sourceManager.activeSource = nextSource
388433
} else {
389-
await Utils.delay(backoffMillis)
434+
await Utils.delay(Pervasives.min(backoffMillis, 60_000))
390435
}
436+
retryRef := retryRef.contents + 1
391437
}
392438
// TODO: Handle more error cases and hang/retry instead of throwing
393439
| exn => exn->ErrorHandling.mkLogAndRaise(~logger, ~msg="Failed to fetch block Range")

codegenerator/cli/templates/static/codegen/src/eventFetching/hyperfuel/HyperFuel.res

Lines changed: 24 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -59,11 +59,15 @@ let queryErrorToMsq = (e: queryError): string => {
5959
}
6060
}
6161

62-
exception UnexpectedMissingParamsExn(missingParams)
63-
6462
type queryResponse<'a> = result<'a, queryError>
6563

66-
module LogsQuery = {
64+
module GetLogs = {
65+
type error =
66+
| UnexpectedMissingParams({missingParams: array<string>})
67+
| WrongInstance
68+
69+
exception Error(error)
70+
6771
let makeRequestBody = (
6872
~fromBlock,
6973
~toBlockInclusive,
@@ -103,10 +107,11 @@ module LogsQuery = {
103107
| Some(v) => v
104108
| None =>
105109
raise(
106-
UnexpectedMissingParamsExn({
107-
queryName: "queryLogsPage HyperFuel",
108-
missingParams: [name],
109-
}),
110+
Error(
111+
UnexpectedMissingParams({
112+
missingParams: [name],
113+
}),
114+
),
110115
)
111116
}
112117
}
@@ -153,24 +158,17 @@ module LogsQuery = {
153158
items
154159
}
155160

156-
let convertResponse = (res: HyperFuelClient.queryResponseTyped): queryResponse<logsQueryPage> => {
157-
try {
158-
let {nextBlock, ?archiveHeight} = res
159-
let page: logsQueryPage = {
160-
items: res.data->decodeLogQueryPageItems,
161-
nextBlock,
162-
archiveHeight: archiveHeight->Option.getWithDefault(0), // TODO: FIXME: Shouldn't have a default here
163-
}
164-
165-
Ok(page)
166-
} catch {
167-
| UnexpectedMissingParamsExn(err) => Error(UnexpectedMissingParams(err))
161+
let convertResponse = (res: HyperFuelClient.queryResponseTyped): logsQueryPage => {
162+
let {nextBlock, ?archiveHeight} = res
163+
let page: logsQueryPage = {
164+
items: res.data->decodeLogQueryPageItems,
165+
nextBlock,
166+
archiveHeight: archiveHeight->Option.getWithDefault(0), // TODO: FIXME: Shouldn't have a default here
168167
}
168+
page
169169
}
170170

171-
let queryLogsPage = async (~serverUrl, ~fromBlock, ~toBlock, ~recieptsSelection): queryResponse<
172-
logsQueryPage,
173-
> => {
171+
let query = async (~serverUrl, ~fromBlock, ~toBlock, ~recieptsSelection): logsQueryPage => {
174172
let query: HyperFuelClient.QueryTypes.query = makeRequestBody(
175173
~fromBlock,
176174
~toBlockInclusive=toBlock,
@@ -179,23 +177,11 @@ module LogsQuery = {
179177

180178
let hyperFuelClient = CachedClients.getClient(serverUrl)
181179

182-
let logger = Logging.createChild(
183-
~params={"type": "hypersync query", "fromBlock": fromBlock, "serverUrl": serverUrl},
184-
)
185-
186-
let executeQuery = async () => {
187-
let res = await hyperFuelClient->HyperFuelClient.getSelectedData(query)
188-
if res.nextBlock <= fromBlock {
189-
// Might happen when /height response was from another instance of HyperSync
190-
Js.Exn.raiseError(
191-
"Received page response from another instance of HyperFuel. Should work after a retry.",
192-
)
193-
}
194-
res
180+
let res = await hyperFuelClient->HyperFuelClient.getSelectedData(query)
181+
if res.nextBlock <= fromBlock {
182+
// Might happen when /height response was from another instance of HyperSync
183+
raise(Error(WrongInstance))
195184
}
196-
197-
let res = await executeQuery->Time.retryAsyncWithExponentialBackOff(~logger)
198-
199185
res->convertResponse
200186
}
201187
}
@@ -264,7 +250,6 @@ module BlockData = {
264250
}
265251
}
266252

267-
let queryLogsPage = LogsQuery.queryLogsPage
268253
let queryBlockData = BlockData.queryBlockData
269254

270255
let heightRoute = Rest.route(() => {

codegenerator/cli/templates/static/codegen/src/eventFetching/hyperfuel/HyperFuel.resi

Lines changed: 17 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -29,25 +29,31 @@ type missingParams = {
2929
queryName: string,
3030
missingParams: array<string>,
3131
}
32-
type queryError =
33-
UnexpectedMissingParams(missingParams)
34-
35-
exception UnexpectedMissingParamsExn(missingParams)
32+
type queryError = UnexpectedMissingParams(missingParams)
3633

3734
let queryErrorToMsq: queryError => string
3835

3936
type queryResponse<'a> = result<'a, queryError>
40-
let queryLogsPage: (
41-
~serverUrl: string,
42-
~fromBlock: int,
43-
~toBlock: option<int>,
44-
~recieptsSelection: array<HyperFuelClient.QueryTypes.receiptSelection>,
45-
) => promise<queryResponse<logsQueryPage>>
37+
38+
module GetLogs: {
39+
type error =
40+
| UnexpectedMissingParams({missingParams: array<string>})
41+
| WrongInstance
42+
43+
exception Error(error)
44+
45+
let query: (
46+
~serverUrl: string,
47+
~fromBlock: int,
48+
~toBlock: option<int>,
49+
~recieptsSelection: array<HyperFuelClient.QueryTypes.receiptSelection>,
50+
) => promise<logsQueryPage>
51+
}
4652

4753
let queryBlockData: (
4854
~serverUrl: string,
4955
~blockNumber: int,
5056
~logger: Pino.t,
5157
) => promise<option<ReorgDetection.blockDataWithTimestamp>>
5258

53-
let heightRoute: Rest.route<(), int>
59+
let heightRoute: Rest.route<unit, int>

0 commit comments

Comments
 (0)