Skip to content

Commit

Permalink
Add support for router INSERT .. SELECT commands
Browse files Browse the repository at this point in the history
Tradionally our planner works in the following order:
   router - > pushdown -> repartition -> pull to coordinator

However, for INSERT .. SELECT commands, we did not support "router".

In practice, that is not a big issue, because pushdown planning
can handle router case as well.

However, with PG 16, certain outer joins are converted to
JOIN without any conditions (e.g., JOIN .. ON (true)) and
the filters are pushed down to the tables.

When the filters are pushed down to the tables, router planner
can detect. However, pushdown planner relies on JOIN conditions.

An example query:
```
INSERT INTO agg_events (user_id)
        SELECT raw_events_first.user_id
        FROM raw_events_first LEFT JOIN raw_events_second
        	ON raw_events_first.user_id = raw_events_second.user_id
        WHERE raw_events_first.user_id = 10;
```

As a side effect of this change, now we can also relax
certain limitation that "pushdown" planner emposes, but not
"router". So, with this PR, we also allow those.
  • Loading branch information
onderkalaci committed Jul 28, 2023
1 parent 846cbc3 commit 74ced12
Show file tree
Hide file tree
Showing 24 changed files with 486 additions and 397 deletions.
68 changes: 57 additions & 11 deletions src/backend/distributed/planner/insert_select_planner.c
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,8 @@ static DistributedPlan * CreateInsertSelectPlanInternal(uint64 planId,
static DistributedPlan * CreateDistributedInsertSelectPlan(Query *originalQuery,
PlannerRestrictionContext *
plannerRestrictionContext);
static bool InsertSelectRouter(Query *originalQuery,
PlannerRestrictionContext *plannerRestrictionContext);
static Task * RouterModifyTaskForShardInterval(Query *originalQuery,
CitusTableCacheEntry *targetTableCacheEntry,
ShardInterval *shardInterval,
Expand All @@ -75,6 +77,7 @@ static DeferredErrorMessage * DistributedInsertSelectSupported(Query *queryTree,
RangeTblEntry *insertRte,
RangeTblEntry *subqueryRte,
bool allReferenceTables,
bool routerSelect,
PlannerRestrictionContext *
plannerRestrictionContext);
static DeferredErrorMessage * InsertPartitionColumnMatchesSelect(Query *query,
Expand Down Expand Up @@ -282,6 +285,8 @@ CreateDistributedInsertSelectPlan(Query *originalQuery,
RelationRestrictionContext *relationRestrictionContext =
plannerRestrictionContext->relationRestrictionContext;
bool allReferenceTables = relationRestrictionContext->allReferenceTables;
bool routerSelect =
InsertSelectRouter(copyObject(originalQuery), plannerRestrictionContext);

distributedPlan->modLevel = RowModifyLevelForQuery(originalQuery);

Expand All @@ -293,13 +298,27 @@ CreateDistributedInsertSelectPlan(Query *originalQuery,
insertRte,
subqueryRte,
allReferenceTables,
routerSelect,
plannerRestrictionContext);
if (distributedPlan->planningError)
{
return distributedPlan;
}


/*
* if the query goes to a single node ("router" in Citus' parlance),
* we don't need to go through AllDistributionKeysInQueryAreEqual checks.
*
* For PG16+, this is required as some of the outer JOINs are converted to
* "ON(true)" and filters are pushed down to the table scans. As
* AllDistributionKeysInQueryAreEqual rely on JOIN filters, it will fail to
* detect the router case. However, we can still detect it by checking if
* the query is a router query as the router query checks the filters on
* the tables.
*/
bool allDistributionKeysInQueryAreEqual =
routerSelect ||
AllDistributionKeysInQueryAreEqual(originalQuery, plannerRestrictionContext);

/*
Expand Down Expand Up @@ -361,6 +380,23 @@ CreateDistributedInsertSelectPlan(Query *originalQuery,
}


/*
* InsertSelectRouter is a helper function that returns true of the SELECT
* part of the INSERT .. SELECT query is a router query.
*/
static bool
InsertSelectRouter(Query *originalQuery,
PlannerRestrictionContext *plannerRestrictionContext)
{
RangeTblEntry *subqueryRte = ExtractSelectRangeTableEntry(originalQuery);
DistributedPlan *distributedPlan = CreateRouterPlan(subqueryRte->subquery,
subqueryRte->subquery,
plannerRestrictionContext);

return distributedPlan->planningError == NULL;
}


/*
* CreateInsertSelectIntoLocalTablePlan creates the plan for INSERT .. SELECT queries
* where the selected table is distributed and the inserted table is not.
Expand Down Expand Up @@ -615,6 +651,7 @@ CreateTargetListForCombineQuery(List *targetList)
static DeferredErrorMessage *
DistributedInsertSelectSupported(Query *queryTree, RangeTblEntry *insertRte,
RangeTblEntry *subqueryRte, bool allReferenceTables,
bool routerSelect,
PlannerRestrictionContext *plannerRestrictionContext)
{
Oid selectPartitionColumnTableId = InvalidOid;
Expand Down Expand Up @@ -689,19 +726,28 @@ DistributedInsertSelectSupported(Query *queryTree, RangeTblEntry *insertRte,
NULL, NULL);
}

/* first apply toplevel pushdown checks to SELECT query */
DeferredErrorMessage *error = DeferErrorIfUnsupportedSubqueryPushdown(subquery,
plannerRestrictionContext);
if (error)
{
return error;
}
DeferredErrorMessage *error = NULL;

/* then apply subquery pushdown checks to SELECT query */
error = DeferErrorIfCannotPushdownSubquery(subquery, false);
if (error)
/*
* We can skip SQL support related checks for router queries as
* they are safe to route with any SQL.
*/
if (!routerSelect)
{
return error;
/* first apply toplevel pushdown checks to SELECT query */
error =
DeferErrorIfUnsupportedSubqueryPushdown(subquery, plannerRestrictionContext);
if (error)
{
return error;
}

/* then apply subquery pushdown checks to SELECT query */
error = DeferErrorIfCannotPushdownSubquery(subquery, false);
if (error)
{
return error;
}
}

if (IsCitusTableType(targetRelationId, CITUS_LOCAL_TABLE))
Expand Down
8 changes: 3 additions & 5 deletions src/test/regress/expected/citus_local_tables_queries.out
Original file line number Diff line number Diff line change
Expand Up @@ -604,11 +604,9 @@ INSERT INTO citus_local_table
SELECT * from citus_local_table_2;
NOTICE: executing the command locally: INSERT INTO citus_local_table_queries.citus_local_table_1509001 AS citus_table_alias (a, b) SELECT citus_local_table_2.a, citus_local_table_2.b FROM citus_local_table_queries.citus_local_table_2_1509002 citus_local_table_2
INSERT INTO citus_local_table
SELECT * from citus_local_table_2
ORDER BY 1,2
LIMIT 10;
NOTICE: executing the command locally: SELECT a, b FROM citus_local_table_queries.citus_local_table_2_1509002 citus_local_table_2 ORDER BY a, b LIMIT 10
NOTICE: executing the copy locally for shard xxxxx
SELECT sum(a), b from citus_local_table_2
GROUP BY b;
NOTICE: executing the command locally: INSERT INTO citus_local_table_queries.citus_local_table_1509001 AS citus_table_alias (a, b) SELECT sum(citus_local_table_2.a) AS sum, citus_local_table_2.b FROM citus_local_table_queries.citus_local_table_2_1509002 citus_local_table_2 GROUP BY citus_local_table_2.b
INSERT INTO citus_local_table
SELECT * from postgres_local_table;
NOTICE: executing the copy locally for shard xxxxx
Expand Down
8 changes: 3 additions & 5 deletions src/test/regress/expected/citus_local_tables_queries_0.out
Original file line number Diff line number Diff line change
Expand Up @@ -604,11 +604,9 @@ INSERT INTO citus_local_table
SELECT * from citus_local_table_2;
NOTICE: executing the command locally: INSERT INTO citus_local_table_queries.citus_local_table_1509001 AS citus_table_alias (a, b) SELECT a, b FROM citus_local_table_queries.citus_local_table_2_1509002 citus_local_table_2
INSERT INTO citus_local_table
SELECT * from citus_local_table_2
ORDER BY 1,2
LIMIT 10;
NOTICE: executing the command locally: SELECT a, b FROM citus_local_table_queries.citus_local_table_2_1509002 citus_local_table_2 ORDER BY a, b LIMIT 10
NOTICE: executing the copy locally for shard xxxxx
SELECT sum(a), b from citus_local_table_2
GROUP BY b;
NOTICE: executing the command locally: INSERT INTO citus_local_table_queries.citus_local_table_1509001 AS citus_table_alias (a, b) SELECT sum(a) AS sum, b FROM citus_local_table_queries.citus_local_table_2_1509002 citus_local_table_2 GROUP BY b
INSERT INTO citus_local_table
SELECT * from postgres_local_table;
NOTICE: executing the copy locally for shard xxxxx
Expand Down
Loading

0 comments on commit 74ced12

Please sign in to comment.