Skip to content

Commit 2c647fb

Browse files
committed
Add tests for new event types
1 parent c8f82cc commit 2c647fb

File tree

2 files changed

+89
-1
lines changed

2 files changed

+89
-1
lines changed

durabletask/internal/helpers.py

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,11 @@ def new_orchestrator_started_event(timestamp: Optional[datetime] = None) -> pb.H
2020
return pb.HistoryEvent(eventId=-1, timestamp=ts, orchestratorStarted=pb.OrchestratorStartedEvent())
2121

2222

23+
def new_orchestrator_completed_event() -> pb.HistoryEvent:
24+
return pb.HistoryEvent(eventId=-1, timestamp=timestamp_pb2.Timestamp(),
25+
orchestratorCompleted=pb.OrchestratorCompletedEvent())
26+
27+
2328
def new_execution_started_event(name: str, instance_id: str, encoded_input: Optional[str] = None,
2429
tags: Optional[dict[str, str]] = None) -> pb.HistoryEvent:
2530
return pb.HistoryEvent(
@@ -119,6 +124,18 @@ def new_failure_details(ex: Exception) -> pb.TaskFailureDetails:
119124
)
120125

121126

127+
def new_event_sent_event(event_id: int, instance_id: str, input: str):
128+
return pb.HistoryEvent(
129+
eventId=event_id,
130+
timestamp=timestamp_pb2.Timestamp(),
131+
eventSent=pb.EventSentEvent(
132+
name="",
133+
input=get_string_value(input),
134+
instanceId=instance_id
135+
)
136+
)
137+
138+
122139
def new_event_raised_event(name: str, encoded_input: Optional[str] = None) -> pb.HistoryEvent:
123140
return pb.HistoryEvent(
124141
eventId=-1,

tests/durabletask/test_orchestration_executor.py

Lines changed: 72 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99

1010
import durabletask.internal.helpers as helpers
1111
import durabletask.internal.orchestrator_service_pb2 as pb
12-
from durabletask import task, worker
12+
from durabletask import task, worker, entities
1313

1414
logging.basicConfig(
1515
format='%(asctime)s.%(msecs)03d %(name)s %(levelname)s: %(message)s',
@@ -1183,6 +1183,77 @@ def orchestrator(ctx: task.OrchestrationContext, _):
11831183
assert str(ex) in complete_action.failureDetails.errorMessage
11841184

11851185

1186+
def test_orchestrator_completed_no_effect():
1187+
def dummy_activity(ctx, _):
1188+
pass
1189+
1190+
def orchestrator(ctx: task.OrchestrationContext, orchestrator_input):
1191+
yield ctx.call_activity(dummy_activity, input=orchestrator_input)
1192+
1193+
registry = worker._Registry()
1194+
name = registry.add_orchestrator(orchestrator)
1195+
1196+
encoded_input = json.dumps(42)
1197+
new_events = [
1198+
helpers.new_orchestrator_started_event(),
1199+
helpers.new_execution_started_event(name, TEST_INSTANCE_ID, encoded_input),
1200+
helpers.new_orchestrator_completed_event()]
1201+
executor = worker._OrchestrationExecutor(registry, TEST_LOGGER)
1202+
result = executor.execute(TEST_INSTANCE_ID, [], new_events)
1203+
actions = result.actions
1204+
1205+
assert len(actions) == 1
1206+
assert type(actions[0]) is pb.OrchestratorAction
1207+
assert actions[0].id == 1
1208+
assert actions[0].HasField("scheduleTask")
1209+
assert actions[0].scheduleTask.name == task.get_name(dummy_activity)
1210+
assert actions[0].scheduleTask.input.value == encoded_input
1211+
1212+
1213+
def test_entity_lock_created_as_event():
1214+
test_entity_id = entities.EntityInstanceId("Counter", "myCounter")
1215+
1216+
def orchestrator(ctx: task.OrchestrationContext, _):
1217+
entity_id = test_entity_id
1218+
with (yield ctx.lock_entities([entity_id])):
1219+
return (yield ctx.call_entity(entity_id, "set", 1))
1220+
1221+
registry = worker._Registry()
1222+
name = registry.add_orchestrator(orchestrator)
1223+
1224+
new_events = [
1225+
helpers.new_orchestrator_started_event(),
1226+
helpers.new_execution_started_event(name, TEST_INSTANCE_ID, None),
1227+
]
1228+
1229+
executor = worker._OrchestrationExecutor(registry, TEST_LOGGER)
1230+
result1 = executor.execute(TEST_INSTANCE_ID, [], new_events)
1231+
actions = result1.actions
1232+
assert len(actions) == 1
1233+
assert type(actions[0]) is pb.OrchestratorAction
1234+
assert actions[0].id == 1
1235+
assert actions[0].HasField("sendEntityMessage")
1236+
assert actions[0].sendEntityMessage.HasField("entityLockRequested")
1237+
1238+
old_events = new_events
1239+
event_sent_input = {
1240+
"id": actions[0].sendEntityMessage.entityLockRequested.criticalSectionId,
1241+
}
1242+
new_events = [
1243+
helpers.new_event_sent_event(1, str(test_entity_id), json.dumps(event_sent_input)),
1244+
helpers.new_event_raised_event(event_sent_input["id"], None),
1245+
]
1246+
result = executor.execute(TEST_INSTANCE_ID, old_events, new_events)
1247+
actions = result.actions
1248+
1249+
assert len(actions) == 1
1250+
assert type(actions[0]) is pb.OrchestratorAction
1251+
assert actions[0].id == 2
1252+
assert actions[0].HasField("sendEntityMessage")
1253+
assert actions[0].sendEntityMessage.HasField("entityOperationCalled")
1254+
assert actions[0].sendEntityMessage.entityOperationCalled.targetInstanceId.value == str(test_entity_id)
1255+
1256+
11861257
def get_and_validate_complete_orchestration_action_list(expected_action_count: int, actions: list[pb.OrchestratorAction]) -> pb.CompleteOrchestrationAction:
11871258
assert len(actions) == expected_action_count
11881259
assert type(actions[-1]) is pb.OrchestratorAction

0 commit comments

Comments
 (0)