diff --git a/src/test/regress/expected/local_shard_execution.out b/src/test/regress/expected/local_shard_execution.out index e70dc11027f..f6e4db7ee3c 100644 --- a/src/test/regress/expected/local_shard_execution.out +++ b/src/test/regress/expected/local_shard_execution.out @@ -299,10 +299,10 @@ RETURNING *; -- INSERT..SELECT via coordinator consists of two steps, select + COPY -- that's why it is disallowed to use local execution even if the SELECT -- can be executed locally -INSERT INTO distributed_table SELECT * FROM distributed_table WHERE key = 1 OFFSET 0 ON CONFLICT DO NOTHING; -NOTICE: executing the command locally: SELECT key, value, age FROM local_shard_execution.distributed_table_1470001 distributed_table WHERE (key OPERATOR(pg_catalog.=) 1) OFFSET 0 +INSERT INTO distributed_table SELECT sum(key), value FROM distributed_table WHERE key = 1 GROUP BY value ON CONFLICT DO NOTHING; +NOTICE: executing the command locally: SELECT int4(sum(key)) AS key, value FROM local_shard_execution.distributed_table_1470001 distributed_table WHERE (key OPERATOR(pg_catalog.=) 1) GROUP BY value NOTICE: executing the copy locally for colocated file with shard xxxxx -NOTICE: executing the command locally: INSERT INTO local_shard_execution.distributed_table_1470001 AS citus_table_alias (key, value, age) SELECT intermediate_result.key, intermediate_result.value, intermediate_result.age FROM read_intermediate_result('insert_select_XXX_1470001'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text, age bigint) ON CONFLICT DO NOTHING +NOTICE: executing the command locally: INSERT INTO local_shard_execution.distributed_table_1470001 AS citus_table_alias (key, value) SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('insert_select_XXX_1470001'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text) ON CONFLICT DO NOTHING INSERT INTO distributed_table SELECT 1, '1',15 FROM distributed_table WHERE key = 2 LIMIT 1 ON CONFLICT DO NOTHING; -- sanity check: multi-shard INSERT..SELECT pushdown goes through distributed execution INSERT INTO distributed_table SELECT * FROM distributed_table ON CONFLICT DO NOTHING; @@ -752,46 +752,46 @@ NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shar (1 row) SELECT count(*) FROM distributed_table d1 join distributed_table d2 using(age); -NOTICE: executing the command locally: SELECT partition_index, 'repartition_65_1' || '_' || partition_index::text , rows_written FROM pg_catalog.worker_partition_query_result('repartition_65_1','SELECT age AS column1 FROM local_shard_execution.distributed_table_1470001 d1 WHERE true',0,'hash','{-2147483648,-1073741824,0,1073741824}'::text[],'{-1073741825,-1,1073741823,2147483647}'::text[],true,true,true) WHERE rows_written > 0 -NOTICE: executing the command locally: SELECT partition_index, 'repartition_65_3' || '_' || partition_index::text , rows_written FROM pg_catalog.worker_partition_query_result('repartition_65_3','SELECT age AS column1 FROM local_shard_execution.distributed_table_1470003 d1 WHERE true',0,'hash','{-2147483648,-1073741824,0,1073741824}'::text[],'{-1073741825,-1,1073741823,2147483647}'::text[],true,true,true) WHERE rows_written > 0 -NOTICE: executing the command locally: SELECT partition_index, 'repartition_66_1' || '_' || partition_index::text , rows_written FROM pg_catalog.worker_partition_query_result('repartition_66_1','SELECT age AS column1 FROM local_shard_execution.distributed_table_1470001 d2 WHERE true',0,'hash','{-2147483648,-1073741824,0,1073741824}'::text[],'{-1073741825,-1,1073741823,2147483647}'::text[],true,true,true) WHERE rows_written > 0 -NOTICE: executing the command locally: SELECT partition_index, 'repartition_66_3' || '_' || partition_index::text , rows_written FROM pg_catalog.worker_partition_query_result('repartition_66_3','SELECT age AS column1 FROM local_shard_execution.distributed_table_1470003 d2 WHERE true',0,'hash','{-2147483648,-1073741824,0,1073741824}'::text[],'{-1073741825,-1,1073741823,2147483647}'::text[],true,true,true) WHERE rows_written > 0 -NOTICE: executing the command locally: SELECT bytes FROM fetch_intermediate_results(ARRAY['repartition_65_1_0']::text[],'localhost',57637) bytes -NOTICE: executing the command locally: SELECT bytes FROM fetch_intermediate_results(ARRAY['repartition_65_2_0']::text[],'localhost',57638) bytes -NOTICE: executing the command locally: SELECT bytes FROM fetch_intermediate_results(ARRAY['repartition_65_3_0']::text[],'localhost',57637) bytes -NOTICE: executing the command locally: SELECT bytes FROM fetch_intermediate_results(ARRAY['repartition_65_4_0']::text[],'localhost',57638) bytes -NOTICE: executing the command locally: SELECT bytes FROM fetch_intermediate_results(ARRAY['repartition_66_1_0']::text[],'localhost',57637) bytes -NOTICE: executing the command locally: SELECT bytes FROM fetch_intermediate_results(ARRAY['repartition_66_2_0']::text[],'localhost',57638) bytes -NOTICE: executing the command locally: SELECT bytes FROM fetch_intermediate_results(ARRAY['repartition_66_3_0']::text[],'localhost',57637) bytes -NOTICE: executing the command locally: SELECT bytes FROM fetch_intermediate_results(ARRAY['repartition_66_4_0']::text[],'localhost',57638) bytes -NOTICE: executing the command locally: SELECT bytes FROM fetch_intermediate_results(ARRAY['repartition_65_1_1']::text[],'localhost',57637) bytes -NOTICE: executing the command locally: SELECT bytes FROM fetch_intermediate_results(ARRAY['repartition_65_2_1']::text[],'localhost',57638) bytes -NOTICE: executing the command locally: SELECT bytes FROM fetch_intermediate_results(ARRAY['repartition_65_3_1']::text[],'localhost',57637) bytes -NOTICE: executing the command locally: SELECT bytes FROM fetch_intermediate_results(ARRAY['repartition_65_4_1']::text[],'localhost',57638) bytes -NOTICE: executing the command locally: SELECT bytes FROM fetch_intermediate_results(ARRAY['repartition_66_1_1']::text[],'localhost',57637) bytes -NOTICE: executing the command locally: SELECT bytes FROM fetch_intermediate_results(ARRAY['repartition_66_2_1']::text[],'localhost',57638) bytes -NOTICE: executing the command locally: SELECT bytes FROM fetch_intermediate_results(ARRAY['repartition_66_3_1']::text[],'localhost',57637) bytes -NOTICE: executing the command locally: SELECT bytes FROM fetch_intermediate_results(ARRAY['repartition_66_4_1']::text[],'localhost',57638) bytes -NOTICE: executing the command locally: SELECT bytes FROM fetch_intermediate_results(ARRAY['repartition_65_1_2']::text[],'localhost',57637) bytes -NOTICE: executing the command locally: SELECT bytes FROM fetch_intermediate_results(ARRAY['repartition_65_2_2']::text[],'localhost',57638) bytes -NOTICE: executing the command locally: SELECT bytes FROM fetch_intermediate_results(ARRAY['repartition_65_3_2']::text[],'localhost',57637) bytes -NOTICE: executing the command locally: SELECT bytes FROM fetch_intermediate_results(ARRAY['repartition_65_4_2']::text[],'localhost',57638) bytes -NOTICE: executing the command locally: SELECT bytes FROM fetch_intermediate_results(ARRAY['repartition_66_1_2']::text[],'localhost',57637) bytes -NOTICE: executing the command locally: SELECT bytes FROM fetch_intermediate_results(ARRAY['repartition_66_2_2']::text[],'localhost',57638) bytes -NOTICE: executing the command locally: SELECT bytes FROM fetch_intermediate_results(ARRAY['repartition_66_3_2']::text[],'localhost',57637) bytes -NOTICE: executing the command locally: SELECT bytes FROM fetch_intermediate_results(ARRAY['repartition_66_4_2']::text[],'localhost',57638) bytes -NOTICE: executing the command locally: SELECT bytes FROM fetch_intermediate_results(ARRAY['repartition_65_1_3']::text[],'localhost',57637) bytes -NOTICE: executing the command locally: SELECT bytes FROM fetch_intermediate_results(ARRAY['repartition_65_2_3']::text[],'localhost',57638) bytes -NOTICE: executing the command locally: SELECT bytes FROM fetch_intermediate_results(ARRAY['repartition_65_3_3']::text[],'localhost',57637) bytes -NOTICE: executing the command locally: SELECT bytes FROM fetch_intermediate_results(ARRAY['repartition_65_4_3']::text[],'localhost',57638) bytes -NOTICE: executing the command locally: SELECT bytes FROM fetch_intermediate_results(ARRAY['repartition_66_1_3']::text[],'localhost',57637) bytes -NOTICE: executing the command locally: SELECT bytes FROM fetch_intermediate_results(ARRAY['repartition_66_2_3']::text[],'localhost',57638) bytes -NOTICE: executing the command locally: SELECT bytes FROM fetch_intermediate_results(ARRAY['repartition_66_3_3']::text[],'localhost',57637) bytes -NOTICE: executing the command locally: SELECT bytes FROM fetch_intermediate_results(ARRAY['repartition_66_4_3']::text[],'localhost',57638) bytes -NOTICE: executing the command locally: SELECT count(*) AS count FROM (read_intermediate_results('{repartition_65_1_0,repartition_65_2_0,repartition_65_3_0,repartition_65_4_0}'::text[], 'binary'::citus_copy_format) intermediate_result(column1 bigint) JOIN read_intermediate_results('{repartition_66_1_0,repartition_66_2_0,repartition_66_3_0,repartition_66_4_0}'::text[], 'binary'::citus_copy_format) intermediate_result_1(column1 bigint) ON ((intermediate_result.column1 OPERATOR(pg_catalog.=) intermediate_result_1.column1))) WHERE true -NOTICE: executing the command locally: SELECT count(*) AS count FROM (read_intermediate_results('{repartition_65_1_1,repartition_65_2_1,repartition_65_3_1,repartition_65_4_1}'::text[], 'binary'::citus_copy_format) intermediate_result(column1 bigint) JOIN read_intermediate_results('{repartition_66_1_1,repartition_66_2_1,repartition_66_3_1,repartition_66_4_1}'::text[], 'binary'::citus_copy_format) intermediate_result_1(column1 bigint) ON ((intermediate_result.column1 OPERATOR(pg_catalog.=) intermediate_result_1.column1))) WHERE true -NOTICE: executing the command locally: SELECT count(*) AS count FROM (read_intermediate_results('{repartition_65_1_2,repartition_65_2_2,repartition_65_3_2,repartition_65_4_2}'::text[], 'binary'::citus_copy_format) intermediate_result(column1 bigint) JOIN read_intermediate_results('{repartition_66_1_2,repartition_66_2_2,repartition_66_3_2,repartition_66_4_2}'::text[], 'binary'::citus_copy_format) intermediate_result_1(column1 bigint) ON ((intermediate_result.column1 OPERATOR(pg_catalog.=) intermediate_result_1.column1))) WHERE true -NOTICE: executing the command locally: SELECT count(*) AS count FROM (read_intermediate_results('{repartition_65_1_3,repartition_65_2_3,repartition_65_3_3,repartition_65_4_3}'::text[], 'binary'::citus_copy_format) intermediate_result(column1 bigint) JOIN read_intermediate_results('{repartition_66_1_3,repartition_66_2_3,repartition_66_3_3,repartition_66_4_3}'::text[], 'binary'::citus_copy_format) intermediate_result_1(column1 bigint) ON ((intermediate_result.column1 OPERATOR(pg_catalog.=) intermediate_result_1.column1))) WHERE true +NOTICE: executing the command locally: SELECT partition_index, 'repartition_70_1' || '_' || partition_index::text , rows_written FROM pg_catalog.worker_partition_query_result('repartition_70_1','SELECT age AS column1 FROM local_shard_execution.distributed_table_1470001 d1 WHERE true',0,'hash','{-2147483648,-1073741824,0,1073741824}'::text[],'{-1073741825,-1,1073741823,2147483647}'::text[],true,true,true) WHERE rows_written > 0 +NOTICE: executing the command locally: SELECT partition_index, 'repartition_70_3' || '_' || partition_index::text , rows_written FROM pg_catalog.worker_partition_query_result('repartition_70_3','SELECT age AS column1 FROM local_shard_execution.distributed_table_1470003 d1 WHERE true',0,'hash','{-2147483648,-1073741824,0,1073741824}'::text[],'{-1073741825,-1,1073741823,2147483647}'::text[],true,true,true) WHERE rows_written > 0 +NOTICE: executing the command locally: SELECT partition_index, 'repartition_71_1' || '_' || partition_index::text , rows_written FROM pg_catalog.worker_partition_query_result('repartition_71_1','SELECT age AS column1 FROM local_shard_execution.distributed_table_1470001 d2 WHERE true',0,'hash','{-2147483648,-1073741824,0,1073741824}'::text[],'{-1073741825,-1,1073741823,2147483647}'::text[],true,true,true) WHERE rows_written > 0 +NOTICE: executing the command locally: SELECT partition_index, 'repartition_71_3' || '_' || partition_index::text , rows_written FROM pg_catalog.worker_partition_query_result('repartition_71_3','SELECT age AS column1 FROM local_shard_execution.distributed_table_1470003 d2 WHERE true',0,'hash','{-2147483648,-1073741824,0,1073741824}'::text[],'{-1073741825,-1,1073741823,2147483647}'::text[],true,true,true) WHERE rows_written > 0 +NOTICE: executing the command locally: SELECT bytes FROM fetch_intermediate_results(ARRAY['repartition_70_1_0']::text[],'localhost',57637) bytes +NOTICE: executing the command locally: SELECT bytes FROM fetch_intermediate_results(ARRAY['repartition_70_2_0']::text[],'localhost',57638) bytes +NOTICE: executing the command locally: SELECT bytes FROM fetch_intermediate_results(ARRAY['repartition_70_3_0']::text[],'localhost',57637) bytes +NOTICE: executing the command locally: SELECT bytes FROM fetch_intermediate_results(ARRAY['repartition_70_4_0']::text[],'localhost',57638) bytes +NOTICE: executing the command locally: SELECT bytes FROM fetch_intermediate_results(ARRAY['repartition_71_1_0']::text[],'localhost',57637) bytes +NOTICE: executing the command locally: SELECT bytes FROM fetch_intermediate_results(ARRAY['repartition_71_2_0']::text[],'localhost',57638) bytes +NOTICE: executing the command locally: SELECT bytes FROM fetch_intermediate_results(ARRAY['repartition_71_3_0']::text[],'localhost',57637) bytes +NOTICE: executing the command locally: SELECT bytes FROM fetch_intermediate_results(ARRAY['repartition_71_4_0']::text[],'localhost',57638) bytes +NOTICE: executing the command locally: SELECT bytes FROM fetch_intermediate_results(ARRAY['repartition_70_1_1']::text[],'localhost',57637) bytes +NOTICE: executing the command locally: SELECT bytes FROM fetch_intermediate_results(ARRAY['repartition_70_2_1']::text[],'localhost',57638) bytes +NOTICE: executing the command locally: SELECT bytes FROM fetch_intermediate_results(ARRAY['repartition_70_3_1']::text[],'localhost',57637) bytes +NOTICE: executing the command locally: SELECT bytes FROM fetch_intermediate_results(ARRAY['repartition_70_4_1']::text[],'localhost',57638) bytes +NOTICE: executing the command locally: SELECT bytes FROM fetch_intermediate_results(ARRAY['repartition_71_1_1']::text[],'localhost',57637) bytes +NOTICE: executing the command locally: SELECT bytes FROM fetch_intermediate_results(ARRAY['repartition_71_2_1']::text[],'localhost',57638) bytes +NOTICE: executing the command locally: SELECT bytes FROM fetch_intermediate_results(ARRAY['repartition_71_3_1']::text[],'localhost',57637) bytes +NOTICE: executing the command locally: SELECT bytes FROM fetch_intermediate_results(ARRAY['repartition_71_4_1']::text[],'localhost',57638) bytes +NOTICE: executing the command locally: SELECT bytes FROM fetch_intermediate_results(ARRAY['repartition_70_1_2']::text[],'localhost',57637) bytes +NOTICE: executing the command locally: SELECT bytes FROM fetch_intermediate_results(ARRAY['repartition_70_2_2']::text[],'localhost',57638) bytes +NOTICE: executing the command locally: SELECT bytes FROM fetch_intermediate_results(ARRAY['repartition_70_3_2']::text[],'localhost',57637) bytes +NOTICE: executing the command locally: SELECT bytes FROM fetch_intermediate_results(ARRAY['repartition_70_4_2']::text[],'localhost',57638) bytes +NOTICE: executing the command locally: SELECT bytes FROM fetch_intermediate_results(ARRAY['repartition_71_1_2']::text[],'localhost',57637) bytes +NOTICE: executing the command locally: SELECT bytes FROM fetch_intermediate_results(ARRAY['repartition_71_2_2']::text[],'localhost',57638) bytes +NOTICE: executing the command locally: SELECT bytes FROM fetch_intermediate_results(ARRAY['repartition_71_3_2']::text[],'localhost',57637) bytes +NOTICE: executing the command locally: SELECT bytes FROM fetch_intermediate_results(ARRAY['repartition_71_4_2']::text[],'localhost',57638) bytes +NOTICE: executing the command locally: SELECT bytes FROM fetch_intermediate_results(ARRAY['repartition_70_1_3']::text[],'localhost',57637) bytes +NOTICE: executing the command locally: SELECT bytes FROM fetch_intermediate_results(ARRAY['repartition_70_2_3']::text[],'localhost',57638) bytes +NOTICE: executing the command locally: SELECT bytes FROM fetch_intermediate_results(ARRAY['repartition_70_3_3']::text[],'localhost',57637) bytes +NOTICE: executing the command locally: SELECT bytes FROM fetch_intermediate_results(ARRAY['repartition_70_4_3']::text[],'localhost',57638) bytes +NOTICE: executing the command locally: SELECT bytes FROM fetch_intermediate_results(ARRAY['repartition_71_1_3']::text[],'localhost',57637) bytes +NOTICE: executing the command locally: SELECT bytes FROM fetch_intermediate_results(ARRAY['repartition_71_2_3']::text[],'localhost',57638) bytes +NOTICE: executing the command locally: SELECT bytes FROM fetch_intermediate_results(ARRAY['repartition_71_3_3']::text[],'localhost',57637) bytes +NOTICE: executing the command locally: SELECT bytes FROM fetch_intermediate_results(ARRAY['repartition_71_4_3']::text[],'localhost',57638) bytes +NOTICE: executing the command locally: SELECT count(*) AS count FROM (read_intermediate_results('{repartition_70_1_0,repartition_70_2_0,repartition_70_3_0,repartition_70_4_0}'::text[], 'binary'::citus_copy_format) intermediate_result(column1 bigint) JOIN read_intermediate_results('{repartition_71_1_0,repartition_71_2_0,repartition_71_3_0,repartition_71_4_0}'::text[], 'binary'::citus_copy_format) intermediate_result_1(column1 bigint) ON ((intermediate_result.column1 OPERATOR(pg_catalog.=) intermediate_result_1.column1))) WHERE true +NOTICE: executing the command locally: SELECT count(*) AS count FROM (read_intermediate_results('{repartition_70_1_1,repartition_70_2_1,repartition_70_3_1,repartition_70_4_1}'::text[], 'binary'::citus_copy_format) intermediate_result(column1 bigint) JOIN read_intermediate_results('{repartition_71_1_1,repartition_71_2_1,repartition_71_3_1,repartition_71_4_1}'::text[], 'binary'::citus_copy_format) intermediate_result_1(column1 bigint) ON ((intermediate_result.column1 OPERATOR(pg_catalog.=) intermediate_result_1.column1))) WHERE true +NOTICE: executing the command locally: SELECT count(*) AS count FROM (read_intermediate_results('{repartition_70_1_2,repartition_70_2_2,repartition_70_3_2,repartition_70_4_2}'::text[], 'binary'::citus_copy_format) intermediate_result(column1 bigint) JOIN read_intermediate_results('{repartition_71_1_2,repartition_71_2_2,repartition_71_3_2,repartition_71_4_2}'::text[], 'binary'::citus_copy_format) intermediate_result_1(column1 bigint) ON ((intermediate_result.column1 OPERATOR(pg_catalog.=) intermediate_result_1.column1))) WHERE true +NOTICE: executing the command locally: SELECT count(*) AS count FROM (read_intermediate_results('{repartition_70_1_3,repartition_70_2_3,repartition_70_3_3,repartition_70_4_3}'::text[], 'binary'::citus_copy_format) intermediate_result(column1 bigint) JOIN read_intermediate_results('{repartition_71_1_3,repartition_71_2_3,repartition_71_3_3,repartition_71_4_3}'::text[], 'binary'::citus_copy_format) intermediate_result_1(column1 bigint) ON ((intermediate_result.column1 OPERATOR(pg_catalog.=) intermediate_result_1.column1))) WHERE true count --------------------------------------------------------------------- 2 diff --git a/src/test/regress/sql/local_shard_execution.sql b/src/test/regress/sql/local_shard_execution.sql index 4bf5aeec4bb..8acbc2978d1 100644 --- a/src/test/regress/sql/local_shard_execution.sql +++ b/src/test/regress/sql/local_shard_execution.sql @@ -211,7 +211,7 @@ RETURNING *; -- INSERT..SELECT via coordinator consists of two steps, select + COPY -- that's why it is disallowed to use local execution even if the SELECT -- can be executed locally -INSERT INTO distributed_table SELECT * FROM distributed_table WHERE key = 1 OFFSET 0 ON CONFLICT DO NOTHING; +INSERT INTO distributed_table SELECT sum(key), value FROM distributed_table WHERE key = 1 GROUP BY value ON CONFLICT DO NOTHING; INSERT INTO distributed_table SELECT 1, '1',15 FROM distributed_table WHERE key = 2 LIMIT 1 ON CONFLICT DO NOTHING; -- sanity check: multi-shard INSERT..SELECT pushdown goes through distributed execution