-
Notifications
You must be signed in to change notification settings - Fork 19
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Continuous Aggregates DSL #73
Merged
Changes from 1 commit
Commits
Show all changes
17 commits
Select commit
Hold shift + click to select a range
14b9fe4
WIP on caggs DSL
jonatas 9339761
Decouple aggregation concepts
jonatas 6616a9a
Fix caggs references
jonatas e414595
Fix renaming caggs
jonatas 24f6a51
Rename typo
jonatas 52a3123
Drop continuous aggregates helper
jonatas def34c9
Try to fix spec: undefined method `timestamptz' for #<ActiveRecord::…
jonatas b8fdb3f
Rollup from previous scope as a nesting scope
jonatas 127e69d
Remove example const after load test
jonatas 3fd3287
Document the new macro
jonatas b970236
Add aggregations with macro
jonatas 247ef3b
Document SQL output
jonatas 93b4465
Fix rollup for ohlc pattern
jonatas 817caa8
Make inheritance use the hypertable as a parent class
jonatas 6a99928
Fix candlestick for using continuous_aggregates
jonatas a31f296
Remove experimental references
jonatas 06d97e0
Remove download references from specs
jonatas File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,119 @@ | ||
module Timescaledb | ||
module ContinuousAggregatesHelper | ||
extend ActiveSupport::Concern | ||
|
||
class_methods do | ||
def continuous_aggregates(options = {}) | ||
@time_column = options[:time_column] || 'ts' | ||
@timeframes = options[:timeframes] || [:minute, :hour, :day, :week, :month, :year] | ||
|
||
scopes = options[:scopes] || [] | ||
@aggregates = {} | ||
|
||
scopes.each do |scope_name| | ||
@aggregates[scope_name] = { | ||
scope_name: scope_name, | ||
select: nil, | ||
group_by: nil, | ||
refresh_policy: options[:refresh_policy] || {} | ||
} | ||
end | ||
|
||
# Allow for custom aggregate definitions to override or add to scope-based ones | ||
@aggregates.merge!(options[:aggregates] || {}) | ||
|
||
define_continuous_aggregate_classes | ||
end | ||
|
||
def refresh_aggregates(timeframes = nil) | ||
timeframes ||= @timeframes | ||
@aggregates.each do |aggregate_name, _| | ||
timeframes.each do |timeframe| | ||
klass = const_get("#{aggregate_name}_per_#{timeframe}".classify) | ||
klass.refresh! | ||
end | ||
end | ||
end | ||
|
||
def create_continuous_aggregates(with_data: false) | ||
@aggregates.each do |aggregate_name, config| | ||
previous_timeframe = nil | ||
@timeframes.each do |timeframe| | ||
klass = const_get("#{aggregate_name}_per_#{timeframe}".classify) | ||
interval = "'1 #{timeframe.to_s}'" | ||
base_query = | ||
if previous_timeframe | ||
prev_klass = const_get("#{aggregate_name}_per_#{previous_timeframe}".classify) | ||
prev_klass | ||
.select("time_bucket(#{interval}, #{@time_column}) as #{@time_column}, #{config[:select]}") | ||
.group(1, *config[:group_by]) | ||
else | ||
scope = public_send(config[:scope_name]) | ||
select_values = scope.select_values.join(', ') | ||
group_values = scope.group_values | ||
|
||
config[:select] = select_values.gsub('count(*) as total', 'sum(total) as total') | ||
config[:group_by] = (2...(2 + group_values.size)).map(&:to_s).join(', ') | ||
|
||
self.select("time_bucket(#{interval}, #{@time_column}) as #{@time_column}, #{select_values}") | ||
.group(1, *group_values) | ||
end | ||
|
||
connection.execute <<~SQL | ||
CREATE MATERIALIZED VIEW IF NOT EXISTS #{klass.table_name} | ||
WITH (timescaledb.continuous) AS | ||
#{base_query.to_sql} | ||
#{with_data ? 'WITH DATA' : 'WITH NO DATA'}; | ||
SQL | ||
|
||
if (policy = klass.refresh_policy) | ||
connection.execute <<~SQL | ||
SELECT add_continuous_aggregate_policy('#{klass.table_name}', | ||
start_offset => INTERVAL '#{policy[:start_offset]}', | ||
end_offset => INTERVAL '#{policy[:end_offset]}', | ||
schedule_interval => INTERVAL '#{policy[:schedule_interval]}'); | ||
SQL | ||
end | ||
|
||
previous_timeframe = timeframe | ||
end | ||
end | ||
end | ||
|
||
private | ||
|
||
def define_continuous_aggregate_classes | ||
@aggregates.each do |aggregate_name, config| | ||
@timeframes.each do |timeframe| | ||
_table_name = "#{aggregate_name}_per_#{timeframe}" | ||
class_name = "#{aggregate_name}_per_#{timeframe}".classify | ||
const_set(class_name, Class.new(ActiveRecord::Base) do | ||
extend ActiveModel::Naming | ||
|
||
class << self | ||
attr_accessor :config, :timeframe | ||
end | ||
|
||
self.table_name = _table_name | ||
self.config = config | ||
self.timeframe = timeframe | ||
|
||
|
||
def self.refresh! | ||
connection.execute("CALL refresh_continuous_aggregate('#{table_name}', null, null);") | ||
end | ||
|
||
def readonly? | ||
true | ||
end | ||
|
||
def self.refresh_policy | ||
config[:refresh_policy]&.dig(timeframe) | ||
end | ||
end) | ||
end | ||
end | ||
end | ||
end | ||
end | ||
end |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,146 @@ | ||
require 'spec_helper' | ||
|
||
class Download < ActiveRecord::Base | ||
include Timescaledb::ContinuousAggregatesHelper | ||
|
||
acts_as_hypertable time_column: 'ts' | ||
|
||
scope :total_downloads, -> { select("count(*) as total") } | ||
scope :downloads_by_gem, -> { select("gem_name, count(*) as total").group(:gem_name) } | ||
scope :downloads_by_version, -> { select("gem_name, gem_version, count(*) as total").group(:gem_name, :gem_version) } | ||
|
||
continuous_aggregates( | ||
time_column: 'ts', | ||
timeframes: [:minute, :hour, :day, :month], | ||
scopes: [:total_downloads, :downloads_by_gem, :downloads_by_version], | ||
refresh_policy: { | ||
minute: { start_offset: "10 minutes", end_offset: "1 minute", schedule_interval: "1 minute" }, | ||
hour: { start_offset: "4 hour", end_offset: "1 hour", schedule_interval: "1 hour" }, | ||
day: { start_offset: "3 day", end_offset: "1 day", schedule_interval: "1 hour" }, | ||
month: { start_offset: "3 month", end_offset: "1 hour", schedule_interval: "1 hour" } | ||
} | ||
) | ||
end | ||
|
||
RSpec.describe Timescaledb::ContinuousAggregatesHelper do | ||
let(:test_class) do | ||
Download | ||
end | ||
|
||
before(:all) do | ||
ActiveRecord::Base.connection.instance_exec do | ||
hypertable_options = { | ||
time_column: 'ts', | ||
chunk_time_interval: '1 day', | ||
compress_segmentby: 'gem_name, gem_version', | ||
compress_orderby: 'ts DESC', | ||
} | ||
create_table(:downloads, id: false, hypertable: hypertable_options) do |t| | ||
t.timestamptz :ts, null: false | ||
t.text :gem_name, :gem_version, null: false | ||
t.jsonb :payload | ||
end | ||
end | ||
end | ||
|
||
after(:all) do | ||
ActiveRecord::Base.connection.drop_table :downloads, if_exists: true | ||
end | ||
|
||
describe '.continuous_aggregates' do | ||
it 'defines aggregate classes' do | ||
expect(test_class.const_defined?(:TotalDownloadsPerMinute)).to be true | ||
expect(test_class.const_defined?(:TotalDownloadsPerHour)).to be true | ||
expect(test_class.const_defined?(:TotalDownloadsPerDay)).to be true | ||
expect(test_class.const_defined?(:TotalDownloadsPerMonth)).to be true | ||
|
||
expect(test_class.const_defined?(:DownloadsByVersionPerMinute)).to be true | ||
expect(test_class.const_defined?(:DownloadsByVersionPerHour)).to be true | ||
expect(test_class.const_defined?(:DownloadsByVersionPerDay)).to be true | ||
expect(test_class.const_defined?(:DownloadsByVersionPerMonth)).to be true | ||
|
||
expect(test_class.const_defined?(:DownloadsByGemPerMinute)).to be true | ||
expect(test_class.const_defined?(:DownloadsByGemPerHour)).to be true | ||
expect(test_class.const_defined?(:DownloadsByGemPerDay)).to be true | ||
expect(test_class.const_defined?(:DownloadsByGemPerMonth)).to be true | ||
end | ||
|
||
it 'sets up correct table names for aggregates' do | ||
expect(test_class::TotalDownloadsPerMinute.table_name).to eq('total_downloads_per_minute') | ||
expect(test_class::TotalDownloadsPerHour.table_name).to eq('total_downloads_per_hour') | ||
expect(test_class::TotalDownloadsPerDay.table_name).to eq('total_downloads_per_day') | ||
expect(test_class::TotalDownloadsPerMonth.table_name).to eq('total_downloads_per_month') | ||
|
||
expect(test_class::DownloadsByVersionPerMinute.table_name).to eq('downloads_by_version_per_minute') | ||
expect(test_class::DownloadsByVersionPerHour.table_name).to eq('downloads_by_version_per_hour') | ||
expect(test_class::DownloadsByVersionPerDay.table_name).to eq('downloads_by_version_per_day') | ||
expect(test_class::DownloadsByVersionPerMonth.table_name).to eq('downloads_by_version_per_month') | ||
|
||
expect(test_class::DownloadsByGemPerMinute.table_name).to eq('downloads_by_gem_per_minute') | ||
expect(test_class::DownloadsByGemPerHour.table_name).to eq('downloads_by_gem_per_hour') | ||
expect(test_class::DownloadsByGemPerDay.table_name).to eq('downloads_by_gem_per_day') | ||
expect(test_class::DownloadsByGemPerMonth.table_name).to eq('downloads_by_gem_per_month') | ||
end | ||
|
||
it 'defines rollup scope for aggregates' do | ||
test_class.create_continuous_aggregates | ||
aggregate_classes = [test_class::TotalDownloadsPerMinute, test_class::TotalDownloadsPerHour, test_class::TotalDownloadsPerDay, test_class::TotalDownloadsPerMonth] | ||
aggregate_classes.each do |agg_class| | ||
expect(agg_class).to respond_to(:rollup) | ||
expect(agg_class.rollup.to_sql).to include('time_bucket') | ||
expect(agg_class.rollup.to_sql).to include('count(*) as total') | ||
end | ||
end | ||
|
||
it 'defines time-based scopes for aggregates' do | ||
aggregate_classes = [test_class::TotalDownloadsPerMinute, test_class::TotalDownloadsPerHour, test_class::TotalDownloadsPerDay, test_class::TotalDownloadsPerMonth] | ||
aggregate_scopes = [:total_downloads, :downloads_by_gem, :downloads_by_version] | ||
|
||
aggregate_scopes.each do |scope| | ||
aggregate_classes.each do |agg_class| | ||
expect(agg_class).to respond_to(scope) | ||
end | ||
end | ||
end | ||
end | ||
|
||
describe '.create_continuous_aggregates' do | ||
before do | ||
allow(ActiveRecord::Base.connection).to receive(:execute).and_call_original | ||
end | ||
|
||
it 'creates materialized views for each aggregate' do | ||
test_class.create_continuous_aggregates | ||
|
||
expect(ActiveRecord::Base.connection).to have_received(:execute).with(/CREATE MATERIALIZED VIEW.*downloads_total_downloads_per_minute/i) | ||
expect(ActiveRecord::Base.connection).to have_received(:execute).with(/CREATE MATERIALIZED VIEW.*downloads_total_downloads_per_hour/i) | ||
expect(ActiveRecord::Base.connection).to have_received(:execute).with(/CREATE MATERIALIZED VIEW.*downloads_total_downloads_per_day/i) | ||
expect(ActiveRecord::Base.connection).to have_received(:execute).with(/CREATE MATERIALIZED VIEW.*downloads_total_downloads_per_month/i) | ||
end | ||
|
||
it 'sets up refresh policies for each aggregate' do | ||
test_class.create_continuous_aggregates | ||
|
||
expect(ActiveRecord::Base.connection).to have_received(:execute).with(/add_continuous_aggregate_policy.*downloads_minutely/i) | ||
expect(ActiveRecord::Base.connection).to have_received(:execute).with(/add_continuous_aggregate_policy.*downloads_total_downloads_per_hour/i) | ||
expect(ActiveRecord::Base.connection).to have_received(:execute).with(/add_continuous_aggregate_policy.*downloads_total_downloads_per_day/i) | ||
expect(ActiveRecord::Base.connection).to have_received(:execute).with(/add_continuous_aggregate_policy.*downloads_total_downloads_per_month/i) | ||
end | ||
end | ||
|
||
describe 'refresh policies' do | ||
it 'defines appropriate refresh policies for each timeframe' do | ||
policies = { | ||
minute: { start_offset: "INTERVAL '10 minutes'", end_offset: "INTERVAL '1 minute'", schedule_interval: "INTERVAL '1 minute'" }, | ||
hour: { start_offset: "INTERVAL '4 hour'", end_offset: "INTERVAL '1 hour'", schedule_interval: "INTERVAL '1 hour'" }, | ||
day: { start_offset: "INTERVAL '3 day'", end_offset: "INTERVAL '1 day'", schedule_interval: "INTERVAL '1 day'" }, | ||
month: { start_offset: "INTERVAL '3 month'", end_offset: "INTERVAL '1 day'", schedule_interval: "INTERVAL '1 day'" } | ||
} | ||
|
||
policies.each do |timeframe, expected_policy| | ||
actual_policy = test_class.const_get(timeframe).refresh_policy | ||
expect(actual_policy).to eq(expected_policy) | ||
end | ||
end | ||
end | ||
end |
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I got some mad issue with this name, still not sure how to handle the naming clash. Just renaming this one as seems less prioritized.