mirror of https://github.com/citusdata/citus.git
Get prepared for subquery pushdown using INSERT ... SELECT logic
With this commit, INSERT ... SELECT query planning does not need partition key on the top level target list. Instead, we first look for the top level JOIN query. If that fails, we search for the RTE_RELATION (or RTE_RELATIONs in case of set operations). Then, add the qual there. The idea here is that add the qual to the level that is the topmost in the query tree among the other options. Then, expect postgres planner to distributed that restriction to all other tables as well. This commit is not a hard requirement for INSERT ... SELECT query planning given that the planner would going to be error out on queries where top level query does not have partition column.pull/1254/head
parent
681da71251
commit
1c321b7611
|
@ -135,6 +135,9 @@ static DeferredErrorMessage * InsertPartitionColumnMatchesSelect(Query *query,
|
||||||
subqueryRte,
|
subqueryRte,
|
||||||
Oid *
|
Oid *
|
||||||
selectPartitionColumnTableId);
|
selectPartitionColumnTableId);
|
||||||
|
static Query * FindTopLevelJoinQuery(Query *query);
|
||||||
|
static void AddUninstantiatedEqualityQualToRelation(Query *query);
|
||||||
|
static Var * GetFirstTargetListVar(List *targetList);
|
||||||
static void AddUninstantiatedEqualityQual(Query *query, Var *targetPartitionColumnVar);
|
static void AddUninstantiatedEqualityQual(Query *query, Var *targetPartitionColumnVar);
|
||||||
static DeferredErrorMessage * ErrorIfQueryHasModifyingCTE(Query *queryTree);
|
static DeferredErrorMessage * ErrorIfQueryHasModifyingCTE(Query *queryTree);
|
||||||
|
|
||||||
|
@ -1092,9 +1095,7 @@ AddUninstantiatedPartitionRestriction(Query *originalQuery)
|
||||||
{
|
{
|
||||||
Query *subquery = NULL;
|
Query *subquery = NULL;
|
||||||
RangeTblEntry *subqueryEntry = NULL;
|
RangeTblEntry *subqueryEntry = NULL;
|
||||||
ListCell *targetEntryCell = NULL;
|
Query *topLevelJoinQuery = NULL;
|
||||||
Var *targetPartitionColumnVar = NULL;
|
|
||||||
List *targetList = NULL;
|
|
||||||
|
|
||||||
Assert(InsertSelectQuery(originalQuery));
|
Assert(InsertSelectQuery(originalQuery));
|
||||||
|
|
||||||
|
@ -1113,31 +1114,180 @@ AddUninstantiatedPartitionRestriction(Query *originalQuery)
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
/* iterate through the target list and find the partition column on the target list */
|
|
||||||
targetList = subquery->targetList;
|
|
||||||
foreach(targetEntryCell, targetList)
|
|
||||||
{
|
|
||||||
TargetEntry *targetEntry = lfirst(targetEntryCell);
|
|
||||||
|
|
||||||
if (IsPartitionColumn(targetEntry->expr, subquery) &&
|
topLevelJoinQuery = FindTopLevelJoinQuery(subquery);
|
||||||
IsA(targetEntry->expr, Var))
|
if (topLevelJoinQuery != NULL)
|
||||||
|
{
|
||||||
|
FromExpr *joinTree = topLevelJoinQuery->jointree;
|
||||||
|
List *whereClauseList = QualifierList(joinTree);
|
||||||
|
List *joinClauseList = JoinClauseList(whereClauseList);
|
||||||
|
|
||||||
|
OpExpr *operatorExpression = (OpExpr *) linitial(joinClauseList);
|
||||||
|
List *argumentList = operatorExpression->args;
|
||||||
|
|
||||||
|
/* get left and right side of the expression */
|
||||||
|
Node *leftArgument = (Node *) linitial(argumentList);
|
||||||
|
|
||||||
|
List *leftColumnList = pull_var_clause_default(leftArgument);
|
||||||
|
Var *leftColumn = (Var *) linitial(leftColumnList);
|
||||||
|
|
||||||
|
AddUninstantiatedEqualityQual(topLevelJoinQuery, leftColumn);
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
AddUninstantiatedEqualityQualToRelation(subquery);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* FindTopLevelJoinQuery recursively traverses the query to find the
|
||||||
|
* top level JOIN query that exists in the query. If found, the query that
|
||||||
|
* includes the JOIN is returned. Else, NULL returned.
|
||||||
|
*/
|
||||||
|
static Query *
|
||||||
|
FindTopLevelJoinQuery(Query *query)
|
||||||
|
{
|
||||||
|
FromExpr *joinTree = NULL;
|
||||||
|
List *whereClauseList = NULL;
|
||||||
|
List *joinClauseList = NULL;
|
||||||
|
RangeTblEntry *rte = NULL;
|
||||||
|
|
||||||
|
joinTree = query->jointree;
|
||||||
|
whereClauseList = QualifierList(joinTree);
|
||||||
|
joinClauseList = JoinClauseList(whereClauseList);
|
||||||
|
|
||||||
|
if (list_length(joinClauseList) > 0)
|
||||||
|
{
|
||||||
|
return query;
|
||||||
|
}
|
||||||
|
|
||||||
|
rte = list_nth(query->rtable, 0);
|
||||||
|
if (rte->rtekind == RTE_RELATION)
|
||||||
|
{
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (rte->rtekind == RTE_SUBQUERY)
|
||||||
|
{
|
||||||
|
return FindTopLevelJoinQuery(rte->subquery);
|
||||||
|
}
|
||||||
|
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* AddUninstantiatedEqualityQualToRelation iterates over query's
|
||||||
|
* range table list and finds and adds the partition restriction
|
||||||
|
* to the range table entry that is either (i) RTE_RELATION or
|
||||||
|
* (ii) Recurse in to the two arguments of a set operation until
|
||||||
|
* RTE_RELATION is found.
|
||||||
|
*
|
||||||
|
* Once RTE_RELATION is found, add the qual using
|
||||||
|
* AddUninstantiatedEqualityQual().
|
||||||
|
*
|
||||||
|
*/
|
||||||
|
static void
|
||||||
|
AddUninstantiatedEqualityQualToRelation(Query *query)
|
||||||
|
{
|
||||||
|
List *rangeTableList = query->rtable;
|
||||||
|
ListCell *rteCell = NULL;
|
||||||
|
foreach(rteCell, rangeTableList)
|
||||||
|
{
|
||||||
|
RangeTblEntry *rte = lfirst(rteCell);
|
||||||
|
|
||||||
|
if (rte->rtekind == RTE_RELATION)
|
||||||
{
|
{
|
||||||
targetPartitionColumnVar = (Var *) targetEntry->expr;
|
if (IsDistributedTable(rte->relid) && PartitionColumn(rte->relid, 1) != NULL)
|
||||||
|
{
|
||||||
|
Var *targetVar = GetFirstTargetListVar(query->targetList);
|
||||||
|
if (targetVar)
|
||||||
|
{
|
||||||
|
AddUninstantiatedEqualityQual(query, PartitionColumn(rte->relid,
|
||||||
|
targetVar->varno));
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
else if (rte->rtekind == RTE_SUBQUERY)
|
||||||
|
{
|
||||||
|
Query *subquery = rte->subquery;
|
||||||
|
SetOperationStmt *unionStatement = NULL;
|
||||||
|
Query *leftQuery = NULL;
|
||||||
|
Query *rightQuery = NULL;
|
||||||
|
RangeTblRef *leftRangeTableReference = NULL;
|
||||||
|
RangeTblRef *rightRangeTableReference = NULL;
|
||||||
|
List *unionQueryRangeTableList = NULL;
|
||||||
|
int leftTableIndex = 0;
|
||||||
|
int rightTableIndex = 0;
|
||||||
|
RangeTblEntry *leftRangeTableEntry = NULL;
|
||||||
|
RangeTblEntry *rightRangeTableEntry = NULL;
|
||||||
|
|
||||||
|
/* if does not have any set operations, recurse into the subquery */
|
||||||
|
if (subquery->setOperations == NULL)
|
||||||
|
{
|
||||||
|
AddUninstantiatedEqualityQualToRelation(rte->subquery);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
unionStatement = (SetOperationStmt *) subquery->setOperations;
|
||||||
|
|
||||||
|
leftRangeTableReference = (RangeTblRef *) unionStatement->larg;
|
||||||
|
rightRangeTableReference = (RangeTblRef *) unionStatement->rarg;
|
||||||
|
unionQueryRangeTableList = subquery->rtable;
|
||||||
|
|
||||||
|
leftTableIndex = leftRangeTableReference->rtindex - 1;
|
||||||
|
rightTableIndex = rightRangeTableReference->rtindex - 1;
|
||||||
|
|
||||||
|
leftRangeTableEntry = (RangeTblEntry *) list_nth(
|
||||||
|
unionQueryRangeTableList,
|
||||||
|
leftTableIndex);
|
||||||
|
rightRangeTableEntry = (RangeTblEntry *) list_nth(
|
||||||
|
unionQueryRangeTableList,
|
||||||
|
rightTableIndex);
|
||||||
|
|
||||||
|
Assert(leftRangeTableEntry->rtekind == RTE_SUBQUERY);
|
||||||
|
Assert(rightRangeTableEntry->rtekind == RTE_SUBQUERY);
|
||||||
|
|
||||||
|
leftQuery = leftRangeTableEntry->subquery;
|
||||||
|
rightQuery = rightRangeTableEntry->subquery;
|
||||||
|
|
||||||
|
AddUninstantiatedEqualityQualToRelation(leftQuery);
|
||||||
|
AddUninstantiatedEqualityQualToRelation(rightQuery);
|
||||||
|
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
ereport(DEBUG4, (errmsg("unexpected rte kind:%d", rte->rtekind)));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* GetFirstTargetListVar iterates through the given target list entries and returns
|
||||||
|
* the first target list entry whose type is Var. Otherwise, the function returns NULL.
|
||||||
|
*/
|
||||||
|
static Var *
|
||||||
|
GetFirstTargetListVar(List *targetList)
|
||||||
|
{
|
||||||
|
Var *targetVar = NULL;
|
||||||
|
ListCell *targetListCell = NULL;
|
||||||
|
|
||||||
|
foreach(targetListCell, targetList)
|
||||||
|
{
|
||||||
|
TargetEntry *targetEntry = (TargetEntry *) lfirst(targetListCell);
|
||||||
|
|
||||||
|
if (IsA(targetEntry->expr, Var))
|
||||||
|
{
|
||||||
|
targetVar = (Var *) targetEntry->expr;
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/*
|
return targetVar;
|
||||||
* If we cannot find the bare partition column, no need to add the qual since
|
|
||||||
* we're already going to error out on the multi planner.
|
|
||||||
*/
|
|
||||||
if (!targetPartitionColumnVar)
|
|
||||||
{
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
/* finally add the equality qual of target column to subquery */
|
|
||||||
AddUninstantiatedEqualityQual(subquery, targetPartitionColumnVar);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -1225,35 +1225,8 @@ FROM raw_events_first,
|
||||||
raw_events_second
|
raw_events_second
|
||||||
WHERE raw_events_second.user_id = raw_events_first.value_1
|
WHERE raw_events_second.user_id = raw_events_first.value_1
|
||||||
AND raw_events_first.value_1 = 12;
|
AND raw_events_first.value_1 = 12;
|
||||||
DEBUG: predicate pruning for shardId 13300001
|
ERROR: cannot perform distributed planning for the given modification
|
||||||
DEBUG: predicate pruning for shardId 13300002
|
DETAIL: Select query cannot be pushed down to the worker.
|
||||||
DEBUG: predicate pruning for shardId 13300003
|
|
||||||
DEBUG: predicate pruning for shardId 13300004
|
|
||||||
DEBUG: predicate pruning for shardId 13300005
|
|
||||||
DEBUG: predicate pruning for shardId 13300006
|
|
||||||
DEBUG: distributed statement: INSERT INTO public.agg_events_13300008 AS citus_table_alias (user_id) SELECT raw_events_first.user_id FROM public.raw_events_first_13300000 raw_events_first, public.raw_events_second_13300007 raw_events_second WHERE (((raw_events_second.user_id = raw_events_first.value_1) AND (raw_events_first.value_1 = 12)) AND ((hashint4(raw_events_first.user_id) >= '-2147483648'::integer) AND (hashint4(raw_events_first.user_id) <= '-1073741825'::integer)))
|
|
||||||
DEBUG: predicate pruning for shardId 13300000
|
|
||||||
DEBUG: predicate pruning for shardId 13300002
|
|
||||||
DEBUG: predicate pruning for shardId 13300003
|
|
||||||
DEBUG: predicate pruning for shardId 13300004
|
|
||||||
DEBUG: predicate pruning for shardId 13300005
|
|
||||||
DEBUG: predicate pruning for shardId 13300006
|
|
||||||
DEBUG: distributed statement: INSERT INTO public.agg_events_13300009 AS citus_table_alias (user_id) SELECT raw_events_first.user_id FROM public.raw_events_first_13300001 raw_events_first, public.raw_events_second_13300007 raw_events_second WHERE (((raw_events_second.user_id = raw_events_first.value_1) AND (raw_events_first.value_1 = 12)) AND ((hashint4(raw_events_first.user_id) >= '-1073741824'::integer) AND (hashint4(raw_events_first.user_id) <= '-1'::integer)))
|
|
||||||
DEBUG: predicate pruning for shardId 13300000
|
|
||||||
DEBUG: predicate pruning for shardId 13300001
|
|
||||||
DEBUG: predicate pruning for shardId 13300003
|
|
||||||
DEBUG: predicate pruning for shardId 13300004
|
|
||||||
DEBUG: predicate pruning for shardId 13300005
|
|
||||||
DEBUG: predicate pruning for shardId 13300006
|
|
||||||
DEBUG: distributed statement: INSERT INTO public.agg_events_13300010 AS citus_table_alias (user_id) SELECT raw_events_first.user_id FROM public.raw_events_first_13300002 raw_events_first, public.raw_events_second_13300007 raw_events_second WHERE (((raw_events_second.user_id = raw_events_first.value_1) AND (raw_events_first.value_1 = 12)) AND ((hashint4(raw_events_first.user_id) >= 0) AND (hashint4(raw_events_first.user_id) <= 1073741823)))
|
|
||||||
DEBUG: predicate pruning for shardId 13300000
|
|
||||||
DEBUG: predicate pruning for shardId 13300001
|
|
||||||
DEBUG: predicate pruning for shardId 13300002
|
|
||||||
DEBUG: predicate pruning for shardId 13300004
|
|
||||||
DEBUG: predicate pruning for shardId 13300005
|
|
||||||
DEBUG: predicate pruning for shardId 13300006
|
|
||||||
DEBUG: distributed statement: INSERT INTO public.agg_events_13300011 AS citus_table_alias (user_id) SELECT raw_events_first.user_id FROM public.raw_events_first_13300003 raw_events_first, public.raw_events_second_13300007 raw_events_second WHERE (((raw_events_second.user_id = raw_events_first.value_1) AND (raw_events_first.value_1 = 12)) AND ((hashint4(raw_events_first.user_id) >= 1073741824) AND (hashint4(raw_events_first.user_id) <= 2147483647)))
|
|
||||||
DEBUG: Plan is router executable
|
|
||||||
-- some unsupported LEFT/INNER JOINs
|
-- some unsupported LEFT/INNER JOINs
|
||||||
-- JOIN on one table with partition column other is not
|
-- JOIN on one table with partition column other is not
|
||||||
INSERT INTO agg_events (user_id)
|
INSERT INTO agg_events (user_id)
|
||||||
|
@ -1292,9 +1265,6 @@ SELECT
|
||||||
raw_events_first.user_id
|
raw_events_first.user_id
|
||||||
FROM
|
FROM
|
||||||
raw_events_first LEFT JOIN raw_events_second ON raw_events_first.value_1 = raw_events_second.value_1;
|
raw_events_first LEFT JOIN raw_events_second ON raw_events_first.value_1 = raw_events_second.value_1;
|
||||||
DEBUG: predicate pruning for shardId 13300001
|
|
||||||
DEBUG: predicate pruning for shardId 13300002
|
|
||||||
DEBUG: predicate pruning for shardId 13300003
|
|
||||||
ERROR: cannot perform distributed planning for the given modification
|
ERROR: cannot perform distributed planning for the given modification
|
||||||
DETAIL: Select query cannot be pushed down to the worker.
|
DETAIL: Select query cannot be pushed down to the worker.
|
||||||
-- same as the above with INNER JOIN
|
-- same as the above with INNER JOIN
|
||||||
|
@ -1383,9 +1353,6 @@ SELECT raw_events_first.user_id
|
||||||
FROM raw_events_first,
|
FROM raw_events_first,
|
||||||
raw_events_second
|
raw_events_second
|
||||||
WHERE raw_events_second.user_id = raw_events_first.value_1;
|
WHERE raw_events_second.user_id = raw_events_first.value_1;
|
||||||
DEBUG: predicate pruning for shardId 13300001
|
|
||||||
DEBUG: predicate pruning for shardId 13300002
|
|
||||||
DEBUG: predicate pruning for shardId 13300003
|
|
||||||
ERROR: cannot perform distributed planning for the given modification
|
ERROR: cannot perform distributed planning for the given modification
|
||||||
DETAIL: Select query cannot be pushed down to the worker.
|
DETAIL: Select query cannot be pushed down to the worker.
|
||||||
-- the following is again a very tricky query for Citus
|
-- the following is again a very tricky query for Citus
|
||||||
|
@ -1398,9 +1365,6 @@ FROM raw_events_first,
|
||||||
raw_events_second
|
raw_events_second
|
||||||
WHERE raw_events_second.user_id = raw_events_first.value_1
|
WHERE raw_events_second.user_id = raw_events_first.value_1
|
||||||
AND raw_events_first.value_2 = 12;
|
AND raw_events_first.value_2 = 12;
|
||||||
DEBUG: predicate pruning for shardId 13300001
|
|
||||||
DEBUG: predicate pruning for shardId 13300002
|
|
||||||
DEBUG: predicate pruning for shardId 13300003
|
|
||||||
ERROR: cannot perform distributed planning for the given modification
|
ERROR: cannot perform distributed planning for the given modification
|
||||||
DETAIL: Select query cannot be pushed down to the worker.
|
DETAIL: Select query cannot be pushed down to the worker.
|
||||||
-- foo is not joined on the partition key so the query is not
|
-- foo is not joined on the partition key so the query is not
|
||||||
|
@ -1630,12 +1594,6 @@ outer_most.id, max(outer_most.value)
|
||||||
HAVING SUM(raw_events_second.value_4) > 10) AS foo2 ) as f2
|
HAVING SUM(raw_events_second.value_4) > 10) AS foo2 ) as f2
|
||||||
ON (f.id != f2.id)) as outer_most
|
ON (f.id != f2.id)) as outer_most
|
||||||
GROUP BY outer_most.id;
|
GROUP BY outer_most.id;
|
||||||
DEBUG: predicate pruning for shardId 13300001
|
|
||||||
DEBUG: predicate pruning for shardId 13300002
|
|
||||||
DEBUG: predicate pruning for shardId 13300003
|
|
||||||
DEBUG: predicate pruning for shardId 13300005
|
|
||||||
DEBUG: predicate pruning for shardId 13300006
|
|
||||||
DEBUG: predicate pruning for shardId 13300007
|
|
||||||
ERROR: cannot perform distributed planning for the given modification
|
ERROR: cannot perform distributed planning for the given modification
|
||||||
DETAIL: Select query cannot be pushed down to the worker.
|
DETAIL: Select query cannot be pushed down to the worker.
|
||||||
-- cannot pushdown since subquery returns another column than partition key
|
-- cannot pushdown since subquery returns another column than partition key
|
||||||
|
|
Loading…
Reference in New Issue