
Commit 7c996f0

Merge pull request #136 from autonomys/byte-range-downloads

feat: implement byte range downloads

2 parents: ef9ba13 + ce479dd, commit: 7c996f0

9 files changed: +333, −62 lines

services/file-retriever/package.json
Lines changed: 5 additions & 5 deletions

@@ -4,11 +4,11 @@
   "type": "module",
   "dependencies": {
     "@auto-files/rpc-apis": "workspace:*",
-    "@autonomys/asynchronous": "^1.4.36",
-    "@autonomys/auto-dag-data": "^1.4.31",
-    "@autonomys/auto-utils": "^1.4.31",
-    "@autonomys/file-caching": "^1.4.36",
-    "@autonomys/rpc": "^1.4.31",
+    "@autonomys/asynchronous": "^1.5.12",
+    "@autonomys/auto-dag-data": "^1.5.12",
+    "@autonomys/auto-utils": "^1.5.12",
+    "@autonomys/file-caching": "^1.5.12",
+    "@autonomys/rpc": "^1.5.12",
     "@ipld/dag-pb": "^4.1.3",
     "@keyvhq/core": "^2.1.1",
     "@keyvhq/sqlite": "^2.1.6",

services/file-retriever/src/http/controllers/file.ts
Lines changed: 37 additions & 4 deletions

@@ -4,7 +4,7 @@ import { fileComposer } from '../../services/fileComposer.js'
 import { pipeline } from 'stream'
 import { logger } from '../../drivers/logger.js'
 import { asyncSafeHandler } from '../../utils/express.js'
-import { uniqueHeaderValue } from '../../utils/http.js'
+import { getByteRange, uniqueHeaderValue } from '../../utils/http.js'
 import { HttpError } from '../middlewares/error.js'
 import { dsnFetcher } from '../../services/dsnFetcher.js'
 import { isValidCID } from '../../utils/dagData.js'
@@ -56,12 +56,25 @@ fileRouter.get(
     }

     const rawMode = req.query.raw === 'true'
+    const byteRange = getByteRange(req)
     const ignoreCache =
       req.query.originControl === 'no-cache' ||
       uniqueHeaderValue(req.headers['x-origin-control'])?.toLowerCase() ===
         'no-cache'

-    const [fromCache, file] = await fileComposer.get(cid, ignoreCache)
+    const metadata = await dsnFetcher.fetchNodeMetadata(cid)
+    if (byteRange) {
+      if (byteRange[0] > Number(metadata.size)) {
+        res.set('Content-Range', `bytes */${metadata.size}`)
+        res.sendStatus(416)
+        return
+      }
+    }
+
+    const [fromCache, file] = await fileComposer.get(cid, {
+      ignoreCache,
+      byteRange,
+    })
     if (fromCache) {
       res.setHeader('x-file-origin', 'cache')
     } else {
@@ -77,10 +90,30 @@ fileRouter.get(
         `filename="${encodeURIComponent(file.filename)}"`,
       )
     }
-    if (file.size) {
+
+    // Advertise range support
+    res.set('Accept-Ranges', 'bytes')
+
+    if (byteRange && file.size != null) {
+      const fileSizeNumber = Number(file.size)
+      const startIndex = byteRange[0]
+
+      const effectiveEnd = Math.min(
+        byteRange[1] != null ? byteRange[1] : fileSizeNumber - 1,
+        fileSizeNumber - 1,
+      )
+      const contentLength = effectiveEnd - startIndex + 1

+      res.status(206)
+      res.set(
+        'Content-Range',
+        `bytes ${startIndex}-${effectiveEnd}/${fileSizeNumber}`,
+      )
+      res.set('Content-Length', contentLength.toString())
+    } else if (file.size != null) {
       res.set('Content-Length', file.size.toString())
     }
-    if (file.encoding && !rawMode) {
+    if (file.encoding && !rawMode && !byteRange) {
       res.set('Content-Encoding', file.encoding)
     }
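The controller leans on a getByteRange helper from ../../utils/http.js whose diff is not expanded in this view. A minimal sketch of a compatible parser, assuming ByteRange is a [start, end?] tuple and that only single-range bytes=start-end requests are honored:

import { Request } from 'express'

// Hypothetical stand-in for the real helper in src/utils/http.ts:
// parse a single-range "Range: bytes=start-end" header into [start, end?].
// Multi-range and suffix requests ("bytes=-500") fall back to undefined,
// which the controller treats as a full-body download.
export type ByteRange = [number, number | undefined]

export const getByteRange = (req: Request): ByteRange | undefined => {
  const header = req.headers.range
  if (!header) return undefined

  const match = header.match(/^bytes=(\d+)-(\d*)$/)
  if (!match) return undefined

  const start = Number(match[1])
  const end = match[2] === '' ? undefined : Number(match[2])
  if (end !== undefined && end < start) return undefined

  return [start, end]
}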

services/file-retriever/src/repositories/dag-indexer.ts
Lines changed: 1 addition & 0 deletions

@@ -27,6 +27,7 @@ const mapToDomain = (db: ExtendedIPLDMetadataDB): ExtendedIPLDMetadata => {
     type: db.type as MetadataType,
     linkDepth: db.link_depth,
     name: db.name,
+    size: BigInt(db.size),
     blockHeight: db.block_height,
     blockHash: db.block_hash,
     extrinsicId: db.extrinsic_id,
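The new size mapping goes through BigInt rather than Number. Assuming db.size comes back from a bigint column as a string (typical for SQL drivers), this avoids silent precision loss for very large files:

// Illustrative only: Number cannot represent every integer above 2^53 - 1
const size = '9007199254740993' // 2^53 + 1
Number(size) // 9007199254740992 (rounded)
BigInt(size) // 9007199254740993n (exact)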

services/file-retriever/src/services/cache.ts
Lines changed: 3 additions & 1 deletion

@@ -2,6 +2,7 @@ import {
   createFileCache,
   defaultMemoryAndSqliteConfig,
   ensureDirectoryExists,
+  FileCacheOptions,
   FileResponse,
 } from '@autonomys/file-caching'
 import path from 'path'
@@ -17,7 +18,8 @@ export const cache = createFileCache(

 const cacheWithNamespace = (namespace: string) => {
   return {
-    get: (cid: string) => cache.get(`${namespace}:${cid}`),
+    get: (cid: string, options?: FileCacheOptions) =>
+      cache.get(`${namespace}:${cid}`, options),
     set: (cid: string, fileResponse: FileResponse) =>
       cache.set(`${namespace}:${cid}`, fileResponse),
     has: (cid: string) => cache.has(`${namespace}:${cid}`),
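With the namespaced wrapper now forwarding FileCacheOptions, callers can pass range hints straight through to @autonomys/file-caching. A brief usage sketch, on the assumption that the underlying cache slices the stored stream when a byteRange option is present:

import { fileCache } from './cache.js'

// Hypothetical helper: read only the first `n` bytes of a cached file.
const getHead = (cid: string, n: number) =>
  fileCache.get(cid, { byteRange: [0, n - 1] })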

services/file-retriever/src/services/dsnFetcher.ts
Lines changed: 136 additions & 5 deletions

@@ -9,7 +9,11 @@ import {
   IPLDNodeData,
   MetadataType,
 } from '@autonomys/auto-dag-data'
-import { FileResponse } from '@autonomys/file-caching'
+import {
+  ByteRange,
+  FileCacheOptions,
+  FileResponse,
+} from '@autonomys/file-caching'
 import { z } from 'zod'
 import { PBNode } from '@ipld/dag-pb'
 import { HttpError } from '../http/middlewares/error.js'
@@ -29,6 +33,7 @@ import { Readable } from 'stream'
 import { ReadableStream } from 'stream/web'
 import { fileCache, nodeCache } from './cache.js'
 import { dagIndexerRepository } from '../repositories/dag-indexer.js'
+import { sliceReadable } from '../utils/readable.js'

 const fetchNodesSchema = z.object({
   jsonrpc: z.string(),
@@ -141,6 +146,123 @@ const fetchObjects = async (objects: ObjectMapping[]) => {
   )
 }

+const getNodesForPartialRetrieval = async (
+  chunks: ExtendedIPLDMetadata[],
+  byteRange: ByteRange,
+): Promise<{
+  nodes: string[]
+  firstNodeFileOffset: number
+}> => {
+  let accumulatedLength = 0
+  const nodeRange: [number | null, number | null] = [null, null]
+  let firstNodeFileOffset: number | undefined
+  let i = 0
+
+  logger.debug(
+    `getNodesForPartialRetrieval called (byteRange=[${byteRange[0]}, ${byteRange[1] ?? 'EOF'}])`,
+  )
+
+  // Searches for the first node that contains the byte range
+  while (nodeRange[0] === null && i < chunks.length) {
+    const chunk = chunks[i]
+    const chunkSize = Number((chunk.size ?? 0).valueOf())
+    // [accumulatedLength, accumulatedLength + chunkSize) is the range of this chunk
+    if (
+      byteRange[0] >= accumulatedLength &&
+      byteRange[0] < accumulatedLength + chunkSize
+    ) {
+      nodeRange[0] = i
+      firstNodeFileOffset = accumulatedLength
+    } else {
+      accumulatedLength += chunkSize
+      i++
+    }
+  }
+
+  // Searches for the last node that contains the byte range,
+  // unless the byte range extends to the last byte of the file
+  if (byteRange[1]) {
+    while (nodeRange[1] === null && i < chunks.length) {
+      const chunk = chunks[i]
+      const chunkSize = Number((chunk.size ?? 0).valueOf())
+      if (
+        byteRange[1] >= accumulatedLength &&
+        byteRange[1] < accumulatedLength + chunkSize
+      ) {
+        nodeRange[1] = i
+      }
+      accumulatedLength += chunkSize
+      i++
+    }
+  }
+
+  if (nodeRange[0] == null) {
+    throw new Error('Byte range not found')
+  }
+
+  const nodes = chunks
+    .slice(nodeRange[0], nodeRange[1] === null ? undefined : nodeRange[1] + 1)
+    .map((e) => e.cid)
+
+  return {
+    nodes,
+    firstNodeFileOffset: firstNodeFileOffset ?? 0,
+  }
+}
+
+const fetchFileAsStreamWithByteRange = async (
+  cid: string,
+  byteRange: ByteRange,
+): Promise<Readable> => {
+  const chunks = await dsnFetcher.getFileChunks(cid)
+  const { nodes, firstNodeFileOffset } = await getNodesForPartialRetrieval(
+    chunks,
+    byteRange,
+  )
+
+  logger.debug(
+    `getNodesForPartialRetrieval returned (byteRange=[${byteRange[0]}, ${byteRange[1] ?? 'EOF'}]) nodes=${JSON.stringify(nodes)} firstNodeFileOffset=${firstNodeFileOffset}`,
+  )
+
+  // We pass all the chunks to the fetchNode function
+  // so that we can fetch all the nodes within the same piece
+  // in one go
+  const siblings = chunks.map((e) => e.cid)
+  const stream = new ReadableStream({
+    start: async (controller) => {
+      try {
+        for (const chunk of nodes) {
+          const node = await dsnFetcher.fetchNode(chunk, siblings)
+          const data = safeIPLDDecode(node)
+          if (!data) {
+            throw new HttpError(
+              400,
+              'Bad request: Not a valid auto-dag-data IPLD node',
+            )
+          }
+
+          controller.enqueue(Buffer.from(data.data ?? []))
+        }
+
+        controller.close()
+      } catch (error) {
+        controller.error(error)
+      }
+    },
+  })
+
+  const metadata = await dsnFetcher.fetchNodeMetadata(cid)
+  const fileSize = Number(metadata.size)
+  const endIndex = byteRange[1] ?? fileSize - 1
+  const length = endIndex - byteRange[0] + 1
+
+  return sliceReadable(
+    Readable.fromWeb(stream),
+    byteRange[0] - firstNodeFileOffset,
+    length,
+  )
+}
+
 /**
  * Fetches a file as a stream
  *
@@ -150,13 +272,13 @@ const fetchObjects = async (objects: ObjectMapping[]) => {
  * @param node - The root node of the file
  * @returns A readable stream of the file
  */
-const fetchFileAsStream = async (cid: string): Promise<ReadableStream> => {
+const fetchFileAsStream = async (cid: string): Promise<Readable> => {
   const chunks = await dsnFetcher.getFileChunks(cid)

   // if a file is a multi-node file, we need to fetch the nodes in the correct order
   // bearing in mind there might be multiple levels of links, we need to fetch
   // all the links from the root node first and then continue with the next level
-  return new ReadableStream({
+  const stream = new ReadableStream({
     start: async (controller) => {
       try {
         for (const chunk of chunks) {
@@ -181,6 +303,8 @@ const fetchFileAsStream = async (cid: string): Promise<ReadableStream> => {
       }
     },
   })
+
+  return Readable.fromWeb(stream)
 }

 const getFileMetadata = (
@@ -202,7 +326,10 @@
   }
 }

-const fetchFile = async (cid: string): Promise<FileResponse> => {
+const fetchFile = async (
+  cid: string,
+  options?: FileCacheOptions,
+): Promise<FileResponse> => {
   try {
     const metadata = await dsnFetcher.fetchNodeMetadata(cid)
     if (metadata.type !== MetadataType.File) {
@@ -215,8 +342,12 @@ const fetchFile = async (cid: string): Promise<FileResponse> => {
       `Fetching file (cid=${cid}, size=${traits.size}, mimeType=${traits.mimeType}, filename=${traits.filename}, encoding=${traits.encoding})`,
     )

+    const readable = options?.byteRange
+      ? await fetchFileAsStreamWithByteRange(cid, options.byteRange)
+      : await fetchFileAsStream(cid)
+
     return {
-      data: Readable.fromWeb(await fetchFileAsStream(cid)),
+      data: readable,
       ...traits,
     }
   } catch (error) {
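To make the node-selection arithmetic in getNodesForPartialRetrieval concrete, consider an illustrative file of three 262,144-byte chunks (the chunk size here is an assumption for the example, not taken from this commit) and the request Range: bytes=300000-600000:

// chunk 0 covers [0, 262144)       -> does not contain 300000
// chunk 1 covers [262144, 524288)  -> contains 300000: nodeRange[0] = 1,
//                                     firstNodeFileOffset = 262144
// chunk 2 covers [524288, 786432)  -> contains 600000: nodeRange[1] = 2
//
// nodes = [chunk1.cid, chunk2.cid], so only two of the three nodes are fetched.
// sliceReadable then drops 300000 - 262144 = 37856 leading bytes and emits
// 600000 - 300000 + 1 = 300001 bytes, matching the Content-Length the
// controller advertises.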

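sliceReadable is imported from ../utils/readable.js, one of the files not expanded in this view. A hypothetical implementation consistent with the call site sliceReadable(source, offset, length), i.e. skip offset bytes, then emit exactly length bytes:

import { Readable, Transform } from 'stream'

// Hypothetical stand-in for src/utils/readable.ts: drop the first `offset`
// bytes of `source`, then pass through exactly `length` bytes and end.
export const sliceReadable = (
  source: Readable,
  offset: number,
  length: number,
): Readable => {
  let toSkip = offset
  let toEmit = length

  const slicer = new Transform({
    transform(chunk: Buffer, _enc, callback) {
      let buf = chunk
      if (toSkip > 0) {
        const skipped = Math.min(toSkip, buf.length)
        toSkip -= skipped
        buf = buf.subarray(skipped)
      }
      if (toEmit > 0 && buf.length > 0) {
        const out = buf.subarray(0, Math.min(toEmit, buf.length))
        toEmit -= out.length
        this.push(out)
        if (toEmit === 0) this.push(null) // range satisfied, end the stream
      }
      callback()
    },
  })

  return source.pipe(slicer)
}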
services/file-retriever/src/services/fileComposer.ts
Lines changed: 16 additions & 12 deletions

@@ -1,23 +1,28 @@
 import { dsnFetcher } from './dsnFetcher.js'
 import { forkStream } from '@autonomys/asynchronous'
 import { logger } from '../drivers/logger.js'
-import { FileResponse } from '@autonomys/file-caching'
+import { FileCacheOptions, FileResponse } from '@autonomys/file-caching'
 import { fileCache } from './cache.js'
 import { moderationService } from './moderation.js'
 import { HttpError } from '../http/middlewares/error.js'

+interface DownloadOptions extends FileCacheOptions {
+  ignoreCache?: boolean
+}
+
 const get = async (
   cid: string,
-  ignoreCache = false,
+  options: DownloadOptions = {},
 ): Promise<[fromCache: boolean, FileResponse]> => {
+  const isPartialRequest = options.byteRange != null
   const isBanned = await moderationService.isFileBanned(cid)
   if (isBanned) {
     logger.warn(`File download blocked: ${cid}`)
     throw new HttpError(451, 'Unavailable for legal reasons')
   }

-  if (!ignoreCache) {
-    const cachedFile = await fileCache.get(cid)
+  if (!options.ignoreCache) {
+    const cachedFile = await fileCache.get(cid, options)
     if (cachedFile) {
       logger.debug(`Cache hit for file ${cid}`)
       return [true, cachedFile]
@@ -26,10 +31,15 @@ const get = async (

   let start = performance.now()
   logger.debug(`Fetching file from DSN ${cid}`)
-  const file = await dsnFetcher.fetchFile(cid)
+  const file = await dsnFetcher.fetchFile(cid, options)
   let end = performance.now()
   logger.debug(`Fetching file from DSN ${cid} took ${end - start}ms`)

+  // Do not cache partial responses
+  if (isPartialRequest) {
+    return [false, file]
+  }
+
   start = performance.now()
   const [data, cachingStream] = await forkStream(file.data)
   end = performance.now()
@@ -44,13 +54,7 @@ const get = async (
     console.error('Error caching file', e)
   })

-  return [
-    false,
-    {
-      ...file,
-      data,
-    },
-  ]
+  return [false, { ...file, data }]
 }

 export const fileComposer = {
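Partial requests return before the forkStream caching branch, presumably because caching a slice under the whole-file key would corrupt later full downloads. A brief usage sketch of the new options object:

import { fileComposer } from './fileComposer.js'

// Hypothetical caller: fetch the first MiB of a file. The byteRange option
// both skips the cache write and asks the fetcher for a sliced stream.
const firstMebibyte = async (cid: string) => {
  const [fromCache, file] = await fileComposer.get(cid, {
    byteRange: [0, 1048575],
  })
  return { fromCache, stream: file.data }
}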

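End to end, the commit gives the file endpoint standard HTTP range semantics. A client-side sketch (the gateway host and the /files/:cid mount path are assumptions, not confirmed by this diff):

// Expect 206 Partial Content plus "Content-Range: bytes 0-1023/<size>",
// or 416 with "Content-Range: bytes */<size>" when the start is past EOF.
const fetchHead = async (cid: string) => {
  const res = await fetch(`https://gateway.example.com/files/${cid}`, {
    headers: { Range: 'bytes=0-1023' },
  })
  console.log(res.status, res.headers.get('content-range'))
  return Buffer.from(await res.arrayBuffer())
}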