Commit 1322497

adamruzicka authored and ezr-ondrej committed

Orchestrator crash recovery (#340)
* Run only one sidekiq thread for the orchestrator

  The orchestrator jobs only take messages from redis and put them into the orchestrator's mailbox. By reducing the number of threads to 1, we rule out the possibility of shuffling the order of incoming messages.

* Make the orchestrator discard items submitted by another orchestrator

  When the orchestrator boots up, it puts a DrainMarker job onto the default queue, puts itself into recovery mode and starts processing messages. While in recovery mode, it discards any incoming WorkerDone jobs, so anything that was in flight while there was no orchestrator gets processed only by the workers. Even in recovery, the orchestrator can still acquire completely new jobs; recovery only affects already running ones.

  When a worker receives the DrainMarker job, it replies to the orchestrator with a StartupComplete job. Upon receiving the StartupComplete job, the orchestrator performs world invalidation and tries to resume all execution plans which were being managed by the previous orchestrator. Once world invalidation is done, the orchestrator leaves recovery. This round trip is done to ensure we get into as consistent a state as possible: after the DrainMarker-StartupComplete exchange, there will be at most n inconsistent execution plans, where n is the number of worker threads. This is a heuristic rather than a complete solution, and it starts to break down when more queues are added.

  Upon receiving a WorkerDone from a worker with a world_id different from the current orchestrator's, the orchestrator tries to resume the execution plan, unless it already holds its execution lock.
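The boot-and-drain protocol described above can be sketched as a small state machine. This is a minimal illustration with made-up stand-in classes (`Orchestrator`, `WorkerDone`, `StartupComplete` here are simplified models, not the actual Dynflow API):

```ruby
# Simplified stand-ins for the messages exchanged during recovery.
WorkerDone = Struct.new(:sender_orchestrator_id, :payload)
StartupComplete = Struct.new(:world_id)

# Hypothetical model of the orchestrator's recovery-mode message handling:
# boot into recovery, discard WorkerDone messages until the DrainMarker
# round trip completes, then process normally.
class Orchestrator
  attr_reader :processed, :discarded

  def initialize(world_id)
    @world_id = world_id
    @recovery = true # boot directly into recovery mode
    @processed = []
    @discarded = []
  end

  def handle(message)
    case message
    when StartupComplete
      # The DrainMarker round trip has completed; leave recovery
      @recovery = false if message.world_id == @world_id
    when WorkerDone
      if @recovery
        @discarded << message # stale work from before the restart
      else
        @processed << message
      end
    end
  end
end
```

In this toy model, a `WorkerDone` arriving before `StartupComplete` is dropped, while the same message arriving afterwards is processed normally, mirroring the queue-draining heuristic described above.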
1 parent 6cdd2e7 commit 1322497

File tree

9 files changed: +83 -16 lines changed

docker-compose.yml

Lines changed: 1 addition & 1 deletion
@@ -34,7 +34,7 @@ services:
   orchestrator:
     <<: *common
     command: |
-      sidekiq -r ./examples/remote_executor.rb -q dynflow_orchestrator
+      sidekiq -r ./examples/remote_executor.rb -q dynflow_orchestrator -c 1
   worker:
     <<: *common
     command: |

examples/remote_executor.rb

Lines changed: 1 addition & 0 deletions
@@ -69,6 +69,7 @@ def initialize_sidekiq_orchestrator
     config.connector = connector
     config.executor = ::Dynflow::Executors::Sidekiq::Core
     config.process_role = :orchestrator
+    config.auto_validity_check = false
   end
 end

lib/dynflow/director.rb

Lines changed: 16 additions & 12 deletions
@@ -21,11 +21,12 @@ class Director
   UnprocessableEvent = Class.new(Dynflow::Error)

   class WorkItem < Serializable
-    attr_reader :execution_plan_id, :queue
+    attr_reader :execution_plan_id, :queue, :sender_orchestrator_id

-    def initialize(execution_plan_id, queue)
+    def initialize(execution_plan_id, queue, sender_orchestrator_id)
       @execution_plan_id = execution_plan_id
       @queue = queue
+      @sender_orchestrator_id = sender_orchestrator_id
     end

     def world
@@ -46,7 +47,8 @@ def execute
     def to_hash
       { class: self.class.name,
         execution_plan_id: execution_plan_id,
-        queue: queue }
+        queue: queue,
+        sender_orchestrator_id: sender_orchestrator_id }
     end

     def self.new_from_hash(hash, *_args)
@@ -57,8 +59,8 @@ def self.new_from_hash(hash, *_args)
   class StepWorkItem < WorkItem
     attr_reader :step

-    def initialize(execution_plan_id, step, queue)
-      super(execution_plan_id, queue)
+    def initialize(execution_plan_id, step, queue, sender_orchestrator_id)
+      super(execution_plan_id, queue, sender_orchestrator_id)
       @step = step
     end

@@ -73,15 +75,16 @@ def to_hash
     def self.new_from_hash(hash, *_args)
       self.new(hash[:execution_plan_id],
                Serializable.from_hash(hash[:step], hash[:execution_plan_id], Dynflow.process_world),
-               hash[:queue])
+               hash[:queue],
+               hash[:sender_orchestrator_id])
     end
   end

   class EventWorkItem < StepWorkItem
     attr_reader :event, :request_id

-    def initialize(request_id, execution_plan_id, step, event, queue)
-      super(execution_plan_id, step, queue)
+    def initialize(request_id, execution_plan_id, step, event, queue, sender_orchestrator_id)
+      super(execution_plan_id, step, queue, sender_orchestrator_id)
       @event = event
       @request_id = request_id
     end
@@ -99,16 +102,17 @@ def self.new_from_hash(hash, *_args)
              hash[:execution_plan_id],
              Serializable.from_hash(hash[:step], hash[:execution_plan_id], Dynflow.process_world),
              Dynflow.serializer.load(hash[:event]),
-             hash[:queue])
+             hash[:queue],
+             hash[:sender_orchestrator_id])
     end
   end

   class FinalizeWorkItem < WorkItem
     attr_reader :finalize_steps_data

     # @param finalize_steps_data - used to pass the result steps from the worker back to orchestrator
-    def initialize(execution_plan_id, queue, finalize_steps_data = nil)
-      super(execution_plan_id, queue)
+    def initialize(execution_plan_id, queue, sender_orchestrator_id, finalize_steps_data = nil)
+      super(execution_plan_id, queue, sender_orchestrator_id)
       @finalize_steps_data = finalize_steps_data
     end

@@ -124,7 +128,7 @@ def to_hash
     end

     def self.new_from_hash(hash, *_args)
-      self.new(hash[:execution_plan_id], hash[:queue], hash[:finalize_steps_data])
+      self.new(*hash.values_at(:execution_plan_id, :queue, :sender_orchestrator_id, :finalize_steps_data))
     end
   end
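The key point of these changes is that the sender's world id travels with every work item through redis and back. A stripped-down sketch of the `to_hash` / `new_from_hash` round trip (a hypothetical `SimpleWorkItem`, omitting steps, events and Dynflow's `Serializable` machinery):

```ruby
# Hypothetical, simplified work item showing how sender_orchestrator_id
# survives serialization to a hash and back, so the receiving orchestrator
# can tell whether it was the one that dispatched the work.
class SimpleWorkItem
  attr_reader :execution_plan_id, :queue, :sender_orchestrator_id

  def initialize(execution_plan_id, queue, sender_orchestrator_id)
    @execution_plan_id = execution_plan_id
    @queue = queue
    @sender_orchestrator_id = sender_orchestrator_id
  end

  # Serialize to a plain hash, as done before pushing through redis
  def to_hash
    { class: self.class.name,
      execution_plan_id: execution_plan_id,
      queue: queue,
      sender_orchestrator_id: sender_orchestrator_id }
  end

  # Rebuild the work item on the other side of the queue
  def self.new_from_hash(hash)
    new(*hash.values_at(:execution_plan_id, :queue, :sender_orchestrator_id))
  end
end
```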

lib/dynflow/director/execution_plan_manager.rb

Lines changed: 2 additions & 2 deletions
@@ -31,7 +31,7 @@ def restart
   end

   def prepare_next_step(step)
-    StepWorkItem.new(execution_plan.id, step, step.queue).tap do |work|
+    StepWorkItem.new(execution_plan.id, step, step.queue, @world.id).tap do |work|
       @running_steps_manager.add(step, work)
     end
   end
@@ -112,7 +112,7 @@ def start_finalize
     return if execution_plan.finalize_flow.empty?
     raise 'finalize phase already started' if @finalize_manager
     @finalize_manager = SequentialManager.new(@world, execution_plan)
-    [FinalizeWorkItem.new(execution_plan.id, execution_plan.finalize_steps.first.queue)]
+    [FinalizeWorkItem.new(execution_plan.id, execution_plan.finalize_steps.first.queue, @world.id)]
   end

   def finish

lib/dynflow/director/running_steps_manager.rb

Lines changed: 1 addition & 1 deletion
@@ -98,7 +98,7 @@ def event(event)
   def create_next_event_work_item(step)
     event = @events.shift(step.id)
     return unless event
-    work = EventWorkItem.new(event.request_id, event.execution_plan_id, step, event.event, step.queue)
+    work = EventWorkItem.new(event.request_id, event.execution_plan_id, step, event.event, step.queue, @world.id)
     @work_items.push(step.id, work)
     work
   end

lib/dynflow/executors/sidekiq/core.rb

Lines changed: 38 additions & 0 deletions
@@ -21,6 +21,7 @@ def initialize(world, *_args)
   wait_for_orchestrator_lock
   super
   schedule_update_telemetry
+  begin_startup!
 end

 def heartbeat
@@ -56,6 +57,29 @@ def update_telemetry
   schedule_update_telemetry
 end

+def work_finished(work)
+  # If the work item was sent in reply to a request from the current orchestrator, proceed
+  if work.sender_orchestrator_id == @world.id
+    super
+  else
+    # If we're in recovery, we can drop the work: the execution plan will be resumed
+    # during the validity checks performed when leaving recovery.
+    # If we're not in recovery and receive an event from another orchestrator,
+    # it means the event survived the queue draining.
+    handle_unknown_work_item(work) unless @recovery
+  end
+end
+
+def begin_startup!
+  WorkerJobs::DrainMarker.perform_async(@world.id)
+  @recovery = true
+end
+
+def startup_complete
+  logger.info('Performing validity checks')
+  @world.perform_validity_checks
+  logger.info('Finished performing validity checks')
+  @recovery = false
+end
+
 private

 def fallback_queue
@@ -69,6 +93,20 @@ def schedule_update_telemetry
 def telemetry_options(queue)
   { queue: queue.to_s, world: @world.id }
 end
+
+# Check whether an execution lock is already held by an orchestrator (it should be
+# the current one). If no lock is held, try to resume the execution plan if possible.
+def handle_unknown_work_item(work)
+  # We are past recovery now; if we receive an event here, the execution plan is
+  # most likely paused. We can either try to rescue it or leave it stopped.
+  execution_lock = @world.coordinator.find_locks(class: Coordinator::ExecutionLock.name,
+                                                 id: "execution-plan:#{work.execution_plan_id}").first
+  if execution_lock.nil?
+    plan = @world.persistence.load_execution_plan(work.execution_plan_id)
+    should_resume = !plan.error? || plan.prepare_for_rescue == :running
+    @world.execute(plan.id) if should_resume
+  end
+end
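The resume decision in `handle_unknown_work_item` can be condensed into a small predicate. This is a hypothetical restatement (the keyword arguments here are made up for illustration): a plan is resumed only when no orchestrator holds its execution lock, and the plan either has no error or can be rescued back into a running state.

```ruby
# Hypothetical condensation of the handle_unknown_work_item logic above.
# lock_held:     true if an orchestrator already holds the execution lock
# plan_in_error: result of plan.error?
# rescue_result: result of plan.prepare_for_rescue (only consulted on error)
def should_resume?(lock_held:, plan_in_error:, rescue_result: nil)
  return false if lock_held
  !plan_in_error || rescue_result == :running
end
```

For example, a plan in error whose rescue attempt yields `:paused` is left alone, while an error-free unlocked plan is resumed.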

lib/dynflow/executors/sidekiq/orchestrator_jobs.rb

Lines changed: 14 additions & 0 deletions
@@ -35,6 +35,20 @@ def perform(error, work_item)
     Dynflow.process_world.executor.core.tell([:handle_persistence_error, error, work_item])
   end
 end
+
+class StartupComplete < InternalJobBase
+  sidekiq_options queue: :dynflow_orchestrator
+
+  # @param world_id - id of the world whose startup has completed
+  def perform(world_id)
+    if Dynflow.process_world.id == world_id
+      Dynflow.process_world.executor.core.tell([:startup_complete])
+    else
+      logger.warn("Received startup complete for a different world #{world_id}, discarding.")
+    end
+  end
+end

lib/dynflow/executors/sidekiq/worker_jobs.rb

Lines changed: 6 additions & 0 deletions
@@ -30,6 +30,12 @@ def with_telemetry(work_item)
     end
   end
 end
+
+class DrainMarker < InternalJobBase
+  def perform(world_id)
+    OrchestratorJobs::StartupComplete.perform_async(world_id)
+  end
+end

test/executor_test.rb

Lines changed: 4 additions & 0 deletions
@@ -27,6 +27,10 @@ module ExecutorTest
   ::Dynflow.instance_variable_set('@process_world', nil)
 end

+before do
+  executor.any_instance.stubs(:begin_startup!)
+end
+
 let(:world) do
   WorldFactory.create_world do |c|
     c.executor = executor
