Skip to content

Commit 28f7884

Browse files
Remove use of thread futures in crt bindings
This primarily serves the purpose of removing thread-based futures from the CRT bindings, but it also ensures that the status code and headers have been received before returning from send.
1 parent 0c9d105 commit 28f7884

File tree

1 file changed

+110
-92
lines changed
  • packages/smithy-http/src/smithy_http/aio

1 file changed

+110
-92
lines changed

packages/smithy-http/src/smithy_http/aio/crt.py

Lines changed: 110 additions & 92 deletions
Original file line numberDiff line numberDiff line change
@@ -3,12 +3,12 @@
33
# pyright: reportMissingTypeStubs=false,reportUnknownMemberType=false
44
# flake8: noqa: F811
55
import asyncio
6+
from asyncio import Future
7+
from concurrent.futures import Future as ConcurrentFuture
68
from collections import deque
7-
from collections.abc import AsyncGenerator, AsyncIterable, Awaitable
8-
from concurrent.futures import Future
9+
from collections.abc import AsyncGenerator, AsyncIterable
910
from copy import deepcopy
1011
from io import BytesIO, BufferedIOBase
11-
from threading import Lock
1212
from typing import TYPE_CHECKING, Any
1313

1414
if TYPE_CHECKING:
@@ -61,23 +61,100 @@ def _initialize_default_loop(self) -> "crt_io.ClientBootstrap":
6161

6262

6363
class AWSCRTHTTPResponse(http_aio_interfaces.HTTPResponse):
64-
def __init__(self) -> None:
64+
def __init__(self, *, status: int, fields: Fields, body: "CRTResponseBody") -> None:
6565
_assert_crt()
66+
self._status = status
67+
self._fields = fields
68+
self._body = body
69+
70+
@property
71+
def status(self) -> int:
72+
return self.status
73+
74+
@property
75+
def fields(self) -> Fields:
76+
return self._fields
77+
78+
@property
79+
def body(self) -> AsyncIterable[bytes]:
80+
return self.chunks()
81+
82+
@property
83+
def reason(self) -> str | None:
84+
"""Optional string provided by the server explaining the status."""
85+
# TODO: See how CRT exposes reason.
86+
return None
87+
88+
async def chunks(self) -> AsyncGenerator[bytes, None]:
89+
while True:
90+
chunk = await self._body.next()
91+
if chunk:
92+
yield chunk
93+
else:
94+
break
95+
96+
def __repr__(self) -> str:
97+
return (
98+
f"AWSCRTHTTPResponse("
99+
f"status={self.status}, "
100+
f"fields={self.fields!r}, body=...)"
101+
)
102+
103+
104+
class CRTResponseBody:
105+
def __init__(self) -> None:
66106
self._stream: crt_http.HttpClientStream | None = None
67-
self._status_code_future: Future[int] = Future()
68-
self._headers_future: Future[Fields] = Future()
69-
self._chunk_futures: list[Future[bytes]] = []
70-
self._received_chunks: list[bytes] = []
71-
self._chunk_lock: Lock = Lock()
107+
self._chunk_futures: deque[Future[bytes]] = deque()
72108

73-
def _set_stream(self, stream: "crt_http.HttpClientStream") -> None:
109+
# deque is thread safe and the crt is only going to be writing
110+
# with one thread anyway, so we *shouldn't* need to gate this
111+
# behind a lock. In an ideal world, the CRT would expose
112+
# an interface that better matches python's async.
113+
self._received_chunks: deque[bytes] = deque()
114+
115+
def set_stream(self, stream: "crt_http.HttpClientStream") -> None:
74116
if self._stream is not None:
75117
raise SmithyHTTPException("Stream already set on AWSCRTHTTPResponse object")
76118
self._stream = stream
77119
self._stream.completion_future.add_done_callback(self._on_complete)
78120
self._stream.activate()
79121

80-
def _on_headers(
122+
def on_body(self, chunk: bytes, **kwargs: Any) -> None: # pragma: crt-callback
123+
# TODO: update back pressure window once CRT supports it
124+
if self._chunk_futures:
125+
future = self._chunk_futures.popleft()
126+
future.set_result(chunk)
127+
else:
128+
self._received_chunks.append(chunk)
129+
130+
async def next(self) -> bytes:
131+
if self._stream is None:
132+
raise SmithyHTTPException("Stream not set")
133+
134+
# TODO: update backpressure window once CRT supports it
135+
if self._received_chunks:
136+
return self._received_chunks.popleft()
137+
elif self._stream.completion_future.done():
138+
return b""
139+
else:
140+
future: Future[bytes] = Future()
141+
self._chunk_futures.append(future)
142+
return await future
143+
144+
def _on_complete(
145+
self, completion_future: ConcurrentFuture[int]
146+
) -> None: # pragma: crt-callback
147+
for future in self._chunk_futures:
148+
future.set_result(b"")
149+
self._chunk_futures.clear()
150+
151+
152+
class CRTResponseFactory:
153+
def __init__(self, body: CRTResponseBody) -> None:
154+
self._body = body
155+
self.response_future = Future[AWSCRTHTTPResponse]()
156+
157+
def on_response(
81158
self, status_code: int, headers: list[tuple[str, str]], **kwargs: Any
82159
) -> None: # pragma: crt-callback
83160
fields = Fields()
@@ -90,76 +167,14 @@ def _on_headers(
90167
values=[header_val],
91168
kind=FieldPosition.HEADER,
92169
)
93-
self._status_code_future.set_result(status_code)
94-
self._headers_future.set_result(fields)
95-
96-
def _on_body(self, chunk: bytes, **kwargs: Any) -> None: # pragma: crt-callback
97-
with self._chunk_lock:
98-
# TODO: update back pressure window once CRT supports it
99-
if self._chunk_futures:
100-
future = self._chunk_futures.pop(0)
101-
future.set_result(chunk)
102-
else:
103-
self._received_chunks.append(chunk)
104-
105-
def _get_chunk_future(self) -> Future[bytes]:
106-
if self._stream is None:
107-
raise SmithyHTTPException("Stream not set")
108-
with self._chunk_lock:
109-
future: Future[bytes] = Future()
110-
# TODO: update backpressure window once CRT supports it
111-
if self._received_chunks:
112-
chunk = self._received_chunks.pop(0)
113-
future.set_result(chunk)
114-
elif self._stream.completion_future.done():
115-
future.set_result(b"")
116-
else:
117-
self._chunk_futures.append(future)
118-
return future
119-
120-
def _on_complete(
121-
self, completion_future: Future[int]
122-
) -> None: # pragma: crt-callback
123-
with self._chunk_lock:
124-
if self._chunk_futures:
125-
future = self._chunk_futures.pop(0)
126-
future.set_result(b"")
127-
128-
@property
129-
def body(self) -> AsyncIterable[bytes]:
130-
return self.chunks()
131-
132-
@property
133-
def status(self) -> int:
134-
"""The 3 digit response status code (1xx, 2xx, 3xx, 4xx, 5xx)."""
135-
return self._status_code_future.result()
136-
137-
@property
138-
def fields(self) -> Fields:
139-
"""List of HTTP header fields."""
140-
if self._stream is None:
141-
raise SmithyHTTPException("Stream not set")
142-
if not self._headers_future.done():
143-
raise SmithyHTTPException("Headers not received yet")
144-
return self._headers_future.result()
145-
146-
@property
147-
def reason(self) -> str | None:
148-
"""Optional string provided by the server explaining the status."""
149-
# TODO: See how CRT exposes reason.
150-
return None
151170

152-
def get_chunk(self) -> Awaitable[bytes]:
153-
future = self._get_chunk_future()
154-
return asyncio.wrap_future(future)
155-
156-
async def chunks(self) -> AsyncGenerator[bytes, None]:
157-
while True:
158-
chunk = await self.get_chunk()
159-
if chunk:
160-
yield chunk
161-
else:
162-
break
171+
self.response_future.set_result(
172+
AWSCRTHTTPResponse(
173+
status=status_code,
174+
fields=fields,
175+
body=self._body,
176+
)
177+
)
163178

164179

165180
ConnectionPoolKey = tuple[str, str, int | None]
@@ -208,22 +223,21 @@ async def send(
208223
"""
209224
crt_request = await self._marshal_request(request)
210225
connection = await self._get_connection(request.destination)
211-
crt_response = AWSCRTHTTPResponse()
226+
response_body = CRTResponseBody()
227+
response_factory = CRTResponseFactory(response_body)
212228
crt_stream = connection.request(
213229
crt_request,
214-
crt_response._on_headers, # pyright: ignore[reportPrivateUsage]
215-
crt_response._on_body, # pyright: ignore[reportPrivateUsage]
230+
response_factory.on_response,
231+
response_body.on_body,
216232
)
217-
crt_response._set_stream(crt_stream) # pyright: ignore[reportPrivateUsage]
218-
return crt_response
233+
response_body.set_stream(crt_stream) # pyright: ignore[reportPrivateUsage]
234+
return await response_factory.response_future
219235

220236
async def _create_connection(
221237
self, url: core_interfaces.URI
222-
) -> "crt_http.HttpClientConnection":
223-
"""Builds and validates connection to ``url``, returns it as
224-
``asyncio.Future``"""
225-
connect_future = self._build_new_connection(url)
226-
connection = await asyncio.wrap_future(connect_future)
238+
) -> crt_http.HttpClientConnection:
239+
"""Builds and validates connection to ``url``"""
240+
connection = await self._build_new_connection(url)
227241
self._validate_connection(connection)
228242
return connection
229243

@@ -258,7 +272,11 @@ def _build_new_connection(
258272
if url.port is not None:
259273
port = url.port
260274

261-
connect_future: Future[crt_http.HttpClientConnection] = (
275+
# CRT returns a concurrent future, which is based on threads, which
276+
# is not something we want to work with in async. Here we're declaring
277+
# its type, but we're immeditately going to wrap it with an asyncio
278+
# future that will prevent any compatibility issues.
279+
connect_future: ConcurrentFuture[crt_http.HttpClientConnection] = (
262280
crt_http.HttpClientConnection.new(
263281
bootstrap=self._client_bootstrap,
264282
host_name=url.host,
@@ -267,7 +285,7 @@ def _build_new_connection(
267285
tls_connection_options=tls_connection_options,
268286
)
269287
)
270-
return connect_future
288+
return asyncio.wrap_future(connect_future)
271289

272290
def _validate_connection(self, connection: "crt_http.HttpClientConnection") -> None:
273291
"""Validates an existing connection against the client config.

0 commit comments

Comments
 (0)