examples/metrics_sdk/metrics_collect_exp_histogram.rb (new file)
@@ -0,0 +1,38 @@
# frozen_string_literal: true

require 'bundler/inline'

gemfile(true) do
source 'https://rubygems.org'
gem 'opentelemetry-api'
gem 'opentelemetry-common'
gem 'opentelemetry-sdk'

gem 'opentelemetry-metrics-api', path: '../../metrics_api'
gem 'opentelemetry-metrics-sdk', path: '../../metrics_sdk'
end

require 'opentelemetry/sdk'
require 'opentelemetry-metrics-sdk'

# This example configures the exporter manually, so turn off automatic configuration.
ENV['OTEL_METRICS_EXPORTER'] = 'none'

OpenTelemetry::SDK.configure

console_metric_exporter = OpenTelemetry::SDK::Metrics::Export::ConsoleMetricPullExporter.new

OpenTelemetry.meter_provider.add_metric_reader(console_metric_exporter)

# Register a view matching instrument names containing 'exponential' that applies
# the exponential bucket histogram aggregation with cumulative temporality.
OpenTelemetry.meter_provider.add_view(
  '*exponential*',
  aggregation: OpenTelemetry::SDK::Metrics::Aggregation::ExponentialBucketHistogram.new(
    aggregation_temporality: :cumulative,
    max_scale: 20
  ),
  type: :histogram,
  unit: 'smidgen'
)

meter = OpenTelemetry.meter_provider.meter("SAMPLE_METER_NAME")

exponential_histogram = meter.create_histogram('test_exponential_histogram', unit: 'smidgen', description: 'a small amount of something')
(1..10).each do |v|
val = v ** 2
exponential_histogram.record(val, attributes: { 'lox' => 'xol' })
end

OpenTelemetry.meter_provider.metric_readers.each(&:pull)
OpenTelemetry.meter_provider.shutdown
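For orientation, here is a minimal sketch (not part of this change) of the indexing an exponential bucket histogram performs: at scale s, a value v falls into bucket floor(log2(v) * 2**s), and bucket widths grow by the constant ratio 2**(2**-s).

# Illustrative only: the SDK's real mapping handles exact powers of two and
# subnormal values separately; this shows just the core index arithmetic.
scale = 3
value = 10.0
index = (Math.log2(value) * (2**scale)).floor # => 26
base  = 2**(2.0**-scale)                      # per-bucket growth ratio, ~1.0905
lower = base**index                           # ~9.51
upper = base**(index + 1)                     # ~10.38, so lower < value <= upper
puts "value=#{value} scale=#{scale} index=#{index} bounds=(#{lower.round(2)}, #{upper.round(2)}]"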
@@ -9,6 +9,7 @@
require_relative 'exponential_histogram/ieee_754'
require_relative 'exponential_histogram/logarithm_mapping'
require_relative 'exponential_histogram/exponent_mapping'
require_relative 'exponential_histogram_data_point'

module OpenTelemetry
module SDK
@@ -44,11 +45,25 @@ def initialize(
@scale = validate_scale(max_scale)

@mapping = new_mapping(@scale)

# Previous state for cumulative aggregation, keyed by attributes
@previous_positive = {}
@previous_negative = {}
@previous_min = {}
@previous_max = {}
@previous_sum = {}
@previous_count = {}
@previous_zero_count = {}
@previous_scale = {}
end

# When the aggregation temporality is cumulative, collect merges the current delta into the previous cumulative state, downscaling buckets as needed.
# rubocop:disable Metrics/MethodLength
def collect(start_time, end_time, data_points)
if @aggregation_temporality.delta?
# Set timestamps and 'move' data point values to result.
hdps = data_points.values.map! do |hdp|
hdp.start_time_unix_nano = start_time
hdp.time_unix_nano = end_time
@@ -57,18 +72,133 @@ def collect(start_time, end_time, data_points)
data_points.clear
hdps
else
# Update timestamps and take a snapshot.
data_points.values.map! do |hdp|
hdp.start_time_unix_nano ||= start_time # Start time of a data point is from the first observation.
hdp.time_unix_nano = end_time
hdp = hdp.dup
hdp.positive = hdp.positive.dup
hdp.negative = hdp.negative.dup
hdp
# CUMULATIVE temporality: merge the current data_points into the previous
# state, keeping only the merged result in @previous_*

merged_data_points = {}

# Merging is linear in the number of data points, so it can be slow when many are present,
# but with cumulative temporality the points are merged into @previous_* and cleared,
# so data_points itself stays small.
# rubocop:disable Metrics/BlockLength
data_points.each do |attributes, hdp|
# Store current values
current_positive = hdp.positive
current_negative = hdp.negative
current_sum = hdp.sum
current_min = hdp.min
current_max = hdp.max
current_count = hdp.count
current_zero_count = hdp.zero_count
current_scale = hdp.scale

# Initialize the previous positive/negative buckets and scale on the first observation of these attributes
@previous_positive[attributes] = current_positive.copy_empty if @previous_positive[attributes].nil?
@previous_negative[attributes] = current_negative.copy_empty if @previous_negative[attributes].nil?
@previous_scale[attributes] = current_scale if @previous_scale[attributes].nil?

# Determine minimum scale for merging
min_scale = [@previous_scale[attributes], current_scale].min

# Calculate ranges for positive and negative buckets
low_positive, high_positive = get_low_high_previous_current(
@previous_positive[attributes],
current_positive,
@previous_scale[attributes],
current_scale,
min_scale
)
low_negative, high_negative = get_low_high_previous_current(
@previous_negative[attributes],
current_negative,
@previous_scale[attributes],
current_scale,
min_scale
)

# Adjust min_scale based on bucket size constraints
min_scale = [
min_scale - get_scale_change(low_positive, high_positive),
min_scale - get_scale_change(low_negative, high_negative)
].min

# Downscale previous buckets if necessary
downscale_change = @previous_scale[attributes] - min_scale
downscale(downscale_change, @previous_positive[attributes], @previous_negative[attributes])

# Merge current buckets into previous buckets; the temporality here is always :cumulative
merge_buckets(@previous_positive[attributes], current_positive, current_scale, min_scale, @aggregation_temporality)
merge_buckets(@previous_negative[attributes], current_negative, current_scale, min_scale, @aggregation_temporality)

# Initialize min, max, sum, count, and zero_count the first time these attributes are seen
@previous_min[attributes] = Float::INFINITY if @previous_min[attributes].nil?
@previous_max[attributes] = -Float::INFINITY if @previous_max[attributes].nil?
@previous_sum[attributes] = 0 if @previous_sum[attributes].nil?
@previous_count[attributes] = 0 if @previous_count[attributes].nil?
@previous_zero_count[attributes] = 0 if @previous_zero_count[attributes].nil?

# Update aggregated values
@previous_min[attributes] = [@previous_min[attributes], current_min].min
@previous_max[attributes] = [@previous_max[attributes], current_max].max
@previous_sum[attributes] += current_sum
@previous_count[attributes] += current_count
@previous_zero_count[attributes] += current_zero_count
@previous_scale[attributes] = min_scale

# Create merged data point
merged_hdp = ExponentialHistogramDataPoint.new(
attributes,
start_time,
end_time,
@previous_count[attributes],
@previous_sum[attributes],
@previous_scale[attributes],
@previous_zero_count[attributes],
@previous_positive[attributes].dup,
@previous_negative[attributes].dup,
0, # flags
nil, # exemplars
@previous_min[attributes],
@previous_max[attributes],
@zero_threshold
)

merged_data_points[attributes] = merged_hdp
end
# rubocop:enable Metrics/BlockLength

# When data_points is empty, the merge loop above does not run,
# so return the last merged data points if any exist.
if data_points.empty? && !@previous_positive.empty?
@previous_positive.each_key do |attributes|
merged_hdp = ExponentialHistogramDataPoint.new(
attributes,
start_time,
end_time,
@previous_count[attributes],
@previous_sum[attributes],
@previous_scale[attributes],
@previous_zero_count[attributes],
@previous_positive[attributes].dup,
@previous_negative[attributes].dup,
0, # flags
nil, # exemplars
@previous_min[attributes],
@previous_max[attributes],
@zero_threshold
)
merged_data_points[attributes] = merged_hdp
end
end

# Clear data_points since the data has been merged into @previous_*;
# otherwise the next collect would double-count.
data_points.clear
merged_data_points.values # return array
end
end
# rubocop:enable Metrics/MethodLength
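# Hedged usage sketch (timestamps elided; update/collect as defined in this diff):
# with cumulative temporality each collect reports totals since the first
# recording, while delta reports only what arrived since the previous collect.
#
#   histogram.update(5, {}, data_points)
#   histogram.collect(t0, t1, data_points) # count == 1 under either temporality
#   histogram.update(7, {}, data_points)
#   histogram.collect(t0, t2, data_points) # cumulative: count == 2; delta: count == 1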

# This mirrors `aggregate` in the Python SDK: update never merges, but it may rescale.
# rubocop:disable Metrics/MethodLength
def update(amount, attributes, data_points)
# fetch or initialize the ExponentialHistogramDataPoint
@@ -78,6 +208,7 @@
max = -Float::INFINITY
end

# This block only runs when no data point exists yet for these attributes.
data_points[attributes] = ExponentialHistogramDataPoint.new(
attributes,
nil, # :start_time_unix_nano
@@ -203,7 +334,8 @@ def get_scale_change(low, high)
end

def downscale(change, positive, negative)
return if change <= 0
return if change.zero?
raise ArgumentError, 'Invalid change of scale' if change < 0

positive.downscale(change)
negative.downscale(change)
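# Hedged example: downscale(1) halves the resolution by merging adjacent
# bucket pairs, e.g. per-bucket counts [1, 2, 3, 4] at scale s become
# [3, 7] at scale s - 1.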
@@ -217,11 +349,77 @@ def validate_scale(scale)
end

def validate_size(size)
raise ArgumentError, "Max size #{size} is smaller than minimum size #{MIN_MAX_SIZE}" if size < MIN_MAX_SIZE
raise ArgumentError, "Max size #{size} is larger than maximum size #{MAX_MAX_SIZE}" if size > MAX_MAX_SIZE
raise ArgumentError, "Buckets size #{size} is smaller than minimum size #{MIN_MAX_SIZE}" if size < MIN_MAX_SIZE
raise ArgumentError, "Buckets size #{size} is larger than maximum size #{MAX_MAX_SIZE}" if size > MAX_MAX_SIZE

size
end

# NOTE: previous_scale must not be nil here; collect initializes it before calling.
def get_low_high_previous_current(previous_buckets, current_buckets, previous_scale, current_scale, min_scale)
previous_low, previous_high = get_low_high(previous_buckets, previous_scale, min_scale)
current_low, current_high = get_low_high(current_buckets, current_scale, min_scale)

if current_low > current_high
[previous_low, previous_high]
elsif previous_low > previous_high
[current_low, current_high]
else
[[previous_low, current_low].min, [previous_high, current_high].max]
end
end

# Returns [low, high] bucket indices at min_scale; [0, -1] signals empty buckets.
def get_low_high(buckets, scale, min_scale)
return [0, -1] if buckets.nil? || buckets.counts == [0] || buckets.counts.empty?

shift = scale - min_scale
[buckets.index_start >> shift, buckets.index_end >> shift]
end
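# Hedged worked example of the shift above: dropping from scale s to
# min_scale merges 2**(s - min_scale) adjacent buckets, and the arithmetic
# right shift maps old indices to new ones, rounding toward -Infinity:
#   [-5, -4, -1, 0, 3, 7].map { |i| i >> 2 } # => [-2, -1, -1, 0, 0, 1]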

def merge_buckets(previous_buckets, current_buckets, current_scale, min_scale, aggregation_temporality)
return unless current_buckets && !current_buckets.counts.empty?

current_change = current_scale - min_scale

# Iterate the raw counts array directly, without applying the bucket offset.
current_buckets.instance_variable_get(:@counts).each_with_index do |current_bucket, current_bucket_index|
next if current_bucket == 0

current_index = current_buckets.index_base + current_bucket_index
current_index -= current_buckets.counts.size if current_index > current_buckets.index_end

inds = current_index >> current_change

# Grow previous buckets if needed to accommodate the new index
if inds < previous_buckets.index_start
span = previous_buckets.index_end - inds

raise StandardError, 'Incorrect merge scale' if span >= @size

previous_buckets.grow(span + 1, @size) if span >= previous_buckets.counts.size

previous_buckets.index_start = inds
end

if inds > previous_buckets.index_end
span = inds - previous_buckets.index_start

raise StandardError, 'Incorrect merge scale' if span >= @size

previous_buckets.grow(span + 1, @size) if span >= previous_buckets.counts.size

previous_buckets.index_end = inds
end

bucket_index = inds - previous_buckets.index_base
bucket_index += previous_buckets.counts.size if bucket_index < 0

# With delta temporality the merge would subtract; this should not occur in the cumulative-only path here.
increment = aggregation_temporality == :delta ? -current_bucket : current_bucket
previous_buckets.increment_bucket(bucket_index, increment)
end
end
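# Hedged sketch of the ring-buffer arithmetic above: @counts is circular,
# with logical bucket index_base stored at physical slot 0, so
#   physical = logical - index_base (+ counts.size when negative)
#   logical  = index_base + offset  (- counts.size when past index_end)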
end
end
end
@@ -27,7 +27,6 @@ def grow(needed, max_size)
old_positive_limit = size - bias

new_size = [2**Math.log2(needed).ceil, max_size].min

new_positive_limit = new_size - bias

tmp = Array.new(new_size, 0)
@@ -105,6 +104,15 @@ def downscale(amount)
def increment_bucket(bucket_index, increment = 1)
@counts[bucket_index] += increment
end

def copy_empty
new_buckets = self.class.new
new_buckets.instance_variable_set(:@counts, Array.new(@counts.size, 0))
new_buckets.instance_variable_set(:@index_base, @index_base)
new_buckets.instance_variable_set(:@index_start, @index_start)
new_buckets.instance_variable_set(:@index_end, @index_end)
new_buckets
end
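# NOTE: copy_empty preserves the index window (index_base/index_start/index_end)
# while zeroing all counts, giving the merge a same-shaped buffer that shares
# no state with the original.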
end
end
end
@@ -13,6 +13,9 @@ module ExponentialHistogram
class ExponentMapping
attr_reader :scale

MINIMAL_SCALE = -10
MAXIMAL_SCALE = 0
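# Scales in MINIMAL_SCALE..MAXIMAL_SCALE (-10..0) can be computed from the
# IEEE 754 exponent bits alone, since each bucket spans one or more powers
# of two; no logarithm call is required.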

def initialize(scale)
@scale = scale
@min_normal_lower_boundary_index = calculate_min_normal_lower_boundary_index(scale)
@@ -13,6 +13,9 @@ module ExponentialHistogram
class LogarithmMapping
attr_reader :scale

MINIMAL_SCALE = 1
MAXIMAL_SCALE = 20
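# Scales in MINIMAL_SCALE..MAXIMAL_SCALE (1..20) subdivide each power of two
# into 2**scale buckets, which requires the log-based mapping and the
# precomputed Log2eScaleFactor table used below.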

def initialize(scale)
@scale = scale
@scale_factor = Log2eScaleFactor::LOG2E_SCALE_BUCKETS[scale] # scale_factor is used for mapping the index
@@ -294,12 +294,12 @@
error = assert_raises(ArgumentError) do
OpenTelemetry::SDK::Metrics::Aggregation::ExponentialBucketHistogram.new(max_size: 10_000_000)
end
assert_equal('Max size 10000000 is larger than maximum size 16384', error.message)
assert_equal('Buckets size 10000000 is larger than maximum size 16384', error.message)

error = assert_raises(ArgumentError) do
OpenTelemetry::SDK::Metrics::Aggregation::ExponentialBucketHistogram.new(max_size: 0)
end
assert_equal('Max size 0 is smaller than minimum size 2', error.message)
assert_equal('Buckets size 0 is smaller than minimum size 2', error.message)
end
end
end
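A hedged sketch (not part of this diff) of how the new cumulative path could be exercised in this test file, reusing the update/collect signatures shown above:

it 'accumulates across collects when temporality is cumulative' do
  ebh = OpenTelemetry::SDK::Metrics::Aggregation::ExponentialBucketHistogram.new(aggregation_temporality: :cumulative)
  data_points = {}
  ebh.update(2, {}, data_points)
  ebh.collect(0, 1, data_points)
  ebh.update(4, {}, data_points)
  hdps = ebh.collect(0, 2, data_points)
  assert_equal(2, hdps[0].count) # totals since start, not since the last collect
  assert_equal(6, hdps[0].sum)
end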