Skip to content

Commit 6e90e84

Browse files
Bidirectional streaming for pubsub (dapr#735)
* works Signed-off-by: Elena Kolevska <[email protected]> * works Signed-off-by: Elena Kolevska <[email protected]> * Sync bidi streaming and tests Signed-off-by: Elena Kolevska <[email protected]> * example fix Signed-off-by: Elena Kolevska <[email protected]> fixes typing Signed-off-by: Elena Kolevska <[email protected]> more readable example Signed-off-by: Elena Kolevska <[email protected]> linter Signed-off-by: Elena Kolevska <[email protected]> * examples fix Signed-off-by: Elena Kolevska <[email protected]> * Adds support for api token Signed-off-by: Elena Kolevska <[email protected]> * clean up Signed-off-by: Elena Kolevska <[email protected]> * Adds docs Signed-off-by: Elena Kolevska <[email protected]> * more small tweaks Signed-off-by: Elena Kolevska <[email protected]> * cleanups and tests Signed-off-by: Elena Kolevska <[email protected]> * Removes receive queue Signed-off-by: Elena Kolevska <[email protected]> * Adds `subscribe_with_handler` Signed-off-by: Elena Kolevska <[email protected]> * Fixes linter Signed-off-by: Elena Kolevska <[email protected]> * Fixes linter Signed-off-by: Elena Kolevska <[email protected]> * Adds async Signed-off-by: Elena Kolevska <[email protected]> * Adds tests for async streaming subscription Signed-off-by: Elena Kolevska <[email protected]> * Linter Signed-off-by: Elena Kolevska <[email protected]> * Split sync and async examples Signed-off-by: Elena Kolevska <[email protected]> * linter Signed-off-by: Elena Kolevska <[email protected]> * Adds interceptors to the async client for bidirectional streaming Signed-off-by: Elena Kolevska <[email protected]> * Removes unneeded class Signed-off-by: Elena Kolevska <[email protected]> * Removes async client Signed-off-by: Elena Kolevska <[email protected]> * Fixes missing docker-compose in examples (dapr#736) Signed-off-by: Elena Kolevska <[email protected]> * Removes async examples test Signed-off-by: Elena Kolevska <[email protected]> * Small cleanup Signed-off-by: Elena Kolevska <[email protected]> * Split up topic names between tests Signed-off-by: Elena Kolevska <[email protected]> * lint Signed-off-by: Elena Kolevska <[email protected]> * Revert "Removes async client" This reverts commit cb4b65b. Signed-off-by: Elena Kolevska <[email protected]> * Split up topic names between tests Signed-off-by: Elena Kolevska <[email protected]> * updates fake server to wait for confirmation message before sending new message Signed-off-by: Elena Kolevska <[email protected]> * Updates protos Signed-off-by: Elena Kolevska <[email protected]> * Adds stream cancelled error Signed-off-by: Elena Kolevska <[email protected]> * linter Signed-off-by: Elena Kolevska <[email protected]> --------- Signed-off-by: Elena Kolevska <[email protected]>
1 parent 0cd0482 commit 6e90e84

31 files changed

+2927
-1797
lines changed

dapr/aio/clients/grpc/client.py

+70-1
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@
2424

2525
from warnings import warn
2626

27-
from typing import Callable, Dict, Optional, Text, Union, Sequence, List, Any
27+
from typing import Callable, Dict, Optional, Text, Union, Sequence, List, Any, Awaitable
2828
from typing_extensions import Self
2929

3030
from google.protobuf.message import Message as GrpcMessage
@@ -39,12 +39,14 @@
3939
AioRpcError,
4040
)
4141

42+
from dapr.aio.clients.grpc.subscription import Subscription
4243
from dapr.clients.exceptions import DaprInternalError, DaprGrpcError
4344
from dapr.clients.grpc._crypto import EncryptOptions, DecryptOptions
4445
from dapr.clients.grpc._state import StateOptions, StateItem
4546
from dapr.clients.grpc._helpers import getWorkflowRuntimeStatus
4647
from dapr.clients.health import DaprHealth
4748
from dapr.clients.retry import RetryPolicy
49+
from dapr.common.pubsub.subscription import StreamInactiveError
4850
from dapr.conf.helpers import GrpcEndpoint
4951
from dapr.conf import settings
5052
from dapr.proto import api_v1, api_service_v1, common_v1
@@ -94,6 +96,7 @@
9496
UnlockResponse,
9597
GetWorkflowResponse,
9698
StartWorkflowResponse,
99+
TopicEventResponse,
97100
)
98101

99102

@@ -482,6 +485,72 @@ async def publish_event(
482485

483486
return DaprResponse(await call.initial_metadata())
484487

488+
async def subscribe(
489+
self,
490+
pubsub_name: str,
491+
topic: str,
492+
metadata: Optional[dict] = None,
493+
dead_letter_topic: Optional[str] = None,
494+
) -> Subscription:
495+
"""
496+
Subscribe to a topic with a bidirectional stream
497+
498+
Args:
499+
pubsub_name (str): The name of the pubsub component.
500+
topic (str): The name of the topic.
501+
metadata (Optional[dict]): Additional metadata for the subscription.
502+
dead_letter_topic (Optional[str]): Name of the dead-letter topic.
503+
504+
Returns:
505+
Subscription: The Subscription object managing the stream.
506+
"""
507+
subscription = Subscription(self._stub, pubsub_name, topic, metadata, dead_letter_topic)
508+
await subscription.start()
509+
return subscription
510+
511+
async def subscribe_with_handler(
512+
self,
513+
pubsub_name: str,
514+
topic: str,
515+
handler_fn: Callable[..., TopicEventResponse],
516+
metadata: Optional[dict] = None,
517+
dead_letter_topic: Optional[str] = None,
518+
) -> Callable[[], Awaitable[None]]:
519+
"""
520+
Subscribe to a topic with a bidirectional stream and a message handler function
521+
522+
Args:
523+
pubsub_name (str): The name of the pubsub component.
524+
topic (str): The name of the topic.
525+
handler_fn (Callable[..., TopicEventResponse]): The function to call when a message is received.
526+
metadata (Optional[dict]): Additional metadata for the subscription.
527+
dead_letter_topic (Optional[str]): Name of the dead-letter topic.
528+
529+
Returns:
530+
Callable[[], Awaitable[None]]: An async function to close the subscription.
531+
"""
532+
subscription = await self.subscribe(pubsub_name, topic, metadata, dead_letter_topic)
533+
534+
async def stream_messages(sub: Subscription):
535+
while True:
536+
try:
537+
message = await sub.next_message()
538+
if message:
539+
response = await handler_fn(message)
540+
if response:
541+
await subscription.respond(message, response.status)
542+
else:
543+
continue
544+
except StreamInactiveError:
545+
break
546+
547+
async def close_subscription():
548+
await subscription.close()
549+
550+
asyncio.create_task(stream_messages(subscription))
551+
552+
return close_subscription
553+
485554
async def get_state(
486555
self,
487556
store_name: str,

dapr/aio/clients/grpc/interceptors.py

+19-3
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
from collections import namedtuple
1717
from typing import List, Tuple
1818

19-
from grpc.aio import UnaryUnaryClientInterceptor, ClientCallDetails # type: ignore
19+
from grpc.aio import UnaryUnaryClientInterceptor, StreamStreamClientInterceptor, ClientCallDetails # type: ignore
2020

2121
from dapr.conf import settings
2222

@@ -51,7 +51,7 @@ def intercept_unary_unary(self, continuation, client_call_details, request):
5151
return continuation(client_call_details, request)
5252

5353

54-
class DaprClientInterceptorAsync(UnaryUnaryClientInterceptor):
54+
class DaprClientInterceptorAsync(UnaryUnaryClientInterceptor, StreamStreamClientInterceptor):
5555
"""The class implements a UnaryUnaryClientInterceptor from grpc to add an interceptor to add
5656
additional headers to all calls as needed.
5757
@@ -115,8 +115,24 @@ async def intercept_unary_unary(self, continuation, client_call_details, request
115115
Returns:
116116
A response object after invoking the continuation callable
117117
"""
118+
new_call_details = await self._intercept_call(client_call_details)
119+
# Call continuation
120+
response = await continuation(new_call_details, request)
121+
return response
122+
123+
async def intercept_stream_stream(self, continuation, client_call_details, request):
124+
"""This method intercepts a stream-stream gRPC call. This is the implementation of the
125+
abstract method defined in StreamStreamClientInterceptor defined in grpc. This is invoked
126+
automatically by grpc based on the order in which interceptors are added to the channel.
127+
128+
Args:
129+
continuation: a callable to be invoked to continue with the RPC or next interceptor
130+
client_call_details: a ClientCallDetails object describing the outgoing RPC
131+
request: the request value for the RPC
118132
119-
# Pre-process or intercept call
133+
Returns:
134+
A response object after invoking the continuation callable
135+
"""
120136
new_call_details = await self._intercept_call(client_call_details)
121137
# Call continuation
122138
response = await continuation(new_call_details, request)

dapr/aio/clients/grpc/subscription.py

+116
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,116 @@
1+
import asyncio
2+
from grpc import StatusCode
3+
from grpc.aio import AioRpcError
4+
5+
from dapr.clients.grpc._response import TopicEventResponse
6+
from dapr.clients.health import DaprHealth
7+
from dapr.common.pubsub.subscription import (
8+
StreamInactiveError,
9+
SubscriptionMessage,
10+
StreamCancelledError,
11+
)
12+
from dapr.proto import api_v1, appcallback_v1
13+
14+
15+
class Subscription:
16+
def __init__(self, stub, pubsub_name, topic, metadata=None, dead_letter_topic=None):
17+
self._stub = stub
18+
self._pubsub_name = pubsub_name
19+
self._topic = topic
20+
self._metadata = metadata or {}
21+
self._dead_letter_topic = dead_letter_topic or ''
22+
self._stream = None
23+
self._send_queue = asyncio.Queue()
24+
self._stream_active = asyncio.Event()
25+
26+
async def start(self):
27+
async def outgoing_request_iterator():
28+
try:
29+
initial_request = api_v1.SubscribeTopicEventsRequestAlpha1(
30+
initial_request=api_v1.SubscribeTopicEventsRequestInitialAlpha1(
31+
pubsub_name=self._pubsub_name,
32+
topic=self._topic,
33+
metadata=self._metadata,
34+
dead_letter_topic=self._dead_letter_topic,
35+
)
36+
)
37+
yield initial_request
38+
39+
while self._stream_active.is_set():
40+
try:
41+
response = await asyncio.wait_for(self._send_queue.get(), timeout=1.0)
42+
yield response
43+
except asyncio.TimeoutError:
44+
continue
45+
except Exception as e:
46+
raise Exception(f'Error while writing to stream: {e}')
47+
48+
self._stream = self._stub.SubscribeTopicEventsAlpha1(outgoing_request_iterator())
49+
self._stream_active.set()
50+
await self._stream.read() # discard the initial message
51+
52+
async def reconnect_stream(self):
53+
await self.close()
54+
DaprHealth.wait_until_ready()
55+
print('Attempting to reconnect...')
56+
await self.start()
57+
58+
async def next_message(self):
59+
if not self._stream_active.is_set():
60+
raise StreamInactiveError('Stream is not active')
61+
62+
try:
63+
if self._stream is not None:
64+
message = await self._stream.read()
65+
if message is None:
66+
return None
67+
return SubscriptionMessage(message.event_message)
68+
except AioRpcError as e:
69+
if e.code() == StatusCode.UNAVAILABLE:
70+
print(
71+
f'gRPC error while reading from stream: {e.details()}, '
72+
f'Status Code: {e.code()}. '
73+
f'Attempting to reconnect...'
74+
)
75+
await self.reconnect_stream()
76+
elif e.code() == StatusCode.CANCELLED:
77+
raise StreamCancelledError('Stream has been cancelled')
78+
else:
79+
raise Exception(f'gRPC error while reading from subscription stream: {e} ')
80+
except Exception as e:
81+
raise Exception(f'Error while fetching message: {e}')
82+
83+
return None
84+
85+
async def respond(self, message, status):
86+
try:
87+
status = appcallback_v1.TopicEventResponse(status=status.value)
88+
response = api_v1.SubscribeTopicEventsRequestProcessedAlpha1(
89+
id=message.id(), status=status
90+
)
91+
msg = api_v1.SubscribeTopicEventsRequestAlpha1(event_processed=response)
92+
if not self._stream_active.is_set():
93+
raise StreamInactiveError('Stream is not active')
94+
await self._send_queue.put(msg)
95+
except Exception as e:
96+
print(f"Can't send message: {e}")
97+
98+
async def respond_success(self, message):
99+
await self.respond(message, TopicEventResponse('success').status)
100+
101+
async def respond_retry(self, message):
102+
await self.respond(message, TopicEventResponse('retry').status)
103+
104+
async def respond_drop(self, message):
105+
await self.respond(message, TopicEventResponse('drop').status)
106+
107+
async def close(self):
108+
if self._stream:
109+
try:
110+
self._stream.cancel()
111+
self._stream_active.clear()
112+
except AioRpcError as e:
113+
if e.code() != StatusCode.CANCELLED:
114+
raise Exception(f'Error while closing stream: {e}')
115+
except Exception as e:
116+
raise Exception(f'Error while closing stream: {e}')

dapr/clients/grpc/client.py

+75-1
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@
1212
See the License for the specific language governing permissions and
1313
limitations under the License.
1414
"""
15-
15+
import threading
1616
import time
1717
import socket
1818
import json
@@ -41,6 +41,7 @@
4141
from dapr.clients.grpc._state import StateOptions, StateItem
4242
from dapr.clients.grpc._helpers import getWorkflowRuntimeStatus
4343
from dapr.clients.grpc._crypto import EncryptOptions, DecryptOptions
44+
from dapr.clients.grpc.subscription import Subscription, StreamInactiveError
4445
from dapr.clients.grpc.interceptors import DaprClientInterceptor, DaprClientTimeoutInterceptor
4546
from dapr.clients.health import DaprHealth
4647
from dapr.clients.retry import RetryPolicy
@@ -85,6 +86,7 @@
8586
StartWorkflowResponse,
8687
EncryptResponse,
8788
DecryptResponse,
89+
TopicEventResponse,
8890
)
8991

9092

@@ -481,6 +483,78 @@ def publish_event(
481483

482484
return DaprResponse(call.initial_metadata())
483485

486+
def subscribe(
487+
self,
488+
pubsub_name: str,
489+
topic: str,
490+
metadata: Optional[MetadataTuple] = None,
491+
dead_letter_topic: Optional[str] = None,
492+
) -> Subscription:
493+
"""
494+
Subscribe to a topic with a bidirectional stream
495+
496+
Args:
497+
pubsub_name (str): The name of the pubsub component.
498+
topic (str): The name of the topic.
499+
metadata (Optional[MetadataTuple]): Additional metadata for the subscription.
500+
dead_letter_topic (Optional[str]): Name of the dead-letter topic.
501+
timeout (Optional[int]): The time in seconds to wait for a message before returning None
502+
If not set, the `next_message` method will block indefinitely
503+
until a message is received.
504+
505+
Returns:
506+
Subscription: The Subscription object managing the stream.
507+
"""
508+
subscription = Subscription(self._stub, pubsub_name, topic, metadata, dead_letter_topic)
509+
subscription.start()
510+
return subscription
511+
512+
def subscribe_with_handler(
513+
self,
514+
pubsub_name: str,
515+
topic: str,
516+
handler_fn: Callable[..., TopicEventResponse],
517+
metadata: Optional[MetadataTuple] = None,
518+
dead_letter_topic: Optional[str] = None,
519+
) -> Callable:
520+
"""
521+
Subscribe to a topic with a bidirectional stream and a message handler function
522+
523+
Args:
524+
pubsub_name (str): The name of the pubsub component.
525+
topic (str): The name of the topic.
526+
handler_fn (Callable[..., TopicEventResponse]): The function to call when a message is received.
527+
metadata (Optional[MetadataTuple]): Additional metadata for the subscription.
528+
dead_letter_topic (Optional[str]): Name of the dead-letter topic.
529+
timeout (Optional[int]): The time in seconds to wait for a message before returning None
530+
If not set, the `next_message` method will block indefinitely
531+
until a message is received.
532+
"""
533+
subscription = self.subscribe(pubsub_name, topic, metadata, dead_letter_topic)
534+
535+
def stream_messages(sub):
536+
while True:
537+
try:
538+
message = sub.next_message()
539+
if message:
540+
# Process the message
541+
response = handler_fn(message)
542+
if response:
543+
subscription.respond(message, response.status)
544+
else:
545+
# No message received
546+
continue
547+
except StreamInactiveError:
548+
break
549+
550+
def close_subscription():
551+
subscription.close()
552+
553+
streaming_thread = threading.Thread(target=stream_messages, args=(subscription,))
554+
streaming_thread.start()
555+
556+
return close_subscription
557+
484558
def get_state(
485559
self,
486560
store_name: str,

0 commit comments

Comments
 (0)