Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Fix connection and timeout handling issues in high concurrency scenarios for undici #4094

Open
wants to merge 1 commit into
base: release/v6.21.2
Choose a base branch
from

Conversation

fwx5618177
Copy link

This relates to...

#4089

Rationale

Current issues with undici in high concurrency scenarios:

  1. Request states not properly reset, leading to stuck requests
  2. Queue processing interruption without cleanup, causing resource leaks
  3. Reader promises remaining in pending state indefinitely

Expected Behavior

After fixes, the behavior should be:

  • Request states properly reset and cleaned up
  • Queued requests properly processed or cleaned up
  • Timeout mechanisms working correctly to prevent stuck requests

Changes

  1. Added state reset logic in _resume() method
  2. Enhanced queue cleanup in connect() timeout handler
  3. Improved error handling in onError() for connection failures
  4. Added timeout protection in connect() for stuck requests
  5. Fixed resource cleanup in kDestroy symbol method

Features

  1. State Management Optimization
function _resume (client, sync) {
  while (true) {
    if (client[kConnecting] && !client[kHTTPContext]) {
      client[kConnecting] = false 
      client[kResuming] = 0
    }
    // ...existing code...
  }
}
  1. Request Queue Management
async function connect (client) {
  const timeout = setTimeout(() => {
    if (client[kResuming] === 2) {
      // Timeout handling and queue cleanup
      const err = new Error('Request timeout')
      const requests = client[kQueue].splice(client[kPendingIdx])
      for (const req of requests) {
        util.errorRequest(client, req, err)
      }
    }
  }, 30000)
  // ...existing code...
}

Bug Fixes

  1. State Reset Issues
  • Added connection state checks
  • Proper timeout handling
  • Legacy state cleanup
  1. Queue Processing Issues
  • Handle interrupted request queues
  • Cleanup timed-out requests
  • Proper resource release
  1. Promise Deadlock Issues
  • Added timeout protection
  • Handle abnormal states
  • Improved error handling

Breaking Changes and Deprecations

None.

Status

Copy link
Member

@metcoder95 metcoder95 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't believe this is the way to go, the changes essentially changes the way the queue handles itself and its different states; the error from the issue seems to root more into a different path (Fetch ready - Stream reader).

@fwx5618177
Copy link
Author

I don't believe this is the way to go, the changes essentially changes the way the queue handles itself and its different states; the error from the issue seems to root more into a different path (Fetch ready - Stream reader).

Got it. I would simulate and reproduce the specific issue, and then follow this reasoning.

@fwx5618177
Copy link
Author

fwx5618177 commented Mar 24, 2025

@metcoder95

I reproduce this issue by using express.

Here are code:

server.js

const express = require('express')
const crypto = require('crypto')

// Function to create server instance
function createServer(port) {
  const app = express()
  
  // Track request counts to selectively delay certain requests
  let requestCount = 0;
  
  // Global control variable to simulate server hanging
  let shouldStall = false;
  
  // Toggle the stall status every 30 seconds
  setInterval(() => {
    shouldStall = !shouldStall;
    console.log(`Server hang status changed to: ${shouldStall ? 'hanging' : 'normal'}`);
  }, 30000);

  app.get('/files/:size', (req, res) => {
    const sizeInKB = parseInt(req.params.size, 10)
    const sizeInBytes = sizeInKB * 1024
    
    requestCount++;
    
    // Set response headers
    res.setHeader('Content-Type', 'application/octet-stream')
    res.setHeader('Content-Length', sizeInBytes)
    
    // Set response timeout for each request to prevent complete connection interruption
    req.socket.setTimeout(300000); // 5 minute timeout
    
    // Prevent errors caused by client-side connection interruption
    req.on('close', () => {
      console.log(`Request #${requestCount}: Connection closed by client`);
    });

    // Generate random data
    const buffer = crypto.randomBytes(sizeInBytes)
    
    // Apply slow transfer strategy for all large file requests
    if (sizeInBytes > 100 * 1024) {  // Files larger than 100KB
      console.log(`Request #${requestCount}: Applying slow transfer (${sizeInKB}KB)`);
      
      // Use smaller chunks and more precise control
      const CHUNK_SIZE = 16 * 1024; // 16KB
      let offset = 0;
      let chunkCount = 0;
      
      // Progress counter
      const totalChunks = Math.ceil(sizeInBytes / CHUNK_SIZE);
      
      function sendNextChunk() {
        // Check if need to enter hanging state
        if (shouldStall && chunkCount > 1) {
          console.log(`Request #${requestCount}: Global hang signal detected, pausing data transfer [${chunkCount}/${totalChunks}]`);
          // Pause for a very long time, but don't end the response, keeping the client hanging
          setTimeout(sendNextChunk, 60000);  // Check again after 1 minute
          return;
        }
        
        if (offset >= sizeInBytes) {
          console.log(`Request #${requestCount}: Transfer completed`);
          res.end();
          return;
        }
        
        try {
          // Determine current chunk size
          const end = Math.min(offset + CHUNK_SIZE, sizeInBytes);
          const chunk = buffer.slice(offset, end);
          
          // Use callback to ensure data is written
          res.write(chunk, (err) => {
            if (err) {
              console.error(`Request #${requestCount}: Write error`, err);
              try { res.end(); } catch (e) {}
              return;
            }
            
            offset += chunk.length;
            chunkCount++;
            
            // Determine delay based on chunk number
            let delay;
            
            // First two chunks sent quickly, subsequent chunks intentionally slowed
            if (chunkCount <= 2) {
              delay = 50; // Send first two chunks quickly
              console.log(`Request #${requestCount}: Sending fast chunk ${chunkCount}/${totalChunks}, size: ${chunk.length/1024}KB`);
            } else if (chunkCount === 3) {
              // After the third chunk, enter extra long delay to ensure client reader.read() will hang
              delay = 120000; // 2 minute delay
              console.log(`Request #${requestCount}: Entering extra long delay ${delay}ms`);
            } else {
              delay = 10000; // Subsequent chunks also slow
              console.log(`Request #${requestCount}: Sending slow chunk ${chunkCount}/${totalChunks}, delay: ${delay}ms`);
            }
            
            // Set time to send the next chunk
            setTimeout(sendNextChunk, delay);
          });
        } catch (err) {
          console.error(`Request #${requestCount}: Send error`, err);
          try { res.end(); } catch (e) {}
        }
      }
      
      // Start sending
      sendNextChunk();
    } else {
      // Small files still have some delay to prevent all responses from completing too quickly
      setTimeout(() => {
        res.end(buffer);
      }, Math.random() * 2000); // 0-2000ms random delay
    }
  })

  return app.listen(port, () => {
    console.log(`Server running on port ${port} - http://localhost:${port}/files/:size`)
  })
}

// Start 4 server instances on different ports
const ports = [3000, 3001, 3002, 3003]
const servers = ports.map(port => createServer(port))

// Add overall startup success log
console.log('=================================================')
console.log('✅ All servers started successfully!')
console.log('=================================================')
console.log(`🚀 Started ${ports.length} test server instances:`)
ports.forEach(port => {
  console.log(`   - http://localhost:${port}`)
})
console.log('\n📄 Available endpoints:')
console.log('   - GET /files/:size - Returns random data of specified size (KB)')
console.log('     Example: http://localhost:3000/files/1024 will return 1MB data')
console.log('\n⚙️  Press Ctrl+C to stop all servers')
console.log('=================================================')

// Add graceful shutdown handling
process.on('SIGINT', () => {
  console.log('Shutting down servers...')
  servers.forEach(server => server.close())
  process.exit()
})

test.js

// First load the async logger
require('./asyncLogger');

const undici = require('./index.js')

// Use global async logger, fallback to console if it doesn't exist
const logger = global.__asyncLogger || console;
logger.info('Test started - Using logger:', global.__asyncLogger ? 'global.__asyncLogger' : 'console');

async function downloadSingleFile(url, index) {
  const startTime = Date.now()
  const timeout = 5000; // Increased to 5 seconds
  logger.info(`[${index}] Starting request to ${url} at ${new Date().toISOString()}`)

  const controller = new AbortController()
  const reqTimer = setTimeout(() => {
    logger.info(`[${index}] Aborting request after 5000ms`)
    controller.abort()
  }, timeout)

  try {
    const response = await undici.fetch(url, {
      signal: controller.signal,
      headers: {},
      bodyTimeout: 500000, // Increased to 500 seconds to ensure no reading timeout interruption
      headersTimeout: 60000 // Increased header timeout
    })

    logger.info(`[${index}] Got response in ${Date.now() - startTime}ms, status: ${response.status}`)

    if (response.status >= 200 && response.status < 300) {
      logger.info(`[${index}] Starting to read body at ${new Date().toISOString()}`)
      const textStartTime = Date.now()
      
      // Add extra monitoring to detect long-running text() calls
      const textTimeout = setTimeout(() => {
        logger.warn(`[${index}] ⚠️ WARNING: response.text() has been running for 5 seconds without completing!`)
      }, 5000)
      
      // Additional: Pass request index to response object for use in readAllBytes
      // This is a special handling for undici's internal implementation
      if (response.body && response.body.getReader) {
        const originalGetReader = response.body.getReader.bind(response.body);
        response.body.getReader = function() {
          const reader = originalGetReader();
          // Inject index into reader object for tracking
          reader.__tempIndexIdx = index;
          
          // Additionally record read start time
          reader.__readStartTime = Date.now();
          
          // Add diagnostic log
          logger.info(`[${index}] Created reader instance, start time: ${new Date(reader.__readStartTime).toISOString()}`);
          
          return reader;
        };
      }
      
      // Increase timeout monitoring frequency
      const textTimeoutInterval = setInterval(() => {
        const elapsedTime = Date.now() - textStartTime;
        logger.warn(`[${index}] ⚠️ WARNING: response.text() has been running for ${elapsedTime/1000} seconds without completing!`);
      }, 3000);
      
      // This may get stuck, which is the issue we're trying to reproduce
      try {
        const text = await response.text();
        clearInterval(textTimeoutInterval);
        clearTimeout(textTimeout);
        
        const textDuration = Date.now() - textStartTime
        logger.info(`[${index}] Completed reading body in ${textDuration}ms, length: ${text.length}`)
        
        // Record longer response times
        if (textDuration > 2000) {
          logger.warn(`[${index}] ⚠️ Slow text() operation detected: ${textDuration}ms for ${url}`)
        }
        
        return { success: true, length: text.length, duration: textDuration }
      } catch (textError) {
        clearInterval(textTimeoutInterval);
        clearTimeout(textTimeout);
        logger.error(`[${index}] Text reading error:`, textError);
        throw textError;
      }
    } else {
      return { success: false, status: response.status }
    }
  } catch (err) {
    logger.error(`[${index}] Error:`, {
      name: err.name,
      message: err.message,
      cause: err.cause,
      timestamp: new Date().toISOString()
    })
    return { success: false, error: err.message }
  } finally {
    clearTimeout(reqTimer)
  }
}

async function runTest() {
  // Force requests for large files to increase probability of triggering the issue
  const totalRequests = 100 // Reduce number of requests, but each is a large file
  
  // Simulate 4 different hosts
  const hosts = [
    'http://localhost:3000',
    'http://localhost:3001',
    'http://localhost:3002',
    'http://localhost:3003'
  ]
  
  // Generate requests, all for large files
  const requests = Array.from({ length: totalRequests }, (_, i) => {
    // All requests for large files
    const size = Math.floor(Math.random() * 4000) + 2000; // 2MB-6MB
    
    const host = hosts[i % hosts.length]
    return `${host}/files/${size}`
  })

  // Use lower concurrency, focusing on fewer large files
  const concurrency = 10 // Reduce concurrency to allow server to process
  const results = []
  let completedRequests = 0

  // Implement more aggressive concurrency model
  const batches = []
  for (let i = 0; i < requests.length; i += concurrency) {
    batches.push(requests.slice(i, i + concurrency))
  }

  // Use concurrency pattern closer to the issue description
  logger.info(`Starting test with ${requests.length} total requests, concurrency: ${concurrency}`)
  logger.info(`Using ${hosts.length} different hosts`)

  // Add functionality to periodically record current status
  const statusInterval = setInterval(() => {
    logger.info(`Current progress: ${completedRequests}/${totalRequests} completed`);
    logMemoryUsage();
  }, 5000);

  for (let batchIndex = 0; batchIndex < batches.length; batchIndex++) {
    const batch = batches[batchIndex]
    const batchStartTime = Date.now()

    logger.info(`Starting batch ${batchIndex + 1}/${batches.length}, size: ${batch.length}`)

    // Start all requests simultaneously
    const batchPromises = batch.map((url, idx) => {
      const requestIndex = batchIndex * concurrency + idx
      return downloadSingleFile(url, requestIndex)
        .then(result => {
          completedRequests++
          if (completedRequests % 10 === 0) {
            logger.info(`Progress: ${completedRequests}/${totalRequests} completed`)
          }
          return result
        })
    })

    // Wait for all requests in this batch to complete
    const batchResults = await Promise.all(batchPromises)
    results.push(...batchResults)

    logger.info(`Completed batch ${batchIndex + 1}/${batches.length} in ${Date.now() - batchStartTime}ms`)
  }

  clearInterval(statusInterval);
  return results
}

// Add memory usage monitoring
function logMemoryUsage() {
  const used = process.memoryUsage()
  logger.info('Memory usage:', {
    rss: `${Math.round(used.rss / 1024 / 1024)} MB`,
    heapTotal: `${Math.round(used.heapTotal / 1024 / 1024)} MB`,
    heapUsed: `${Math.round(used.heapUsed / 1024 / 1024)} MB`,
    external: `${Math.round(used.external / 1024 / 1024)} MB`
  })
}

// Record memory usage every 10 seconds
const memoryInterval = setInterval(logMemoryUsage, 10000)

runTest().then((results) => {
  clearInterval(memoryInterval)
  
  const successes = results.filter(r => r.success).length
  const failures = results.filter(r => !r.success).length
  
  // Calculate statistics for text() operations
  const textDurations = results
    .filter(r => r.success && r.duration)
    .map(r => r.duration)
  
  const avgDuration = textDurations.reduce((sum, d) => sum + d, 0) / (textDurations.length || 1)
  const maxDuration = Math.max(...(textDurations.length ? textDurations : [0]))
  const slowResponses = results.filter(r => r.success && r.duration > 2000).length

  console.log('\nTest Summary:')
  console.log(`Total requests: ${results.length}`)
  console.log(`Successful: ${successes}`)
  console.log(`Failed: ${failures}`)
  console.log(`Average text() duration: ${avgDuration.toFixed(2)}ms`)
  console.log(`Maximum text() duration: ${maxDuration}ms`)
  console.log(`Slow responses (>2000ms): ${slowResponses}`)
  
  logMemoryUsage()
}).catch(console.error)

asyncLogger.js

const fs = require('fs');
const path = require('path');
const util = require('util');

// Create log directory
const LOG_DIR = path.join(__dirname, 'logs');
if (!fs.existsSync(LOG_DIR)) {
  fs.mkdirSync(LOG_DIR);
}

// Log file path
const LOG_FILE = path.join(LOG_DIR, `test-${new Date().toISOString().replace(/:/g, '-')}.log`);

// Create write stream
const logStream = fs.createWriteStream(LOG_FILE, { flags: 'a' });

// Async logger
class AsyncLogger {
  constructor() {
    this.queue = [];
    this.isProcessing = false;
    
    // Write logs from queue to file every second
    setInterval(() => this.processQueue(), 1000);
    
    // Ensure all logs are written when process exits
    process.on('exit', () => {
      this.processQueueSync();
    });
  }

  // Format log message
  formatMessage(level, message, ...args) {
    const timestamp = new Date().toISOString();
    const formattedArgs = args.map(arg => {
      if (typeof arg === 'object') {
        return util.inspect(arg, { depth: null });
      }
      return arg;
    }).join(' ');
    
    return `[${timestamp}] [${level}] ${message} ${formattedArgs}`.trim();
  }

  // Add log to queue
  log(level, message, ...args) {
    const formattedMessage = this.formatMessage(level, message, ...args);
    
    // Output to console
    if (level === 'ERROR') {
      console.error(formattedMessage);
    } else if (level === 'WARN') {
      console.warn(formattedMessage);
    } else {
      console.log(formattedMessage);
    }
    
    // Add to queue
    this.queue.push(formattedMessage);
    
    // Start processing if queue is not already being processed
    if (!this.isProcessing) {
      this.processQueue();
    }
  }

  // Process queue asynchronously
  async processQueue() {
    if (this.isProcessing || this.queue.length === 0) return;
    
    this.isProcessing = true;
    
    try {
      const messagesToProcess = [...this.queue];
      this.queue = [];
      
      // Write to file
      for (const message of messagesToProcess) {
        logStream.write(message + '\n');
      }
    } finally {
      this.isProcessing = false;
      
      // Continue processing if queue still has messages
      if (this.queue.length > 0) {
        setImmediate(() => this.processQueue());
      }
    }
  }

  // Process queue synchronously (used when process exits)
  processQueueSync() {
    if (this.queue.length === 0) return;
    
    const messagesToProcess = [...this.queue];
    this.queue = [];
    
    for (const message of messagesToProcess) {
      fs.appendFileSync(LOG_FILE, message + '\n');
    }
  }

  // Public methods
  info(message, ...args) {
    this.log('INFO', message, ...args);
  }

  warn(message, ...args) {
    this.log('WARN', message, ...args);
  }

  error(message, ...args) {
    this.log('ERROR', message, ...args);
  }

  debug(message, ...args) {
    this.log('DEBUG', message, ...args);
  }
}

// Export instance and set as global variable
const logger = new AsyncLogger();
global.__asyncLogger = logger;

module.exports = logger;

we need to update lib/web/fetch/util.js

const sleep = (ms)=>{
  return new Promise((resolve) => setTimeout(() => resolve({value: false}), ms));
}

/**
 * @see https://streams.spec.whatwg.org/#readablestreamdefaultreader-read-all-bytes
 * @see https://streams.spec.whatwg.org/#read-loop
 * @param {ReadableStreamDefaultReader} reader
 */
async function readAllBytes (reader) {
  const bytes = []
  let byteLength = 0
  const logger = global.__asyncLogger;
  const tempIndexIdx = reader["__tempIndexIdx"] //this is injected from the download index
  logger.info(`fetch enter readAllBytes with idx ${tempIndexIdx} step0 ${!!reader}`)
  let times = 0
  let timesInfo = []
  while (true) {
    times++;
    logger.info(`fetch enter readAllBytes with dix ${tempIndexIdx} step0 times=${times}`)
    // Significantly reduce timeout time to ensure it triggers before reader.read()
    const { done, value: chunk } = await Promise.race([reader.read(), sleep(100)])
    timesInfo.push({
      time: new Date().toISOString(),
      times,
      length: byteLength
    })
    logger.info(`fetch enter readAllBytes with idx chunk: ${chunk === false}`)
    if(chunk === false){
      try{
        logger.info(`fetch enter controller readAllBytes with idx ${tempIndexIdx} step3 timeout ${!!reader} times=${times} length=${byteLength} ${JSON.stringify(timesInfo)}`)
        logger.info(`fetch enter readAllBytes with idx ${tempIndexIdx} step3 info 1`, {reader})
        const infoOfReader = {}
        const symbolKey = Object.getOwnPropertySymbols(reader)
        const kStateSymbol = symbolKey.find(sym toString().includes('kType'))
        const kStreamSymbol = symbolKey.find(sym toString().includes('kState'))
        const _innerState = reader[kStateSymbol]
        const _innerReader = reader[kStreamSymbol]
        infoOfReader.state = _innerState
        infoOfReader.state = _innerState
        const waitingLength = _innerReader.readRequests?.length || -1
        const waitingReq = _innerReader.readRequests[0]
        const waitingReqPromimse = waitingReq?.promise
        let waitingReqStatus = 'none';
        if(waitingReqPromimse){
          // Increase chance to detect pending status by increasing timeout to 500ms
          waitingReqStatus = await Promise.race([
            waitingReqPromimse.then(() => "fulfilled").catch(() => "rejected"),
            new Promise(resolve => setTimeout(() => resolve("pending"), 500))
          ]);
        }
        infoOfReader.waitingLength = waitingLength
        const _stream = _innerReader.stream
        const streamKeys = Object.getOwnPropertySymbols(_stream)
        const kStreamStatus = streamKeys.find(sym toString().includes('kState'))
        const stateOfStream = _stream[kStreamStatus]
        infoOfReader.stateOfStream = stateOfStream
        const controller = stateOfStream.controller
        const stateOfController = controller[kStreamStatus]
        infoOfReader.controllerState = stateOfController
        infoOfReader.waitingReqStatus = waitingReqStatus
        logger.info(`fetch enter readAllBytes with idx ${tempIndexIdx} status=${waitingReqStatus} step3 info ${JSON.stringify(infoOfReader)}`)
      }catch(e){
        logger.info(`fetch enter readAllBytes with idx ${tempIndexIdx} step3 timeout error ${e.message}`, e)
      }
     //when hangs happened, it will await here, so I can catch the hang.
      await sleep(600000)
    }
    if (done) {
      logger.info(`fetch enter readAllBytes with idx ${tempIndexIdx} step2 done`)
      // 1. Call successSteps with bytes.
      return Buffer.concat(bytes, byteLength)
    }

    // 1. If chunk is not a Uint8Array object, call failureSteps
    //    with a TypeError and abort these steps.
    if (!isUint8Array(chunk)) {
      throw new TypeError('Received non-Uint8Array chunk')
    }

    // 2. Append the bytes represented by chunk to bytes.
    bytes.push(chunk)
    byteLength += chunk.length

    // 3. Read-loop given reader, bytes, successSteps, and failureSteps.
  }
}

The root cause of the problem is:

  1. Server stalls after sending partial data
  • In our server simulation code, it intentionally adds a 120-second (2-minute) delay after sending the first few data chunks
  • This situation simulates a "half-open connection" problem in real-world scenarios, where the TCP connection remains open but data transfer is interrupted
  1. Read operation blocks indefinitely
  • reader.read() is a blocking operation that waits until new data arrives or the stream ends
  • But if the server neither sends new data nor closes the connection, this read operation will hang indefinitely

@fwx5618177 fwx5618177 changed the base branch from main to release/v6.21.2 March 24, 2025 14:31
@fwx5618177 fwx5618177 requested a review from metcoder95 March 24, 2025 14:31
@ronag
Copy link
Member

ronag commented Mar 24, 2025

reader.read() is a blocking operation that waits until new data arrives or the stream ends
But if the server neither sends new data nor closes the connection, this read operation will hang indefinitely

We have a header & body read timeout in undici. Why doesn't that trigger?

Copy link
Member

@mcollina mcollina left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think this sleep solution is the correct. Can you create a reproducible (failing) test for this problem?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants