diff --git a/utils/incremental_refresh/consumer.sql b/utils/incremental_refresh/consumer.sql new file mode 100644 index 0000000..f083e30 --- /dev/null +++ b/utils/incremental_refresh/consumer.sql @@ -0,0 +1,195 @@ +DROP PROCEDURE IF EXISTS _timescaledb_additional.task_refresh_continuous_aggregate_incremental_runner; +CREATE PROCEDURE _timescaledb_additional.task_refresh_continuous_aggregate_incremental_runner ( + job_id int, + config jsonb +) LANGUAGE plpgsql AS $BODY$ +DECLARE + max_runtime interval := (config->>'max_runtime')::interval; + enable_tiered boolean := (config->>'enable_tiered_reads')::boolean; + global_start_time timestamptz := pg_catalog.clock_timestamp(); + global_end_time timestamptz; + app_name text; +BEGIN + max_runtime := coalesce(max_runtime, interval '1 hours'); + global_end_time := global_start_time + max_runtime; + + -- Cleanup lost tasks + UPDATE + _timescaledb_additional.incremental_continuous_aggregate_refreshes + SET + worker_pid = NULL, + started = NULL + WHERE + started IS NOT NULL + AND finished IS NULL + AND (NOT EXISTS (SELECT FROM pg_stat_activity WHERE pid = worker_pid) OR worker_pid IS NULL); + + WHILE pg_catalog.clock_timestamp() < global_end_time LOOP + SET search_path TO 'pg_catalog,pg_temp'; + SET lock_timeout TO '3s'; + SET application_name TO 'cagg incremental refresh consumer - idle'; + + -- Prevent a hot loop + -- PERFORM pg_catalog.pg_sleep(0.1); + + -- By serializing the picking items from the queue, we prevent some race conditions. + -- LOCK TABLE _timescaledb_additional.incremental_continuous_aggregate_refreshes + -- IN ACCESS EXCLUSIVE MODE; + + SET application_name TO 'cagg incremental refresh consumer - retrieving new task'; + + DECLARE + p_id bigint; + p_cagg regclass; + p_window_start timestamptz; + p_window_end timestamptz; + p_start_time timestamptz; + p_end_time timestamptz; + p_mat_hypertable_id int; + p_job_id int; + BEGIN + SELECT + q.id, + q.continuous_aggregate, + q.window_start, + q.window_end, + cagg.mat_hypertable_id, + coalesce(jobs.job_id, -1) + INTO + p_id, + p_cagg, + p_window_start, + p_window_end, + p_mat_hypertable_id, + p_job_id + FROM + _timescaledb_additional.incremental_continuous_aggregate_refreshes AS q + JOIN + pg_catalog.pg_class AS pc ON (q.continuous_aggregate=oid) + JOIN + pg_catalog.pg_namespace AS pn ON (relnamespace=pn.oid) + JOIN + _timescaledb_catalog.continuous_agg AS cagg ON (cagg.user_view_schema=nspname AND cagg.user_view_name=pc.relname) + JOIN + _timescaledb_catalog.hypertable AS h ON (cagg.mat_hypertable_id=h.id) + LEFT JOIN + timescaledb_information.jobs ON (proc_name='policy_refresh_continuous_aggregate' AND proc_schema='_timescaledb_functions' AND jobs.config->>'mat_hypertable_id' = cagg.mat_hypertable_id::text) + WHERE + q.worker_pid IS NULL AND q.finished IS NULL + -- We don't want multiple workers to be active on the same CAgg, + AND NOT EXISTS ( + SELECT + FROM + _timescaledb_additional.incremental_continuous_aggregate_refreshes AS a + JOIN + pg_catalog.pg_stat_activity ON (pid=worker_pid) + WHERE + a.finished IS NULL + -- If pids ever get recycled (container/machine restart), + -- this filter ensures we ignore the old ones + AND started > backend_start + AND q.continuous_aggregate = a.continuous_aggregate + ) + ORDER BY + q.priority ASC, + q.scheduled ASC + FOR UPDATE OF q SKIP LOCKED + LIMIT + 1; + + IF p_cagg IS NULL THEN + COMMIT; + -- There are no items in the queue that we can currently process. We therefore + -- sleep a while longer before attempting to try again. + IF global_end_time - interval '30 seconds' < now () THEN + EXIT; + ELSE + SET application_name TO 'cagg incremental refresh consumer - waiting for next task'; + -- PERFORM pg_catalog.pg_sleep(0.001); + CONTINUE; + END IF; + END IF; + + UPDATE + _timescaledb_additional.incremental_continuous_aggregate_refreshes + SET + worker_pid = pg_backend_pid(), + started = clock_timestamp() + WHERE + id = p_id; + + -- Inform others of what we are doing. + app_name := ' refresh ' || p_window_start::date; + IF p_window_end::date != p_window_start::date THEN + app_name := app_name || ' ' || p_window_end::date; + ELSE + app_name := app_name || to_char(p_window_start, 'THH24:MI'); + END IF; + IF length(app_name) + length(p_cagg::text) > 63 THEN + app_name := '...' || right(p_cagg::text, 60 - length(app_name)) || app_name; + ELSE + app_name := p_cagg::text || app_name; + END IF; + PERFORM pg_catalog.set_config( + 'application_name', + app_name, + false + ); + + RAISE DEBUG + '% - Processing %, (% - %)', + pg_catalog.to_char(pg_catalog.clock_timestamp(), 'YYYY-MM-DD HH24:MI:SS.FF3OF'), + p_cagg, + p_window_start, + p_window_end; + + -- We need to ensure that all other workers now know we are working on this + -- task. We therefore need to commit once now. This also releases our + -- access exclusive lock on the queue table. + COMMIT; + + -- We take out a row-level-lock to signal to concurrent workers that *we* + -- are working on it. By taking this type of lock, we can clean up + -- this table from different tasks: They can update/delete these rows + -- if no active worker is working on them, and no lock is established. + PERFORM + FROM + _timescaledb_additional.incremental_continuous_aggregate_refreshes + WHERE + id = p_id + FOR NO KEY UPDATE; + + IF enable_tiered IS NOT NULL THEN + PERFORM pg_catalog.set_config( + 'timescaledb.enable_tiered_reads', + enable_tiered::text, + false + ); + END IF; + + CALL public.refresh_continuous_aggregate( + p_cagg, + p_window_start, + p_window_end + ); + + UPDATE + _timescaledb_additional.incremental_continuous_aggregate_refreshes + SET + finished = clock_timestamp() + WHERE + id = p_id; + COMMIT; + + RESET timescaledb.enable_tiered_reads; + SET application_name TO 'cagg incremental refresh consumer - idle'; + END; + END LOOP; + + -- @TODO: Check there's no range to be migrated and disable the job + + RAISE NOTICE 'Shutting down worker, as we exceeded our maximum runtime (%)', max_runtime; +END; +$BODY$; + +GRANT EXECUTE ON PROCEDURE _timescaledb_additional.task_refresh_continuous_aggregate_incremental_runner TO pg_database_owner; diff --git a/utils/incremental_refresh/example.md b/utils/incremental_refresh/example.md new file mode 100644 index 0000000..3b36e5a --- /dev/null +++ b/utils/incremental_refresh/example.md @@ -0,0 +1,43 @@ +Create a custom job +```sql +SELECT add_job( + '_timescaledb_additional.task_refresh_continuous_aggregate_incremental_runner', + '1 minute', + config => '{"enable_tiered_reads": true}'); +``` + +Produce ranges to be refreshed +```sql +CALL _timescaledb_additional.schedule_osm_cagg_refresh('g_data_16_41_1minute'); -- An specific CAgg +CALL _timescaledb_additional.schedule_osm_cagg_refresh(); -- All CAggs +``` + +Check the queue status +```sql +SELECT + continuous_aggregate, + count(*) FILTER (WHERE started IS NULL) AS "not started", + count(*) FILTER (WHERE started IS NOT NULL AND finished IS NULL) AS "started", + count(*) FILTER (WHERE started IS NOT NULL AND finished IS NOT NULL) AS "finished" +FROM + _timescaledb_additional.incremental_continuous_aggregate_refreshes +GROUP BY + continuous_aggregate +ORDER BY + continuous_aggregate; +``` + +Check the jobs execution +```sql +SELECT + clock_timestamp()::timestamptz(0), + pid, + wait_event, + application_name, + (now() - xact_start)::interval(0) AS xact_age +FROM + pg_stat_activity +WHERE + state <> 'idle' + AND application_name LIKE '%refresh%'; +``` diff --git a/utils/incremental_refresh/producer.sql b/utils/incremental_refresh/producer.sql new file mode 100644 index 0000000..728d5b5 --- /dev/null +++ b/utils/incremental_refresh/producer.sql @@ -0,0 +1,284 @@ +CREATE SCHEMA IF NOT EXISTS _timescaledb_additional; + +CREATE TABLE IF NOT EXISTS _timescaledb_additional.incremental_continuous_aggregate_refreshes ( + id bigint GENERATED ALWAYS AS IDENTITY, + continuous_aggregate regclass not null, + window_start timestamptz not null, + window_end timestamptz not null CHECK (window_end > window_start), + scheduled timestamptz not null default pg_catalog.clock_timestamp(), + priority int not null default 1, + started timestamptz, + finished timestamptz, + worker_pid integer, + primary key (id), + CONSTRAINT incr_cagg_refreshes_workers_have_started CHECK (num_nulls(worker_pid, started) IN (0, 2)) +); + +GRANT USAGE ON SCHEMA _timescaledb_additional TO public; +REVOKE ALL ON TABLE _timescaledb_additional.incremental_continuous_aggregate_refreshes FROM PUBLIC; +GRANT SELECT ON TABLE _timescaledb_additional.incremental_continuous_aggregate_refreshes TO public; +GRANT ALL ON TABLE _timescaledb_additional.incremental_continuous_aggregate_refreshes TO pg_database_owner; + +COMMENT ON COLUMN _timescaledb_additional.incremental_continuous_aggregate_refreshes.worker_pid IS +$$This column will be populated with the pid that is currently running this task. +This allows us to keep track of things, as well as allow us to reschedule an item if +a worker_pid is no longer active (for whatever reason)$$; + +COMMENT ON COLUMN _timescaledb_additional.incremental_continuous_aggregate_refreshes.scheduled IS +$$To ensure we do actually get to do all the work, the workers will always pick up the +task that has the lowest priority, and then which one was scheduled first. +In that way, we have a bit of a priority queue.$$; + +-- We want to avoid scheduling the same thing twice, for those tasks that have not yet been +-- picked up by any worker. +CREATE UNIQUE INDEX IF NOT EXISTS incr_cagg_refreshes_distinct_tasks_unq ON _timescaledb_additional.incremental_continuous_aggregate_refreshes( + continuous_aggregate, + window_start, + window_end +) WHERE worker_pid IS NULL AND finished IS NULL; + +CREATE INDEX IF NOT EXISTS incr_cagg_refreshes_find_first_work_item_idx ON _timescaledb_additional.incremental_continuous_aggregate_refreshes( + priority, + scheduled +) WHERE worker_pid IS NULL; + +CREATE INDEX IF NOT EXISTS incr_cagg_refreshes_active_workers_idx ON _timescaledb_additional.incremental_continuous_aggregate_refreshes( + worker_pid +) WHERE worker_pid IS NOT NULL; + +-- Calculate the time bucket using the continuous aggregate bucket function +-- configuration +CREATE OR REPLACE FUNCTION _timescaledb_additional.cagg_time_bucket(INTEGER, TIMESTAMPTZ) +RETURNS TIMESTAMPTZ AS +$$ +DECLARE + params TEXT[]; + stmt TEXT; + r RECORD; + result TIMESTAMPTZ; +BEGIN + SELECT * INTO r FROM _timescaledb_catalog.continuous_aggs_bucket_function WHERE mat_hypertable_id = $1; + + IF NOT FOUND THEN + RAISE EXCEPTION 'Continuous Aggregate % not found', $1; + END IF; + + params := array_append(params, format('%I => %L::timestamptz', 'ts', $2)); + + IF r.bucket_width IS NOT NULL THEN + params := array_append(params, format('%I => %L::interval', 'bucket_width', r.bucket_width)); + END IF; + + IF r.bucket_origin IS NOT NULL THEN + params := array_append(params, format('%I => %L::timestamptz', 'origin', r.bucket_origin)); + END IF; + + IF r.bucket_offset IS NOT NULL THEN + params := array_append(params, format('%I => %L::interval', 'offset', r.bucket_offset)); + END IF; + + IF r.bucket_timezone IS NOT NULL THEN + params := array_append(params, format('%I => %L::text', 'timezone', r.bucket_timezone)); + END IF; + + stmt := format('SELECT time_bucket(%s)', array_to_string(params, ', ')); + RAISE DEBUG '%', stmt; + + EXECUTE stmt + INTO result; + + RETURN result; +END; +$$ +LANGUAGE plpgsql STABLE; + +-- Discover continuous aggregates built on top of tiered hypertables and +-- schedule their refresh +-- CREATE OR REPLACE PROCEDURE _timescaledb_additional.schedule_osm_cagg_refresh( +-- name_mask TEXT DEFAULT '%', +-- nbuckets INTEGER DEFAULT 5, +-- dry_run BOOLEAN DEFAULT true, +-- priority INTEGER DEFAULT 100 +-- ) AS +-- $$ +-- BEGIN +-- -- Find caggs built on top of tiered hypertables +-- WITH ranges AS ( +-- SELECT +-- cagg.mat_hypertable_id, +-- ht.schema_name, +-- ht.table_name, +-- ( +-- SELECT column_type +-- FROM _timescaledb_catalog.dimension d +-- WHERE d.hypertable_id = ht.id +-- ORDER BY d.id ASC LIMIT 1 +-- ) AS dim_type, +-- user_view_schema, +-- user_view_name, +-- bf.bucket_width::interval AS bucket_width, +-- _timescaledb_additional.cagg_time_bucket(mat_hypertable_id, range.start) AS global_start, +-- _timescaledb_additional.cagg_time_bucket(mat_hypertable_id, range.end) + (bf.bucket_width::interval + '1 millisecond'::interval) AS global_end +-- FROM _timescaledb_catalog.continuous_agg cagg +-- JOIN _timescaledb_catalog.continuous_aggs_bucket_function bf USING (mat_hypertable_id) +-- JOIN _timescaledb_catalog.hypertable ht ON (ht.id = cagg.raw_hypertable_id) +-- JOIN _osm_catalog.table_map tm ON (tm.hypertable_name = ht.table_name AND tm.hypertable_schema = ht.schema_name) +-- JOIN ( +-- -- the time window of tiered data +-- SELECT +-- osm_table_id, +-- _osm_internal.dimension_pg_usec_to_timestamp(min(range_start)) as start, +-- _osm_internal.dimension_pg_usec_to_timestamp(max(range_end)) as end +-- FROM _osm_catalog.chunk_map +-- JOIN _osm_catalog.chunk_object_map USING (chunk_id) +-- GROUP BY osm_table_id +-- ) AS range USING (osm_table_id) +-- WHERE user_view_name LIKE name_mask +-- ) +-- INSERT INTO _timescaledb_catalog.continuous_aggs_materialization_invalidation_log +-- SELECT +-- ranges.mat_hypertable_id, +-- (extract(epoch from global_start) * 1000000)::bigint AS invalidation_start, +-- (extract(epoch from global_end) * 1000000)::bigint AS invalidation_end +-- FROM +-- ranges +-- WHERE +-- dim_type = 'TIMESTAMPTZ'::REGTYPE; + +-- WITH ranges AS ( +-- SELECT +-- cagg.mat_hypertable_id, +-- ht.schema_name, +-- ht.table_name, +-- ( +-- SELECT column_type AS dim_type +-- FROM _timescaledb_catalog.dimension d +-- WHERE d.hypertable_id = ht.id +-- ORDER BY d.id ASC LIMIT 1 +-- ) AS dim_type, +-- user_view_schema, +-- user_view_name, +-- bf.bucket_width::interval AS bucket_width, +-- _timescaledb_additional.cagg_time_bucket(mat_hypertable_id, range.start) AS global_start, +-- _timescaledb_additional.cagg_time_bucket(mat_hypertable_id, range.end) + (bf.bucket_width::interval + '1 millisecond'::interval) AS global_end +-- FROM _timescaledb_catalog.continuous_agg cagg +-- JOIN _timescaledb_catalog.continuous_aggs_bucket_function bf USING (mat_hypertable_id) +-- JOIN _timescaledb_catalog.hypertable ht ON (ht.id = cagg.raw_hypertable_id) +-- JOIN _osm_catalog.table_map tm ON (tm.hypertable_name = ht.table_name AND tm.hypertable_schema = ht.schema_name) +-- JOIN ( +-- -- the time window of tiered data +-- SELECT +-- osm_table_id, +-- _osm_internal.dimension_pg_usec_to_timestamp(min(range_start)) as start, +-- _osm_internal.dimension_pg_usec_to_timestamp(max(range_end)) as end +-- FROM _osm_catalog.chunk_map +-- JOIN _osm_catalog.chunk_object_map USING (chunk_id) +-- GROUP BY osm_table_id +-- ) AS range USING (osm_table_id) +-- WHERE user_view_name LIKE name_mask +-- ) +-- -- schedule the refresh for given interval +-- INSERT INTO _timescaledb_additional.incremental_continuous_aggregate_refreshes +-- (continuous_aggregate, window_start, window_end, priority) +-- SELECT +-- format('%I.%I', ranges.user_view_schema, ranges.user_view_name)::regclass, +-- start AS window_start, +-- start + (ranges.bucket_width * nbuckets) AS window_end, +-- priority +-- FROM +-- ranges +-- -- Split ranges with 5 times the bucket width +-- JOIN LATERAL generate_series(ranges.global_start, ranges.global_end, (bucket_width * nbuckets)) AS start ON true +-- -- JOIN timescaledb_osm.tiered_chunks ch +-- -- ON (ranges.schema_name, ranges.table_name) = (ch.hypertable_schema, ch.hypertable_name) +-- -- AND tstzrange(start, (start + (ranges.bucket_width * nbuckets)), '[)') && tstzrange(ch.range_start, ch.range_end, '[)') + +-- WHERE +-- dim_type = 'TIMESTAMPTZ'::REGTYPE; + +-- END +-- $$ LANGUAGE plpgsql; + +CREATE OR REPLACE PROCEDURE _timescaledb_additional.schedule_osm_cagg_refresh( + name_mask TEXT DEFAULT '%', + nbuckets INTEGER DEFAULT 5, + dry_run BOOLEAN DEFAULT true, + priority INTEGER DEFAULT 100 +) AS +$$ +BEGIN + + -- WITH RECURSIVE caggs AS ( + -- SELECT mat_hypertable_id, parent_mat_hypertable_id, user_view_name + -- FROM _timescaledb_catalog.continuous_agg + -- WHERE user_view_name = 'metrics_by_week' + -- UNION ALL + -- SELECT continuous_agg.mat_hypertable_id, continuous_agg.parent_mat_hypertable_id, continuous_agg.user_view_name + -- FROM _timescaledb_catalog.continuous_agg + -- JOIN caggs ON caggs.parent_mat_hypertable_id = continuous_agg.mat_hypertable_id + -- ) + -- SELECT * FROM caggs ORDER BY mat_hypertable_id; + + -- WITH RECURSIVE caggs AS ( + -- SELECT + -- cagg.mat_hypertable_id, + -- cagg.parent_mat_hypertable_id, + -- bf.bucket_width::interval AS bucket_width, + -- tch.range_start, + -- tch.range_end + -- FROM + -- timescaledb_osm.tiered_chunks tch + -- JOIN _timescaledb_catalog.hypertable ht ON (tch.hypertable_name = ht.table_name AND tch.hypertable_schema = ht.schema_name) + -- JOIN _timescaledb_catalog.continuous_agg cagg ON (cagg.raw_hypertable_id = ht.id) + -- JOIN _timescaledb_catalog.continuous_aggs_bucket_function bf USING (mat_hypertable_id) + -- WHERE cagg.parent_mat_hypertable_id IS NULL + -- UNION ALL + -- SELECT + -- cagg.mat_hypertable_id, + -- cagg.parent_mat_hypertable_id, + -- bf.bucket_width::interval AS bucket_width, + -- caggs.range_start, + -- caggs.range_end + -- FROM + -- caggs + -- JOIN _timescaledb_catalog.continuous_agg cagg ON (cagg.parent_mat_hypertable_id = caggs.mat_hypertable_id) + -- JOIN _timescaledb_catalog.continuous_aggs_bucket_function bf ON (bf.mat_hypertable_id = cagg.mat_hypertable_id) + -- ) + -- SELECT * FROM caggs; + + + -- Find caggs built on top of tiered hypertables + INSERT INTO _timescaledb_catalog.continuous_aggs_materialization_invalidation_log + SELECT + mat_hypertable_id, + ((extract(epoch from _timescaledb_additional.cagg_time_bucket(cagg.mat_hypertable_id, MIN(range_start)))) * 1000000)::bigint AS invalidation_start, + ((extract(epoch from _timescaledb_additional.cagg_time_bucket(cagg.mat_hypertable_id, MAX(range_end)) + (bf.bucket_width::interval + interval '1 millisecond')))* 1000000)::bigint AS invalidation_end + FROM + timescaledb_osm.tiered_chunks tch + JOIN _timescaledb_catalog.hypertable ht ON (tch.hypertable_name = ht.table_name AND tch.hypertable_schema = ht.schema_name) + JOIN _timescaledb_catalog.continuous_agg cagg ON (cagg.raw_hypertable_id = ht.id) + JOIN _timescaledb_catalog.continuous_aggs_bucket_function bf USING (mat_hypertable_id) + WHERE + user_view_name LIKE name_mask + GROUP BY + mat_hypertable_id, bf.bucket_width; + + -- schedule the refresh for given interval + INSERT INTO _timescaledb_additional.incremental_continuous_aggregate_refreshes + (continuous_aggregate, window_start, window_end, priority) + SELECT + format('%I.%I', cagg.user_view_schema, cagg.user_view_name)::regclass, + _timescaledb_additional.cagg_time_bucket(cagg.mat_hypertable_id, range_start) AS window_start, + _timescaledb_additional.cagg_time_bucket(cagg.mat_hypertable_id, range_end) + (bf.bucket_width::interval + interval '1 millisecond') AS window_end, + priority + FROM + timescaledb_osm.tiered_chunks tch + JOIN _timescaledb_catalog.hypertable ht ON (tch.hypertable_name = ht.table_name AND tch.hypertable_schema = ht.schema_name) + JOIN _timescaledb_catalog.continuous_agg cagg ON (cagg.raw_hypertable_id = ht.id) + JOIN _timescaledb_catalog.continuous_aggs_bucket_function bf USING (mat_hypertable_id) + WHERE + user_view_name LIKE name_mask + ORDER BY + range_start; +END +$$ LANGUAGE plpgsql;