diff --git a/lib/orocos/async/async.rb b/lib/orocos/async/async.rb index 7a921eac..b84b5ae6 100644 --- a/lib/orocos/async/async.rb +++ b/lib/orocos/async/async.rb @@ -30,7 +30,7 @@ # puts state # end # reader.read do |value| -# puts value +# puts value # end # # If a method call needs the remote Orocos Task which is currently not reachable @@ -44,13 +44,13 @@ # short on worker threads. # # # these events are generated by polling -# task.on_connect do +# task.on_connect do # puts "connected" # end -# task.on_disconnect do +# task.on_disconnect do # puts "disconnected" # end -# task.on_reconnect do +# task.on_reconnect do # puts "reconnected" # end # @@ -65,12 +65,22 @@ # port.on_new_data do |data| # puts data # end -# +# # The polling frequency can be changed by setting the period attribute of each # asynchronous object. # module Orocos::Async - KNOWN_ERRORS = [Orocos::ComError,Orocos::NotFound,Typelib::NotFound,Orocos::TypekitTypeNotFound,Orocos::TypekitTypeNotExported,Orocos::StateTransitionFailed,Orocos::ConnectionFailed,OroGen::DefinitionTypekitNotFound] + KNOWN_ERRORS = [ + Orocos::ComError, + Orocos::NotFound, + Typelib::NotFound, + Orocos::TypekitTypeNotFound, + Orocos::TypekitTypeNotExported, + Orocos::StateTransitionFailed, + Orocos::ConnectionFailed, + OroGen::DefinitionTypekitNotFound + ] + class << self extend ::Forwardable diff --git a/lib/orocos/async/attributes.rb b/lib/orocos/async/attributes.rb index 3a115f40..5ed78f9b 100644 --- a/lib/orocos/async/attributes.rb +++ b/lib/orocos/async/attributes.rb @@ -17,25 +17,26 @@ def initialize(async_task,attribute,options=Hash.new) disable_emitting do reachable!(attribute) end - @poll_timer = @event_loop.async_every(method(:raw_read), {:period => period, :start => false, - :known_errors => Orocos::Async::KNOWN_ERRORS}) do |data,error| + + @poll_timer = @event_loop.async_every( + proc { @delegator_obj.raw_read }, + { period: period, start: false, + known_errors: Orocos::Async::KNOWN_ERRORS } + ) do |data, error| if error @poll_timer.cancel self.period = @poll_timer.period - @event_loop.once do - event :error,error - end - else - if data - if @raw_last_sample != data - @raw_last_sample = data - event :raw_change,data - event :change,Typelib.to_ruby(data) - end + event :error, error + elsif data + if @raw_last_sample != data + @raw_last_sample = data + event :raw_change, data + event :change, Typelib.to_ruby(data) end end end @poll_timer.doc = attribute.full_name + @task.on_unreachable do unreachable! end @@ -92,14 +93,10 @@ def wait(timeout = 5.0) def really_add_listener(listener) super if listener.event == :raw_change - if !@poll_timer.running? - @poll_timer.start(period) - end + @poll_timer.start(period) unless @poll_timer.running? listener.call(@raw_last_sample) if @raw_last_sample && listener.use_last_value? elsif listener.event == :change - if !@poll_timer.running? - @poll_timer.start(period) - end + @poll_timer.start(period) unless @poll_timer.running? listener.call(Typelib.to_ruby(@raw_last_sample)) if @raw_last_sample && listener.use_last_value? end end diff --git a/lib/orocos/async/name_service.rb b/lib/orocos/async/name_service.rb index 0e7b14e8..5b245e94 100644 --- a/lib/orocos/async/name_service.rb +++ b/lib/orocos/async/name_service.rb @@ -36,7 +36,7 @@ class NameServiceBase < ObjectBase # context proxy # @return [void] define_events :task_added - + # @!method on_task_added # # Registers an event callback that will receive task names when the task @@ -50,34 +50,37 @@ class NameServiceBase < ObjectBase self.default_period = 1.0 - def initialize(name_service,options = Hash.new) - @options ||= Kernel.validate_options options,:period => default_period,:start => false,:sync_key => nil,:known_errors => Orocos::Async::KNOWN_ERRORS,:event_loop => Orocos::Async.event_loop - @stored_names ||= Set.new - _,options_async = Kernel.filter_options @options,:event_loop=>nil - super(name_service.name,@options[:event_loop]) - disable_emitting do - reachable! name_service - end - @watchdog_timer = @event_loop.async_every method(:names),options_async do |names| - names.each do |name| - n = @stored_names.add? name - event :task_added,name if n - end - @stored_names.delete_if do |name| - if !names.include?(name) - event :task_removed,name + def initialize(name_service, options = {}) + @options = Kernel.validate_options( + options, { period: default_period, start: false, sync_key: nil, + known_errors: Orocos::Async::KNOWN_ERRORS, + event_loop: Orocos::Async.event_loop } + ) + + @stored_names = Set.new + _, options_async = Kernel.filter_options @options, event_loop: nil + super(name_service.name, @options[:event_loop]) + + disable_emitting { reachable!(name_service) } + @watchdog_timer = + @event_loop.async_every(method(:names), options_async) do |names| + names.each do |name| + event(:task_added, name) if @stored_names.add?(name) + end + + @stored_names.delete_if do |name| + next if names.include?(name) + + event :task_removed, name true - else - false end end - end @watchdog_timer.doc = name - @task_context_proxies = Array.new + @task_context_proxies = [] end def really_add_listener(listener) - if listener.event == :task_added || listener.event == :task_removed + if listener.event == :task_added || listener.event == :task_removed @watchdog_timer.start unless @watchdog_timer.running? if listener.use_last_value? && !@stored_names.empty? @stored_names.each do |name| @@ -132,14 +135,14 @@ class NameService < NameServiceBase define_events :name_service_added, :name_service_removed def initialize(*name_services) - options = if name_services.last.is_a? Hash + options = if name_services.last.kind_of?(Hash) name_services.pop else - Hash.new + {} end - name_services = name_services.map { |ns| ns.to_async } - name_service = Orocos::NameService.new *name_services - super(name_service,options) + + name_services = [Orocos::NameService.new(*name_services)] + super(name_services.first, options) end def clear @@ -147,9 +150,9 @@ def clear orig_clear end - def proxy(name,options = Hash.new) - if(name_services.empty?) - Vizkit.error "Orocos is not initialized!" unless Orocos.initialized? + def proxy(name, options = {}) + if name_services.empty? + Orocos.error "Orocos is not initialized!" unless Orocos.initialized? raise "No name service available." end super @@ -173,18 +176,16 @@ def add_listener(listener) # # Emits the name_service_added event def add(name_service) - name_service = name_service.to_async orig_add(name_service) - event :name_service_added,name_service + event :name_service_added, name_service end # (see Orocos::NameServiceBase#add_front) # # Emits the name_service_added event def add_front(name_service) - name_service = name_service.to_async orig_add_front(name_service) - event :name_service_added,name_service + event :name_service_added, name_service end # (see Orocos::NameServiceBase#remove) @@ -193,195 +194,57 @@ def add_front(name_service) def remove(name_service) removed = false name_services.delete_if do |ns| - if name_service == ns || ns.delegator_obj == ns - true - end + removed = true if name_service == ns end + if removed - event :name_service_removed,name_service + event :name_service_removed, name_service true end end - private - # add methods which forward the call to the underlying name service - forward_to :@delegator_obj,:@event_loop, :known_errors => Orocos::Async::KNOWN_ERRORS do - methods = Orocos::NameService.instance_methods.find_all{|method| nil == (method.to_s =~ /^do.*/)} - methods -= Orocos::Async::NameService.instance_methods + [:method_missing] - def_delegator :add,:alias => :orig_add - def_delegator :add_front,:alias => :orig_add_front - def_delegator :clear,:alias => :orig_clear - def_delegators methods - end - end - - module Local - class NameService < NameServiceBase - extend Utilrb::EventLoop::Forwardable - - def initialize(options = Hash.new) - options,other_options = Kernel.filter_options options, - :tasks => Array.new - - name_service = Orocos::Local::NameService.new options[:tasks] - super(name_service,other_options) - end - - def get(name,options=Hash.new,&block) - async_options,other_options = Kernel.filter_options options, - :sync_key => nil, - :raise => nil, - :event_loop => @event_loop, - :period => nil, - :wait => nil - - if block - p = proc do |task,error| - task = task.to_async(async_options) unless error - if block.arity == 2 - block.call task,error - elsif !error - block.call task - end - end - orig_get name,other_options,&p - else - task = orig_get name,other_options - task.to_async(async_options) - end - end - - private - # add methods which forward the call to the underlying name service - forward_to :@delegator_obj,:@event_loop,:known_errors=> Orocos::Async::KNOWN_ERRORS do - methods = Orocos::Local::NameService.instance_methods.find_all{|method| nil == (method.to_s =~ /^do.*/)} - methods -= Orocos::Async::Local::NameService.instance_methods + [:method_missing] - def_delegators methods - def_delegator :get,:alias => :orig_get - end - end - end - - # Base class for name services that are accessed remotely (e.g. over the - # network) - class RemoteNameService < NameServiceBase - extend Utilrb::EventLoop::Forwardable - - def initialize(name_service,options = Hash.new) - options = Kernel.validate_options options, - :reconnect => true, - :known_errors => Array.new - - @reconnect = options.delete(:reconnect) - options[:known_errors].concat([Orocos::ComError,Orocos::NotFound]) - super(name_service,options) - @namespace = name_service.namespace - end - - # True if this name service should automatically reconnect - # @return [Boolean] - def reconnect?; @reconnect end - - def unreachable!(options = Hash.new) - @watchdog_timer.stop - if !valid_delegator? - raise "This should never happen. There must be always a valid delegator obj" - end - - if reconnect? && options.has_key?(:error) - obj = @delegator_obj - obj.reset - timer = @event_loop.async_every obj.method(:names),:period => 1.0,:sync_key => nil,:known_errors => [Orocos::NotFound,Orocos::ComError] do |names,error| - if error - obj.reset - else - reachable!(obj) - @watchdog_timer.start - timer.stop - end - end - timer.doc = "NameService #{name} reconnect" - else - end - super - end - - def name - @delegator_obj.name - end - - def get(name,options=Hash.new,&block) - async_options,other_options = Kernel.filter_options options, - :sync_key => nil,:raise => nil,:event_loop => @event_loop, - :period => nil,:wait => nil + def get(name, options = {}, &block) + async_options, other_options = Kernel.filter_options( + options, + { + sync_key: nil, raise: nil, event_loop: @event_loop, + period: nil, wait: nil + } + ) if block - p = proc do |task,error| - async_options[:use] = task - atask = if !error - task.to_async(async_options) - end + p = proc do |task, error| + task = task.to_async(async_options) unless error if block.arity == 2 - block.call atask,error + block.call task, error elsif !error - block.call atask + block.call task end - end - orig_get name,other_options,&p + end + orig_get name, other_options, &p else - task = orig_get name,other_options - task.to_async(Hash[:use => task].merge(async_options)) + task = orig_get name, other_options + task.to_async(async_options) end end - private # add methods which forward the call to the underlying name service - forward_to :@delegator_obj,:@event_loop, :known_errors => [Orocos::ComError,Orocos::NotFound], :on_error => :error do - methods = Orocos::NameServiceBase.instance_methods.find_all{|method| nil == (method.to_s =~ /^do.*/)} - methods -= Orocos::Async::RemoteNameService.instance_methods + [:method_missing] - thread_safe do - def_delegators methods - def_delegator :get,:alias => :orig_get - end - end - - def error(e) - emit_error e if !e.is_a? Orocos::NotFound + forward_to :@delegator_obj,:@event_loop, :known_errors => Orocos::Async::KNOWN_ERRORS do + methods = Orocos::NameService.instance_methods.find_all{|method| nil == (method.to_s =~ /^do.*/)} + methods -= Orocos::Async::NameService.instance_methods + [:method_missing] + def_delegator :add, alias: :orig_add + def_delegator :add_front, alias: :orig_add_front + def_delegator :clear, alias: :orig_clear + def_delegator :get, alias: :orig_get + def_delegators methods end end module CORBA - class << self - def name_service=(service) - Orocos::Async.name_service.name_services.each_with_index do |i,val| - if val == @delegator_obj - Orocos::Async.name_service.name_services[i] = service - break - end - end - reachable! service - end - def name_service - @name_service ||= NameService.new(Orocos::CORBA.name_service.ip) - end - - def get(name,options =Hash.new) - name_service.get(name,options) - end - - def proxy(name,options = Hash.new) - name_service.proxy(name,options) - end - end - - class NameService < RemoteNameService - extend Utilrb::EventLoop::Forwardable - - def initialize(host = "", reconnect: true) - name_service = Orocos::CORBA::NameService.new(host) - super(name_service, reconnect: reconnect) + class NameService + def self.new(*args) + Orocos::CORBA::NameService.new(*args).to_async end end end end - diff --git a/lib/orocos/async/object_base.rb b/lib/orocos/async/object_base.rb index 117636d2..da7b8b21 100644 --- a/lib/orocos/async/object_base.rb +++ b/lib/orocos/async/object_base.rb @@ -1,18 +1,15 @@ module Orocos::Async - class EventListener attr_reader :event attr_reader :last_args - def initialize(obj,event,use_last_value=true,&block) + def initialize(obj, event, use_last_value = true, &block) + raise ArgumentError, "no object given" unless obj + @block = block - if !obj - raise ArgumentError, "no object given" - end @obj = obj @event = event @use_last_value = use_last_value - @last_args end # returns true if the listener shall be called @@ -39,8 +36,7 @@ def start(use_last_value = @use_last_value) self end - #return true if the listener is listing to - #the event + # Whether the listener is currently started def listening? @obj.listener?(self) end @@ -52,25 +48,32 @@ def call(*args) end end + # Null object for the delegated object in ObjectBase class DelegatorDummy attr_reader :event_loop attr_reader :name + def initialize(parent,name,event_loop) @parent = parent @name = name @event_loop = event_loop end + def respond_to_missing?(m, _include_private = false) + true + end + def method_missing(m,*args,&block) return super if m == :to_ary + error = Orocos::NotFound.new "#{@name} is not reachable while accessing #{m}" error.set_backtrace(Kernel.caller) - if !block + raise error unless block + + @event_loop.defer on_error: @parent.method(:emit_error), + callback: block, + known_errors: Orocos::Async::KNOWN_ERRORS do raise error - else - @event_loop.defer :on_error => @parent.method(:emit_error),:callback => block,:known_errors => Orocos::Async::KNOWN_ERRORS do - raise error - end end end end @@ -156,7 +159,6 @@ def validate_event(name) attr_reader :event_loop attr_reader :options - attr_accessor :emitting attr_accessor :name define_events :error,:reachable,:unreachable @@ -170,40 +172,64 @@ def validate_event(name) # @return [Array] attr_reader :pending_adds - def initialize(name,event_loop) - raise ArgumentError, "no name was given" if !name + def initialize(name, event_loop) + raise ArgumentError, "object name cannot be nil" unless name + @listeners ||= Hash.new{ |hash,key| hash[key] = []} - @proxy_listeners ||= Hash.new{|hash,key| hash[key] = Hash.new} + @proxy_listeners ||= Hash.new{ |hash,key| hash[key] = {} } @name ||= name @event_loop ||= event_loop - @options ||= Hash.new - @emitting = true - @pending_adds = Array.new + @options ||= {} + @pending_adds = [] + invalidate_delegator! on_error do |e| - if e.kind_of?(Orocos::ComError) - unreachable!(:error => e) - end + unreachable!(error: e) if e.kind_of?(Orocos::ComError) + end + end + + def self.main_thread_call(name) + alias_name = "__main_thread_call_#{name}" + return if method_defined?(alias_name) + + alias_method alias_name, name + define_method name do |*args, **kw, &block| + return send(alias_name, *args, **kw, &block) if @event_loop.thread? + + raise "#{name} called outside of event loop thread" end + name + end + + def assert_in_event_loop_thread + @event_loop.validate_thread end def invalidate_delegator! - @delegator_obj = DelegatorDummy.new self,@name,@event_loop + @delegator_obj = DelegatorDummy.new(self, @name, @event_loop) end - # sets @emitting to value for the time the given block is called - def emitting(value,&block) - old,@emitting = @emitting,value + def emitting? + !Thread.current["__#{self}_disable_emitting"] + end + + # Disable event emission while the block is called + # + # This is thread-safe + def emitting(value, &block) + old = emitting? + Thread.current["__#{self}_disable_emitting"] = !value + instance_eval(&block) ensure - @emitting = old + Thread.current["__#{self}_disable_emitting"] = !old end def disable_emitting(&block) - emitting(false,&block) + emitting(false, &block) end - #returns true if the event is known + # Returns true if the event is known def valid_event?(event) self.class.valid_event?(event) end @@ -216,24 +242,24 @@ def event_names self.class.event_names end - def on_event(event,use_last_value=true,&block) + main_thread_call def on_event(event, use_last_value = true, &block) event = validate_event event - EventListener.new(self,event,use_last_value,&block).start + EventListener.new(self, event, use_last_value, &block).start end # returns the number of listener for the given event - def number_of_listeners(event) + main_thread_call def number_of_listeners(event) event = validate_event event @listeners[event].size end - #returns true if the listener is active - def listener?(listener) + # Returns true if the listener is active + main_thread_call def listener?(listener) @listeners[listener.event].include? listener end - #returns the listeners for the given event - def listeners(event) + # Returns the listeners for the given event + main_thread_call def listeners(event) event = validate_event event @listeners[event] end @@ -241,10 +267,11 @@ def listeners(event) # adds a listener to obj and proxies # event like it would be emitted from self # - # if no listener is registererd to event it + # if no listener is registererd to event it # also removes the listener from obj - def proxy_event(obj,*events) + main_thread_call def proxy_event(obj,*events) return if obj == self + events = events.flatten events.each do |e| if existing = @proxy_listeners[obj].delete(e) @@ -257,8 +284,9 @@ def proxy_event(obj,*events) end end - def remove_proxy_event(obj,*events) + main_thread_call def remove_proxy_event(obj,*events) return if obj == self + events = events.flatten if events.empty? remove_proxy_event(obj,@proxy_listeners[obj].keys) @@ -272,17 +300,23 @@ def remove_proxy_event(obj,*events) end end - def add_listener(listener) - event = validate_event listener.event + main_thread_call def add_listener(listener) + validate_event(listener.event) return listener if pending_adds.include? listener + pending_adds << listener + + # We queue the addition so that evenst that are currently queued are not + # given to the new listeners. event_loop.once do expected = pending_adds.shift # 'expected' is nil if the listener has been removed before this # block got processed if expected if expected != listener - raise RuntimeError, "internal error in #{self}#add_listener: pending addition and expected addition mismatch" + raise RuntimeError, + "internal error in #{self}#add_listener: "\ + "pending addition and expected addition mismatch" end really_add_listener(listener) end @@ -290,16 +324,16 @@ def add_listener(listener) listener end - def really_add_listener(listener) + main_thread_call def really_add_listener(listener) if listener.use_last_value? if listener.event == :reachable listener.call if valid_delegator? elsif listener.event == :unreachable - listener.call if !valid_delegator? + listener.call unless valid_delegator? end end @proxy_listeners.each do |obj,listeners| - if l = listeners[listener.event] + if (l = listeners[listener.event]) if listener.use_last_value? && !listener.last_args # replay last value if requested obj.really_add_listener(listener) @@ -308,14 +342,17 @@ def really_add_listener(listener) l.start(false) unless l.listening? end end - @listeners[listener.event] << listener unless @listeners[listener.event].include?(listener) + unless @listeners[listener.event].include?(listener) + @listeners[listener.event] << listener + end listener end - def remove_listener(listener) - if idx = pending_adds.index(listener) + main_thread_call def remove_listener(listener) + if (idx = pending_adds.index(listener)) pending_adds[idx] = nil end + @listeners[listener.event].delete listener # Check whether the only listeners left are proxy listeners. If they @@ -332,10 +369,11 @@ def remove_listener(listener) # calls all listener which are registered for the given event # the next step def event(event_name,*args,&block) + return unless emitting? + validate_event event_name - return unless @emitting @event_loop.once do - process_event event_name,*args,&block + process_event event_name, *args, &block end self end @@ -347,52 +385,58 @@ def wait(timeout = 5.0) @event_loop.wait_for do if timeout && timeout <= Time.now-time Utilrb::EventLoop.cleanup_backtrace do - raise Orocos::NotFound,"#{self.class}: #{respond_to?(:full_name) ? full_name : name} is not reachable after #{timeout} seconds" + name = respond_to?(:full_name) ? full_name : name + raise Orocos::NotFound, + "#{self.class}: #{name} is not reachable "\ + "after #{timeout} seconds" end end - reachable? + valid_delegator? end self end - # TODO CODE BLOCK - def reachable?(&block) - valid_delegator? + main_thread_call def reachable? + if block_given? + yield(valid_delegator?) + else + valid_delegator? + end end - def reachable!(obj,options = Hash.new) + main_thread_call def reachable!(obj, _options = {}) @delegator_obj = obj event :reachable if valid_delegator? end - def unreachable!(options = Hash.new) - if valid_delegator? - invalidate_delegator! - event :unreachable - end + main_thread_call def unreachable!(_options = {}) + return unless valid_delegator? + + invalidate_delegator! + event :unreachable end def valid_delegator? - !@delegator_obj.is_a? DelegatorDummy + !@delegator_obj.kind_of?(DelegatorDummy) end - def remove_all_listeners - !@listeners.each do |event,listeners| - while !listeners.empty? - remove_listener listeners.first - end + main_thread_call def remove_all_listeners + @listeners.dup.each_value do |listeners| + remove_listener(listeners.first) until listeners.empty? end end private + # calls all listener which are registered for the given event - def process_event(event_name,*args,&block) - event = validate_event event_name + main_thread_call def process_event(event_name, *args, &block) + validate_event(event_name) args = block.call if block - #@listeners have to be cloned because it might get modified - #by listener.call + + # @listeners have to be cloned because it might get modified + # by listener.call @listeners[event_name].clone.each do |listener| - listener.call *args + listener.call(*args) end self end diff --git a/lib/orocos/async/orocos.rb b/lib/orocos/async/orocos.rb index 3fc8213f..a92b11aa 100644 --- a/lib/orocos/async/orocos.rb +++ b/lib/orocos/async/orocos.rb @@ -79,12 +79,12 @@ class TaskContext def to_async(options = Hash.new) options[:name] ||= name options[:ior] ||= ior - Orocos::Async::CORBA::TaskContext.new(options) + Orocos::Async::CORBA::TaskContext.new(options.merge(use: self)) end def to_proxy(options = Hash.new) options[:use] ||= to_async - # use name service to check if there is already + # use name service to check if there is already # a proxy for the task Orocos::Async.proxy(name,options) end diff --git a/lib/orocos/async/ports.rb b/lib/orocos/async/ports.rb index cf76ee72..eec575de 100644 --- a/lib/orocos/async/ports.rb +++ b/lib/orocos/async/ports.rb @@ -361,7 +361,7 @@ def add_listener(listener) def remove_listener(listener) super - if number_of_listeners(:data) == 0 && number_of_listeners(:raw_data) == 0 && @global_reader + if number_of_listeners(:data) == 0 && number_of_listeners(:raw_data) == 0 && @global_reader&.valid_delegator? remove_proxy_event(@global_reader) @global_reader.disconnect{} # call it asynchron @global_reader = nil diff --git a/lib/orocos/async/task_context.rb b/lib/orocos/async/task_context.rb index eecb1a38..1580022d 100644 --- a/lib/orocos/async/task_context.rb +++ b/lib/orocos/async/task_context.rb @@ -1,6 +1,8 @@ module Orocos::Async::CORBA class TaskContext < Orocos::Async::TaskContextBase + attr_reader :ior + # A TaskContext # # If not specified the default option settings are: @@ -25,23 +27,19 @@ class TaskContext < Orocos::Async::TaskContextBase # @overload initialize(options) # @overload initialize(task,options) # @option options [#ior,#name] :task a task context. - def initialize(ior,options=Hash.new) - if ior.respond_to?(:ior) - ior = ior.ior - end - ior,options = if ior.is_a? Hash - [nil,ior] - else - [ior,options] - end - ior ||= if options.has_key? :ior - options[:ior] - elsif options.has_key? :use - options[:use].ior - end + def initialize(ior, options = {}) + ior = ior.ior if ior.respond_to?(:ior) + ior, options = + if ior.kind_of?(Hash) + [nil, ior] + else + [ior, options] + end + + ior ||= options[:ior] || options[:use]&.ior name = options[:name] || ior - super(name,options.merge(:ior => ior)) @ior = ior.to_str + super(name, options.merge(ior: ior)) end def really_add_listener(listener) @@ -51,16 +49,8 @@ def really_add_listener(listener) # to prevent different behaviors depending on # the calling order if listener.use_last_value? && listener.event == :state_change - state = @mutex.synchronize do - @delegator_obj.current_state if valid_delegator? - end - event_loop.once{listener.call state} if state - end - end - - def ior - @mutex.synchronize do - @ior.dup if @ior + state = @delegator_obj.current_state if valid_delegator? + event_loop.once{listener.call(state)} if state end end @@ -69,9 +59,7 @@ def ior # @option options [String] name the task name # @option options [String] ior the task IOR def configure_delegation(options = Hash.new) - options = Kernel.validate_options options, - :name=> nil, - :ior => nil + options = Kernel.validate_options options, name: nil, ior: nil ior = options[:ior] @ior,@name = if valid_delegator? @@ -82,9 +70,7 @@ def configure_delegation(options = Hash.new) [ior, @name] end - if !@ior - raise ArgumentError, "no IOR or task has been given" - end + raise ArgumentError, "no IOR or task has been given" unless @ior end def respond_to_missing?(method_name, include_private = false) @@ -105,7 +91,9 @@ def method_missing(m,*args) # Called by #task_context to create the underlying task context object def access_remote_task_context - Orocos::TaskContext.new @ior ,:name => @name + task = Orocos::TaskContext.new(@ior, name: @name) + name = @name || task.name + [task, name] end # add methods which forward the call to the underlying task context diff --git a/lib/orocos/async/task_context_base.rb b/lib/orocos/async/task_context_base.rb index bf0f6af0..abf9cf50 100644 --- a/lib/orocos/async/task_context_base.rb +++ b/lib/orocos/async/task_context_base.rb @@ -4,6 +4,8 @@ class TaskContextBase < Orocos::Async::ObjectBase extend Orocos::Async::ObjectBase::Periodic::ClassMethods include Orocos::Async::ObjectBase::Periodic + attr_reader :name + self.default_period = 1.0 define_events :port_reachable, @@ -22,7 +24,7 @@ def self.to_ruby(task) begin t = Orocos::CORBA.name_service.get(task.basename) raise "Cannot create ruby task for #{task.name} "\ - "because there is already a task #{t.name} "\ + "because is already a task #{t.name} "\ "registered on the main CORBA name service." rescue Orocos::NotFound end @@ -41,7 +43,7 @@ def self.to_ruby(task) prop = task.property(prop) prop.wait p = @ruby_task_context.create_property(prop.name,prop.type) - p.write p.new_sample.zero! + p.write p.new_sample prop.on_change do |data| p.write data end @@ -57,36 +59,77 @@ def self.to_ruby(task) # returned by the method attr_predicate :raise_on_access_error?, true - def initialize(name,options=Hash.new) - event_loop,reachable_options = Kernel.filter_options options,:event_loop => Orocos::Async.event_loop - super(name,event_loop[:event_loop]) - @mutex = Mutex.new + def initialize(name, options = {}) + self_options, reachable_options = + Kernel.filter_options( + options, + { event_loop: Orocos::Async.event_loop, wait: true } + ) + super(name, self_options[:event_loop]) + @last_state = nil - @port_names = Array.new - @property_names = Array.new - @attribute_names = Array.new - - watchdog_proc = Proc.new do - ping # call a method which raises ComError if the connection died - # this is used to disconnect the task by an error handler - [states,port_names,property_names,attribute_names] + @port_names = [] + @property_names = [] + @attribute_names = [] + + @ready = false + ready = [] + resolution_in_progress = {} + @watchdog_timer = + @event_loop.every(default_period, false) do + %I[states port_names property_names attribute_names].each do |type| + next if resolution_in_progress[type] + + send(type) do |result, _| + if result + send("process_#{type}", result) + (ready << type).uniq! + @ready = ready.size == 4 + end + resolution_in_progress[type] = false + end + resolution_in_progress[type] = true + end + end + @watchdog_timer.doc = name + + if reachable_options[:use] + reachable!(reachable_options) + elsif self_options[:wait] + begin + resolved_task_context(access_remote_task_context, + nil, reachable_options) + rescue *Orocos::Async::KNOWN_ERRORS => e # Lint: + resolved_task_context(nil, e, reachable_options) + end + else + @event_loop.defer( + callback: proc { |t, e| resolved_task_context(t, e, reachable_options) }, + known_errors: Orocos::Async::KNOWN_ERRORS + ) do + access_remote_task_context + end end + end - @watchdog_timer = @event_loop.async_every(watchdog_proc,{:period => default_period, - :default => [[],[],[],[]], - :start => false, - :sync_key => nil, #is blocked by the methods call ping, states, etc - :known_errors => Orocos::Async::KNOWN_ERRORS}) do |data,error| - process_states(data[0]) - process_port_names(data[1]) - process_property_names(data[2]) - process_attribute_names(data[3]) - end - @watchdog_timer.doc = name - reachable!(reachable_options) + def wait(timeout = 5) + super + @event_loop.wait_for(timeout) { @ready } + end + + def resolved_task_context(result, error, reachable_options) + if error + @access_error = error + invalidate_delegator! + raise error if raise_on_access_error? + else + task_context, name = *result + @name = name if name + reachable!(reachable_options.merge(use: task_context)) + end end - def to_async(options=Hash.new) + def to_async(_options = {}) self end @@ -112,7 +155,7 @@ def ruby_task_context? !!@ruby_task_context end - def really_add_listener(listener) + main_thread_call def really_add_listener(listener) return super unless listener.use_last_value? # call new listeners with the current value @@ -120,7 +163,7 @@ def really_add_listener(listener) # the calling order if listener.event == :port_reachable names = @port_names.dup - event_loop.once do + event_loop.once do names.each do |name| listener.call name end @@ -143,12 +186,6 @@ def really_add_listener(listener) super end - def name - @mutex.synchronize do - @name.dup if @name - end - end - # Initiates the binding of the underlying sychronous access object to # this async object. # @@ -167,86 +204,72 @@ def name # @option options [Boolean] raise (false) if set, the #task_context # method will raise if the task context cannot be accessed on first # try. Otherwise, it will try to access it forever until it finds it. - def reachable!(options = Hash.new) - @mutex.synchronize do - options, configure_options = Kernel.filter_options options, - :watchdog => true, - :period => default_period, - :wait => false, - :use => nil, - :raise => false - - self.raise_on_access_error = options[:raise] - - if options[:use] - @delegator_obj = options[:use] - @watchdog_timer.doc = @delegator_obj.name - else - invalidate_delegator! - end - - configure_delegation(configure_options) - - @watchdog_timer.start(options[:period],false) if options[:watchdog] - @event_loop.async(method(:task_context)) + main_thread_call def reachable!(options = {}) + options, configure_options = Kernel.filter_options( + options, + { + watchdog: true, + period: default_period, + wait: false, + use: nil, + raise: false + } + ) + + self.raise_on_access_error = options[:raise] + + if options[:use] + @delegator_obj = options[:use] + @watchdog_timer.doc = @delegator_obj.name + else + invalidate_delegator! end - wait if options[:wait] + + configure_delegation(configure_options) + @watchdog_timer.start(options[:period], false) if options[:watchdog] end # Called by #reachable! to do subclass-specific configuration # # @param [Hash] configure_options all options passed to #reachable! that # are not understood by #reachable! - def configure_delegation(configure_options = Hash.new) - end + main_thread_call def configure_delegation(configure_options = {}); end # Disconnectes self from the remote task context and returns its underlying # object used to communicate with the remote task (designated object). - # + # # Returns nil if the TaskContext is not connected. # Returns an EventLoop Event if not called from the event loop thread. # # @param [Exception] reason The reason for the disconnect # @return [Orocos::TaskContext,nil,Utilrb::EventLoop::Event] - def unreachable!(options = Hash.new) + main_thread_call def unreachable!(options = {}) options = Kernel.validate_options options, :error + # ensure that this is always called from the # event loop thread - @event_loop.call do - old_task = @mutex.synchronize do - if valid_delegator? - @access_error = options.delete(:error) || - ArgumentError.new("cannot access the remote task context for an unknown reason") - task = @delegator_obj - invalidate_delegator! - @watchdog_timer.cancel if @watchdog_timer - task - end - end - clear_interface - event :unreachable if old_task - old_task + if valid_delegator? + @access_error = + options.delete(:error) || + ArgumentError.new("could not access the remote task context "\ + "for an unknown reason") + + old_task = @delegator_obj + invalidate_delegator! + @watchdog_timer.cancel if @watchdog_timer end + + clear_interface + event :unreachable if old_task + old_task end - def clear_interface + main_thread_call def clear_interface process_port_names process_attribute_names process_property_names end - def reachable?(&block) - if block - ping(&block) - else - ping - end - true - rescue Orocos::NotFound,Orocos::ComError => e - unreachable!(:error => e) - false - end - # Helper method to setup async calls # # Asynchronous calls can be called either with a callback or without. In @@ -270,12 +293,12 @@ def reachable?(&block) # # @return [Object] in the synchronous case, the method returns the # underlying method's return value. In the asynchronous case TODO - def call_with_async(method_name,user_callback,to_async_options,*args) - p = proc do |object,error| - async_object = object.to_async(Hash[:use => self].merge(to_async_options)) + def call_with_async(method_name, user_callback, to_async_options, *args) + p = proc do |object, error| + async_object = object&.to_async({ use: self }.merge(to_async_options)) if user_callback if user_callback.arity == 2 - user_callback.call(async_object,error) + user_callback.call(async_object, error) else user_callback.call(async_object) end @@ -284,67 +307,58 @@ def call_with_async(method_name,user_callback,to_async_options,*args) end end if user_callback - send(method_name,*args,&p) + send(method_name, *args, &p) else - async_object = send(method_name,*args) - p.call async_object,nil + async_object = send(method_name, *args) + p.call async_object, nil end end - def attribute(name,options = Hash.new,&block) + main_thread_call def attribute(name,options = Hash.new,&block) call_with_async(:orig_attribute,block,options,name) end - def property(name,options = Hash.new,&block) + main_thread_call def property(name,options = Hash.new,&block) call_with_async(:orig_property,block,options,name) end - def port(name, verify = true,options=Hash.new, &block) + main_thread_call def port(name, verify = true,options=Hash.new, &block) call_with_async(:orig_port,block,options,name,verify) end # call-seq: # task.each_property { |a| ... } => task - # + # # Enumerates the properties that are available on # this task, as instances of Orocos::Attribute - def each_property(&block) - if !block_given? - return enum_for(:each_property) - end - property_names.each do |name| - yield(property(name)) - end + main_thread_call def each_property + return enum_for(__method__) unless block_given? + + property_names.each { |name| yield(property(name)) } self end # call-seq: # task.each_attribute { |a| ... } => task - # + # # Enumerates the attributes that are available on # this task, as instances of Orocos::Attribute - def each_attribute(&block) - if !block_given? - return enum_for(:each_attribute) - end - attribute_names.each do |name| - yield(attribute(name)) - end + def each_attribute + return enum_for(__method__) unless block_given? + + attribute_names.each { |name| yield(attribute(name)) } self end # call-seq: # task.each_port { |p| ... } => task - # + # # Enumerates the ports that are available on this task, as instances of # either Orocos::InputPort or Orocos::OutputPort - def each_port(&block) - if !block_given? - return enum_for(:each_port) - end - port_names.each do |name| - yield(port(name)) - end + def each_port + return enum_for(__method__) unless block_given? + + port_names.each { |name| yield(port(name)) } self end @@ -366,25 +380,28 @@ def each_port(&block) def_delegators methods end + main_thread_call :orig_port + main_thread_call :orig_property + main_thread_call :orig_attribute + # must be called from the event loop thread - def process_states(states=[]) - if !states.empty? - # We don't use #event here. The callbacks, when using #event, - # would be called delayed and therefore task.state would not be - # equal to the state passed to the callback - blocks = listeners :state_change - states.each do |s| - next if @last_state == s - @last_state = s - blocks.each do |b| - b.call(s) - end + main_thread_call def process_states(states = []) + # We don't use #event here. The callbacks, when using #event, would + # be called delayed and therefore task.state would not be equal to + # the state passed to the callback + blocks = listeners :state_change + states.each do |s| + next if @last_state == s + + @last_state = s + blocks.each do |b| + b.call(s) end end end # must be called from the event loop thread - def process_port_names(port_names=[]) + main_thread_call def process_port_names(port_names = []) added_ports = port_names - @port_names deleted_ports = @port_names - port_names deleted_ports.each do |name| @@ -398,7 +415,7 @@ def process_port_names(port_names=[]) end # must be called from the event loop thread - def process_property_names(property_names=[]) + main_thread_call def process_property_names(property_names=[]) added_properties = property_names - @property_names deleted_properties = @property_names - property_names deleted_properties.each do |name| @@ -412,7 +429,7 @@ def process_property_names(property_names=[]) end # must be called from the event loop thread - def process_attribute_names(attribute_names=[]) + main_thread_call def process_attribute_names(attribute_names=[]) added_properties = attribute_names - @attribute_names deleted_properties = @attribute_names - attribute_names deleted_properties.each do |name| @@ -430,39 +447,10 @@ def process_attribute_names(attribute_names=[]) # the @delegator_obj instance variable must not be directly accessed # without proper synchronization. def task_context - @mutex.synchronize do - begin - task = if valid_delegator? - @delegator_obj - elsif @access_error # do not try again - raise @access_error - else - obj = access_remote_task_context - @name = obj.name - @watchdog_timer.doc = @name - port_names = obj.port_names - property_names = obj.property_names - attribute_names = obj.attribute_names - state = obj.state - @event_loop.once do - process_states([state]) - process_port_names(port_names) - process_property_names(property_names) - process_attribute_names(attribute_names) - end - @delegator_obj = obj - event :reachable - obj - end - [task,nil] - rescue Exception => e - @access_error = e - invalidate_delegator! - raise e if raise_on_access_error? # do not be silent if - [nil,@access_error] - end - end + return [@delegator_obj, nil] if valid_delegator? + raise @access_error if @access_error # do not try again + + [nil, Orocos::ComError.new("accessing async task context before it is ready")] end end end - diff --git a/lib/orocos/async/task_context_proxy.rb b/lib/orocos/async/task_context_proxy.rb index d7e3f3f5..b2c89216 100644 --- a/lib/orocos/async/task_context_proxy.rb +++ b/lib/orocos/async/task_context_proxy.rb @@ -20,6 +20,15 @@ def initialize(task_proxy,attribute_name,options=Hash.new) @raw_last_sample = nil end + def to_s + typename = @type&.name || "unknown_t" + "#<#{self.class.name} #{task.name}.#{name}[#{typename}]>" + end + + def invalidate_delegator! + super + end + def task @task_proxy end @@ -34,6 +43,7 @@ def type_name def type raise Orocos::NotFound, "#{self} is not reachable" unless @type + @type end @@ -43,7 +53,7 @@ def type? end def new_sample - type.new + type.zero end def last_sample @@ -59,22 +69,17 @@ def reachable!(attribute,options = Hash.new) remove_proxy_event(@delegator_obj,@delegator_obj.event_names) if valid_delegator? @raw_last_sample = attribute.raw_last_sample super(attribute,options) - proxy_event(@delegator_obj,@delegator_obj.event_names-[:reachable]) - rescue Orocos::NotFound - unreachable! + proxy_event(@delegator_obj, @delegator_obj.event_names - [:reachable]) end def unreachable!(options=Hash.new) remove_proxy_event(@delegator_obj,@delegator_obj.event_names) if valid_delegator? + super(options) end def period - if @options.has_key? :period - @options[:period] - else - nil - end + @options[:period] end def period=(period) @@ -168,7 +173,8 @@ def initialize(task_proxy,port_name,options=Hash.new) end def to_s - "#" + typename = @type&.name || "unknown_t" + "#<#{self.class.name} #{@task_proxy.name}.#{name}[#{typename}]>" end def type_name @@ -181,6 +187,7 @@ def full_name def type raise Orocos::NotFound, "#{self} is not reachable" unless @type + @type end @@ -190,7 +197,7 @@ def type? end def new_sample - type.new + type.zero end def to_async(options=Hash.new) @@ -246,8 +253,6 @@ def reachable!(port,options = Hash.new) elsif number_of_listeners(:data) != 0 raise RuntimeError, "Port #{name} is an input port but callbacks for on_data are registered" end - rescue Orocos::NotFound - unreachable! end def unreachable!(options = Hash.new) @@ -258,6 +263,7 @@ def unreachable!(options = Hash.new) # returns a sub port for the given subfield def sub_port(subfield) raise RuntimeError , "Port #{name} is not an output port" if !output? + SubPortProxy.new(self,subfield) end @@ -384,7 +390,7 @@ def orocos_type_name end def new_sample - type.new + type.zero end def last_sample @@ -460,14 +466,19 @@ class TaskContextProxy < ObjectBase methods -= TaskContextProxy.instance_methods + [:method_missing,:reachable?,:port] def_delegators :@delegator_obj,*methods - def initialize(name,options=Hash.new) - @options,@task_options = Kernel.filter_options options,{:name_service => Orocos::Async.name_service, - :event_loop => Orocos::Async.event_loop, - :reconnect => true, - :retry_period => Orocos::Async::TaskContextBase.default_period, - :use => nil, - :raise => false, - :wait => nil } + def initialize(name, options = {}) + @options, @task_options = Kernel.filter_options( + options, + { + name_service: Orocos::Async.name_service, + event_loop: Orocos::Async.event_loop, + reconnect: true, + retry_period: Orocos::Async::TaskContextBase.default_period, + use: nil, + raise: false, + wait: nil + } + ) @name_service = @options[:name_service] self.namespace,name = split_name(name) @@ -476,76 +487,67 @@ def initialize(name,options=Hash.new) @task_options[:event_loop] = @event_loop @mutex = Mutex.new - @ports = Hash.new - @attributes = Hash.new - @properties = Hash.new - @resolve_timer = @event_loop.async_every(@name_service.method(:get), - {:period => @options[:retry_period],:start => false}, - self.name,@task_options) do |task_context,error| - if error - case error - when Orocos::NotFound, Orocos::ComError - raise error if @options[:raise] - :ignore_error - else - raise error - end - else - @resolve_timer.stop - if !task_context.respond_to?(:event_loop) - raise "TaskProxy is using a name service#{@name_service} which is returning #{task_context.class} but Async::TaskContext was expected." - end - @event_loop.async_with_options(method(:reachable!),{:sync_key => self,:known_errors => Orocos::Async::KNOWN_ERRORS},task_context) do |val,error| - if error - @resolve_timer.start - :ignore_error - end + @ports = {} + @attributes = {} + @properties = {} + + get_in_progress = false + @resolve_timer = @event_loop.every(@options[:retry_period], false) do + unless get_in_progress + get_in_progress = true + @name_service.get(self.name, @task_options) do |task_context, error| + get_in_progress = false + process_resolved_task_context(task_context, error) end end end + @resolve_timer.doc = "#{name} reconnect" - on_port_reachable(false) do |name| - p = @ports[name] - if p && !p.reachable? - error_callback = Proc.new do |error| - p.emit_error(error) - end - @event_loop.defer :known_errors => Orocos::Async::KNOWN_ERRORS,:on_error => error_callback do - connect_port(p) - end - end + on_port_reachable(false) do |port_name| + object_reachable_handler(@ports, port_name, "port") end - on_property_reachable(false) do |name| - p = @properties[name] - if(p && !p.reachable?) - error_callback = Proc.new do |error| - p.emit_error(error) - end - @event_loop.defer :known_errors => Orocos::Async::KNOWN_ERRORS,:on_error => error_callback do - connect_property(p) - end - end + on_property_reachable(false) do |property_name| + object_reachable_handler(@properties, property_name, "property") end - on_attribute_reachable(false) do |name| - a = @attributes[name] - if(a && !a.reachable?) - error_callback = Proc.new do |error| - a.emit_error(error) - end - @event_loop.defer :known_errors => Orocos::Async::KNOWN_ERRORS,:on_error => error_callback do - connect_attribute(a) - end - end + on_attribute_reachable(false) do |attribute_name| + object_reachable_handler(@attributes, attribute_name, "attribute") end - @resolve_timer.doc = "#{name} reconnect" - if @options.has_key?(:use) + if @options[:use] reachable!(@options[:use]) else reconnect(@options[:wait]) end end + def process_resolved_task_context(task_context, error) + if error + case error + when Orocos::NotFound, Orocos::ComError + raise error if @options[:raise] + + return + else + raise error + end + end + + @resolve_timer.stop + unless task_context.respond_to?(:event_loop) + raise "TaskProxy is using a name service#{@name_service} "\ + "which is returning #{task_context.class} but "\ + "Async::TaskContext was expected." + end + + task_context.reachable? do |_, e| + if e + @resolve_timer.start + else + reachable!(task_context) + end + end + end + def name map_to_namespace(@name) end @@ -572,67 +574,63 @@ def reconnect(wait_for_task = false) wait if wait_for_task == true end - def property(name,options = Hash.new) + def disconnect + @resolve_timer.stop + remove_all_listeners + end + + main_thread_call def property(name, options = Hash.new) name = name.to_str - options,other_options = Kernel.filter_options options,:wait => @options[:wait] + options,other_options = Kernel.filter_options options, wait: @options[:wait] wait if options[:wait] - p = @mutex.synchronize do - @properties[name] ||= PropertyProxy.new(self,name,other_options) - end + p = (@properties[name] ||= PropertyProxy.new(self,name,other_options)) if other_options.has_key?(:type) && p.type? && other_options[:type] == p.type other_options.delete(:type) end + if !other_options.empty? && p.options != other_options - Orocos.warn "Property #{p.full_name}: is already initialized with options: #{p.options}" + Orocos.warn "Property #{p.full_name}: is already initialized "\ + "with options: #{p.options}" Orocos.warn "ignoring options: #{other_options}" end - return p if !reachable? || p.reachable? - if options[:wait] - connect_property(p) - p.wait - else - @event_loop.defer :known_errors => Orocos::Async::KNOWN_ERRORS do - connect_property(p) - end - end + return p if !valid_delegator? || p.valid_delegator? + + connect_property(p) + p.wait if options[:wait] p end - def attribute(name,options = Hash.new) + main_thread_call def attribute(name,options = Hash.new) name = name.to_str options,other_options = Kernel.filter_options options,:wait => @options[:wait] wait if options[:wait] - a = @mutex.synchronize do - @attributes[name] ||= AttributeProxy.new(self,name,other_options) - end + a = (@attributes[name] ||= AttributeProxy.new(self,name,other_options)) if other_options.has_key?(:type) && a.type? && other_options[:type] == a.type other_options.delete(:type) end + if !other_options.empty? && a.options != other_options Orocos.warn "Attribute #{a.full_name}: is already initialized with options: #{a.options}" Orocos.warn "ignoring options: #{other_options}" end - return a if !reachable? || a.reachable? - if options[:wait] - connect_attribute(a) - a.wait - else - @event_loop.defer :known_errors => Orocos::Async::KNOWN_ERRORS do - connect_attribute(a) - end - end + return a if !valid_delegator? || a.valid_delegator? + + connect_attribute(a) + a.wait if options[:wait] a end - def port(name,options = Hash.new) + main_thread_call def port(name, options = Hash.new, &callback) name = name.to_str - options,other_options = Kernel.filter_options options,:wait => @options[:wait] + options,other_options = Kernel.filter_options( + options, wait: @options[:wait] + ) wait if options[:wait] # support for subports @@ -652,28 +650,22 @@ def port(name,options = Hash.new) nil end - p = @mutex.synchronize do - @ports[name] ||= PortProxy.new(self,name,other_options) - end + p = (@ports[name] ||= PortProxy.new(self,name,other_options)) if other_options.has_key?(:type) && p.type? && other_options[:type] == p.type other_options.delete(:type) end + if !other_options.empty? && p.options != other_options Orocos.warn "Port #{p.full_name}: is already initialized with options: #{p.options}" Orocos.warn "ignoring options: #{other_options}" end - if reachable? && !p.reachable? - if options[:wait] - connect_port(p) - p.wait - else - @event_loop.defer :known_errors => KNOWN_ERRORS do - connect_port(p) - end - end + if valid_delegator? && !p.valid_delegator? + connect_port(p) + p.wait if options[:wait] end + if fields.empty? p else @@ -681,37 +673,28 @@ def port(name,options = Hash.new) end end - def ports(options = Hash.new,&block) - p = proc do |names| - names.map{|name| port(name,options)} - end - if block - port_names(&p) - else - p.call(port_names) - end + def ports + if block_given? + yield(@ports.values) + else + @ports.values + end end - def properties(&block) - p = proc do |names| - names.map{|name| property(name)} - end - if block - property_names(&p) - else - p.call(property_names) - end + def properties + if block_given? + yield(@properties.values) + else + @properties.values + end end - def attributes(&block) - p = proc do |names| - names.map{|name| attribute(name)} - end - if block - attribute_names(&p) - else - p.call(attribute_names) - end + def attributes + if block_given? + yield(@attributes.values) + else + @attributes.values + end end # call-seq: @@ -720,13 +703,7 @@ def attributes(&block) # Enumerates the properties that are available on # this task, as instances of Orocos::Attribute def each_property(&block) - if !block_given? - return enum_for(:each_property) - end - names = property_names - names.each do |name| - yield(property(name)) - end + @properties.each_value(&block) end # call-seq: @@ -735,14 +712,7 @@ def each_property(&block) # Enumerates the attributes that are available on # this task, as instances of Orocos::Attribute def each_attribute(&block) - if !block_given? - return enum_for(:each_attribute) - end - - names = attribute_names - names.each do |name| - yield(attribute(name)) - end + @attributes.each_value(&block) end # call-seq: @@ -751,53 +721,43 @@ def each_attribute(&block) # Enumerates the ports that are available on this task, as instances of # either Orocos::InputPort or Orocos::OutputPort def each_port(&block) - if !block_given? - return enum_for(:each_port) + @ports.each_value(&block) + end + + def reachable!(task_context, options = Hash.new) + if task_context.kind_of?(TaskContextProxy) + raise ArgumentError, + "task_context must not be instance of TaskContextProxy" + elsif !task_context.respond_to?(:event_names) + raise ArgumentError, + "task_context must be an async instance, got #{task_context.class}" end - port_names.each do |name| - yield(port(name)) + @last_task_class ||= task_context.class + if @last_task_class != task_context.class + Orocos.warn "Class mismatch: TaskContextProxy #{name} was recently "\ + "connected to #{@last_task_class} and is now connected "\ + "to #{task_context.class}." + @last_task_class = task_context.class end - self - end - # must be thread safe - def reachable!(task_context,options = Hash.new) - raise ArgumentError, "task_context must not be instance of TaskContextProxy" if task_context.is_a?(TaskContextProxy) - raise ArgumentError, "task_context must be an async instance but is #{task_context.class}" if !task_context.respond_to?(:event_names) - @mutex.synchronize do - @last_task_class ||= task_context.class - if @last_task_class != task_context.class - Vizkit.warn "Class missmatch: TaskContextProxy #{name} was recently connected to #{@last_task_class} and is now connected to #{task_context.class}." - @last_task_class = task_context.class - end + if valid_delegator? + remove_proxy_event(@delegator_obj, @delegator_obj.event_names) + end - remove_proxy_event(@delegator_obj,@delegator_obj.event_names) if valid_delegator? - if @delegator_obj_old - remove_proxy_event(@delegator_obj_old,@delegator_obj_old.event_names) - @delegator_obj_old = nil - end - super(task_context,options) + if @delegator_obj_old + remove_proxy_event(@delegator_obj_old, @delegator_obj_old.event_names) + @delegator_obj_old = nil + end - # check if the requested ports are available - @ports.values.each do |port| - unless task_context.port_names.include? port.name - Orocos.warn "task #{name} has currently no port called #{port.name} - on_data will be called when the port was added" - end - end - @attributes.values.each do |attribute| - unless task_context.attribute_names.include? attribute.name - Orocos.warn "task #{name} has currently no attribute called #{attribute.name} - on_change will be called when the attribute was added" - end - end - @properties.values.each do |property| - unless task_context.property_names.include? property.name - Orocos.warn "task #{name} has currently no property called #{property.name} - on_change will be called when the property was added" - end - end + super(task_context, options) + + # this is emitting on_port_reachable, on_property_reachable .... + proxy_event(@delegator_obj, @delegator_obj. + event_names - [:reachable, :unreachable]) - # this is emitting on_port_reachable, on_property_reachable .... - proxy_event(@delegator_obj,@delegator_obj.event_names-[:reachable]) + @delegator_obj.on_unreachable do + unreachable! end end @@ -806,139 +766,111 @@ def reachable? super && @delegator_obj.reachable? end rescue Orocos::NotFound => e - unreachable! :error => e,:reconnect => @options[:reconnect] + unreachable! error: e, reconnect: @options[:reconnect] false end - def unreachable!(options = {:reconnect => false}) - Kernel.validate_options options,:reconnect,:error - @mutex.synchronize do - # do not stop proxing events here (see reachable!) - # otherwise unrechable event might get lost - @delegator_obj_old = if valid_delegator? - @delegator_obj - else - @delegator_obj_old - end + def unreachable!(options = {}) + return unless valid_delegator? - disable_emitting do - super(options) + Kernel.validate_options options, :reconnect, :error + @delegator_obj_old = + @mutex.synchronize do + # do not stop proxing events here (see reachable!) + # otherwise unrechable event might get lost + if valid_delegator? + @delegator_obj + else + @delegator_obj_old + end end - end + + super(options) disconnect_ports disconnect_attributes disconnect_properties - re = if options.has_key?(:reconnect) - options[:reconnect] - else - @options[:reconnect] - end - reconnect if re + reconnect if options.fetch(:reconnect, @options[:reconnect]) end private - # blocking call shoud be called from a different thread - # all private methods must be thread safe - def connect_port(port) - return if port.reachable? - p = @mutex.synchronize do - return unless valid_delegator? - @delegator_obj.disable_emitting do - #called in the context of @delegator_obj - begin - port(port.name,true,port.options) - rescue Orocos::NotFound - Orocos.warn "task #{name} has currently no port called #{port.name}" - raise - rescue Orocos::CORBA::ComError => e - Orocos.warn "task #{name} with error on port: #{port.name} -- #{e}" - raise - end - end + + def connect_async_object(object, callback) + delegator_obj = @delegator_obj + return unless delegator_obj + + finalizer = lambda do |resolved, _error| + object.reachable!(resolved) if resolved && !object.valid_delegator? + callback&.call(object) end - @event_loop.call do - port.reachable!(p) unless port.reachable? + + # called in the context of @delegator_obj + begin + yield(delegator_obj, finalizer) # this is yield(@delegator_obj) + rescue Orocos::NotFound + Orocos.warn "task #{name} has currently no port called #{port.name}" + raise + rescue Orocos::CORBA::ComError => e + Orocos.warn "task #{name} with error on port: #{port.name} -- #{e}" + raise end end - def disconnect_ports - ports = @mutex.synchronize do - @ports.values + # blocking call shoud be called from a different thread + # all private methods must be thread safe + main_thread_call def connect_port(port, &callback) + connect_async_object(port, callback) do |delegator_obj, finalizer| + delegator_obj.port(port.name, true, port.options, &finalizer) end + end + + main_thread_call def disconnect_ports + ports = @mutex.synchronize { @ports.values.dup } ports.each(&:unreachable!) end # blocking call shoud be called from a different thread - def connect_attribute(attribute) - return if attribute.reachable? - a = @mutex.synchronize do - return unless valid_delegator? - @delegator_obj.disable_emitting do - #called in the context of @delegator_obj - begin - attribute(attribute.name,attribute.options) - rescue Orocos::NotFound - Orocos.warn "task #{name} has currently no attribtue called #{attribute.name} -> on_change will not be called!" - raise - rescue Orocos::CORBA::ComError => e - Orocos.warn "task #{name} with error on port: #{attribute.name} -- #{e}" - raise - end - end - end - @event_loop.call do - attribute.reachable!(a) unless attribute.reachable? + main_thread_call def connect_attribute(attribute, &callback) + connect_async_object(attribute, callback) do |delegator_obj, finalizer| + delegator_obj.attribute(attribute.name, attribute.options, &finalizer) end end - def disconnect_attributes - attributes = @mutex.synchronize do - @attributes.values - end + main_thread_call def disconnect_attributes + attributes = @mutex.synchronize { @attributes.values.dup } attributes.each(&:unreachable!) end # blocking call shoud be called from a different thread - def connect_property(property) - return if property.reachable? - p = @mutex.synchronize do - return unless valid_delegator? - @delegator_obj.disable_emitting do - begin - property(property.name,property.options) - rescue Orocos::NotFound - Orocos.warn "task #{name} has currently no property called #{property.name} -> on_change will not be called!" - raise - rescue Orocos::CORBA::ComError => e - Orocos.warn "task #{name} with error on port: #{property.name} -- #{e}" - raise - end - end - end - @event_loop.call do - property.reachable!(p) unless property.reachable? + main_thread_call def connect_property(property, &callback) + connect_async_object(property, callback) do |delegator_obj, finalizer| + delegator_obj.property(property.name, property.options, &finalizer) end end - def disconnect_properties - properties = @mutex.synchronize do - @properties.values - end + main_thread_call def disconnect_properties + @mutex.synchronize { pp @properties.keys } + properties = @mutex.synchronize { @properties.values.dup } properties.each(&:unreachable!) end def respond_to_missing?(method_name, include_private = false) - (reachable? && @delegator_obj.respond_to?(method_name)) || super + @delegator_obj.respond_to?(method_name) || super end - def method_missing(m,*args) - if respond_to_missing?(m) - event_loop.sync(@delegator_obj,args) do |args| - @delegator_obj.method(m).call(*args) - end - else - super + def method_missing(m, *args) + return super unless respond_to_missing?(m) + + event_loop.sync(@delegator_obj, args) do |_args| + @delegator_obj.method(m).call(*args) end end + + def object_reachable_handler(map, name, type) + obj = map[name] + return unless valid_delegator? + return if !obj || obj.valid_delegator? + + send("connect_#{type}", obj) + end end end diff --git a/lib/orocos/base.rb b/lib/orocos/base.rb index 6bba6381..8507c43e 100644 --- a/lib/orocos/base.rb +++ b/lib/orocos/base.rb @@ -271,10 +271,11 @@ def self.initialize(name = "orocosrb_#{::Process.pid}") unless disable_sigchld_handler? trap("SIGCHLD") do begin - while dead = ::Process.wait(-1, ::Process::WNOHANG) - if mod = Orocos::Process.from_pid(dead) - mod.dead!($?) - end + loop do + dead_pid, dead_status = ::Process.wait2(-1, ::Process::WNOHANG) + break unless dead_pid + + Orocos::Process.from_pid(dead_pid)&.dead!(dead_status) end rescue Errno::ECHILD end @@ -297,7 +298,7 @@ def self.initialize(name = "orocosrb_#{::Process.pid}") self.name_service << Orocos::ROS.name_service end if defined?(Orocos::Async) - Orocos.name_service.name_services.each do |ns| + Orocos.name_service.each do |ns| Orocos::Async.name_service.add(ns) end end diff --git a/lib/orocos/configurations.rb b/lib/orocos/configurations.rb index f304c14d..1acf8379 100644 --- a/lib/orocos/configurations.rb +++ b/lib/orocos/configurations.rb @@ -760,19 +760,17 @@ def conf_as_ruby(names, override: false) # # @see conf conf_to_ruby def conf_as_typelib(names, override: false) - c = conf(names, override) - return if !c + return unless (c = conf(names, override)) - result = Hash.new - c.each do |property_name, ruby_value| + c.each_with_object({}) do |(property_name, ruby_value), result| orocos_type = model.find_property(property_name).type typelib_type = loader.typelib_type_for(orocos_type) - typelib_value = typelib_type.new - typelib_value.zero! - result[property_name] = TaskConfigurations.apply_conf_on_typelib_value(typelib_value, ruby_value) + typelib_value = typelib_type.zero + result[property_name] = TaskConfigurations.apply_conf_on_typelib_value( + typelib_value, ruby_value + ) end - result end # Applies the specified configuration to the given task diff --git a/lib/orocos/corba.rb b/lib/orocos/corba.rb index 5856eac5..2c34a42e 100644 --- a/lib/orocos/corba.rb +++ b/lib/orocos/corba.rb @@ -117,9 +117,9 @@ def self.refine_exceptions(obj0, obj1 = nil) # :nodoc: rescue ComError => e if !obj1 - raise ComError, "Communication failed with corba #{obj0}", e.backtrace + raise ComError, "Communication failed with corba #{obj0} - #{e}", e.backtrace else - raise ComError, "communication failed with either #{obj0} or #{obj1}", e.backtrace + raise ComError, "communication failed with either #{obj0} or #{obj1} - #{e}", e.backtrace end end end diff --git a/lib/orocos/log/task_context.rb b/lib/orocos/log/task_context.rb index 437a3c8f..80aaa9c7 100644 --- a/lib/orocos/log/task_context.rb +++ b/lib/orocos/log/task_context.rb @@ -500,7 +500,7 @@ def disconnect_all #Returns a new sample object def new_sample - @type.new + @type.zero end #Clears all reader buffers @@ -596,7 +596,7 @@ def notify(&block) end def new_sample - type.new + type.zero end def orocos_type_name diff --git a/lib/orocos/name_service.rb b/lib/orocos/name_service.rb index 88c9dbbd..1cba950a 100644 --- a/lib/orocos/name_service.rb +++ b/lib/orocos/name_service.rb @@ -62,8 +62,8 @@ def self.cleanup end # (see NameService#get) - def self.get(name, options = Hash.new) - Orocos.name_service.get(name, options) + def self.get(name, **options) + Orocos.name_service.get(name, **options) end # Base class for all Orocos name services. An orocos name service is used @@ -310,11 +310,11 @@ def initialized? end #(see NameServiceBase#get) - def get(name,options = Hash.new) + def get(name, **options) name_services.each do |service| begin if service.same_namespace?(name) - task_context = service.get(name,options) + task_context = service.get(name, **options) return task_context if task_context end rescue Orocos::NotFound @@ -416,7 +416,7 @@ def name # Returns an Async object that maps to this name service def to_async(options = Hash.new) - Orocos::Async::Local::NameService.new(:tasks => registered_tasks) + Orocos::Async::NameService.new(self.class.new(registered_tasks)) end #(see NameServiceBase#get) @@ -575,7 +575,7 @@ def bind(task, name) # @param (see Orocos::Async::CORBA::NameService#initialize) # @return [Orocos::Async::CORBA::NameService] def to_async(reconnect: true) - Orocos::Async::CORBA::NameService.new(ip, reconnect: reconnect) + Orocos::Async::NameService.new(self.class.new(ip)) end # Resets the CORBA name service client. diff --git a/lib/orocos/output_reader.rb b/lib/orocos/output_reader.rb index 1296e5dc..f7429de6 100644 --- a/lib/orocos/output_reader.rb +++ b/lib/orocos/output_reader.rb @@ -7,6 +7,18 @@ class OutputReader < RubyTasks::LocalInputPort # The policy of the connection attr_accessor :policy + # Helper method for #read and #read_new + # + # This is overloaded in OutputReader to raise CORBA::ComError if the + # process supporting the remote task is known to be dead + def raw_read_with_result(sample, copy_old_data) + if (process = port.task.process) + disconnect_all unless process.alive? + end + + super + end + # Reads a sample on the associated output port. Returns a value as soon # as a sample has ever been written to the port since the data reader # has been created diff --git a/lib/orocos/ports_base.rb b/lib/orocos/ports_base.rb index 15cc0907..9c77db57 100644 --- a/lib/orocos/ports_base.rb +++ b/lib/orocos/ports_base.rb @@ -61,16 +61,16 @@ def ==(other) other.task == self.task && other.name == self.name end - def ensure_type_available(options = Hash.new) + def ensure_type_available(**options) if !type || type.null? - @type = Orocos.find_type_by_orocos_type_name(orocos_type_name, options) + @type = Orocos.find_type_by_orocos_type_name(orocos_type_name, **options) end end # Returns a new object of this port's type def new_sample ensure_type_available - @type.new + @type.zero end def log_metadata @@ -168,7 +168,7 @@ def writer(distance: PortBase::D_UNKNOWN, **policy) self.class.transient_local_port_name(full_name), orocos_type_name, permanent: false, - class: self.class.writer_class) + class: self.class.writer_class) end writer.port = self writer.policy = policy diff --git a/lib/orocos/process.rb b/lib/orocos/process.rb index 0127abfa..c71840a9 100644 --- a/lib/orocos/process.rb +++ b/lib/orocos/process.rb @@ -1,8 +1,15 @@ require 'utilrb/pkgconfig' require 'orogen' require 'fcntl' +require 'json' module Orocos + # Exception raised when there is no IOR registered for a given task name. + class IORNotRegisteredError < Orocos::NotFound; end + + # Exception raised when the received IOR message is invalid. + class InvalidIORMessage < Orocos::NotFound; end + # The working directory that should be used by default in Orocos.run def self.default_working_directory @default_working_directory || Dir.pwd @@ -162,6 +169,7 @@ def orogen; model end # The set of task contexts for this process. This is valid only after # the process is actually started attr_reader :tasks + attr_reader :ior_mappings def initialize(name, model, name_mappings: Hash.new) @name, @model = name, model @@ -169,6 +177,9 @@ def initialize(name, model, name_mappings: Hash.new) self.name_mappings = name_mappings @logged_ports = Set.new @tasks = [] + @ior_mappings = nil + @ior_message = "" + @ior_read_fd = nil end # Sets a batch of name mappings @@ -198,8 +209,9 @@ def get_mapped_name(name) # # See also #each_task def task_names - if !model - raise Orocos::NotOrogenComponent, "#{name} does not seem to have been generated by orogen" + unless model + raise Orocos::NotOrogenComponent, + "#{name} does not seem to have been generated by orogen" end model.task_activities.map do |deployed_task| name = deployed_task.name @@ -220,19 +232,19 @@ def each_task # Returns the TaskContext instance for a task that runs in this process, # or raises Orocos::NotFound. - def task(task_name, name_service = Orocos.name_service) + def task(task_name) full_name = "#{name}_#{task_name}" - if result = tasks.find { |t| t.basename == task_name || t.basename == full_name } + if (result = tasks.find { |t| [task_name, full_name].include?(t.basename) }) return result end - if task_names.include?(task_name) - name_service.get task_name, process: self - elsif task_names.include?(full_name) - name_service.get full_name, process: self - else - raise Orocos::NotFound, "no task #{task_name} defined on #{name}" + ior = ior_for(task_name) || ior_for(full_name) + unless ior + raise Orocos::IORNotRegisteredError, + "no IOR is registered for #{task_name}" end + + Orocos::TaskContext.new(ior, process: self) end def register_task(task) @@ -240,6 +252,10 @@ def register_task(task) @tasks << task end + def ior_for(task_name) + @ior_mappings&.fetch(task_name, nil) + end + # Requires all known ports of +self+ to be logged by the default logger def log_all_ports(options = Hash.new) @logged_ports |= Orocos.log_all_process_ports(self, options) @@ -334,6 +350,90 @@ def self.resolve_prefix(model, prefix) end return name_mappings end + + # Read the IOR pipe and parse the received message, closing the read file + # descriptor when end of file is reached. + # + # @return [nil, Hash] when eof is reached and the message is valid + # return a { task name => ior } hash. If the process dies or a IO::WaitReadable + # is raised, returns nil. + def resolve_running_tasks + return unless alive? + + begin + loop do + @ior_message += @ior_read_fd.read_nonblock(4096) + end + rescue IO::WaitReadable + return + rescue EOFError + @ior_read_fd.close + load_and_validate_ior_message(@ior_message) + end + end + + # Waits the running tasks resolution for a given amount of time. + # + # @param [nil, Boolean, Float] timeout when nil, this method blocks until all the + # running tasks are resolved or the process crashes. When given a number, it + # block for the given amount of time in milliseconds. When given a boolean, false + # is equivalent as passing 0 as argument, and true is equivalent to passing nil. + # @return [Hash] mappings of { task name => IOR } + # @raise Orocos::NotFound if the process dies during execution + # @raise Orocos::InvalidIORMessage if the message received is invalid + def wait_running(timeout = nil, &block) + return block.call if block_given? + return @ior_mappings if @ior_mappings + + start_time = Time.now + timeout = transform_timeout(timeout) + deadline = start_time + timeout unless timeout == Float::INFINITY + got_alive = alive? + loop do + @ior_mappings = resolve_running_tasks + return @ior_mappings if @ior_mappings + break if timeout < Time.now - start_time + + if got_alive && !alive? + raise Orocos::NotFound, "#{name} was started but crashed" + end + + time_until_deadline = [deadline - Time.now, 0].max if deadline + IO.select([@ior_read_fd], nil, nil, time_until_deadline) + end + + raise Orocos::NotFound, "cannot get a running #{name} module" unless alive? + end + + def transform_timeout(timeout) + return timeout if timeout.kind_of?(Numeric) + + return Float::INFINITY if timeout.nil? || timeout == true + + 0 + end + + # Load and validate the ior message read from the IOR pipe. + # + # @param [String] message the ior message read from the pipe + # @return [Hash, nil] the parsed ior message as a + # { task name => ior} hash, or nil if the message could not be parsed. + # @raise Orocos::InvalidIORMessage raised if any task name present in the message + # is not present in the process' task names. + def load_and_validate_ior_message(message) + begin + message = JSON.parse(message) + rescue JSON::ParserError + return + end + + all_included = message.keys.all? { |name| task_names.include?(name) } + return message if all_included + + raise Orocos::InvalidIORMessage, + "the following tasks were present on the ior message but werent in "\ + "the process task names: #{message.keys - task_names}" + end end # The representation of an Orocos process. It manages @@ -349,11 +449,11 @@ class Process < ProcessBase # # @param [Integer] pid the PID whose process we are looking for # @return [nil,Process] the process object whose PID matches, or nil - def self.from_pid(pid) - if result = registered_processes[pid] + def self.from_pid(pid) + if result = registered_processes[pid] return result end - end + end class << self # A map of existing running processes @@ -443,12 +543,11 @@ def initialize(name, model = name, def join return unless alive? - begin - ::Process.waitpid(pid) - exit_status = $? + begin + _, exit_status = ::Process.waitpid2(pid) dead!(exit_status) - rescue Errno::ECHILD - end + rescue Errno::ECHILD + end end # True if the process is running @@ -457,7 +556,7 @@ def alive?; !!@pid end def running?; alive? end # Called externally to announce a component dead. - def dead!(exit_status) # :nodoc: + def dead!(exit_status) # :nodoc: exit_status = (@exit_status ||= exit_status) if !exit_status Orocos.info "deployment #{name} exited, exit status unknown" @@ -480,7 +579,8 @@ def dead!(exit_status) # :nodoc: Orocos.warn "deployment #{name} terminated with code #{exit_status.to_i}" end - pid, @pid = @pid, nil + pid = @pid + @pid = nil Process.deregister(pid) # Force unregistering the task contexts from CORBA naming @@ -489,9 +589,9 @@ def dead!(exit_status) # :nodoc: # puts "deregistering #{name}" # Orocos::CORBA.unregister(name) # end - end + end - @@logfile_indexes = Hash.new + @@logfile_indexes = Hash.new class TaskNameRequired < ArgumentError; end @@ -880,12 +980,13 @@ def self.allocate_gdb_port # Massages various spawn parameters into the actual deployment command line # # @return [CommandLine] - def command_line(working_directory: Orocos.default_working_directory, - log_level: nil, - cmdline_args: Hash.new, - tracing: Orocos.tracing?, - gdb: nil, valgrind: nil, - name_service_ip: Orocos::CORBA.name_service_ip) + def command_line( + working_directory: Orocos.default_working_directory, + log_level: nil, + cmdline_args: Hash.new, + tracing: Orocos.tracing?, gdb: nil, valgrind: nil, + name_service_ip: Orocos::CORBA.name_service_ip + ) result = CommandLine.new(Hash.new, nil, [], working_directory) result.command = binfile @@ -974,13 +1075,16 @@ def command_line(working_directory: Orocos.default_working_directory, # true will enable valgrind support. Setting it to an array of strings will # specify a list of arguments that should be passed to valgrind # itself. This is obviously incompatible with the gdb option. - def spawn(log_level: nil, working_directory: Orocos.default_working_directory, - cmdline_args: Hash.new, - oro_logfile: "orocos.%m-%p.txt", - prefix: nil, tracing: Orocos.tracing?, name_service: Orocos::CORBA.name_service, - wait: nil, - output: nil, - gdb: nil, valgrind: nil) + def spawn( + log_level: nil, working_directory: Orocos.default_working_directory, + cmdline_args: Hash.new, + oro_logfile: "orocos.%m-%p.txt", + prefix: nil, tracing: Orocos.tracing?, + wait: nil, + output: nil, + gdb: nil, valgrind: nil, + name_service: Orocos::CORBA.name_service + ) raise "#{name} is already running" if alive? Orocos.info "starting deployment #{name}" @@ -990,14 +1094,6 @@ def spawn(log_level: nil, working_directory: Orocos.default_working_directory, name_mappings = prefix_mappings.merge(self.name_mappings) self.name_mappings = name_mappings - # If possible, check that we won't clash with an already running - # process - task_names.each do |name| - if name_service.task_reachable?(name) - raise ArgumentError, "there is already a running task called #{name}, are you starting the same component twice ?" - end - end - if wait.nil? wait = if valgrind then 600 @@ -1047,8 +1143,13 @@ def spawn(log_level: nil, working_directory: Orocos.default_working_directory, end end + @ior_read_fd, ior_write_fd = IO.pipe read, write = IO.pipe @pid = fork do + @ior_read_fd.close + # Pass write file descriptor for the IOR pipe as a commandline argument + cmdline_args["ior-write-fd"] = ior_write_fd.fileno + if tracing ENV['LD_PRELOAD'] = Orocos.tracing_library_path end @@ -1114,19 +1215,17 @@ def spawn(log_level: nil, working_directory: Orocos.default_working_directory, end read.close - write.fcntl(Fcntl::F_SETFD, 1) ::Process.setpgrp begin - if working_directory - Dir.chdir(working_directory) - end - exec(*cmdline) + exec(*cmdline, ior_write_fd => ior_write_fd, chdir: working_directory) rescue Exception write.write("FAILED") end end Process.register(self) + ior_write_fd.close + write.close if read.read == "FAILED" raise "cannot start #{name}" @@ -1142,93 +1241,31 @@ def spawn(log_level: nil, working_directory: Orocos.default_working_directory, elsif wait Float::INFINITY end - wait_running(timeout, name_service) - end - end - - def self.resolve_all_tasks(process, cache = Hash.new) - # Get any task name from that specific deployment, and check we - # can access it. If there is none - all_reachable = process.task_names.all? do |task_name| - begin - cache[task_name] ||= yield(task_name) - rescue Orocos::NotFound - end - end - if all_reachable - cache + wait_running(timeout) end end - # Wait for a process to become reachable + # Resolve all the tasks present on the process, creating a new Orocos::TaskContext + # if the task is not deployed yet. # - def self.wait_running(process, timeout = nil, name_service = Orocos::CORBA.name_service, &block) - if timeout == 0 - return unless process.alive? - - # Use custom block to check if the process is reachable - if block_given? - block.call(process) - else - # Get any task name from that specific deployment, and check we - # can access it. If there is none - all_reachable = process.task_names.all? do |task_name| - if name_service.task_reachable?(task_name) - Orocos.debug "#{task_name} is reachable" - true - else - Orocos.debug "could not access #{task_name}, #{name} is not running yet ..." - false - end - end - if all_reachable - Orocos.info "all tasks of #{process.name} are reachable, assuming it is up and running" - end - all_reachable - end - else - start_time = Time.now - got_alive = process.alive? - loop do - break if wait_running(process, 0, name_service, &block) - break unless timeout - # This formulation allows timeout to be infinite - break if timeout < Time.now - start_time - - if got_alive && !process.alive? - raise Orocos::NotFound, "#{process.name} was started but crashed" - end - - sleep 0.1 - end - - unless process.alive? - raise Orocos::NotFound, "cannot get a running #{process.name} module" - end - - true + # @param [Orocos::Process] process the process object + # @return [Hash] hash with + # { task name => task context objet } + # @raise Orocos::IORNotRegisteredError when an IOR is not registered for the given + # task name. + # @raise Orocos::NotFound if the process dies during execution + # @raise Orocos::InvalidIORMessage if the message received is invalid + def self.resolve_all_tasks(process) + process.task_names.each_with_object({}) do |task_name, resolved_tasks| + resolved_tasks[task_name] = process.task(task_name) end end - def resolve_all_tasks(cache = Hash.new, name_service: Orocos::CORBA.name_service) - Process.resolve_all_tasks(self, cache) do |task_name| - name_service.get(task_name) - end + # See #Orocos::Process.resolve_all_tasks + def resolve_all_tasks + Orocos::Process.resolve_all_tasks(self) end - # Wait for the module to be started. If timeout is 0, the function - # returns immediately, with a false return value if the module is not - # started yet and a true return value if it is started. - # - # Otherwise, it waits for the process to start for the specified amount - # of seconds. It will throw Orocos::NotFound if the process was not - # started within that time. - # - # If timeout is nil, the method will wait indefinitely - def wait_running(timeout = nil, name_service = Orocos::CORBA.name_service) - Process.wait_running(self, timeout, name_service) - end - SIGNAL_NUMBERS = { 'SIGABRT' => 1, 'SIGINT' => 2, @@ -1278,7 +1315,7 @@ def kill(wait = true, signal = nil, cleanup: !signal, hard: false) if cleanup clean_shutdown = true begin - each_task do |task| + tasks.each do |task| if !self.class.try_task_cleanup(task) clean_shutdown = false break diff --git a/lib/orocos/ros/async.rb b/lib/orocos/ros/async.rb index 2e38affe..adf4ad3c 100644 --- a/lib/orocos/ros/async.rb +++ b/lib/orocos/ros/async.rb @@ -38,7 +38,8 @@ def configure_delegation(options) end def access_remote_task_context - Orocos::ROS::Node.new(@name_service, @server, @name) + node = Orocos::ROS::Node.new(@name_service, @server, @name) + [node, name] end # add methods which forward the call to the underlying task context diff --git a/lib/orocos/ruby_tasks/process.rb b/lib/orocos/ruby_tasks/process.rb index 090d84e5..6aefd59a 100644 --- a/lib/orocos/ruby_tasks/process.rb +++ b/lib/orocos/ruby_tasks/process.rb @@ -53,6 +53,9 @@ def pid; ::Process.pid end # @return [Class] attr_reader :task_context_class + # The ior mappings of the deployed tasks + attr_reader :ior_mappings + # Creates a new ruby task process # # @param [nil,#dead_deployment] ruby_process_server the process manager @@ -64,6 +67,7 @@ def initialize(ruby_process_server, name, model, task_context_class: TaskContext @ruby_process_server = ruby_process_server @deployed_tasks = Hash.new @task_context_class = task_context_class + @ior_mappings = nil super(name, model) end @@ -74,32 +78,38 @@ def spawn(options = Hash.new) model.task_activities.each do |deployed_task| name = get_mapped_name(deployed_task.name) Orocos.allow_blocking_calls do - deployed_tasks[name] = task_context_class. - from_orogen_model(name, deployed_task.task_model) + deployed_tasks[name] = + task_context_class.from_orogen_model(name, + deployed_task.task_model) end end @alive = true end - # Waits for the tasks to be ready - # - # This is a no-op for ruby tasks as they are ready as soon as they are - # created - def wait_running(blocking = false) - true + # The ruby tasks are already ready, so all this is does is to get the IOR mappings + # from them. The wait_running method name is maintained to keep the API closer to + # the remote process'. + def wait_running + (@ior_mappings = deployed_tasks.transform_values(&:ior)) unless @ior_mappings + @ior_mappings end def task(task_name) - if t = deployed_tasks[task_name] + if (t = deployed_tasks[task_name]) t - else raise ArgumentError, "#{self} has no task called #{task_name}, known tasks: #{deployed_tasks.keys.sort.join(", ")}" + else + raise ArgumentError, + "#{self} has no task called #{task_name}, known tasks: "\ + "#{deployed_tasks.keys.sort.join(', ')}" end end - def resolve_all_tasks(cache = Hash.new) - Orocos::Process.resolve_all_tasks(self, cache) do |task_name| - task(task_name) - end + def resolve_all_tasks + deployed_tasks + end + + def define_ior_mappings(ior_mappings) + @ior_mappings = ior_mappings end def kill(_wait = true, status = ProcessManager::Status.new(exit_code: 0), **) diff --git a/lib/orocos/ruby_tasks/process_manager.rb b/lib/orocos/ruby_tasks/process_manager.rb index 2a05b988..65db1601 100644 --- a/lib/orocos/ruby_tasks/process_manager.rb +++ b/lib/orocos/ruby_tasks/process_manager.rb @@ -85,6 +85,19 @@ def wait_termination(timeout = nil) result end + def wait_running(*process_names) + process_ior_mappings = {} + process_names.each do |name| + if deployments[name]&.resolve_all_tasks + process_ior_mappings[name] = { iors: deployments[name].wait_running } + else + process_ior_mappings[name] = + { error: "#{name} is not a valid process in the deployment" } + end + end + process_ior_mappings + end + # Requests to stop the given deployment # # The call does not block until the process has quit. You will have to diff --git a/lib/orocos/task_context.rb b/lib/orocos/task_context.rb index 49a704f2..fceb46d8 100644 --- a/lib/orocos/task_context.rb +++ b/lib/orocos/task_context.rb @@ -115,12 +115,6 @@ def #{m}(wait_for_completion = true, polling = 0.05) EOD end - # The logger task that should be used to log data that concerns this - # task - # - # @return [#log] - attr_accessor :logger - # A new TaskContext instance representing the # remote task context with the given IOR # @@ -135,10 +129,6 @@ def #{m}(wait_for_completion = true, polling = 0.05) def initialize(ior, name: do_real_name, model: nil, **other_options) super(name, model: model, **other_options) @ior = ior - - if process && (process.default_logger_name != name) - self.logger = process.default_logger - end end def ping @@ -230,26 +220,6 @@ def rtt_state @state_symbols[value] end - # Connects all ports of the task with the logger of the deployment - # @param [Hash] options option hash to exclude specific ports - # @option options [String,Array] :exclude_ports The name of the excluded ports - # @return [Set] Sets of task and port names - # - # @example logging all ports beside a port called frame - # task.log_all_ports(:exclude_ports => "frame") - def log_all_ports(options = Hash.new) - # Right now, the only allowed option is :exclude_ports - options, logger_options = Kernel.filter_options options,:exclude_ports => nil - exclude_ports = Array(options[:exclude_ports]) - - logger_options[:tasks] = Regexp.new(basename) - ports = Orocos.log_all_process_ports(process,logger_options) do |port| - !exclude_ports.include? port.name - end - raise "#{name}: no ports were selected for logging" if ports.empty? - ports - end - def create_property_log_stream(p) stream_name = "#{self.name}.#{p.name}" if !configuration_log.has_stream?(stream_name) diff --git a/lib/orocos/task_context_base.rb b/lib/orocos/task_context_base.rb index bb5fa769..913958cb 100644 --- a/lib/orocos/task_context_base.rb +++ b/lib/orocos/task_context_base.rb @@ -51,9 +51,9 @@ def log_metadata ] end - def ensure_type_available(options = Hash.new) + def ensure_type_available(**options) if !type || type.null? - @type = Orocos.find_type_by_orocos_type_name(@orocos_type_name, options) + @type = Orocos.find_type_by_orocos_type_name(@orocos_type_name, **options) end end @@ -94,7 +94,7 @@ def log_value(value, timestamp = Time.now) def new_sample ensure_type_available - type.new + type.zero end def pretty_print(pp) # :nodoc: @@ -224,7 +224,7 @@ def self.get(options, process = nil) raise ArgumentError, 'no task name' if options.nil? name = options.to_str end - result = Orocos.name_service.get(name,{:process => process}) + result = Orocos.name_service.get(name, process: process) end # Find one running tasks from the provided names. Raises if there is not diff --git a/lib/orocos/test.rb b/lib/orocos/test.rb index ebb5f8c3..801beeb9 100644 --- a/lib/orocos/test.rb +++ b/lib/orocos/test.rb @@ -189,8 +189,18 @@ def assert_state_equals(state, task, timeout = 1) flunk("#{task} was expected to be in state #{state} but is in #{task.state}") end - def wait_for(timeout = 5, &block) + def wait_for(timeout = 5, msg = nil, &block) Orocos::Async.wait_for(0.005, timeout, &block) + rescue Utilrb::EventLoop::WaitForTimeout + raise unless msg + + msg = msg.call if msg.respond_to?(:call) + raise Utilrb::EventLoop::WaitForTimeout, "timed out waiting for #{msg}" + end + + def wait_for_equality(expected, value, timeout: 5) + msg = proc { "#{value} to equal #{expected}" } + wait_for(timeout, msg) { expected == value } end def name_service diff --git a/test/async/test_attributes.rb b/test/async/test_attributes.rb index 58676d82..77245128 100644 --- a/test/async/test_attributes.rb +++ b/test/async/test_attributes.rb @@ -1,49 +1,38 @@ -require 'orocos/test' -require 'orocos/async' +require "orocos/test" +require "orocos/async" describe Orocos::Async::CORBA::Property do - before do + before do Orocos::Async.clear + @ns = Orocos::Async::NameService.new + @ns << Orocos::CORBA::NameService.new + + start "process" + @task = @ns.get("process_Test") end - describe "When connect to a remote task" do - it "must return a property object" do - Orocos.run('process') do - t1 = Orocos::Async::CORBA::TaskContext.new(ior('process_Test')) - p = t1.property("prop1") - p.must_be_kind_of Orocos::Async::CORBA::Property - end + describe "When connect to a remote task" do + it "returns a property synchronously" do + p = @task.property("prop1") + assert_kind_of Orocos::Async::CORBA::Property, p end - it "must asynchronously return a property" do - Orocos.run('process') do - t1 = Orocos::Async::CORBA::TaskContext.new(ior('process_Test')) - p = nil - t1.property("prop1") do |prop| - p = prop - end - sleep 0.1 - Orocos::Async.step - p.must_be_kind_of Orocos::Async::CORBA::Property - end + it "returns a property asynchronously" do + p = nil + @task.property("prop1") { |prop| p = prop } + wait_for { p } + assert_kind_of Orocos::Async::CORBA::Property, p end - it "must call on_change" do - Orocos.run('process') do - t1 = Orocos::Async::CORBA::TaskContext.new(ior('process_Test')) - p = t1.property("prop2") - p.period = 0.1 - vals = Array.new - p.on_change do |data| - vals << data - end - sleep 0.1 - Orocos::Async.steps - assert_equal 1,vals.size - p.write 33 - sleep 0.1 - Orocos::Async.steps - assert_equal 2,vals.size - assert_equal 33,vals.last + it "calls on_change with the property values" do + p = @task.property("prop2") + p.period = 0.01 + + vals = [] + p.on_change do |data| + vals << data end + wait_for_equality [84], vals + p.write 33 + wait_for_equality [84, 33], vals end end end @@ -51,48 +40,36 @@ describe Orocos::Async::CORBA::Attribute do include Orocos::Spec - before do + before do Orocos::Async.clear + @ns = Orocos::Async::NameService.new + @ns << Orocos::CORBA::NameService.new + + start "process" + @task = @ns.get("process_Test") end - describe "When connect to a remote task" do - it "must return a attribute object" do - Orocos.run('process') do - t1 = Orocos::Async::CORBA::TaskContext.new(ior('process_Test')) - a = t1.attribute("att2") - a.must_be_kind_of Orocos::Async::CORBA::Attribute - end + describe "When connect to a remote task" do + it "returns an attribute object synchronously" do + a = @task.attribute("att2") + assert_kind_of Orocos::Async::CORBA::Attribute, a end - it "must asynchronously return a attribute" do - Orocos.run('process') do - t1 = Orocos::Async::CORBA::TaskContext.new(ior('process_Test')) - a = nil - t1.attribute("att2") do |prop| - a = prop - end - sleep 0.1 - Orocos::Async.step - a.must_be_kind_of Orocos::Async::CORBA::Attribute - end + it "returns an attribute object asynchronously" do + a = nil + @task.attribute("att2") { |prop| a = prop } + wait_for { a } + assert_kind_of Orocos::Async::CORBA::Attribute, a end - it "must call on_change" do - Orocos.run('process') do - t1 = Orocos::Async::CORBA::TaskContext.new(ior('process_Test')) - a = t1.attribute("att2") - a.period = 0.1 - vals = Array.new - a.on_change do |data| - vals << data - end - sleep 0.1 - Orocos::Async.steps - assert_equal 1,vals.size - a.write 33 - sleep 0.1 - Orocos::Async.steps - assert_equal 2,vals.size - assert_equal 33,vals.last + it "calls on_change with the attribute values" do + a = @task.attribute("att2") + a.period = 0.01 + vals = [] + a.on_change do |data| + vals << data end + wait_for_equality [84], vals + a.write 33 + wait_for_equality [84, 33], vals end end end diff --git a/test/async/test_name_service.rb b/test/async/test_name_service.rb index 330fa66c..c22deb3e 100644 --- a/test/async/test_name_service.rb +++ b/test/async/test_name_service.rb @@ -2,7 +2,7 @@ require 'orocos/async' describe Orocos::Async::NameService do - before do + before do Orocos::Async.clear end @@ -12,7 +12,7 @@ it "should raise NotFound if remote task is not reachable" do ns = Orocos::Async::NameService.new - assert_raises Orocos::NotFound do + assert_raises Orocos::NotFound do ns.get "bla" end end @@ -21,27 +21,24 @@ assert Orocos::Async.name_service.reachable? end - it "should return a TaskContextProxy" do + it "should return a TaskContextProxy" do Orocos.run('process') do ns = Orocos::Async::NameService.new - ns << Orocos::Async::CORBA::NameService.new + ns << Orocos::CORBA::NameService.new t = ns.get "process_Test" - t.must_be_instance_of Orocos::Async::CORBA::TaskContext + assert_kind_of Orocos::Async::CORBA::TaskContext, t t2 = nil - ns.get "process_Test" do |task| - t2 = task - end - sleep 0.1 - Orocos::Async.step - t2.must_be_instance_of Orocos::Async::CORBA::TaskContext + ns.get("process_Test") { |task| t2 = task } + wait_for { t2 } + assert_kind_of Orocos::Async::CORBA::TaskContext, t2 end end - it "should report that new task are added and removed" do + it "should report that new task are added and removed" do ns = Orocos::Async::NameService.new(:period => 0) - ns << Orocos::Async::CORBA::NameService.new + ns << Orocos::CORBA::NameService.new names_added = [] names_removed = [] ns.on_task_added do |n| @@ -63,120 +60,3 @@ assert_equal "/process_Test",names_removed.first end end - -describe Orocos::Async::CORBA::NameService do - include Orocos::Spec - - before do - Orocos::Async.clear - end - - it "should raise NotFound if remote task is not reachable" do - ns = Orocos::Async::CORBA::NameService.new - assert_raises Orocos::NotFound do - ns.get "bla" - end - end - - it "should not raise NotFound if remote task is not reachable and a block is given" do - ns = Orocos::Async::CORBA::NameService.new - - not_called = true - ns.get "bla" do |task| - not_called = false - end - - error = nil - ns.get "bla" do |task,err| - error = err - end - - sleep 0.1 - Orocos::Async.step - assert not_called - error.must_be_instance_of Orocos::NotFound - end - - it "should have a global default instance" do - Orocos::Async::CORBA.name_service.must_be_instance_of Orocos::Async::CORBA::NameService - end - - it "should return a TaskContextProxy" do - Orocos.run('process') do - ns = Orocos::Async::CORBA::NameService.new - t = ns.get "process_Test" - t.must_be_instance_of Orocos::Async::CORBA::TaskContext - - t2 = nil - ns.get "process_Test" do |task| - t2 = task - end - sleep 0.1 - Orocos::Async.step - t2.must_be_instance_of Orocos::Async::CORBA::TaskContext - end - end -end - -describe Orocos::Async::Local::NameService do - include Orocos::Spec - - before do - Orocos::Async.clear - end - - it "should raise NotFound if remote task is not reachable" do - ns = Orocos::Async::Local::NameService.new - assert_raises Orocos::NotFound do - ns.get "bla" - end - end - - it "should not raise NotFound if remote task is not reachable and a block is given" do - ns = Orocos::Async::Local::NameService.new - - not_called = true - ns.get "bla" do |task| - not_called = false - end - - error = nil - ns.get "bla" do |task,err| - error = err - end - - sleep 0.1 - Orocos::Async.step - assert not_called - error.must_be_instance_of Orocos::NotFound - end - - describe "#get" do - it "should return a TaskContextProxy" do - Orocos.run('process') do - ns = Orocos::Async::Local::NameService.new - t = Orocos.get "process_Test" - ns.register t - t = ns.get "process_Test" - t.must_be_instance_of Orocos::Async::CORBA::TaskContext - t.wait - assert t.reachable? - end - end - end - - describe "#proxy" do - it "should return a TaskContextProxy" do - Orocos.run('process') do - ns = Orocos::Async::Local::NameService.new - t = Orocos.get "process_Test" - ns.register t - t = ns.proxy "process_Test" - t.must_be_instance_of Orocos::Async::TaskContextProxy - t.wait - assert t.reachable? - end - end - end -end - diff --git a/test/async/test_task.rb b/test/async/test_task.rb index 283b7272..9e6ba5de 100644 --- a/test/async/test_task.rb +++ b/test/async/test_task.rb @@ -2,19 +2,18 @@ require 'orocos/async' describe Orocos::Async::CORBA::TaskContext do - before do + before do Orocos::Async.clear end - describe "initialize" do - before do + describe "initialize" do + before do Orocos::Async.clear end it "should raise ComError if remote task is not reachable and :raise is set to true" do - Orocos::CORBA.connect_timeout = 50 - t1 = Orocos::Async::CORBA::TaskContext.new(:ior => ior("bla"),:raise => true) + Orocos::Async::CORBA::TaskContext.new(ior: ior("bla"), raise: true) sleep 0.1 assert_raises(Orocos::CORBA::ComError) do Orocos::Async.step @@ -45,46 +44,46 @@ end it "can be initialized from ior" do - Orocos.run('process') do - ior = Orocos.name_service.ior('process_Test') - t1 = Orocos::Async::CORBA::TaskContext.new(:ior => ior) - assert t1.reachable? - t1 = Orocos::Async::CORBA::TaskContext.new(ior) - assert t1.reachable? - Orocos::Async.steps - end + start "process" + + ior = Orocos.name_service.ior("process_Test") + t1 = Orocos::Async::CORBA::TaskContext.new(ior: ior) + wait_for { t1.reachable? } + t1 = Orocos::Async::CORBA::TaskContext.new(ior) + wait_for { t1.reachable? } + Orocos::Async.steps end it "can be initialized from Orocos::TaskContext" do - Orocos.run('process') do - t1 = Orocos.name_service.get "process_Test" - t2 = Orocos::Async::CORBA::TaskContext.new(t1) - assert t2.reachable? - t2 = Orocos::Async::CORBA::TaskContext.new(:use => t1) - assert t2.reachable? - Orocos::Async.steps - end + start "process" + + t1 = Orocos.name_service.get "process_Test" + t2 = Orocos::Async::CORBA::TaskContext.new(t1) + wait_for { t2.reachable? } + t2 = Orocos::Async::CORBA::TaskContext.new(use: t1) + wait_for { t2.reachable? } + Orocos::Async.steps end it "can be initialized from Orocos::Async::CORBA::TaskContext" do - Orocos.run('process') do - t1 = Orocos::Async::CORBA::TaskContext.new(:ior => ior("process_Test")) - assert t1.reachable? - t2 = Orocos::Async::CORBA::TaskContext.new(t1) - assert t2.reachable? - Orocos::Async.steps - end + start "process" + + t1 = Orocos::Async::CORBA::TaskContext.new(ior: ior("process_Test")) + wait_for { t1.reachable? } + t2 = Orocos::Async::CORBA::TaskContext.new(t1) + wait_for { t2.reachable? } + Orocos::Async.steps end - it 'should have the instance methods from Orocos::TaskContext' do + it "has the instance methods from Orocos::TaskContext" do methods = Orocos::Async::CORBA::TaskContext.instance_methods Orocos::TaskContext.instance_methods.each do |method| - methods.include?(method).wont_be_nil + assert methods.include?(method) end end end - describe "Async access" do + describe "Async access" do it "should raise on all synchronous calls to the remote task if not reachable" do t1 = Orocos::Async::CORBA::TaskContext.new(ior('process_Test')) t1.reachable?.must_equal false # only function that should never raise @@ -92,7 +91,7 @@ assert_raises Orocos::CORBA::ComError do t1.has_port?("bla").must_equal false end - assert_raises Orocos::CORBA::ComError do + assert_raises Orocos::CORBA::ComError do t1.has_attribute?("bla").must_equal false end t1.attribute_names do |names,e| @@ -107,23 +106,23 @@ connect = nil disconnect = nil t1 = nil + Orocos.run('process') do - t1 = Orocos::Async::CORBA::TaskContext.new(ior('process_Test'),:period => 0.1,:watchdog => true) + t1 = Orocos::Async::CORBA::TaskContext.new( + ior('process_Test'), period: 0.01, watchdog: true + ) t1.on_reachable do connect = true end t1.on_unreachable do disconnect = true end - Orocos::Async.steps - assert connect + wait_for { connect } end - sleep 0.11 - Orocos::Async.steps - assert disconnect + wait_for { disconnect } end - it "should call on_port_reachable" do + it "should call on_port_reachable" do Orocos.run('simple_source') do t1 = Orocos::Async::CORBA::TaskContext.new(ior('simple_source_source'),:period => 0.1,:watchdog => true) ports = [] @@ -135,7 +134,7 @@ Orocos::Async.step assert_equal ["cycle", "cycle_struct", "out0", "out1", "out2", "out3", "state"],ports.sort ports.clear - + #should be called even if the task is already reachable t1.on_port_reachable do |name| ports << name @@ -146,7 +145,7 @@ end - it "should call on_port_unreachable" do + it "should call on_port_unreachable" do t1 = nil ports = [] Orocos.run('simple_source') do @@ -164,7 +163,7 @@ assert_equal ["cycle", "cycle_struct", "out0", "out1", "out2", "out3", "state"],ports.sort end - it "should call on_property_reachable" do + it "should call on_property_reachable" do process = start('process::Test' => 'process_Test') t1 = Orocos::Async::CORBA::TaskContext.new(ior('process_Test'),:period => 0.1,:watchdog => true) @@ -186,7 +185,7 @@ assert_equal ["dynamic_prop","dynamic_prop_setter_called","prop1", "prop2", "prop3"],properties end - it "should call on_port_reachable if a port was dynamically added" do + it "should call on_port_reachable if a port was dynamically added" do task = Orocos::RubyTasks::TaskContext.new("test") t1 = Orocos::Async::CORBA::TaskContext.new(ior('test'),:period => 0.1,:watchdog => true) ports = [] @@ -201,7 +200,7 @@ assert_equal ['state', "frame","frame2"], ports end - it "should call on_port_unreachable if a port was dynamically removed" do + it "should call on_port_unreachable if a port was dynamically removed" do task = Orocos::RubyTasks::TaskContext.new("test") t1 = Orocos::Async::CORBA::TaskContext.new(ior('test'),:period => 0.1,:watchdog => true) port = task.create_output_port("frame","string") @@ -220,7 +219,7 @@ Orocos::Async.steps assert_equal ["frame"], ports end - + it "should call on_error" do t1 = nil error = nil @@ -263,7 +262,7 @@ Orocos::Async.step t1 end - t1.port_names do + t1.port_names do end sleep 0.1 Orocos::Async.steps @@ -287,7 +286,7 @@ end end - it "should return its ports" do + it "should return its ports" do Orocos.run('simple_source') do t1 = Orocos::Async::CORBA::TaskContext.new(ior('simple_source_source')) names = t1.port_names @@ -300,7 +299,7 @@ end end - it "should asynchronously return its ports" do + it "should asynchronously return its ports" do Orocos.run('simple_source') do t1 = Orocos::Async::CORBA::TaskContext.new(ior('simple_source_source')) queue = Queue.new @@ -325,7 +324,7 @@ Orocos.run('process') do t1 = Orocos::Async::CORBA::TaskContext.new(ior('process_Test')) q = Queue.new - 0.upto 19 do + 0.upto 19 do t1.reachable? do |val| sleep 0.15 # this will ensure that no thread can run twice q << val @@ -341,7 +340,7 @@ Orocos.run('process') do t1 = Orocos::Async::CORBA::TaskContext.new(ior('process_Test')) q = Queue.new - 0.upto 9 do + 0.upto 9 do t1.model do |val| sleep 0.1 # this will ensure that no thread can run twice q << val diff --git a/test/async/test_task_proxy.rb b/test/async/test_task_proxy.rb index 766a69fd..bd02bb78 100644 --- a/test/async/test_task_proxy.rb +++ b/test/async/test_task_proxy.rb @@ -22,9 +22,9 @@ t1 = Orocos::Async.proxy("simple_source_source") p = t1.port("cycle") sub_port = p.sub_port(:frame) - sub_port.must_be_instance_of Orocos::Async::PortProxy - sub_port = p.sub_port([:frame,:size]) - sub_port.must_be_instance_of Orocos::Async::PortProxy + assert_kind_of Orocos::Async::SubPortProxy, sub_port + sub_port = p.sub_port(%I[frame size]) + assert_kind_of Orocos::Async::SubPortProxy, sub_port end end @@ -32,7 +32,7 @@ it "should raise RuntimeError if an operation is performed which is not available for this port type" do Orocos.run('simple_sink') do t1 = Orocos::Async.proxy("simple_sink_sink") - p = t1.port("cycle",:wait => true) + p = t1.port("cycle", wait: true) assert_raises RuntimeError do p.period = 1 end @@ -46,12 +46,19 @@ it "should return the type name of the port if known or connected" do t1 = Orocos::Async.proxy("simple_source_source",:retry_period => 0.2,:period => 0.2) p = t1.port("cycle2",:type => Integer) - assert_equal "Fixnum",p.type_name + expected = + if Fixnum == Integer + "Integer" + else + "Fixnum" + end + + assert_equal expected, p.type_name p2 = t1.port("cycle") Orocos.run('simple_source') do p2.wait - assert_equal "Fixnum",p.type_name + assert_equal Fixnum.to_s, p.type_name end end @@ -131,7 +138,7 @@ t1.configure t1.start wait_for { data } - data.must_be_instance_of Fixnum + assert_kind_of Fixnum, data end end end @@ -142,7 +149,7 @@ describe "on_reachable" do it "must be called once when a prop gets reachable" do - prop = Orocos::Async.proxy("process_Test",:period => 0.09).property("prop1") + prop = Orocos::Async.proxy("process_Test", period: 0.09).property("prop1") counter = 0 prop.on_reachable do counter +=1 @@ -160,16 +167,17 @@ prop = Orocos::Async.proxy("process_Test",:period => 0.09).property("prop1") counter = 0 prop.on_unreachable do - counter +=1 + counter += 1 end - Orocos::Async.steps - assert_equal 1,counter + wait_for { !prop.valid_delegator? } + assert_equal 1, counter - Orocos.run('process') do + Orocos.run("process") do prop.wait + wait_for { prop.valid_delegator? } + assert prop.valid_delegator? end - Orocos::Async.steps - assert_equal 2,counter + wait_for { counter == 2 } end end end @@ -244,27 +252,25 @@ end it "should reconnect" do - t1 = Orocos::Async.proxy("process_Test",:retry_period => 0.09) + t1 = Orocos::Async.proxy("process_Test", retry_period: 0.09) p = t1.attribute("att2") - vals = Array.new + vals = [] p.on_change do |data| vals << data end - Orocos.run('process') do - Orocos::Async.steps - assert p.reachable? + Orocos.run("process") do + wait_for { p.valid_delegator? } + wait_for { vals.size == 1 } end - assert_equal 1,vals.size - wait_for do - !t1.reachable? && !p.reachable? - end + wait_for { !t1.valid_delegator? } + refute p.valid_delegator? - Orocos.run('process') do - Orocos::Async.steps + Orocos.run("process") do + wait_for { p.valid_delegator? } + wait_for { vals.size == 2 } end - assert_equal 2,vals.size end end end @@ -333,9 +339,10 @@ end Orocos.run('process') do - wait_for { vals.size == 1 } + wait_for { vals.size >= 1 } end Orocos::Async.steps + Orocos.run('process') do wait_for { vals.size == 2 } end @@ -381,13 +388,13 @@ it "shortcut must return TaskContexProxy" do t1 = Orocos::Async.proxy("process_Test",:retry_period => 0.1,:period => 0.1) - t1.must_be_instance_of Orocos::Async::TaskContextProxy + assert_kind_of Orocos::Async::TaskContextProxy, t1 end it "should return a port proxy" do t1 = Orocos::Async.proxy("process_Test",:retry_period => 0.1,:period => 0.1) p = t1.port("test") - p.must_be_instance_of Orocos::Async::PortProxy + assert_kind_of Orocos::Async::PortProxy, p end it "should connect to a remote task when reachable" do @@ -407,23 +414,23 @@ Orocos::Async.steps assert_equal 1, disconnects # on_unreachable will be called because is not yet reachable - Orocos.run('process') do - Orocos::Async.steps # queue reconnect - assert t1.reachable? - t1.instance_variable_get(:@delegator_obj).must_be_instance_of Orocos::Async::CORBA::TaskContext + Orocos.run("process") do + wait_for { t1.reachable? } + assert_kind_of Orocos::Async::CORBA::TaskContext, + t1.instance_variable_get(:@delegator_obj) assert_equal 1, disconnects end - assert !t1.reachable? + refute t1.reachable? Orocos::Async.steps # queue reconnect assert_equal 1, connects assert_equal 2, disconnects - Orocos.run('process') do + Orocos.run("process") do Orocos::Async.steps # queue reconnect assert t1.reachable? end Orocos::Async.steps # queue reconnect - assert !t1.reachable? + refute t1.reachable? assert_equal 2, connects assert_equal 3, disconnects end diff --git a/test/test_configurations.rb b/test/test_configurations.rb index ab5a7a98..63802c72 100644 --- a/test/test_configurations.rb +++ b/test/test_configurations.rb @@ -927,7 +927,6 @@ def write_fixture_conf(content) # We must load all properties before we activate FakeFS task.each_property do |p| v = p.new_sample - v.zero! p.write v end flexmock(conf).should_receive(:save). @@ -985,9 +984,7 @@ def write_fixture_conf(content) @task = Orocos.get 'task' # We must load all properties before we activate FakeFS task.each_property do |p| - v = p.new_sample - v.zero! - p.write v + p.write p.new_sample end conf = Orocos::TaskConfigurations.new(task.model) @expected = conf.normalize_conf(Orocos::TaskConfigurations.read_task_conf(task)) diff --git a/test/test_output_reader.rb b/test/test_output_reader.rb index cead9b47..9ed2f43c 100644 --- a/test/test_output_reader.rb +++ b/test/test_output_reader.rb @@ -10,14 +10,14 @@ include Orocos::Spec it "should not be possible to create an instance directly" do - assert_raises(NoMethodError) { Orocos::OutputReader.new } + assert_raises(NoMethodError) { Orocos::OutputReader.new } end it "should offer read access on an output port" do Orocos.run('simple_source') do |source| source = source.task('source') output = source.port('cycle') - + # Create a new reader. The default policy is data reader = output.reader assert(reader.kind_of?(Orocos::OutputReader)) @@ -35,7 +35,7 @@ source.write_opaque(42) sleep(0.2) - + # Create a new reader. The default policy is data sample = reader.read assert_equal(42, sample.x) @@ -53,7 +53,7 @@ source.write_opaque(42) sleep(0.2) - + # Create a new reader. The default policy is data sample = output.new_sample returned_sample = reader.read(sample) @@ -69,7 +69,7 @@ output = source.port('cycle') source.configure source.start - + # The default policy is data reader = output.reader sleep(0.2) @@ -135,20 +135,20 @@ end end - it "should raise ComError if the remote end is dead and be disconnected" do - Orocos.run 'simple_source' do |source_p| - source = source_p.task('source') - output = source.port('cycle') + it "does not raise if the remote end is dead, but is disconnected" do + Orocos.run "simple_source" do |source_p| + source = source_p.task("source") + output = source.port("cycle") reader = output.reader source.configure source.start sleep(0.5) - source_p.kill(true, 'KILL') + source_p.kill(true, "KILL") - assert_raises(Orocos::CORBA::ComError) { reader.read } - assert(!reader.connected?) - end + reader.read # should not raise + refute reader.connected? + end end it "should get an initial value when :init is specified" do diff --git a/test/test_port.rb b/test/test_port.rb index 26b64683..25459540 100644 --- a/test/test_port.rb +++ b/test/test_port.rb @@ -105,11 +105,13 @@ describe "validation of message size" do it "initializes data_size by the value returned by #max_marshalling_size f data_size is zero" do flexmock(port).should_receive(:max_marshalling_size).and_return(10) + flexmock(Orocos::MQueue).should_receive(:validate_sizes?).and_return(false) assert_equal Hash[transport: Orocos::TRANSPORT_MQ, size: 42, data_size: 10], port.handle_mq_transport("input", transport: 0, size: 42, data_size: 0) end it "initializes data_size by the value returned by #max_marshalling_size f data_size is not given" do flexmock(port).should_receive(:max_marshalling_size).and_return(10) + flexmock(Orocos::MQueue).should_receive(:validate_sizes?).and_return(false) assert_equal Hash[transport: Orocos::TRANSPORT_MQ, size: 42, data_size: 10], port.handle_mq_transport("input", transport: 0, size: 42) end diff --git a/test/test_process.rb b/test/test_process.rb index 3b908c3f..88b3dfe3 100644 --- a/test/test_process.rb +++ b/test/test_process.rb @@ -10,7 +10,7 @@ project = OroGen::Spec::Project.new(@loader) project.default_task_superclass = OroGen::Spec::TaskContext.new( project, 'base::Task', subclasses: false - ) + ) @task_m = OroGen::Spec::TaskContext.new project, 'test::Task' @deployment_m = OroGen::Spec::Deployment.new project, 'test_deployment' @default_deployment_m = OroGen::Spec::Deployment.new project, 'orogen_default_test__Task' @@ -57,7 +57,7 @@ assert_equal Hash[deployment_m => nil], deployments end it "raises if an unexisting name is given" do - assert_raises(ArgumentError) do + assert_raises(OroGen::NotFound) do Orocos::Process.partition_run_options 'does_not_exist', loader: @loader end end @@ -183,51 +183,204 @@ def parse_run_options(wrapper_name, arg, options: wrapper_options) end end end - + describe "#spawn" do it "starts a new process and waits for it with a timeout" do - process = Orocos::Process.new('process') + process = Orocos::Process.new("process") # To ensure that the test teardown will kill it processes << process - process.spawn :wait => 10 - Orocos.get "process_Test" + process.spawn wait: 10 + process.task("process_Test") assert(process.alive?) assert(process.running?) end it "starts a new process and waits for it without a timeout" do - process = Orocos::Process.new('process') + process = Orocos::Process.new("process") # To ensure that the test teardown will kill it processes << process - process.spawn :wait => true - Orocos.get "process_Test" + process.spawn wait: true + process.task("process_Test") assert(process.alive?) assert(process.running?) end it "can automatically add prefixes to tasks" do - process = Orocos::Process.new 'process' + process = Orocos::Process.new "process" begin - process.spawn :prefix => 'prefix' - assert_equal Hash["process_Test" => "prefixprocess_Test"], process.name_mappings - assert Orocos.name_service.get('prefixprocess_Test') + process.spawn prefix: "prefix" + assert_equal Hash["process_Test" => "prefixprocess_Test"], + process.name_mappings + assert process.task("prefixprocess_Test") + ensure process.kill end end it "can rename single tasks" do - process = Orocos::Process.new 'process' + process = Orocos::Process.new "process" begin process.map_name "process_Test", "prefixprocess_Test" process.spawn - assert Orocos.name_service.get('prefixprocess_Test') + assert process.task("prefixprocess_Test") ensure process.kill end end end + describe "#load_and_validate_ior_message" do + it "loads and validates the ior message when it is valid" do + process = Orocos::Process.new("process") + message = "{\"process_Test\": \"IOR:123456\"}" + result = process.load_and_validate_ior_message(message) + assert_equal(result, JSON.parse(message)) + end + + it "returns nil when the ior message is not parseable" do + process = Orocos::Process.new("process") + # Missing the `}` + message = "{\"process_Test\": \"IOR:123456\"" + result = process.load_and_validate_ior_message(message) + assert_nil(result) + end + + it "raises invalid ior message when a task is not included in the ior message" do + process = Orocos::Process.new("process") + message = "{\"another_process\": \"IOR:123456\"}" + error = assert_raises(Orocos::InvalidIORMessage) do + process.load_and_validate_ior_message(message) + end + expected_error = Orocos::InvalidIORMessage.new( + "the following tasks were present on the ior message but werent in the " \ + "process task names: [\"another_process\"]" + ) + assert_equal(expected_error.message, error.message) + end + end + + describe "#wait_running" do + attr_reader :process + before do + @message = "{\"process_Test\": \"IOR:123456\"}" + @process = Orocos::Process.new("process") + flexmock(process).should_receive(:task) + end + + it "resolves the ior mappings" do + read, write = IO.pipe + flexmock(IO).should_receive(:pipe) + .and_return([read, write]) + process.spawn(wait: false) + flexmock(read).should_receive(:read_nonblock) + .and_return(@message) + .and_return do + raise EOFError + end + result = process.wait_running(0) + assert_equal(JSON.parse(@message), result) + end + + it "stops waiting after the timeout was reached" do + read, write = IO.pipe + flexmock(IO).should_receive(:pipe) + .and_return([read, write]) + flexmock(read).should_receive(:read_nonblock) + .and_raise(IO::EAGAINWaitReadable) + process.spawn(wait: false) + t = Thread.new do + result = process.wait_running(0.1) + assert_nil(result) + end + t.join + end + + it "parses the message correctly even when it was first received partially" do + read, write = IO.pipe + flexmock(IO).should_receive(:pipe) + .and_return([read, write]) + initial_message = @message.slice(0, 4) + rest_of_the_message = @message.slice(4, @message.length) + + read_mock = flexmock(read).should_receive(:read_nonblock) + read_mock.and_return(initial_message, rest_of_the_message) + read_mock.and_raise(EOFError) + process.spawn(wait: false) + result = process.wait_running(0.1) + assert_equal(JSON.parse(@message), result) + end + + it "stops waiting if the process has crashed" do + read, write = IO.pipe + flexmock(IO).should_receive(:pipe) + .and_return([read, write]) + flexmock(read) + .should_receive(:read_nonblock) + .and_raise(IO::EAGAINWaitReadable) + process.spawn(wait: false) + + alive_mock = flexmock(process).should_receive(:alive?) + alive_mock.and_return(true).ordered + alive_mock.and_return(false).ordered + e = assert_raises(Orocos::NotFound) do + process.wait_running(2) + end + assert_equal("process was started but crashed", e.message) + end + + it "raises if the process is dead after waiting" do + read, write = IO.pipe + flexmock(IO).should_receive(:pipe) + .and_return([read, write]) + flexmock(read) + .should_receive(:read_nonblock) + .and_raise(IO::EAGAINWaitReadable) + process.spawn(wait: false) + + alive_mock = flexmock(process).should_receive(:alive?) + alive_mock.and_return(false).ordered + e = assert_raises(Orocos::NotFound) do + process.wait_running(2) + end + assert_equal("cannot get a running process module", e.message) + end + end + + describe "#resolve_all_tasks" do + attr_reader :process + before do + @process = Orocos::Process.new("process") + end + + it "returns all the process tasks when their ior is registered" do + process.spawn(wait: false) + process.wait_running(0.1) + result = process.resolve_all_tasks + assert_includes(result, "process_Test") + assert_equal("process_Test", result["process_Test"].name) + end + + it "returns the already defined process tasks without checking their ior registration" do + process.spawn(wait: false) + process.wait_running(0.1) + process.resolve_all_tasks + spy = flexmock(:on, Orocos::Process) + process.resolve_all_tasks + assert_spy_not_called(spy, :ior_for) + end + + it "raises when ior is not registered" do + process.spawn(wait: false) + process.wait_running(0.1) + flexmock(process).should_receive(:ior_for) + e = assert_raises(Orocos::IORNotRegisteredError) do + process.resolve_all_tasks + end + assert("no IOR was registered for process", e.message) + end + end + describe "#kill" do it "stops a running process and clean up the name server" do Orocos.run('process') do |process| @@ -240,7 +393,7 @@ def parse_run_options(wrapper_name, arg, options: wrapper_options) it "stops the task if it is running" do Orocos.run("process") do |process| - task = Orocos.get "process_Test" + task = process.task("process_Test") state = nil flexmock(::Process) .should_receive(:kill) @@ -254,7 +407,7 @@ def parse_run_options(wrapper_name, arg, options: wrapper_options) it "does not attempt to stop the task if cleanup is false" do Orocos.run("process") do |process| - task = Orocos.get "process_Test" + task = process.task("process_Test") state = nil flexmock(::Process) .should_receive(:kill) @@ -292,7 +445,7 @@ def parse_run_options(wrapper_name, arg, options: wrapper_options) describe "#task" do it "can get a reference on a deployed task context by name" do Orocos.run('process') do |process| - assert(direct = Orocos::TaskContext.get('process_Test')) + assert(direct = process.task('process_Test')) assert(indirect = process.task("Test")) assert_equal(direct, indirect) end @@ -316,13 +469,13 @@ def parse_run_options(wrapper_name, arg, options: wrapper_options) describe "run" do it "can start a process with a prefix" do Orocos.run('process' => 'prefix') do |process| - assert(Orocos::TaskContext.get('prefixprocess_Test')) + assert(process.task('prefixprocess_Test')) end end it "can wait for the process to be running without a timeout" do - Orocos.run 'process', :wait => true do - Orocos.get 'process_Test' + Orocos.run 'process', wait: true do |process| + process.task("process_Test") end end end diff --git a/test/test_properties.rb b/test/test_properties.rb index 8c3b1d00..d1ccf9d3 100644 --- a/test/test_properties.rb +++ b/test/test_properties.rb @@ -150,10 +150,13 @@ it "should be able to enumerate its attributes" do Orocos.run('process') do |process| t = process.task('Test') - assert_equal %w{att1 att2 att3}, t.attribute_names.sort - assert_equal %w{att1 att2 att3}, t.each_attribute.map(&:name).sort - %w{att1 att2 att3}.each do |name| - t.has_attribute?(name) + usual_attributes = + %w[CycleCounter IOCounter TimeOutCounter TriggerCounter TriggerOnStart] + task_attributes = %w[att1 att2 att3] + expectation = usual_attributes + task_attributes + assert_equal expectation, t.attribute_names.sort + expectation.each do |name| + assert t.has_attribute?(name) end end end diff --git a/test/test_task.rb b/test/test_task.rb index 270e9fba..c3bfb70c 100644 --- a/test/test_task.rb +++ b/test/test_task.rb @@ -9,7 +9,7 @@ end it "should raise NotFound on unknown task contexts" do - assert_raises(Orocos::NotFound) { Orocos::TaskContext.get('Bla_Blo') } + assert_raises(Orocos::NotFound) { Orocos::TaskContext.get('Bla_Blo') } end it "should check equality based on CORBA reference" do @@ -53,13 +53,13 @@ source_p.must_be_kind_of(Orocos::OutputPort) source_p.name.must_equal("cycle") source_p.task.must_equal(source) - source_p.orocos_type_name.must_equal("int") + source_p.orocos_type_name.must_equal("/int32_t") assert(sink_p = sink.port('cycle')) sink_p.must_be_kind_of(Orocos::InputPort) sink_p.name.must_equal("cycle") sink_p.task.must_equal(sink) - sink_p.orocos_type_name.must_equal("int") + sink_p.orocos_type_name.must_equal("/int32_t") end end @@ -87,11 +87,14 @@ end end - it "should raise CORBA::ComError when #port is called on a dead remote process" do + it "should raise either CORBA::ComError or TimeoutError when #port is called "\ + "on a dead remote process" do Orocos.run('simple_source') do |p| source = Orocos::TaskContext.get("simple_source_source") p.kill - assert_raises(Orocos::CORBA::ComError) { source.port("cycle") } + assert_raises(Orocos::CORBA::ComError, Orocos::CORBA::TimeoutError) do + source.port("cycle") + end end end @@ -100,8 +103,8 @@ echo = Orocos::TaskContext.get('echo_Echo') m = echo.operation(:write) assert_equal "write", m.name - assert_equal ["int"], m.return_spec - assert_equal [["value", "value_arg", "int"]], m.arguments_spec + assert_equal ["/int32_t"], m.return_spec + assert_equal [["value", "value_arg", "/int32_t"]], m.arguments_spec end end @@ -121,18 +124,24 @@ end end - it "should raise CORBA::ComError when the process crashed during a operation call" do - Orocos.run 'echo' do - echo = Orocos::TaskContext.get('echo_Echo') - assert_raises(Orocos::CORBA::ComError) { echo.operation(:kill).callop } + it "should raise CORBA::ComError when the process "\ + "crashed during a operation call" do + Orocos.run "echo" do + echo = Orocos::TaskContext.get("echo_Echo") + assert_raises(Orocos::CORBA::ComError) do + echo.operation(:kill).callop + end end end - it "should raise CORBA::ComError when #operation has communication errors" do + it "should raise either CORBA::ComError or CORBA::TimeoutError when #operation "\ + "has communication errors" do Orocos.run 'echo' do |p| echo = Orocos::TaskContext.get('echo_Echo') p.kill - assert_raises(Orocos::CORBA::ComError) { echo.operation(:write) } + assert_raises(Orocos::CORBA::ComError, Orocos::CORBA::TimeoutError) do + echo.operation(:write) + end end end @@ -165,12 +174,18 @@ end end - it "should raise CORBA::ComError when state-related operations are called on a dead process" do + it "should raise either CORBA::ComError or CORBA::TimeoutError when state-related "\ + "operations are called on a dead process" do Orocos.run('simple_source') do |p| source = Orocos::TaskContext.get("simple_source_source") assert source.state p.kill - assert_raises(Orocos::CORBA::ComError) { source.state} + assert_raises(Orocos::CORBA::ComError, Orocos::CORBA::TimeoutError) do + source.state + end + # TimeoutError is due to a race condition on ORB shutdown. After the + # call to 'state', there is no more race condition and this should + # always be a ComError assert_raises(Orocos::CORBA::ComError) { source.start } end end @@ -216,15 +231,21 @@ # Note: we don't have state_pre_operational as we already read it # once - assert_equal Orocos::TaskContext::STATE_STOPPED, state.read - assert_equal Orocos::TaskContext::STATE_RUNNING, state.read - assert_equal Orocos::TaskContext::STATE_RUNNING, state.read - assert_equal Orocos::TaskContext::STATE_RUNTIME_ERROR, state.read - assert_equal Orocos::TaskContext::STATE_RUNNING, state.read - assert_equal Orocos::TaskContext::STATE_RUNTIME_ERROR, state.read - assert_equal Orocos::TaskContext::STATE_RUNNING, state.read - assert_equal Orocos::TaskContext::STATE_EXCEPTION, state.read - assert_equal Orocos::TaskContext::STATE_PRE_OPERATIONAL, state.read + expected = [ + Orocos::TaskContext::STATE_STOPPED, + Orocos::TaskContext::STATE_RUNNING, + Orocos::TaskContext::STATE_RUNTIME_ERROR, + Orocos::TaskContext::STATE_RUNTIME_ERROR, + Orocos::TaskContext::STATE_RUNNING, + Orocos::TaskContext::STATE_RUNTIME_ERROR, + Orocos::TaskContext::STATE_RUNTIME_ERROR, + Orocos::TaskContext::STATE_RUNNING, + Orocos::TaskContext::STATE_EXCEPTION, + Orocos::TaskContext::STATE_EXCEPTION, + Orocos::TaskContext::STATE_PRE_OPERATIONAL + ] + actual = expected.map { state.read } + assert_equal expected, actual end end