Skip to content

Commit

Permalink
Implement strategies for resetting workflows (coinbase#97)
Browse files Browse the repository at this point in the history
  • Loading branch information
DeRauk authored Sep 17, 2021
1 parent b7757cb commit a4824d4
Show file tree
Hide file tree
Showing 6 changed files with 190 additions and 28 deletions.
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -13,3 +13,5 @@ _yardoc/

# rspec failure tracking
.rspec_status

.idea/*
50 changes: 42 additions & 8 deletions lib/temporal/client.rb
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
require 'temporal/workflow'
require 'temporal/workflow/history'
require 'temporal/workflow/execution_info'
require 'temporal/reset_strategy'

module Temporal
class Client
Expand Down Expand Up @@ -134,9 +135,16 @@ def await_workflow_result(workflow, workflow_id:, run_id: nil, timeout: nil, nam
end
end

def reset_workflow(namespace, workflow_id, run_id, workflow_task_id: nil, reason: 'manual reset')
workflow_task_id ||= get_last_completed_workflow_task_id(namespace, workflow_id, run_id)
raise Error, 'Could not find a completed workflow task event' unless workflow_task_id
def reset_workflow(namespace, workflow_id, run_id, strategy: nil, workflow_task_id: nil, reason: 'manual reset')
# Pick default strategy for backwards-compatibility
strategy ||= :last_workflow_task unless workflow_task_id

if strategy && workflow_task_id
raise ArgumentError, 'Please specify either :strategy or :workflow_task_id'
end

workflow_task_id ||= find_workflow_task(namespace, workflow_id, run_id, strategy)&.id
raise Error, 'Could not find an event to reset to' unless workflow_task_id

response = connection.reset_workflow_execution(
namespace: namespace,
Expand Down Expand Up @@ -195,6 +203,16 @@ def fail_activity(async_token, exception)
)
end

def get_workflow_history(namespace:, workflow_id:, run_id:)
history_response = connection.get_workflow_execution_history(
namespace: namespace,
workflow_id: workflow_id,
run_id: run_id
)

Workflow::History.new(history_response.history.events)
end

class ResultConverter
extend Concerns::Payloads
end
Expand All @@ -208,15 +226,31 @@ def connection
@connection ||= Temporal::Connection.generate(config.for_connection)
end

def get_last_completed_workflow_task_id(namespace, workflow_id, run_id)
history_response = connection.get_workflow_execution_history(
def find_workflow_task(namespace, workflow_id, run_id, strategy)
history = get_workflow_history(
namespace: namespace,
workflow_id: workflow_id,
run_id: run_id
)
history = Workflow::History.new(history_response.history.events)
workflow_task_event = history.get_last_completed_workflow_task
workflow_task_event&.id

# TODO: Move this into a separate class if it keeps growing
case strategy
when ResetStrategy::LAST_WORKFLOW_TASK
events = %[WORKFLOW_TASK_COMPLETED WORKFLOW_TASK_TIMED_OUT WORKFLOW_TASK_FAILED].freeze
history.events.select { |event| events.include?(event.type) }.last
when ResetStrategy::FIRST_WORKFLOW_TASK
events = %[WORKFLOW_TASK_COMPLETED WORKFLOW_TASK_TIMED_OUT WORKFLOW_TASK_FAILED].freeze
history.events.select { |event| events.include?(event.type) }.first
when ResetStrategy::LAST_FAILED_ACTIVITY
events = %[ACTIVITY_TASK_FAILED ACTIVITY_TASK_TIMED_OUT].freeze
failed_event = history.events.select { |event| events.include?(event.type) }.last
return unless failed_event

scheduled_event = history.find_event_by_id(failed_event.attributes.scheduled_event_id)
history.find_event_by_id(scheduled_event.attributes.workflow_task_completed_event_id)
else
raise ArgumentError, 'Unsupported reset strategy'
end
end
end
end
7 changes: 7 additions & 0 deletions lib/temporal/reset_strategy.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
module Temporal
module ResetStrategy
LAST_WORKFLOW_TASK = :last_workflow_task
FIRST_WORKFLOW_TASK = :first_workflow_task
LAST_FAILED_ACTIVITY = :last_failed_activity
end
end
4 changes: 2 additions & 2 deletions lib/temporal/workflow/history.rb
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@ def initialize(events)
@iterator = @events.each
end

def last_completed_workflow_task
events.select { |event| event.type == 'WORKFLOW_TASK_COMPLETED' }.last
def find_event_by_id(id)
events.find { |event| event.id == id }
end

# It is very important to replay the History window by window in order to
Expand Down
33 changes: 16 additions & 17 deletions spec/fabricators/grpc/history_event_fabricator.rb
Original file line number Diff line number Diff line change
Expand Up @@ -34,32 +34,32 @@
end
end

Fabricator(:api_decision_task_scheduled_event, from: :api_history_event) do
event_type { Temporal::Api::Enums::V1::EventType::EVENT_TYPE_DECISION_TASK_SCHEDULED }
decision_task_scheduled_event_attributes do |attrs|
Temporal::Api::History::V1::DecisionTaskScheduledEventAttributes.new(
Fabricator(:api_workflow_task_scheduled_event, from: :api_history_event) do
event_type { Temporal::Api::Enums::V1::EventType::EVENT_TYPE_WORKFLOW_TASK_SCHEDULED }
workflow_task_scheduled_event_attributes do |attrs|
Temporal::Api::History::V1::WorkflowTaskScheduledEventAttributes.new(
task_queue: Fabricate(:api_task_queue),
start_to_close_timeout: 15,
attempt: 0
)
end
end

Fabricator(:api_decision_task_started_event, from: :api_history_event) do
event_type { Temporal::Api::Enums::V1::EventType::EVENT_TYPE_DECISION_TASK_STARTED }
decision_task_started_event_attributes do |attrs|
Temporal::Api::History::V1::DecisionTaskStartedEventAttributes.new(
Fabricator(:api_workflow_task_started_event, from: :api_history_event) do
event_type { Temporal::Api::Enums::V1::EventType::EVENT_TYPE_WORKFLOW_TASK_STARTED }
workflow_task_started_event_attributes do |attrs|
Temporal::Api::History::V1::WorkflowTaskStartedEventAttributes.new(
scheduled_event_id: attrs[:event_id] - 1,
identity: 'test-worker@test-host',
request_id: SecureRandom.uuid
)
end
end

Fabricator(:api_decision_task_completed_event, from: :api_history_event) do
event_type { Temporal::Api::Enums::V1::EventType::EVENT_TYPE_DECISION_TASK_COMPLETED }
decision_task_completed_event_attributes do |attrs|
Temporal::Api::History::V1::DecisionTaskCompletedEventAttributes.new(
Fabricator(:api_workflow_task_completed_event, from: :api_history_event) do
event_type { Temporal::Api::Enums::V1::EventType::EVENT_TYPE_WORKFLOW_TASK_COMPLETED }
workflow_task_completed_event_attributes do |attrs|
Temporal::Api::History::V1::WorkflowTaskCompletedEventAttributes.new(
scheduled_event_id: attrs[:event_id] - 2,
started_event_id: attrs[:event_id] - 1,
identity: 'test-worker@test-host'
Expand All @@ -71,10 +71,10 @@
event_type { Temporal::Api::Enums::V1::EventType::EVENT_TYPE_ACTIVITY_TASK_SCHEDULED }
activity_task_scheduled_event_attributes do |attrs|
Temporal::Api::History::V1::ActivityTaskScheduledEventAttributes.new(
activity_id: attrs[:event_id],
activity_type: Temporal::Api::History::V1::ActivityType.new(name: 'TestActivity'),
activity_id: attrs[:event_id].to_s,
activity_type: Temporal::Api::Common::V1::ActivityType.new(name: 'TestActivity'),
workflow_task_completed_event_id: attrs[:event_id] - 1,
domain: 'test-domain',
namespace: 'test-namespace',
task_queue: Fabricate(:api_task_queue)
)
end
Expand Down Expand Up @@ -107,8 +107,7 @@
event_type { Temporal::Api::Enums::V1::EventType::EVENT_TYPE_ACTIVITY_TASK_FAILED }
activity_task_failed_event_attributes do |attrs|
Temporal::Api::History::V1::ActivityTaskFailedEventAttributes.new(
reason: 'StandardError',
details: 'Activity failed',
failure: Temporal::Api::Failure::V1::Failure.new(message: "Activity failed"),
scheduled_event_id: attrs[:event_id] - 2,
started_event_id: attrs[:event_id] - 1,
identity: 'test-worker@test-host'
Expand Down
122 changes: 121 additions & 1 deletion spec/unit/lib/temporal/client_spec.rb
Original file line number Diff line number Diff line change
@@ -1,13 +1,18 @@
require 'securerandom'
require 'temporal/client'
require 'temporal/configuration'
require 'temporal/workflow'
require 'temporal/workflow/history'
require 'temporal/connection/grpc'

describe Temporal::Client do
subject { described_class.new(config) }

let(:config) { Temporal::Configuration.new }
let(:connection) { instance_double(Temporal::Connection::GRPC) }
let(:namespace) { 'default-test-namespace' }
let(:workflow_id) { SecureRandom.uuid }
let(:run_id) { SecureRandom.uuid }

class TestStartWorkflow < Temporal::Workflow
namespace 'default-test-namespace'
Expand All @@ -20,7 +25,12 @@ class TestStartWorkflow < Temporal::Workflow
.with(config.for_connection)
.and_return(connection)
end
after { subject.remove_instance_variable(:@connection) }

after do
if subject.instance_variable_get(:@connection)
subject.remove_instance_variable(:@connection)
end
end

describe '#start_workflow' do
let(:temporal_response) do
Expand Down Expand Up @@ -368,9 +378,38 @@ class NamespacedWorkflow < Temporal::Workflow
let(:temporal_response) do
Temporal::Api::WorkflowService::V1::ResetWorkflowExecutionResponse.new(run_id: 'xxx')
end
let(:history) do
Temporal::Workflow::History.new([
Fabricate(:api_workflow_execution_started_event, event_id: 1),
Fabricate(:api_workflow_task_scheduled_event, event_id: 2),
Fabricate(:api_workflow_task_started_event, event_id: 3),
Fabricate(:api_workflow_task_completed_event, event_id: 4),
Fabricate(:api_activity_task_scheduled_event, event_id: 5),
Fabricate(:api_activity_task_started_event, event_id: 6),
Fabricate(:api_activity_task_completed_event, event_id: 7),
Fabricate(:api_workflow_task_scheduled_event, event_id: 8),
Fabricate(:api_workflow_task_started_event, event_id: 9),
Fabricate(:api_workflow_task_completed_event, event_id: 10),
Fabricate(:api_activity_task_scheduled_event, event_id: 11),
Fabricate(:api_activity_task_started_event, event_id: 12),
Fabricate(:api_activity_task_failed_event, event_id: 13),
Fabricate(:api_workflow_task_scheduled_event, event_id: 14),
Fabricate(:api_workflow_task_started_event, event_id: 15),
Fabricate(:api_workflow_task_completed_event, event_id: 16),
Fabricate(:api_workflow_execution_completed_event, event_id: 17)
])
end

before { allow(connection).to receive(:reset_workflow_execution).and_return(temporal_response) }

before do
allow(connection).to receive(:reset_workflow_execution).and_return(temporal_response)
allow(subject)
.to receive(:get_workflow_history)
.with(namespace: namespace, workflow_id: workflow_id, run_id: run_id)
.and_return(history)
end

context 'when workflow_task_id is provided' do
let(:workflow_task_id) { 42 }

Expand Down Expand Up @@ -403,6 +442,87 @@ class NamespacedWorkflow < Temporal::Workflow
expect(result).to eq('xxx')
end
end

context 'when neither strategy nor workflow_task_id is provided' do
it 'uses default strategy' do
subject.reset_workflow(namespace, workflow_id, run_id)

expect(connection).to have_received(:reset_workflow_execution).with(
namespace: namespace,
workflow_id: workflow_id,
run_id: run_id,
reason: 'manual reset',
workflow_task_event_id: 16
)
end
end

context 'when both strategy and workflow_task_id are provided' do
it 'uses default strategy' do
expect do
subject.reset_workflow(
namespace,
workflow_id,
run_id,
strategy: :last_workflow_task,
workflow_task_id: 10
)
end.to raise_error(ArgumentError, 'Please specify either :strategy or :workflow_task_id')
end
end

context 'with a specified strategy' do
context ':last_workflow_task' do
it 'resets workflow' do
subject.reset_workflow(namespace, workflow_id, run_id, strategy: :last_workflow_task)

expect(connection).to have_received(:reset_workflow_execution).with(
namespace: namespace,
workflow_id: workflow_id,
run_id: run_id,
reason: 'manual reset',
workflow_task_event_id: 16
)
end
end

context ':first_workflow_task' do
it 'resets workflow' do
subject.reset_workflow(namespace, workflow_id, run_id, strategy: :first_workflow_task)

expect(connection).to have_received(:reset_workflow_execution).with(
namespace: namespace,
workflow_id: workflow_id,
run_id: run_id,
reason: 'manual reset',
workflow_task_event_id: 4
)
end
end


context ':last_failed_activity' do
it 'resets workflow' do
subject.reset_workflow(namespace, workflow_id, run_id, strategy: :last_failed_activity)

expect(connection).to have_received(:reset_workflow_execution).with(
namespace: namespace,
workflow_id: workflow_id,
run_id: run_id,
reason: 'manual reset',
workflow_task_event_id: 10
)
end
end

context 'unsupported strategy' do
it 'resets workflow' do
expect do
subject.reset_workflow(namespace, workflow_id, run_id, strategy: :foobar)
end.to raise_error(ArgumentError, 'Unsupported reset strategy')
end
end
end
end

describe '#terminate_workflow' do
Expand Down

0 comments on commit a4824d4

Please sign in to comment.