Skip to content

Commit

Permalink
Improve tests and decouple concepts
Browse files Browse the repository at this point in the history
  • Loading branch information
jonatas committed Sep 13, 2024
1 parent 14b9fe4 commit d134e98
Show file tree
Hide file tree
Showing 2 changed files with 95 additions and 52 deletions.
74 changes: 51 additions & 23 deletions lib/timescaledb/continuous_aggregates_helper.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,21 @@ module Timescaledb
module ContinuousAggregatesHelper
extend ActiveSupport::Concern

included do
class_attribute :rollup_rules, default: {
/count\(\*\)\s+as\s+(\w+)/ => 'sum(\1) as \1',
/sum\((\w+)\)\s+as\s+(\w+)/ => 'sum(\2) as \2',
/min\((\w+)\)\s+as\s+(\w+)/ => 'min(\2) as \2',
/max\((\w+)\)\s+as\s+(\w+)/ => 'max(\2) as \2',
/candlestick_agg\((\w+)\)\s+as\s+(\w+)/ => 'rollup(\2) as \2',
/stats_agg\((\w+),\s*(\w+)\)\s+as\s+(\w+)/ => 'rollup(\3) as \3',
/stats_agg\((\w+)\)\s+as\s+(\w+)/ => 'rollup(\2) as \2',
/state_agg\((\w+)\)\s+as\s+(\w+)/ => 'rollup(\2) as \2',
/percentile_agg\((\w+),\s*(\w+)\)\s+as\s+(\w+)/ => 'rollup(\3) as \3',
/heartbeat_agg\((\w+)\)\s+as\s+(\w+)/ => 'rollup(\2) as \2',
}
end

class_methods do
def continuous_aggregates(options = {})
@time_column = options[:time_column] || 'ts'
Expand All @@ -22,6 +37,9 @@ def continuous_aggregates(options = {})
# Allow for custom aggregate definitions to override or add to scope-based ones
@aggregates.merge!(options[:aggregates] || {})

# Add custom rollup rules if provided
self.rollup_rules.merge!(options[:custom_rollup_rules] || {})

define_continuous_aggregate_classes
end

Expand All @@ -37,32 +55,13 @@ def refresh_aggregates(timeframes = nil)

def create_continuous_aggregates(with_data: false)
@aggregates.each do |aggregate_name, config|
previous_timeframe = nil
@timeframes.each do |timeframe|
klass = const_get("#{aggregate_name}_per_#{timeframe}".classify)
interval = "'1 #{timeframe.to_s}'"
base_query =
if previous_timeframe
prev_klass = const_get("#{aggregate_name}_per_#{previous_timeframe}".classify)
prev_klass
.select("time_bucket(#{interval}, #{@time_column}) as #{@time_column}, #{config[:select]}")
.group(1, *config[:group_by])
else
scope = public_send(config[:scope_name])
select_values = scope.select_values.join(', ')
group_values = scope.group_values

config[:select] = select_values.gsub('count(*) as total', 'sum(total) as total')
config[:group_by] = (2...(2 + group_values.size)).map(&:to_s).join(', ')

self.select("time_bucket(#{interval}, #{@time_column}) as #{@time_column}, #{select_values}")
.group(1, *group_values)
end

connection.execute <<~SQL
CREATE MATERIALIZED VIEW IF NOT EXISTS #{klass.table_name}
WITH (timescaledb.continuous) AS
#{base_query.to_sql}
#{klass.base_query.to_sql}
#{with_data ? 'WITH DATA' : 'WITH NO DATA'};
SQL

Expand All @@ -74,30 +73,58 @@ def create_continuous_aggregates(with_data: false)
schedule_interval => INTERVAL '#{policy[:schedule_interval]}');
SQL
end

previous_timeframe = timeframe
end
end
end

def rollup(scope, interval)
select_values = scope.select_values.join(', ')
group_values = scope.group_values

self.select("time_bucket(#{interval}, #{@time_column}) as #{@time_column}, #{select_values}")
.group(1, *group_values)
end

def apply_rollup_rules(select_values)
rollup_rules.reduce(select_values) do |result, (pattern, replacement)|
result.gsub(pattern, replacement)
end
end

private

def define_continuous_aggregate_classes
base_model = self
@aggregates.each do |aggregate_name, config|
previous_timeframe = nil
@timeframes.each do |timeframe|
_table_name = "#{aggregate_name}_per_#{timeframe}"
class_name = "#{aggregate_name}_per_#{timeframe}".classify
const_set(class_name, Class.new(ActiveRecord::Base) do
extend ActiveModel::Naming

class << self
attr_accessor :config, :timeframe
attr_accessor :config, :timeframe, :base_query, :base_model
end

self.table_name = _table_name
self.config = config
self.timeframe = timeframe

interval = "'1 #{timeframe.to_s}'"
self.base_model = base_model
self.base_query =
if previous_timeframe
prev_klass = base_model.const_get("#{aggregate_name}_per_#{previous_timeframe}".classify)
prev_klass
.select("time_bucket(#{interval}, #{base_model.instance_variable_get(:@time_column)}) as #{base_model.instance_variable_get(:@time_column)}, #{config[:select]}")
.group(1, *config[:group_by])
else
scope = base_model.public_send(config[:scope_name])
config[:select] = base_model.apply_rollup_rules(scope.select_values.join(', '))
config[:group_by] = scope.group_values
base_model.rollup(scope, interval)
end

def self.refresh!
connection.execute("CALL refresh_continuous_aggregate('#{table_name}', null, null);")
Expand All @@ -111,6 +138,7 @@ def self.refresh_policy
config[:refresh_policy]&.dig(timeframe)
end
end)
previous_timeframe = timeframe
end
end
end
Expand Down
73 changes: 44 additions & 29 deletions spec/timescaledb/continuos_aggregates_helper_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -85,22 +85,21 @@ class Download < ActiveRecord::Base
it 'defines rollup scope for aggregates' do
test_class.create_continuous_aggregates
aggregate_classes = [test_class::TotalDownloadsPerMinute, test_class::TotalDownloadsPerHour, test_class::TotalDownloadsPerDay, test_class::TotalDownloadsPerMonth]
aggregate_classes.each do |agg_class|
expect(agg_class).to respond_to(:rollup)
expect(agg_class.rollup.to_sql).to include('time_bucket')
expect(agg_class.rollup.to_sql).to include('count(*) as total')
end
end

it 'defines time-based scopes for aggregates' do
aggregate_classes = [test_class::TotalDownloadsPerMinute, test_class::TotalDownloadsPerHour, test_class::TotalDownloadsPerDay, test_class::TotalDownloadsPerMonth]
aggregate_scopes = [:total_downloads, :downloads_by_gem, :downloads_by_version]

aggregate_scopes.each do |scope|
aggregate_classes.each do |agg_class|
expect(agg_class).to respond_to(scope)
end
end
expect(test_class::TotalDownloadsPerMinute.base_query.to_sql).to eq("SELECT time_bucket('1 minute', ts) as ts, count(*) as total FROM \"downloads\" GROUP BY 1")
expect(test_class::TotalDownloadsPerMonth.base_query.to_sql).to eq("SELECT time_bucket('1 month', ts) as ts, sum(total) as total FROM \"total_downloads_per_day\" GROUP BY 1")
expect(test_class::TotalDownloadsPerDay.base_query.to_sql).to eq("SELECT time_bucket('1 day', ts) as ts, sum(total) as total FROM \"total_downloads_per_hour\" GROUP BY 1")
expect(test_class::TotalDownloadsPerHour.base_query.to_sql).to eq("SELECT time_bucket('1 hour', ts) as ts, sum(total) as total FROM \"total_downloads_per_minute\" GROUP BY 1")

expect(test_class::DownloadsByVersionPerMinute.base_query.to_sql).to eq("SELECT time_bucket('1 minute', ts) as ts, gem_name, gem_version, count(*) as total FROM \"downloads\" GROUP BY 1, \"downloads\".\"gem_name\", \"downloads\".\"gem_version\"")
expect(test_class::DownloadsByVersionPerMonth.base_query.to_sql).to eq("SELECT time_bucket('1 month', ts) as ts, gem_name, gem_version, sum(total) as total FROM \"downloads_by_version_per_day\" GROUP BY 1, \"downloads_by_version_per_day\".\"gem_name\", \"downloads_by_version_per_day\".\"gem_version\"")
expect(test_class::DownloadsByVersionPerDay.base_query.to_sql).to eq("SELECT time_bucket('1 day', ts) as ts, gem_name, gem_version, sum(total) as total FROM \"downloads_by_version_per_hour\" GROUP BY 1, \"downloads_by_version_per_hour\".\"gem_name\", \"downloads_by_version_per_hour\".\"gem_version\"")
expect(test_class::DownloadsByVersionPerHour.base_query.to_sql).to eq("SELECT time_bucket('1 hour', ts) as ts, gem_name, gem_version, sum(total) as total FROM \"downloads_by_version_per_minute\" GROUP BY 1, \"downloads_by_version_per_minute\".\"gem_name\", \"downloads_by_version_per_minute\".\"gem_version\"")

expect(test_class::DownloadsByGemPerMinute.base_query.to_sql).to eq("SELECT time_bucket('1 minute', ts) as ts, gem_name, count(*) as total FROM \"downloads\" GROUP BY 1, \"downloads\".\"gem_name\"")
expect(test_class::DownloadsByGemPerMonth.base_query.to_sql).to eq("SELECT time_bucket('1 month', ts) as ts, gem_name, sum(total) as total FROM \"downloads_by_gem_per_day\" GROUP BY 1, \"downloads_by_gem_per_day\".\"gem_name\"")
expect(test_class::DownloadsByGemPerDay.base_query.to_sql).to eq("SELECT time_bucket('1 day', ts) as ts, gem_name, sum(total) as total FROM \"downloads_by_gem_per_hour\" GROUP BY 1, \"downloads_by_gem_per_hour\".\"gem_name\"")
expect(test_class::DownloadsByGemPerHour.base_query.to_sql).to eq("SELECT time_bucket('1 hour', ts) as ts, gem_name, sum(total) as total FROM \"downloads_by_gem_per_minute\" GROUP BY 1, \"downloads_by_gem_per_minute\".\"gem_name\"")
end
end

Expand All @@ -112,34 +111,50 @@ class Download < ActiveRecord::Base
it 'creates materialized views for each aggregate' do
test_class.create_continuous_aggregates

expect(ActiveRecord::Base.connection).to have_received(:execute).with(/CREATE MATERIALIZED VIEW.*downloads_total_downloads_per_minute/i)
expect(ActiveRecord::Base.connection).to have_received(:execute).with(/CREATE MATERIALIZED VIEW.*downloads_total_downloads_per_hour/i)
expect(ActiveRecord::Base.connection).to have_received(:execute).with(/CREATE MATERIALIZED VIEW.*downloads_total_downloads_per_day/i)
expect(ActiveRecord::Base.connection).to have_received(:execute).with(/CREATE MATERIALIZED VIEW.*downloads_total_downloads_per_month/i)
expect(ActiveRecord::Base.connection).to have_received(:execute).with(/CREATE MATERIALIZED VIEW IF NOT EXISTS downloads_by_gem_per_minute/i)
expect(ActiveRecord::Base.connection).to have_received(:execute).with(/CREATE MATERIALIZED VIEW IF NOT EXISTS downloads_by_version_per_minute/i)
expect(ActiveRecord::Base.connection).to have_received(:execute).with(/CREATE MATERIALIZED VIEW IF NOT EXISTS downloads_by_gem_per_hour/i)
expect(ActiveRecord::Base.connection).to have_received(:execute).with(/CREATE MATERIALIZED VIEW IF NOT EXISTS downloads_by_version_per_hour/i)
expect(ActiveRecord::Base.connection).to have_received(:execute).with(/CREATE MATERIALIZED VIEW IF NOT EXISTS downloads_by_gem_per_day/i)
expect(ActiveRecord::Base.connection).to have_received(:execute).with(/CREATE MATERIALIZED VIEW IF NOT EXISTS downloads_by_version_per_day/i)
expect(ActiveRecord::Base.connection).to have_received(:execute).with(/CREATE MATERIALIZED VIEW IF NOT EXISTS downloads_by_gem_per_month/i)
expect(ActiveRecord::Base.connection).to have_received(:execute).with(/CREATE MATERIALIZED VIEW IF NOT EXISTS downloads_by_version_per_month/i)
end

it 'sets up refresh policies for each aggregate' do
test_class.create_continuous_aggregates

expect(ActiveRecord::Base.connection).to have_received(:execute).with(/add_continuous_aggregate_policy.*downloads_minutely/i)
expect(ActiveRecord::Base.connection).to have_received(:execute).with(/add_continuous_aggregate_policy.*downloads_total_downloads_per_hour/i)
expect(ActiveRecord::Base.connection).to have_received(:execute).with(/add_continuous_aggregate_policy.*downloads_total_downloads_per_day/i)
expect(ActiveRecord::Base.connection).to have_received(:execute).with(/add_continuous_aggregate_policy.*downloads_total_downloads_per_month/i)
expect(ActiveRecord::Base.connection).to have_received(:execute).with(/add_continuous_aggregate_policy.*downloads_by_gem_per_minute/i)
expect(ActiveRecord::Base.connection).to have_received(:execute).with(/add_continuous_aggregate_policy.*downloads_by_version_per_minute/i)
expect(ActiveRecord::Base.connection).to have_received(:execute).with(/add_continuous_aggregate_policy.*downloads_by_gem_per_hour/i)
expect(ActiveRecord::Base.connection).to have_received(:execute).with(/add_continuous_aggregate_policy.*downloads_by_version_per_hour/i)
expect(ActiveRecord::Base.connection).to have_received(:execute).with(/add_continuous_aggregate_policy.*downloads_by_gem_per_day/i)
expect(ActiveRecord::Base.connection).to have_received(:execute).with(/add_continuous_aggregate_policy.*downloads_by_gem_per_month/i)
expect(ActiveRecord::Base.connection).to have_received(:execute).with(/add_continuous_aggregate_policy.*downloads_by_version_per_day/i)
expect(ActiveRecord::Base.connection).to have_received(:execute).with(/add_continuous_aggregate_policy.*downloads_by_version_per_month/i)
end
end

describe 'refresh policies' do
it 'defines appropriate refresh policies for each timeframe' do
policies = {
minute: { start_offset: "INTERVAL '10 minutes'", end_offset: "INTERVAL '1 minute'", schedule_interval: "INTERVAL '1 minute'" },
hour: { start_offset: "INTERVAL '4 hour'", end_offset: "INTERVAL '1 hour'", schedule_interval: "INTERVAL '1 hour'" },
day: { start_offset: "INTERVAL '3 day'", end_offset: "INTERVAL '1 day'", schedule_interval: "INTERVAL '1 day'" },
month: { start_offset: "INTERVAL '3 month'", end_offset: "INTERVAL '1 day'", schedule_interval: "INTERVAL '1 day'" }
minute: { start_offset: "10 minutes", end_offset: "1 minute", schedule_interval: "1 minute" },
hour: { start_offset: "4 hour", end_offset: "1 hour", schedule_interval: "1 hour" },
day: { start_offset: "3 day", end_offset: "1 day", schedule_interval: "1 hour" },
month: { start_offset: "3 month", end_offset: "1 hour", schedule_interval: "1 hour" }
}

policies.each do |timeframe, expected_policy|
actual_policy = test_class.const_get(timeframe).refresh_policy
expect(actual_policy).to eq(expected_policy)
%w[TotalDownloadsPer DownloadsByGemPer DownloadsByVersionPer].each do |klass|
actual_policy = test_class.const_get("#{klass}#{timeframe.to_s.capitalize}").refresh_policy
if actual_policy != expected_policy
puts "klass: #{klass}"
puts "timeframe: #{timeframe}"
puts "actual_policy: #{actual_policy}"
puts "expected_policy: #{expected_policy}"
end
expect(actual_policy).to eq(expected_policy)
end
end
end
end
Expand Down

0 comments on commit d134e98

Please sign in to comment.