Commit 22bd9dd

Merge pull request #92 from autonomys/fix-blocking-batches
fix: blocking batches
2 parents: 29efa07 + 16172e3

File tree

3 files changed: +85 additions, -34 deletions


services/file-retriever/src/services/batchOptimizer.ts

Lines changed: 7 additions & 2 deletions
```diff
@@ -1,6 +1,11 @@
 import { config } from '../config.js'
 import { ObjectMapping } from './objectMappingIndexer.js'
 
+const maxObjectsPerBatch = Math.min(
+  config.maxObjectsPerFetch,
+  config.maxSimultaneousFetches,
+)
+
 export const optimizeBatchFetch = (
   objects: ObjectMapping[],
 ): ObjectMapping[][] => {
@@ -21,7 +26,7 @@ export const optimizeBatchFetch = (
       pieceIndex === safePieceIndex ||
       pieceIndex === safePieceIndex + 1 ||
       lastPieceIndex === null
-    const isFull = currentBatch.length === config.maxObjectsPerFetch
+    const isFull = currentBatch.length === maxObjectsPerBatch
 
     // if pieces are not consecutive, they're not sharing the same piece
     if (isSameOrConsecutive && !isFull) {
@@ -43,7 +48,7 @@ export const optimizeBatchFetch = (
   return optimizedObjects.reduce(
     (acc, curr) => {
       const lastBatch = acc[acc.length - 1]
-      if (lastBatch.length + curr.length <= config.maxObjectsPerFetch) {
+      if (lastBatch.length + curr.length <= maxObjectsPerBatch) {
        acc[acc.length - 1] = lastBatch.concat(curr)
       } else {
        acc.push(curr)
```
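
This is the heart of the fix: `fetchObjects` runs through a weighted concurrency controller with `objects.length` as its weight (see `dsnFetcher.ts` below), so a batch built up to `config.maxObjectsPerFetch` could presumably weigh more than the controller ever admits at once and block indefinitely. Capping each batch at `Math.min(config.maxObjectsPerFetch, config.maxSimultaneousFetches)` keeps every batch schedulable. Below is a self-contained sketch of the capped merge step; the config values are assumptions for illustration, not the service's real settings.

```typescript
// Standalone sketch of the capped merge step; the config values here are
// made up for illustration, not the service's real settings.
const config = { maxObjectsPerFetch: 100, maxSimultaneousFetches: 10 }

const maxObjectsPerBatch = Math.min(
  config.maxObjectsPerFetch,
  config.maxSimultaneousFetches,
)

// Greedily merge consecutive batches while the combined size stays within
// the cap, mirroring the reduce step in optimizeBatchFetch.
const mergeBatches = <T>(batches: T[][]): T[][] =>
  batches.reduce<T[][]>(
    (acc, curr) => {
      const lastBatch = acc[acc.length - 1]
      if (lastBatch.length + curr.length <= maxObjectsPerBatch) {
        acc[acc.length - 1] = lastBatch.concat(curr)
      } else {
        acc.push(curr)
      }
      return acc
    },
    [[]],
  )

console.log(mergeBatches([[1, 2], [3, 4, 5], [6, 7, 8, 9, 10], [11]]))
// → [ [ 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 ], [ 11 ] ] — no batch exceeds 10
```

With the old cap of `config.maxObjectsPerFetch` alone, all eleven objects would merge into a single batch of 11, which a weight budget of 10 could never admit.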

services/file-retriever/src/services/dsnFetcher.ts

Lines changed: 52 additions & 32 deletions
```diff
@@ -25,6 +25,7 @@ import { fromEntries, promiseAll } from '../utils/array.js'
 import { weightedRequestConcurrencyController } from '@autonomys/asynchronous'
 import { optimizeBatchFetch } from './batchOptimizer.js'
 import { readableToStream } from '../utils/stream.js'
+import { withRetries } from '../utils/retries.js'
 
 const fetchNodesSchema = z.object({
   jsonrpc: z.string(),
@@ -75,41 +76,60 @@ const fetchObjects = async (objects: ObjectMapping[]) => {
     id: requestId,
   }
 
-  return concurrencyController(async () => {
-    logger.debug(
-      `Fetching nodes (requestId=${requestId}): ${objects.map((e) => e[0]).join(', ')}`,
-    )
-    const fetchStart = performance.now()
-    const response = await axios.post(gatewayUrl, body, {
-      timeout: 3600_000,
-      responseType: 'json',
-    })
-    if (response.status !== 200) {
-      console.error('Failed to fetch nodes', response.status, response.data)
-      throw new HttpError(500, 'Internal server error: Failed to fetch nodes')
-    }
+  return concurrencyController(
+    async () =>
+      withRetries(
+        async () => {
+          logger.debug(
+            `Fetching nodes (requestId=${requestId}): ${objects.map((e) => e[0]).join(', ')}`,
+          )
+          const fetchStart = performance.now()
+          const response = await axios.post(gatewayUrl, body, {
+            timeout: 3600_000,
+            responseType: 'json',
+          })
+          if (response.status !== 200) {
+            console.error(
+              'Failed to fetch nodes',
+              response.status,
+              response.data,
+            )
+            throw new HttpError(
+              500,
+              'Internal server error: Failed to fetch nodes',
+            )
+          }
 
-    const validatedResponseData = fetchNodesSchema.safeParse(response.data)
-    if (!validatedResponseData.success) {
-      console.error(
-        'Failed to parse fetch nodes response',
-        validatedResponseData.error,
-      )
-      throw new HttpError(
-        500,
-        'Internal server error: Failed to parse fetch nodes response',
-      )
-    }
+          const validatedResponseData = fetchNodesSchema.safeParse(
+            response.data,
+          )
+          if (!validatedResponseData.success) {
+            console.error(
+              'Failed to parse fetch nodes response',
+              validatedResponseData.error,
+            )
+            throw new HttpError(
+              500,
+              'Internal server error: Failed to parse fetch nodes response',
+            )
+          }
 
-    const end = performance.now()
-    logger.debug(
-      `Fetched ${objects.length} nodes in total=${end - now}ms fetch=${end - fetchStart}ms (requestId=${requestId})`,
-    )
+          const end = performance.now()
+          logger.debug(
+            `Fetched ${objects.length} nodes in total=${end - now}ms fetch=${end - fetchStart}ms (requestId=${requestId})`,
+          )
 
-    return validatedResponseData.data.result.map((hex) =>
-      decodeNode(Buffer.from(hex, 'hex')),
-    )
-  }, objects.length)
+          return validatedResponseData.data.result.map((hex) =>
+            decodeNode(Buffer.from(hex, 'hex')),
+          )
+        },
+        {
+          maxRetries: 3,
+          delay: 500,
+        },
+      ),
+    objects.length,
+  )
 }
 
 /**
```
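
Two behavioral changes land here: the gateway RPC call is now retried up to 3 times, 500 ms apart, before the `HttpError` surfaces, and the retry loop runs inside the concurrency slot, so a flaky request holds its weight for the duration instead of re-queueing behind other work on each attempt. The sketch below models that composition; the controller is a simplified stand-in written for illustration, not the actual `weightedRequestConcurrencyController` API from `@autonomys/asynchronous`.

```typescript
// Toy weighted concurrency controller — an illustrative stand-in, not the
// @autonomys/asynchronous implementation.
const makeWeightedController = (budget: number) => {
  let inFlight = 0
  const waiters: Array<() => void> = []
  return async <T>(task: () => Promise<T>, weight: number): Promise<T> => {
    // Note: a weight larger than the budget is never admitted — the
    // "blocking batches" failure mode this PR avoids by capping batch size.
    while (inFlight + weight > budget) {
      await new Promise<void>((resolve) => waiters.push(resolve))
    }
    inFlight += weight
    try {
      return await task()
    } finally {
      inFlight -= weight
      waiters.splice(0).forEach((wake) => wake())
    }
  }
}

// Hypothetical flaky request standing in for the gateway RPC call.
let attempts = 0
const flakyFetch = async (): Promise<string> => {
  attempts++
  if (attempts < 3) throw new Error(`transient failure #${attempts}`)
  return 'nodes'
}

// Inline retry loop mirroring withRetries (defined in the next file): the
// whole retrying operation occupies a single weighted slot until it settles.
const controller = makeWeightedController(10)
const result = await controller(async () => {
  for (let i = 0; i < 3; i++) {
    try {
      return await flakyFetch()
    } catch {
      await new Promise((resolve) => setTimeout(resolve, 500))
    }
  }
  throw new Error('exhausted retries')
}, 5)
console.log(result) // 'nodes' — succeeded on the third attempt
```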
services/file-retriever/src/utils/retries.ts

Lines changed: 26 additions & 0 deletions
```diff
@@ -0,0 +1,26 @@
+type RetryOptions = {
+  maxRetries?: number
+  delay?: number
+}
+
+const sleep = (ms: number) => new Promise((resolve) => setTimeout(resolve, ms))
+
+export const withRetries = async <T>(
+  fn: () => Promise<T>,
+  { maxRetries = 5, delay = 1000 }: RetryOptions = {},
+): Promise<T> => {
+  let retries = 0
+  let lastError: Error
+
+  while (retries < maxRetries) {
+    try {
+      return await fn()
+    } catch (error) {
+      lastError = error as Error
+      retries++
+      await sleep(delay)
+    }
+  }
+
+  throw lastError!
+}
```
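
A usage note on the helper: `maxRetries` bounds the total number of attempts (not the number of re-attempts), and after the final failure the last error is rethrown; the `lastError!` assertion is only safe for `maxRetries >= 1`, since with `maxRetries: 0` the loop never runs and `undefined` is thrown. A minimal, hypothetical caller:

```typescript
// Hypothetical caller; flakyOperation is illustrative, not service code.
let calls = 0
const flakyOperation = async (): Promise<string> => {
  calls++
  if (calls < 3) throw new Error(`attempt ${calls} failed`)
  return 'ok'
}

// Three attempts, 500 ms apart — the same options dsnFetcher.ts passes.
const value = await withRetries(flakyOperation, { maxRetries: 3, delay: 500 })
console.log(value) // 'ok' (succeeded on the third and final attempt)
```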
