Skip to content

Commit

Permalink
Support for invoking and processing queries (coinbase#141)
Browse files Browse the repository at this point in the history
* Support for invoking and processing queries, WIP

* Catch-all query handler support, feedback changes

Made a handful of changes on approach from the initial spike. This
is operating under an assumption that the added EventTarget type for
query is a valid approach

* Fixes for on_query interface, clean up workflow and spec

* Fix method signature on testing context

* Move catch-all handler back to block

Also adding a second targeted query handler to spec

* Use nil workflow class in test case

* Updates to remove catch all handling, add query reject handling

* More concise when no status returned from server

* More consistent raise message style

* Add test for reject condition not met

* Simplify legacy handling and use serializers for query protos

* Add specs for the new changes

* Test query result & freeze them

* Implement QueryRegistry

* Swap Context#query_handlers with QueryRegistry

* Add a spec for Workflow::Context

* Rename QueryFailedFailure error to QueryFailed

* Small cleanup items

* Update readme

Co-authored-by: antstorm <[email protected]>
  • Loading branch information
Dave Willett and antstorm authored Apr 15, 2022
1 parent 93c7102 commit 398fd24
Show file tree
Hide file tree
Showing 32 changed files with 848 additions and 79 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ Besides calling activities workflows can:
- Use timers
- Receive signals
- Execute other (child) workflows
- Respond to queries [not yet implemented]
- Respond to queries


## Activities
Expand Down
14 changes: 14 additions & 0 deletions examples/bin/query
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
#!/usr/bin/env ruby
require_relative '../init'

Dir[File.expand_path('../workflows/*.rb', __dir__)].each { |f| require f }

workflow_class_name, workflow_id, run_id, query, args = ARGV
workflow_class = Object.const_get(workflow_class_name)

if ![workflow_class, workflow_id, run_id, query].all?
fail 'Wrong arguments, use `bin/query WORKFLOW WORKFLOW_ID RUN_ID QUERY [ARGS]`'
end

result = Temporal.query_workflow(workflow_class, query, workflow_id, run_id, args)
puts result.inspect
1 change: 1 addition & 0 deletions examples/bin/worker
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ worker.register_workflow(MetadataWorkflow)
worker.register_workflow(ParentCloseWorkflow)
worker.register_workflow(ParentWorkflow)
worker.register_workflow(ProcessFileWorkflow)
worker.register_workflow(QueryWorkflow)
worker.register_workflow(QuickTimeoutWorkflow)
worker.register_workflow(RandomlyFailingWorkflow)
worker.register_workflow(ReleaseWorkflow)
Expand Down
54 changes: 54 additions & 0 deletions examples/spec/integration/query_workflow_spec.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
require 'workflows/query_workflow'
require 'temporal/errors'

describe QueryWorkflow, :integration do
subject { described_class }

it 'returns the correct result for the queries' do
workflow_id, run_id = run_workflow(described_class)

# Query with nil workflow class
expect(Temporal.query_workflow(nil, 'state', workflow_id, run_id))
.to eq 'started'

# Query with arbitrary args
expect(Temporal.query_workflow(described_class, 'state', workflow_id, run_id,
'upcase', 'ignored', 'reverse'))
.to eq 'DETRATS'

# Query with no args
expect(Temporal.query_workflow(described_class, 'signal_count', workflow_id, run_id))
.to eq 0

# Query with unregistered handler
expect { Temporal.query_workflow(described_class, 'unknown_query', workflow_id, run_id) }
.to raise_error(Temporal::QueryFailed, 'Workflow did not register a handler for unknown_query')

Temporal.signal_workflow(described_class, 'make_progress', workflow_id, run_id)

# Query for updated signal_count with an unsatisfied reject condition
expect(Temporal.query_workflow(described_class, 'signal_count', workflow_id, run_id, query_reject_condition: :not_open))
.to eq 1

Temporal.signal_workflow(described_class, 'finish', workflow_id, run_id)
wait_for_workflow_completion(workflow_id, run_id)

# Repeating original query scenarios above, expecting updated state and signal results
expect(Temporal.query_workflow(nil, 'state', workflow_id, run_id))
.to eq 'finished'

expect(Temporal.query_workflow(described_class, 'state', workflow_id, run_id,
'upcase', 'ignored', 'reverse'))
.to eq 'DEHSINIF'

expect(Temporal.query_workflow(described_class, 'signal_count', workflow_id, run_id))
.to eq 2

expect { Temporal.query_workflow(described_class, 'unknown_query', workflow_id, run_id) }
.to raise_error(Temporal::QueryFailed, 'Workflow did not register a handler for unknown_query')

# Now that the workflow is completed, test a query with a reject condition satisfied
expect { Temporal.query_workflow(described_class, 'state', workflow_id, run_id, query_reject_condition: :not_open) }
.to raise_error(Temporal::QueryFailed, 'Query rejected: status WORKFLOW_EXECUTION_STATUS_COMPLETED')
end
end
36 changes: 36 additions & 0 deletions examples/workflows/query_workflow.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
class QueryWorkflow < Temporal::Workflow
attr_reader :state, :signal_count, :last_signal_received

def execute
@state = "started"
@signal_count = 0
@last_signal_received = nil

workflow.on_query("state") { |*args| apply_transforms(state, args) }
workflow.on_query("signal_count") { signal_count }

workflow.on_signal do |signal|
@signal_count += 1
@last_signal_received = signal
end

workflow.wait_for { last_signal_received == "finish" }
@state = "finished"

{
signal_count: signal_count,
last_signal_received: last_signal_received,
final_state: state
}
end

private

def apply_transforms(value, transforms)
return value if value.nil? || transforms.empty?
transforms.inject(value) do |memo, input|
next memo unless memo.respond_to?(input)
memo.public_send(input)
end
end
end
5 changes: 3 additions & 2 deletions lib/temporal.rb
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ module Temporal
:describe_namespace,
:list_namespaces,
:signal_workflow,
:query_workflow,
:await_workflow_result,
:reset_workflow,
:terminate_workflow,
Expand Down Expand Up @@ -48,11 +49,11 @@ def metrics
end

private

def default_client
@default_client ||= Client.new(config)
end

def config
@config ||= Configuration.new
end
Expand Down
25 changes: 24 additions & 1 deletion lib/temporal/client.rb
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,29 @@ def signal_workflow(workflow, signal, workflow_id, run_id, input = nil, namespac
)
end

# Issue a query against a running workflow
#
# @param workflow [Temporal::Workflow, nil] workflow class or nil
# @param query [String] name of the query to issue
# @param workflow_id [String]
# @param run_id [String]
# @param args [String, Array, nil] optional arguments for the query
# @param namespace [String, nil] if nil, choose the one declared on the workflow class or the
# global default
# @param query_reject_condition [Symbol] check Temporal::Connection::GRPC::QUERY_REJECT_CONDITION
def query_workflow(workflow, query, workflow_id, run_id, *args, namespace: nil, query_reject_condition: nil)
execution_options = ExecutionOptions.new(workflow, {}, config.default_execution_options)

connection.query_workflow(
namespace: namespace || execution_options.namespace,
workflow_id: workflow_id,
run_id: run_id,
query: query,
args: args,
query_reject_condition: query_reject_condition
)
end

# Long polls for a workflow to be completed and returns workflow's return value.
#
# @note This function times out after 30 seconds and throws Temporal::TimeoutError,
Expand Down Expand Up @@ -207,7 +230,7 @@ def await_workflow_result(workflow, workflow_id:, run_id: nil, timeout: nil, nam
timeout: timeout || max_timeout,
)
rescue GRPC::DeadlineExceeded => e
message = if timeout
message = if timeout
"Timed out after your specified limit of timeout: #{timeout} seconds"
else
"Timed out after #{max_timeout} seconds, which is the maximum supported amount."
Expand Down
8 changes: 8 additions & 0 deletions lib/temporal/concerns/payloads.rb
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,10 @@ def from_signal_payloads(payloads)
from_payloads(payloads)&.first
end

def from_query_payloads(payloads)
from_payloads(payloads)&.first
end

def from_payload_map(payload_map)
payload_map.map { |key, value| [key, from_payload(value)] }.to_h
end
Expand All @@ -45,6 +49,10 @@ def to_signal_payloads(data)
to_payloads([data])
end

def to_query_payloads(data)
to_payloads([data])
end

def to_payload_map(data)
data.transform_values(&method(:to_payload))
end
Expand Down
68 changes: 58 additions & 10 deletions lib/temporal/connection/grpc.rb
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,12 @@ class GRPC
close: Temporal::Api::Enums::V1::HistoryEventFilterType::HISTORY_EVENT_FILTER_TYPE_CLOSE_EVENT,
}.freeze

QUERY_REJECT_CONDITION = {
none: Temporal::Api::Enums::V1::QueryRejectCondition::QUERY_REJECT_CONDITION_NONE,
not_open: Temporal::Api::Enums::V1::QueryRejectCondition::QUERY_REJECT_CONDITION_NOT_OPEN,
not_completed_cleanly: Temporal::Api::Enums::V1::QueryRejectCondition::QUERY_REJECT_CONDITION_NOT_COMPLETED_CLEANLY
}.freeze

DEFAULT_OPTIONS = {
max_page_size: 100
}.freeze
Expand Down Expand Up @@ -142,7 +148,7 @@ def get_workflow_execution_history(
event_type: :all,
timeout: nil
)
if wait_for_new_event
if wait_for_new_event
if timeout.nil?
# This is an internal error. Wrappers should enforce this.
raise "You must specify a timeout when wait_for_new_event = true."
Expand Down Expand Up @@ -183,13 +189,28 @@ def poll_workflow_task_queue(namespace:, task_queue:)
poll_request.execute
end

def respond_workflow_task_completed(namespace:, task_token:, commands:)
def respond_query_task_completed(namespace:, task_token:, query_result:)
query_result_proto = Serializer.serialize(query_result)
request = Temporal::Api::WorkflowService::V1::RespondQueryTaskCompletedRequest.new(
task_token: task_token,
namespace: namespace,
completed_type: query_result_proto.result_type,
query_result: query_result_proto.answer,
error_message: query_result_proto.error_message,
)

client.respond_query_task_completed(request)
end

def respond_workflow_task_completed(namespace:, task_token:, commands:, query_results: {})
request = Temporal::Api::WorkflowService::V1::RespondWorkflowTaskCompletedRequest.new(
namespace: namespace,
identity: identity,
task_token: task_token,
commands: Array(commands).map { |(_, command)| Serializer.serialize(command) }
commands: Array(commands).map { |(_, command)| Serializer.serialize(command) },
query_results: query_results.transform_values { |value| Serializer.serialize(value) }
)

client.respond_workflow_task_completed(request)
end

Expand Down Expand Up @@ -452,16 +473,43 @@ def get_search_attributes
raise NotImplementedError
end

def respond_query_task_completed
raise NotImplementedError
end

def reset_sticky_task_queue
raise NotImplementedError
end

def query_workflow
raise NotImplementedError
def query_workflow(namespace:, workflow_id:, run_id:, query:, args: nil, query_reject_condition: nil)
request = Temporal::Api::WorkflowService::V1::QueryWorkflowRequest.new(
namespace: namespace,
execution: Temporal::Api::Common::V1::WorkflowExecution.new(
workflow_id: workflow_id,
run_id: run_id
),
query: Temporal::Api::Query::V1::WorkflowQuery.new(
query_type: query,
query_args: to_query_payloads(args)
)
)
if query_reject_condition
condition = QUERY_REJECT_CONDITION[query_reject_condition]
raise Client::ArgumentError, 'Unknown query_reject_condition specified' unless condition

request.query_reject_condition = condition
end

begin
response = client.query_workflow(request)
rescue ::GRPC::InvalidArgument => e
raise Temporal::QueryFailed, e.details
end

if response.query_rejected
rejection_status = response.query_rejected.status || 'not specified by server'
raise Temporal::QueryFailed, "Query rejected: status #{rejection_status}"
elsif !response.query_result
raise Temporal::QueryFailed, 'Invalid response from server'
else
from_query_payloads(response.query_result)
end
end

def describe_workflow_execution(namespace:, workflow_id:, run_id:)
Expand Down Expand Up @@ -534,7 +582,7 @@ def serialize_status_filter(value)

sym = Temporal::Workflow::Status::API_STATUS_MAP.invert[value]
status = Temporal::Api::Enums::V1::WorkflowExecutionStatus.resolve(sym)

Temporal::Api::Filter::V1::StatusFilter.new(status: status)
end
end
Expand Down
5 changes: 5 additions & 0 deletions lib/temporal/connection/serializer.rb
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
require 'temporal/workflow/command'
require 'temporal/workflow/query_result'
require 'temporal/connection/serializer/schedule_activity'
require 'temporal/connection/serializer/start_child_workflow'
require 'temporal/connection/serializer/request_activity_cancellation'
Expand All @@ -10,6 +11,8 @@
require 'temporal/connection/serializer/fail_workflow'
require 'temporal/connection/serializer/signal_external_workflow'
require 'temporal/connection/serializer/upsert_search_attributes'
require 'temporal/connection/serializer/query_answer'
require 'temporal/connection/serializer/query_failure'

module Temporal
module Connection
Expand All @@ -26,6 +29,8 @@ module Serializer
Workflow::Command::FailWorkflow => Serializer::FailWorkflow,
Workflow::Command::SignalExternalWorkflow => Serializer::SignalExternalWorkflow,
Workflow::Command::UpsertSearchAttributes => Serializer::UpsertSearchAttributes,
Workflow::QueryResult::Answer => Serializer::QueryAnswer,
Workflow::QueryResult::Failure => Serializer::QueryFailure,
}.freeze

def self.serialize(object)
Expand Down
19 changes: 19 additions & 0 deletions lib/temporal/connection/serializer/query_answer.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
require 'temporal/connection/serializer/base'
require 'temporal/concerns/payloads'

module Temporal
module Connection
module Serializer
class QueryAnswer < Base
include Concerns::Payloads

def to_proto
Temporal::Api::Query::V1::WorkflowQueryResult.new(
result_type: Temporal::Api::Enums::V1::QueryResultType::QUERY_RESULT_TYPE_ANSWERED,
answer: to_query_payloads(object.result)
)
end
end
end
end
end
16 changes: 16 additions & 0 deletions lib/temporal/connection/serializer/query_failure.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
require 'temporal/connection/serializer/base'

module Temporal
module Connection
module Serializer
class QueryFailure < Base
def to_proto
Temporal::Api::Query::V1::WorkflowQueryResult.new(
result_type: Temporal::Api::Enums::V1::QueryResultType::QUERY_RESULT_TYPE_FAILED,
error_message: object.error.message
)
end
end
end
end
end
2 changes: 1 addition & 1 deletion lib/temporal/errors.rb
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ class ClientVersionNotSupportedFailure < ApiError; end
class FeatureVersionNotSupportedFailure < ApiError; end
class NamespaceAlreadyExistsFailure < ApiError; end
class CancellationAlreadyRequestedFailure < ApiError; end
class QueryFailedFailure < ApiError; end
class QueryFailed < ApiError; end
class UnexpectedResponse < ApiError; end

end
Loading

0 comments on commit 398fd24

Please sign in to comment.