Skip to content

Commit 8dbf03d

Browse files
Wip, reword
1 parent daf13d7 commit 8dbf03d

File tree

4 files changed

+127
-0
lines changed

4 files changed

+127
-0
lines changed

utils/cagg_refresh_producer_consumer/README.md

Whitespace-only changes.

utils/cagg_refresh_producer_consumer/consumer.sql

Whitespace-only changes.
Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
\i producer.sql
2+
\i consumer.sql
3+
4+
CALL _timescaledb_additional.schedule_refresh_continuous_aggregate_incremental(
5+
job_id => null,
6+
config =>
7+
'{
8+
"end_offset": "7 days",
9+
"start_offset": "2 years",
10+
"continuous_aggregate": "public.stats_five_minutes"
11+
}'
12+
);
Lines changed: 115 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,115 @@
1+
CREATE SCHEMA IF NOT EXISTS _timescaledb_additional;
2+
3+
CREATE TABLE IF NOT EXISTS _timescaledb_additional.incremental_continuous_aggregate_refreshes (
4+
continuous_aggregate regclass not null,
5+
window_start timestamptz not null,
6+
window_end timestamptz not null CHECK (window_end > window_start),
7+
scheduled timestamptz not null default pg_catalog.clock_timestamp(),
8+
worker_pid integer
9+
);
10+
11+
COMMENT ON COLUMN _timescaledb_additional.incremental_continuous_aggregate_refreshes.worker_pid IS
12+
$$This column will be populated with the pid that is currently running this task.
13+
This allows us to keep track of things, as well as allow us to reschedule an item if
14+
a worker_pid is no longer active (for whatever reason)$$;
15+
16+
COMMENT ON COLUMN _timescaledb_additional.incremental_continuous_aggregate_refreshes.scheduled IS
17+
$$To ensure we do actually get to do all the work, the workers will always pick up the
18+
task that was scheduled first. In that way, we have a bit of a priority queue.$$;
19+
20+
-- We want to avoid scheduling the same thing twice, for those tasks that have not yet been
21+
-- picked up by any worker.
22+
CREATE UNIQUE INDEX IF NOT EXISTS incr_cagg_refreshes_distinct_tasks_unq ON _timescaledb_additional.incremental_continuous_aggregate_refreshes(
23+
continuous_aggregate,
24+
window_start,
25+
window_end
26+
) WHERE worker_pid IS NULL;
27+
28+
DROP PROCEDURE IF EXISTS _timescaledb_additional.schedule_refresh_continuous_aggregate_incremental;
29+
CREATE PROCEDURE _timescaledb_additional.schedule_refresh_continuous_aggregate_incremental (
30+
job_id int,
31+
config jsonb
32+
) LANGUAGE plpgsql AS $BODY$
33+
DECLARE
34+
cagg_regclass regclass := (config ->> 'continuous_aggregate')::regclass;
35+
start_offset INTERVAL := (config ->> 'start_offset')::INTERVAL;
36+
end_offset INTERVAL := (config ->> 'end_offset')::INTERVAL;
37+
increment_size INTERVAL := (config ->> 'increment_size')::INTERVAL;
38+
BEGIN
39+
IF pg_catalog.num_nulls(cagg_regclass, start_offset, end_offset) > 0 THEN
40+
RAISE EXCEPTION 'Invalid configuration for scheduling an incremental refresh: %', config;
41+
END IF;
42+
43+
-- We gather some data on the CAgg itself, its name, and its oid,
44+
-- as well as the size of the increment if it wasn't specified
45+
IF increment_size IS NULL THEN
46+
SELECT
47+
-- We default to the dimension interval_length if not explicitly specified
48+
coalesce(increment_size, interval_length * interval '1 microsecond')
49+
INTO
50+
increment_size
51+
FROM
52+
_timescaledb_catalog.continuous_agg AS cagg
53+
JOIN
54+
_timescaledb_catalog.hypertable AS h ON (h.id = raw_hypertable_id)
55+
JOIN
56+
_timescaledb_catalog.dimension AS dim ON (h.id = dim.hypertable_id)
57+
WHERE
58+
format('%I.%I', user_view_schema, user_view_name)::regclass = cagg_regclass
59+
-- If there are multiple dimensions, we only want the first one
60+
ORDER BY
61+
dim.id ASC
62+
LIMIT
63+
1;
64+
END IF;
65+
66+
DECLARE
67+
start_t timestamptz := now() - start_offset;
68+
end_t timestamptz := now() - end_offset;
69+
70+
incr_start timestamptz := public.time_bucket(increment_size, now() - start_offset);
71+
incr_end timestamptz := incr_start;
72+
73+
count bigint := 0;
74+
added bigint := 0;
75+
hit bool := false;
76+
BEGIN
77+
WHILE incr_end < end_t
78+
LOOP
79+
incr_end := public.time_bucket(increment_size, incr_start + increment_size);
80+
81+
INSERT INTO _timescaledb_additional.incremental_continuous_aggregate_refreshes
82+
(continuous_aggregate, window_start, window_end)
83+
VALUES
84+
(cagg_regclass, incr_start, incr_end)
85+
ON CONFLICT
86+
DO NOTHING
87+
RETURNING
88+
true
89+
INTO
90+
hit;
91+
92+
count := count + 1;
93+
IF hit THEN
94+
added := added + 1;
95+
END IF;
96+
97+
incr_start := incr_end;
98+
END LOOP;
99+
100+
RAISE NOTICE
101+
'Scheduled incremental refreshes for % (% - %). Tasks evaluated: %, newly inserted: %',
102+
cagg_regclass::text,
103+
start_t,
104+
end_t,
105+
count,
106+
added;
107+
END;
108+
END;
109+
$BODY$;
110+
111+
COMMENT ON PROCEDURE _timescaledb_additional.schedule_refresh_continuous_aggregate_incremental IS
112+
$$schedule_refresh_continuous_aggregate_incremental is a pretty non-intelligent procedure.
113+
For the provided continuous aggregate it will write records into this table:
114+
_timescaledb_additional.incremental_continuous_aggregate_refreshes
115+
Which will then be tasks picked up by task_refresh_continuous_aggregate_incremental_run$$;

0 commit comments

Comments
 (0)