Skip to content

Commit

Permalink
release 0.0.306 (#37)
Browse files Browse the repository at this point in the history
* more flexible request.files()

* support python 3.12

* add max_queue_size option

---------

Co-authored-by: nggit <[email protected]>
  • Loading branch information
nggit and nggit committed Aug 8, 2024
1 parent f0a6aee commit b6f33ea
Show file tree
Hide file tree
Showing 10 changed files with 73 additions and 33 deletions.
12 changes: 6 additions & 6 deletions .github/workflows/tests-and-coverage.yml
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@ jobs:
strategy:
matrix:
python-version:
- '3.12'
- '3.11'
- '3.10'
- '3.9'
- '3.8'
- '3.7'
Expand All @@ -37,27 +37,27 @@ jobs:
python -m bandit --recursive tremolo
python -m pip install --upgrade flake8
python -m flake8 .
if: ${{ matrix.os == 'ubuntu-20.04' && matrix.python-version == '3.11' }}
if: ${{ matrix.os == 'ubuntu-20.04' && matrix.python-version == '3.12' }}
- name: Run tests
run: python alltests.py
if: ${{ matrix.python-version != '3.11' }}
if: ${{ matrix.python-version != '3.12' }}
- name: Run tests with coverage
run: |
python -m coverage run alltests.py
python -m coverage combine
mkdir artifact && mv .coverage artifact/.coverage.${{ matrix.os }}
if: ${{ matrix.python-version == '3.11' && !startsWith(matrix.os, 'windows-') }}
if: ${{ matrix.python-version == '3.12' && !startsWith(matrix.os, 'windows-') }}
- name: Run tests with coverage on Windows
run: |
python -m coverage run alltests.py
python -m coverage combine
mkdir artifact && move .coverage artifact\.coverage.windows
shell: cmd
if: ${{ matrix.python-version == '3.11' && startsWith(matrix.os, 'windows-') }}
if: ${{ matrix.python-version == '3.12' && startsWith(matrix.os, 'windows-') }}
- uses: actions/upload-artifact@v3
with:
path: artifact
if: ${{ matrix.python-version == '3.11' }}
if: ${{ matrix.python-version == '3.12' }}
report:
name: Upload Coverage to Codecov
needs: ['tests']
Expand Down
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

setup(
name='tremolo',
version='0.0.305',
version='0.0.306',
license='MIT',
author='nggit',
author_email='[email protected]',
Expand Down
14 changes: 14 additions & 0 deletions tests/http_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ async def my_request_middleware(**server):
worker_ctx = server['worker']
worker_ctx.shared += 1
worker_ctx.socket_family = request.socket.family.name
request.protocol.options['max_queue_size'] = 128

assert request.ctx.foo == 'bar'

Expand Down Expand Up @@ -209,6 +210,9 @@ async def post_form(**server):
async def upload(content_type=b'application/octet-stream', **server):
request = server['request']

if request.query_string == b'maxqueue':
request.protocol.options['max_queue_size'] = 0

try:
size = int(request.query['size'][0])
yield (await request.read(0)) + (await request.read(size))
Expand All @@ -235,12 +239,22 @@ async def upload_multipart(stream=False, **server):
yield b''

# stream multipart file upload then send it back as csv
async for info, data in server['request'].files(1):
yield b'%s,%d,%s,%s\r\n' % (info['name'].encode(),
info['length'],
info['type'].encode(),
(data[:5] + data[-3:]))

async for info, data in server['request'].files():
yield b'%s,%d,%s,%s\r\n' % (info['name'].encode(),
info['length'],
info['type'].encode(),
(data[:5] + data[-3:]))

async for info, data in server['request'].files():
# should not raised
raise Exception('EOF!!!')


@app.route('/download')
async def download(**server):
Expand Down
12 changes: 12 additions & 0 deletions tests/test_http_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -277,6 +277,18 @@ def test_post_upload2_ok_11(self):
)
self.assertEqual(read_chunked(body), create_dummy_body(65536))

def test_post_upload_maxqueue(self):
header, body = getcontents(
host=HTTP_HOST,
port=HTTP_PORT,
raw=b'POST /upload?maxqueue HTTP/1.0\r\nHost: localhost:%d\r\n'
b'Content-Length: 8192\r\n\r\n%s' % (
HTTP_PORT, create_dummy_body(8192))
)

self.assertEqual(header, b'')
self.assertEqual(body, b'')

def test_post_upload_multipart_11(self):
boundary = b'----MultipartBoundary'
header, body = getcontents(
Expand Down
2 changes: 1 addition & 1 deletion tremolo/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
__version__ = '0.0.305'
__version__ = '0.0.306'

from .tremolo import Tremolo # noqa: E402
from . import exceptions # noqa: E402,F401
Expand Down
9 changes: 6 additions & 3 deletions tremolo/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,11 @@
print(' E.g. "/path/to/fullchain.pem"')
print(' --ssl-key SSL private key location')
print(' E.g. "/path/to/privkey.pem"')
print(' --debug Enable debug mode.')
print(' --debug Enable debug mode')
print(' Intended for development')
print(' --reload Enable auto reload on code changes.') # noqa: E501
print(' --reload Enable auto reload on code changes')
print(' Intended for development')
print(' --no-ws Disable built-in WebSocket support') # noqa: E501
print(' --no-ws Disable built-in WebSocket support')
print(' --log-level Defaults to "DEBUG". See')
print(' https://docs.python.org/3/library/logging.html#levels') # noqa: E501
print(' --download-rate Limits the sending speed to the client') # noqa: E501
Expand All @@ -46,6 +46,8 @@
print(' --buffer-size Defaults to 16384, or 16KiB')
print(' --client-max-body-size Defaults to 2 * 1048576, or 2MiB')
print(' --client-max-header-size Defaults to 8192, or 8KiB')
print(' --max-queue-size Maximum number of buffers in the queue') # noqa: E501
print(' Defaults to 128')
print(' --request-timeout Defaults to 30 (seconds)')
print(' --keepalive-timeout Defaults to 30 (seconds)')
print(' --keepalive-connections Maximum number of keep-alive connections') # noqa: E501
Expand Down Expand Up @@ -77,6 +79,7 @@
'--buffer-size',
'--client-max-body-size',
'--client-max-header-size',
'--max-queue-size',
'--request-timeout',
'--keepalive-timeout',
'--keepalive-connections',
Expand Down
14 changes: 10 additions & 4 deletions tremolo/lib/http_protocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -118,10 +118,16 @@ async def put_to_queue(

while mv and queue is not None:
queue.put_nowait(mv[:buffer_size].tobytes())
await asyncio.sleep(
1 / (rate / max(queue.qsize(), 1) /
mv[:buffer_size].nbytes)
)
queue_size = queue.qsize()

if queue_size > self.options['max_queue_size']:
self.logger.error('%d exceeds the value of max_queue_size',
queue_size)
self.abort()
return

await asyncio.sleep(1 / (rate / max(queue_size, 1) /
mv[:buffer_size].nbytes))
mv = mv[buffer_size:]

if transport is not None and self.request is not None:
Expand Down
35 changes: 20 additions & 15 deletions tremolo/lib/http_request.py
Original file line number Diff line number Diff line change
Expand Up @@ -300,7 +300,10 @@ async def form(self, limit=8 * 1048576):

return self.params['post']

async def files(self):
async def files(self, limit=1024):
if self.eof():
return

ct = parse_qs(
self.content_type.replace(b'; ', b'&').replace(b';', b'&')
.decode('latin-1')
Expand All @@ -311,45 +314,45 @@ async def files(self):
except KeyError:
raise BadRequest('missing boundary')

header = bytearray()
header = None
body = bytearray()

header_size = 0
body_size = 0
content_length = 0

agen = self.stream()
paused = False

while header != b'--%s--\r\n' % boundary:
if self._read_instance is None:
self._read_instance = self.stream()

while limit > 0 and self._read_buf != b'--%s--\r\n' % boundary:
data = b''

if not paused:
try:
data = await agen.__anext__()
data = await self._read_instance.__anext__()
except StopAsyncIteration:
if header_size == -1 or body_size == -1:
del body[:]
raise BadRequest('malformed multipart/form-data')

if isinstance(header, bytearray):
header.extend(data)
header_size = header.find(b'\r\n\r\n')
if header is None:
self._read_buf.extend(data)
header_size = self._read_buf.find(b'\r\n\r\n')

if header_size == -1:
if len(header) > 8192:
del header[:]
if len(self._read_buf) > 8192:
raise BadRequest('malformed multipart/form-data')

paused = False
else:
body = header[header_size + 4:]
body.extend(self._read_buf[header_size + 4:])
info = {}

if header_size <= 8192 and header.startswith(
if header_size <= 8192 and self._read_buf.startswith(
b'--%s\r\n' % boundary):
header = self.header.parse(
header,
self._read_buf,
header_size=header_size
).headers

Expand Down Expand Up @@ -380,8 +383,10 @@ async def files(self):

yield info, body[:body_size]

header = body[body_size + 2:]
self._read_buf[:] = body[body_size + 2:]
header = None
content_length = 0
paused = True
limit -= 1

del body[:]
5 changes: 2 additions & 3 deletions tremolo/lib/request.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,15 +37,14 @@ async def recv(self):
)

try:
await task
data = await task

self.protocol.queue[0].task_done()
except asyncio.CancelledError:
raise TimeoutError('recv timeout')
finally:
timer.cancel()

data = task.result()

if data is None:
break

Expand Down
1 change: 1 addition & 0 deletions tremolo/tremolo.py
Original file line number Diff line number Diff line change
Expand Up @@ -344,6 +344,7 @@ async def _serve(self, host, port, **options):
client_max_header_size=options.get(
'client_max_header_size', 8192
),
max_queue_size=options.get('max_queue_size', 128),
request_timeout=options.get('request_timeout', 30),
keepalive_timeout=options.get(
'keepalive_timeout', 30
Expand Down

0 comments on commit b6f33ea

Please sign in to comment.