Skip to content

Commit

Permalink
Add workflow start delay option (#294)
Browse files Browse the repository at this point in the history
  • Loading branch information
santiagodoldan authored Dec 5, 2024
1 parent f41efb7 commit b5efd2c
Show file tree
Hide file tree
Showing 6 changed files with 46 additions and 22 deletions.
5 changes: 4 additions & 1 deletion lib/temporal/client.rb
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ def initialize(config)
# @option options [Hash] :timeouts check Temporal::Configuration::DEFAULT_TIMEOUTS
# @option options [Hash] :headers
# @option options [Hash] :search_attributes
# @option options [Integer] :start_delay determines the amount of seconds to wait before initiating a Workflow
#
# @return [String] workflow's run ID
def start_workflow(workflow, *input, options: {}, **args)
Expand All @@ -67,6 +68,7 @@ def start_workflow(workflow, *input, options: {}, **args)
headers: config.header_propagator_chain.inject(execution_options.headers),
memo: execution_options.memo,
search_attributes: Workflow::Context::Helpers.process_search_attributes(execution_options.search_attributes),
start_delay: execution_options.start_delay
)
else
raise ArgumentError, 'If signal_input is provided, you must also provide signal_name' if signal_name.nil?
Expand All @@ -85,7 +87,8 @@ def start_workflow(workflow, *input, options: {}, **args)
memo: execution_options.memo,
search_attributes: Workflow::Context::Helpers.process_search_attributes(execution_options.search_attributes),
signal_name: signal_name,
signal_input: signal_input
signal_input: signal_input,
start_delay: execution_options.start_delay
)
end

Expand Down
8 changes: 6 additions & 2 deletions lib/temporal/connection/grpc.rb
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,8 @@ def start_workflow_execution(
headers: nil,
cron_schedule: nil,
memo: nil,
search_attributes: nil
search_attributes: nil,
start_delay: nil
)
request = Temporalio::Api::WorkflowService::V1::StartWorkflowExecutionRequest.new(
identity: identity,
Expand All @@ -137,6 +138,7 @@ def start_workflow_execution(
workflow_execution_timeout: execution_timeout,
workflow_run_timeout: run_timeout,
workflow_task_timeout: task_timeout,
workflow_start_delay: start_delay,
request_id: SecureRandom.uuid,
header: Temporalio::Api::Common::V1::Header.new(
fields: converter.to_payload_map(headers || {})
Expand Down Expand Up @@ -379,7 +381,8 @@ def signal_with_start_workflow_execution(
headers: nil,
cron_schedule: nil,
memo: nil,
search_attributes: nil
search_attributes: nil,
start_delay: nil
)
proto_header_fields = if headers.nil?
converter.to_payload_map({})
Expand All @@ -406,6 +409,7 @@ def signal_with_start_workflow_execution(
workflow_execution_timeout: execution_timeout,
workflow_run_timeout: run_timeout,
workflow_task_timeout: task_timeout,
workflow_start_delay: start_delay,
request_id: SecureRandom.uuid,
header: Temporalio::Api::Common::V1::Header.new(
fields: proto_header_fields
Expand Down
4 changes: 3 additions & 1 deletion lib/temporal/execution_options.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@

module Temporal
class ExecutionOptions
attr_reader :name, :namespace, :task_queue, :retry_policy, :timeouts, :headers, :memo, :search_attributes
attr_reader :name, :namespace, :task_queue, :retry_policy, :timeouts, :headers, :memo, :search_attributes,
:start_delay

def initialize(object, options, defaults = nil)
# Options are treated as overrides and take precedence
Expand All @@ -15,6 +16,7 @@ def initialize(object, options, defaults = nil)
@headers = options[:headers] || {}
@memo = options[:memo] || {}
@search_attributes = options[:search_attributes] || {}
@start_delay = options[:start_delay] || 0

# For Temporal::Workflow and Temporal::Activity use defined values as the next option
if has_executable_concern?(object)
Expand Down
37 changes: 23 additions & 14 deletions spec/unit/lib/temporal/client_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -52,20 +52,21 @@ def inject!(header)
subject.start_workflow(TestStartWorkflow, 42)
expect(connection)
.to have_received(:start_workflow_execution)
.with(
namespace: 'default-test-namespace',
workflow_id: an_instance_of(String),
workflow_name: 'TestStartWorkflow',
task_queue: 'default-test-task-queue',
input: [42],
task_timeout: config.timeouts[:task],
run_timeout: config.timeouts[:run],
execution_timeout: config.timeouts[:execution],
workflow_id_reuse_policy: nil,
headers: { 'test' => 'asdf' },
memo: {},
search_attributes: {},
)
.with(
namespace: 'default-test-namespace',
workflow_id: an_instance_of(String),
workflow_name: 'TestStartWorkflow',
task_queue: 'default-test-task-queue',
input: [42],
task_timeout: config.timeouts[:task],
run_timeout: config.timeouts[:run],
execution_timeout: config.timeouts[:execution],
workflow_id_reuse_policy: nil,
headers: { 'test' => 'asdf' },
memo: {},
search_attributes: {},
start_delay: 0
)
end
end

Expand Down Expand Up @@ -94,6 +95,7 @@ def inject!(header)
headers: {},
memo: {},
search_attributes: {},
start_delay: 0
)
end

Expand All @@ -109,6 +111,7 @@ def inject!(header)
workflow_id_reuse_policy: :reject,
memo: { 'MemoKey1' => 'MemoValue1' },
search_attributes: { 'SearchAttribute1' => 256 },
start_delay: 10
}
)

Expand All @@ -127,6 +130,7 @@ def inject!(header)
headers: { 'Foo' => 'Bar' },
memo: { 'MemoKey1' => 'MemoValue1' },
search_attributes: { 'SearchAttribute1' => 256 },
start_delay: 10
)
end

Expand Down Expand Up @@ -154,6 +158,7 @@ def inject!(header)
headers: {},
memo: {},
search_attributes: {},
start_delay: 0
)
end

Expand All @@ -175,6 +180,7 @@ def inject!(header)
headers: {},
memo: {},
search_attributes: {},
start_delay: 0
)
end

Expand All @@ -198,6 +204,7 @@ def inject!(header)
headers: {},
memo: {},
search_attributes: {},
start_delay: 0
)
end
end
Expand Down Expand Up @@ -225,6 +232,7 @@ def inject!(header)
headers: {},
memo: {},
search_attributes: {},
start_delay: 0
)
end
end
Expand Down Expand Up @@ -255,6 +263,7 @@ def expect_signal_with_start(expected_arguments, expected_signal_argument)
search_attributes: {},
signal_name: 'the question',
signal_input: expected_signal_argument,
start_delay: 0
)
end

Expand Down
10 changes: 6 additions & 4 deletions spec/unit/lib/temporal/execution_options_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -99,10 +99,11 @@ class TestExecutionOptionsWorkflow < Temporal::Workflow
task_queue: 'test-task-queue',
retry_policy: { interval: 1, backoff: 2, max_attempts: 5 },
timeouts: { start_to_close: 10 },
headers: { 'TestHeader' => 'Test' }
headers: { 'TestHeader' => 'Test' },
start_delay: 10
}
end

it 'is initialized with full options' do
expect(subject.name).to eq(options[:name])
expect(subject.namespace).to eq(options[:namespace])
Expand All @@ -113,12 +114,13 @@ class TestExecutionOptionsWorkflow < Temporal::Workflow
expect(subject.retry_policy.max_attempts).to eq(options[:retry_policy][:max_attempts])
expect(subject.timeouts).to eq(options[:timeouts])
expect(subject.headers).to eq(options[:headers])
expect(subject.start_delay).to eq(options[:start_delay])
end
end

context 'when retry policy options are invalid' do
let(:options) { { retry_policy: { max_attempts: 10 } } }

it 'raises' do
expect { subject }.to raise_error(
Temporal::RetryPolicy::InvalidRetryPolicy,
Expand Down
4 changes: 4 additions & 0 deletions spec/unit/lib/temporal/grpc_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@
execution_timeout: 1,
run_timeout: 2,
task_timeout: 3,
start_delay: 10,
memo: {},
search_attributes: {
'foo-int-attribute' => 256,
Expand All @@ -90,6 +91,7 @@
expect(request.workflow_execution_timeout.seconds).to eq(1)
expect(request.workflow_run_timeout.seconds).to eq(2)
expect(request.workflow_task_timeout.seconds).to eq(3)
expect(request.workflow_start_delay.seconds).to eq(10)
expect(request.workflow_id_reuse_policy).to eq(:WORKFLOW_ID_REUSE_POLICY_REJECT_DUPLICATE)
expect(request.search_attributes.indexed_fields).to eq({
'foo-int-attribute' => Temporalio::Api::Common::V1::Payload.new(data: '256', metadata: { 'encoding' => 'json/plain' }),
Expand Down Expand Up @@ -138,6 +140,7 @@
execution_timeout: 1,
run_timeout: 2,
task_timeout: 3,
start_delay: 10,
workflow_id_reuse_policy: :allow,
signal_name: 'the question',
signal_input: 'what do you get if you multiply six by nine?'
Expand All @@ -153,6 +156,7 @@
expect(request.workflow_execution_timeout.seconds).to eq(1)
expect(request.workflow_run_timeout.seconds).to eq(2)
expect(request.workflow_task_timeout.seconds).to eq(3)
expect(request.workflow_start_delay.seconds).to eq(10)
expect(request.signal_name).to eq('the question')
expect(request.signal_input.payloads[0].data).to eq('"what do you get if you multiply six by nine?"')
expect(request.workflow_id_reuse_policy).to eq(:WORKFLOW_ID_REUSE_POLICY_ALLOW_DUPLICATE)
Expand Down

0 comments on commit b5efd2c

Please sign in to comment.