Skip to content

Commit 338707d

Browse files
authored
Merge pull request #340 from d2army/transfers-master
Making more deterministic workflow resiliency
2 parents 80f0063 + 0c52a62 commit 338707d

File tree

11 files changed

+295
-42
lines changed

11 files changed

+295
-42
lines changed

CHANGELOG.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,7 @@
11
# Changelog
22

3+
## 0.0.4
4+
- Patching
5+
36
## 0.0.1
47
- First release
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
module Temporal
2+
module Concerns
3+
module InputDeserializer
4+
def deserialize(input)
5+
JSON.deserialize(input)
6+
rescue Oj::ParseError
7+
# Copied over from the Cadence side, similar situation happening with Temporal
8+
#
9+
# cadence official go-client serializes / deserializes input in a different format than this ruby client
10+
# adding additional deserialization logic here to help read input that is passed from go-client
11+
# https://github.com/uber-go/cadence-client/blob/0.18.x/internal/encoding.go#L45-L58
12+
#
13+
# this ruby client serializes / deserializes everything as one big string like below:
14+
# [1012474654, "second input"]
15+
#
16+
# while go client serializes input as separate input followed by line break
17+
# 1012474654
18+
# second input
19+
args = input.split(/\n/)
20+
res = args.map do |arg|
21+
JSON.deserialize(arg)
22+
end
23+
end
24+
end
25+
end
26+
end

lib/temporal/version.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,3 @@
11
module Temporal
2-
VERSION = '0.0.3'.freeze
2+
VERSION = '0.0.4'.freeze
33
end

lib/temporal/workflow/history/event.rb

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,11 @@
1+
require 'temporal/concerns/input_deserializer'
2+
13
module Temporal
24
class Workflow
35
class History
46
class Event
7+
include ::Temporal::Concerns::InputDeserializer
8+
include Concerns::Payloads
59
EVENT_TYPES = %w[
610
ACTIVITY_TASK_STARTED
711
ACTIVITY_TASK_COMPLETED
@@ -52,6 +56,19 @@ def originating_event_id
5256
end
5357
end
5458

59+
def target_attributes
60+
case type
61+
when 'ACTIVITY_TASK_SCHEDULED'
62+
{
63+
activity_id: attributes.activity_id&.to_i, # activity_id is a string from thrift
64+
activity_type: attributes.activity_type&.name,
65+
input: from_payloads(attributes.input)
66+
}
67+
else
68+
{}
69+
end
70+
end
71+
5572
private
5673

5774
def extract_attributes(raw_event)

lib/temporal/workflow/history/event_target.rb

Lines changed: 40 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ class Workflow
55
class History
66
class EventTarget
77
class UnexpectedEventType < InternalError; end
8+
class UnexpectedCommandType < InternalError; end
89

910
ACTIVITY_TYPE = :activity
1011
CANCEL_ACTIVITY_REQUEST_TYPE = :cancel_activity_request
@@ -19,7 +20,7 @@ class UnexpectedEventType < InternalError; end
1920
UPSERT_SEARCH_ATTRIBUTES_REQUEST_TYPE = :upsert_search_attributes_request
2021

2122
# NOTE: The order is important, first prefix match wins (will be a longer match)
22-
TARGET_TYPES = {
23+
EVENT_TARGET_TYPES = {
2324
'ACTIVITY_TASK_CANCEL_REQUESTED' => CANCEL_ACTIVITY_REQUEST_TYPE,
2425
'ACTIVITY_TASK' => ACTIVITY_TYPE,
2526
'REQUEST_CANCEL_ACTIVITY_TASK' => CANCEL_ACTIVITY_REQUEST_TYPE,
@@ -38,25 +39,58 @@ class UnexpectedEventType < InternalError; end
3839
'WORKFLOW_EXECUTION' => WORKFLOW_TYPE,
3940
}.freeze
4041

41-
attr_reader :id, :type
42+
WORKFLOW_TARGET_TYPES = {
43+
'Temporal::Workflow::Command::ScheduleActivity' => ACTIVITY_TYPE,
44+
'Temporal::Workflow::Command::RequestActivityCancellation' => CANCEL_ACTIVITY_REQUEST_TYPE,
45+
'Temporal::Workflow::Command::RecordMarker' => MARKER_TYPE,
46+
'Temporal::Workflow::Command::StartTimer' => TIMER_TYPE,
47+
'Temporal::Workflow::Command::CancelTimer' => CANCEL_TIMER_REQUEST_TYPE,
48+
'Temporal::Workflow::Command::CompleteWorkflow' => WORKFLOW_TYPE,
49+
'Temporal::Workflow::Command::FailWorkflow' => WORKFLOW_TYPE,
50+
'Temporal::Workflow::Command::StartChildWorkflow' => CHILD_WORKFLOW_TYPE,
51+
'Temporal::Workflow::Command::SignalExternalWorkflow' => EXTERNAL_WORKFLOW_TYPE,
52+
'Temporal::Workflow::Command::CancelExternalWorkflow' => CANCEL_EXTERNAL_WORKFLOW_REQUEST_TYPE,
53+
'Temporal::Workflow::Command::UpsertSearchAttributes' => UPSERT_SEARCH_ATTRIBUTES_REQUEST_TYPE,
54+
'Temporal::Workflow::Command::ContinueAsNew' => WORKFLOW_TYPE,
55+
}.freeze
56+
57+
COMMAND_ATTRIBUTE_LISTS = {
58+
'Temporal::Workflow::Command::ScheduleActivity' => [:activity_id, :activity_type, :input],
59+
}
60+
attr_reader :id, :type, :attributes
4261

4362
def self.workflow
4463
@workflow ||= new(1, WORKFLOW_TYPE)
4564
end
4665

4766
def self.from_event(event)
48-
_, target_type = TARGET_TYPES.find { |type, _| event.type.start_with?(type) }
67+
_, target_type = EVENT_TARGET_TYPES.find { |type, _| event.type.start_with?(type) }
4968

5069
unless target_type
5170
raise UnexpectedEventType, "Unexpected event #{event.type}"
5271
end
5372

54-
new(event.originating_event_id, target_type)
73+
new(event.originating_event_id, target_type, attributes: event.target_attributes)
74+
end
75+
76+
def self.from_command(command_id, command)
77+
78+
command_type = command.class.name
79+
target_type = WORKFLOW_TARGET_TYPES[command_type]
80+
81+
unless target_type
82+
raise UnexpectedCommandType, "Unexpected command type #{command_type}"
83+
end
84+
85+
attribute_list = COMMAND_ATTRIBUTE_LISTS.fetch(command_type, [])
86+
87+
new(command_id, target_type, attributes: command.to_h.slice(*attribute_list))
5588
end
5689

57-
def initialize(id, type)
90+
def initialize(id, type, attributes: {})
5891
@id = id
5992
@type = type
93+
@attributes = attributes
6094

6195
freeze
6296
end
@@ -74,7 +108,7 @@ def hash
74108
end
75109

76110
def to_s
77-
"#{type} (#{id})"
111+
"#{type}: #{id} (#{attributes})"
78112
end
79113
end
80114
end

lib/temporal/workflow/state_manager.rb

Lines changed: 4 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ def schedule(command)
6666
validate_append_command(command)
6767
commands << [command_id, command]
6868

69-
return [event_target_from(command_id, command), cancelation_id]
69+
[History::EventTarget.from_command(command_id, command), cancelation_id]
7070
end
7171

7272
def release?(release_name)
@@ -309,32 +309,6 @@ def apply_event(event)
309309
end
310310
end
311311

312-
def event_target_from(command_id, command)
313-
target_type =
314-
case command
315-
when Command::ScheduleActivity
316-
History::EventTarget::ACTIVITY_TYPE
317-
when Command::RequestActivityCancellation
318-
History::EventTarget::CANCEL_ACTIVITY_REQUEST_TYPE
319-
when Command::RecordMarker
320-
History::EventTarget::MARKER_TYPE
321-
when Command::StartTimer
322-
History::EventTarget::TIMER_TYPE
323-
when Command::CancelTimer
324-
History::EventTarget::CANCEL_TIMER_REQUEST_TYPE
325-
when Command::CompleteWorkflow, Command::FailWorkflow
326-
History::EventTarget::WORKFLOW_TYPE
327-
when Command::StartChildWorkflow
328-
History::EventTarget::CHILD_WORKFLOW_TYPE
329-
when Command::UpsertSearchAttributes
330-
History::EventTarget::UPSERT_SEARCH_ATTRIBUTES_REQUEST_TYPE
331-
when Command::SignalExternalWorkflow
332-
History::EventTarget::EXTERNAL_WORKFLOW_TYPE
333-
end
334-
335-
History::EventTarget.new(command_id, target_type)
336-
end
337-
338312
def dispatch(history_target, name, *attributes)
339313
dispatcher.dispatch(history_target, name, attributes)
340314
end
@@ -352,8 +326,9 @@ def discard_command(history_target)
352326
"A command in the history of previous executions, #{history_target}, was not scheduled upon replay. " + NONDETERMINISM_ERROR_SUGGESTION
353327
end
354328

355-
replay_target = event_target_from(replay_command_id, replay_command)
356-
if history_target != replay_target
329+
replay_target = History::EventTarget.from_command(replay_command_id, replay_command)
330+
331+
if history_target != replay_target || history_target.attributes != replay_target.attributes
357332
raise NonDeterministicWorkflowError,
358333
"Unexpected command. The replaying code is issuing: #{replay_target}, "\
359334
"but the history of previous executions recorded: #{history_target}. " + NONDETERMINISM_ERROR_SUGGESTION

lib/temporal/workflow/task_processor.rb

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,7 @@ def queue_time_ms
9494

9595
def fetch_full_history
9696
events = task.history.events.to_a
97+
9798
next_page_token = task.next_page_token
9899
while !next_page_token.empty? do
99100
response = connection.get_workflow_execution_history(

spec/fabricators/grpc/history_event_fabricator.rb

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
require 'securerandom'
22
require 'temporal/concerns/payloads'
3+
require 'temporal/json'
34

45
class TestSerializer
56
extend Temporal::Concerns::Payloads
@@ -10,9 +11,10 @@ class TestSerializer
1011
Fabricator(:api_history_event, from: Temporalio::Api::History::V1::HistoryEvent) do
1112
event_id { 1 }
1213
event_time { Time.now }
14+
transient input: nil
1315
end
1416

15-
Fabricator(:api_workflow_execution_started_event, from: :api_history_event) do
17+
Fabricator(:api_workflow_execution_started_eevent, from: :api_history_event) do
1618
transient :headers, :search_attributes
1719
event_type { Temporalio::Api::Enums::V1::EventType::EVENT_TYPE_WORKFLOW_EXECUTION_STARTED }
1820
event_time { Time.now }
@@ -24,7 +26,7 @@ class TestSerializer
2426
Temporalio::Api::History::V1::WorkflowExecutionStartedEventAttributes.new(
2527
workflow_type: Fabricate(:api_workflow_type),
2628
task_queue: Fabricate(:api_task_queue),
27-
input: nil,
29+
input: Temporal::JSON.serialize(attrs[:input]),
2830
workflow_execution_timeout: 60,
2931
workflow_task_timeout: 15,
3032
original_execution_run_id: SecureRandom.uuid,

spec/unit/lib/temporal/workflow/history/event_spec.rb

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,4 +41,31 @@
4141
it { is_expected.to eq(raw_event.event_id) }
4242
end
4343
end
44+
45+
46+
describe '#target_attributes' do
47+
subject { described_class.new(raw_event).target_attributes }
48+
49+
context 'when event is ActivityTaskScheduled' do
50+
let(:input) { ['foo', 'bar', { 'foo' => 'bar' }] }
51+
let(:raw_event) do
52+
Fabricate(:activity_task_scheduled_event_thrift, eventId: 42, input: input)
53+
end
54+
55+
it {
56+
is_expected.to eq({ activity_id: 42, activity_type: 'TestActivity', input: input })
57+
}
58+
end
59+
60+
context 'when event is WorkflowTaskScheduled' do
61+
let(:input) { ['foo', 'bar', { 'foo' => 'bar' }] }
62+
let(:raw_event) do
63+
Fabricate(:workflow_task_scheduled_event_thrift, eventId: 42)
64+
end
65+
66+
it {
67+
is_expected.to eq({})
68+
}
69+
end
70+
end
4471
end

spec/unit/lib/temporal/workflow/history/event_target_spec.rb

Lines changed: 85 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,24 +1,105 @@
11
require 'temporal/workflow/history/event_target'
22
require 'temporal/workflow/history/event'
3+
require 'temporal/workflow/command'
34

45
describe Temporal::Workflow::History::EventTarget do
56
describe '.from_event' do
67
subject { described_class.from_event(event) }
78
let(:event) { Temporal::Workflow::History::Event.new(raw_event) }
89

910
context 'when event is TIMER_STARTED' do
10-
let(:raw_event) { Fabricate(:api_timer_started_event) }
11+
let(:raw_event) { Fabricate(:api_timer_started_event, eventId: 42) }
1112

12-
it 'sets type to timer' do
13+
it 'sets id and type' do
14+
expect(subject.id).to eq(42)
1315
expect(subject.type).to eq(described_class::TIMER_TYPE)
16+
expect(subject.attributes).to eq({})
1417
end
1518
end
1619

1720
context 'when event is TIMER_CANCELED' do
18-
let(:raw_event) { Fabricate(:api_timer_canceled_event) }
21+
let(:raw_event) { Fabricate(:api_timer_canceled_event, eventId: 42) }
1922

20-
it 'sets type to cancel_timer_request' do
23+
it 'sets id and type' do
24+
expect(subject.id).to eq(42)
2125
expect(subject.type).to eq(described_class::CANCEL_TIMER_REQUEST_TYPE)
26+
expect(subject.attributes).to eq({})
27+
end
28+
end
29+
30+
context 'when event is ScheduleActivity' do
31+
let(:input) { ['foo', 'bar', { 'foo' => 'bar' }] }
32+
let(:raw_event) { Fabricate(:activity_task_scheduled_event_thrift, eventId: 42, input: input) }
33+
34+
it 'sets id, type and attributes' do
35+
expect(subject.id).to eq(42)
36+
expect(subject.type).to eq(described_class::ACTIVITY_TYPE)
37+
expect(subject.attributes).to eq({ activity_id: 42, activity_type: 'TestActivity', input: input })
38+
end
39+
end
40+
end
41+
42+
describe '.from_decision' do
43+
subject { described_class.from_command(42, decision) }
44+
45+
context 'when decision is ScheduleActivity' do
46+
let(:raw_decision) { { activity_type: 'foo', activity_id: 123, input: ['bar'], domain: 'domain' } }
47+
let(:decision) { Temporal::Workflow::Command::ScheduleActivity.new(**raw_decision) }
48+
49+
it 'sets id, type' do
50+
expect(subject.id).to eq(42)
51+
expect(subject.type).to eq(described_class::ACTIVITY_TYPE)
52+
end
53+
54+
it 'sets and slice the attributes' do
55+
expect(raw_decision).to include(subject.attributes)
56+
expect(subject.attributes.keys).to eq(%i[activity_id activity_type input])
57+
end
58+
end
59+
60+
context 'when decision is StartTimer' do
61+
let(:raw_decision) { { timeout: 10, timer_id: 123 } }
62+
let(:decision) { Temporal::Workflow::Command::StartTimer.new(**raw_decision) }
63+
64+
it 'sets id, type' do
65+
expect(subject.id).to eq(42)
66+
expect(subject.type).to eq(described_class::TIMER_TYPE)
67+
end
68+
69+
it 'sets empty attributes' do
70+
expect(subject.attributes.keys).to eq([])
71+
end
72+
end
73+
end
74+
75+
describe '#==' do
76+
subject do
77+
described_class.new(id, type, attributes: attributes) ==
78+
described_class.new(42, 'type', attributes: { foo: 'bar' })
79+
end
80+
let(:id) { 42 }
81+
let(:type) { 'type' }
82+
let(:attributes) { { foo: 'bar' } }
83+
84+
context 'when all value are the same' do
85+
it 'returns true' do
86+
expect(subject).to eq(true)
87+
end
88+
end
89+
90+
context 'when id are different' do
91+
let(:id) { 1 }
92+
93+
it 'returns false' do
94+
expect(subject).to eq(false)
95+
end
96+
end
97+
98+
context 'when type are different' do
99+
let(:type) { 'other_type' }
100+
101+
it 'returns false' do
102+
expect(subject).to eq(false)
22103
end
23104
end
24105

0 commit comments

Comments
 (0)