Skip to content

Commit

Permalink
WIP on caggs DSL
Browse files Browse the repository at this point in the history
  • Loading branch information
jonatas committed Sep 13, 2024
1 parent fd83fd6 commit 14b9fe4
Show file tree
Hide file tree
Showing 5 changed files with 268 additions and 3 deletions.
1 change: 1 addition & 0 deletions lib/timescaledb.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
require_relative 'timescaledb/application_record'
require_relative 'timescaledb/acts_as_hypertable'
require_relative 'timescaledb/acts_as_hypertable/core'
require_relative 'timescaledb/continuous_aggregates_helper'
require_relative 'timescaledb/connection'
require_relative 'timescaledb/toolkit'
require_relative 'timescaledb/chunk'
Expand Down
2 changes: 1 addition & 1 deletion lib/timescaledb/acts_as_hypertable/core.rb
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ def define_association_scopes
CompressionSettings.where(hypertable_name: table_name)
end

scope :continuous_aggregates, -> do
scope :caggs, -> do
ContinuousAggregates.where(hypertable_name: table_name)
end
end
Expand Down
3 changes: 1 addition & 2 deletions lib/timescaledb/continuous_aggregates.rb
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
module Timescaledb
class ContinuousAggregate < ::Timescaledb::ApplicationRecord
class ContinuousAggregates < ::Timescaledb::ApplicationRecord
self.table_name = "timescaledb_information.continuous_aggregates"
self.primary_key = 'materialization_hypertable_name'

Expand Down Expand Up @@ -39,5 +39,4 @@ class ContinuousAggregate < ::Timescaledb::ApplicationRecord
end
end
end
ContinuousAggregates = ContinuousAggregate
end
119 changes: 119 additions & 0 deletions lib/timescaledb/continuous_aggregates_helper.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
module Timescaledb
module ContinuousAggregatesHelper
extend ActiveSupport::Concern

class_methods do
def continuous_aggregates(options = {})
@time_column = options[:time_column] || 'ts'
@timeframes = options[:timeframes] || [:minute, :hour, :day, :week, :month, :year]

scopes = options[:scopes] || []
@aggregates = {}

scopes.each do |scope_name|
@aggregates[scope_name] = {
scope_name: scope_name,
select: nil,
group_by: nil,
refresh_policy: options[:refresh_policy] || {}
}
end

# Allow for custom aggregate definitions to override or add to scope-based ones
@aggregates.merge!(options[:aggregates] || {})

define_continuous_aggregate_classes
end

def refresh_aggregates(timeframes = nil)
timeframes ||= @timeframes
@aggregates.each do |aggregate_name, _|
timeframes.each do |timeframe|
klass = const_get("#{aggregate_name}_per_#{timeframe}".classify)
klass.refresh!
end
end
end

def create_continuous_aggregates(with_data: false)
@aggregates.each do |aggregate_name, config|
previous_timeframe = nil
@timeframes.each do |timeframe|
klass = const_get("#{aggregate_name}_per_#{timeframe}".classify)
interval = "'1 #{timeframe.to_s}'"
base_query =
if previous_timeframe
prev_klass = const_get("#{aggregate_name}_per_#{previous_timeframe}".classify)
prev_klass
.select("time_bucket(#{interval}, #{@time_column}) as #{@time_column}, #{config[:select]}")
.group(1, *config[:group_by])
else
scope = public_send(config[:scope_name])
select_values = scope.select_values.join(', ')
group_values = scope.group_values

config[:select] = select_values.gsub('count(*) as total', 'sum(total) as total')
config[:group_by] = (2...(2 + group_values.size)).map(&:to_s).join(', ')

self.select("time_bucket(#{interval}, #{@time_column}) as #{@time_column}, #{select_values}")
.group(1, *group_values)
end

connection.execute <<~SQL
CREATE MATERIALIZED VIEW IF NOT EXISTS #{klass.table_name}
WITH (timescaledb.continuous) AS
#{base_query.to_sql}
#{with_data ? 'WITH DATA' : 'WITH NO DATA'};
SQL

if (policy = klass.refresh_policy)
connection.execute <<~SQL
SELECT add_continuous_aggregate_policy('#{klass.table_name}',
start_offset => INTERVAL '#{policy[:start_offset]}',
end_offset => INTERVAL '#{policy[:end_offset]}',
schedule_interval => INTERVAL '#{policy[:schedule_interval]}');
SQL
end

previous_timeframe = timeframe
end
end
end

private

def define_continuous_aggregate_classes
@aggregates.each do |aggregate_name, config|
@timeframes.each do |timeframe|
_table_name = "#{aggregate_name}_per_#{timeframe}"
class_name = "#{aggregate_name}_per_#{timeframe}".classify
const_set(class_name, Class.new(ActiveRecord::Base) do
extend ActiveModel::Naming

class << self
attr_accessor :config, :timeframe
end

self.table_name = _table_name
self.config = config
self.timeframe = timeframe


def self.refresh!
connection.execute("CALL refresh_continuous_aggregate('#{table_name}', null, null);")
end

def readonly?
true
end

def self.refresh_policy
config[:refresh_policy]&.dig(timeframe)
end
end)
end
end
end
end
end
end
146 changes: 146 additions & 0 deletions spec/timescaledb/continuos_aggregates_helper_spec.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
require 'spec_helper'

class Download < ActiveRecord::Base
include Timescaledb::ContinuousAggregatesHelper

acts_as_hypertable time_column: 'ts'

scope :total_downloads, -> { select("count(*) as total") }
scope :downloads_by_gem, -> { select("gem_name, count(*) as total").group(:gem_name) }
scope :downloads_by_version, -> { select("gem_name, gem_version, count(*) as total").group(:gem_name, :gem_version) }

continuous_aggregates(
time_column: 'ts',
timeframes: [:minute, :hour, :day, :month],
scopes: [:total_downloads, :downloads_by_gem, :downloads_by_version],
refresh_policy: {
minute: { start_offset: "10 minutes", end_offset: "1 minute", schedule_interval: "1 minute" },
hour: { start_offset: "4 hour", end_offset: "1 hour", schedule_interval: "1 hour" },
day: { start_offset: "3 day", end_offset: "1 day", schedule_interval: "1 hour" },
month: { start_offset: "3 month", end_offset: "1 hour", schedule_interval: "1 hour" }
}
)
end

RSpec.describe Timescaledb::ContinuousAggregatesHelper do
let(:test_class) do
Download
end

before(:all) do
ActiveRecord::Base.connection.instance_exec do
hypertable_options = {
time_column: 'ts',
chunk_time_interval: '1 day',
compress_segmentby: 'gem_name, gem_version',
compress_orderby: 'ts DESC',
}
create_table(:downloads, id: false, hypertable: hypertable_options) do |t|
t.timestamptz :ts, null: false
t.text :gem_name, :gem_version, null: false
t.jsonb :payload
end
end
end

after(:all) do
ActiveRecord::Base.connection.drop_table :downloads, if_exists: true
end

describe '.continuous_aggregates' do
it 'defines aggregate classes' do
expect(test_class.const_defined?(:TotalDownloadsPerMinute)).to be true
expect(test_class.const_defined?(:TotalDownloadsPerHour)).to be true
expect(test_class.const_defined?(:TotalDownloadsPerDay)).to be true
expect(test_class.const_defined?(:TotalDownloadsPerMonth)).to be true

expect(test_class.const_defined?(:DownloadsByVersionPerMinute)).to be true
expect(test_class.const_defined?(:DownloadsByVersionPerHour)).to be true
expect(test_class.const_defined?(:DownloadsByVersionPerDay)).to be true
expect(test_class.const_defined?(:DownloadsByVersionPerMonth)).to be true

expect(test_class.const_defined?(:DownloadsByGemPerMinute)).to be true
expect(test_class.const_defined?(:DownloadsByGemPerHour)).to be true
expect(test_class.const_defined?(:DownloadsByGemPerDay)).to be true
expect(test_class.const_defined?(:DownloadsByGemPerMonth)).to be true
end

it 'sets up correct table names for aggregates' do
expect(test_class::TotalDownloadsPerMinute.table_name).to eq('total_downloads_per_minute')
expect(test_class::TotalDownloadsPerHour.table_name).to eq('total_downloads_per_hour')
expect(test_class::TotalDownloadsPerDay.table_name).to eq('total_downloads_per_day')
expect(test_class::TotalDownloadsPerMonth.table_name).to eq('total_downloads_per_month')

expect(test_class::DownloadsByVersionPerMinute.table_name).to eq('downloads_by_version_per_minute')
expect(test_class::DownloadsByVersionPerHour.table_name).to eq('downloads_by_version_per_hour')
expect(test_class::DownloadsByVersionPerDay.table_name).to eq('downloads_by_version_per_day')
expect(test_class::DownloadsByVersionPerMonth.table_name).to eq('downloads_by_version_per_month')

expect(test_class::DownloadsByGemPerMinute.table_name).to eq('downloads_by_gem_per_minute')
expect(test_class::DownloadsByGemPerHour.table_name).to eq('downloads_by_gem_per_hour')
expect(test_class::DownloadsByGemPerDay.table_name).to eq('downloads_by_gem_per_day')
expect(test_class::DownloadsByGemPerMonth.table_name).to eq('downloads_by_gem_per_month')
end

it 'defines rollup scope for aggregates' do
test_class.create_continuous_aggregates
aggregate_classes = [test_class::TotalDownloadsPerMinute, test_class::TotalDownloadsPerHour, test_class::TotalDownloadsPerDay, test_class::TotalDownloadsPerMonth]
aggregate_classes.each do |agg_class|
expect(agg_class).to respond_to(:rollup)
expect(agg_class.rollup.to_sql).to include('time_bucket')
expect(agg_class.rollup.to_sql).to include('count(*) as total')
end
end

it 'defines time-based scopes for aggregates' do
aggregate_classes = [test_class::TotalDownloadsPerMinute, test_class::TotalDownloadsPerHour, test_class::TotalDownloadsPerDay, test_class::TotalDownloadsPerMonth]
aggregate_scopes = [:total_downloads, :downloads_by_gem, :downloads_by_version]

aggregate_scopes.each do |scope|
aggregate_classes.each do |agg_class|
expect(agg_class).to respond_to(scope)
end
end
end
end

describe '.create_continuous_aggregates' do
before do
allow(ActiveRecord::Base.connection).to receive(:execute).and_call_original
end

it 'creates materialized views for each aggregate' do
test_class.create_continuous_aggregates

expect(ActiveRecord::Base.connection).to have_received(:execute).with(/CREATE MATERIALIZED VIEW.*downloads_total_downloads_per_minute/i)
expect(ActiveRecord::Base.connection).to have_received(:execute).with(/CREATE MATERIALIZED VIEW.*downloads_total_downloads_per_hour/i)
expect(ActiveRecord::Base.connection).to have_received(:execute).with(/CREATE MATERIALIZED VIEW.*downloads_total_downloads_per_day/i)
expect(ActiveRecord::Base.connection).to have_received(:execute).with(/CREATE MATERIALIZED VIEW.*downloads_total_downloads_per_month/i)
end

it 'sets up refresh policies for each aggregate' do
test_class.create_continuous_aggregates

expect(ActiveRecord::Base.connection).to have_received(:execute).with(/add_continuous_aggregate_policy.*downloads_minutely/i)
expect(ActiveRecord::Base.connection).to have_received(:execute).with(/add_continuous_aggregate_policy.*downloads_total_downloads_per_hour/i)
expect(ActiveRecord::Base.connection).to have_received(:execute).with(/add_continuous_aggregate_policy.*downloads_total_downloads_per_day/i)
expect(ActiveRecord::Base.connection).to have_received(:execute).with(/add_continuous_aggregate_policy.*downloads_total_downloads_per_month/i)
end
end

describe 'refresh policies' do
it 'defines appropriate refresh policies for each timeframe' do
policies = {
minute: { start_offset: "INTERVAL '10 minutes'", end_offset: "INTERVAL '1 minute'", schedule_interval: "INTERVAL '1 minute'" },
hour: { start_offset: "INTERVAL '4 hour'", end_offset: "INTERVAL '1 hour'", schedule_interval: "INTERVAL '1 hour'" },
day: { start_offset: "INTERVAL '3 day'", end_offset: "INTERVAL '1 day'", schedule_interval: "INTERVAL '1 day'" },
month: { start_offset: "INTERVAL '3 month'", end_offset: "INTERVAL '1 day'", schedule_interval: "INTERVAL '1 day'" }
}

policies.each do |timeframe, expected_policy|
actual_policy = test_class.const_get(timeframe).refresh_policy
expect(actual_policy).to eq(expected_policy)
end
end
end
end

0 comments on commit 14b9fe4

Please sign in to comment.