1 change: 1 addition & 0 deletions requirements.txt
@@ -5,3 +5,4 @@ dj-static==0.0.6
requests==2.32.3
django-apscheduler==0.7.0
pytz==2024.2
aiohttp==3.9.3
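Note: aiohttp is the async HTTP client used by the new stress_test.py below; after pulling this change, reinstalling requirements (for example, pip install -r requirements.txt) should pick it up.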
157 changes: 157 additions & 0 deletions stress_test.py
@@ -0,0 +1,157 @@
#!/usr/bin/env python3
"""
Stress Test Script

This script load-tests the eDrop order-creation endpoint by sending POST requests
at a configurable rate for a configurable duration.
Review comment (Member): Let's add a warning here that this should be run when mocking REDCap as well as GBF.


Usage:
python stress_test.py --host HOST_URL [options]

WARNING:
This stress test script is intended for use in a mock environment where both
REDCap and GBF integrations are mocked.

Options:
    --host        Base host URL, ending with a trailing slash (required)
--rps Requests per second (default: 20)
--duration Test duration in seconds (default: 60)

Examples:
# Run against your eDrop host with default settings (20 rps, 60 seconds)
python stress_test.py --host http://your-edrop-api-host/edrop/

# Run with custom settings
python stress_test.py --host http://your-edrop-api-host/edrop/ --rps 30 --duration 120

# Run a quick test with lower values
python stress_test.py --host http://your-edrop-api-host/edrop/ --rps 5 --duration 10

Results:
- Prints real-time progress during the test
- Shows summary statistics after completion
- Saves detailed results to 'load_test_results.json'
"""

import aiohttp
import asyncio
import time
from datetime import datetime
import json
import argparse

def parse_args():
    parser = argparse.ArgumentParser(description='Run a stress test against the eDrop API')
    parser.add_argument('--host', required=True,
                        help='Base host URL, ending with a trailing slash (required)')
parser.add_argument('--rps', type=int, default=20,
help='Requests per second (default: 20)')
parser.add_argument('--duration', type=int, default=60,
help='Total duration in seconds (default: 60)')
return parser.parse_args()


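# Baseline form fields resembling a REDCap Data Entry Trigger payload;
# make_request() varies the 'record' field so each request is unique.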
BASE_TEST_PAYLOAD = {
'instrument': 'contact',
'project_id': 'test',
'project_url': 'http://test.com',
'contact_complete': '2'
}

async def make_request(session, request_id, base_url):
"""
Sends a single POST request to the API, returning detailed results
including status code, response time, and response body.
"""
# Vary the record field to ensure each request is unique
payload = BASE_TEST_PAYLOAD.copy()
payload['record'] = str(999999 + request_id)

start_time = time.time()
try:
async with session.post(base_url, data=payload) as response:
duration = time.time() - start_time
status = response.status
try:
text = await response.text()
            except Exception:
text = "Could not read response"

return {
'request_id': request_id,
'timestamp': datetime.now().isoformat(),
'status': status,
'duration': duration,
'response': text
}
except Exception as e:
return {
'request_id': request_id,
'timestamp': datetime.now().isoformat(),
'status': 'error',
'duration': time.time() - start_time,
'response': str(e)
}

async def run_load_test(base_url, requests_per_second, duration):
results = []
request_counter = 0

# Calculate delay between requests to maintain the RPS
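    # (e.g. --rps 20 -> 0.05 s between requests)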
delay = 1.0 / requests_per_second

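    # A single ClientSession is reused for all requests so connections can be pooled.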
async with aiohttp.ClientSession() as session:
start_time = time.time()

while time.time() - start_time < duration:
tasks = []
batch_start = time.time()

# Create a single request task each loop iteration
task = asyncio.create_task(make_request(session, request_counter, base_url))
tasks.append(task)
request_counter += 1

# Wait for the request to complete
batch_results = await asyncio.gather(*tasks)
results.extend(batch_results)

# Calculate sleep time to maintain rate
elapsed = time.time() - batch_start
if elapsed < delay:
await asyncio.sleep(delay - elapsed)

print(f"\rRequests sent: {request_counter}, Elapsed time: {int(time.time() - start_time)}s", end='')

return results

async def main():
args = parse_args()

    # Target URL: <host>api/order/create (args.host must end with a trailing slash, as in the usage examples)
base_url = f"{args.host}api/order/create"
requests_per_second = args.rps
duration = args.duration

print(f"Starting load test - {requests_per_second} requests/second for {duration} seconds")
print(f"Target URL: {base_url}")
results = await run_load_test(base_url, requests_per_second, duration)

# Analyze results
total_requests = len(results)
successful_requests = sum(1 for r in results if isinstance(r['status'], int) and 200 <= r['status'] < 300)
    avg_duration = sum(r['duration'] for r in results) / len(results) if results else 0.0

print("\n\nTest Results:")
print(f"Total Requests: {total_requests}")
print(f"Successful Requests: {successful_requests}")
print(f"Average Response Time: {avg_duration:.3f}s")

# Save detailed results to file
with open('load_test_results.json', 'w') as f:
json.dump(results, f, indent=2)

print("\nDetailed results saved to load_test_results.json")

if __name__ == "__main__":
asyncio.run(main())
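
Not part of the diff, but as an illustration of how the saved output could be consumed afterwards: a minimal sketch, assuming a completed run has written load_test_results.json with the 'status' and 'duration' fields that make_request() records. The p95 cut-off is just an example choice.

```python
# Sketch: summarize a finished run from load_test_results.json (assumed to exist).
import json
import statistics

with open('load_test_results.json') as f:
    results = json.load(f)

# 'status' and 'duration' are the fields written per request by make_request()
durations = sorted(r['duration'] for r in results)
ok = sum(1 for r in results if isinstance(r['status'], int) and 200 <= r['status'] < 300)

print(f"requests: {len(results)}, successful: {ok}")
if durations:
    print(f"median latency: {statistics.median(durations):.3f}s")
if len(durations) >= 2:
    # quantiles(n=20) returns 19 cut points; the last one approximates the 95th percentile
    print(f"p95 latency: {statistics.quantiles(durations, n=20)[-1]:.3f}s")
```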