-
Notifications
You must be signed in to change notification settings - Fork 127
Open
Description
This is a consistently reproducible issue that causes significant delays when sending data across multiple threads.
Scenario Reproduction:
server.py
: A server implemented with FastAPI to receive data.
# server.py
import time
from fastapi import FastAPI, Request
from loguru import logger
app = FastAPI()
@app.post("/upload")
async def upload_file(request: Request):
req_id = request.headers.get("x-request-id", "N/A")
start_time_str = request.headers.get("x-start-time", "0")
content_length = request.headers.get("content-length", "0")
logger.info(f"Request [{req_id}] received, Headers: x-start-time={start_time_str}, content-length={content_length}, attempting to read the request body...")
received_size = 0
start_read_time = time.time()
async for chunk in request.stream():
received_size += len(chunk)
end_read_time = time.time()
read_duration = end_read_time - start_read_time
logger.debug(
f"Request [{req_id}] finished reading body. "
f"Total size received: {received_size / (1024*1024):.2f} MB. "
f"Read duration: {read_duration:.2f} seconds."
)
return {
"message": "Upload received successfully after a long wait.",
"request_id": req_id,
"received_size": received_size
}
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8000, workers=1)
client.py
: Implements 2 threads to simultaneously post 2048MiB of data (the larger the data, the longer the lock duration) toserver.py
.
# client.py
import threading
import time
import os
import uuid
import httpx
from loguru import logger
SERVER_URL = "http://127.0.0.1:8000/upload"
NUM_THREADS = 2
DATA_SIZE_BYTES = 2 * 1024 * 1024 * 1024 # 2048 MiB
TIMEOUT_SECONDS = 120
class DataGenerator:
def __init__(self, size):
logger.info(f"Generating a {size / (1024*1024):.2f} MB data chunk in memory...")
self._data = os.urandom(size)
self._size = size
self._pos = 0
logger.info("Data generation complete.")
def __iter__(self):
self._pos = 0
return self
def __next__(self):
if self._pos >= self._size:
raise StopIteration
chunk = self._data[self._pos:]
self._pos = self._size
return chunk
def __len__(self):
return self._size
def send_request(thread_name: str):
logger.info("Thread started.")
try:
data_to_send = DataGenerator(DATA_SIZE_BYTES)
headers = {
"Content-Type": "application/octet-stream",
"x-request-id": str(uuid.uuid4()),
"x-start-time": str(time.time())
}
logger.info(f"Starting POST request with request_id: {headers['x-request-id']}")
start_time = time.time()
with httpx.Client() as client:
response = client.post(SERVER_URL, headers=headers, data=data_to_send, timeout=TIMEOUT_SECONDS)
end_time = time.time()
duration = end_time - start_time
logger.info(f"Request completed in {duration:.2f} seconds, Response status: {response.status_code}, Response body: {response.json()}")
except Exception as e:
logger.error(f"A general exception occurred: {e}", exc_info=True)
if __name__ == "__main__":
logger.info(f"Starting {NUM_THREADS} threads to send {DATA_SIZE_BYTES / (1024*1024):.2f} MB of data each.")
total_mem_gb = (NUM_THREADS * DATA_SIZE_BYTES) / (1024**3)
logger.warning(f"This script will attempt to allocate approximately {total_mem_gb:.2f} GB of RAM.")
threads = []
for i in range(NUM_THREADS):
thread_name = f"SenderThread-{i+1}"
thread = threading.Thread(target=send_request, name=thread_name, args=(thread_name,))
threads.append(thread)
thread.start()
# time.sleep(0.1)
for thread in threads:
thread.join()
logger.info("All threads have completed their tasks.")
Pip Dependency Installation:
# httpcore version is 1.0.5
pip install "fastapi[all]" uvicorn loguru httpx==0.27.0
Start the server:
python server.py
Start the client:
python client.py
The same issue is encountered when changing NUM_THREADS=1
in client.py
(with a 2GB data packet).
Metadata
Metadata
Assignees
Labels
No labels