Incremental CAgg refresh using a work queue #40
base: main
@@ -0,0 +1,90 @@
# Continuous Aggregates, incremental parallel setup

This code explores the possibility of doing incremental CAgg refreshes in
parallel. The setup it uses is as follows.

At a very high level, these are the components:

- a table that acts as a work queue:
  `_timescaledb_additional.incremental_continuous_aggregate_refreshes`
- one (or more) producer jobs that schedule CAgg refreshes
- one (or more) consumer jobs that process the tasks based on priority

The producer jobs can be scheduled very frequently, as no duplicate tasks will
be written to the work queue.
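The queue table's DDL is not shown here. As a rough sketch only, inferred from the columns the consumer procedure reads and writes (`id`, `continuous_aggregate`, `window_start`, `window_end`, `priority`, `scheduled`, `started`, `finished`, `worker_pid`) — the actual definition in this PR may differ, and the unique constraint is an assumption that would explain the no-duplicates guarantee:

```sql
-- Hypothetical sketch of the work-queue table; names/types inferred from the
-- consumer procedure, not copied from the actual migration in this PR.
CREATE TABLE IF NOT EXISTS _timescaledb_additional.incremental_continuous_aggregate_refreshes (
    id                   bigint GENERATED ALWAYS AS IDENTITY PRIMARY KEY,
    continuous_aggregate regclass    NOT NULL,
    window_start         timestamptz NOT NULL,
    window_end           timestamptz NOT NULL,
    priority             int         NOT NULL DEFAULT 100,
    scheduled            timestamptz NOT NULL DEFAULT now(),
    started              timestamptz,
    finished             timestamptz,
    worker_pid           int,
    -- Assumed constraint: the same refresh window for the same CAgg is only queued once,
    -- which is what lets producers run frequently without creating duplicate tasks.
    UNIQUE (continuous_aggregate, window_start, window_end)
);
```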
## Producer

We have a producer procedure
(`schedule_refresh_continuous_aggregate_incremental`), which schedules tasks to
be picked up by the consumers.

The configuration for this call contains the following keys:

```json
{
    "end_offset": "similar to end-offset in the policy",
    "start_offset": "similar to start-offset in the policy",
    "continuous_aggregate": "regclass / fully qualified name of the user view for the CAgg",
    "increment_size": "the size of each individual task, default: chunk_interval",
    "priority": "priority for these tasks. Lower numbers get processed earlier, default: 100"
}
```
### Producer Examples

#### Schedule multiple sets of tasks for this CAgg

We schedule two sets of tasks: first, the bulk of the history in 3-day
increments,

```sql
CALL _timescaledb_additional.schedule_refresh_continuous_aggregate_incremental(
    job_id => null,
    config => '
    {
        "end_offset": "6 weeks",
        "start_offset": "3 years",
        "continuous_aggregate": "public.test_cagg_incr_refresh_cagg",
        "increment_size": "3 days"
    }');
```

and then the most recent data in 1-week increments, with the highest priority:

```sql
CALL _timescaledb_additional.schedule_refresh_continuous_aggregate_incremental(
    job_id => null,
    config => '
    {
        "end_offset": "1 day",
        "start_offset": "6 weeks",
        "continuous_aggregate": "public.test_cagg_incr_refresh_cagg",
        "increment_size": "1 week",
        "priority": 1
    }');
```
## Consumer

For the consumer(s), we schedule as many jobs as we want to be able to run in
parallel. A reasonable maximum for these is likely not very high, for example
4-6. While we *can* do incremental CAgg refreshes, we cannot (as of December
2024) schedule parallel refreshes for the same CAgg. The number of consumers
should therefore never be higher than your number of CAggs.

These jobs consume a connection all the time, as they are designed to run
continuously.
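Because each consumer advertises its state through `application_name` (the runner procedure sets values such as `cagg incremental refresh consumer - idle`), you can check what the consumers are doing with a query along these lines (a monitoring sketch, not part of this PR):

```sql
-- Show idle/waiting incremental-refresh consumers.
-- Note: a consumer that is busy refreshing renames itself to
-- '<cagg> refresh <window>' and will not match this filter.
SELECT pid, application_name, state, xact_start
FROM pg_catalog.pg_stat_activity
WHERE application_name LIKE 'cagg incremental refresh consumer%';
```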
```sql
SELECT
    public.add_job(
        proc => '_timescaledb_additional.task_refresh_continuous_aggregate_incremental_runner'::regproc,
        -- This isn't strictly needed, but it ensures the workers do not run
        -- forever; once they terminate, they will be restarted within 15 minutes or so.
        schedule_interval => interval '15 minutes',
        config => '{"max_runtime": "11 hours"}',
        initial_start => now()
    )
FROM
    generate_series(1, 4);
```
@@ -0,0 +1,176 @@
```sql
DROP PROCEDURE IF EXISTS _timescaledb_additional.task_refresh_continuous_aggregate_incremental_runner;
CREATE PROCEDURE _timescaledb_additional.task_refresh_continuous_aggregate_incremental_runner (
    job_id int,
    config jsonb
) LANGUAGE plpgsql AS $BODY$
DECLARE
    max_runtime interval := (config->>'max_runtime')::interval;
    global_start_time timestamptz := pg_catalog.clock_timestamp();
    global_end_time timestamptz;
    app_name text;
BEGIN
    max_runtime := coalesce(max_runtime, interval '6 hours');
    global_end_time := global_start_time + max_runtime;

    WHILE pg_catalog.clock_timestamp() < global_end_time LOOP
        SET search_path TO 'pg_catalog,pg_temp';
        SET lock_timeout TO '3s';
        SET application_name TO 'cagg incremental refresh consumer - idle';

        -- Prevent a hot loop
        PERFORM pg_catalog.pg_sleep(1.0);

        -- By serializing the picking of items from the queue, we prevent some race conditions.
        LOCK TABLE _timescaledb_additional.incremental_continuous_aggregate_refreshes
            IN ACCESS EXCLUSIVE MODE;

        SET application_name TO 'cagg incremental refresh consumer - retrieving new task';

        DECLARE
            p_id bigint;
            p_cagg regclass;
            p_window_start timestamptz;
            p_window_end timestamptz;
            p_start_time timestamptz;
            p_end_time timestamptz;
            p_mat_hypertable_id int;
            p_job_id int;
        BEGIN
```
> **Review comment on lines +30 to +39 (Member):** Do we really need a subtransaction here?
```sql
            SELECT
                q.id,
                q.continuous_aggregate,
                q.window_start,
                q.window_end,
                cagg.mat_hypertable_id,
                coalesce(jobs.job_id, -1)
            INTO
                p_id,
                p_cagg,
                p_window_start,
                p_window_end,
                p_mat_hypertable_id,
                p_job_id
            FROM
                _timescaledb_additional.incremental_continuous_aggregate_refreshes AS q
            JOIN
                pg_catalog.pg_class AS pc ON (q.continuous_aggregate = pc.oid)
            JOIN
                pg_catalog.pg_namespace AS pn ON (pc.relnamespace = pn.oid)
            JOIN
                _timescaledb_catalog.continuous_agg AS cagg ON (cagg.user_view_schema = pn.nspname AND cagg.user_view_name = pc.relname)
            JOIN
                _timescaledb_catalog.hypertable AS h ON (cagg.mat_hypertable_id = h.id)
            LEFT JOIN
                timescaledb_information.jobs ON (proc_name = 'policy_refresh_continuous_aggregate' AND proc_schema = '_timescaledb_functions' AND jobs.config->>'mat_hypertable_id' = cagg.mat_hypertable_id::text)
            WHERE
                q.worker_pid IS NULL AND q.finished IS NULL
                -- We don't want multiple workers to be active on the same CAgg.
                AND NOT EXISTS (
                    SELECT
                    FROM
                        _timescaledb_additional.incremental_continuous_aggregate_refreshes AS a
                    JOIN
                        pg_catalog.pg_stat_activity ON (pid = worker_pid)
                    WHERE
                        a.finished IS NULL
                        -- If pids ever get recycled (container/machine restart),
                        -- this filter ensures we ignore the old ones.
                        AND started > backend_start
                        AND q.continuous_aggregate = a.continuous_aggregate
                )
            ORDER BY
                q.priority ASC,
                q.scheduled ASC
            FOR NO KEY UPDATE OF q SKIP LOCKED
            LIMIT
                1;

            IF p_cagg IS NULL THEN
                COMMIT;
                -- There are no items in the queue that we can currently process. We therefore
                -- sleep a while longer before attempting to try again.
                IF global_end_time - interval '30 seconds' < now() THEN
                    EXIT;
                ELSE
                    SET application_name TO 'cagg incremental refresh consumer - waiting for next task';
                    PERFORM pg_catalog.pg_sleep(0.1);
                    CONTINUE;
```
> **Review comment on lines +95 to +98 (Member, Author):** @fabriziomello we can also decide to
>
> **Reply (Member):** yes, this is something that we definitely need to think about... then our scheduler can launch the worker again
```sql
                END IF;
            END IF;

            UPDATE
                _timescaledb_additional.incremental_continuous_aggregate_refreshes
            SET
                worker_pid = pg_backend_pid(),
                started = clock_timestamp()
            WHERE
                id = p_id;

            -- Inform others of what we are doing.
            app_name := ' refresh ' || p_window_start::date;
            IF p_window_end::date != p_window_start::date THEN
                app_name := app_name || ' ' || p_window_end::date;
            ELSE
                app_name := app_name || to_char(p_window_start, 'THH24:MI');
            END IF;
            IF length(app_name) + length(p_cagg::text) > 63 THEN
                app_name := '...' || right(p_cagg::text, 60 - length(app_name)) || app_name;
            ELSE
                app_name := p_cagg::text || app_name;
            END IF;
            PERFORM pg_catalog.set_config(
                'application_name',
                app_name,
                false
            );

            RAISE NOTICE
                '% - Processing %, (% - %)',
                pg_catalog.to_char(pg_catalog.clock_timestamp(), 'YYYY-MM-DD HH24:MI:SS.FF3OF'),
                p_cagg,
                p_window_start,
                p_window_end;

            -- We need to ensure that all other workers now know we are working on this
            -- task. We therefore need to commit once now. This also releases our
            -- access exclusive lock on the queue table.
            COMMIT;

            -- We take out a row-level lock to signal to concurrent workers that *we*
            -- are working on it. By taking this type of lock, we can clean up
            -- this table from different tasks: they can update/delete these rows
            -- if no active worker is working on them and no lock is established.
            PERFORM
            FROM
                _timescaledb_additional.incremental_continuous_aggregate_refreshes
            WHERE
                id = p_id
            FOR NO KEY UPDATE;

            CALL _timescaledb_functions.policy_refresh_continuous_aggregate(
                -1,
                config => jsonb_build_object(
                    'end_offset', (clock_timestamp() - p_window_end)::interval(0),
                    'start_offset', (clock_timestamp() - p_window_start)::interval(0),
                    'mat_hypertable_id', p_mat_hypertable_id
                )
            );

            UPDATE
                _timescaledb_additional.incremental_continuous_aggregate_refreshes
            SET
                finished = clock_timestamp()
            WHERE
                id = p_id;
            COMMIT;

            SET application_name TO 'cagg incremental refresh consumer - idle';
        END;
    END LOOP;

    RAISE NOTICE 'Shutting down worker, as we exceeded our maximum runtime (%)', max_runtime;
END;
$BODY$;

GRANT EXECUTE ON PROCEDURE _timescaledb_additional.task_refresh_continuous_aggregate_incremental_runner TO pg_database_owner;
```
> **Review comment (Member):** Maybe worthy of a comment, but the tx that takes this lock is always short-lived. It doesn't do heavy lifting; it populates `worker_pid` and commits.
>
> **Review comment (Member):** Do we really need this? We're using FOR UPDATE when picking an item from the queue. Asking because it makes FOR UPDATE SKIP LOCKED useless, no?