Skip to content

Commit db27250

Browse files
Wip, reword
1 parent daf13d7 commit db27250

File tree

4 files changed

+106
-0
lines changed

4 files changed

+106
-0
lines changed

utils/cagg_refresh_producer_consumer/README.md

Whitespace-only changes.

utils/cagg_refresh_producer_consumer/consumer.sql

Whitespace-only changes.
Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
\i producer.sql
2+
\i consumer.sql
3+
4+
CALL _timescaledb_additional.schedule_refresh_continuous_aggregate_incremental(
5+
job_id => null,
6+
config =>
7+
'{
8+
"end_offset": "7 days",
9+
"start_offset": "2 years",
10+
"continuous_aggregate": "public.stats_five_minutes"
11+
}'
12+
);
Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,94 @@
1+
CREATE SCHEMA IF NOT EXISTS _timescaledb_additional;
2+
3+
CREATE TABLE IF NOT EXISTS _timescaledb_additional.incremental_continuous_aggregate_refreshes (
4+
continuous_aggregate regclass not null,
5+
window_start timestamptz not null,
6+
window_end timestamptz not null CHECK (window_end > window_start),
7+
worker_pid integer
8+
);
9+
10+
COMMENT ON COLUMN _timescaledb_additional.incremental_continuous_aggregate_refreshes.worker_pid IS
11+
$$This column will be populated with the pid that is currently running this task.
12+
This allows us to keep track of things, as well as allow us to reschedule an item if
13+
a worker_pid is no longer active (for whatever reason)$$;
14+
15+
-- We want to avoid scheduling the same thing twice, for those tasks that have not yet been
16+
-- picked up by any worker.
17+
CREATE UNIQUE INDEX IF NOT EXISTS incr_cagg_refreshes_distinct_tasks_unq ON _timescaledb_additional.incremental_continuous_aggregate_refreshes(
18+
continuous_aggregate,
19+
window_start,
20+
window_end
21+
) WHERE worker_pid IS NULL;
22+
23+
DROP PROCEDURE IF EXISTS _timescaledb_additional.schedule_refresh_continuous_aggregate_incremental;
24+
CREATE PROCEDURE _timescaledb_additional.schedule_refresh_continuous_aggregate_incremental (
25+
job_id int,
26+
config jsonb
27+
) LANGUAGE plpgsql AS $BODY$
28+
DECLARE
29+
cagg_regclass regclass := (config ->> 'continuous_aggregate')::regclass;
30+
start_offset INTERVAL := (config ->> 'start_offset')::INTERVAL;
31+
end_offset INTERVAL := (config ->> 'end_offset')::INTERVAL;
32+
increment_size INTERVAL := (config ->> 'increment_size')::INTERVAL;
33+
BEGIN
34+
IF pg_catalog.num_nulls(cagg_regclass, start_offset, end_offset) > 0 THEN
35+
RAISE EXCEPTION 'Invalid configuration for scheduling an incremental refresh: %', config;
36+
END IF;
37+
38+
-- We gather some data on the CAgg itself, its name, and its oid,
39+
-- as well as the size of the increment if it wasn't specified
40+
IF increment_size IS NULL THEN
41+
SELECT
42+
-- We default to the dimension interval_length if not explicitly specified
43+
coalesce(increment_size, interval_length * interval '1 microsecond')
44+
INTO
45+
increment_size
46+
FROM
47+
_timescaledb_catalog.continuous_agg AS cagg
48+
JOIN
49+
_timescaledb_catalog.hypertable AS h ON (h.id = raw_hypertable_id)
50+
JOIN
51+
_timescaledb_catalog.dimension AS dim ON (h.id = dim.hypertable_id)
52+
WHERE
53+
format('%I.%I', user_view_schema, user_view_name)::regclass = cagg_regclass
54+
-- If there are multiple dimensions, we only want the first one
55+
ORDER BY
56+
dim.id ASC
57+
LIMIT
58+
1;
59+
END IF;
60+
61+
DECLARE
62+
start_t timestamptz := now() - start_offset;
63+
end_t timestamptz := now() - end_offset;
64+
65+
incr_start timestamptz := now() - start_offset;
66+
incr_end timestamptz := incr_start;
67+
BEGIN
68+
WHILE incr_end < end_t
69+
LOOP
70+
-- We'd like to align the increments to the interval, for that, we use time_bucket.
71+
incr_end := public.time_bucket(increment_size, incr_start + increment_size);
72+
-- We do however not want to go beyond what the configuration asked for
73+
incr_end := least(end_t, incr_end);
74+
75+
RAISE NOTICE 'Scheduling %, % - %', cagg_regclass::text, incr_start, incr_end;
76+
77+
INSERT INTO _timescaledb_additional.incremental_continuous_aggregate_refreshes
78+
(continuous_aggregate, window_start, window_end)
79+
VALUES
80+
(cagg_regclass, incr_start, incr_end)
81+
ON CONFLICT
82+
DO NOTHING;
83+
84+
incr_start := incr_end;
85+
END LOOP;
86+
END;
87+
END;
88+
$BODY$;
89+
90+
COMMENT ON PROCEDURE _timescaledb_additional.schedule_refresh_continuous_aggregate_incremental IS
91+
$$schedule_refresh_continuous_aggregate_incremental is a pretty non-intelligent procedure.
92+
For the provided continuous aggregate it will write records into this table:
93+
_timescaledb_additional.incremental_continuous_aggregate_refreshes
94+
Which will then be tasks picked up by task_refresh_continuous_aggregate_incremental_run$$;

0 commit comments

Comments
 (0)