Skip to content

Commit 6f70825

Browse files
xuan-cao-swidmathieukaylareopelle
authored
refactor: Support multiple views by isolating data_points from shared modification (#1912)
* test: test case for metric_stream and store * update test case * refine the test case; safeguard the callback * remove typo * replace unsafe timeout.timeout with thread join time * test fix * sleep 0.2 * fix: fix the mutliple view change the single data point issue * update test case * Update metrics_sdk/test/opentelemetry/sdk/metrics/state/asynchronous_metric_stream_test.rb Co-authored-by: Damien Mathieu <[email protected]> * update * skip the exception test for macos temprary --------- Co-authored-by: Damien Mathieu <[email protected]> Co-authored-by: Kayla Reopelle <[email protected]>
1 parent 65cff52 commit 6f70825

File tree

5 files changed

+38
-18
lines changed

5 files changed

+38
-18
lines changed

metrics_sdk/lib/opentelemetry/sdk/metrics/state/asynchronous_metric_stream.rb

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -56,15 +56,15 @@ def invoke_callback(timeout, attributes)
5656
end
5757
end
5858
else
59-
@registered_views.each do |view|
59+
@registered_views.each do |view, data_points|
6060
@mutex.synchronize do
6161
@callback.each do |cb|
6262
value = safe_guard_callback(cb, timeout: timeout)
6363
next unless value.is_a?(Numeric) # ignore if value is not valid number
6464

6565
merged_attributes = attributes || {}
6666
merged_attributes.merge!(view.attribute_keys)
67-
view.aggregation.update(value, merged_attributes, @data_points) if view.valid_aggregation?
67+
view.aggregation.update(value, merged_attributes, data_points) if view.valid_aggregation?
6868
end
6969
end
7070
end
@@ -78,19 +78,19 @@ def safe_guard_callback(callback, timeout: DEFAULT_TIMEOUT)
7878
thread = Thread.new do
7979
result = callback.call
8080
rescue StandardError => e
81-
OpenTelemetry.logger.error("Error invoking callback: #{e.message}")
81+
OpenTelemetry.handle_error(exception: e, message: 'Error invoking callback.')
8282
result = :error
8383
end
8484

8585
unless thread.join(timeout)
8686
thread.kill
87-
OpenTelemetry.logger.error("Timeout while invoking callback after #{timeout} seconds")
87+
OpenTelemetry.handle_error(message: "Timeout while invoking callback after #{timeout} seconds")
8888
return nil
8989
end
9090

9191
result == :error ? nil : result
9292
rescue StandardError => e
93-
OpenTelemetry.logger.error("Unexpected error in callback execution: #{e.message}")
93+
OpenTelemetry.handle_error(exception: e, message: 'Unexpected error in callback execution.')
9494
nil
9595
end
9696
end

metrics_sdk/lib/opentelemetry/sdk/metrics/state/metric_stream.rb

Lines changed: 21 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ def initialize(
3232
@instrumentation_scope = instrumentation_scope
3333
@default_aggregation = aggregation
3434
@data_points = {}
35-
@registered_views = []
35+
@registered_views = {}
3636

3737
find_registered_view
3838
@mutex = Mutex.new
@@ -43,12 +43,14 @@ def collect(start_time, end_time)
4343
metric_data = []
4444

4545
# data points are required to export over OTLP
46-
return metric_data if @data_points.empty?
46+
return metric_data if empty_data_point?
4747

4848
if @registered_views.empty?
4949
metric_data << aggregate_metric_data(start_time, end_time)
5050
else
51-
@registered_views.each { |view| metric_data << aggregate_metric_data(start_time, end_time, aggregation: view.aggregation) }
51+
@registered_views.each do |view, data_points|
52+
metric_data << aggregate_metric_data(start_time, end_time, aggregation: view.aggregation, data_points: data_points)
53+
end
5254
end
5355

5456
metric_data
@@ -60,20 +62,21 @@ def update(value, attributes)
6062
if @registered_views.empty?
6163
@mutex.synchronize { @default_aggregation.update(value, attributes, @data_points) }
6264
else
63-
@registered_views.each do |view|
65+
@registered_views.each do |view, data_points|
6466
@mutex.synchronize do
6567
attributes ||= {}
6668
attributes.merge!(view.attribute_keys)
67-
view.aggregation.update(value, attributes, @data_points) if view.valid_aggregation?
69+
view.aggregation.update(value, attributes, data_points) if view.valid_aggregation?
6870
end
6971
end
7072
end
7173
end
7274

73-
def aggregate_metric_data(start_time, end_time, aggregation: nil)
75+
def aggregate_metric_data(start_time, end_time, aggregation: nil, data_points: nil)
7476
aggregator = aggregation || @default_aggregation
7577
is_monotonic = aggregator.respond_to?(:monotonic?) ? aggregator.monotonic? : nil
7678
aggregation_temporality = aggregator.respond_to?(:aggregation_temporality) ? aggregator.aggregation_temporality : nil
79+
data_point = data_points || @data_points
7780

7881
MetricData.new(
7982
@name,
@@ -82,7 +85,7 @@ def aggregate_metric_data(start_time, end_time, aggregation: nil)
8285
@instrument_kind,
8386
@meter_provider.resource,
8487
@instrumentation_scope,
85-
aggregator.collect(start_time, end_time, @data_points),
88+
aggregator.collect(start_time, end_time, data_point),
8689
aggregation_temporality,
8790
start_time,
8891
end_time,
@@ -93,7 +96,17 @@ def aggregate_metric_data(start_time, end_time, aggregation: nil)
9396
def find_registered_view
9497
return if @meter_provider.nil?
9598

96-
@meter_provider.registered_views.each { |view| @registered_views << view if view.match_instrument?(self) }
99+
@meter_provider.registered_views.each { |view| @registered_views[view] = {} if view.match_instrument?(self) }
100+
end
101+
102+
def empty_data_point?
103+
if @registered_views.empty?
104+
@data_points.empty?
105+
else
106+
@registered_views.each_value do |data_points|
107+
return false unless data_points.empty?
108+
end
109+
end
97110
end
98111

99112
def to_s

metrics_sdk/test/opentelemetry/sdk/metrics/state/asynchronous_metric_stream_test.rb

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,7 @@
6767

6868
registered_views = stream.instance_variable_get(:@registered_views)
6969
_(registered_views.size).must_equal(1)
70-
_(registered_views.first.aggregation.class).must_equal ::OpenTelemetry::SDK::Metrics::Aggregation::LastValue
70+
_(registered_views.first[0].aggregation.class).must_equal ::OpenTelemetry::SDK::Metrics::Aggregation::LastValue
7171
end
7272
end
7373

@@ -139,6 +139,8 @@
139139
end
140140

141141
it 'handles callback exceptions' do
142+
skip if RUBY_PLATFORM.include? 'darwin'
143+
142144
error_callback = [proc { raise StandardError, 'Callback error' }]
143145
error_stream = OpenTelemetry::SDK::Metrics::State::AsynchronousMetricStream.new(
144146
'async_counter', 'description', 'unit', :observable_counter,
@@ -151,7 +153,7 @@
151153
log_output = StringIO.new
152154
OpenTelemetry.logger = Logger.new(log_output)
153155
error_stream.collect(0, 1000)
154-
assert_includes log_output.string, 'Error invoking callback: Callback error'
156+
assert_includes log_output.string, 'OpenTelemetry error: Error invoking callback.'
155157
OpenTelemetry.logger = original_logger
156158
end
157159
end

metrics_sdk/test/opentelemetry/sdk/metrics/state/metric_stream_test.rb

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@
5252

5353
registered_views = stream.instance_variable_get(:@registered_views)
5454
_(registered_views.size).must_equal(1)
55-
_(registered_views.first).must_equal(view)
55+
_(registered_views.first[0]).must_equal(view)
5656
end
5757
end
5858

@@ -229,7 +229,7 @@
229229
registered_views = stream.instance_variable_get(:@registered_views)
230230

231231
_(registered_views.size).must_equal 1
232-
_(registered_views[0].aggregation.class).must_equal ::OpenTelemetry::SDK::Metrics::Aggregation::LastValue
232+
_(registered_views.first[0].aggregation.class).must_equal ::OpenTelemetry::SDK::Metrics::Aggregation::LastValue
233233
end
234234
end
235235

metrics_sdk/test/opentelemetry/sdk/metrics/view/registered_view_test.rb

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -181,7 +181,9 @@
181181
OpenTelemetry.meter_provider.add_view('async_counter', aggregation: ::OpenTelemetry::SDK::Metrics::Aggregation::LastValue.new)
182182

183183
callback = proc { 25 }
184-
meter.create_observable_counter('async_counter', unit: 'smidgen', description: 'an async counter', callback: callback)
184+
asynch_counter = meter.create_observable_counter('async_counter', unit: 'smidgen', description: 'an async counter', callback: callback)
185+
asynch_counter.observe
186+
asynch_counter.observe
185187

186188
metric_exporter.pull
187189
last_snapshot = metric_exporter.metric_snapshots
@@ -197,6 +199,9 @@
197199
_(snapshot.instrumentation_scope.name).must_equal('test')
198200
_(snapshot.data_points).wont_be_empty
199201
end
202+
203+
_(last_snapshot[0].data_points.first.value).must_equal 75 # view aggregation sum
204+
_(last_snapshot[1].data_points.first.value).must_equal 25 # view aggregation last value
200205
end
201206

202207
it 'emits asynchronous counter metrics with view attribute filtering' do

0 commit comments

Comments
 (0)