Skip to content

Commit a7fb4dd

Browse files
author
gabi
committed
lease implementation
1 parent 7156c7e commit a7fb4dd

14 files changed

+353
-104
lines changed

Diff for: .coveragerc

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,12 @@
11
[run]
22
branch=True
33
source=rsocket
4-
parallel = true
54

65
[coverage:report]
76
skip_empty = true
87
exclude_lines =
98
pragma: no cover
9+
pass
1010
@(abc\.)?abstractmethod
1111

1212
[html]

Diff for: examples/client.py

+7-10
Original file line numberDiff line numberDiff line change
@@ -30,19 +30,16 @@ def on_subscribe(self, subscription):
3030

3131

3232
async def download(reader, writer):
33-
socket = RSocketClient(reader, writer)
33+
async with RSocketClient(reader, writer) as client:
34+
payload = Payload(b'The quick brown fox', b'meta')
3435

35-
payload = Payload(b'The quick brown fox', b'meta')
36+
result = await client.request_response(payload)
37+
logging.info('RR: {}'.format(result))
3638

37-
result = await socket.request_response(payload)
38-
logging.info('RR: {}'.format(result))
39+
completion_event = Event()
40+
client.request_stream(payload).subscribe(StreamSubscriber(completion_event))
3941

40-
completion_event = Event()
41-
socket.request_stream(payload).subscribe(StreamSubscriber(completion_event))
42-
43-
await completion_event.wait()
44-
45-
await socket.close()
42+
await completion_event.wait()
4643

4744

4845
if __name__ == '__main__':

Diff for: examples/run_against_example_java_server.py

+11-12
Original file line numberDiff line numberDiff line change
@@ -34,22 +34,21 @@ def on_error(self, exception: Exception):
3434
completion_event.set()
3535

3636
connection = await asyncio.open_connection('localhost', 6565)
37-
client = RSocketClient(*connection,
38-
metadata_encoding=WellKnownMimeTypes.MESSAGE_RSOCKET_COMPOSITE_METADATA.value.name,
39-
data_encoding=WellKnownMimeTypes.APPLICATION_JSON.value.name)
37+
async with RSocketClient(*connection,
38+
metadata_encoding=WellKnownMimeTypes.MESSAGE_RSOCKET_COMPOSITE_METADATA.value.name,
39+
data_encoding=WellKnownMimeTypes.APPLICATION_JSON.value.name) as client:
40+
metadata = CompositeMetadata()
41+
metadata.append(RoutingMetadata(['investigation.getInvestigationByContext']))
4042

41-
metadata = CompositeMetadata()
42-
metadata.append(RoutingMetadata(['investigation.getInvestigationByContext']))
43+
body = bytes(bytearray(map(ord, json.dumps({'active': True}))))
4344

44-
body = bytes(bytearray(map(ord, json.dumps({'active': True}))))
45+
request = Payload(body, metadata.serialize())
4546

46-
request = Payload(body, metadata.serialize())
47+
subscriber = Subscriber()
48+
client.request_stream(request).subscribe(subscriber)
49+
await completion_event.wait()
4750

48-
subscriber = Subscriber()
49-
client.request_stream(request).subscribe(subscriber)
50-
await completion_event.wait()
51-
52-
assert len(subscriber.values) == 2
51+
assert len(subscriber.values) == 2
5352

5453

5554
asyncio.run(example())

Diff for: rsocket/exceptions.py

+21
Original file line numberDiff line numberDiff line change
@@ -15,15 +15,36 @@ class RSocketAuthenticationError(RSocketError):
1515
pass
1616

1717

18+
class RSocketLeaseNotImplementedError(RSocketError):
19+
pass
20+
21+
22+
class RSocketLeaseNotReceivedTimeoutError(RSocketError):
23+
pass
24+
25+
1826
class RSocketValueErrorException(RSocketError):
1927
pass
2028

2129

30+
class RSocketConnectionRejected(RSocketError):
31+
pass
32+
33+
2234
class RSocketProtocolException(RSocketError):
2335
def __init__(self, error_code: ErrorCode, data: Optional[str] = None):
2436
self.error_code = error_code
2537
self.data = data
2638

39+
def __str__(self) -> str:
40+
return 'RSocket error %s(%s): "%s"' % (self.error_code.name, self.error_code.value, self.data or '')
41+
42+
43+
class RSocketRejected(RSocketProtocolException):
44+
def __init__(self, stream_id: Optional[int] = None):
45+
super().__init__(ErrorCode.REJECTED)
46+
self.stream_id = stream_id
47+
2748

2849
class RSocketFrameFragmentDifferentType(RSocketError):
2950
pass

Diff for: rsocket/frame.py

+4-1
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
from typing import Tuple
55

66
from rsocket.error_codes import ErrorCode
7-
from rsocket.exceptions import RSocketProtocolException
7+
from rsocket.exceptions import RSocketProtocolException, RSocketRejected
88

99
PROTOCOL_MAJOR_VERSION = 1
1010
PROTOCOL_MINOR_VERSION = 0
@@ -605,6 +605,9 @@ def exception_to_error_frame(stream_id: int, exception: Exception) -> ErrorFrame
605605

606606

607607
def error_frame_to_exception(frame: ErrorFrame) -> Exception:
608+
if frame.error_code == ErrorCode.REJECTED:
609+
return RSocketRejected(frame.stream_id)
610+
608611
if frame.error_code != ErrorCode.APPLICATION_ERROR:
609612
return RSocketProtocolException(frame.error_code)
610613

Diff for: rsocket/lease.py

+68
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
import abc
2+
from datetime import timedelta, datetime
3+
from typing import Optional
4+
5+
from rsocket.exceptions import RSocketRejected
6+
from rsocket.frame import LeaseFrame
7+
from rsocket.helpers import to_milliseconds
8+
9+
MAX_31_BIT = pow(2, 31) - 1
10+
11+
12+
class Lease(metaclass=abc.ABCMeta):
13+
@abc.abstractmethod
14+
def assert_request_allowed(self, stream_id: Optional[int] = None):
15+
...
16+
17+
@abc.abstractmethod
18+
def to_frame(self) -> LeaseFrame:
19+
...
20+
21+
22+
class NullLease(Lease):
23+
def assert_request_allowed(self, stream_id: Optional[int] = None):
24+
pass
25+
26+
def to_frame(self) -> LeaseFrame:
27+
frame = LeaseFrame()
28+
frame.number_of_requests = MAX_31_BIT
29+
frame.time_to_live = MAX_31_BIT
30+
return frame
31+
32+
33+
class DefinedLease(Lease):
34+
__slots__ = (
35+
'_maximum_request_count',
36+
'_request_counter',
37+
'_maximum_lease_time',
38+
'_lease_created_at'
39+
)
40+
41+
def __init__(self,
42+
maximum_request_count: int = MAX_31_BIT,
43+
maximum_lease_time: timedelta = timedelta(milliseconds=MAX_31_BIT)):
44+
self._maximum_request_count = maximum_request_count
45+
self._maximum_lease_time = maximum_lease_time
46+
self._lease_created_at = datetime.now()
47+
self._request_counter = 0
48+
49+
def assert_request_allowed(self, stream_id: Optional[int] = None):
50+
if not self._is_request_allowed():
51+
raise RSocketRejected(stream_id)
52+
53+
def _is_request_allowed(self) -> bool:
54+
if self._lease_created_at + self._maximum_lease_time <= datetime.now():
55+
return False
56+
57+
self._request_counter += 1
58+
59+
if self._request_counter > self._maximum_request_count:
60+
return False
61+
62+
return True
63+
64+
def to_frame(self) -> LeaseFrame:
65+
frame = LeaseFrame()
66+
frame.number_of_requests = self._maximum_request_count
67+
frame.time_to_live = to_milliseconds(self._maximum_lease_time)
68+
return frame

Diff for: rsocket/request_handler.py

+9-10
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,10 @@
66
from reactivestreams.publisher import Publisher
77
from reactivestreams.subscriber import Subscriber
88
from reactivestreams.subscription import DefaultSubscription
9+
from rsocket.error_codes import ErrorCode
10+
from rsocket.exceptions import RSocketProtocolException
911
from rsocket.extensions.composite_metadata import CompositeMetadata
10-
from rsocket.frame import LeaseFrame
12+
from rsocket.lease import Lease
1113
from rsocket.payload import Payload
1214

1315

@@ -23,8 +25,9 @@ async def on_setup(self,
2325
metadata_encoding: bytes):
2426
...
2527

26-
async def supply_lease(self):
27-
"""Not implemented by default"""
28+
@abstractmethod
29+
async def supply_lease(self) -> Lease:
30+
...
2831

2932
@abstractmethod
3033
async def on_metadata_push(self, metadata: Payload):
@@ -56,13 +59,6 @@ def _parse_composite_metadata(self, metadata: bytes) -> CompositeMetadata:
5659
composite_metadata.parse(metadata)
5760
return composite_metadata
5861

59-
def _send_lease(self, stream: int, time_to_live: int, number_of_requests: int):
60-
lease = LeaseFrame()
61-
lease.stream_id = stream
62-
lease.time_to_live = time_to_live
63-
lease.number_of_requests = number_of_requests
64-
self.socket.send_frame(lease)
65-
6662

6763
class BaseRequestHandler(RequestHandler):
6864
class UnimplementedPublisher(Publisher, DefaultSubscription):
@@ -76,6 +72,9 @@ async def on_setup(self,
7672
metadata_encoding: bytes):
7773
"""Nothing to do on setup by default"""
7874

75+
async def supply_lease(self) -> Lease:
76+
raise RSocketProtocolException(error_code=ErrorCode.UNSUPPORTED_SETUP)
77+
7978
async def request_channel(self, payload: Payload) -> Tuple[Publisher, Subscriber]:
8079
raise RuntimeError('Not implemented')
8180

Diff for: rsocket/routing/routing_request_handler.py

+6-8
Original file line numberDiff line numberDiff line change
@@ -10,8 +10,7 @@
1010
from rsocket.extensions.composite_metadata import CompositeMetadata
1111
from rsocket.extensions.mimetypes import WellKnownMimeTypes
1212
from rsocket.extensions.routing import RoutingMetadata
13-
from rsocket.frame import LeaseFrame
14-
from rsocket.helpers import to_milliseconds
13+
from rsocket.lease import DefinedLease
1514
from rsocket.logger import logger
1615
from rsocket.payload import Payload
1716
from rsocket.routing.request_router import RequestRouter
@@ -47,12 +46,11 @@ def __init__(self,
4746
self._lease_ttl = lease_ttl
4847
self._lease_max_requests = lease_max_requests
4948

50-
async def supply_lease(self):
51-
if self._lease_ttl is not None and self._lease_max_requests is not None:
52-
frame = LeaseFrame()
53-
frame.number_of_requests = self._lease_max_requests
54-
frame.time_to_live = to_milliseconds(self._lease_ttl)
55-
self.socket.send_frame(frame)
49+
async def supply_lease(self) -> DefinedLease:
50+
return DefinedLease(
51+
self._lease_max_requests,
52+
self._lease_ttl
53+
)
5654

5755
# noinspection PyAttributeOutsideInit
5856
async def on_setup(self,

0 commit comments

Comments
 (0)