Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

more flexible request.files() #37

Merged
merged 5 commits into from
Nov 22, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions .github/workflows/tests-and-coverage.yml
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@ jobs:
strategy:
matrix:
python-version:
- '3.12'
- '3.11'
- '3.10'
- '3.9'
- '3.8'
- '3.7'
Expand All @@ -37,27 +37,27 @@ jobs:
python -m bandit --recursive tremolo
python -m pip install --upgrade flake8
python -m flake8 .
if: ${{ matrix.os == 'ubuntu-20.04' && matrix.python-version == '3.11' }}
if: ${{ matrix.os == 'ubuntu-20.04' && matrix.python-version == '3.12' }}
- name: Run tests
run: python alltests.py
if: ${{ matrix.python-version != '3.11' }}
if: ${{ matrix.python-version != '3.12' }}
- name: Run tests with coverage
run: |
python -m coverage run alltests.py
python -m coverage combine
mkdir artifact && mv .coverage artifact/.coverage.${{ matrix.os }}
if: ${{ matrix.python-version == '3.11' && !startsWith(matrix.os, 'windows-') }}
if: ${{ matrix.python-version == '3.12' && !startsWith(matrix.os, 'windows-') }}
- name: Run tests with coverage on Windows
run: |
python -m coverage run alltests.py
python -m coverage combine
mkdir artifact && move .coverage artifact\.coverage.windows
shell: cmd
if: ${{ matrix.python-version == '3.11' && startsWith(matrix.os, 'windows-') }}
if: ${{ matrix.python-version == '3.12' && startsWith(matrix.os, 'windows-') }}
- uses: actions/upload-artifact@v3
with:
path: artifact
if: ${{ matrix.python-version == '3.11' }}
if: ${{ matrix.python-version == '3.12' }}
report:
name: Upload Coverage to Codecov
needs: ['tests']
Expand Down
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

setup(
name='tremolo',
version='0.0.305',
version='0.0.306',
license='MIT',
author='nggit',
author_email='[email protected]',
Expand Down
14 changes: 14 additions & 0 deletions tests/http_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ async def my_request_middleware(**server):
worker_ctx = server['worker']
worker_ctx.shared += 1
worker_ctx.socket_family = request.socket.family.name
request.protocol.options['max_queue_size'] = 128

assert request.ctx.foo == 'bar'

Expand Down Expand Up @@ -209,6 +210,9 @@ async def post_form(**server):
async def upload(content_type=b'application/octet-stream', **server):
request = server['request']

if request.query_string == b'maxqueue':
request.protocol.options['max_queue_size'] = 0

try:
size = int(request.query['size'][0])
yield (await request.read(0)) + (await request.read(size))
Expand All @@ -235,12 +239,22 @@ async def upload_multipart(stream=False, **server):
yield b''

# stream multipart file upload then send it back as csv
async for info, data in server['request'].files(1):
yield b'%s,%d,%s,%s\r\n' % (info['name'].encode(),
info['length'],
info['type'].encode(),
(data[:5] + data[-3:]))

async for info, data in server['request'].files():
yield b'%s,%d,%s,%s\r\n' % (info['name'].encode(),
info['length'],
info['type'].encode(),
(data[:5] + data[-3:]))

async for info, data in server['request'].files():
# should not raised
raise Exception('EOF!!!')


@app.route('/download')
async def download(**server):
Expand Down
12 changes: 12 additions & 0 deletions tests/test_http_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -277,6 +277,18 @@ def test_post_upload2_ok_11(self):
)
self.assertEqual(read_chunked(body), create_dummy_body(65536))

def test_post_upload_maxqueue(self):
header, body = getcontents(
host=HTTP_HOST,
port=HTTP_PORT,
raw=b'POST /upload?maxqueue HTTP/1.0\r\nHost: localhost:%d\r\n'
b'Content-Length: 8192\r\n\r\n%s' % (
HTTP_PORT, create_dummy_body(8192))
)

self.assertEqual(header, b'')
self.assertEqual(body, b'')

def test_post_upload_multipart_11(self):
boundary = b'----MultipartBoundary'
header, body = getcontents(
Expand Down
2 changes: 1 addition & 1 deletion tremolo/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
__version__ = '0.0.305'
__version__ = '0.0.306'

from .tremolo import Tremolo # noqa: E402
from . import exceptions # noqa: E402,F401
Expand Down
9 changes: 6 additions & 3 deletions tremolo/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,11 @@
print(' E.g. "/path/to/fullchain.pem"')
print(' --ssl-key SSL private key location')
print(' E.g. "/path/to/privkey.pem"')
print(' --debug Enable debug mode.')
print(' --debug Enable debug mode')
print(' Intended for development')
print(' --reload Enable auto reload on code changes.') # noqa: E501
print(' --reload Enable auto reload on code changes')
print(' Intended for development')
print(' --no-ws Disable built-in WebSocket support') # noqa: E501
print(' --no-ws Disable built-in WebSocket support')
print(' --log-level Defaults to "DEBUG". See')
print(' https://docs.python.org/3/library/logging.html#levels') # noqa: E501
print(' --download-rate Limits the sending speed to the client') # noqa: E501
Expand All @@ -46,6 +46,8 @@
print(' --buffer-size Defaults to 16384, or 16KiB')
print(' --client-max-body-size Defaults to 2 * 1048576, or 2MiB')
print(' --client-max-header-size Defaults to 8192, or 8KiB')
print(' --max-queue-size Maximum number of buffers in the queue') # noqa: E501
print(' Defaults to 128')
print(' --request-timeout Defaults to 30 (seconds)')
print(' --keepalive-timeout Defaults to 30 (seconds)')
print(' --keepalive-connections Maximum number of keep-alive connections') # noqa: E501
Expand Down Expand Up @@ -77,6 +79,7 @@
'--buffer-size',
'--client-max-body-size',
'--client-max-header-size',
'--max-queue-size',
'--request-timeout',
'--keepalive-timeout',
'--keepalive-connections',
Expand Down
14 changes: 10 additions & 4 deletions tremolo/lib/http_protocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -118,10 +118,16 @@ async def put_to_queue(

while mv and queue is not None:
queue.put_nowait(mv[:buffer_size].tobytes())
await asyncio.sleep(
1 / (rate / max(queue.qsize(), 1) /
mv[:buffer_size].nbytes)
)
queue_size = queue.qsize()

if queue_size > self.options['max_queue_size']:
self.logger.error('%d exceeds the value of max_queue_size',
queue_size)
self.abort()
return

await asyncio.sleep(1 / (rate / max(queue_size, 1) /
mv[:buffer_size].nbytes))
mv = mv[buffer_size:]

if transport is not None and self.request is not None:
Expand Down
35 changes: 20 additions & 15 deletions tremolo/lib/http_request.py
Original file line number Diff line number Diff line change
Expand Up @@ -300,7 +300,10 @@ async def form(self, limit=8 * 1048576):

return self.params['post']

async def files(self):
async def files(self, limit=1024):
if self.eof():
return

ct = parse_qs(
self.content_type.replace(b'; ', b'&').replace(b';', b'&')
.decode('latin-1')
Expand All @@ -311,45 +314,45 @@ async def files(self):
except KeyError:
raise BadRequest('missing boundary')

header = bytearray()
header = None
body = bytearray()

header_size = 0
body_size = 0
content_length = 0

agen = self.stream()
paused = False

while header != b'--%s--\r\n' % boundary:
if self._read_instance is None:
self._read_instance = self.stream()

while limit > 0 and self._read_buf != b'--%s--\r\n' % boundary:
data = b''

if not paused:
try:
data = await agen.__anext__()
data = await self._read_instance.__anext__()
except StopAsyncIteration:
if header_size == -1 or body_size == -1:
del body[:]
raise BadRequest('malformed multipart/form-data')

if isinstance(header, bytearray):
header.extend(data)
header_size = header.find(b'\r\n\r\n')
if header is None:
self._read_buf.extend(data)
header_size = self._read_buf.find(b'\r\n\r\n')

if header_size == -1:
if len(header) > 8192:
del header[:]
if len(self._read_buf) > 8192:
raise BadRequest('malformed multipart/form-data')

paused = False
else:
body = header[header_size + 4:]
body.extend(self._read_buf[header_size + 4:])
info = {}

if header_size <= 8192 and header.startswith(
if header_size <= 8192 and self._read_buf.startswith(
b'--%s\r\n' % boundary):
header = self.header.parse(
header,
self._read_buf,
header_size=header_size
).headers

Expand Down Expand Up @@ -380,8 +383,10 @@ async def files(self):

yield info, body[:body_size]

header = body[body_size + 2:]
self._read_buf[:] = body[body_size + 2:]
header = None
content_length = 0
paused = True
limit -= 1

del body[:]
5 changes: 2 additions & 3 deletions tremolo/lib/request.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,15 +37,14 @@ async def recv(self):
)

try:
await task
data = await task

self.protocol.queue[0].task_done()
except asyncio.CancelledError:
raise TimeoutError('recv timeout')
finally:
timer.cancel()

data = task.result()

if data is None:
break

Expand Down
1 change: 1 addition & 0 deletions tremolo/tremolo.py
Original file line number Diff line number Diff line change
Expand Up @@ -344,6 +344,7 @@ async def _serve(self, host, port, **options):
client_max_header_size=options.get(
'client_max_header_size', 8192
),
max_queue_size=options.get('max_queue_size', 128),
request_timeout=options.get('request_timeout', 30),
keepalive_timeout=options.get(
'keepalive_timeout', 30
Expand Down