Skip to content

Commit c577b2b

Browse files
Allow passing custom stub to execute operations (#85)
* Allow passing custom stub to execute operations --------- Co-authored-by: Bernd Verst <[email protected]>
1 parent c54c386 commit c577b2b

File tree

3 files changed

+65
-6
lines changed

3 files changed

+65
-6
lines changed
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
from typing import Any, Callable, Protocol
2+
3+
4+
class ProtoTaskHubSidecarServiceStub(Protocol):
5+
"""A stub class matching the TaskHubSidecarServiceStub generated from the .proto file.
6+
Allows the use of TaskHubGrpcWorker methods when a real sidecar stub is not available.
7+
"""
8+
Hello: Callable[..., Any]
9+
StartInstance: Callable[..., Any]
10+
GetInstance: Callable[..., Any]
11+
RewindInstance: Callable[..., Any]
12+
WaitForInstanceStart: Callable[..., Any]
13+
WaitForInstanceCompletion: Callable[..., Any]
14+
RaiseEvent: Callable[..., Any]
15+
TerminateInstance: Callable[..., Any]
16+
SuspendInstance: Callable[..., Any]
17+
ResumeInstance: Callable[..., Any]
18+
QueryInstances: Callable[..., Any]
19+
PurgeInstances: Callable[..., Any]
20+
GetWorkItems: Callable[..., Any]
21+
CompleteActivityTask: Callable[..., Any]
22+
CompleteOrchestratorTask: Callable[..., Any]
23+
CompleteEntityTask: Callable[..., Any]
24+
StreamInstanceHistory: Callable[..., Any]
25+
CreateTaskHub: Callable[..., Any]
26+
DeleteTaskHub: Callable[..., Any]
27+
SignalEntity: Callable[..., Any]
28+
GetEntity: Callable[..., Any]
29+
QueryEntities: Callable[..., Any]
30+
CleanEntityStorage: Callable[..., Any]
31+
AbandonTaskActivityWorkItem: Callable[..., Any]
32+
AbandonTaskOrchestratorWorkItem: Callable[..., Any]
33+
AbandonTaskEntityWorkItem: Callable[..., Any]

durabletask/worker.py

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
from durabletask.internal.helpers import new_timestamp
2525
from durabletask.entities import DurableEntity, EntityLock, EntityInstanceId, EntityContext
2626
from durabletask.internal.orchestration_entity_context import OrchestrationEntityContext
27+
from durabletask.internal.proto_task_hub_sidecar_service_stub import ProtoTaskHubSidecarServiceStub
2728
import durabletask.internal.helpers as ph
2829
import durabletask.internal.exceptions as pe
2930
import durabletask.internal.orchestrator_service_pb2 as pb
@@ -631,7 +632,7 @@ def stop(self):
631632
def _execute_orchestrator(
632633
self,
633634
req: pb.OrchestratorRequest,
634-
stub: stubs.TaskHubSidecarServiceStub,
635+
stub: Union[stubs.TaskHubSidecarServiceStub, ProtoTaskHubSidecarServiceStub],
635636
completionToken,
636637
):
637638
try:
@@ -679,7 +680,7 @@ def _execute_orchestrator(
679680
def _cancel_orchestrator(
680681
self,
681682
req: pb.OrchestratorRequest,
682-
stub: stubs.TaskHubSidecarServiceStub,
683+
stub: Union[stubs.TaskHubSidecarServiceStub, ProtoTaskHubSidecarServiceStub],
683684
completionToken,
684685
):
685686
stub.AbandonTaskOrchestratorWorkItem(
@@ -692,7 +693,7 @@ def _cancel_orchestrator(
692693
def _execute_activity(
693694
self,
694695
req: pb.ActivityRequest,
695-
stub: stubs.TaskHubSidecarServiceStub,
696+
stub: Union[stubs.TaskHubSidecarServiceStub, ProtoTaskHubSidecarServiceStub],
696697
completionToken,
697698
):
698699
instance_id = req.orchestrationInstance.instanceId
@@ -725,7 +726,7 @@ def _execute_activity(
725726
def _cancel_activity(
726727
self,
727728
req: pb.ActivityRequest,
728-
stub: stubs.TaskHubSidecarServiceStub,
729+
stub: Union[stubs.TaskHubSidecarServiceStub, ProtoTaskHubSidecarServiceStub],
729730
completionToken,
730731
):
731732
stub.AbandonTaskActivityWorkItem(
@@ -738,7 +739,7 @@ def _cancel_activity(
738739
def _execute_entity_batch(
739740
self,
740741
req: Union[pb.EntityBatchRequest, pb.EntityRequest],
741-
stub: stubs.TaskHubSidecarServiceStub,
742+
stub: Union[stubs.TaskHubSidecarServiceStub, ProtoTaskHubSidecarServiceStub],
742743
completionToken,
743744
):
744745
if isinstance(req, pb.EntityRequest):
@@ -807,7 +808,7 @@ def _execute_entity_batch(
807808
def _cancel_entity_batch(
808809
self,
809810
req: Union[pb.EntityBatchRequest, pb.EntityRequest],
810-
stub: stubs.TaskHubSidecarServiceStub,
811+
stub: Union[stubs.TaskHubSidecarServiceStub, ProtoTaskHubSidecarServiceStub],
811812
completionToken,
812813
):
813814
stub.AbandonTaskEntityWorkItem(
Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
# Copyright (c) Microsoft Corporation.
2+
# Licensed under the MIT License.
3+
4+
5+
from typing import get_type_hints
6+
7+
from durabletask.internal.orchestrator_service_pb2_grpc import TaskHubSidecarServiceStub
8+
from durabletask.internal.proto_task_hub_sidecar_service_stub import ProtoTaskHubSidecarServiceStub
9+
10+
11+
def test_proto_task_hub_shim_is_compatible():
12+
"""Test that ProtoTaskHubSidecarServiceStub is compatible with TaskHubSidecarServiceStub."""
13+
protocol_attrs = set(get_type_hints(ProtoTaskHubSidecarServiceStub).keys())
14+
15+
# Instantiate TaskHubSidecarServiceStub with a dummy channel to get its attributes
16+
class TestChannel():
17+
def unary_unary(self, *args, **kwargs):
18+
pass
19+
20+
def unary_stream(self, *args, **kwargs):
21+
pass
22+
impl_attrs = TaskHubSidecarServiceStub(TestChannel()).__dict__.keys()
23+
24+
# Check missing
25+
assert protocol_attrs == impl_attrs

0 commit comments

Comments
 (0)