Skip to content

Commit 1d5da76

Browse files
committed
async: review and refactor interaction between main thread and worker threads
Turns out that the async code was actually doing a lot of sync stuff behind the scenes, which was causing very noticeable lag and blocking behaviors when on bad connections. The first change was to create main_thread_call helper. One marks a "main thread only" method like so ~~~ main_thread_call def some_method end ~~~ and method calls not in the main thread will cause an exception to be raised. This is rather obviously meant for debugging. From there, I've fixed calls that were coming from a worker thread but should not have. In most cases, the pattern I had to change was ~~~ def some_method @event_loop.defer ... do do some async call in sync form (i.e. without blocks) end end ~~~ This was causing a lot of headaches as the code that turns async calls into sync calls depends on a lot of synchronization (e.g. with @delegator_obj), which was solved by having huge @mutex.synchronize blocks, serializing major parts of the execution. These calls have been changed into the non-blocking block form (a.k.a. callback form), and are called from the main thread. One major change is the removal of sub-genres of Async::NameService. These were rather pointless, as turning a sync into an async object can be done generically through to_async. The "new" Async::NameService simply delegates to the non-async get in a worker thread and then turns the non-async object into an async object. Another major change is how Async::TaskContext creates itself. The very unfortunate design choice in the async and proxy version of TaskContext was to let them get a name as argument, and let them do the resolution. This brings a lot of complexity in the async case, as there is a zone where the async task context does not have an underlying task context. Proxies are meant for that, not async. Generally speaking, this interface should be changed to do the name resolution in name_service (duh).
1 parent 6f4cc96 commit 1d5da76

18 files changed

+787
-1096
lines changed

lib/orocos/async/async.rb

Lines changed: 16 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@
3030
# puts state
3131
# end
3232
# reader.read do |value|
33-
# puts value
33+
# puts value
3434
# end
3535
#
3636
# If a method call needs the remote Orocos Task which is currently not reachable
@@ -44,13 +44,13 @@
4444
# short on worker threads.
4545
#
4646
# # these events are generated by polling
47-
# task.on_connect do
47+
# task.on_connect do
4848
# puts "connected"
4949
# end
50-
# task.on_disconnect do
50+
# task.on_disconnect do
5151
# puts "disconnected"
5252
# end
53-
# task.on_reconnect do
53+
# task.on_reconnect do
5454
# puts "reconnected"
5555
# end
5656
#
@@ -65,12 +65,22 @@
6565
# port.on_new_data do |data|
6666
# puts data
6767
# end
68-
#
68+
#
6969
# The polling frequency can be changed by setting the period attribute of each
7070
# asynchronous object.
7171
#
7272
module Orocos::Async
73-
KNOWN_ERRORS = [Orocos::ComError,Orocos::NotFound,Typelib::NotFound,Orocos::TypekitTypeNotFound,Orocos::TypekitTypeNotExported,Orocos::StateTransitionFailed,Orocos::ConnectionFailed,OroGen::DefinitionTypekitNotFound]
73+
KNOWN_ERRORS = [
74+
Orocos::ComError,
75+
Orocos::NotFound,
76+
Typelib::NotFound,
77+
Orocos::TypekitTypeNotFound,
78+
Orocos::TypekitTypeNotExported,
79+
Orocos::StateTransitionFailed,
80+
Orocos::ConnectionFailed,
81+
OroGen::DefinitionTypekitNotFound
82+
]
83+
7484
class << self
7585
extend ::Forwardable
7686

lib/orocos/async/attributes.rb

Lines changed: 15 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -17,25 +17,26 @@ def initialize(async_task,attribute,options=Hash.new)
1717
disable_emitting do
1818
reachable!(attribute)
1919
end
20-
@poll_timer = @event_loop.async_every(method(:raw_read), {:period => period, :start => false,
21-
:known_errors => Orocos::Async::KNOWN_ERRORS}) do |data,error|
20+
21+
@poll_timer = @event_loop.async_every(
22+
proc { @delegator_obj.raw_read },
23+
{ period: period, start: false,
24+
known_errors: Orocos::Async::KNOWN_ERRORS }
25+
) do |data, error|
2226
if error
2327
@poll_timer.cancel
2428
self.period = @poll_timer.period
25-
@event_loop.once do
26-
event :error,error
27-
end
28-
else
29-
if data
30-
if @raw_last_sample != data
31-
@raw_last_sample = data
32-
event :raw_change,data
33-
event :change,Typelib.to_ruby(data)
34-
end
29+
event :error, error
30+
elsif data
31+
if @raw_last_sample != data
32+
@raw_last_sample = data
33+
event :raw_change, data
34+
event :change, Typelib.to_ruby(data)
3535
end
3636
end
3737
end
3838
@poll_timer.doc = attribute.full_name
39+
3940
@task.on_unreachable do
4041
unreachable!
4142
end
@@ -92,14 +93,10 @@ def wait(timeout = 5.0)
9293
def really_add_listener(listener)
9394
super
9495
if listener.event == :raw_change
95-
if !@poll_timer.running?
96-
@poll_timer.start(period)
97-
end
96+
@poll_timer.start(period) unless @poll_timer.running?
9897
listener.call(@raw_last_sample) if @raw_last_sample && listener.use_last_value?
9998
elsif listener.event == :change
100-
if !@poll_timer.running?
101-
@poll_timer.start(period)
102-
end
99+
@poll_timer.start(period) unless @poll_timer.running?
103100
listener.call(Typelib.to_ruby(@raw_last_sample)) if @raw_last_sample && listener.use_last_value?
104101
end
105102
end

0 commit comments

Comments
 (0)