Skip to content
Closed
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 29 additions & 0 deletions utils/incremental_refresh/example.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
Create a custom job
```sql
SELECT add_job(
'_timescaledb_additional.task_refresh_continuous_aggregate_incremental_runner',
'1 minute',
config => '{"enable_tiered_reads": true}');
```

Add tasks
```sql
call _timescaledb_additional.schedule_cagg_refresh('%_fenceline_1hour', lower_bound=>'2024-01-01', dry_run=>false);
NOTICE: Scheduled incremental refreshes for hf_dss.g_data_fenceline_1hour (2024-01-01 00:00:00+00 - 2024-10-14 00:00:00+00). Tasks evaluated: 30, newly inserted: 30
```

Check the status
```sql
TABLE _timescaledb_additional.incremental_continuous_aggregate_refreshes;
```

```sql
SELECT
count(finished) finished,
SUM(CASE WHEN finished IS NULL THEN 1 ELSE 0 END) left
FROM _timescaledb_additional.incremental_continuous_aggregate_refreshes;
finished | left
----------+------
161 | 20
(1 row)
```
140 changes: 140 additions & 0 deletions utils/incremental_refresh/util.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
-- Calculate the time bucket using the continuous aggregate bucket function
-- configuration
CREATE OR REPLACE FUNCTION _timescaledb_additional.cagg_time_bucket(INTEGER, TIMESTAMPTZ)
RETURNS TIMESTAMPTZ AS
$$
DECLARE
params TEXT[];
stmt TEXT;
r RECORD;
result TIMESTAMPTZ;
BEGIN
SELECT * INTO r FROM _timescaledb_catalog.continuous_aggs_bucket_function WHERE mat_hypertable_id = $1;

IF NOT FOUND THEN
RAISE EXCEPTION 'Continuous Aggregate % not found', $1;
END IF;

params := array_append(params, format('%I => %L::timestamptz', 'ts', $2));

IF r.bucket_width IS NOT NULL THEN
params := array_append(params, format('%I => %L::interval', 'bucket_width', r.bucket_width));
END IF;

IF r.bucket_origin IS NOT NULL THEN
params := array_append(params, format('%I => %L::timestamptz', 'origin', r.bucket_origin));
END IF;

IF r.bucket_offset IS NOT NULL THEN
params := array_append(params, format('%I => %L::interval', 'offset', r.bucket_offset));
END IF;

IF r.bucket_timezone IS NOT NULL THEN
params := array_append(params, format('%I => %L::text', 'timezone', r.bucket_timezone));
END IF;

stmt := format('SELECT time_bucket(%s)', array_to_string(params, ', '));
RAISE DEBUG '%', stmt;

EXECUTE stmt
INTO result;

RETURN result;
END;
$$
LANGUAGE plpgsql STABLE;

-- Discover continuous aggregates built on top of tiered hypertables and
-- schedule their refresh
CREATE OR REPLACE PROCEDURE _timescaledb_additional.schedule_cagg_refresh(
name_mask TEXT DEFAULT '%',
nbuckets INTEGER DEFAULT 5,
dry_run BOOLEAN DEFAULT true,
priority INTEGER DEFAULT 100
) AS
$$
DECLARE
rec RECORD;
BEGIN
FOR rec IN (
-- Find caggs built on top of tiered hypertables
WITH ranges AS (
SELECT
cagg.mat_hypertable_id,
ht.schema_name,
ht.table_name,
(
SELECT column_type AS dim_type
FROM _timescaledb_catalog.dimension d
WHERE d.hypertable_id = ht.id
ORDER BY d.id ASC LIMIT 1
) AS dim_type,
user_view_schema,
user_view_name,
bf.bucket_width::interval AS bucket_width,
_timescaledb_additional.cagg_time_bucket(mat_hypertable_id, range.start) AS global_start,
_timescaledb_additional.cagg_time_bucket(mat_hypertable_id, range.end) + (bf.bucket_width::interval + '1 millisecond'::interval) AS global_end
FROM _timescaledb_catalog.continuous_agg cagg
JOIN _timescaledb_catalog.continuous_aggs_bucket_function bf USING (mat_hypertable_id)
JOIN _timescaledb_catalog.hypertable ht ON (ht.id = cagg.raw_hypertable_id)
JOIN _osm_catalog.table_map tm ON (tm.hypertable_name = ht.table_name AND tm.hypertable_schema = ht.schema_name)
JOIN (
-- the time window of tiered data
SELECT
osm_table_id,
_osm_internal.dimension_pg_usec_to_timestamp(min(range_start)) as start,
_osm_internal.dimension_pg_usec_to_timestamp(max(range_end)) as end
FROM _osm_catalog.chunk_map
JOIN _osm_catalog.chunk_object_map USING (chunk_id)
GROUP BY osm_table_id
) AS range USING (osm_table_id)
)
SELECT
mat_hypertable_id,
dim_type,
user_view_schema,
user_view_name,
global_start,
global_end,
start,
start + (bucket_width * 5) AS end,
(extract(epoch from start) * 1000000)::bigint AS invalidation_start,
(extract(epoch from (start + (bucket_width * nbuckets))) * 1000000)::bigint AS invalidation_end
FROM
ranges,
-- Split ranges with 5 times the bucket width
LATERAL generate_series(ranges.global_start, ranges.global_end, (bucket_width * nbuckets)) AS start
Copy link
Member Author

@zilder zilder Jan 2, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've been looking through the changes, and I think one thing is missing from my original code. The user has this weird data points in year 1917 or so (probably inserted by mistake). And apparently they want to keep it that way. If we generate ranges with this query it would create all the ranges between 1917 and 2024, while they only have few data points in pre-2020. In my original code I had this join to only generate the ranges that intersect with existing tiered chunks:

        FROM timescaledb_osm.tiered_chunks ch
        JOIN _timescaledb_additional.generate_increments(start_t, end_t, increment_size) AS i
            ON tstzrange(i.incr_start, i.incr_end, '[)') && tstzrange(ch.range_start, ch.range_end, '[)')

WHERE user_view_name LIKE name_mask
)
LOOP
-- skip non-timestamptz based caggs
IF rec.dim_type != 'TIMESTAMPTZ'::REGTYPE THEN
RAISE NOTICE 'SKIPPING ''%.%'' (dim type ''%''): %-%',
rec.user_view_schema, rec.user_view_name,
rec.dim_type, rec.start, rec.end;
CONTINUE;
END IF;

IF dry_run THEN
-- do nothing on dry run
RAISE NOTICE 'refresh ''%.%'': %-%',
rec.user_view_schema, rec.user_view_name,
rec.start, rec.end;
ELSE
-- insert an invalidation record from
INSERT INTO _timescaledb_catalog.continuous_aggs_materialization_invalidation_log
VALUES (
rec.mat_hypertable_id,
rec.invalidation_start,
rec.invalidation_end
);

-- schedule the refresh for given interval
INSERT INTO _timescaledb_additional.incremental_continuous_aggregate_refreshes
(continuous_aggregate, window_start, window_end, priority)
VALUES
(cagg_regclass, rec.start, rec.end, priority);
END IF;
END LOOP;
END
$$ LANGUAGE plpgsql;
181 changes: 181 additions & 0 deletions utils/incremental_refresh/worker.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,181 @@
CREATE SCHEMA IF NOT EXISTS _timescaledb_additional;

CREATE TABLE IF NOT EXISTS _timescaledb_additional.incremental_continuous_aggregate_refreshes (
id bigint GENERATED ALWAYS AS IDENTITY,
continuous_aggregate regclass not null,
window_start timestamptz not null,
window_end timestamptz not null CHECK (window_end > window_start),
scheduled timestamptz not null default pg_catalog.clock_timestamp(),
priority int not null default 1,
started timestamptz,
finished timestamptz,
worker_pid integer,
primary key (id),
CONSTRAINT incr_cagg_refreshes_workers_have_started CHECK (num_nulls(worker_pid, started) IN (0, 2))
);

COMMENT ON COLUMN _timescaledb_additional.incremental_continuous_aggregate_refreshes.worker_pid IS
$$This column will be populated with the pid that is currently running this task.
This allows us to keep track of things, as well as allow us to reschedule an item if
a worker_pid is no longer active (for whatever reason)$$;

COMMENT ON COLUMN _timescaledb_additional.incremental_continuous_aggregate_refreshes.scheduled IS
$$To ensure we do actually get to do all the work, the workers will always pick up the
task that has the lowest priority, and then which one was scheduled first.
In that way, we have a bit of a priority queue.$$;

-- We want to avoid scheduling the same thing twice, for those tasks that have not yet been
-- picked up by any worker.
CREATE UNIQUE INDEX IF NOT EXISTS incr_cagg_refreshes_distinct_tasks_unq ON _timescaledb_additional.incremental_continuous_aggregate_refreshes(
continuous_aggregate,
window_start,
window_end
) WHERE worker_pid IS NULL AND finished IS NULL;

CREATE INDEX IF NOT EXISTS incr_cagg_refreshes_find_first_work_item_idx ON _timescaledb_additional.incremental_continuous_aggregate_refreshes(
priority,
scheduled
) WHERE worker_pid IS NULL;

CREATE INDEX IF NOT EXISTS incr_cagg_refreshes_active_workers_idx ON _timescaledb_additional.incremental_continuous_aggregate_refreshes(
worker_pid
) WHERE worker_pid IS NOT NULL;


DROP PROCEDURE IF EXISTS _timescaledb_additional.task_refresh_continuous_aggregate_incremental_runner;
CREATE OR REPLACE PROCEDURE _timescaledb_additional.task_refresh_continuous_aggregate_incremental_runner (
job_id int,
config jsonb
) LANGUAGE plpgsql AS $BODY$
DECLARE
max_runtime interval := (config->>'max_runtime')::interval;
enable_tiered boolean := (config->>'enable_tiered_reads')::boolean;
old_enable_tiered_reads boolean;
global_start_time timestamptz := pg_catalog.clock_timestamp();
global_end_time timestamptz;
BEGIN
old_enable_tiered_reads := current_setting('timescaledb.enable_tiered_reads')::boolean;

IF enable_tiered IS NOT NULL THEN
IF enable_tiered THEN
SET timescaledb.enable_tiered_reads = 'on';
ELSE
SET timescaledb.enable_tiered_reads = 'off';
END IF;
END IF;

max_runtime := coalesce(max_runtime, interval '6 hours');
global_end_time := global_start_time + max_runtime;

WHILE pg_catalog.clock_timestamp() < global_end_time LOOP
SET LOCAL lock_timeout TO '1 min';

-- Prevent a hot loop
PERFORM pg_catalog.pg_sleep(0.2);

DECLARE
p_id bigint;
p_cagg regclass;
p_window_start timestamptz;
p_window_end timestamptz;
p_start_time timestamptz;
p_end_time timestamptz;
BEGIN
SELECT
q.id,
q.continuous_aggregate,
q.window_start,
q.window_end
INTO
p_id,
p_cagg,
p_window_start,
p_window_end
FROM
_timescaledb_additional.incremental_continuous_aggregate_refreshes AS q
WHERE
q.worker_pid IS NULL AND q.finished IS NULL
-- We don't want multiple workers to be active on the same range,
-- as ranges can differ in size, we'll use the overlap (&&) operator
-- to ensure we're good.
AND NOT EXISTS (
SELECT
FROM
_timescaledb_additional.incremental_continuous_aggregate_refreshes AS a
WHERE
a.worker_pid IS NOT NULL
AND a.finished IS NOT NULL
AND q.continuous_aggregate = a.continuous_aggregate
AND tstzrange(q.window_start, q.window_end, '[)') && tstzrange(a.window_start, a.window_end, '[)')
)
ORDER BY
q.priority ASC,
q.scheduled ASC
FOR NO KEY UPDATE SKIP LOCKED
LIMIT
1;

IF p_cagg IS NULL THEN
COMMIT;
-- There are no items in the queue that we can currently process. We therefore
-- sleep a while before continuing.
-- PERFORM pg_catalog.pg_sleep(3.0);
-- CONTINUE;
EXIT;
END IF;

UPDATE
_timescaledb_additional.incremental_continuous_aggregate_refreshes
SET
worker_pid = pg_backend_pid(),
started = clock_timestamp()
WHERE
id = p_id;
-- We need to ensure that all other workers now know we are working on this
-- task. We therefore need to commit once now.
COMMIT;

-- We take out a row-level-lock to signal to concurrent workers that *we*
-- are working on it. By taking this type of lock, we can clean up
-- this table from different tasks: They can update/delete these rows
-- if no active worker is working on them, and no lock is established.
PERFORM
FROM
_timescaledb_additional.incremental_continuous_aggregate_refreshes
WHERE
id = p_id
FOR NO KEY UPDATE;

CALL public.refresh_continuous_aggregate(
p_cagg,
p_window_start,
p_window_end
);

UPDATE
_timescaledb_additional.incremental_continuous_aggregate_refreshes
SET
finished = clock_timestamp()
WHERE
id = p_id;
COMMIT;

RAISE NOTICE
'% - Processing %, (% - %)',
pg_catalog.to_char(pg_catalog.clock_timestamp(), 'YYYY-MM-DD HH24:MI:SS.FF3OF'),
p_cagg,
p_window_start,
p_window_end;
END;
END LOOP;

IF old_enable_tiered_reads THEN
SET timescaledb.enable_tiered_reads = 'on';
ELSE
SET timescaledb.enable_tiered_reads = 'off';
END IF;

RAISE NOTICE 'Shutting down `task_refresh_continuous_aggregate_incremental_runner`';
END;
$BODY$;