From ead7980dde5c118fd4394c3ceb77e36b6df91fb9 Mon Sep 17 00:00:00 2001 From: Jhonas Date: Fri, 7 Oct 2022 10:52:15 -0300 Subject: [PATCH 01/15] fix(test): update failing unit tests --- test/test_port.rb | 2 ++ test/test_properties.rb | 11 +++++++---- test/test_task.rb | 8 ++++---- 3 files changed, 13 insertions(+), 8 deletions(-) diff --git a/test/test_port.rb b/test/test_port.rb index 26b64683..25459540 100644 --- a/test/test_port.rb +++ b/test/test_port.rb @@ -105,11 +105,13 @@ describe "validation of message size" do it "initializes data_size by the value returned by #max_marshalling_size f data_size is zero" do flexmock(port).should_receive(:max_marshalling_size).and_return(10) + flexmock(Orocos::MQueue).should_receive(:validate_sizes?).and_return(false) assert_equal Hash[transport: Orocos::TRANSPORT_MQ, size: 42, data_size: 10], port.handle_mq_transport("input", transport: 0, size: 42, data_size: 0) end it "initializes data_size by the value returned by #max_marshalling_size f data_size is not given" do flexmock(port).should_receive(:max_marshalling_size).and_return(10) + flexmock(Orocos::MQueue).should_receive(:validate_sizes?).and_return(false) assert_equal Hash[transport: Orocos::TRANSPORT_MQ, size: 42, data_size: 10], port.handle_mq_transport("input", transport: 0, size: 42) end diff --git a/test/test_properties.rb b/test/test_properties.rb index 8c3b1d00..d1ccf9d3 100644 --- a/test/test_properties.rb +++ b/test/test_properties.rb @@ -150,10 +150,13 @@ it "should be able to enumerate its attributes" do Orocos.run('process') do |process| t = process.task('Test') - assert_equal %w{att1 att2 att3}, t.attribute_names.sort - assert_equal %w{att1 att2 att3}, t.each_attribute.map(&:name).sort - %w{att1 att2 att3}.each do |name| - t.has_attribute?(name) + usual_attributes = + %w[CycleCounter IOCounter TimeOutCounter TriggerCounter TriggerOnStart] + task_attributes = %w[att1 att2 att3] + expectation = usual_attributes + task_attributes + assert_equal expectation, t.attribute_names.sort + expectation.each do |name| + assert t.has_attribute?(name) end end end diff --git a/test/test_task.rb b/test/test_task.rb index 270e9fba..dd36f0fc 100644 --- a/test/test_task.rb +++ b/test/test_task.rb @@ -53,13 +53,13 @@ source_p.must_be_kind_of(Orocos::OutputPort) source_p.name.must_equal("cycle") source_p.task.must_equal(source) - source_p.orocos_type_name.must_equal("int") + source_p.orocos_type_name.must_equal("/int32_t") assert(sink_p = sink.port('cycle')) sink_p.must_be_kind_of(Orocos::InputPort) sink_p.name.must_equal("cycle") sink_p.task.must_equal(sink) - sink_p.orocos_type_name.must_equal("int") + sink_p.orocos_type_name.must_equal("/int32_t") end end @@ -100,8 +100,8 @@ echo = Orocos::TaskContext.get('echo_Echo') m = echo.operation(:write) assert_equal "write", m.name - assert_equal ["int"], m.return_spec - assert_equal [["value", "value_arg", "int"]], m.arguments_spec + assert_equal ["/int32_t"], m.return_spec + assert_equal [["value", "value_arg", "/int32_t"]], m.arguments_spec end end From ac974c89c30d20c5af7a2c741b6af719b32dfe7a Mon Sep 17 00:00:00 2001 From: Jhonas Date: Fri, 7 Oct 2022 11:03:10 -0300 Subject: [PATCH 02/15] fix: change expected exception on partition run options tests --- test/test_process.rb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/test_process.rb b/test/test_process.rb index 3b908c3f..f7cacbd4 100644 --- a/test/test_process.rb +++ b/test/test_process.rb @@ -57,7 +57,7 @@ assert_equal Hash[deployment_m => nil], deployments end it "raises if an unexisting name is given" do - assert_raises(ArgumentError) do + assert_raises(OroGen::NotFound) do Orocos::Process.partition_run_options 'does_not_exist', loader: @loader end end From dddfe77da113d1a93cbf571a1e6e592a448729ab Mon Sep 17 00:00:00 2001 From: Sylvain Date: Fri, 7 Oct 2022 13:40:45 -0300 Subject: [PATCH 03/15] fix: change hash-as-last-arg into proper keyword arguments --- lib/orocos/name_service.rb | 8 ++++---- lib/orocos/ports_base.rb | 4 ++-- lib/orocos/process.rb | 2 +- lib/orocos/task_context_base.rb | 6 +++--- 4 files changed, 10 insertions(+), 10 deletions(-) diff --git a/lib/orocos/name_service.rb b/lib/orocos/name_service.rb index 88c9dbbd..67104871 100644 --- a/lib/orocos/name_service.rb +++ b/lib/orocos/name_service.rb @@ -62,8 +62,8 @@ def self.cleanup end # (see NameService#get) - def self.get(name, options = Hash.new) - Orocos.name_service.get(name, options) + def self.get(name, **options) + Orocos.name_service.get(name, **options) end # Base class for all Orocos name services. An orocos name service is used @@ -310,11 +310,11 @@ def initialized? end #(see NameServiceBase#get) - def get(name,options = Hash.new) + def get(name, **options) name_services.each do |service| begin if service.same_namespace?(name) - task_context = service.get(name,options) + task_context = service.get(name, **options) return task_context if task_context end rescue Orocos::NotFound diff --git a/lib/orocos/ports_base.rb b/lib/orocos/ports_base.rb index 15cc0907..a8a629b0 100644 --- a/lib/orocos/ports_base.rb +++ b/lib/orocos/ports_base.rb @@ -61,9 +61,9 @@ def ==(other) other.task == self.task && other.name == self.name end - def ensure_type_available(options = Hash.new) + def ensure_type_available(**options) if !type || type.null? - @type = Orocos.find_type_by_orocos_type_name(orocos_type_name, options) + @type = Orocos.find_type_by_orocos_type_name(orocos_type_name, **options) end end diff --git a/lib/orocos/process.rb b/lib/orocos/process.rb index 0127abfa..97622d00 100644 --- a/lib/orocos/process.rb +++ b/lib/orocos/process.rb @@ -813,7 +813,7 @@ def self.run(*args, **options) name_mappings.each do |old, new| p.map_name old, new end - p.spawn(spawn_options) + p.spawn(**spawn_options) p end diff --git a/lib/orocos/task_context_base.rb b/lib/orocos/task_context_base.rb index bb5fa769..8d7125a4 100644 --- a/lib/orocos/task_context_base.rb +++ b/lib/orocos/task_context_base.rb @@ -51,9 +51,9 @@ def log_metadata ] end - def ensure_type_available(options = Hash.new) + def ensure_type_available(**options) if !type || type.null? - @type = Orocos.find_type_by_orocos_type_name(@orocos_type_name, options) + @type = Orocos.find_type_by_orocos_type_name(@orocos_type_name, **options) end end @@ -224,7 +224,7 @@ def self.get(options, process = nil) raise ArgumentError, 'no task name' if options.nil? name = options.to_str end - result = Orocos.name_service.get(name,{:process => process}) + result = Orocos.name_service.get(name, process: process) end # Find one running tasks from the provided names. Raises if there is not From 5c55edef27385fefcf622d42942207ddfb026e77 Mon Sep 17 00:00:00 2001 From: Sylvain Date: Fri, 7 Oct 2022 13:44:38 -0300 Subject: [PATCH 04/15] fix: use Type.zero to create properly initialized values Unlike zero! which zeroes even the enums, Type.zero creates a value that is properly initialized, but zero everywhere the "value" is not specified. --- lib/orocos/async/task_context_base.rb | 2 +- lib/orocos/async/task_context_proxy.rb | 6 +++--- lib/orocos/configurations.rb | 3 +-- lib/orocos/log/task_context.rb | 4 ++-- lib/orocos/ports_base.rb | 4 ++-- lib/orocos/task_context_base.rb | 2 +- test/test_configurations.rb | 5 +---- 7 files changed, 11 insertions(+), 15 deletions(-) diff --git a/lib/orocos/async/task_context_base.rb b/lib/orocos/async/task_context_base.rb index bf0f6af0..97923ea6 100644 --- a/lib/orocos/async/task_context_base.rb +++ b/lib/orocos/async/task_context_base.rb @@ -41,7 +41,7 @@ def self.to_ruby(task) prop = task.property(prop) prop.wait p = @ruby_task_context.create_property(prop.name,prop.type) - p.write p.new_sample.zero! + p.write p.new_sample prop.on_change do |data| p.write data end diff --git a/lib/orocos/async/task_context_proxy.rb b/lib/orocos/async/task_context_proxy.rb index d7e3f3f5..596e85b9 100644 --- a/lib/orocos/async/task_context_proxy.rb +++ b/lib/orocos/async/task_context_proxy.rb @@ -43,7 +43,7 @@ def type? end def new_sample - type.new + type.zero end def last_sample @@ -190,7 +190,7 @@ def type? end def new_sample - type.new + type.zero end def to_async(options=Hash.new) @@ -384,7 +384,7 @@ def orocos_type_name end def new_sample - type.new + type.zero end def last_sample diff --git a/lib/orocos/configurations.rb b/lib/orocos/configurations.rb index f304c14d..64744793 100644 --- a/lib/orocos/configurations.rb +++ b/lib/orocos/configurations.rb @@ -768,8 +768,7 @@ def conf_as_typelib(names, override: false) orocos_type = model.find_property(property_name).type typelib_type = loader.typelib_type_for(orocos_type) - typelib_value = typelib_type.new - typelib_value.zero! + typelib_value = typelib_type.zero result[property_name] = TaskConfigurations.apply_conf_on_typelib_value(typelib_value, ruby_value) end result diff --git a/lib/orocos/log/task_context.rb b/lib/orocos/log/task_context.rb index 437a3c8f..80aaa9c7 100644 --- a/lib/orocos/log/task_context.rb +++ b/lib/orocos/log/task_context.rb @@ -500,7 +500,7 @@ def disconnect_all #Returns a new sample object def new_sample - @type.new + @type.zero end #Clears all reader buffers @@ -596,7 +596,7 @@ def notify(&block) end def new_sample - type.new + type.zero end def orocos_type_name diff --git a/lib/orocos/ports_base.rb b/lib/orocos/ports_base.rb index a8a629b0..9c77db57 100644 --- a/lib/orocos/ports_base.rb +++ b/lib/orocos/ports_base.rb @@ -70,7 +70,7 @@ def ensure_type_available(**options) # Returns a new object of this port's type def new_sample ensure_type_available - @type.new + @type.zero end def log_metadata @@ -168,7 +168,7 @@ def writer(distance: PortBase::D_UNKNOWN, **policy) self.class.transient_local_port_name(full_name), orocos_type_name, permanent: false, - class: self.class.writer_class) + class: self.class.writer_class) end writer.port = self writer.policy = policy diff --git a/lib/orocos/task_context_base.rb b/lib/orocos/task_context_base.rb index 8d7125a4..913958cb 100644 --- a/lib/orocos/task_context_base.rb +++ b/lib/orocos/task_context_base.rb @@ -94,7 +94,7 @@ def log_value(value, timestamp = Time.now) def new_sample ensure_type_available - type.new + type.zero end def pretty_print(pp) # :nodoc: diff --git a/test/test_configurations.rb b/test/test_configurations.rb index ab5a7a98..63802c72 100644 --- a/test/test_configurations.rb +++ b/test/test_configurations.rb @@ -927,7 +927,6 @@ def write_fixture_conf(content) # We must load all properties before we activate FakeFS task.each_property do |p| v = p.new_sample - v.zero! p.write v end flexmock(conf).should_receive(:save). @@ -985,9 +984,7 @@ def write_fixture_conf(content) @task = Orocos.get 'task' # We must load all properties before we activate FakeFS task.each_property do |p| - v = p.new_sample - v.zero! - p.write v + p.write p.new_sample end conf = Orocos::TaskConfigurations.new(task.model) @expected = conf.normalize_conf(Orocos::TaskConfigurations.read_task_conf(task)) From af5976c7387ec13a3f5389b43442713dd4a9e9af Mon Sep 17 00:00:00 2001 From: Sylvain Date: Fri, 7 Oct 2022 13:44:46 -0300 Subject: [PATCH 05/15] fix: style --- lib/orocos/async/task_context_base.rb | 10 +++++----- lib/orocos/configurations.rb | 11 +++++------ test/test_output_reader.rb | 22 +++++++++++----------- 3 files changed, 21 insertions(+), 22 deletions(-) diff --git a/lib/orocos/async/task_context_base.rb b/lib/orocos/async/task_context_base.rb index 97923ea6..d24615cb 100644 --- a/lib/orocos/async/task_context_base.rb +++ b/lib/orocos/async/task_context_base.rb @@ -120,7 +120,7 @@ def really_add_listener(listener) # the calling order if listener.event == :port_reachable names = @port_names.dup - event_loop.once do + event_loop.once do names.each do |name| listener.call name end @@ -202,7 +202,7 @@ def configure_delegation(configure_options = Hash.new) # Disconnectes self from the remote task context and returns its underlying # object used to communicate with the remote task (designated object). - # + # # Returns nil if the TaskContext is not connected. # Returns an EventLoop Event if not called from the event loop thread. # @@ -305,7 +305,7 @@ def port(name, verify = true,options=Hash.new, &block) # call-seq: # task.each_property { |a| ... } => task - # + # # Enumerates the properties that are available on # this task, as instances of Orocos::Attribute def each_property(&block) @@ -320,7 +320,7 @@ def each_property(&block) # call-seq: # task.each_attribute { |a| ... } => task - # + # # Enumerates the attributes that are available on # this task, as instances of Orocos::Attribute def each_attribute(&block) @@ -335,7 +335,7 @@ def each_attribute(&block) # call-seq: # task.each_port { |p| ... } => task - # + # # Enumerates the ports that are available on this task, as instances of # either Orocos::InputPort or Orocos::OutputPort def each_port(&block) diff --git a/lib/orocos/configurations.rb b/lib/orocos/configurations.rb index 64744793..1acf8379 100644 --- a/lib/orocos/configurations.rb +++ b/lib/orocos/configurations.rb @@ -760,18 +760,17 @@ def conf_as_ruby(names, override: false) # # @see conf conf_to_ruby def conf_as_typelib(names, override: false) - c = conf(names, override) - return if !c + return unless (c = conf(names, override)) - result = Hash.new - c.each do |property_name, ruby_value| + c.each_with_object({}) do |(property_name, ruby_value), result| orocos_type = model.find_property(property_name).type typelib_type = loader.typelib_type_for(orocos_type) typelib_value = typelib_type.zero - result[property_name] = TaskConfigurations.apply_conf_on_typelib_value(typelib_value, ruby_value) + result[property_name] = TaskConfigurations.apply_conf_on_typelib_value( + typelib_value, ruby_value + ) end - result end # Applies the specified configuration to the given task diff --git a/test/test_output_reader.rb b/test/test_output_reader.rb index cead9b47..de2d3e1d 100644 --- a/test/test_output_reader.rb +++ b/test/test_output_reader.rb @@ -10,14 +10,14 @@ include Orocos::Spec it "should not be possible to create an instance directly" do - assert_raises(NoMethodError) { Orocos::OutputReader.new } + assert_raises(NoMethodError) { Orocos::OutputReader.new } end it "should offer read access on an output port" do Orocos.run('simple_source') do |source| source = source.task('source') output = source.port('cycle') - + # Create a new reader. The default policy is data reader = output.reader assert(reader.kind_of?(Orocos::OutputReader)) @@ -35,7 +35,7 @@ source.write_opaque(42) sleep(0.2) - + # Create a new reader. The default policy is data sample = reader.read assert_equal(42, sample.x) @@ -53,7 +53,7 @@ source.write_opaque(42) sleep(0.2) - + # Create a new reader. The default policy is data sample = output.new_sample returned_sample = reader.read(sample) @@ -69,7 +69,7 @@ output = source.port('cycle') source.configure source.start - + # The default policy is data reader = output.reader sleep(0.2) @@ -135,20 +135,20 @@ end end - it "should raise ComError if the remote end is dead and be disconnected" do - Orocos.run 'simple_source' do |source_p| - source = source_p.task('source') - output = source.port('cycle') + it "does not raise if the remote end is dead, but is disconnected" do + Orocos.run "simple_source" do |source_p| + source = source_p.task("source") + output = source.port("cycle") reader = output.reader source.configure source.start sleep(0.5) - source_p.kill(true, 'KILL') + source_p.kill(true, "KILL") assert_raises(Orocos::CORBA::ComError) { reader.read } assert(!reader.connected?) - end + end end it "should get an initial value when :init is specified" do From 67f5a77883e0c338c34872daa8c045c9d44c4361 Mon Sep 17 00:00:00 2001 From: Sylvain Date: Fri, 7 Oct 2022 14:13:11 -0300 Subject: [PATCH 06/15] chore: use wait2 instead of $! to get a process exit status --- lib/orocos/base.rb | 9 +++++---- lib/orocos/process.rb | 9 ++++----- 2 files changed, 9 insertions(+), 9 deletions(-) diff --git a/lib/orocos/base.rb b/lib/orocos/base.rb index 6bba6381..ca71fef1 100644 --- a/lib/orocos/base.rb +++ b/lib/orocos/base.rb @@ -271,10 +271,11 @@ def self.initialize(name = "orocosrb_#{::Process.pid}") unless disable_sigchld_handler? trap("SIGCHLD") do begin - while dead = ::Process.wait(-1, ::Process::WNOHANG) - if mod = Orocos::Process.from_pid(dead) - mod.dead!($?) - end + loop do + dead_pid, dead_status = ::Process.wait2(-1, ::Process::WNOHANG) + break unless dead_pid + + Orocos::Process.from_pid(dead_pid)&.dead!(dead_status) end rescue Errno::ECHILD end diff --git a/lib/orocos/process.rb b/lib/orocos/process.rb index 97622d00..abd3244e 100644 --- a/lib/orocos/process.rb +++ b/lib/orocos/process.rb @@ -443,12 +443,11 @@ def initialize(name, model = name, def join return unless alive? - begin - ::Process.waitpid(pid) - exit_status = $? + begin + _, exit_status = ::Process.waitpid2(pid) dead!(exit_status) - rescue Errno::ECHILD - end + rescue Errno::ECHILD + end end # True if the process is running From 1516f994dafc8d1a5bfc3b67ef408da614173f85 Mon Sep 17 00:00:00 2001 From: Sylvain Date: Fri, 7 Oct 2022 14:58:59 -0300 Subject: [PATCH 07/15] fix: handle a race condition when accessing CORBA on dying remote processes If the call happens before the connection is actually closed, OmniORB handles it as a timeout. Otherwise, it is a ComError. This is actually not-so-great in our case, as we really do not want the timeout to happen. Note that it does not apply to crashing components, so the general Syskit behavior should not be affected. --- test/test_task.rb | 37 ++++++++++++++++++++++++++----------- 1 file changed, 26 insertions(+), 11 deletions(-) diff --git a/test/test_task.rb b/test/test_task.rb index dd36f0fc..fb54d325 100644 --- a/test/test_task.rb +++ b/test/test_task.rb @@ -9,7 +9,7 @@ end it "should raise NotFound on unknown task contexts" do - assert_raises(Orocos::NotFound) { Orocos::TaskContext.get('Bla_Blo') } + assert_raises(Orocos::NotFound) { Orocos::TaskContext.get('Bla_Blo') } end it "should check equality based on CORBA reference" do @@ -87,11 +87,14 @@ end end - it "should raise CORBA::ComError when #port is called on a dead remote process" do + it "should raise either CORBA::ComError or TimeoutError when #port is called "\ + "on a dead remote process" do Orocos.run('simple_source') do |p| source = Orocos::TaskContext.get("simple_source_source") p.kill - assert_raises(Orocos::CORBA::ComError) { source.port("cycle") } + assert_raises(Orocos::CORBA::ComError, Orocos::CORBA::TimeoutError) do + source.port("cycle") + end end end @@ -121,18 +124,24 @@ end end - it "should raise CORBA::ComError when the process crashed during a operation call" do - Orocos.run 'echo' do - echo = Orocos::TaskContext.get('echo_Echo') - assert_raises(Orocos::CORBA::ComError) { echo.operation(:kill).callop } + it "should raise CORBA::ComError when the process "\ + "crashed during a operation call" do + Orocos.run "echo" do + echo = Orocos::TaskContext.get("echo_Echo") + assert_raises(Orocos::CORBA::ComError) do + echo.operation(:kill).callop + end end end - it "should raise CORBA::ComError when #operation has communication errors" do + it "should raise either CORBA::ComError or CORBA::TimeoutError when #operation "\ + "has communication errors" do Orocos.run 'echo' do |p| echo = Orocos::TaskContext.get('echo_Echo') p.kill - assert_raises(Orocos::CORBA::ComError) { echo.operation(:write) } + assert_raises(Orocos::CORBA::ComError, Orocos::CORBA::TimeoutError) do + echo.operation(:write) + end end end @@ -165,12 +174,18 @@ end end - it "should raise CORBA::ComError when state-related operations are called on a dead process" do + it "should raise either CORBA::ComError or CORBA::TimeoutError when state-related "\ + "operations are called on a dead process" do Orocos.run('simple_source') do |p| source = Orocos::TaskContext.get("simple_source_source") assert source.state p.kill - assert_raises(Orocos::CORBA::ComError) { source.state} + assert_raises(Orocos::CORBA::ComError, Orocos::CORBA::TimeoutError) do + source.state + end + # TimeoutError is due to a race condition on ORB shutdown. After the + # call to 'state', there is no more race condition and this should + # always be a ComError assert_raises(Orocos::CORBA::ComError) { source.start } end end From f948753cb3fe96e250ba9d50ac59236f291ec1a6 Mon Sep 17 00:00:00 2001 From: Sylvain Date: Fri, 7 Oct 2022 15:18:30 -0300 Subject: [PATCH 08/15] fix: update state reading test after the introduction of setTaskStates a846ec3692b5020c3ed920683c3393cfc828943d uses the setTaskStates hook in RTT to get properly ordered state changes (see the corresponding commit in RTT 07d7922ba1f0ced3f0288652edca8c6bbd69200f). This causes some state change notifications to be duplicated (notified once by orogen and once by RTT) --- test/test_task.rb | 24 +++++++++++++++--------- 1 file changed, 15 insertions(+), 9 deletions(-) diff --git a/test/test_task.rb b/test/test_task.rb index fb54d325..c3bfb70c 100644 --- a/test/test_task.rb +++ b/test/test_task.rb @@ -231,15 +231,21 @@ # Note: we don't have state_pre_operational as we already read it # once - assert_equal Orocos::TaskContext::STATE_STOPPED, state.read - assert_equal Orocos::TaskContext::STATE_RUNNING, state.read - assert_equal Orocos::TaskContext::STATE_RUNNING, state.read - assert_equal Orocos::TaskContext::STATE_RUNTIME_ERROR, state.read - assert_equal Orocos::TaskContext::STATE_RUNNING, state.read - assert_equal Orocos::TaskContext::STATE_RUNTIME_ERROR, state.read - assert_equal Orocos::TaskContext::STATE_RUNNING, state.read - assert_equal Orocos::TaskContext::STATE_EXCEPTION, state.read - assert_equal Orocos::TaskContext::STATE_PRE_OPERATIONAL, state.read + expected = [ + Orocos::TaskContext::STATE_STOPPED, + Orocos::TaskContext::STATE_RUNNING, + Orocos::TaskContext::STATE_RUNTIME_ERROR, + Orocos::TaskContext::STATE_RUNTIME_ERROR, + Orocos::TaskContext::STATE_RUNNING, + Orocos::TaskContext::STATE_RUNTIME_ERROR, + Orocos::TaskContext::STATE_RUNTIME_ERROR, + Orocos::TaskContext::STATE_RUNNING, + Orocos::TaskContext::STATE_EXCEPTION, + Orocos::TaskContext::STATE_EXCEPTION, + Orocos::TaskContext::STATE_PRE_OPERATIONAL + ] + actual = expected.map { state.read } + assert_equal expected, actual end end From 2ff3c8e16fd4ef3733ee76b8b17e0352081976c0 Mon Sep 17 00:00:00 2001 From: Sylvain Date: Fri, 7 Oct 2022 15:15:09 -0300 Subject: [PATCH 09/15] fix: partially reintroduce sanity check on OutputReader's read path This partially reverts commit c7f516d451dc99c951f6d631913dedb32b527982. cf252a453d52a480a8dee4f94b6948fe111bdda7 changed the name of the "core" read method. This method was overloaded in OutputReader to raise if the remote process was dead, and disconnect the read side. I think the boat has sailed on {#read} not raising, and I'm not willing to re-introduce that behavior. However, I do believe the disconnect_all is a good move: it cleans up, and makes sure that a caller that cares does get notified that the reader is dead. --- lib/orocos/output_reader.rb | 12 ++++++++++++ test/test_output_reader.rb | 4 ++-- 2 files changed, 14 insertions(+), 2 deletions(-) diff --git a/lib/orocos/output_reader.rb b/lib/orocos/output_reader.rb index 1296e5dc..f7429de6 100644 --- a/lib/orocos/output_reader.rb +++ b/lib/orocos/output_reader.rb @@ -7,6 +7,18 @@ class OutputReader < RubyTasks::LocalInputPort # The policy of the connection attr_accessor :policy + # Helper method for #read and #read_new + # + # This is overloaded in OutputReader to raise CORBA::ComError if the + # process supporting the remote task is known to be dead + def raw_read_with_result(sample, copy_old_data) + if (process = port.task.process) + disconnect_all unless process.alive? + end + + super + end + # Reads a sample on the associated output port. Returns a value as soon # as a sample has ever been written to the port since the data reader # has been created diff --git a/test/test_output_reader.rb b/test/test_output_reader.rb index de2d3e1d..9ed2f43c 100644 --- a/test/test_output_reader.rb +++ b/test/test_output_reader.rb @@ -146,8 +146,8 @@ source_p.kill(true, "KILL") - assert_raises(Orocos::CORBA::ComError) { reader.read } - assert(!reader.connected?) + reader.read # should not raise + refute reader.connected? end end From 05afcf0cafc86b5df6abc3fc70db96dae34aa13c Mon Sep 17 00:00:00 2001 From: Jhonas Date: Thu, 22 Sep 2022 15:38:11 -0300 Subject: [PATCH 10/15] feat: read IOR of a task context from pipe By reading the IOR of a task context from a pipe, we are not limited by CORBA name service to resolve the tasks. The wait running method must be called to register the IORs for each task in the process. This is done by reading the message from the pipe, validating it and saving it as a Hash. Later, this Hash is used to resolve the tasks when the resolve_all_tasks method is called. Since this method might be called before wait_running is ran, a nonblocking wait_running is called in it to ensure that the IORs are registered. Overall, the expected pipeline for Syskit is: The process client calls the process server, which spawns the Orocos::Process. On spawn, the IOR pipe is created. Then, the execution engine triggers the deployment ready resolution, which calls for the process' wait_running, expecting the IOR mappings to be resolved. They are registered and then resolve_all_tasks is called. If the IORs are valid, the deployment resolve the remote task handles and hopefully is becomes ready. --- lib/orocos/process.rb | 284 ++++++++++++++++++++++++------------------ 1 file changed, 161 insertions(+), 123 deletions(-) diff --git a/lib/orocos/process.rb b/lib/orocos/process.rb index abd3244e..c71840a9 100644 --- a/lib/orocos/process.rb +++ b/lib/orocos/process.rb @@ -1,8 +1,15 @@ require 'utilrb/pkgconfig' require 'orogen' require 'fcntl' +require 'json' module Orocos + # Exception raised when there is no IOR registered for a given task name. + class IORNotRegisteredError < Orocos::NotFound; end + + # Exception raised when the received IOR message is invalid. + class InvalidIORMessage < Orocos::NotFound; end + # The working directory that should be used by default in Orocos.run def self.default_working_directory @default_working_directory || Dir.pwd @@ -162,6 +169,7 @@ def orogen; model end # The set of task contexts for this process. This is valid only after # the process is actually started attr_reader :tasks + attr_reader :ior_mappings def initialize(name, model, name_mappings: Hash.new) @name, @model = name, model @@ -169,6 +177,9 @@ def initialize(name, model, name_mappings: Hash.new) self.name_mappings = name_mappings @logged_ports = Set.new @tasks = [] + @ior_mappings = nil + @ior_message = "" + @ior_read_fd = nil end # Sets a batch of name mappings @@ -198,8 +209,9 @@ def get_mapped_name(name) # # See also #each_task def task_names - if !model - raise Orocos::NotOrogenComponent, "#{name} does not seem to have been generated by orogen" + unless model + raise Orocos::NotOrogenComponent, + "#{name} does not seem to have been generated by orogen" end model.task_activities.map do |deployed_task| name = deployed_task.name @@ -220,19 +232,19 @@ def each_task # Returns the TaskContext instance for a task that runs in this process, # or raises Orocos::NotFound. - def task(task_name, name_service = Orocos.name_service) + def task(task_name) full_name = "#{name}_#{task_name}" - if result = tasks.find { |t| t.basename == task_name || t.basename == full_name } + if (result = tasks.find { |t| [task_name, full_name].include?(t.basename) }) return result end - if task_names.include?(task_name) - name_service.get task_name, process: self - elsif task_names.include?(full_name) - name_service.get full_name, process: self - else - raise Orocos::NotFound, "no task #{task_name} defined on #{name}" + ior = ior_for(task_name) || ior_for(full_name) + unless ior + raise Orocos::IORNotRegisteredError, + "no IOR is registered for #{task_name}" end + + Orocos::TaskContext.new(ior, process: self) end def register_task(task) @@ -240,6 +252,10 @@ def register_task(task) @tasks << task end + def ior_for(task_name) + @ior_mappings&.fetch(task_name, nil) + end + # Requires all known ports of +self+ to be logged by the default logger def log_all_ports(options = Hash.new) @logged_ports |= Orocos.log_all_process_ports(self, options) @@ -334,6 +350,90 @@ def self.resolve_prefix(model, prefix) end return name_mappings end + + # Read the IOR pipe and parse the received message, closing the read file + # descriptor when end of file is reached. + # + # @return [nil, Hash] when eof is reached and the message is valid + # return a { task name => ior } hash. If the process dies or a IO::WaitReadable + # is raised, returns nil. + def resolve_running_tasks + return unless alive? + + begin + loop do + @ior_message += @ior_read_fd.read_nonblock(4096) + end + rescue IO::WaitReadable + return + rescue EOFError + @ior_read_fd.close + load_and_validate_ior_message(@ior_message) + end + end + + # Waits the running tasks resolution for a given amount of time. + # + # @param [nil, Boolean, Float] timeout when nil, this method blocks until all the + # running tasks are resolved or the process crashes. When given a number, it + # block for the given amount of time in milliseconds. When given a boolean, false + # is equivalent as passing 0 as argument, and true is equivalent to passing nil. + # @return [Hash] mappings of { task name => IOR } + # @raise Orocos::NotFound if the process dies during execution + # @raise Orocos::InvalidIORMessage if the message received is invalid + def wait_running(timeout = nil, &block) + return block.call if block_given? + return @ior_mappings if @ior_mappings + + start_time = Time.now + timeout = transform_timeout(timeout) + deadline = start_time + timeout unless timeout == Float::INFINITY + got_alive = alive? + loop do + @ior_mappings = resolve_running_tasks + return @ior_mappings if @ior_mappings + break if timeout < Time.now - start_time + + if got_alive && !alive? + raise Orocos::NotFound, "#{name} was started but crashed" + end + + time_until_deadline = [deadline - Time.now, 0].max if deadline + IO.select([@ior_read_fd], nil, nil, time_until_deadline) + end + + raise Orocos::NotFound, "cannot get a running #{name} module" unless alive? + end + + def transform_timeout(timeout) + return timeout if timeout.kind_of?(Numeric) + + return Float::INFINITY if timeout.nil? || timeout == true + + 0 + end + + # Load and validate the ior message read from the IOR pipe. + # + # @param [String] message the ior message read from the pipe + # @return [Hash, nil] the parsed ior message as a + # { task name => ior} hash, or nil if the message could not be parsed. + # @raise Orocos::InvalidIORMessage raised if any task name present in the message + # is not present in the process' task names. + def load_and_validate_ior_message(message) + begin + message = JSON.parse(message) + rescue JSON::ParserError + return + end + + all_included = message.keys.all? { |name| task_names.include?(name) } + return message if all_included + + raise Orocos::InvalidIORMessage, + "the following tasks were present on the ior message but werent in "\ + "the process task names: #{message.keys - task_names}" + end end # The representation of an Orocos process. It manages @@ -349,11 +449,11 @@ class Process < ProcessBase # # @param [Integer] pid the PID whose process we are looking for # @return [nil,Process] the process object whose PID matches, or nil - def self.from_pid(pid) - if result = registered_processes[pid] + def self.from_pid(pid) + if result = registered_processes[pid] return result end - end + end class << self # A map of existing running processes @@ -456,7 +556,7 @@ def alive?; !!@pid end def running?; alive? end # Called externally to announce a component dead. - def dead!(exit_status) # :nodoc: + def dead!(exit_status) # :nodoc: exit_status = (@exit_status ||= exit_status) if !exit_status Orocos.info "deployment #{name} exited, exit status unknown" @@ -479,7 +579,8 @@ def dead!(exit_status) # :nodoc: Orocos.warn "deployment #{name} terminated with code #{exit_status.to_i}" end - pid, @pid = @pid, nil + pid = @pid + @pid = nil Process.deregister(pid) # Force unregistering the task contexts from CORBA naming @@ -488,9 +589,9 @@ def dead!(exit_status) # :nodoc: # puts "deregistering #{name}" # Orocos::CORBA.unregister(name) # end - end + end - @@logfile_indexes = Hash.new + @@logfile_indexes = Hash.new class TaskNameRequired < ArgumentError; end @@ -812,7 +913,7 @@ def self.run(*args, **options) name_mappings.each do |old, new| p.map_name old, new end - p.spawn(**spawn_options) + p.spawn(spawn_options) p end @@ -879,12 +980,13 @@ def self.allocate_gdb_port # Massages various spawn parameters into the actual deployment command line # # @return [CommandLine] - def command_line(working_directory: Orocos.default_working_directory, - log_level: nil, - cmdline_args: Hash.new, - tracing: Orocos.tracing?, - gdb: nil, valgrind: nil, - name_service_ip: Orocos::CORBA.name_service_ip) + def command_line( + working_directory: Orocos.default_working_directory, + log_level: nil, + cmdline_args: Hash.new, + tracing: Orocos.tracing?, gdb: nil, valgrind: nil, + name_service_ip: Orocos::CORBA.name_service_ip + ) result = CommandLine.new(Hash.new, nil, [], working_directory) result.command = binfile @@ -973,13 +1075,16 @@ def command_line(working_directory: Orocos.default_working_directory, # true will enable valgrind support. Setting it to an array of strings will # specify a list of arguments that should be passed to valgrind # itself. This is obviously incompatible with the gdb option. - def spawn(log_level: nil, working_directory: Orocos.default_working_directory, - cmdline_args: Hash.new, - oro_logfile: "orocos.%m-%p.txt", - prefix: nil, tracing: Orocos.tracing?, name_service: Orocos::CORBA.name_service, - wait: nil, - output: nil, - gdb: nil, valgrind: nil) + def spawn( + log_level: nil, working_directory: Orocos.default_working_directory, + cmdline_args: Hash.new, + oro_logfile: "orocos.%m-%p.txt", + prefix: nil, tracing: Orocos.tracing?, + wait: nil, + output: nil, + gdb: nil, valgrind: nil, + name_service: Orocos::CORBA.name_service + ) raise "#{name} is already running" if alive? Orocos.info "starting deployment #{name}" @@ -989,14 +1094,6 @@ def spawn(log_level: nil, working_directory: Orocos.default_working_directory, name_mappings = prefix_mappings.merge(self.name_mappings) self.name_mappings = name_mappings - # If possible, check that we won't clash with an already running - # process - task_names.each do |name| - if name_service.task_reachable?(name) - raise ArgumentError, "there is already a running task called #{name}, are you starting the same component twice ?" - end - end - if wait.nil? wait = if valgrind then 600 @@ -1046,8 +1143,13 @@ def spawn(log_level: nil, working_directory: Orocos.default_working_directory, end end + @ior_read_fd, ior_write_fd = IO.pipe read, write = IO.pipe @pid = fork do + @ior_read_fd.close + # Pass write file descriptor for the IOR pipe as a commandline argument + cmdline_args["ior-write-fd"] = ior_write_fd.fileno + if tracing ENV['LD_PRELOAD'] = Orocos.tracing_library_path end @@ -1113,19 +1215,17 @@ def spawn(log_level: nil, working_directory: Orocos.default_working_directory, end read.close - write.fcntl(Fcntl::F_SETFD, 1) ::Process.setpgrp begin - if working_directory - Dir.chdir(working_directory) - end - exec(*cmdline) + exec(*cmdline, ior_write_fd => ior_write_fd, chdir: working_directory) rescue Exception write.write("FAILED") end end Process.register(self) + ior_write_fd.close + write.close if read.read == "FAILED" raise "cannot start #{name}" @@ -1141,93 +1241,31 @@ def spawn(log_level: nil, working_directory: Orocos.default_working_directory, elsif wait Float::INFINITY end - wait_running(timeout, name_service) + wait_running(timeout) end end - def self.resolve_all_tasks(process, cache = Hash.new) - # Get any task name from that specific deployment, and check we - # can access it. If there is none - all_reachable = process.task_names.all? do |task_name| - begin - cache[task_name] ||= yield(task_name) - rescue Orocos::NotFound - end - end - if all_reachable - cache - end - end - - # Wait for a process to become reachable + # Resolve all the tasks present on the process, creating a new Orocos::TaskContext + # if the task is not deployed yet. # - def self.wait_running(process, timeout = nil, name_service = Orocos::CORBA.name_service, &block) - if timeout == 0 - return unless process.alive? - - # Use custom block to check if the process is reachable - if block_given? - block.call(process) - else - # Get any task name from that specific deployment, and check we - # can access it. If there is none - all_reachable = process.task_names.all? do |task_name| - if name_service.task_reachable?(task_name) - Orocos.debug "#{task_name} is reachable" - true - else - Orocos.debug "could not access #{task_name}, #{name} is not running yet ..." - false - end - end - if all_reachable - Orocos.info "all tasks of #{process.name} are reachable, assuming it is up and running" - end - all_reachable - end - else - start_time = Time.now - got_alive = process.alive? - loop do - break if wait_running(process, 0, name_service, &block) - break unless timeout - # This formulation allows timeout to be infinite - break if timeout < Time.now - start_time - - if got_alive && !process.alive? - raise Orocos::NotFound, "#{process.name} was started but crashed" - end - - sleep 0.1 - end - - unless process.alive? - raise Orocos::NotFound, "cannot get a running #{process.name} module" - end - - true + # @param [Orocos::Process] process the process object + # @return [Hash] hash with + # { task name => task context objet } + # @raise Orocos::IORNotRegisteredError when an IOR is not registered for the given + # task name. + # @raise Orocos::NotFound if the process dies during execution + # @raise Orocos::InvalidIORMessage if the message received is invalid + def self.resolve_all_tasks(process) + process.task_names.each_with_object({}) do |task_name, resolved_tasks| + resolved_tasks[task_name] = process.task(task_name) end end - def resolve_all_tasks(cache = Hash.new, name_service: Orocos::CORBA.name_service) - Process.resolve_all_tasks(self, cache) do |task_name| - name_service.get(task_name) - end + # See #Orocos::Process.resolve_all_tasks + def resolve_all_tasks + Orocos::Process.resolve_all_tasks(self) end - # Wait for the module to be started. If timeout is 0, the function - # returns immediately, with a false return value if the module is not - # started yet and a true return value if it is started. - # - # Otherwise, it waits for the process to start for the specified amount - # of seconds. It will throw Orocos::NotFound if the process was not - # started within that time. - # - # If timeout is nil, the method will wait indefinitely - def wait_running(timeout = nil, name_service = Orocos::CORBA.name_service) - Process.wait_running(self, timeout, name_service) - end - SIGNAL_NUMBERS = { 'SIGABRT' => 1, 'SIGINT' => 2, @@ -1277,7 +1315,7 @@ def kill(wait = true, signal = nil, cleanup: !signal, hard: false) if cleanup clean_shutdown = true begin - each_task do |task| + tasks.each do |task| if !self.class.try_task_cleanup(task) clean_shutdown = false break From c6b84da52ee43aeeba297e133afee64776cc45ad Mon Sep 17 00:00:00 2001 From: Jhonas Date: Wed, 19 Oct 2022 16:34:17 -0300 Subject: [PATCH 11/15] feat: update ruby tasks to use IORs to resolve the tasks This implements for ruby tasks what was done for Orocos::Process. Since the ruby tasks already know their IOR from the beginning, all this does is look for each task's IOR on the deployed tasks map. The pipeline is the same as the Orocos::Process --- lib/orocos/ruby_tasks/process.rb | 38 +++++++++++++++--------- lib/orocos/ruby_tasks/process_manager.rb | 13 ++++++++ 2 files changed, 37 insertions(+), 14 deletions(-) diff --git a/lib/orocos/ruby_tasks/process.rb b/lib/orocos/ruby_tasks/process.rb index 090d84e5..6aefd59a 100644 --- a/lib/orocos/ruby_tasks/process.rb +++ b/lib/orocos/ruby_tasks/process.rb @@ -53,6 +53,9 @@ def pid; ::Process.pid end # @return [Class] attr_reader :task_context_class + # The ior mappings of the deployed tasks + attr_reader :ior_mappings + # Creates a new ruby task process # # @param [nil,#dead_deployment] ruby_process_server the process manager @@ -64,6 +67,7 @@ def initialize(ruby_process_server, name, model, task_context_class: TaskContext @ruby_process_server = ruby_process_server @deployed_tasks = Hash.new @task_context_class = task_context_class + @ior_mappings = nil super(name, model) end @@ -74,32 +78,38 @@ def spawn(options = Hash.new) model.task_activities.each do |deployed_task| name = get_mapped_name(deployed_task.name) Orocos.allow_blocking_calls do - deployed_tasks[name] = task_context_class. - from_orogen_model(name, deployed_task.task_model) + deployed_tasks[name] = + task_context_class.from_orogen_model(name, + deployed_task.task_model) end end @alive = true end - # Waits for the tasks to be ready - # - # This is a no-op for ruby tasks as they are ready as soon as they are - # created - def wait_running(blocking = false) - true + # The ruby tasks are already ready, so all this is does is to get the IOR mappings + # from them. The wait_running method name is maintained to keep the API closer to + # the remote process'. + def wait_running + (@ior_mappings = deployed_tasks.transform_values(&:ior)) unless @ior_mappings + @ior_mappings end def task(task_name) - if t = deployed_tasks[task_name] + if (t = deployed_tasks[task_name]) t - else raise ArgumentError, "#{self} has no task called #{task_name}, known tasks: #{deployed_tasks.keys.sort.join(", ")}" + else + raise ArgumentError, + "#{self} has no task called #{task_name}, known tasks: "\ + "#{deployed_tasks.keys.sort.join(', ')}" end end - def resolve_all_tasks(cache = Hash.new) - Orocos::Process.resolve_all_tasks(self, cache) do |task_name| - task(task_name) - end + def resolve_all_tasks + deployed_tasks + end + + def define_ior_mappings(ior_mappings) + @ior_mappings = ior_mappings end def kill(_wait = true, status = ProcessManager::Status.new(exit_code: 0), **) diff --git a/lib/orocos/ruby_tasks/process_manager.rb b/lib/orocos/ruby_tasks/process_manager.rb index 2a05b988..65db1601 100644 --- a/lib/orocos/ruby_tasks/process_manager.rb +++ b/lib/orocos/ruby_tasks/process_manager.rb @@ -85,6 +85,19 @@ def wait_termination(timeout = nil) result end + def wait_running(*process_names) + process_ior_mappings = {} + process_names.each do |name| + if deployments[name]&.resolve_all_tasks + process_ior_mappings[name] = { iors: deployments[name].wait_running } + else + process_ior_mappings[name] = + { error: "#{name} is not a valid process in the deployment" } + end + end + process_ior_mappings + end + # Requests to stop the given deployment # # The call does not block until the process has quit. You will have to From 596cae6639aa9aab13c0daa129231dc55e947670 Mon Sep 17 00:00:00 2001 From: Jhonas Date: Wed, 5 Oct 2022 10:43:29 -0300 Subject: [PATCH 12/15] feat(test): add unit tests for ior pipe related behavior --- test/test_process.rb | 193 ++++++++++++++++++++++++++++++++++++++----- 1 file changed, 173 insertions(+), 20 deletions(-) diff --git a/test/test_process.rb b/test/test_process.rb index f7cacbd4..88b3dfe3 100644 --- a/test/test_process.rb +++ b/test/test_process.rb @@ -10,7 +10,7 @@ project = OroGen::Spec::Project.new(@loader) project.default_task_superclass = OroGen::Spec::TaskContext.new( project, 'base::Task', subclasses: false - ) + ) @task_m = OroGen::Spec::TaskContext.new project, 'test::Task' @deployment_m = OroGen::Spec::Deployment.new project, 'test_deployment' @default_deployment_m = OroGen::Spec::Deployment.new project, 'orogen_default_test__Task' @@ -183,51 +183,204 @@ def parse_run_options(wrapper_name, arg, options: wrapper_options) end end end - + describe "#spawn" do it "starts a new process and waits for it with a timeout" do - process = Orocos::Process.new('process') + process = Orocos::Process.new("process") # To ensure that the test teardown will kill it processes << process - process.spawn :wait => 10 - Orocos.get "process_Test" + process.spawn wait: 10 + process.task("process_Test") assert(process.alive?) assert(process.running?) end it "starts a new process and waits for it without a timeout" do - process = Orocos::Process.new('process') + process = Orocos::Process.new("process") # To ensure that the test teardown will kill it processes << process - process.spawn :wait => true - Orocos.get "process_Test" + process.spawn wait: true + process.task("process_Test") assert(process.alive?) assert(process.running?) end it "can automatically add prefixes to tasks" do - process = Orocos::Process.new 'process' + process = Orocos::Process.new "process" begin - process.spawn :prefix => 'prefix' - assert_equal Hash["process_Test" => "prefixprocess_Test"], process.name_mappings - assert Orocos.name_service.get('prefixprocess_Test') + process.spawn prefix: "prefix" + assert_equal Hash["process_Test" => "prefixprocess_Test"], + process.name_mappings + assert process.task("prefixprocess_Test") + ensure process.kill end end it "can rename single tasks" do - process = Orocos::Process.new 'process' + process = Orocos::Process.new "process" begin process.map_name "process_Test", "prefixprocess_Test" process.spawn - assert Orocos.name_service.get('prefixprocess_Test') + assert process.task("prefixprocess_Test") ensure process.kill end end end + describe "#load_and_validate_ior_message" do + it "loads and validates the ior message when it is valid" do + process = Orocos::Process.new("process") + message = "{\"process_Test\": \"IOR:123456\"}" + result = process.load_and_validate_ior_message(message) + assert_equal(result, JSON.parse(message)) + end + + it "returns nil when the ior message is not parseable" do + process = Orocos::Process.new("process") + # Missing the `}` + message = "{\"process_Test\": \"IOR:123456\"" + result = process.load_and_validate_ior_message(message) + assert_nil(result) + end + + it "raises invalid ior message when a task is not included in the ior message" do + process = Orocos::Process.new("process") + message = "{\"another_process\": \"IOR:123456\"}" + error = assert_raises(Orocos::InvalidIORMessage) do + process.load_and_validate_ior_message(message) + end + expected_error = Orocos::InvalidIORMessage.new( + "the following tasks were present on the ior message but werent in the " \ + "process task names: [\"another_process\"]" + ) + assert_equal(expected_error.message, error.message) + end + end + + describe "#wait_running" do + attr_reader :process + before do + @message = "{\"process_Test\": \"IOR:123456\"}" + @process = Orocos::Process.new("process") + flexmock(process).should_receive(:task) + end + + it "resolves the ior mappings" do + read, write = IO.pipe + flexmock(IO).should_receive(:pipe) + .and_return([read, write]) + process.spawn(wait: false) + flexmock(read).should_receive(:read_nonblock) + .and_return(@message) + .and_return do + raise EOFError + end + result = process.wait_running(0) + assert_equal(JSON.parse(@message), result) + end + + it "stops waiting after the timeout was reached" do + read, write = IO.pipe + flexmock(IO).should_receive(:pipe) + .and_return([read, write]) + flexmock(read).should_receive(:read_nonblock) + .and_raise(IO::EAGAINWaitReadable) + process.spawn(wait: false) + t = Thread.new do + result = process.wait_running(0.1) + assert_nil(result) + end + t.join + end + + it "parses the message correctly even when it was first received partially" do + read, write = IO.pipe + flexmock(IO).should_receive(:pipe) + .and_return([read, write]) + initial_message = @message.slice(0, 4) + rest_of_the_message = @message.slice(4, @message.length) + + read_mock = flexmock(read).should_receive(:read_nonblock) + read_mock.and_return(initial_message, rest_of_the_message) + read_mock.and_raise(EOFError) + process.spawn(wait: false) + result = process.wait_running(0.1) + assert_equal(JSON.parse(@message), result) + end + + it "stops waiting if the process has crashed" do + read, write = IO.pipe + flexmock(IO).should_receive(:pipe) + .and_return([read, write]) + flexmock(read) + .should_receive(:read_nonblock) + .and_raise(IO::EAGAINWaitReadable) + process.spawn(wait: false) + + alive_mock = flexmock(process).should_receive(:alive?) + alive_mock.and_return(true).ordered + alive_mock.and_return(false).ordered + e = assert_raises(Orocos::NotFound) do + process.wait_running(2) + end + assert_equal("process was started but crashed", e.message) + end + + it "raises if the process is dead after waiting" do + read, write = IO.pipe + flexmock(IO).should_receive(:pipe) + .and_return([read, write]) + flexmock(read) + .should_receive(:read_nonblock) + .and_raise(IO::EAGAINWaitReadable) + process.spawn(wait: false) + + alive_mock = flexmock(process).should_receive(:alive?) + alive_mock.and_return(false).ordered + e = assert_raises(Orocos::NotFound) do + process.wait_running(2) + end + assert_equal("cannot get a running process module", e.message) + end + end + + describe "#resolve_all_tasks" do + attr_reader :process + before do + @process = Orocos::Process.new("process") + end + + it "returns all the process tasks when their ior is registered" do + process.spawn(wait: false) + process.wait_running(0.1) + result = process.resolve_all_tasks + assert_includes(result, "process_Test") + assert_equal("process_Test", result["process_Test"].name) + end + + it "returns the already defined process tasks without checking their ior registration" do + process.spawn(wait: false) + process.wait_running(0.1) + process.resolve_all_tasks + spy = flexmock(:on, Orocos::Process) + process.resolve_all_tasks + assert_spy_not_called(spy, :ior_for) + end + + it "raises when ior is not registered" do + process.spawn(wait: false) + process.wait_running(0.1) + flexmock(process).should_receive(:ior_for) + e = assert_raises(Orocos::IORNotRegisteredError) do + process.resolve_all_tasks + end + assert("no IOR was registered for process", e.message) + end + end + describe "#kill" do it "stops a running process and clean up the name server" do Orocos.run('process') do |process| @@ -240,7 +393,7 @@ def parse_run_options(wrapper_name, arg, options: wrapper_options) it "stops the task if it is running" do Orocos.run("process") do |process| - task = Orocos.get "process_Test" + task = process.task("process_Test") state = nil flexmock(::Process) .should_receive(:kill) @@ -254,7 +407,7 @@ def parse_run_options(wrapper_name, arg, options: wrapper_options) it "does not attempt to stop the task if cleanup is false" do Orocos.run("process") do |process| - task = Orocos.get "process_Test" + task = process.task("process_Test") state = nil flexmock(::Process) .should_receive(:kill) @@ -292,7 +445,7 @@ def parse_run_options(wrapper_name, arg, options: wrapper_options) describe "#task" do it "can get a reference on a deployed task context by name" do Orocos.run('process') do |process| - assert(direct = Orocos::TaskContext.get('process_Test')) + assert(direct = process.task('process_Test')) assert(indirect = process.task("Test")) assert_equal(direct, indirect) end @@ -316,13 +469,13 @@ def parse_run_options(wrapper_name, arg, options: wrapper_options) describe "run" do it "can start a process with a prefix" do Orocos.run('process' => 'prefix') do |process| - assert(Orocos::TaskContext.get('prefixprocess_Test')) + assert(process.task('prefixprocess_Test')) end end it "can wait for the process to be running without a timeout" do - Orocos.run 'process', :wait => true do - Orocos.get 'process_Test' + Orocos.run 'process', wait: true do |process| + process.task("process_Test") end end end From c24d238cfdf7e0152ab64d92f30861a320f1f88a Mon Sep 17 00:00:00 2001 From: Sylvain Date: Mon, 26 Jun 2023 16:38:04 -0300 Subject: [PATCH 13/15] chore: remove obsolete log management code --- lib/orocos/task_context.rb | 30 ------------------------------ 1 file changed, 30 deletions(-) diff --git a/lib/orocos/task_context.rb b/lib/orocos/task_context.rb index 49a704f2..fceb46d8 100644 --- a/lib/orocos/task_context.rb +++ b/lib/orocos/task_context.rb @@ -115,12 +115,6 @@ def #{m}(wait_for_completion = true, polling = 0.05) EOD end - # The logger task that should be used to log data that concerns this - # task - # - # @return [#log] - attr_accessor :logger - # A new TaskContext instance representing the # remote task context with the given IOR # @@ -135,10 +129,6 @@ def #{m}(wait_for_completion = true, polling = 0.05) def initialize(ior, name: do_real_name, model: nil, **other_options) super(name, model: model, **other_options) @ior = ior - - if process && (process.default_logger_name != name) - self.logger = process.default_logger - end end def ping @@ -230,26 +220,6 @@ def rtt_state @state_symbols[value] end - # Connects all ports of the task with the logger of the deployment - # @param [Hash] options option hash to exclude specific ports - # @option options [String,Array] :exclude_ports The name of the excluded ports - # @return [Set] Sets of task and port names - # - # @example logging all ports beside a port called frame - # task.log_all_ports(:exclude_ports => "frame") - def log_all_ports(options = Hash.new) - # Right now, the only allowed option is :exclude_ports - options, logger_options = Kernel.filter_options options,:exclude_ports => nil - exclude_ports = Array(options[:exclude_ports]) - - logger_options[:tasks] = Regexp.new(basename) - ports = Orocos.log_all_process_ports(process,logger_options) do |port| - !exclude_ports.include? port.name - end - raise "#{name}: no ports were selected for logging" if ports.empty? - ports - end - def create_property_log_stream(p) stream_name = "#{self.name}.#{p.name}" if !configuration_log.has_stream?(stream_name) From 2fc55fa7e9ff16547df399ceb7e9e472a2b7d08e Mon Sep 17 00:00:00 2001 From: Sylvain Date: Tue, 20 Apr 2021 19:39:16 -0300 Subject: [PATCH 14/15] async: review and refactor interaction between main thread and worker threads Turns out that the async code was actually doing a lot of sync stuff behind the scenes, which was causing very noticeable lag and blocking behaviors when on bad connections. The first change was to create main_thread_call helper. One marks a "main thread only" method like so ~~~ main_thread_call def some_method end ~~~ and method calls not in the main thread will cause an exception to be raised. This is rather obviously meant for debugging. From there, I've fixed calls that were coming from a worker thread but should not have. In most cases, the pattern I had to change was ~~~ def some_method @event_loop.defer ... do do some async call in sync form (i.e. without blocks) end end ~~~ This was causing a lot of headaches as the code that turns async calls into sync calls depends on a lot of synchronization (e.g. with @delegator_obj), which was solved by having huge @mutex.synchronize blocks, serializing major parts of the execution. These calls have been changed into the non-blocking block form (a.k.a. callback form), and are called from the main thread. One major change is the removal of sub-genres of Async::NameService. These were rather pointless, as turning a sync into an async object can be done generically through to_async. The "new" Async::NameService simply delegates to the non-async get in a worker thread and then turns the non-async object into an async object. Another major change is how Async::TaskContext creates itself. The very unfortunate design choice in the async and proxy version of TaskContext was to let them get a name as argument, and let them do the resolution. This brings a lot of complexity in the async case, as there is a zone where the async task context does not have an underlying task context. Proxies are meant for that, not async. Generally speaking, this interface should be changed to do the name resolution in name_service (duh). --- lib/orocos/async/async.rb | 22 +- lib/orocos/async/attributes.rb | 33 +- lib/orocos/async/name_service.rb | 267 ++++--------- lib/orocos/async/object_base.rb | 190 +++++---- lib/orocos/async/orocos.rb | 4 +- lib/orocos/async/ports.rb | 2 +- lib/orocos/async/task_context.rb | 52 +-- lib/orocos/async/task_context_base.rb | 318 ++++++++------- lib/orocos/async/task_context_proxy.rb | 517 +++++++++++-------------- lib/orocos/base.rb | 2 +- lib/orocos/corba.rb | 4 +- lib/orocos/name_service.rb | 4 +- lib/orocos/ros/async.rb | 3 +- lib/orocos/test.rb | 12 +- test/async/test_attributes.rb | 125 +++--- test/async/test_name_service.rb | 140 +------ test/async/test_task.rb | 99 +++-- test/async/test_task_proxy.rb | 79 ++-- 18 files changed, 782 insertions(+), 1091 deletions(-) diff --git a/lib/orocos/async/async.rb b/lib/orocos/async/async.rb index 7a921eac..b84b5ae6 100644 --- a/lib/orocos/async/async.rb +++ b/lib/orocos/async/async.rb @@ -30,7 +30,7 @@ # puts state # end # reader.read do |value| -# puts value +# puts value # end # # If a method call needs the remote Orocos Task which is currently not reachable @@ -44,13 +44,13 @@ # short on worker threads. # # # these events are generated by polling -# task.on_connect do +# task.on_connect do # puts "connected" # end -# task.on_disconnect do +# task.on_disconnect do # puts "disconnected" # end -# task.on_reconnect do +# task.on_reconnect do # puts "reconnected" # end # @@ -65,12 +65,22 @@ # port.on_new_data do |data| # puts data # end -# +# # The polling frequency can be changed by setting the period attribute of each # asynchronous object. # module Orocos::Async - KNOWN_ERRORS = [Orocos::ComError,Orocos::NotFound,Typelib::NotFound,Orocos::TypekitTypeNotFound,Orocos::TypekitTypeNotExported,Orocos::StateTransitionFailed,Orocos::ConnectionFailed,OroGen::DefinitionTypekitNotFound] + KNOWN_ERRORS = [ + Orocos::ComError, + Orocos::NotFound, + Typelib::NotFound, + Orocos::TypekitTypeNotFound, + Orocos::TypekitTypeNotExported, + Orocos::StateTransitionFailed, + Orocos::ConnectionFailed, + OroGen::DefinitionTypekitNotFound + ] + class << self extend ::Forwardable diff --git a/lib/orocos/async/attributes.rb b/lib/orocos/async/attributes.rb index 3a115f40..5ed78f9b 100644 --- a/lib/orocos/async/attributes.rb +++ b/lib/orocos/async/attributes.rb @@ -17,25 +17,26 @@ def initialize(async_task,attribute,options=Hash.new) disable_emitting do reachable!(attribute) end - @poll_timer = @event_loop.async_every(method(:raw_read), {:period => period, :start => false, - :known_errors => Orocos::Async::KNOWN_ERRORS}) do |data,error| + + @poll_timer = @event_loop.async_every( + proc { @delegator_obj.raw_read }, + { period: period, start: false, + known_errors: Orocos::Async::KNOWN_ERRORS } + ) do |data, error| if error @poll_timer.cancel self.period = @poll_timer.period - @event_loop.once do - event :error,error - end - else - if data - if @raw_last_sample != data - @raw_last_sample = data - event :raw_change,data - event :change,Typelib.to_ruby(data) - end + event :error, error + elsif data + if @raw_last_sample != data + @raw_last_sample = data + event :raw_change, data + event :change, Typelib.to_ruby(data) end end end @poll_timer.doc = attribute.full_name + @task.on_unreachable do unreachable! end @@ -92,14 +93,10 @@ def wait(timeout = 5.0) def really_add_listener(listener) super if listener.event == :raw_change - if !@poll_timer.running? - @poll_timer.start(period) - end + @poll_timer.start(period) unless @poll_timer.running? listener.call(@raw_last_sample) if @raw_last_sample && listener.use_last_value? elsif listener.event == :change - if !@poll_timer.running? - @poll_timer.start(period) - end + @poll_timer.start(period) unless @poll_timer.running? listener.call(Typelib.to_ruby(@raw_last_sample)) if @raw_last_sample && listener.use_last_value? end end diff --git a/lib/orocos/async/name_service.rb b/lib/orocos/async/name_service.rb index 0e7b14e8..5b245e94 100644 --- a/lib/orocos/async/name_service.rb +++ b/lib/orocos/async/name_service.rb @@ -36,7 +36,7 @@ class NameServiceBase < ObjectBase # context proxy # @return [void] define_events :task_added - + # @!method on_task_added # # Registers an event callback that will receive task names when the task @@ -50,34 +50,37 @@ class NameServiceBase < ObjectBase self.default_period = 1.0 - def initialize(name_service,options = Hash.new) - @options ||= Kernel.validate_options options,:period => default_period,:start => false,:sync_key => nil,:known_errors => Orocos::Async::KNOWN_ERRORS,:event_loop => Orocos::Async.event_loop - @stored_names ||= Set.new - _,options_async = Kernel.filter_options @options,:event_loop=>nil - super(name_service.name,@options[:event_loop]) - disable_emitting do - reachable! name_service - end - @watchdog_timer = @event_loop.async_every method(:names),options_async do |names| - names.each do |name| - n = @stored_names.add? name - event :task_added,name if n - end - @stored_names.delete_if do |name| - if !names.include?(name) - event :task_removed,name + def initialize(name_service, options = {}) + @options = Kernel.validate_options( + options, { period: default_period, start: false, sync_key: nil, + known_errors: Orocos::Async::KNOWN_ERRORS, + event_loop: Orocos::Async.event_loop } + ) + + @stored_names = Set.new + _, options_async = Kernel.filter_options @options, event_loop: nil + super(name_service.name, @options[:event_loop]) + + disable_emitting { reachable!(name_service) } + @watchdog_timer = + @event_loop.async_every(method(:names), options_async) do |names| + names.each do |name| + event(:task_added, name) if @stored_names.add?(name) + end + + @stored_names.delete_if do |name| + next if names.include?(name) + + event :task_removed, name true - else - false end end - end @watchdog_timer.doc = name - @task_context_proxies = Array.new + @task_context_proxies = [] end def really_add_listener(listener) - if listener.event == :task_added || listener.event == :task_removed + if listener.event == :task_added || listener.event == :task_removed @watchdog_timer.start unless @watchdog_timer.running? if listener.use_last_value? && !@stored_names.empty? @stored_names.each do |name| @@ -132,14 +135,14 @@ class NameService < NameServiceBase define_events :name_service_added, :name_service_removed def initialize(*name_services) - options = if name_services.last.is_a? Hash + options = if name_services.last.kind_of?(Hash) name_services.pop else - Hash.new + {} end - name_services = name_services.map { |ns| ns.to_async } - name_service = Orocos::NameService.new *name_services - super(name_service,options) + + name_services = [Orocos::NameService.new(*name_services)] + super(name_services.first, options) end def clear @@ -147,9 +150,9 @@ def clear orig_clear end - def proxy(name,options = Hash.new) - if(name_services.empty?) - Vizkit.error "Orocos is not initialized!" unless Orocos.initialized? + def proxy(name, options = {}) + if name_services.empty? + Orocos.error "Orocos is not initialized!" unless Orocos.initialized? raise "No name service available." end super @@ -173,18 +176,16 @@ def add_listener(listener) # # Emits the name_service_added event def add(name_service) - name_service = name_service.to_async orig_add(name_service) - event :name_service_added,name_service + event :name_service_added, name_service end # (see Orocos::NameServiceBase#add_front) # # Emits the name_service_added event def add_front(name_service) - name_service = name_service.to_async orig_add_front(name_service) - event :name_service_added,name_service + event :name_service_added, name_service end # (see Orocos::NameServiceBase#remove) @@ -193,195 +194,57 @@ def add_front(name_service) def remove(name_service) removed = false name_services.delete_if do |ns| - if name_service == ns || ns.delegator_obj == ns - true - end + removed = true if name_service == ns end + if removed - event :name_service_removed,name_service + event :name_service_removed, name_service true end end - private - # add methods which forward the call to the underlying name service - forward_to :@delegator_obj,:@event_loop, :known_errors => Orocos::Async::KNOWN_ERRORS do - methods = Orocos::NameService.instance_methods.find_all{|method| nil == (method.to_s =~ /^do.*/)} - methods -= Orocos::Async::NameService.instance_methods + [:method_missing] - def_delegator :add,:alias => :orig_add - def_delegator :add_front,:alias => :orig_add_front - def_delegator :clear,:alias => :orig_clear - def_delegators methods - end - end - - module Local - class NameService < NameServiceBase - extend Utilrb::EventLoop::Forwardable - - def initialize(options = Hash.new) - options,other_options = Kernel.filter_options options, - :tasks => Array.new - - name_service = Orocos::Local::NameService.new options[:tasks] - super(name_service,other_options) - end - - def get(name,options=Hash.new,&block) - async_options,other_options = Kernel.filter_options options, - :sync_key => nil, - :raise => nil, - :event_loop => @event_loop, - :period => nil, - :wait => nil - - if block - p = proc do |task,error| - task = task.to_async(async_options) unless error - if block.arity == 2 - block.call task,error - elsif !error - block.call task - end - end - orig_get name,other_options,&p - else - task = orig_get name,other_options - task.to_async(async_options) - end - end - - private - # add methods which forward the call to the underlying name service - forward_to :@delegator_obj,:@event_loop,:known_errors=> Orocos::Async::KNOWN_ERRORS do - methods = Orocos::Local::NameService.instance_methods.find_all{|method| nil == (method.to_s =~ /^do.*/)} - methods -= Orocos::Async::Local::NameService.instance_methods + [:method_missing] - def_delegators methods - def_delegator :get,:alias => :orig_get - end - end - end - - # Base class for name services that are accessed remotely (e.g. over the - # network) - class RemoteNameService < NameServiceBase - extend Utilrb::EventLoop::Forwardable - - def initialize(name_service,options = Hash.new) - options = Kernel.validate_options options, - :reconnect => true, - :known_errors => Array.new - - @reconnect = options.delete(:reconnect) - options[:known_errors].concat([Orocos::ComError,Orocos::NotFound]) - super(name_service,options) - @namespace = name_service.namespace - end - - # True if this name service should automatically reconnect - # @return [Boolean] - def reconnect?; @reconnect end - - def unreachable!(options = Hash.new) - @watchdog_timer.stop - if !valid_delegator? - raise "This should never happen. There must be always a valid delegator obj" - end - - if reconnect? && options.has_key?(:error) - obj = @delegator_obj - obj.reset - timer = @event_loop.async_every obj.method(:names),:period => 1.0,:sync_key => nil,:known_errors => [Orocos::NotFound,Orocos::ComError] do |names,error| - if error - obj.reset - else - reachable!(obj) - @watchdog_timer.start - timer.stop - end - end - timer.doc = "NameService #{name} reconnect" - else - end - super - end - - def name - @delegator_obj.name - end - - def get(name,options=Hash.new,&block) - async_options,other_options = Kernel.filter_options options, - :sync_key => nil,:raise => nil,:event_loop => @event_loop, - :period => nil,:wait => nil + def get(name, options = {}, &block) + async_options, other_options = Kernel.filter_options( + options, + { + sync_key: nil, raise: nil, event_loop: @event_loop, + period: nil, wait: nil + } + ) if block - p = proc do |task,error| - async_options[:use] = task - atask = if !error - task.to_async(async_options) - end + p = proc do |task, error| + task = task.to_async(async_options) unless error if block.arity == 2 - block.call atask,error + block.call task, error elsif !error - block.call atask + block.call task end - end - orig_get name,other_options,&p + end + orig_get name, other_options, &p else - task = orig_get name,other_options - task.to_async(Hash[:use => task].merge(async_options)) + task = orig_get name, other_options + task.to_async(async_options) end end - private # add methods which forward the call to the underlying name service - forward_to :@delegator_obj,:@event_loop, :known_errors => [Orocos::ComError,Orocos::NotFound], :on_error => :error do - methods = Orocos::NameServiceBase.instance_methods.find_all{|method| nil == (method.to_s =~ /^do.*/)} - methods -= Orocos::Async::RemoteNameService.instance_methods + [:method_missing] - thread_safe do - def_delegators methods - def_delegator :get,:alias => :orig_get - end - end - - def error(e) - emit_error e if !e.is_a? Orocos::NotFound + forward_to :@delegator_obj,:@event_loop, :known_errors => Orocos::Async::KNOWN_ERRORS do + methods = Orocos::NameService.instance_methods.find_all{|method| nil == (method.to_s =~ /^do.*/)} + methods -= Orocos::Async::NameService.instance_methods + [:method_missing] + def_delegator :add, alias: :orig_add + def_delegator :add_front, alias: :orig_add_front + def_delegator :clear, alias: :orig_clear + def_delegator :get, alias: :orig_get + def_delegators methods end end module CORBA - class << self - def name_service=(service) - Orocos::Async.name_service.name_services.each_with_index do |i,val| - if val == @delegator_obj - Orocos::Async.name_service.name_services[i] = service - break - end - end - reachable! service - end - def name_service - @name_service ||= NameService.new(Orocos::CORBA.name_service.ip) - end - - def get(name,options =Hash.new) - name_service.get(name,options) - end - - def proxy(name,options = Hash.new) - name_service.proxy(name,options) - end - end - - class NameService < RemoteNameService - extend Utilrb::EventLoop::Forwardable - - def initialize(host = "", reconnect: true) - name_service = Orocos::CORBA::NameService.new(host) - super(name_service, reconnect: reconnect) + class NameService + def self.new(*args) + Orocos::CORBA::NameService.new(*args).to_async end end end end - diff --git a/lib/orocos/async/object_base.rb b/lib/orocos/async/object_base.rb index 117636d2..da7b8b21 100644 --- a/lib/orocos/async/object_base.rb +++ b/lib/orocos/async/object_base.rb @@ -1,18 +1,15 @@ module Orocos::Async - class EventListener attr_reader :event attr_reader :last_args - def initialize(obj,event,use_last_value=true,&block) + def initialize(obj, event, use_last_value = true, &block) + raise ArgumentError, "no object given" unless obj + @block = block - if !obj - raise ArgumentError, "no object given" - end @obj = obj @event = event @use_last_value = use_last_value - @last_args end # returns true if the listener shall be called @@ -39,8 +36,7 @@ def start(use_last_value = @use_last_value) self end - #return true if the listener is listing to - #the event + # Whether the listener is currently started def listening? @obj.listener?(self) end @@ -52,25 +48,32 @@ def call(*args) end end + # Null object for the delegated object in ObjectBase class DelegatorDummy attr_reader :event_loop attr_reader :name + def initialize(parent,name,event_loop) @parent = parent @name = name @event_loop = event_loop end + def respond_to_missing?(m, _include_private = false) + true + end + def method_missing(m,*args,&block) return super if m == :to_ary + error = Orocos::NotFound.new "#{@name} is not reachable while accessing #{m}" error.set_backtrace(Kernel.caller) - if !block + raise error unless block + + @event_loop.defer on_error: @parent.method(:emit_error), + callback: block, + known_errors: Orocos::Async::KNOWN_ERRORS do raise error - else - @event_loop.defer :on_error => @parent.method(:emit_error),:callback => block,:known_errors => Orocos::Async::KNOWN_ERRORS do - raise error - end end end end @@ -156,7 +159,6 @@ def validate_event(name) attr_reader :event_loop attr_reader :options - attr_accessor :emitting attr_accessor :name define_events :error,:reachable,:unreachable @@ -170,40 +172,64 @@ def validate_event(name) # @return [Array] attr_reader :pending_adds - def initialize(name,event_loop) - raise ArgumentError, "no name was given" if !name + def initialize(name, event_loop) + raise ArgumentError, "object name cannot be nil" unless name + @listeners ||= Hash.new{ |hash,key| hash[key] = []} - @proxy_listeners ||= Hash.new{|hash,key| hash[key] = Hash.new} + @proxy_listeners ||= Hash.new{ |hash,key| hash[key] = {} } @name ||= name @event_loop ||= event_loop - @options ||= Hash.new - @emitting = true - @pending_adds = Array.new + @options ||= {} + @pending_adds = [] + invalidate_delegator! on_error do |e| - if e.kind_of?(Orocos::ComError) - unreachable!(:error => e) - end + unreachable!(error: e) if e.kind_of?(Orocos::ComError) + end + end + + def self.main_thread_call(name) + alias_name = "__main_thread_call_#{name}" + return if method_defined?(alias_name) + + alias_method alias_name, name + define_method name do |*args, **kw, &block| + return send(alias_name, *args, **kw, &block) if @event_loop.thread? + + raise "#{name} called outside of event loop thread" end + name + end + + def assert_in_event_loop_thread + @event_loop.validate_thread end def invalidate_delegator! - @delegator_obj = DelegatorDummy.new self,@name,@event_loop + @delegator_obj = DelegatorDummy.new(self, @name, @event_loop) end - # sets @emitting to value for the time the given block is called - def emitting(value,&block) - old,@emitting = @emitting,value + def emitting? + !Thread.current["__#{self}_disable_emitting"] + end + + # Disable event emission while the block is called + # + # This is thread-safe + def emitting(value, &block) + old = emitting? + Thread.current["__#{self}_disable_emitting"] = !value + instance_eval(&block) ensure - @emitting = old + Thread.current["__#{self}_disable_emitting"] = !old end def disable_emitting(&block) - emitting(false,&block) + emitting(false, &block) end - #returns true if the event is known + # Returns true if the event is known def valid_event?(event) self.class.valid_event?(event) end @@ -216,24 +242,24 @@ def event_names self.class.event_names end - def on_event(event,use_last_value=true,&block) + main_thread_call def on_event(event, use_last_value = true, &block) event = validate_event event - EventListener.new(self,event,use_last_value,&block).start + EventListener.new(self, event, use_last_value, &block).start end # returns the number of listener for the given event - def number_of_listeners(event) + main_thread_call def number_of_listeners(event) event = validate_event event @listeners[event].size end - #returns true if the listener is active - def listener?(listener) + # Returns true if the listener is active + main_thread_call def listener?(listener) @listeners[listener.event].include? listener end - #returns the listeners for the given event - def listeners(event) + # Returns the listeners for the given event + main_thread_call def listeners(event) event = validate_event event @listeners[event] end @@ -241,10 +267,11 @@ def listeners(event) # adds a listener to obj and proxies # event like it would be emitted from self # - # if no listener is registererd to event it + # if no listener is registererd to event it # also removes the listener from obj - def proxy_event(obj,*events) + main_thread_call def proxy_event(obj,*events) return if obj == self + events = events.flatten events.each do |e| if existing = @proxy_listeners[obj].delete(e) @@ -257,8 +284,9 @@ def proxy_event(obj,*events) end end - def remove_proxy_event(obj,*events) + main_thread_call def remove_proxy_event(obj,*events) return if obj == self + events = events.flatten if events.empty? remove_proxy_event(obj,@proxy_listeners[obj].keys) @@ -272,17 +300,23 @@ def remove_proxy_event(obj,*events) end end - def add_listener(listener) - event = validate_event listener.event + main_thread_call def add_listener(listener) + validate_event(listener.event) return listener if pending_adds.include? listener + pending_adds << listener + + # We queue the addition so that evenst that are currently queued are not + # given to the new listeners. event_loop.once do expected = pending_adds.shift # 'expected' is nil if the listener has been removed before this # block got processed if expected if expected != listener - raise RuntimeError, "internal error in #{self}#add_listener: pending addition and expected addition mismatch" + raise RuntimeError, + "internal error in #{self}#add_listener: "\ + "pending addition and expected addition mismatch" end really_add_listener(listener) end @@ -290,16 +324,16 @@ def add_listener(listener) listener end - def really_add_listener(listener) + main_thread_call def really_add_listener(listener) if listener.use_last_value? if listener.event == :reachable listener.call if valid_delegator? elsif listener.event == :unreachable - listener.call if !valid_delegator? + listener.call unless valid_delegator? end end @proxy_listeners.each do |obj,listeners| - if l = listeners[listener.event] + if (l = listeners[listener.event]) if listener.use_last_value? && !listener.last_args # replay last value if requested obj.really_add_listener(listener) @@ -308,14 +342,17 @@ def really_add_listener(listener) l.start(false) unless l.listening? end end - @listeners[listener.event] << listener unless @listeners[listener.event].include?(listener) + unless @listeners[listener.event].include?(listener) + @listeners[listener.event] << listener + end listener end - def remove_listener(listener) - if idx = pending_adds.index(listener) + main_thread_call def remove_listener(listener) + if (idx = pending_adds.index(listener)) pending_adds[idx] = nil end + @listeners[listener.event].delete listener # Check whether the only listeners left are proxy listeners. If they @@ -332,10 +369,11 @@ def remove_listener(listener) # calls all listener which are registered for the given event # the next step def event(event_name,*args,&block) + return unless emitting? + validate_event event_name - return unless @emitting @event_loop.once do - process_event event_name,*args,&block + process_event event_name, *args, &block end self end @@ -347,52 +385,58 @@ def wait(timeout = 5.0) @event_loop.wait_for do if timeout && timeout <= Time.now-time Utilrb::EventLoop.cleanup_backtrace do - raise Orocos::NotFound,"#{self.class}: #{respond_to?(:full_name) ? full_name : name} is not reachable after #{timeout} seconds" + name = respond_to?(:full_name) ? full_name : name + raise Orocos::NotFound, + "#{self.class}: #{name} is not reachable "\ + "after #{timeout} seconds" end end - reachable? + valid_delegator? end self end - # TODO CODE BLOCK - def reachable?(&block) - valid_delegator? + main_thread_call def reachable? + if block_given? + yield(valid_delegator?) + else + valid_delegator? + end end - def reachable!(obj,options = Hash.new) + main_thread_call def reachable!(obj, _options = {}) @delegator_obj = obj event :reachable if valid_delegator? end - def unreachable!(options = Hash.new) - if valid_delegator? - invalidate_delegator! - event :unreachable - end + main_thread_call def unreachable!(_options = {}) + return unless valid_delegator? + + invalidate_delegator! + event :unreachable end def valid_delegator? - !@delegator_obj.is_a? DelegatorDummy + !@delegator_obj.kind_of?(DelegatorDummy) end - def remove_all_listeners - !@listeners.each do |event,listeners| - while !listeners.empty? - remove_listener listeners.first - end + main_thread_call def remove_all_listeners + @listeners.dup.each_value do |listeners| + remove_listener(listeners.first) until listeners.empty? end end private + # calls all listener which are registered for the given event - def process_event(event_name,*args,&block) - event = validate_event event_name + main_thread_call def process_event(event_name, *args, &block) + validate_event(event_name) args = block.call if block - #@listeners have to be cloned because it might get modified - #by listener.call + + # @listeners have to be cloned because it might get modified + # by listener.call @listeners[event_name].clone.each do |listener| - listener.call *args + listener.call(*args) end self end diff --git a/lib/orocos/async/orocos.rb b/lib/orocos/async/orocos.rb index 3fc8213f..a92b11aa 100644 --- a/lib/orocos/async/orocos.rb +++ b/lib/orocos/async/orocos.rb @@ -79,12 +79,12 @@ class TaskContext def to_async(options = Hash.new) options[:name] ||= name options[:ior] ||= ior - Orocos::Async::CORBA::TaskContext.new(options) + Orocos::Async::CORBA::TaskContext.new(options.merge(use: self)) end def to_proxy(options = Hash.new) options[:use] ||= to_async - # use name service to check if there is already + # use name service to check if there is already # a proxy for the task Orocos::Async.proxy(name,options) end diff --git a/lib/orocos/async/ports.rb b/lib/orocos/async/ports.rb index cf76ee72..eec575de 100644 --- a/lib/orocos/async/ports.rb +++ b/lib/orocos/async/ports.rb @@ -361,7 +361,7 @@ def add_listener(listener) def remove_listener(listener) super - if number_of_listeners(:data) == 0 && number_of_listeners(:raw_data) == 0 && @global_reader + if number_of_listeners(:data) == 0 && number_of_listeners(:raw_data) == 0 && @global_reader&.valid_delegator? remove_proxy_event(@global_reader) @global_reader.disconnect{} # call it asynchron @global_reader = nil diff --git a/lib/orocos/async/task_context.rb b/lib/orocos/async/task_context.rb index eecb1a38..1580022d 100644 --- a/lib/orocos/async/task_context.rb +++ b/lib/orocos/async/task_context.rb @@ -1,6 +1,8 @@ module Orocos::Async::CORBA class TaskContext < Orocos::Async::TaskContextBase + attr_reader :ior + # A TaskContext # # If not specified the default option settings are: @@ -25,23 +27,19 @@ class TaskContext < Orocos::Async::TaskContextBase # @overload initialize(options) # @overload initialize(task,options) # @option options [#ior,#name] :task a task context. - def initialize(ior,options=Hash.new) - if ior.respond_to?(:ior) - ior = ior.ior - end - ior,options = if ior.is_a? Hash - [nil,ior] - else - [ior,options] - end - ior ||= if options.has_key? :ior - options[:ior] - elsif options.has_key? :use - options[:use].ior - end + def initialize(ior, options = {}) + ior = ior.ior if ior.respond_to?(:ior) + ior, options = + if ior.kind_of?(Hash) + [nil, ior] + else + [ior, options] + end + + ior ||= options[:ior] || options[:use]&.ior name = options[:name] || ior - super(name,options.merge(:ior => ior)) @ior = ior.to_str + super(name, options.merge(ior: ior)) end def really_add_listener(listener) @@ -51,16 +49,8 @@ def really_add_listener(listener) # to prevent different behaviors depending on # the calling order if listener.use_last_value? && listener.event == :state_change - state = @mutex.synchronize do - @delegator_obj.current_state if valid_delegator? - end - event_loop.once{listener.call state} if state - end - end - - def ior - @mutex.synchronize do - @ior.dup if @ior + state = @delegator_obj.current_state if valid_delegator? + event_loop.once{listener.call(state)} if state end end @@ -69,9 +59,7 @@ def ior # @option options [String] name the task name # @option options [String] ior the task IOR def configure_delegation(options = Hash.new) - options = Kernel.validate_options options, - :name=> nil, - :ior => nil + options = Kernel.validate_options options, name: nil, ior: nil ior = options[:ior] @ior,@name = if valid_delegator? @@ -82,9 +70,7 @@ def configure_delegation(options = Hash.new) [ior, @name] end - if !@ior - raise ArgumentError, "no IOR or task has been given" - end + raise ArgumentError, "no IOR or task has been given" unless @ior end def respond_to_missing?(method_name, include_private = false) @@ -105,7 +91,9 @@ def method_missing(m,*args) # Called by #task_context to create the underlying task context object def access_remote_task_context - Orocos::TaskContext.new @ior ,:name => @name + task = Orocos::TaskContext.new(@ior, name: @name) + name = @name || task.name + [task, name] end # add methods which forward the call to the underlying task context diff --git a/lib/orocos/async/task_context_base.rb b/lib/orocos/async/task_context_base.rb index d24615cb..abf9cf50 100644 --- a/lib/orocos/async/task_context_base.rb +++ b/lib/orocos/async/task_context_base.rb @@ -4,6 +4,8 @@ class TaskContextBase < Orocos::Async::ObjectBase extend Orocos::Async::ObjectBase::Periodic::ClassMethods include Orocos::Async::ObjectBase::Periodic + attr_reader :name + self.default_period = 1.0 define_events :port_reachable, @@ -22,7 +24,7 @@ def self.to_ruby(task) begin t = Orocos::CORBA.name_service.get(task.basename) raise "Cannot create ruby task for #{task.name} "\ - "because there is already a task #{t.name} "\ + "because is already a task #{t.name} "\ "registered on the main CORBA name service." rescue Orocos::NotFound end @@ -57,36 +59,77 @@ def self.to_ruby(task) # returned by the method attr_predicate :raise_on_access_error?, true - def initialize(name,options=Hash.new) - event_loop,reachable_options = Kernel.filter_options options,:event_loop => Orocos::Async.event_loop - super(name,event_loop[:event_loop]) - @mutex = Mutex.new + def initialize(name, options = {}) + self_options, reachable_options = + Kernel.filter_options( + options, + { event_loop: Orocos::Async.event_loop, wait: true } + ) + super(name, self_options[:event_loop]) + @last_state = nil - @port_names = Array.new - @property_names = Array.new - @attribute_names = Array.new - - watchdog_proc = Proc.new do - ping # call a method which raises ComError if the connection died - # this is used to disconnect the task by an error handler - [states,port_names,property_names,attribute_names] + @port_names = [] + @property_names = [] + @attribute_names = [] + + @ready = false + ready = [] + resolution_in_progress = {} + @watchdog_timer = + @event_loop.every(default_period, false) do + %I[states port_names property_names attribute_names].each do |type| + next if resolution_in_progress[type] + + send(type) do |result, _| + if result + send("process_#{type}", result) + (ready << type).uniq! + @ready = ready.size == 4 + end + resolution_in_progress[type] = false + end + resolution_in_progress[type] = true + end + end + @watchdog_timer.doc = name + + if reachable_options[:use] + reachable!(reachable_options) + elsif self_options[:wait] + begin + resolved_task_context(access_remote_task_context, + nil, reachable_options) + rescue *Orocos::Async::KNOWN_ERRORS => e # Lint: + resolved_task_context(nil, e, reachable_options) + end + else + @event_loop.defer( + callback: proc { |t, e| resolved_task_context(t, e, reachable_options) }, + known_errors: Orocos::Async::KNOWN_ERRORS + ) do + access_remote_task_context + end end + end - @watchdog_timer = @event_loop.async_every(watchdog_proc,{:period => default_period, - :default => [[],[],[],[]], - :start => false, - :sync_key => nil, #is blocked by the methods call ping, states, etc - :known_errors => Orocos::Async::KNOWN_ERRORS}) do |data,error| - process_states(data[0]) - process_port_names(data[1]) - process_property_names(data[2]) - process_attribute_names(data[3]) - end - @watchdog_timer.doc = name - reachable!(reachable_options) + def wait(timeout = 5) + super + @event_loop.wait_for(timeout) { @ready } end - def to_async(options=Hash.new) + def resolved_task_context(result, error, reachable_options) + if error + @access_error = error + invalidate_delegator! + raise error if raise_on_access_error? + else + task_context, name = *result + @name = name if name + reachable!(reachable_options.merge(use: task_context)) + end + end + + def to_async(_options = {}) self end @@ -112,7 +155,7 @@ def ruby_task_context? !!@ruby_task_context end - def really_add_listener(listener) + main_thread_call def really_add_listener(listener) return super unless listener.use_last_value? # call new listeners with the current value @@ -143,12 +186,6 @@ def really_add_listener(listener) super end - def name - @mutex.synchronize do - @name.dup if @name - end - end - # Initiates the binding of the underlying sychronous access object to # this async object. # @@ -167,38 +204,36 @@ def name # @option options [Boolean] raise (false) if set, the #task_context # method will raise if the task context cannot be accessed on first # try. Otherwise, it will try to access it forever until it finds it. - def reachable!(options = Hash.new) - @mutex.synchronize do - options, configure_options = Kernel.filter_options options, - :watchdog => true, - :period => default_period, - :wait => false, - :use => nil, - :raise => false - - self.raise_on_access_error = options[:raise] - - if options[:use] - @delegator_obj = options[:use] - @watchdog_timer.doc = @delegator_obj.name - else - invalidate_delegator! - end - - configure_delegation(configure_options) - - @watchdog_timer.start(options[:period],false) if options[:watchdog] - @event_loop.async(method(:task_context)) + main_thread_call def reachable!(options = {}) + options, configure_options = Kernel.filter_options( + options, + { + watchdog: true, + period: default_period, + wait: false, + use: nil, + raise: false + } + ) + + self.raise_on_access_error = options[:raise] + + if options[:use] + @delegator_obj = options[:use] + @watchdog_timer.doc = @delegator_obj.name + else + invalidate_delegator! end - wait if options[:wait] + + configure_delegation(configure_options) + @watchdog_timer.start(options[:period], false) if options[:watchdog] end # Called by #reachable! to do subclass-specific configuration # # @param [Hash] configure_options all options passed to #reachable! that # are not understood by #reachable! - def configure_delegation(configure_options = Hash.new) - end + main_thread_call def configure_delegation(configure_options = {}); end # Disconnectes self from the remote task context and returns its underlying # object used to communicate with the remote task (designated object). @@ -208,45 +243,33 @@ def configure_delegation(configure_options = Hash.new) # # @param [Exception] reason The reason for the disconnect # @return [Orocos::TaskContext,nil,Utilrb::EventLoop::Event] - def unreachable!(options = Hash.new) + main_thread_call def unreachable!(options = {}) options = Kernel.validate_options options, :error + # ensure that this is always called from the # event loop thread - @event_loop.call do - old_task = @mutex.synchronize do - if valid_delegator? - @access_error = options.delete(:error) || - ArgumentError.new("cannot access the remote task context for an unknown reason") - task = @delegator_obj - invalidate_delegator! - @watchdog_timer.cancel if @watchdog_timer - task - end - end - clear_interface - event :unreachable if old_task - old_task + if valid_delegator? + @access_error = + options.delete(:error) || + ArgumentError.new("could not access the remote task context "\ + "for an unknown reason") + + old_task = @delegator_obj + invalidate_delegator! + @watchdog_timer.cancel if @watchdog_timer end + + clear_interface + event :unreachable if old_task + old_task end - def clear_interface + main_thread_call def clear_interface process_port_names process_attribute_names process_property_names end - def reachable?(&block) - if block - ping(&block) - else - ping - end - true - rescue Orocos::NotFound,Orocos::ComError => e - unreachable!(:error => e) - false - end - # Helper method to setup async calls # # Asynchronous calls can be called either with a callback or without. In @@ -270,12 +293,12 @@ def reachable?(&block) # # @return [Object] in the synchronous case, the method returns the # underlying method's return value. In the asynchronous case TODO - def call_with_async(method_name,user_callback,to_async_options,*args) - p = proc do |object,error| - async_object = object.to_async(Hash[:use => self].merge(to_async_options)) + def call_with_async(method_name, user_callback, to_async_options, *args) + p = proc do |object, error| + async_object = object&.to_async({ use: self }.merge(to_async_options)) if user_callback if user_callback.arity == 2 - user_callback.call(async_object,error) + user_callback.call(async_object, error) else user_callback.call(async_object) end @@ -284,22 +307,22 @@ def call_with_async(method_name,user_callback,to_async_options,*args) end end if user_callback - send(method_name,*args,&p) + send(method_name, *args, &p) else - async_object = send(method_name,*args) - p.call async_object,nil + async_object = send(method_name, *args) + p.call async_object, nil end end - def attribute(name,options = Hash.new,&block) + main_thread_call def attribute(name,options = Hash.new,&block) call_with_async(:orig_attribute,block,options,name) end - def property(name,options = Hash.new,&block) + main_thread_call def property(name,options = Hash.new,&block) call_with_async(:orig_property,block,options,name) end - def port(name, verify = true,options=Hash.new, &block) + main_thread_call def port(name, verify = true,options=Hash.new, &block) call_with_async(:orig_port,block,options,name,verify) end @@ -308,13 +331,10 @@ def port(name, verify = true,options=Hash.new, &block) # # Enumerates the properties that are available on # this task, as instances of Orocos::Attribute - def each_property(&block) - if !block_given? - return enum_for(:each_property) - end - property_names.each do |name| - yield(property(name)) - end + main_thread_call def each_property + return enum_for(__method__) unless block_given? + + property_names.each { |name| yield(property(name)) } self end @@ -323,13 +343,10 @@ def each_property(&block) # # Enumerates the attributes that are available on # this task, as instances of Orocos::Attribute - def each_attribute(&block) - if !block_given? - return enum_for(:each_attribute) - end - attribute_names.each do |name| - yield(attribute(name)) - end + def each_attribute + return enum_for(__method__) unless block_given? + + attribute_names.each { |name| yield(attribute(name)) } self end @@ -338,13 +355,10 @@ def each_attribute(&block) # # Enumerates the ports that are available on this task, as instances of # either Orocos::InputPort or Orocos::OutputPort - def each_port(&block) - if !block_given? - return enum_for(:each_port) - end - port_names.each do |name| - yield(port(name)) - end + def each_port + return enum_for(__method__) unless block_given? + + port_names.each { |name| yield(port(name)) } self end @@ -366,25 +380,28 @@ def each_port(&block) def_delegators methods end + main_thread_call :orig_port + main_thread_call :orig_property + main_thread_call :orig_attribute + # must be called from the event loop thread - def process_states(states=[]) - if !states.empty? - # We don't use #event here. The callbacks, when using #event, - # would be called delayed and therefore task.state would not be - # equal to the state passed to the callback - blocks = listeners :state_change - states.each do |s| - next if @last_state == s - @last_state = s - blocks.each do |b| - b.call(s) - end + main_thread_call def process_states(states = []) + # We don't use #event here. The callbacks, when using #event, would + # be called delayed and therefore task.state would not be equal to + # the state passed to the callback + blocks = listeners :state_change + states.each do |s| + next if @last_state == s + + @last_state = s + blocks.each do |b| + b.call(s) end end end # must be called from the event loop thread - def process_port_names(port_names=[]) + main_thread_call def process_port_names(port_names = []) added_ports = port_names - @port_names deleted_ports = @port_names - port_names deleted_ports.each do |name| @@ -398,7 +415,7 @@ def process_port_names(port_names=[]) end # must be called from the event loop thread - def process_property_names(property_names=[]) + main_thread_call def process_property_names(property_names=[]) added_properties = property_names - @property_names deleted_properties = @property_names - property_names deleted_properties.each do |name| @@ -412,7 +429,7 @@ def process_property_names(property_names=[]) end # must be called from the event loop thread - def process_attribute_names(attribute_names=[]) + main_thread_call def process_attribute_names(attribute_names=[]) added_properties = attribute_names - @attribute_names deleted_properties = @attribute_names - attribute_names deleted_properties.each do |name| @@ -430,39 +447,10 @@ def process_attribute_names(attribute_names=[]) # the @delegator_obj instance variable must not be directly accessed # without proper synchronization. def task_context - @mutex.synchronize do - begin - task = if valid_delegator? - @delegator_obj - elsif @access_error # do not try again - raise @access_error - else - obj = access_remote_task_context - @name = obj.name - @watchdog_timer.doc = @name - port_names = obj.port_names - property_names = obj.property_names - attribute_names = obj.attribute_names - state = obj.state - @event_loop.once do - process_states([state]) - process_port_names(port_names) - process_property_names(property_names) - process_attribute_names(attribute_names) - end - @delegator_obj = obj - event :reachable - obj - end - [task,nil] - rescue Exception => e - @access_error = e - invalidate_delegator! - raise e if raise_on_access_error? # do not be silent if - [nil,@access_error] - end - end + return [@delegator_obj, nil] if valid_delegator? + raise @access_error if @access_error # do not try again + + [nil, Orocos::ComError.new("accessing async task context before it is ready")] end end end - diff --git a/lib/orocos/async/task_context_proxy.rb b/lib/orocos/async/task_context_proxy.rb index 596e85b9..c7b3cd71 100644 --- a/lib/orocos/async/task_context_proxy.rb +++ b/lib/orocos/async/task_context_proxy.rb @@ -20,6 +20,15 @@ def initialize(task_proxy,attribute_name,options=Hash.new) @raw_last_sample = nil end + def to_s + typename = @type&.name || "unknown_t" + "#<#{self.class.name} #{task.name}.#{name}[#{typename}]>" + end + + def invalidate_delegator! + super + end + def task @task_proxy end @@ -34,6 +43,7 @@ def type_name def type raise Orocos::NotFound, "#{self} is not reachable" unless @type + @type end @@ -59,22 +69,17 @@ def reachable!(attribute,options = Hash.new) remove_proxy_event(@delegator_obj,@delegator_obj.event_names) if valid_delegator? @raw_last_sample = attribute.raw_last_sample super(attribute,options) - proxy_event(@delegator_obj,@delegator_obj.event_names-[:reachable]) - rescue Orocos::NotFound - unreachable! + proxy_event(@delegator_obj, @delegator_obj.event_names - [:reachable]) end def unreachable!(options=Hash.new) remove_proxy_event(@delegator_obj,@delegator_obj.event_names) if valid_delegator? + super(options) end def period - if @options.has_key? :period - @options[:period] - else - nil - end + @options[:period] end def period=(period) @@ -168,7 +173,8 @@ def initialize(task_proxy,port_name,options=Hash.new) end def to_s - "#" + typename = @type&.name || "unknown_t" + "#<#{self.class.name} #{@task_proxy.name}.#{name}[#{typename}]>" end def type_name @@ -181,6 +187,7 @@ def full_name def type raise Orocos::NotFound, "#{self} is not reachable" unless @type + @type end @@ -246,8 +253,6 @@ def reachable!(port,options = Hash.new) elsif number_of_listeners(:data) != 0 raise RuntimeError, "Port #{name} is an input port but callbacks for on_data are registered" end - rescue Orocos::NotFound - unreachable! end def unreachable!(options = Hash.new) @@ -258,6 +263,7 @@ def unreachable!(options = Hash.new) # returns a sub port for the given subfield def sub_port(subfield) raise RuntimeError , "Port #{name} is not an output port" if !output? + SubPortProxy.new(self,subfield) end @@ -460,14 +466,19 @@ class TaskContextProxy < ObjectBase methods -= TaskContextProxy.instance_methods + [:method_missing,:reachable?,:port] def_delegators :@delegator_obj,*methods - def initialize(name,options=Hash.new) - @options,@task_options = Kernel.filter_options options,{:name_service => Orocos::Async.name_service, - :event_loop => Orocos::Async.event_loop, - :reconnect => true, - :retry_period => Orocos::Async::TaskContextBase.default_period, - :use => nil, - :raise => false, - :wait => nil } + def initialize(name, options = {}) + @options, @task_options = Kernel.filter_options( + options, + { + name_service: Orocos::Async.name_service, + event_loop: Orocos::Async.event_loop, + reconnect: true, + retry_period: Orocos::Async::TaskContextBase.default_period, + use: nil, + raise: false, + wait: nil + } + ) @name_service = @options[:name_service] self.namespace,name = split_name(name) @@ -476,76 +487,67 @@ def initialize(name,options=Hash.new) @task_options[:event_loop] = @event_loop @mutex = Mutex.new - @ports = Hash.new - @attributes = Hash.new - @properties = Hash.new - @resolve_timer = @event_loop.async_every(@name_service.method(:get), - {:period => @options[:retry_period],:start => false}, - self.name,@task_options) do |task_context,error| - if error - case error - when Orocos::NotFound, Orocos::ComError - raise error if @options[:raise] - :ignore_error - else - raise error - end - else - @resolve_timer.stop - if !task_context.respond_to?(:event_loop) - raise "TaskProxy is using a name service#{@name_service} which is returning #{task_context.class} but Async::TaskContext was expected." - end - @event_loop.async_with_options(method(:reachable!),{:sync_key => self,:known_errors => Orocos::Async::KNOWN_ERRORS},task_context) do |val,error| - if error - @resolve_timer.start - :ignore_error - end + @ports = {} + @attributes = {} + @properties = {} + + get_in_progress = false + @resolve_timer = @event_loop.every(@options[:retry_period], false) do + unless get_in_progress + get_in_progress = true + @name_service.get(self.name, @task_options) do |task_context, error| + get_in_progress = false + process_resolved_task_context(task_context, error) end end end + @resolve_timer.doc = "#{name} reconnect" - on_port_reachable(false) do |name| - p = @ports[name] - if p && !p.reachable? - error_callback = Proc.new do |error| - p.emit_error(error) - end - @event_loop.defer :known_errors => Orocos::Async::KNOWN_ERRORS,:on_error => error_callback do - connect_port(p) - end - end + on_port_reachable(false) do |port_name| + object_reachable_handler(@ports, port_name, "port") end - on_property_reachable(false) do |name| - p = @properties[name] - if(p && !p.reachable?) - error_callback = Proc.new do |error| - p.emit_error(error) - end - @event_loop.defer :known_errors => Orocos::Async::KNOWN_ERRORS,:on_error => error_callback do - connect_property(p) - end - end + on_property_reachable(false) do |property_name| + object_reachable_handler(@properties, property_name, "property") end - on_attribute_reachable(false) do |name| - a = @attributes[name] - if(a && !a.reachable?) - error_callback = Proc.new do |error| - a.emit_error(error) - end - @event_loop.defer :known_errors => Orocos::Async::KNOWN_ERRORS,:on_error => error_callback do - connect_attribute(a) - end - end + on_attribute_reachable(false) do |attribute_name| + object_reachable_handler(@attributes, attribute_name, "attribute") end - @resolve_timer.doc = "#{name} reconnect" - if @options.has_key?(:use) + if @options[:use] reachable!(@options[:use]) else reconnect(@options[:wait]) end end + def process_resolved_task_context(task_context, error) + if error + case error + when Orocos::NotFound, Orocos::ComError + raise error if @options[:raise] + + return + else + raise error + end + end + + @resolve_timer.stop + unless task_context.respond_to?(:event_loop) + raise "TaskProxy is using a name service#{@name_service} "\ + "which is returning #{task_context.class} but "\ + "Async::TaskContext was expected." + end + + task_context.reachable? do |_, e| + if e + @resolve_timer.start + else + reachable!(task_context) + end + end + end + def name map_to_namespace(@name) end @@ -572,67 +574,58 @@ def reconnect(wait_for_task = false) wait if wait_for_task == true end - def property(name,options = Hash.new) + main_thread_call def property(name, options = Hash.new) name = name.to_str - options,other_options = Kernel.filter_options options,:wait => @options[:wait] + options,other_options = Kernel.filter_options options, wait: @options[:wait] wait if options[:wait] - p = @mutex.synchronize do - @properties[name] ||= PropertyProxy.new(self,name,other_options) - end + p = (@properties[name] ||= PropertyProxy.new(self,name,other_options)) if other_options.has_key?(:type) && p.type? && other_options[:type] == p.type other_options.delete(:type) end + if !other_options.empty? && p.options != other_options - Orocos.warn "Property #{p.full_name}: is already initialized with options: #{p.options}" + Orocos.warn "Property #{p.full_name}: is already initialized "\ + "with options: #{p.options}" Orocos.warn "ignoring options: #{other_options}" end - return p if !reachable? || p.reachable? - if options[:wait] - connect_property(p) - p.wait - else - @event_loop.defer :known_errors => Orocos::Async::KNOWN_ERRORS do - connect_property(p) - end - end + return p if !valid_delegator? || p.valid_delegator? + + connect_property(p) + p.wait if options[:wait] p end - def attribute(name,options = Hash.new) + main_thread_call def attribute(name,options = Hash.new) name = name.to_str options,other_options = Kernel.filter_options options,:wait => @options[:wait] wait if options[:wait] - a = @mutex.synchronize do - @attributes[name] ||= AttributeProxy.new(self,name,other_options) - end + a = (@attributes[name] ||= AttributeProxy.new(self,name,other_options)) if other_options.has_key?(:type) && a.type? && other_options[:type] == a.type other_options.delete(:type) end + if !other_options.empty? && a.options != other_options Orocos.warn "Attribute #{a.full_name}: is already initialized with options: #{a.options}" Orocos.warn "ignoring options: #{other_options}" end - return a if !reachable? || a.reachable? - if options[:wait] - connect_attribute(a) - a.wait - else - @event_loop.defer :known_errors => Orocos::Async::KNOWN_ERRORS do - connect_attribute(a) - end - end + return a if !valid_delegator? || a.valid_delegator? + + connect_attribute(a) + a.wait if options[:wait] a end - def port(name,options = Hash.new) + main_thread_call def port(name, options = Hash.new, &callback) name = name.to_str - options,other_options = Kernel.filter_options options,:wait => @options[:wait] + options,other_options = Kernel.filter_options( + options, wait: @options[:wait] + ) wait if options[:wait] # support for subports @@ -652,28 +645,22 @@ def port(name,options = Hash.new) nil end - p = @mutex.synchronize do - @ports[name] ||= PortProxy.new(self,name,other_options) - end + p = (@ports[name] ||= PortProxy.new(self,name,other_options)) if other_options.has_key?(:type) && p.type? && other_options[:type] == p.type other_options.delete(:type) end + if !other_options.empty? && p.options != other_options Orocos.warn "Port #{p.full_name}: is already initialized with options: #{p.options}" Orocos.warn "ignoring options: #{other_options}" end - if reachable? && !p.reachable? - if options[:wait] - connect_port(p) - p.wait - else - @event_loop.defer :known_errors => KNOWN_ERRORS do - connect_port(p) - end - end + if valid_delegator? && !p.valid_delegator? + connect_port(p) + p.wait if options[:wait] end + if fields.empty? p else @@ -681,37 +668,28 @@ def port(name,options = Hash.new) end end - def ports(options = Hash.new,&block) - p = proc do |names| - names.map{|name| port(name,options)} - end - if block - port_names(&p) - else - p.call(port_names) - end + def ports + if block_given? + yield(@ports.values) + else + @ports.values + end end - def properties(&block) - p = proc do |names| - names.map{|name| property(name)} - end - if block - property_names(&p) - else - p.call(property_names) - end + def properties + if block_given? + yield(@properties.values) + else + @properties.values + end end - def attributes(&block) - p = proc do |names| - names.map{|name| attribute(name)} - end - if block - attribute_names(&p) - else - p.call(attribute_names) - end + def attributes + if block_given? + yield(@attributes.values) + else + @attributes.values + end end # call-seq: @@ -720,13 +698,7 @@ def attributes(&block) # Enumerates the properties that are available on # this task, as instances of Orocos::Attribute def each_property(&block) - if !block_given? - return enum_for(:each_property) - end - names = property_names - names.each do |name| - yield(property(name)) - end + @properties.each_value(&block) end # call-seq: @@ -735,14 +707,7 @@ def each_property(&block) # Enumerates the attributes that are available on # this task, as instances of Orocos::Attribute def each_attribute(&block) - if !block_given? - return enum_for(:each_attribute) - end - - names = attribute_names - names.each do |name| - yield(attribute(name)) - end + @attributes.each_value(&block) end # call-seq: @@ -751,53 +716,43 @@ def each_attribute(&block) # Enumerates the ports that are available on this task, as instances of # either Orocos::InputPort or Orocos::OutputPort def each_port(&block) - if !block_given? - return enum_for(:each_port) + @ports.each_value(&block) + end + + def reachable!(task_context, options = Hash.new) + if task_context.kind_of?(TaskContextProxy) + raise ArgumentError, + "task_context must not be instance of TaskContextProxy" + elsif !task_context.respond_to?(:event_names) + raise ArgumentError, + "task_context must be an async instance, got #{task_context.class}" end - port_names.each do |name| - yield(port(name)) + @last_task_class ||= task_context.class + if @last_task_class != task_context.class + Orocos.warn "Class mismatch: TaskContextProxy #{name} was recently "\ + "connected to #{@last_task_class} and is now connected "\ + "to #{task_context.class}." + @last_task_class = task_context.class end - self - end - # must be thread safe - def reachable!(task_context,options = Hash.new) - raise ArgumentError, "task_context must not be instance of TaskContextProxy" if task_context.is_a?(TaskContextProxy) - raise ArgumentError, "task_context must be an async instance but is #{task_context.class}" if !task_context.respond_to?(:event_names) - @mutex.synchronize do - @last_task_class ||= task_context.class - if @last_task_class != task_context.class - Vizkit.warn "Class missmatch: TaskContextProxy #{name} was recently connected to #{@last_task_class} and is now connected to #{task_context.class}." - @last_task_class = task_context.class - end + if valid_delegator? + remove_proxy_event(@delegator_obj, @delegator_obj.event_names) + end - remove_proxy_event(@delegator_obj,@delegator_obj.event_names) if valid_delegator? - if @delegator_obj_old - remove_proxy_event(@delegator_obj_old,@delegator_obj_old.event_names) - @delegator_obj_old = nil - end - super(task_context,options) + if @delegator_obj_old + remove_proxy_event(@delegator_obj_old, @delegator_obj_old.event_names) + @delegator_obj_old = nil + end - # check if the requested ports are available - @ports.values.each do |port| - unless task_context.port_names.include? port.name - Orocos.warn "task #{name} has currently no port called #{port.name} - on_data will be called when the port was added" - end - end - @attributes.values.each do |attribute| - unless task_context.attribute_names.include? attribute.name - Orocos.warn "task #{name} has currently no attribute called #{attribute.name} - on_change will be called when the attribute was added" - end - end - @properties.values.each do |property| - unless task_context.property_names.include? property.name - Orocos.warn "task #{name} has currently no property called #{property.name} - on_change will be called when the property was added" - end - end + super(task_context, options) + + # this is emitting on_port_reachable, on_property_reachable .... + proxy_event(@delegator_obj, @delegator_obj. + event_names - [:reachable, :unreachable]) - # this is emitting on_port_reachable, on_property_reachable .... - proxy_event(@delegator_obj,@delegator_obj.event_names-[:reachable]) + @delegator_obj.on_unreachable do + unreachable! end end @@ -806,139 +761,111 @@ def reachable? super && @delegator_obj.reachable? end rescue Orocos::NotFound => e - unreachable! :error => e,:reconnect => @options[:reconnect] + unreachable! error: e, reconnect: @options[:reconnect] false end - def unreachable!(options = {:reconnect => false}) - Kernel.validate_options options,:reconnect,:error - @mutex.synchronize do - # do not stop proxing events here (see reachable!) - # otherwise unrechable event might get lost - @delegator_obj_old = if valid_delegator? - @delegator_obj - else - @delegator_obj_old - end + def unreachable!(options = {}) + return unless valid_delegator? - disable_emitting do - super(options) + Kernel.validate_options options, :reconnect, :error + @delegator_obj_old = + @mutex.synchronize do + # do not stop proxing events here (see reachable!) + # otherwise unrechable event might get lost + if valid_delegator? + @delegator_obj + else + @delegator_obj_old + end end - end + + super(options) disconnect_ports disconnect_attributes disconnect_properties - re = if options.has_key?(:reconnect) - options[:reconnect] - else - @options[:reconnect] - end - reconnect if re + reconnect if options.fetch(:reconnect, @options[:reconnect]) end private - # blocking call shoud be called from a different thread - # all private methods must be thread safe - def connect_port(port) - return if port.reachable? - p = @mutex.synchronize do - return unless valid_delegator? - @delegator_obj.disable_emitting do - #called in the context of @delegator_obj - begin - port(port.name,true,port.options) - rescue Orocos::NotFound - Orocos.warn "task #{name} has currently no port called #{port.name}" - raise - rescue Orocos::CORBA::ComError => e - Orocos.warn "task #{name} with error on port: #{port.name} -- #{e}" - raise - end - end + + def connect_async_object(object, callback) + delegator_obj = @delegator_obj + return unless delegator_obj + + finalizer = lambda do |resolved, _error| + object.reachable!(resolved) if resolved && !object.valid_delegator? + callback&.call(object) end - @event_loop.call do - port.reachable!(p) unless port.reachable? + + # called in the context of @delegator_obj + begin + yield(delegator_obj, finalizer) # this is yield(@delegator_obj) + rescue Orocos::NotFound + Orocos.warn "task #{name} has currently no port called #{port.name}" + raise + rescue Orocos::CORBA::ComError => e + Orocos.warn "task #{name} with error on port: #{port.name} -- #{e}" + raise end end - def disconnect_ports - ports = @mutex.synchronize do - @ports.values + # blocking call shoud be called from a different thread + # all private methods must be thread safe + main_thread_call def connect_port(port, &callback) + connect_async_object(port, callback) do |delegator_obj, finalizer| + delegator_obj.port(port.name, true, port.options, &finalizer) end + end + + main_thread_call def disconnect_ports + ports = @mutex.synchronize { @ports.values.dup } ports.each(&:unreachable!) end # blocking call shoud be called from a different thread - def connect_attribute(attribute) - return if attribute.reachable? - a = @mutex.synchronize do - return unless valid_delegator? - @delegator_obj.disable_emitting do - #called in the context of @delegator_obj - begin - attribute(attribute.name,attribute.options) - rescue Orocos::NotFound - Orocos.warn "task #{name} has currently no attribtue called #{attribute.name} -> on_change will not be called!" - raise - rescue Orocos::CORBA::ComError => e - Orocos.warn "task #{name} with error on port: #{attribute.name} -- #{e}" - raise - end - end - end - @event_loop.call do - attribute.reachable!(a) unless attribute.reachable? + main_thread_call def connect_attribute(attribute, &callback) + connect_async_object(attribute, callback) do |delegator_obj, finalizer| + delegator_obj.attribute(attribute.name, attribute.options, &finalizer) end end - def disconnect_attributes - attributes = @mutex.synchronize do - @attributes.values - end + main_thread_call def disconnect_attributes + attributes = @mutex.synchronize { @attributes.values.dup } attributes.each(&:unreachable!) end # blocking call shoud be called from a different thread - def connect_property(property) - return if property.reachable? - p = @mutex.synchronize do - return unless valid_delegator? - @delegator_obj.disable_emitting do - begin - property(property.name,property.options) - rescue Orocos::NotFound - Orocos.warn "task #{name} has currently no property called #{property.name} -> on_change will not be called!" - raise - rescue Orocos::CORBA::ComError => e - Orocos.warn "task #{name} with error on port: #{property.name} -- #{e}" - raise - end - end - end - @event_loop.call do - property.reachable!(p) unless property.reachable? + main_thread_call def connect_property(property, &callback) + connect_async_object(property, callback) do |delegator_obj, finalizer| + delegator_obj.property(property.name, property.options, &finalizer) end end - def disconnect_properties - properties = @mutex.synchronize do - @properties.values - end + main_thread_call def disconnect_properties + @mutex.synchronize { pp @properties.keys } + properties = @mutex.synchronize { @properties.values.dup } properties.each(&:unreachable!) end def respond_to_missing?(method_name, include_private = false) - (reachable? && @delegator_obj.respond_to?(method_name)) || super + @delegator_obj.respond_to?(method_name) || super end - def method_missing(m,*args) - if respond_to_missing?(m) - event_loop.sync(@delegator_obj,args) do |args| - @delegator_obj.method(m).call(*args) - end - else - super + def method_missing(m, *args) + return super unless respond_to_missing?(m) + + event_loop.sync(@delegator_obj, args) do |_args| + @delegator_obj.method(m).call(*args) end end + + def object_reachable_handler(map, name, type) + obj = map[name] + return unless valid_delegator? + return if !obj || obj.valid_delegator? + + send("connect_#{type}", obj) + end end end diff --git a/lib/orocos/base.rb b/lib/orocos/base.rb index ca71fef1..8507c43e 100644 --- a/lib/orocos/base.rb +++ b/lib/orocos/base.rb @@ -298,7 +298,7 @@ def self.initialize(name = "orocosrb_#{::Process.pid}") self.name_service << Orocos::ROS.name_service end if defined?(Orocos::Async) - Orocos.name_service.name_services.each do |ns| + Orocos.name_service.each do |ns| Orocos::Async.name_service.add(ns) end end diff --git a/lib/orocos/corba.rb b/lib/orocos/corba.rb index 5856eac5..2c34a42e 100644 --- a/lib/orocos/corba.rb +++ b/lib/orocos/corba.rb @@ -117,9 +117,9 @@ def self.refine_exceptions(obj0, obj1 = nil) # :nodoc: rescue ComError => e if !obj1 - raise ComError, "Communication failed with corba #{obj0}", e.backtrace + raise ComError, "Communication failed with corba #{obj0} - #{e}", e.backtrace else - raise ComError, "communication failed with either #{obj0} or #{obj1}", e.backtrace + raise ComError, "communication failed with either #{obj0} or #{obj1} - #{e}", e.backtrace end end end diff --git a/lib/orocos/name_service.rb b/lib/orocos/name_service.rb index 67104871..1cba950a 100644 --- a/lib/orocos/name_service.rb +++ b/lib/orocos/name_service.rb @@ -416,7 +416,7 @@ def name # Returns an Async object that maps to this name service def to_async(options = Hash.new) - Orocos::Async::Local::NameService.new(:tasks => registered_tasks) + Orocos::Async::NameService.new(self.class.new(registered_tasks)) end #(see NameServiceBase#get) @@ -575,7 +575,7 @@ def bind(task, name) # @param (see Orocos::Async::CORBA::NameService#initialize) # @return [Orocos::Async::CORBA::NameService] def to_async(reconnect: true) - Orocos::Async::CORBA::NameService.new(ip, reconnect: reconnect) + Orocos::Async::NameService.new(self.class.new(ip)) end # Resets the CORBA name service client. diff --git a/lib/orocos/ros/async.rb b/lib/orocos/ros/async.rb index 2e38affe..adf4ad3c 100644 --- a/lib/orocos/ros/async.rb +++ b/lib/orocos/ros/async.rb @@ -38,7 +38,8 @@ def configure_delegation(options) end def access_remote_task_context - Orocos::ROS::Node.new(@name_service, @server, @name) + node = Orocos::ROS::Node.new(@name_service, @server, @name) + [node, name] end # add methods which forward the call to the underlying task context diff --git a/lib/orocos/test.rb b/lib/orocos/test.rb index ebb5f8c3..801beeb9 100644 --- a/lib/orocos/test.rb +++ b/lib/orocos/test.rb @@ -189,8 +189,18 @@ def assert_state_equals(state, task, timeout = 1) flunk("#{task} was expected to be in state #{state} but is in #{task.state}") end - def wait_for(timeout = 5, &block) + def wait_for(timeout = 5, msg = nil, &block) Orocos::Async.wait_for(0.005, timeout, &block) + rescue Utilrb::EventLoop::WaitForTimeout + raise unless msg + + msg = msg.call if msg.respond_to?(:call) + raise Utilrb::EventLoop::WaitForTimeout, "timed out waiting for #{msg}" + end + + def wait_for_equality(expected, value, timeout: 5) + msg = proc { "#{value} to equal #{expected}" } + wait_for(timeout, msg) { expected == value } end def name_service diff --git a/test/async/test_attributes.rb b/test/async/test_attributes.rb index 58676d82..77245128 100644 --- a/test/async/test_attributes.rb +++ b/test/async/test_attributes.rb @@ -1,49 +1,38 @@ -require 'orocos/test' -require 'orocos/async' +require "orocos/test" +require "orocos/async" describe Orocos::Async::CORBA::Property do - before do + before do Orocos::Async.clear + @ns = Orocos::Async::NameService.new + @ns << Orocos::CORBA::NameService.new + + start "process" + @task = @ns.get("process_Test") end - describe "When connect to a remote task" do - it "must return a property object" do - Orocos.run('process') do - t1 = Orocos::Async::CORBA::TaskContext.new(ior('process_Test')) - p = t1.property("prop1") - p.must_be_kind_of Orocos::Async::CORBA::Property - end + describe "When connect to a remote task" do + it "returns a property synchronously" do + p = @task.property("prop1") + assert_kind_of Orocos::Async::CORBA::Property, p end - it "must asynchronously return a property" do - Orocos.run('process') do - t1 = Orocos::Async::CORBA::TaskContext.new(ior('process_Test')) - p = nil - t1.property("prop1") do |prop| - p = prop - end - sleep 0.1 - Orocos::Async.step - p.must_be_kind_of Orocos::Async::CORBA::Property - end + it "returns a property asynchronously" do + p = nil + @task.property("prop1") { |prop| p = prop } + wait_for { p } + assert_kind_of Orocos::Async::CORBA::Property, p end - it "must call on_change" do - Orocos.run('process') do - t1 = Orocos::Async::CORBA::TaskContext.new(ior('process_Test')) - p = t1.property("prop2") - p.period = 0.1 - vals = Array.new - p.on_change do |data| - vals << data - end - sleep 0.1 - Orocos::Async.steps - assert_equal 1,vals.size - p.write 33 - sleep 0.1 - Orocos::Async.steps - assert_equal 2,vals.size - assert_equal 33,vals.last + it "calls on_change with the property values" do + p = @task.property("prop2") + p.period = 0.01 + + vals = [] + p.on_change do |data| + vals << data end + wait_for_equality [84], vals + p.write 33 + wait_for_equality [84, 33], vals end end end @@ -51,48 +40,36 @@ describe Orocos::Async::CORBA::Attribute do include Orocos::Spec - before do + before do Orocos::Async.clear + @ns = Orocos::Async::NameService.new + @ns << Orocos::CORBA::NameService.new + + start "process" + @task = @ns.get("process_Test") end - describe "When connect to a remote task" do - it "must return a attribute object" do - Orocos.run('process') do - t1 = Orocos::Async::CORBA::TaskContext.new(ior('process_Test')) - a = t1.attribute("att2") - a.must_be_kind_of Orocos::Async::CORBA::Attribute - end + describe "When connect to a remote task" do + it "returns an attribute object synchronously" do + a = @task.attribute("att2") + assert_kind_of Orocos::Async::CORBA::Attribute, a end - it "must asynchronously return a attribute" do - Orocos.run('process') do - t1 = Orocos::Async::CORBA::TaskContext.new(ior('process_Test')) - a = nil - t1.attribute("att2") do |prop| - a = prop - end - sleep 0.1 - Orocos::Async.step - a.must_be_kind_of Orocos::Async::CORBA::Attribute - end + it "returns an attribute object asynchronously" do + a = nil + @task.attribute("att2") { |prop| a = prop } + wait_for { a } + assert_kind_of Orocos::Async::CORBA::Attribute, a end - it "must call on_change" do - Orocos.run('process') do - t1 = Orocos::Async::CORBA::TaskContext.new(ior('process_Test')) - a = t1.attribute("att2") - a.period = 0.1 - vals = Array.new - a.on_change do |data| - vals << data - end - sleep 0.1 - Orocos::Async.steps - assert_equal 1,vals.size - a.write 33 - sleep 0.1 - Orocos::Async.steps - assert_equal 2,vals.size - assert_equal 33,vals.last + it "calls on_change with the attribute values" do + a = @task.attribute("att2") + a.period = 0.01 + vals = [] + a.on_change do |data| + vals << data end + wait_for_equality [84], vals + a.write 33 + wait_for_equality [84, 33], vals end end end diff --git a/test/async/test_name_service.rb b/test/async/test_name_service.rb index 330fa66c..c22deb3e 100644 --- a/test/async/test_name_service.rb +++ b/test/async/test_name_service.rb @@ -2,7 +2,7 @@ require 'orocos/async' describe Orocos::Async::NameService do - before do + before do Orocos::Async.clear end @@ -12,7 +12,7 @@ it "should raise NotFound if remote task is not reachable" do ns = Orocos::Async::NameService.new - assert_raises Orocos::NotFound do + assert_raises Orocos::NotFound do ns.get "bla" end end @@ -21,27 +21,24 @@ assert Orocos::Async.name_service.reachable? end - it "should return a TaskContextProxy" do + it "should return a TaskContextProxy" do Orocos.run('process') do ns = Orocos::Async::NameService.new - ns << Orocos::Async::CORBA::NameService.new + ns << Orocos::CORBA::NameService.new t = ns.get "process_Test" - t.must_be_instance_of Orocos::Async::CORBA::TaskContext + assert_kind_of Orocos::Async::CORBA::TaskContext, t t2 = nil - ns.get "process_Test" do |task| - t2 = task - end - sleep 0.1 - Orocos::Async.step - t2.must_be_instance_of Orocos::Async::CORBA::TaskContext + ns.get("process_Test") { |task| t2 = task } + wait_for { t2 } + assert_kind_of Orocos::Async::CORBA::TaskContext, t2 end end - it "should report that new task are added and removed" do + it "should report that new task are added and removed" do ns = Orocos::Async::NameService.new(:period => 0) - ns << Orocos::Async::CORBA::NameService.new + ns << Orocos::CORBA::NameService.new names_added = [] names_removed = [] ns.on_task_added do |n| @@ -63,120 +60,3 @@ assert_equal "/process_Test",names_removed.first end end - -describe Orocos::Async::CORBA::NameService do - include Orocos::Spec - - before do - Orocos::Async.clear - end - - it "should raise NotFound if remote task is not reachable" do - ns = Orocos::Async::CORBA::NameService.new - assert_raises Orocos::NotFound do - ns.get "bla" - end - end - - it "should not raise NotFound if remote task is not reachable and a block is given" do - ns = Orocos::Async::CORBA::NameService.new - - not_called = true - ns.get "bla" do |task| - not_called = false - end - - error = nil - ns.get "bla" do |task,err| - error = err - end - - sleep 0.1 - Orocos::Async.step - assert not_called - error.must_be_instance_of Orocos::NotFound - end - - it "should have a global default instance" do - Orocos::Async::CORBA.name_service.must_be_instance_of Orocos::Async::CORBA::NameService - end - - it "should return a TaskContextProxy" do - Orocos.run('process') do - ns = Orocos::Async::CORBA::NameService.new - t = ns.get "process_Test" - t.must_be_instance_of Orocos::Async::CORBA::TaskContext - - t2 = nil - ns.get "process_Test" do |task| - t2 = task - end - sleep 0.1 - Orocos::Async.step - t2.must_be_instance_of Orocos::Async::CORBA::TaskContext - end - end -end - -describe Orocos::Async::Local::NameService do - include Orocos::Spec - - before do - Orocos::Async.clear - end - - it "should raise NotFound if remote task is not reachable" do - ns = Orocos::Async::Local::NameService.new - assert_raises Orocos::NotFound do - ns.get "bla" - end - end - - it "should not raise NotFound if remote task is not reachable and a block is given" do - ns = Orocos::Async::Local::NameService.new - - not_called = true - ns.get "bla" do |task| - not_called = false - end - - error = nil - ns.get "bla" do |task,err| - error = err - end - - sleep 0.1 - Orocos::Async.step - assert not_called - error.must_be_instance_of Orocos::NotFound - end - - describe "#get" do - it "should return a TaskContextProxy" do - Orocos.run('process') do - ns = Orocos::Async::Local::NameService.new - t = Orocos.get "process_Test" - ns.register t - t = ns.get "process_Test" - t.must_be_instance_of Orocos::Async::CORBA::TaskContext - t.wait - assert t.reachable? - end - end - end - - describe "#proxy" do - it "should return a TaskContextProxy" do - Orocos.run('process') do - ns = Orocos::Async::Local::NameService.new - t = Orocos.get "process_Test" - ns.register t - t = ns.proxy "process_Test" - t.must_be_instance_of Orocos::Async::TaskContextProxy - t.wait - assert t.reachable? - end - end - end -end - diff --git a/test/async/test_task.rb b/test/async/test_task.rb index 283b7272..9e6ba5de 100644 --- a/test/async/test_task.rb +++ b/test/async/test_task.rb @@ -2,19 +2,18 @@ require 'orocos/async' describe Orocos::Async::CORBA::TaskContext do - before do + before do Orocos::Async.clear end - describe "initialize" do - before do + describe "initialize" do + before do Orocos::Async.clear end it "should raise ComError if remote task is not reachable and :raise is set to true" do - Orocos::CORBA.connect_timeout = 50 - t1 = Orocos::Async::CORBA::TaskContext.new(:ior => ior("bla"),:raise => true) + Orocos::Async::CORBA::TaskContext.new(ior: ior("bla"), raise: true) sleep 0.1 assert_raises(Orocos::CORBA::ComError) do Orocos::Async.step @@ -45,46 +44,46 @@ end it "can be initialized from ior" do - Orocos.run('process') do - ior = Orocos.name_service.ior('process_Test') - t1 = Orocos::Async::CORBA::TaskContext.new(:ior => ior) - assert t1.reachable? - t1 = Orocos::Async::CORBA::TaskContext.new(ior) - assert t1.reachable? - Orocos::Async.steps - end + start "process" + + ior = Orocos.name_service.ior("process_Test") + t1 = Orocos::Async::CORBA::TaskContext.new(ior: ior) + wait_for { t1.reachable? } + t1 = Orocos::Async::CORBA::TaskContext.new(ior) + wait_for { t1.reachable? } + Orocos::Async.steps end it "can be initialized from Orocos::TaskContext" do - Orocos.run('process') do - t1 = Orocos.name_service.get "process_Test" - t2 = Orocos::Async::CORBA::TaskContext.new(t1) - assert t2.reachable? - t2 = Orocos::Async::CORBA::TaskContext.new(:use => t1) - assert t2.reachable? - Orocos::Async.steps - end + start "process" + + t1 = Orocos.name_service.get "process_Test" + t2 = Orocos::Async::CORBA::TaskContext.new(t1) + wait_for { t2.reachable? } + t2 = Orocos::Async::CORBA::TaskContext.new(use: t1) + wait_for { t2.reachable? } + Orocos::Async.steps end it "can be initialized from Orocos::Async::CORBA::TaskContext" do - Orocos.run('process') do - t1 = Orocos::Async::CORBA::TaskContext.new(:ior => ior("process_Test")) - assert t1.reachable? - t2 = Orocos::Async::CORBA::TaskContext.new(t1) - assert t2.reachable? - Orocos::Async.steps - end + start "process" + + t1 = Orocos::Async::CORBA::TaskContext.new(ior: ior("process_Test")) + wait_for { t1.reachable? } + t2 = Orocos::Async::CORBA::TaskContext.new(t1) + wait_for { t2.reachable? } + Orocos::Async.steps end - it 'should have the instance methods from Orocos::TaskContext' do + it "has the instance methods from Orocos::TaskContext" do methods = Orocos::Async::CORBA::TaskContext.instance_methods Orocos::TaskContext.instance_methods.each do |method| - methods.include?(method).wont_be_nil + assert methods.include?(method) end end end - describe "Async access" do + describe "Async access" do it "should raise on all synchronous calls to the remote task if not reachable" do t1 = Orocos::Async::CORBA::TaskContext.new(ior('process_Test')) t1.reachable?.must_equal false # only function that should never raise @@ -92,7 +91,7 @@ assert_raises Orocos::CORBA::ComError do t1.has_port?("bla").must_equal false end - assert_raises Orocos::CORBA::ComError do + assert_raises Orocos::CORBA::ComError do t1.has_attribute?("bla").must_equal false end t1.attribute_names do |names,e| @@ -107,23 +106,23 @@ connect = nil disconnect = nil t1 = nil + Orocos.run('process') do - t1 = Orocos::Async::CORBA::TaskContext.new(ior('process_Test'),:period => 0.1,:watchdog => true) + t1 = Orocos::Async::CORBA::TaskContext.new( + ior('process_Test'), period: 0.01, watchdog: true + ) t1.on_reachable do connect = true end t1.on_unreachable do disconnect = true end - Orocos::Async.steps - assert connect + wait_for { connect } end - sleep 0.11 - Orocos::Async.steps - assert disconnect + wait_for { disconnect } end - it "should call on_port_reachable" do + it "should call on_port_reachable" do Orocos.run('simple_source') do t1 = Orocos::Async::CORBA::TaskContext.new(ior('simple_source_source'),:period => 0.1,:watchdog => true) ports = [] @@ -135,7 +134,7 @@ Orocos::Async.step assert_equal ["cycle", "cycle_struct", "out0", "out1", "out2", "out3", "state"],ports.sort ports.clear - + #should be called even if the task is already reachable t1.on_port_reachable do |name| ports << name @@ -146,7 +145,7 @@ end - it "should call on_port_unreachable" do + it "should call on_port_unreachable" do t1 = nil ports = [] Orocos.run('simple_source') do @@ -164,7 +163,7 @@ assert_equal ["cycle", "cycle_struct", "out0", "out1", "out2", "out3", "state"],ports.sort end - it "should call on_property_reachable" do + it "should call on_property_reachable" do process = start('process::Test' => 'process_Test') t1 = Orocos::Async::CORBA::TaskContext.new(ior('process_Test'),:period => 0.1,:watchdog => true) @@ -186,7 +185,7 @@ assert_equal ["dynamic_prop","dynamic_prop_setter_called","prop1", "prop2", "prop3"],properties end - it "should call on_port_reachable if a port was dynamically added" do + it "should call on_port_reachable if a port was dynamically added" do task = Orocos::RubyTasks::TaskContext.new("test") t1 = Orocos::Async::CORBA::TaskContext.new(ior('test'),:period => 0.1,:watchdog => true) ports = [] @@ -201,7 +200,7 @@ assert_equal ['state', "frame","frame2"], ports end - it "should call on_port_unreachable if a port was dynamically removed" do + it "should call on_port_unreachable if a port was dynamically removed" do task = Orocos::RubyTasks::TaskContext.new("test") t1 = Orocos::Async::CORBA::TaskContext.new(ior('test'),:period => 0.1,:watchdog => true) port = task.create_output_port("frame","string") @@ -220,7 +219,7 @@ Orocos::Async.steps assert_equal ["frame"], ports end - + it "should call on_error" do t1 = nil error = nil @@ -263,7 +262,7 @@ Orocos::Async.step t1 end - t1.port_names do + t1.port_names do end sleep 0.1 Orocos::Async.steps @@ -287,7 +286,7 @@ end end - it "should return its ports" do + it "should return its ports" do Orocos.run('simple_source') do t1 = Orocos::Async::CORBA::TaskContext.new(ior('simple_source_source')) names = t1.port_names @@ -300,7 +299,7 @@ end end - it "should asynchronously return its ports" do + it "should asynchronously return its ports" do Orocos.run('simple_source') do t1 = Orocos::Async::CORBA::TaskContext.new(ior('simple_source_source')) queue = Queue.new @@ -325,7 +324,7 @@ Orocos.run('process') do t1 = Orocos::Async::CORBA::TaskContext.new(ior('process_Test')) q = Queue.new - 0.upto 19 do + 0.upto 19 do t1.reachable? do |val| sleep 0.15 # this will ensure that no thread can run twice q << val @@ -341,7 +340,7 @@ Orocos.run('process') do t1 = Orocos::Async::CORBA::TaskContext.new(ior('process_Test')) q = Queue.new - 0.upto 9 do + 0.upto 9 do t1.model do |val| sleep 0.1 # this will ensure that no thread can run twice q << val diff --git a/test/async/test_task_proxy.rb b/test/async/test_task_proxy.rb index 766a69fd..bd02bb78 100644 --- a/test/async/test_task_proxy.rb +++ b/test/async/test_task_proxy.rb @@ -22,9 +22,9 @@ t1 = Orocos::Async.proxy("simple_source_source") p = t1.port("cycle") sub_port = p.sub_port(:frame) - sub_port.must_be_instance_of Orocos::Async::PortProxy - sub_port = p.sub_port([:frame,:size]) - sub_port.must_be_instance_of Orocos::Async::PortProxy + assert_kind_of Orocos::Async::SubPortProxy, sub_port + sub_port = p.sub_port(%I[frame size]) + assert_kind_of Orocos::Async::SubPortProxy, sub_port end end @@ -32,7 +32,7 @@ it "should raise RuntimeError if an operation is performed which is not available for this port type" do Orocos.run('simple_sink') do t1 = Orocos::Async.proxy("simple_sink_sink") - p = t1.port("cycle",:wait => true) + p = t1.port("cycle", wait: true) assert_raises RuntimeError do p.period = 1 end @@ -46,12 +46,19 @@ it "should return the type name of the port if known or connected" do t1 = Orocos::Async.proxy("simple_source_source",:retry_period => 0.2,:period => 0.2) p = t1.port("cycle2",:type => Integer) - assert_equal "Fixnum",p.type_name + expected = + if Fixnum == Integer + "Integer" + else + "Fixnum" + end + + assert_equal expected, p.type_name p2 = t1.port("cycle") Orocos.run('simple_source') do p2.wait - assert_equal "Fixnum",p.type_name + assert_equal Fixnum.to_s, p.type_name end end @@ -131,7 +138,7 @@ t1.configure t1.start wait_for { data } - data.must_be_instance_of Fixnum + assert_kind_of Fixnum, data end end end @@ -142,7 +149,7 @@ describe "on_reachable" do it "must be called once when a prop gets reachable" do - prop = Orocos::Async.proxy("process_Test",:period => 0.09).property("prop1") + prop = Orocos::Async.proxy("process_Test", period: 0.09).property("prop1") counter = 0 prop.on_reachable do counter +=1 @@ -160,16 +167,17 @@ prop = Orocos::Async.proxy("process_Test",:period => 0.09).property("prop1") counter = 0 prop.on_unreachable do - counter +=1 + counter += 1 end - Orocos::Async.steps - assert_equal 1,counter + wait_for { !prop.valid_delegator? } + assert_equal 1, counter - Orocos.run('process') do + Orocos.run("process") do prop.wait + wait_for { prop.valid_delegator? } + assert prop.valid_delegator? end - Orocos::Async.steps - assert_equal 2,counter + wait_for { counter == 2 } end end end @@ -244,27 +252,25 @@ end it "should reconnect" do - t1 = Orocos::Async.proxy("process_Test",:retry_period => 0.09) + t1 = Orocos::Async.proxy("process_Test", retry_period: 0.09) p = t1.attribute("att2") - vals = Array.new + vals = [] p.on_change do |data| vals << data end - Orocos.run('process') do - Orocos::Async.steps - assert p.reachable? + Orocos.run("process") do + wait_for { p.valid_delegator? } + wait_for { vals.size == 1 } end - assert_equal 1,vals.size - wait_for do - !t1.reachable? && !p.reachable? - end + wait_for { !t1.valid_delegator? } + refute p.valid_delegator? - Orocos.run('process') do - Orocos::Async.steps + Orocos.run("process") do + wait_for { p.valid_delegator? } + wait_for { vals.size == 2 } end - assert_equal 2,vals.size end end end @@ -333,9 +339,10 @@ end Orocos.run('process') do - wait_for { vals.size == 1 } + wait_for { vals.size >= 1 } end Orocos::Async.steps + Orocos.run('process') do wait_for { vals.size == 2 } end @@ -381,13 +388,13 @@ it "shortcut must return TaskContexProxy" do t1 = Orocos::Async.proxy("process_Test",:retry_period => 0.1,:period => 0.1) - t1.must_be_instance_of Orocos::Async::TaskContextProxy + assert_kind_of Orocos::Async::TaskContextProxy, t1 end it "should return a port proxy" do t1 = Orocos::Async.proxy("process_Test",:retry_period => 0.1,:period => 0.1) p = t1.port("test") - p.must_be_instance_of Orocos::Async::PortProxy + assert_kind_of Orocos::Async::PortProxy, p end it "should connect to a remote task when reachable" do @@ -407,23 +414,23 @@ Orocos::Async.steps assert_equal 1, disconnects # on_unreachable will be called because is not yet reachable - Orocos.run('process') do - Orocos::Async.steps # queue reconnect - assert t1.reachable? - t1.instance_variable_get(:@delegator_obj).must_be_instance_of Orocos::Async::CORBA::TaskContext + Orocos.run("process") do + wait_for { t1.reachable? } + assert_kind_of Orocos::Async::CORBA::TaskContext, + t1.instance_variable_get(:@delegator_obj) assert_equal 1, disconnects end - assert !t1.reachable? + refute t1.reachable? Orocos::Async.steps # queue reconnect assert_equal 1, connects assert_equal 2, disconnects - Orocos.run('process') do + Orocos.run("process") do Orocos::Async.steps # queue reconnect assert t1.reachable? end Orocos::Async.steps # queue reconnect - assert !t1.reachable? + refute t1.reachable? assert_equal 2, connects assert_equal 3, disconnects end From 4c1e7ebf1471098a10061d8edf4c9a71b02e5393 Mon Sep 17 00:00:00 2001 From: Sylvain Date: Thu, 3 Aug 2023 15:29:23 -0300 Subject: [PATCH 15/15] feat: define TaskContextProxy#disconnect There was previously no way to make a task context proxy completely stop working. --- lib/orocos/async/task_context_proxy.rb | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/lib/orocos/async/task_context_proxy.rb b/lib/orocos/async/task_context_proxy.rb index c7b3cd71..b2c89216 100644 --- a/lib/orocos/async/task_context_proxy.rb +++ b/lib/orocos/async/task_context_proxy.rb @@ -574,6 +574,11 @@ def reconnect(wait_for_task = false) wait if wait_for_task == true end + def disconnect + @resolve_timer.stop + remove_all_listeners + end + main_thread_call def property(name, options = Hash.new) name = name.to_str options,other_options = Kernel.filter_options options, wait: @options[:wait]