Skip to content

Commit 6817190

Browse files
committed
refactor: update fetchFileAsStream to use ReadableStream for improved streaming support
1 parent b09e1ec commit 6817190

File tree

2 files changed

+748
-816
lines changed

2 files changed

+748
-816
lines changed

services/file-retriever/src/services/dsnFetcher.ts

Lines changed: 13 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,3 @@
1-
import { FileResponse } from '@autonomys/file-caching'
21
import {
32
blake3HashFromCid,
43
stringToCid,
@@ -8,6 +7,7 @@ import {
87
CompressionAlgorithm,
98
cidOfNode,
109
} from '@autonomys/auto-dag-data'
10+
import { FileResponse } from '@autonomys/file-caching'
1111
import { z } from 'zod'
1212
import { PBNode } from '@ipld/dag-pb'
1313
import { HttpError } from '../http/middlewares/error.js'
@@ -23,6 +23,7 @@ import { optimizeBatchFetch } from './batchOptimizer.js'
2323
import { ObjectMapping } from '@auto-files/models'
2424
import { withRetries } from '../utils/retries.js'
2525
import { Readable } from 'stream'
26+
import { ReadableStream } from 'stream/web'
2627

2728
const fetchNodesSchema = z.object({
2829
jsonrpc: z.string(),
@@ -142,18 +143,18 @@ const fetchObjects = async (objects: ObjectMapping[]) => {
142143
* @param node - The root node of the file
143144
* @returns A readable stream of the file
144145
*/
145-
const fetchFileAsStream = (node: PBNode): Readable => {
146+
const fetchFileAsStream = (node: PBNode): ReadableStream => {
146147
const metadata = safeIPLDDecode(node)
147148

148149
// if a file is a single node (< 64KB) no additional fetching is needed
149150
if (node.Links.length === 0) {
150151
logger.debug(
151152
`File resolved to single node file: (cid=${cidToString(cidOfNode(node))}, size=${metadata?.size})`,
152153
)
153-
return new Readable({
154-
read: async function () {
155-
this.push(Buffer.from(metadata?.data ?? []))
156-
this.push(null)
154+
return new ReadableStream({
155+
start: async (controller) => {
156+
controller.enqueue(Buffer.from(metadata?.data ?? []))
157+
controller.close()
157158
},
158159
})
159160
}
@@ -164,8 +165,8 @@ const fetchFileAsStream = (node: PBNode): Readable => {
164165
// if a file is a multi-node file, we need to fetch the nodes in the correct order
165166
// bearing in mind there might be multiple levels of links, we need to fetch
166167
// all the links from the root node first and then continue with the next level
167-
return new Readable({
168-
read: async function () {
168+
return new ReadableStream({
169+
start: async (controller) => {
169170
try {
170171
logger.debug('Starting to fetch file')
171172
// for the first iteration, we need to fetch all the links from the root node
@@ -202,7 +203,7 @@ const fetchFileAsStream = (node: PBNode): Readable => {
202203
const ipldMetadata = safeIPLDDecode(node)
203204
// if the node has no links or has data (is the same thing), we write into the stream
204205
if (ipldMetadata?.data) {
205-
this.push(ipldMetadata.data)
206+
controller.enqueue(ipldMetadata.data)
206207
} else {
207208
// if the node has links, we need to fetch them in the next iteration
208209
newLinks = newLinks.concat(
@@ -216,12 +217,12 @@ const fetchFileAsStream = (node: PBNode): Readable => {
216217
// we update the list of pending requests with the new links
217218
requestsPending = newLinks
218219
}
219-
this.push(null)
220+
controller.close()
220221
} catch (error) {
221222
logger.error(
222223
`Failed to fetch file as stream (cid=${cidToString(cidOfNode(node))}); error=${error}`,
223224
)
224-
this.emit('error', error)
225+
controller.error(error)
225226
}
226227
},
227228
})
@@ -253,7 +254,7 @@ const fetchFile = async (cid: string): Promise<FileResponse> => {
253254
nodeMetadata.uploadOptions?.compression?.algorithm ===
254255
CompressionAlgorithm.ZLIB
255256

256-
const data = fetchFileAsStream(head)
257+
const data = Readable.fromWeb(fetchFileAsStream(head))
257258

258259
return {
259260
data,

0 commit comments

Comments
 (0)