63 changes: 60 additions & 3 deletions lib/web/fetch/util.js
@@ -1099,6 +1099,10 @@ function isomorphicEncode (input) {
return input
}

// Sentinel sleep: resolves with { value: false } after ms, so a Promise.race
// against reader.read() can distinguish a timeout from a real chunk.
const sleep = (ms) => {
return new Promise((resolve) => setTimeout(() => resolve({ value: false }), ms))
}

/**
* @see https://streams.spec.whatwg.org/#readablestreamdefaultreader-read-all-bytes
* @see https://streams.spec.whatwg.org/#read-loop
@@ -1107,11 +1111,64 @@ function isomorphicEncode (input) {
async function readAllBytes (reader) {
const bytes = []
let byteLength = 0

const logger = global.__asyncLogger
const tempIndexIdx = reader.__tempIndexIdx // this is injected from the download index
logger.info(`fetch enter readAllBytes with idx ${tempIndexIdx} step0 ${!!reader}`)
let times = 0
const timesInfo = []
while (true) {

times++
logger.info(`fetch enter readAllBytes with idx ${tempIndexIdx} step0 times=${times}`)
// Race reader.read() against a short 100ms sleep: a stalled read loses the race,
// so chunk comes back as the false sentinel instead of blocking forever.
const { done, value: chunk } = await Promise.race([reader.read(), sleep(100)])
timesInfo.push({
time: new Date().toISOString(),
times,
length: byteLength
})
logger.info(`fetch enter readAllBytes with idx ${tempIndexIdx} chunk timed out: ${chunk === false}`)
if (chunk === false) {
try {
logger.info(`fetch enter controller readAllBytes with idx ${tempIndexIdx} step3 timeout ${!!reader} times=${times} length=${byteLength} ${JSON.stringify(timesInfo)}`)
logger.info(`fetch enter readAllBytes with idx ${tempIndexIdx} step3 info 1`, { reader })
const infoOfReader = {}
const symbolKeys = Object.getOwnPropertySymbols(reader)
const kTypeSymbol = symbolKeys.find(sym => sym.toString().includes('kType'))
const kStateSymbol = symbolKeys.find(sym => sym.toString().includes('kState'))
const _innerType = reader[kTypeSymbol]
const _innerState = reader[kStateSymbol]
infoOfReader.type = _innerType
const waitingLength = _innerState.readRequests?.length || -1
const waitingReq = _innerState.readRequests[0]
const waitingReqPromise = waitingReq?.promise
let waitingReqStatus = 'none'
if (waitingReqPromise) {
// Race against a 500ms timer: if neither fulfilled nor rejected wins, the read request is still pending
waitingReqStatus = await Promise.race([
waitingReqPromise.then(() => 'fulfilled').catch(() => 'rejected'),
new Promise(resolve => setTimeout(() => resolve('pending'), 500))
])
}
infoOfReader.waitingLength = waitingLength
const _stream = _innerState.stream
const streamSymbols = Object.getOwnPropertySymbols(_stream)
const kStreamState = streamSymbols.find(sym => sym.toString().includes('kState'))
const stateOfStream = _stream[kStreamState]
infoOfReader.stateOfStream = stateOfStream
const controller = stateOfStream.controller
const stateOfController = controller[kStreamState]
infoOfReader.controllerState = stateOfController
infoOfReader.waitingReqStatus = waitingReqStatus
logger.info(`fetch enter readAllBytes with idx ${tempIndexIdx} status=${waitingReqStatus} step3 info ${JSON.stringify(infoOfReader)}`)
} catch (e) {
logger.info(`fetch enter readAllBytes with idx ${tempIndexIdx} step3 timeout error ${e.message}`, e)
}
// When a hang occurs, park here for 10 minutes so the hung state can be inspected
await sleep(600000)
}
if (done) {
logger.info(`fetch enter readAllBytes with idx ${tempIndexIdx} step2 done`)
// 1. Call successSteps with bytes.
return Buffer.concat(bytes, byteLength)
}
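
The pattern the patch applies here, racing reader.read() against a sentinel timeout, is worth reading in isolation. Below is a minimal generic sketch of that watchdog idea; the function name, the symbol sentinel, and the throw-on-hang behavior are illustrative choices, not part of the patch:

// Generic watchdog sketch: detect a ReadableStream read that never settles.
const HANG = Symbol('hang')

async function readWithWatchdog (reader, timeoutMs = 100) {
  const result = await Promise.race([
    reader.read(),
    new Promise((resolve) => setTimeout(() => resolve(HANG), timeoutMs))
  ])
  if (result === HANG) {
    // The read is still pending; the caller can dump diagnostics instead of blocking forever.
    throw new Error(`reader.read() did not settle within ${timeoutMs}ms`)
  }
  return result // the real { done, value } from reader.read()
}

Note the caveat this shares with the patch: a slow but healthy read also loses the race. That is acceptable in a reproduction where the server deliberately stalls for minutes, but it is why the diagnostic branch, like the sleep(600000) park in the patch, does not belong in production code.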
1 change: 1 addition & 0 deletions package.json
@@ -114,6 +114,7 @@
"c8": "^10.0.0",
"cross-env": "^7.0.3",
"dns-packet": "^5.4.0",
"express": "^5.1.0",
"fast-check": "^3.17.1",
"form-data": "^4.0.0",
"formdata-node": "^6.0.3",
125 changes: 125 additions & 0 deletions reproduce-script/asyncLogger.js
@@ -0,0 +1,125 @@
const fs = require('fs')
const path = require('path')
const util = require('util')

// Create log directory
const LOG_DIR = path.join(__dirname, 'logs')
if (!fs.existsSync(LOG_DIR)) {
fs.mkdirSync(LOG_DIR)
}

// Log file path
const LOG_FILE = path.join(LOG_DIR, `test-${new Date().toISOString().replace(/:/g, '-')}.log`)

// Create write stream
const logStream = fs.createWriteStream(LOG_FILE, { flags: 'a' })

// Async logger
class AsyncLogger {
constructor () {
this.queue = []
this.isProcessing = false

// Flush queued logs to the file every second; unref() so this timer alone
// does not keep the process alive
setInterval(() => this.processQueue(), 1000).unref()

// Ensure all logs are written when process exits
process.on('exit', () => {
this.processQueueSync()
})
}

// Format log message
formatMessage (level, message, ...args) {
const timestamp = new Date().toISOString()
const formattedArgs = args.map(arg => {
if (typeof arg === 'object') {
return util.inspect(arg, { depth: null })
}
return arg
}).join(' ')

return `[${timestamp}] [${level}] ${message} ${formattedArgs}`.trim()
}

// Add log to queue
log (level, message, ...args) {
const formattedMessage = this.formatMessage(level, message, ...args)

// Output to console
if (level === 'ERROR') {
console.error(formattedMessage)
} else if (level === 'WARN') {
console.warn(formattedMessage)
} else {
console.log(formattedMessage)
}

// Add to queue
this.queue.push(formattedMessage)

// Start processing if queue is not already being processed
if (!this.isProcessing) {
this.processQueue()
}
}

// Process queue asynchronously
async processQueue () {
if (this.isProcessing || this.queue.length === 0) return

this.isProcessing = true

try {
const messagesToProcess = [...this.queue]
this.queue = []

// Write to file
for (const message of messagesToProcess) {
logStream.write(message + '\n')
}
} finally {
this.isProcessing = false

// Continue processing if queue still has messages
if (this.queue.length > 0) {
setImmediate(() => this.processQueue())
}
}
}

// Process queue synchronously (used when process exits)
processQueueSync () {
if (this.queue.length === 0) return

const messagesToProcess = [...this.queue]
this.queue = []

for (const message of messagesToProcess) {
fs.appendFileSync(LOG_FILE, message + '\n')
}
}

// Public methods
info (message, ...args) {
this.log('INFO', message, ...args)
}

warn (message, ...args) {
this.log('WARN', message, ...args)
}

error (message, ...args) {
this.log('ERROR', message, ...args)
}

debug (message, ...args) {
this.log('DEBUG', message, ...args)
}
}

// Export instance and set as global variable
const logger = new AsyncLogger()
global.__asyncLogger = logger

module.exports = logger
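
For context, a minimal usage sketch; the require path is an assumption. The module only needs to be loaded once, for its side effect, before any fetch runs, because the patched readAllBytes reads global.__asyncLogger directly:

// Hypothetical usage: load for the side effect of registering global.__asyncLogger.
const logger = require('./asyncLogger')

logger.info('starting reproduction run', { ports: [3000, 3001, 3002, 3003] })
logger.warn('mirrored to console.warn and queued for the log file')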
147 changes: 147 additions & 0 deletions reproduce-script/server.js
@@ -0,0 +1,147 @@
const express = require('express')
const crypto = require('crypto')

// Function to create server instance
function createServer (port) {
const app = express()

// Track request counts to selectively delay certain requests
let requestCount = 0

// Global control variable to simulate server hanging
let shouldStall = false

// Toggle the stall status every 30 seconds
setInterval(() => {
shouldStall = !shouldStall
console.log(`Server hang status changed to: ${shouldStall ? 'hanging' : 'normal'}`)
}, 30000)

app.get('/files/:size', (req, res) => {
const sizeInKB = parseInt(req.params.size, 10)
const sizeInBytes = sizeInKB * 1024

requestCount++

// Set response headers
res.setHeader('Content-Type', 'application/octet-stream')
res.setHeader('Content-Length', sizeInBytes)

// Give each request a long socket timeout so a stalled connection is not torn down mid-test
req.socket.setTimeout(300000) // 5 minutes

// Log when the client closes the connection early
req.on('close', () => {
console.log(`Request #${requestCount}: Connection closed by client`)
})

// Generate random data
const buffer = crypto.randomBytes(sizeInBytes)

// Apply slow transfer strategy for all large file requests
if (sizeInBytes > 100 * 1024) { // Files larger than 100KB
console.log(`Request #${requestCount}: Applying slow transfer (${sizeInKB}KB)`)

// Use smaller chunks and more precise control
const CHUNK_SIZE = 16 * 1024 // 16KB
let offset = 0
let chunkCount = 0

// Progress counter
const totalChunks = Math.ceil(sizeInBytes / CHUNK_SIZE)

function sendNextChunk () {
// Check whether we should enter the hanging state
if (shouldStall && chunkCount > 1) {
console.log(`Request #${requestCount}: Global hang signal detected, pausing data transfer [${chunkCount}/${totalChunks}]`)
// Pause for a very long time, but don't end the response, keeping the client hanging
setTimeout(sendNextChunk, 60000) // Check again after 1 minute
return
}

if (offset >= sizeInBytes) {
console.log(`Request #${requestCount}: Transfer completed`)
res.end()
return
}

try {
// Determine the current chunk boundaries
const end = Math.min(offset + CHUNK_SIZE, sizeInBytes)
const chunk = buffer.subarray(offset, end) // subarray instead of the deprecated Buffer#slice

// Use callback to ensure data is written
res.write(chunk, (err) => {
if (err) {
console.error(`Request #${requestCount}: Write error`, err)
try { res.end() } catch (e) {}
return
}

offset += chunk.length
chunkCount++

// Determine delay based on chunk number
let delay

// First two chunks sent quickly, subsequent chunks intentionally slowed
if (chunkCount <= 2) {
delay = 50 // Send first two chunks quickly
console.log(`Request #${requestCount}: Sending fast chunk ${chunkCount}/${totalChunks}, size: ${chunk.length / 1024}KB`)
} else if (chunkCount === 3) {
// After the third chunk, insert an extra-long delay so the client's reader.read() hangs
delay = 120000 // 2 minute delay
console.log(`Request #${requestCount}: Entering extra long delay ${delay}ms`)
} else {
delay = 10000 // Subsequent chunks also slow
console.log(`Request #${requestCount}: Sending slow chunk ${chunkCount}/${totalChunks}, delay: ${delay}ms`)
}

// Set time to send the next chunk
setTimeout(sendNextChunk, delay)
})
} catch (err) {
console.error(`Request #${requestCount}: Send error`, err)
try { res.end() } catch (e) {}
}
}

// Start sending
sendNextChunk()
} else {
// Small files still have some delay to prevent all responses from completing too quickly
setTimeout(() => {
res.end(buffer)
}, Math.random() * 2000) // 0-2000ms random delay
}
})

return app.listen(port, () => {
console.log(`Server running on port ${port} - http://localhost:${port}/files/:size`)
})
}

// Start 4 server instances on different ports
const ports = [3000, 3001, 3002, 3003]
const servers = ports.map(port => createServer(port))

// Add overall startup success log
console.log('=================================================')
console.log('✅ All servers started successfully!')
console.log('=================================================')
console.log(`🚀 Started ${ports.length} test server instances:`)
ports.forEach(port => {
console.log(` - http://localhost:${port}`)
})
console.log('\n📄 Available endpoints:')
console.log(' - GET /files/:size - Returns random data of specified size (KB)')
console.log(' Example: http://localhost:3000/files/1024 will return 1MB data')
console.log('\n⚙️ Press Ctrl+C to stop all servers')
console.log('=================================================')

// Add graceful shutdown handling
process.on('SIGINT', () => {
console.log('Shutting down servers...')
servers.forEach(server => server.close())
process.exit()
})
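
The client half of the reproduction is not included in this diff, so the sketch below is hypothetical. It loads the logger first, so global.__asyncLogger exists when the patched readAllBytes runs, then downloads large files from all four servers in parallel through the patched fetch. How __tempIndexIdx gets attached to the reader is also not shown in this diff, so it is omitted here:

// reproduce-script/client.js (hypothetical sketch, not part of this PR)
require('./asyncLogger') // must run before any fetch so global.__asyncLogger is set
const { fetch } = require('../') // the patched undici checkout

const ports = [3000, 3001, 3002, 3003]

async function download (port, idx) {
  // 1024KB triggers the server's slow-transfer path (anything over 100KB)
  const res = await fetch(`http://localhost:${port}/files/1024`)
  const body = await res.arrayBuffer() // drives readAllBytes internally
  console.log(`download ${idx} from :${port} finished: ${body.byteLength} bytes`)
}

Promise.all(ports.map((port, idx) => download(port, idx)))
  .then(() => console.log('all downloads finished'))
  .catch((err) => console.error('download failed', err))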