3
3
import asyncio
4
4
import logging
5
5
from collections .abc import Callable
6
+ from typing import Protocol
6
7
7
- from smithy_core .aio .interfaces import AsyncByteStream
8
+ from smithy_core .aio .interfaces import AsyncByteStream , AsyncWriter
9
+ from smithy_core .aio .interfaces .eventstream import EventPublisher , EventReceiver
8
10
from smithy_core .codecs import Codec
9
11
from smithy_core .deserializers import DeserializeableShape , ShapeDeserializer
10
12
from smithy_core .exceptions import ExpectationNotMetException
11
13
from smithy_core .serializers import SerializeableShape
12
- from smithy_core .aio .interfaces .eventstream import EventPublisher , EventReceiver
13
- from smithy_core .aio .interfaces import AsyncWriter
14
14
15
- from .._private .serializers import EventSerializer as _EventSerializer
16
15
from .._private .deserializers import EventDeserializer as _EventDeserializer
16
+ from .._private .serializers import EventSerializer as _EventSerializer
17
17
from ..events import Event , EventHeaderEncoder , EventMessage
18
18
from ..exceptions import EventError
19
19
20
- from typing import Protocol
21
-
22
20
logger = logging .getLogger (__name__ )
23
21
24
22
25
23
class EventSigner (Protocol ):
26
24
"""A signer to manage credentials and EventMessages for an Event Stream lifecyle."""
27
25
28
- def sign_event (
26
+ async def sign_event (
29
27
self ,
30
28
* ,
31
29
event_message : EventMessage ,
@@ -50,7 +48,7 @@ def __init__(
50
48
51
49
async def send (self , event : E ) -> None :
52
50
if self ._closed :
53
- raise IOError ("Attempted to write to closed stream." )
51
+ raise OSError ("Attempted to write to closed stream." )
54
52
logger .debug ("Preparing to publish event: %s" , event )
55
53
event .serialize (self ._serializer )
56
54
result = self ._serializer .get_result ()
@@ -60,19 +58,18 @@ async def send(self, event: E) -> None:
60
58
)
61
59
if self ._signer is not None :
62
60
encoder = self ._serializer .event_header_encoder_cls
63
- result = await self ._signer .sign_event ( # type: ignore
61
+ result = await self ._signer .sign_event (
64
62
event_message = result ,
65
63
event_encoder_cls = encoder ,
66
64
)
67
65
68
- assert isinstance (result , EventMessage )
69
66
encoded_result = result .encode ()
70
67
try :
71
68
logger .debug ("Publishing serialized event: %s" , result )
72
69
await self ._writer .write (encoded_result )
73
70
except Exception as e :
74
71
await self .close ()
75
- raise IOError ("Failed to write to stream." ) from e
72
+ raise OSError ("Failed to write to stream." ) from e
76
73
77
74
async def close (self ) -> None :
78
75
if self ._closed :
@@ -111,7 +108,7 @@ async def receive(self) -> E | None:
111
108
except Exception as e :
112
109
await self .close ()
113
110
if not isinstance (e , EventError ):
114
- raise IOError ("Failed to read from stream." ) from e
111
+ raise OSError ("Failed to read from stream." ) from e
115
112
raise
116
113
117
114
if event is None :
0 commit comments