Skip to content

Commit 16c5e4e

Browse files
Remove use of thread locks in crt bindings
This primarily serves the purpose of removing thread-based locks from the CRT bindings, but it also ensures that the status code and headers have been received before returning from send.
1 parent 0c9d105 commit 16c5e4e

File tree

1 file changed

+115
-88
lines changed
  • packages/smithy-http/src/smithy_http/aio

1 file changed

+115
-88
lines changed

packages/smithy-http/src/smithy_http/aio/crt.py

Lines changed: 115 additions & 88 deletions
Original file line numberDiff line numberDiff line change
@@ -3,12 +3,11 @@
33
# pyright: reportMissingTypeStubs=false,reportUnknownMemberType=false
44
# flake8: noqa: F811
55
import asyncio
6+
from concurrent.futures import Future as ConcurrentFuture
67
from collections import deque
7-
from collections.abc import AsyncGenerator, AsyncIterable, Awaitable
8-
from concurrent.futures import Future
8+
from collections.abc import AsyncGenerator, AsyncIterable
99
from copy import deepcopy
1010
from io import BytesIO, BufferedIOBase
11-
from threading import Lock
1211
from typing import TYPE_CHECKING, Any
1312

1413
if TYPE_CHECKING:
@@ -61,25 +60,103 @@ def _initialize_default_loop(self) -> "crt_io.ClientBootstrap":
6160

6261

6362
class AWSCRTHTTPResponse(http_aio_interfaces.HTTPResponse):
64-
def __init__(self) -> None:
63+
def __init__(self, *, status: int, fields: Fields, body: "CRTResponseBody") -> None:
6564
_assert_crt()
66-
self._stream: crt_http.HttpClientStream | None = None
67-
self._status_code_future: Future[int] = Future()
68-
self._headers_future: Future[Fields] = Future()
69-
self._chunk_futures: list[Future[bytes]] = []
70-
self._received_chunks: list[bytes] = []
71-
self._chunk_lock: Lock = Lock()
72-
73-
def _set_stream(self, stream: "crt_http.HttpClientStream") -> None:
65+
self._status = status
66+
self._fields = fields
67+
self._body = body
68+
69+
@property
70+
def status(self) -> int:
71+
return self._status
72+
73+
@property
74+
def fields(self) -> Fields:
75+
return self._fields
76+
77+
@property
78+
def body(self) -> AsyncIterable[bytes]:
79+
return self.chunks()
80+
81+
@property
82+
def reason(self) -> str | None:
83+
"""Optional string provided by the server explaining the status."""
84+
# TODO: See how CRT exposes reason.
85+
return None
86+
87+
async def chunks(self) -> AsyncGenerator[bytes, None]:
88+
while True:
89+
chunk = await self._body.next()
90+
if chunk:
91+
yield chunk
92+
else:
93+
break
94+
95+
def __repr__(self) -> str:
96+
return (
97+
f"AWSCRTHTTPResponse("
98+
f"status={self.status}, "
99+
f"fields={self.fields!r}, body=...)"
100+
)
101+
102+
103+
class CRTResponseBody:
104+
def __init__(self) -> None:
105+
self._stream: "crt_http.HttpClientStream | None" = None
106+
self._chunk_futures: deque[ConcurrentFuture[bytes]] = deque()
107+
108+
# deque is thread safe and the crt is only going to be writing
109+
# with one thread anyway, so we *shouldn't* need to gate this
110+
# behind a lock. In an ideal world, the CRT would expose
111+
# an interface that better matches python's async.
112+
self._received_chunks: deque[bytes] = deque()
113+
114+
def set_stream(self, stream: "crt_http.HttpClientStream") -> None:
74115
if self._stream is not None:
75116
raise SmithyHTTPException("Stream already set on AWSCRTHTTPResponse object")
76117
self._stream = stream
77118
self._stream.completion_future.add_done_callback(self._on_complete)
78119
self._stream.activate()
79120

80-
def _on_headers(
121+
def on_body(self, chunk: bytes, **kwargs: Any) -> None: # pragma: crt-callback
122+
# TODO: update back pressure window once CRT supports it
123+
if self._chunk_futures:
124+
future = self._chunk_futures.popleft()
125+
future.set_result(chunk)
126+
else:
127+
self._received_chunks.append(chunk)
128+
129+
async def next(self) -> bytes:
130+
if self._stream is None:
131+
raise SmithyHTTPException("Stream not set")
132+
133+
# TODO: update backpressure window once CRT supports it
134+
if self._received_chunks:
135+
return self._received_chunks.popleft()
136+
elif self._stream.completion_future.done():
137+
return b""
138+
else:
139+
future = ConcurrentFuture[bytes]()
140+
self._chunk_futures.append(future)
141+
return await asyncio.wrap_future(future)
142+
143+
def _on_complete(
144+
self, completion_future: ConcurrentFuture[int]
145+
) -> None: # pragma: crt-callback
146+
for future in self._chunk_futures:
147+
future.set_result(b"")
148+
self._chunk_futures.clear()
149+
150+
151+
class CRTResponseFactory:
152+
def __init__(self, body: CRTResponseBody) -> None:
153+
self._body = body
154+
self._response_future = ConcurrentFuture[AWSCRTHTTPResponse]()
155+
156+
def on_response(
81157
self, status_code: int, headers: list[tuple[str, str]], **kwargs: Any
82158
) -> None: # pragma: crt-callback
159+
print("on-response")
83160
fields = Fields()
84161
for header_name, header_val in headers:
85162
try:
@@ -90,76 +167,25 @@ def _on_headers(
90167
values=[header_val],
91168
kind=FieldPosition.HEADER,
92169
)
93-
self._status_code_future.set_result(status_code)
94-
self._headers_future.set_result(fields)
95-
96-
def _on_body(self, chunk: bytes, **kwargs: Any) -> None: # pragma: crt-callback
97-
with self._chunk_lock:
98-
# TODO: update back pressure window once CRT supports it
99-
if self._chunk_futures:
100-
future = self._chunk_futures.pop(0)
101-
future.set_result(chunk)
102-
else:
103-
self._received_chunks.append(chunk)
104-
105-
def _get_chunk_future(self) -> Future[bytes]:
106-
if self._stream is None:
107-
raise SmithyHTTPException("Stream not set")
108-
with self._chunk_lock:
109-
future: Future[bytes] = Future()
110-
# TODO: update backpressure window once CRT supports it
111-
if self._received_chunks:
112-
chunk = self._received_chunks.pop(0)
113-
future.set_result(chunk)
114-
elif self._stream.completion_future.done():
115-
future.set_result(b"")
116-
else:
117-
self._chunk_futures.append(future)
118-
return future
119-
120-
def _on_complete(
121-
self, completion_future: Future[int]
122-
) -> None: # pragma: crt-callback
123-
with self._chunk_lock:
124-
if self._chunk_futures:
125-
future = self._chunk_futures.pop(0)
126-
future.set_result(b"")
127-
128-
@property
129-
def body(self) -> AsyncIterable[bytes]:
130-
return self.chunks()
131-
132-
@property
133-
def status(self) -> int:
134-
"""The 3 digit response status code (1xx, 2xx, 3xx, 4xx, 5xx)."""
135-
return self._status_code_future.result()
136170

137-
@property
138-
def fields(self) -> Fields:
139-
"""List of HTTP header fields."""
140-
if self._stream is None:
141-
raise SmithyHTTPException("Stream not set")
142-
if not self._headers_future.done():
143-
raise SmithyHTTPException("Headers not received yet")
144-
return self._headers_future.result()
171+
self._response_future.set_result(
172+
AWSCRTHTTPResponse(
173+
status=status_code,
174+
fields=fields,
175+
body=self._body,
176+
)
177+
)
145178

146-
@property
147-
def reason(self) -> str | None:
148-
"""Optional string provided by the server explaining the status."""
149-
# TODO: See how CRT exposes reason.
150-
return None
179+
async def await_response(self) -> AWSCRTHTTPResponse:
180+
print(f"Initial status: {self._response_future._state}")
181+
return await asyncio.wrap_future(self._response_future)
151182

152-
def get_chunk(self) -> Awaitable[bytes]:
153-
future = self._get_chunk_future()
154-
return asyncio.wrap_future(future)
183+
def set_done_callback(self, stream: "crt_http.HttpClientStream") -> None:
184+
stream.completion_future.add_done_callback(self._cancel)
155185

156-
async def chunks(self) -> AsyncGenerator[bytes, None]:
157-
while True:
158-
chunk = await self.get_chunk()
159-
if chunk:
160-
yield chunk
161-
else:
162-
break
186+
def _cancel(self, completion_future: ConcurrentFuture[int | Exception]) -> None:
187+
if not self._response_future.done():
188+
self._response_future.cancel()
163189

164190

165191
ConnectionPoolKey = tuple[str, str, int | None]
@@ -208,20 +234,21 @@ async def send(
208234
"""
209235
crt_request = await self._marshal_request(request)
210236
connection = await self._get_connection(request.destination)
211-
crt_response = AWSCRTHTTPResponse()
237+
response_body = CRTResponseBody()
238+
response_factory = CRTResponseFactory(response_body)
212239
crt_stream = connection.request(
213240
crt_request,
214-
crt_response._on_headers, # pyright: ignore[reportPrivateUsage]
215-
crt_response._on_body, # pyright: ignore[reportPrivateUsage]
241+
response_factory.on_response,
242+
response_body.on_body,
216243
)
217-
crt_response._set_stream(crt_stream) # pyright: ignore[reportPrivateUsage]
218-
return crt_response
244+
response_factory.set_done_callback(crt_stream)
245+
response_body.set_stream(crt_stream)
246+
return await response_factory.await_response()
219247

220248
async def _create_connection(
221249
self, url: core_interfaces.URI
222250
) -> "crt_http.HttpClientConnection":
223-
"""Builds and validates connection to ``url``, returns it as
224-
``asyncio.Future``"""
251+
"""Builds and validates connection to ``url``"""
225252
connect_future = self._build_new_connection(url)
226253
connection = await asyncio.wrap_future(connect_future)
227254
self._validate_connection(connection)
@@ -241,7 +268,7 @@ async def _get_connection(
241268

242269
def _build_new_connection(
243270
self, url: core_interfaces.URI
244-
) -> Future["crt_http.HttpClientConnection"]:
271+
) -> ConcurrentFuture["crt_http.HttpClientConnection"]:
245272
if url.scheme == "http":
246273
port = self._HTTP_PORT
247274
tls_connection_options = None
@@ -258,7 +285,7 @@ def _build_new_connection(
258285
if url.port is not None:
259286
port = url.port
260287

261-
connect_future: Future[crt_http.HttpClientConnection] = (
288+
connect_future: ConcurrentFuture[crt_http.HttpClientConnection] = (
262289
crt_http.HttpClientConnection.new(
263290
bootstrap=self._client_bootstrap,
264291
host_name=url.host,

0 commit comments

Comments
 (0)