Add top level union support

This commit adds support for UNION/UNION ALL subqueries that are
in the following form:

     .... (Q1 UNION Q2 UNION ...) as union_query ....

In other words, Citus supports allow top level
unions being wrapped into aggregations queries
and/or simple projection queries that only selects
some fields from the lower level queries.
pull/1323/head
Onder Kalaci 2017-03-29 15:03:54 +03:00
parent 51f80b47e4
commit 195a0fa270
10 changed files with 1795 additions and 44 deletions

View File

@ -151,7 +151,8 @@ static bool GroupedByColumn(List *groupClauseList, List *targetList, Var *column
/* Local functions forward declarations for subquery pushdown checks */
static void ErrorIfContainsUnsupportedSubquery(MultiNode *logicalPlanNode,
PlannerRestrictionContext *
plannerRestrictionContext);
plannerRestrictionContext,
Query *originalQuery);
static void ErrorIfCannotPushdownSubquery(Query *subqueryTree, bool outerQueryHasLimit);
static void ErrorIfUnsupportedSetOperation(Query *subqueryTree, bool outerQueryHasLimit);
static bool ExtractSetOperationStatmentWalker(Node *node, List **setOperationList);
@ -192,14 +193,15 @@ static bool HasOrderByHllType(List *sortClauseList, List *targetList);
* function finds the extended operator node, and splits this node into master
* and worker extended operator nodes.
*
* We also pass plannerRestrictionContext to the optimizer. The context
* is primarily used to decide whether the subquery is safe to pushdown.
* We also pass plannerRestrictionContext and originalQuery to the optimizer.
* These are primarily used to decide whether the subquery is safe to pushdown.
* If not, it helps to produce meaningful error messages for subquery
* pushdown planning.
*/
void
MultiLogicalPlanOptimize(MultiTreeRoot *multiLogicalPlan,
PlannerRestrictionContext *plannerRestrictionContext)
PlannerRestrictionContext *plannerRestrictionContext,
Query *originalQuery)
{
bool hasOrderByHllType = false;
List *selectNodeList = NIL;
@ -219,7 +221,8 @@ MultiLogicalPlanOptimize(MultiTreeRoot *multiLogicalPlan,
ErrorIfContainsUnsupportedAggregate(logicalPlanNode);
/* check that we can pushdown subquery in the plan */
ErrorIfContainsUnsupportedSubquery(logicalPlanNode, plannerRestrictionContext);
ErrorIfContainsUnsupportedSubquery(logicalPlanNode, plannerRestrictionContext,
originalQuery);
/*
* If a select node exists, we use the idempower property to split the node
@ -2808,20 +2811,22 @@ GroupedByColumn(List *groupClauseList, List *targetList, Var *column)
/*
* ErrorIfContainsUnsupportedSubquery extracts subquery multi table from the
* logical plan and uses helper functions to check if we can push down subquery
* to worker nodes. These helper functions error out if we cannot push down the
* the subquery.
* logical plan using plannerRestrictionContext and the original query. It uses
* some helper functions to check if we can push down subquery to worker nodes.
* These helper functions error out if we cannot push down the subquery.
*/
static void
ErrorIfContainsUnsupportedSubquery(MultiNode *logicalPlanNode,
PlannerRestrictionContext *plannerRestrictionContext)
PlannerRestrictionContext *plannerRestrictionContext,
Query *originalQuery)
{
Query *subquery = NULL;
List *extendedOpNodeList = NIL;
MultiTable *multiTable = NULL;
MultiExtendedOp *extendedOpNode = NULL;
bool outerQueryHasLimit = false;
bool restrictionEquivalenceForPartitionKeys = false;
RelationRestrictionContext *relationRestrictionContext =
plannerRestrictionContext->relationRestrictionContext;
/* check if logical plan includes a subquery */
List *subqueryMultiTableList = SubqueryMultiTableList(logicalPlanNode);
@ -2833,9 +2838,26 @@ ErrorIfContainsUnsupportedSubquery(MultiNode *logicalPlanNode,
/* currently in the planner we only allow one subquery in from-clause*/
Assert(list_length(subqueryMultiTableList) == 1);
restrictionEquivalenceForPartitionKeys =
RestrictionEquivalenceForPartitionKeys(plannerRestrictionContext);
if (!restrictionEquivalenceForPartitionKeys)
/*
* We're checking two things here:
* (i) If the query contains a top level union, ensure that all leaves
* return the partition key at the same position
* (ii) Else, check whether all relations joined on the partition key or not
*/
if (ContainsUnionSubquery(originalQuery))
{
if (!SafeToPushdownUnionSubquery(relationRestrictionContext))
{
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("cannot pushdown the subquery since all leaves of "
"the UNION does not include partition key at the "
"same position"),
errdetail("Each leaf query of the UNION should return "
"partition key at the same position on its "
"target list.")));
}
}
else if (!RestrictionEquivalenceForPartitionKeys(plannerRestrictionContext))
{
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("cannot pushdown the subquery since all relations are not "

View File

@ -2028,14 +2028,6 @@ SubquerySqlTaskList(Job *job, PlannerRestrictionContext *plannerRestrictionConte
DistTableCacheEntry *targetCacheEntry = NULL;
RelationRestrictionContext *relationRestrictionContext =
plannerRestrictionContext->relationRestrictionContext;
bool restrictionEquivalenceForPartitionKeys PG_USED_FOR_ASSERTS_ONLY =
RestrictionEquivalenceForPartitionKeys(plannerRestrictionContext);
/*
* If we're going to create tasks for subquery pushdown, all
* relations needs to be joined on the partition key.
*/
Assert(restrictionEquivalenceForPartitionKeys);
/* get list of all range tables in subquery tree */
ExtractRangeTableRelationWalker((Node *) subquery, &rangeTableList);
@ -2072,6 +2064,7 @@ SubquerySqlTaskList(Job *job, PlannerRestrictionContext *plannerRestrictionConte
subqueryTask = SubqueryTaskCreate(subquery, targetShardInterval,
relationRestrictionContext, taskIdIndex);
/* add the task if it could be created */
if (subqueryTask != NULL)
{

View File

@ -311,7 +311,8 @@ CreateDistributedPlan(PlannedStmt *localPlan, Query *originalQuery, Query *query
{
/* Create and optimize logical plan */
MultiTreeRoot *logicalPlan = MultiLogicalPlanCreate(originalQuery, query);
MultiLogicalPlanOptimize(logicalPlan, plannerRestrictionContext);
MultiLogicalPlanOptimize(logicalPlan, plannerRestrictionContext,
originalQuery);
/*
* This check is here to make it likely that all node types used in
@ -797,11 +798,10 @@ CopyPlanParamList(List *originalPlanParamList)
/*
* CreateAndPushPlannerRestrictionContext creates a new planner restriction context.
* Later, it creates a relation restriction context and a join restriction
* context, and sets those contexts in the planner restriction context. Finally,
* the planner restriction context is inserted to the beginning of the
* plannerRestrictionContextList and it is returned.
* CreateAndPushPlannerRestrictionContext creates a new relation restriction context
* and a new join context, inserts it to the beginning of the
* plannerRestrictionContextList. Finally, the planner restriction context is
* inserted to the beginning of the plannerRestrictionContextList and it is returned.
*/
static PlannerRestrictionContext *
CreateAndPushPlannerRestrictionContext(void)

View File

@ -89,6 +89,8 @@ static MultiPlan * CreateSingleTaskRouterPlan(Query *originalQuery,
static MultiPlan * CreateInsertSelectRouterPlan(Query *originalQuery,
PlannerRestrictionContext *
plannerRestrictionContext);
static bool SafeToPushDownSubquery(PlannerRestrictionContext *plannerRestrictionContext,
Query *originalQuery);
static Task * RouterModifyTaskForShardInterval(Query *originalQuery,
ShardInterval *shardInterval,
RelationRestrictionContext *
@ -292,7 +294,7 @@ CreateInsertSelectRouterPlan(Query *originalQuery,
RelationRestrictionContext *relationRestrictionContext =
plannerRestrictionContext->relationRestrictionContext;
bool allReferenceTables = relationRestrictionContext->allReferenceTables;
bool restrictionEquivalenceForPartitionKeys = false;
bool safeToPushDownSubquery = false;
multiPlan->operation = originalQuery->commandType;
@ -308,8 +310,8 @@ CreateInsertSelectRouterPlan(Query *originalQuery,
return multiPlan;
}
restrictionEquivalenceForPartitionKeys =
RestrictionEquivalenceForPartitionKeys(plannerRestrictionContext);
safeToPushDownSubquery = SafeToPushDownSubquery(plannerRestrictionContext,
originalQuery);
/*
* Plan select query for each shard in the target table. Do so by replacing the
@ -329,7 +331,7 @@ CreateInsertSelectRouterPlan(Query *originalQuery,
modifyTask = RouterModifyTaskForShardInterval(originalQuery, targetShardInterval,
relationRestrictionContext,
taskIdIndex,
restrictionEquivalenceForPartitionKeys);
safeToPushDownSubquery);
/* add the task if it could be created */
if (modifyTask != NULL)
@ -375,6 +377,36 @@ CreateInsertSelectRouterPlan(Query *originalQuery,
}
/*
* SafeToPushDownSubquery returns true if either
* (i) there exists join in the query and all relations joined on their
* partition keys
* (ii) there exists only union set operations and all relations has
* partition keys in the same ordinal position in the query
*/
static bool
SafeToPushDownSubquery(PlannerRestrictionContext *plannerRestrictionContext,
Query *originalQuery)
{
RelationRestrictionContext *relationRestrictionContext =
plannerRestrictionContext->relationRestrictionContext;
bool restrictionEquivalenceForPartitionKeys =
RestrictionEquivalenceForPartitionKeys(plannerRestrictionContext);
if (restrictionEquivalenceForPartitionKeys)
{
return true;
}
if (ContainsUnionSubquery(originalQuery))
{
return SafeToPushdownUnionSubquery(relationRestrictionContext);
}
return false;
}
/*
* RouterModifyTaskForShardInterval creates a modify task by
* replacing the partitioning qual parameter added in multi_planner()
@ -390,7 +422,7 @@ static Task *
RouterModifyTaskForShardInterval(Query *originalQuery, ShardInterval *shardInterval,
RelationRestrictionContext *restrictionContext,
uint32 taskIdIndex,
bool allRelationsJoinedOnPartitionKey)
bool safeToPushdownSubquery)
{
Query *copiedQuery = copyObject(originalQuery);
RangeTblEntry *copiedInsertRte = ExtractInsertRangeTableEntry(copiedQuery);
@ -434,7 +466,7 @@ RouterModifyTaskForShardInterval(Query *originalQuery, ShardInterval *shardInter
List *extendedBaseRestrictInfo = originalBaseRestrictInfo;
Index rteIndex = restriction->index;
if (!allRelationsJoinedOnPartitionKey || allReferenceTables)
if (!safeToPushdownSubquery || allReferenceTables)
{
continue;
}

View File

@ -12,6 +12,7 @@
#include "distributed/multi_planner.h"
#include "distributed/multi_logical_planner.h"
#include "distributed/multi_logical_optimizer.h"
#include "distributed/pg_dist_partition.h"
#include "distributed/relation_restriction_equivalence.h"
#include "nodes/nodeFuncs.h"
@ -61,6 +62,11 @@ typedef struct AttributeEquivalenceClassMember
} AttributeEquivalenceClassMember;
static Var * FindTranslatedVar(List *appendRelList, Oid relationOid,
Index relationRteIndex, Index *partitionKeyIndex);
static bool EquivalenceListContainsRelationsEquality(List *attributeEquivalenceList,
RelationRestrictionContext *
restrictionContext);
static uint32 ReferenceRelationCount(RelationRestrictionContext *restrictionContext);
static List * GenerateAttributeEquivalencesForRelationRestrictions(
RelationRestrictionContext *restrictionContext);
@ -113,6 +119,193 @@ static void ListConcatUniqueAttributeClassMemberLists(AttributeEquivalenceClass
firstClass,
AttributeEquivalenceClass *
secondClass);
static Index RelationRestrictionPartitionKeyIndex(RelationRestriction *
relationRestriction);
/*
* SafeToPushdownUnionSubquery returns true if all the relations are returns
* partition keys in the same ordinal position.
*
* Note that the function expects (and asserts) the input query to be a top
* level union query defined by TopLevelUnionQuery().
*
* Lastly, the function fails to produce correct output if the target lists contains
* multiple partition keys on the target list such as the following:
*
* select count(*) from (
* select user_id, user_id from users_table
* union
* select 2, user_id from users_table) u;
*
* For the above query, although the second item in the target list make this query
* safe to push down, the function would fail to return true.
*/
bool
SafeToPushdownUnionSubquery(RelationRestrictionContext *restrictionContext)
{
Index unionQueryPartitionKeyIndex = 0;
AttributeEquivalenceClass *attributeEquivalance =
palloc0(sizeof(AttributeEquivalenceClass));
ListCell *relationRestrictionCell = NULL;
attributeEquivalance->equivalenceId = attributeEquivalenceId++;
foreach(relationRestrictionCell, restrictionContext->relationRestrictionList)
{
RelationRestriction *relationRestriction = lfirst(relationRestrictionCell);
Index partitionKeyIndex = InvalidAttrNumber;
PlannerInfo *relationPlannerRoot = relationRestriction->plannerInfo;
List *targetList = relationPlannerRoot->parse->targetList;
List *appendRelList = relationPlannerRoot->append_rel_list;
Var *varToBeAdded = NULL;
TargetEntry *targetEntryToAdd = NULL;
/*
* We first check whether UNION ALLs are pulled up or not. Note that Postgres
* planner creates AppendRelInfos per each UNION ALL query that is pulled up.
* Then, postgres stores the related information in the append_rel_list on the
* plannerInfo struct.
*/
if (appendRelList != NULL)
{
varToBeAdded = FindTranslatedVar(appendRelList,
relationRestriction->relationId,
relationRestriction->index,
&partitionKeyIndex);
/* union does not have partition key in the target list */
if (partitionKeyIndex == 0)
{
return false;
}
}
else
{
partitionKeyIndex =
RelationRestrictionPartitionKeyIndex(relationRestriction);
/* union does not have partition key in the target list */
if (partitionKeyIndex == 0)
{
return false;
}
targetEntryToAdd = list_nth(targetList, partitionKeyIndex - 1);
if (!IsA(targetEntryToAdd->expr, Var))
{
return false;
}
varToBeAdded = (Var *) targetEntryToAdd->expr;
}
/*
* If the first relation doesn't have partition key on the target
* list of the query that the relation in, simply not allow to push down
* the query.
*/
if (partitionKeyIndex == InvalidAttrNumber)
{
return false;
}
/*
* We find the first relations partition key index in the target list. Later,
* we check whether all the relations have partition keys in the
* same position.
*/
if (unionQueryPartitionKeyIndex == InvalidAttrNumber)
{
unionQueryPartitionKeyIndex = partitionKeyIndex;
}
else if (unionQueryPartitionKeyIndex != partitionKeyIndex)
{
return false;
}
AddToAttributeEquivalenceClass(&attributeEquivalance, relationPlannerRoot,
varToBeAdded);
}
return EquivalenceListContainsRelationsEquality(list_make1(attributeEquivalance),
restrictionContext);
}
/*
* FindTranslatedVar iterates on the appendRelList and tries to find a translated
* child var identified by the relation id and the relation rte index.
*
* Note that postgres translates UNION ALL target list elements into translated_vars
* list on the corresponding AppendRelInfo struct. For details, see the related
* structs.
*
* The function returns NULL if it cannot find a translated var.
*/
static Var *
FindTranslatedVar(List *appendRelList, Oid relationOid, Index relationRteIndex,
Index *partitionKeyIndex)
{
ListCell *appendRelCell = NULL;
AppendRelInfo *targetAppendRelInfo = NULL;
ListCell *translatedVarCell = NULL;
AttrNumber childAttrNumber = 0;
Var *relationPartitionKey = NULL;
List *translaterVars = NULL;
*partitionKeyIndex = 0;
/* iterate on the queries that are part of UNION ALL subselects */
foreach(appendRelCell, appendRelList)
{
AppendRelInfo *appendRelInfo = (AppendRelInfo *) lfirst(appendRelCell);
/*
* We're only interested in the child rel that is equal to the
* relation we're investigating.
*/
if (appendRelInfo->child_relid == relationRteIndex)
{
targetAppendRelInfo = appendRelInfo;
break;
}
}
/* we couldn't find the necessary append rel info */
if (targetAppendRelInfo == NULL)
{
return NULL;
}
relationPartitionKey = PartitionKey(relationOid);
translaterVars = targetAppendRelInfo->translated_vars;
foreach(translatedVarCell, translaterVars)
{
Node *targetNode = (Node *) lfirst(translatedVarCell);
Var *targetVar = NULL;
childAttrNumber++;
if (!IsA(targetNode, Var))
{
continue;
}
targetVar = (Var *) lfirst(translatedVarCell);
if (targetVar->varno == relationRteIndex &&
targetVar->varattno == relationPartitionKey->varattno)
{
*partitionKeyIndex = childAttrNumber;
return targetVar;
}
}
return NULL;
}
/*
* RestrictionEquivalenceForPartitionKeys aims to deduce whether each of the RTE_RELATION
@ -140,14 +333,14 @@ static void ListConcatUniqueAttributeClassMemberLists(AttributeEquivalenceClass
* step, we try generate a common attribute equivalence class that holds as much as
* AttributeEquivalenceMembers whose attributes are a partition keys.
*
* AllRelationsJoinedOnPartitionKey uses both relation restrictions and join restrictions
* RestrictionEquivalenceForPartitionKeys uses both relation restrictions and join restrictions
* to find as much as information that Postgres planner provides to extensions. For the
* details of the usage, please see GenerateAttributeEquivalencesForRelationRestrictions()
* and GenerateAttributeEquivalencesForJoinRestrictions()
*/
bool
RestrictionEquivalenceForPartitionKeys(
PlannerRestrictionContext *plannerRestrictionContext)
RestrictionEquivalenceForPartitionKeys(PlannerRestrictionContext *
plannerRestrictionContext)
{
RelationRestrictionContext *restrictionContext =
plannerRestrictionContext->relationRestrictionContext;
@ -157,13 +350,9 @@ RestrictionEquivalenceForPartitionKeys(
List *relationRestrictionAttributeEquivalenceList = NIL;
List *joinRestrictionAttributeEquivalenceList = NIL;
List *allAttributeEquivalenceList = NIL;
AttributeEquivalenceClass *commonEquivalenceClass = NULL;
uint32 referenceRelationCount = ReferenceRelationCount(restrictionContext);
uint32 totalRelationCount = list_length(restrictionContext->relationRestrictionList);
uint32 nonReferenceRelationCount = totalRelationCount - referenceRelationCount;
ListCell *commonEqClassCell = NULL;
ListCell *relationRestrictionCell = NULL;
Relids commonRteIdentities = NULL;
/*
* If the query includes a single relation which is not a reference table,
@ -195,12 +384,33 @@ RestrictionEquivalenceForPartitionKeys(
list_concat(relationRestrictionAttributeEquivalenceList,
joinRestrictionAttributeEquivalenceList);
return EquivalenceListContainsRelationsEquality(allAttributeEquivalenceList,
restrictionContext);
}
/*
* EquivalenceListContainsRelationsEquality gets a list of attributed equivalence
* list and a relation restriction context. The function first generates a common
* equivalence class out of the attributeEquivalenceList. Later, the function checks
* whether all the relations exists in the common equivalence class.
*
*/
static bool
EquivalenceListContainsRelationsEquality(List *attributeEquivalenceList,
RelationRestrictionContext *restrictionContext)
{
AttributeEquivalenceClass *commonEquivalenceClass = NULL;
ListCell *commonEqClassCell = NULL;
ListCell *relationRestrictionCell = NULL;
Relids commonRteIdentities = NULL;
/*
* In general we're trying to expand existing the equivalence classes to find a
* common equivalence class. The main goal is to test whether this main class
* contains all partition keys of the existing relations.
*/
commonEquivalenceClass = GenerateCommonEquivalence(allAttributeEquivalenceList);
commonEquivalenceClass = GenerateCommonEquivalence(attributeEquivalenceList);
/* add the rte indexes of relations to a bitmap */
foreach(commonEqClassCell, commonEquivalenceClass->equivalentAttributes)
@ -823,7 +1033,7 @@ AddUnionAllSetOperationsToAttributeEquivalenceClass(AttributeEquivalenceClass **
List *appendRelList = root->append_rel_list;
ListCell *appendRelCell = NULL;
/* iterate on the queries that are part of UNION ALL subselects */
/* iterate on the queries that are part of UNION ALL subqueries */
foreach(appendRelCell, appendRelList)
{
AppendRelInfo *appendRelInfo = (AppendRelInfo *) lfirst(appendRelCell);
@ -1022,7 +1232,6 @@ AttributeEquivalancesAreEqual(AttributeEquivalenceClass *firstAttributeEquivalan
return false;
}
foreach(firstAttributeEquivalanceCell, firstEquivalenceMemberList)
{
AttributeEquivalenceClassMember *firstEqMember =
@ -1052,3 +1261,108 @@ AttributeEquivalancesAreEqual(AttributeEquivalenceClass *firstAttributeEquivalan
return true;
}
/*
* ContainsUnionSubquery gets a queryTree and returns true if the query
* contains
* - a subquery with UNION set operation
* - no joins above the UNION set operation in the query tree
*
* Note that the function allows top level unions being wrapped into aggregations
* queries and/or simple projection queries that only selects some fields from
* the lower level queries.
*
* If there exists joins before the set operations, the function returns false.
* Similarly, if the query does not contain any union set operations, the
* function returns false.
*/
bool
ContainsUnionSubquery(Query *queryTree)
{
List *rangeTableList = queryTree->rtable;
Node *setOperations = queryTree->setOperations;
List *joinTreeTableIndexList = NIL;
Index subqueryRteIndex = 0;
uint32 joiningRangeTableCount = 0;
RangeTblEntry *rangeTableEntry = NULL;
Query *subqueryTree = NULL;
ExtractRangeTableIndexWalker((Node *) queryTree->jointree, &joinTreeTableIndexList);
joiningRangeTableCount = list_length(joinTreeTableIndexList);
/* don't allow joins on top of unions */
if (joiningRangeTableCount > 1)
{
return false;
}
subqueryRteIndex = linitial_int(joinTreeTableIndexList);
rangeTableEntry = rt_fetch(subqueryRteIndex, rangeTableList);
if (rangeTableEntry->rtekind != RTE_SUBQUERY)
{
return false;
}
subqueryTree = rangeTableEntry->subquery;
setOperations = subqueryTree->setOperations;
if (setOperations != NULL)
{
SetOperationStmt *setOperationStatement = (SetOperationStmt *) setOperations;
/*
* Note that the set operation tree is traversed elsewhere for ensuring
* that we only support UNIONs.
*/
if (setOperationStatement->op != SETOP_UNION)
{
return false;
}
return true;
}
return ContainsUnionSubquery(subqueryTree);
}
/*
* RelationRestrictionPartitionKeyIndex gets a relation restriction and finds the
* index that the partition key of the relation exists in the query. The query is
* found in the planner info of the relation restriction.
*/
static Index
RelationRestrictionPartitionKeyIndex(RelationRestriction *relationRestriction)
{
PlannerInfo *relationPlannerRoot = NULL;
Query *relationPlannerParseQuery = NULL;
List *relationTargetList = NIL;
ListCell *targetEntryCell = NULL;
Index partitionKeyTargetAttrIndex = 0;
relationPlannerRoot = relationRestriction->plannerInfo;
relationPlannerParseQuery = relationPlannerRoot->parse;
relationTargetList = relationPlannerParseQuery->targetList;
foreach(targetEntryCell, relationTargetList)
{
TargetEntry *targetEntry = (TargetEntry *) lfirst(targetEntryCell);
Expr *targetExpression = targetEntry->expr;
partitionKeyTargetAttrIndex++;
if (!targetEntry->resjunk &&
IsPartitionColumn(targetExpression, relationPlannerParseQuery) &&
IsA(targetExpression, Var))
{
Var *targetColumn = (Var *) targetExpression;
if (targetColumn->varno == relationRestriction->index)
{
return partitionKeyTargetAttrIndex;
}
}
}
return InvalidAttrNumber;
}

View File

@ -108,7 +108,8 @@ extern double CountDistinctErrorRate;
/* Function declaration for optimizing logical plans */
extern void MultiLogicalPlanOptimize(MultiTreeRoot *multiTree,
PlannerRestrictionContext *plannerRestrictionContext);
PlannerRestrictionContext *plannerRestrictionContext,
Query *originalQuery);
/* Function declaration for getting partition method for the given relation */
extern char PartitionMethod(Oid relationId);

View File

@ -15,8 +15,10 @@
#include "distributed/multi_planner.h"
extern bool ContainsUnionSubquery(Query *queryTree);
extern bool RestrictionEquivalenceForPartitionKeys(PlannerRestrictionContext *
plannerRestrictionContext);
extern bool SafeToPushdownUnionSubquery(RelationRestrictionContext *restrictionContext);
#endif /* RELATION_RESTRICTION_EQUIVALENCE_H */

View File

@ -0,0 +1,777 @@
--
-- multi subquery toplevel union queries aims to expand existing subquery pushdown
-- regression tests to cover more cases
-- the tables that are used depends to multi_insert_select_behavioral_analytics_create_table.sql
-- We don't need shard id sequence here, so commented out to prevent conflicts with concurrent tests
-- ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 1400000;
SET citus.subquery_pushdown TO true;
SET citus.enable_router_execution TO false;
-- a very simple union query
SELECT user_id, counter
FROM (
SELECT user_id, value_2 % 10 AS counter FROM events_table WHERE event_type IN (1, 2, 3, 4, 5)
UNION
SELECT user_id, value_2 % 10 AS counter FROM events_table WHERE event_type IN (5, 6, 7, 8, 9, 10)
) user_id
ORDER BY 2 DESC,1
LIMIT 5;
user_id | counter
---------+---------
7 | 9
8 | 9
15 | 9
16 | 9
20 | 9
(5 rows)
-- the same query with union all
SELECT user_id, counter
FROM (
SELECT user_id, value_2 % 10 AS counter FROM events_table WHERE event_type IN (1, 2, 3, 4, 5)
UNION ALL
SELECT user_id, value_2 % 10 AS counter FROM events_table WHERE event_type IN (5, 6, 7, 8, 9, 10)
) user_id
ORDER BY 2 DESC,1
LIMIT 5;
user_id | counter
---------+---------
7 | 9
7 | 9
8 | 9
15 | 9
15 | 9
(5 rows)
-- the same query with group by
SELECT user_id, sum(counter)
FROM (
SELECT user_id, value_2 % 10 AS counter FROM events_table WHERE event_type IN (1, 2, 3, 4, 5)
UNION
SELECT user_id, value_2 % 10 AS counter FROM events_table WHERE event_type IN (5, 6, 7, 8, 9, 10)
) user_id
GROUP BY 1
ORDER BY 2 DESC,1
LIMIT 5;
user_id | sum
---------+-----
49 | 22
15 | 19
26 | 17
48 | 17
61 | 17
(5 rows)
-- the same query with UNION ALL clause
SELECT user_id, sum(counter)
FROM (
SELECT user_id, value_2 % 10 AS counter FROM events_table WHERE event_type IN (1, 2, 3, 4, 5)
UNION ALL
SELECT user_id, value_2 % 10 AS counter FROM events_table WHERE event_type IN (5, 6, 7, 8, 9, 10)
) user_id
GROUP BY 1
ORDER BY 2 DESC,1
LIMIT 5;
user_id | sum
---------+-----
48 | 35
61 | 30
15 | 28
49 | 25
80 | 24
(5 rows)
-- the same query target list entries shuffled
SELECT user_id, sum(counter)
FROM (
SELECT value_2 % 10 AS counter, user_id FROM events_table WHERE event_type IN (1, 2, 3, 4, 5)
UNION
SELECT value_2 % 10 AS counter, user_id FROM events_table WHERE event_type IN (5, 6, 7, 8, 9, 10)
) user_id
GROUP BY 1
ORDER BY 2 DESC,1
LIMIT 5;
user_id | sum
---------+-----
49 | 22
15 | 19
26 | 17
48 | 17
61 | 17
(5 rows)
-- same query with GROUP BY
SELECT user_id, sum(counter)
FROM (
SELECT user_id, value_2 AS counter FROM events_table WHERE event_type IN (1, 2)
UNION
SELECT user_id, value_2 AS counter FROM events_table WHERE event_type IN (5, 6)
) user_id
GROUP BY
user_id
--HAVING sum(counter) > 900
ORDER BY 1,2 DESC LIMIT 5;
user_id | sum
---------+------
1 | 518
2 | 637
4 | 343
6 | 354
7 | 1374
(5 rows)
-- the same query target list entries shuffled but this time the subqueries target list
-- is shuffled
SELECT user_id, sum(counter)
FROM (
SELECT value_2 AS counter, user_id FROM events_table WHERE event_type IN (1, 2)
UNION
SELECT value_2 AS counter, user_id FROM events_table WHERE event_type IN (5, 6)
) user_id
GROUP BY
user_id
--HAVING sum(counter) > 900
ORDER BY 1,2 DESC LIMIT 5;
user_id | sum
---------+------
1 | 518
2 | 637
4 | 343
6 | 354
7 | 1374
(5 rows)
-- similar query this time more subqueries and target list contains a resjunk entry
SELECT sum(counter)
FROM (
SELECT user_id, sum(value_2) AS counter FROM users_table where value_1 < 20 GROUP BY user_id HAVING sum(value_2) > 500
UNION
SELECT user_id, sum(value_2) AS counter FROM users_table where value_1 < 40 and value_1 < 60 GROUP BY user_id HAVING sum(value_2) > 500
UNION
SELECT user_id, sum(value_2) AS counter FROM users_table where value_1 < 60 and value_1 < 80 GROUP BY user_id HAVING sum(value_2) > 500
UNION
SELECT user_id, sum(value_2) AS counter FROM users_table where value_1 < 80 and value_1 < 100 GROUP BY user_id HAVING sum(value_2) > 500
UNION
SELECT user_id, sum(value_2) AS counter FROM users_table where value_1 < 100 and value_1 < 120 GROUP BY user_id HAVING sum(value_2) > 500
) user_id
GROUP BY user_id ORDER BY 1 DESC LIMIT 5;
sum
-------
27772
25720
24993
24968
23508
(5 rows)
-- similar query as above, with UNION ALL
SELECT sum(counter)
FROM (
SELECT user_id, sum(value_2) AS counter FROM users_table where value_1 < 20 GROUP BY user_id HAVING sum(value_2) > 5000
UNION ALL
SELECT user_id, sum(value_2) AS counter FROM users_table where value_1 < 40 and value_1 < 60 GROUP BY user_id HAVING sum(value_2) > 500
UNION ALL
SELECT user_id, sum(value_2) AS counter FROM users_table where value_1 < 60 and value_1 < 80 GROUP BY user_id HAVING sum(value_2) > 500
UNION ALL
SELECT user_id, sum(value_2) AS counter FROM users_table where value_1 < 80 and value_1 < 100 GROUP BY user_id HAVING sum(value_2) > 500
UNION ALL
SELECT user_id, sum(value_2) AS counter FROM users_table where value_1 < 100 and value_1 < 120 GROUP BY user_id HAVING sum(value_2) > 500
) user_id
GROUP BY user_id ORDER BY 1 DESC LIMIT 5;
sum
-------
27667
25080
24814
24365
23508
(5 rows)
-- unions within unions
SELECT *
FROM (
( SELECT user_id,
sum(counter)
FROM
( SELECT user_id,
sum(value_2) AS counter
FROM users_table
GROUP BY user_id
UNION
SELECT user_id,
sum(value_2) AS counter
FROM events_table
GROUP BY user_id ) user_id_1
GROUP BY user_id)
UNION
( SELECT user_id,
sum(counter)
FROM
( SELECT
user_id, sum(value_2) AS counter
FROM users_table
GROUP BY user_id
UNION
SELECT user_id, sum(value_2) AS counter
FROM events_table
GROUP BY user_id) user_id_2
GROUP BY user_id)) AS ftop
ORDER BY 2 DESC, 1 DESC
LIMIT 5;
user_id | sum
---------+--------
23 | 126017
45 | 117323
25 | 116595
17 | 116520
90 | 115843
(5 rows)
-- top level unions are wrapped into top level aggregations
SELECT ("final_query"."event_types") as types, count(*) AS sumOfEventType
FROM
( SELECT *, random()
FROM
( SELECT "t"."user_id",
"t"."time",
unnest("t"."collected_events") AS "event_types"
FROM
( SELECT "t1"."user_id",
min("t1"."time") AS "time",
array_agg(("t1"."event")
ORDER BY TIME ASC, event DESC) AS collected_events
FROM (
(SELECT *
FROM
(SELECT "events"."user_id",
"events"."time",
0 AS event
FROM events_table as "events"
WHERE event_type IN (10, 11, 12, 13, 14, 15) ) events_subquery_1)
UNION
(SELECT *
FROM
(SELECT "events"."user_id", "events"."time", 1 AS event
FROM events_table as "events"
WHERE event_type IN (15, 16, 17, 18, 19) ) events_subquery_2)
UNION
(SELECT *
FROM
(SELECT "events"."user_id", "events"."time", 2 AS event
FROM events_table as "events"
WHERE event_type IN (20, 21, 22, 23, 24, 25) ) events_subquery_3)
UNION
(SELECT *
FROM
(SELECT "events"."user_id", "events"."time", 3 AS event
FROM events_table as "events"
WHERE event_type IN (26, 27, 28, 29, 30, 13)) events_subquery_4)) t1
GROUP BY "t1"."user_id") AS t) "q"
) as final_query
GROUP BY types
ORDER BY types;
types | sumofeventtype
-------+----------------
0 | 55
1 | 38
2 | 70
3 | 58
(4 rows)
-- exactly the same query
-- but wrapper unions are removed from the inner part of the query
SELECT ("final_query"."event_types") as types, count(*) AS sumOfEventType
FROM
( SELECT *, random()
FROM
( SELECT "t"."user_id",
"t"."time",
unnest("t"."collected_events") AS "event_types"
FROM
( SELECT "t1"."user_id",
min("t1"."time") AS "time",
array_agg(("t1"."event")
ORDER BY TIME ASC, event DESC) AS collected_events
FROM (
(SELECT "events"."user_id",
"events"."time",
0 AS event
FROM events_table as "events"
WHERE event_type IN (10, 11, 12, 13, 14, 15))
UNION
(SELECT "events"."user_id", "events"."time", 1 AS event
FROM events_table as "events"
WHERE event_type IN (15, 16, 17, 18, 19) )
UNION
(SELECT "events"."user_id", "events"."time", 2 AS event
FROM events_table as "events"
WHERE event_type IN (20, 21, 22, 23, 24, 25) )
UNION
(SELECT "events"."user_id", "events"."time", 3 AS event
FROM events_table as "events"
WHERE event_type IN (26, 27, 28, 29, 30, 13))) t1
GROUP BY "t1"."user_id") AS t) "q"
) as final_query
GROUP BY types
ORDER BY types;
types | sumofeventtype
-------+----------------
0 | 55
1 | 38
2 | 70
3 | 58
(4 rows)
-- again excatly the same query with top level wrapper removed
SELECT ("q"."event_types") as types, count(*) AS sumOfEventType
FROM
( SELECT "t"."user_id",
"t"."time",
unnest("t"."collected_events") AS "event_types"
FROM
( SELECT "t1"."user_id",
min("t1"."time") AS "time",
array_agg(("t1"."event")
ORDER BY TIME ASC, event DESC) AS collected_events
FROM (
(SELECT "events"."user_id",
"events"."time",
0 AS event
FROM events_table as "events"
WHERE event_type IN (10, 11, 12, 13, 14, 15)
)
UNION
(SELECT "events"."user_id", "events"."time", 1 AS event
FROM events_table as "events"
WHERE event_type IN (15, 16, 17, 18, 19) )
UNION
(SELECT "events"."user_id", "events"."time", 2 AS event
FROM events_table as "events"
WHERE event_type IN (20, 21, 22, 23, 24, 25) )
UNION
(SELECT "events"."user_id", "events"."time", 3 AS event
FROM events_table as "events"
WHERE event_type IN (26, 27, 28, 29, 30, 13))) t1
GROUP BY "t1"."user_id") AS t) "q"
GROUP BY types
ORDER BY types;
types | sumofeventtype
-------+----------------
0 | 55
1 | 38
2 | 70
3 | 58
(4 rows)
-- again same query but with only two top level empty queries (i.e., no group bys)
SELECT *
FROM
( SELECT *
FROM
( SELECT "t1"."user_id"
FROM (
(SELECT "events"."user_id",
"events"."time",
0 AS event
FROM events_table as "events"
WHERE event_type IN (10, 11, 12, 13, 14, 15)
)
UNION
(SELECT "events"."user_id", "events"."time", 1 AS event
FROM events_table as "events"
WHERE event_type IN (15, 16, 17, 18, 19) )
UNION
(SELECT "events"."user_id", "events"."time", 2 AS event
FROM events_table as "events"
WHERE event_type IN (20, 21, 22, 23, 24, 25) )
UNION
(SELECT "events"."user_id", "events"."time", 3 AS event
FROM events_table as "events"
WHERE event_type IN (26, 27, 28, 29, 30, 13))) t1
) AS t) "q" ORDER BY 1 LIMIT 5;
user_id
---------
0
0
0
1
1
(5 rows)
-- a very similar query UNION ALL
SELECT ("q"."event_types") as types, count(*) AS sumOfEventType
FROM
( SELECT "t"."user_id",
"t"."time",
unnest("t"."collected_events") AS "event_types"
FROM
( SELECT "t1"."user_id",
min("t1"."time") AS "time",
array_agg(("t1"."event")
ORDER BY TIME ASC, event DESC) AS collected_events
FROM (
(SELECT "events"."user_id",
"events"."time",
0 AS event
FROM events_table as "events"
WHERE event_type IN (10, 11, 12, 13, 14, 15)
)
UNION ALL
(SELECT "events"."user_id", "events"."time", 1 AS event
FROM events_table as "events"
WHERE event_type IN (15, 16, 17, 18, 19) )
UNION ALL
(SELECT "events"."user_id", "events"."time", 2 AS event
FROM events_table as "events"
WHERE event_type IN (20, 21, 22, 23, 24, 25) )
UNION ALL
(SELECT "events"."user_id", "events"."time", 3 AS event
FROM events_table as "events"
WHERE event_type IN (26, 27, 28, 29, 30, 13))) t1
GROUP BY "t1"."user_id") AS t) "q"
GROUP BY types
ORDER BY types;
types | sumofeventtype
-------+----------------
0 | 55
1 | 38
2 | 70
3 | 58
(4 rows)
-- some UNION ALL queries that are going to be pulled up
SELECT
count(*)
FROM
(
(SELECT user_id FROM users_table)
UNION ALL
(SELECT user_id FROM events_table)
) b;
count
-------
20002
(1 row)
-- similar query without top level agg
SELECT
user_id
FROM
(
(SELECT user_id FROM users_table)
UNION ALL
(SELECT user_id FROM events_table)
) b
ORDER BY 1 DESC
LIMIT 5;
user_id
---------
100
100
100
100
100
(5 rows)
-- similar query with multiple target list entries
SELECT
user_id, value_3
FROM
(
(SELECT value_3, user_id FROM users_table)
UNION ALL
(SELECT value_3, user_id FROM events_table)
) b
ORDER BY 1 DESC, 2 DESC
LIMIT 5;
user_id | value_3
---------+---------
100 | 999
100 | 997
100 | 991
100 | 989
100 | 988
(5 rows)
-- similar query group by inside the subqueries
SELECT
user_id, value_3_sum
FROM
(
(SELECT sum(value_3) as value_3_sum, user_id FROM users_table GROUP BY user_id)
UNION ALL
(SELECT sum(value_3) as value_3_sum, user_id FROM users_table GROUP BY user_id)
) b
ORDER BY 2 DESC, 1 DESC
LIMIT 5;
user_id | value_3_sum
---------+-------------
10 | 64060
10 | 64060
62 | 62445
62 | 62445
26 | 60536
(5 rows)
-- similar query top level group by
SELECT
user_id, sum(value_3)
FROM
(
(SELECT value_3, user_id FROM users_table)
UNION ALL
(SELECT value_3, user_id FROM events_table)
) b
GROUP BY 1
ORDER BY 2 DESC, 1 DESC
LIMIT 5;
user_id | sum
---------+--------
23 | 123923
25 | 118087
69 | 115828
26 | 114705
3 | 113915
(5 rows)
-- a long set operation list
SELECT
user_id, value_3
FROM
(
(SELECT value_3, user_id FROM events_table where event_type IN (1, 2, 3, 4, 5))
UNION ALL
(SELECT value_3, user_id FROM events_table where event_type IN (6, 7, 8, 9, 10))
UNION ALL
(SELECT value_3, user_id FROM events_table where event_type IN (11, 12, 13, 14, 15))
UNION ALL
(SELECT value_3, user_id FROM events_table where event_type IN (16, 17, 18, 19, 20))
UNION ALL
(SELECT value_3, user_id FROM events_table where event_type IN (21, 22, 23, 24, 25))
UNION ALL
(SELECT value_3, user_id FROM events_table where event_type IN (26, 27, 28, 29, 30))
) b
ORDER BY 1 DESC, 2 DESC
LIMIT 5;
user_id | value_3
---------+---------
100 | 951
99 | 558
99 | 14
98 | 987
98 | 577
(5 rows)
-- no partition key on the top
SELECT
max(value_3)
FROM
(
(SELECT value_3, user_id FROM events_table where event_type IN (1, 2, 3, 4, 5))
UNION ALL
(SELECT value_3, user_id FROM events_table where event_type IN (6, 7, 8, 9, 10))
UNION ALL
(SELECT value_3, user_id FROM events_table where event_type IN (11, 12, 13, 14, 15))
UNION ALL
(SELECT value_3, user_id FROM events_table where event_type IN (16, 17, 18, 19, 20))
UNION ALL
(SELECT value_3, user_id FROM events_table where event_type IN (21, 22, 23, 24, 25))
UNION ALL
(SELECT value_3, user_id FROM events_table where event_type IN (26, 27, 28, 29, 30))
) b
GROUP BY user_id
ORDER BY 1 DESC
LIMIT 5;
max
-----
997
997
996
995
995
(5 rows)
-- now lets also have some unsupported queries
-- group by is not on the partition key
SELECT user_id, sum(counter)
FROM (
SELECT user_id, sum(value_2) AS counter FROM events_table GROUP BY user_id
UNION
SELECT value_1 as user_id, sum(value_2) AS counter FROM users_table GROUP BY value_1
) user_id
GROUP BY user_id;
ERROR: cannot pushdown the subquery since all leaves of the UNION does not include partition key at the same position
DETAIL: Each leaf query of the UNION should return partition key at the same position on its target list.
-- partition key is not selected
SELECT sum(counter)
FROM (
SELECT user_id, sum(value_2) AS counter FROM users_table where value_1 < 20 GROUP BY user_id HAVING sum(value_2) > 500
UNION
SELECT user_id, sum(value_2) AS counter FROM users_table where value_1 < 40 and value_1 < 60 GROUP BY user_id HAVING sum(value_2) > 500
UNION
SELECT user_id, sum(value_2) AS counter FROM users_table where value_1 < 60 and value_1 < 80 GROUP BY user_id HAVING sum(value_2) > 500
UNION
SELECT user_id, sum(value_2) AS counter FROM users_table where value_1 < 80 and value_1 < 100 GROUP BY user_id HAVING sum(value_2) > 500
UNION
SELECT 2 * user_id, sum(value_2) AS counter FROM users_table where value_1 < 100 and value_1 < 120 GROUP BY user_id HAVING sum(value_2) > 500
) user_id
GROUP BY user_id ORDER BY 1 DESC LIMIT 5;
ERROR: cannot pushdown the subquery since all leaves of the UNION does not include partition key at the same position
DETAIL: Each leaf query of the UNION should return partition key at the same position on its target list.
-- excepts within unions are not supported
SELECT * FROM
(
(
SELECT user_id, sum(counter)
FROM (
SELECT user_id, sum(value_2) AS counter FROM users_table GROUP BY user_id
UNION
SELECT user_id, sum(value_2) AS counter FROM events_table GROUP BY user_id
) user_id_1
GROUP BY user_id
)
UNION
(
SELECT user_id, sum(counter)
FROM (
SELECT user_id, sum(value_2) AS counter FROM users_table GROUP BY user_id
EXCEPT
SELECT user_id, sum(value_2) AS counter FROM events_table GROUP BY user_id
) user_id_2
GROUP BY user_id)
) as ftop;
ERROR: cannot push down this subquery
DETAIL: Intersect and Except are currently unsupported
-- joins inside unions are not supported
SELECT user_id, sum(counter)
FROM (
SELECT user_id, sum(value_2) AS counter FROM users_table GROUP BY user_id
UNION
SELECT events_table.user_id, sum(events_table.value_2) AS counter FROM events_table, users_table WHERE users_table.user_id > events_table.user_id GROUP BY 1
) user_id
GROUP BY user_id;
ERROR: cannot pushdown the subquery since all leaves of the UNION does not include partition key at the same position
DETAIL: Each leaf query of the UNION should return partition key at the same position on its target list.
-- joins inside unions are not supported -- slightly more comlex than the above
SELECT * FROM
(
(
SELECT user_id, sum(counter)
FROM (
SELECT user_id, sum(value_2) AS counter FROM users_table GROUP BY user_id
UNION
SELECT user_id, sum(value_2) AS counter FROM events_table GROUP BY user_id
) user_id_1
GROUP BY user_id
)
UNION
(
SELECT user_id, sum(counter)
FROM (
SELECT user_id, sum(value_2) AS counter FROM users_table GROUP BY user_id
UNION
SELECT events_table.user_id, sum(events_table.value_2) AS counter FROM events_table, users_table WHERE (events_table.user_id = users_table.user_id) GROUP BY events_table.user_id
) user_id_2
GROUP BY user_id)
) as ftop;
ERROR: cannot pushdown the subquery since all leaves of the UNION does not include partition key at the same position
DETAIL: Each leaf query of the UNION should return partition key at the same position on its target list.
-- offset inside the union
SELECT user_id, sum(counter)
FROM (
SELECT user_id, sum(value_2) AS counter FROM events_table GROUP BY user_id
UNION
SELECT user_id, sum(value_2) AS counter FROM users_table GROUP BY user_id OFFSET 4
) user_id
GROUP BY user_id;
ERROR: cannot push down this subquery
DETAIL: Offset clause is currently unsupported
-- lower level union does not return partition key with the other relations
SELECT *
FROM (
( SELECT user_id,
sum(counter)
FROM
( SELECT user_id,
sum(value_2) AS counter
FROM users_table
GROUP BY user_id
UNION
SELECT user_id,
sum(value_2) AS counter
FROM events_table
GROUP BY user_id ) user_id_1
GROUP BY user_id)
UNION
( SELECT user_id,
sum(counter)
FROM
( SELECT sum(value_2) AS counter,
user_id
FROM users_table
GROUP BY user_id
UNION
SELECT user_id,
sum(value_2) AS counter
FROM events_table
GROUP BY user_id) user_id_2
GROUP BY user_id)) AS ftop;
ERROR: cannot pushdown the subquery since all leaves of the UNION does not include partition key at the same position
DETAIL: Each leaf query of the UNION should return partition key at the same position on its target list.
-- some UNION all queries that are going to be pulled up
SELECT
count(*)
FROM
(
(SELECT user_id FROM users_table)
UNION ALL
(SELECT 2 * user_id FROM events_table)
) b;
ERROR: cannot pushdown the subquery since all leaves of the UNION does not include partition key at the same position
DETAIL: Each leaf query of the UNION should return partition key at the same position on its target list.
-- last query does not have partition key
SELECT
user_id, value_3
FROM
(
(SELECT value_3, user_id FROM events_table where event_type IN (1, 2, 3, 4, 5))
UNION ALL
(SELECT value_3, user_id FROM events_table where event_type IN (6, 7, 8, 9, 10))
UNION ALL
(SELECT value_3, user_id FROM events_table where event_type IN (11, 12, 13, 14, 15))
UNION ALL
(SELECT value_3, user_id FROM events_table where event_type IN (16, 17, 18, 19, 20))
UNION ALL
(SELECT value_3, user_id FROM events_table where event_type IN (21, 22, 23, 24, 25))
UNION ALL
(SELECT value_3, value_2 FROM events_table where event_type IN (26, 27, 28, 29, 30))
) b
ORDER BY 1 DESC, 2 DESC
LIMIT 5;
ERROR: cannot pushdown the subquery since all leaves of the UNION does not include partition key at the same position
DETAIL: Each leaf query of the UNION should return partition key at the same position on its target list.
-- we don't allow joins within unions
SELECT
count(*)
FROM
(
(SELECT user_id FROM users_table)
UNION ALL
(SELECT users_table.user_id FROM events_table, users_table WHERE events_table.user_id = users_table.user_id)
) b;
ERROR: cannot pushdown the subquery since all leaves of the UNION does not include partition key at the same position
DETAIL: Each leaf query of the UNION should return partition key at the same position on its target list.
SET citus.subquery_pushdown TO false;
SET citus.enable_router_execution TO true;

View File

@ -41,6 +41,7 @@ test: multi_deparse_shard_query
test: multi_basic_queries multi_complex_expressions multi_verify_no_subquery
test: multi_explain
test: multi_subquery multi_subquery_complex_queries multi_subquery_behavioral_analytics
test: multi_subquery_union
test: multi_reference_table
test: multi_outer_join_reference
test: multi_single_relation_subquery

View File

@ -0,0 +1,609 @@
--
-- multi subquery toplevel union queries aims to expand existing subquery pushdown
-- regression tests to cover more cases
-- the tables that are used depends to multi_insert_select_behavioral_analytics_create_table.sql
-- We don't need shard id sequence here, so commented out to prevent conflicts with concurrent tests
-- ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 1400000;
SET citus.subquery_pushdown TO true;
SET citus.enable_router_execution TO false;
-- a very simple union query
SELECT user_id, counter
FROM (
SELECT user_id, value_2 % 10 AS counter FROM events_table WHERE event_type IN (1, 2, 3, 4, 5)
UNION
SELECT user_id, value_2 % 10 AS counter FROM events_table WHERE event_type IN (5, 6, 7, 8, 9, 10)
) user_id
ORDER BY 2 DESC,1
LIMIT 5;
-- the same query with union all
SELECT user_id, counter
FROM (
SELECT user_id, value_2 % 10 AS counter FROM events_table WHERE event_type IN (1, 2, 3, 4, 5)
UNION ALL
SELECT user_id, value_2 % 10 AS counter FROM events_table WHERE event_type IN (5, 6, 7, 8, 9, 10)
) user_id
ORDER BY 2 DESC,1
LIMIT 5;
-- the same query with group by
SELECT user_id, sum(counter)
FROM (
SELECT user_id, value_2 % 10 AS counter FROM events_table WHERE event_type IN (1, 2, 3, 4, 5)
UNION
SELECT user_id, value_2 % 10 AS counter FROM events_table WHERE event_type IN (5, 6, 7, 8, 9, 10)
) user_id
GROUP BY 1
ORDER BY 2 DESC,1
LIMIT 5;
-- the same query with UNION ALL clause
SELECT user_id, sum(counter)
FROM (
SELECT user_id, value_2 % 10 AS counter FROM events_table WHERE event_type IN (1, 2, 3, 4, 5)
UNION ALL
SELECT user_id, value_2 % 10 AS counter FROM events_table WHERE event_type IN (5, 6, 7, 8, 9, 10)
) user_id
GROUP BY 1
ORDER BY 2 DESC,1
LIMIT 5;
-- the same query target list entries shuffled
SELECT user_id, sum(counter)
FROM (
SELECT value_2 % 10 AS counter, user_id FROM events_table WHERE event_type IN (1, 2, 3, 4, 5)
UNION
SELECT value_2 % 10 AS counter, user_id FROM events_table WHERE event_type IN (5, 6, 7, 8, 9, 10)
) user_id
GROUP BY 1
ORDER BY 2 DESC,1
LIMIT 5;
-- same query with GROUP BY
SELECT user_id, sum(counter)
FROM (
SELECT user_id, value_2 AS counter FROM events_table WHERE event_type IN (1, 2)
UNION
SELECT user_id, value_2 AS counter FROM events_table WHERE event_type IN (5, 6)
) user_id
GROUP BY
user_id
--HAVING sum(counter) > 900
ORDER BY 1,2 DESC LIMIT 5;
-- the same query target list entries shuffled but this time the subqueries target list
-- is shuffled
SELECT user_id, sum(counter)
FROM (
SELECT value_2 AS counter, user_id FROM events_table WHERE event_type IN (1, 2)
UNION
SELECT value_2 AS counter, user_id FROM events_table WHERE event_type IN (5, 6)
) user_id
GROUP BY
user_id
--HAVING sum(counter) > 900
ORDER BY 1,2 DESC LIMIT 5;
-- similar query this time more subqueries and target list contains a resjunk entry
SELECT sum(counter)
FROM (
SELECT user_id, sum(value_2) AS counter FROM users_table where value_1 < 20 GROUP BY user_id HAVING sum(value_2) > 500
UNION
SELECT user_id, sum(value_2) AS counter FROM users_table where value_1 < 40 and value_1 < 60 GROUP BY user_id HAVING sum(value_2) > 500
UNION
SELECT user_id, sum(value_2) AS counter FROM users_table where value_1 < 60 and value_1 < 80 GROUP BY user_id HAVING sum(value_2) > 500
UNION
SELECT user_id, sum(value_2) AS counter FROM users_table where value_1 < 80 and value_1 < 100 GROUP BY user_id HAVING sum(value_2) > 500
UNION
SELECT user_id, sum(value_2) AS counter FROM users_table where value_1 < 100 and value_1 < 120 GROUP BY user_id HAVING sum(value_2) > 500
) user_id
GROUP BY user_id ORDER BY 1 DESC LIMIT 5;
-- similar query as above, with UNION ALL
SELECT sum(counter)
FROM (
SELECT user_id, sum(value_2) AS counter FROM users_table where value_1 < 20 GROUP BY user_id HAVING sum(value_2) > 5000
UNION ALL
SELECT user_id, sum(value_2) AS counter FROM users_table where value_1 < 40 and value_1 < 60 GROUP BY user_id HAVING sum(value_2) > 500
UNION ALL
SELECT user_id, sum(value_2) AS counter FROM users_table where value_1 < 60 and value_1 < 80 GROUP BY user_id HAVING sum(value_2) > 500
UNION ALL
SELECT user_id, sum(value_2) AS counter FROM users_table where value_1 < 80 and value_1 < 100 GROUP BY user_id HAVING sum(value_2) > 500
UNION ALL
SELECT user_id, sum(value_2) AS counter FROM users_table where value_1 < 100 and value_1 < 120 GROUP BY user_id HAVING sum(value_2) > 500
) user_id
GROUP BY user_id ORDER BY 1 DESC LIMIT 5;
-- unions within unions
SELECT *
FROM (
( SELECT user_id,
sum(counter)
FROM
( SELECT user_id,
sum(value_2) AS counter
FROM users_table
GROUP BY user_id
UNION
SELECT user_id,
sum(value_2) AS counter
FROM events_table
GROUP BY user_id ) user_id_1
GROUP BY user_id)
UNION
( SELECT user_id,
sum(counter)
FROM
( SELECT
user_id, sum(value_2) AS counter
FROM users_table
GROUP BY user_id
UNION
SELECT user_id, sum(value_2) AS counter
FROM events_table
GROUP BY user_id) user_id_2
GROUP BY user_id)) AS ftop
ORDER BY 2 DESC, 1 DESC
LIMIT 5;
-- top level unions are wrapped into top level aggregations
SELECT ("final_query"."event_types") as types, count(*) AS sumOfEventType
FROM
( SELECT *, random()
FROM
( SELECT "t"."user_id",
"t"."time",
unnest("t"."collected_events") AS "event_types"
FROM
( SELECT "t1"."user_id",
min("t1"."time") AS "time",
array_agg(("t1"."event")
ORDER BY TIME ASC, event DESC) AS collected_events
FROM (
(SELECT *
FROM
(SELECT "events"."user_id",
"events"."time",
0 AS event
FROM events_table as "events"
WHERE event_type IN (10, 11, 12, 13, 14, 15) ) events_subquery_1)
UNION
(SELECT *
FROM
(SELECT "events"."user_id", "events"."time", 1 AS event
FROM events_table as "events"
WHERE event_type IN (15, 16, 17, 18, 19) ) events_subquery_2)
UNION
(SELECT *
FROM
(SELECT "events"."user_id", "events"."time", 2 AS event
FROM events_table as "events"
WHERE event_type IN (20, 21, 22, 23, 24, 25) ) events_subquery_3)
UNION
(SELECT *
FROM
(SELECT "events"."user_id", "events"."time", 3 AS event
FROM events_table as "events"
WHERE event_type IN (26, 27, 28, 29, 30, 13)) events_subquery_4)) t1
GROUP BY "t1"."user_id") AS t) "q"
) as final_query
GROUP BY types
ORDER BY types;
-- exactly the same query
-- but wrapper unions are removed from the inner part of the query
SELECT ("final_query"."event_types") as types, count(*) AS sumOfEventType
FROM
( SELECT *, random()
FROM
( SELECT "t"."user_id",
"t"."time",
unnest("t"."collected_events") AS "event_types"
FROM
( SELECT "t1"."user_id",
min("t1"."time") AS "time",
array_agg(("t1"."event")
ORDER BY TIME ASC, event DESC) AS collected_events
FROM (
(SELECT "events"."user_id",
"events"."time",
0 AS event
FROM events_table as "events"
WHERE event_type IN (10, 11, 12, 13, 14, 15))
UNION
(SELECT "events"."user_id", "events"."time", 1 AS event
FROM events_table as "events"
WHERE event_type IN (15, 16, 17, 18, 19) )
UNION
(SELECT "events"."user_id", "events"."time", 2 AS event
FROM events_table as "events"
WHERE event_type IN (20, 21, 22, 23, 24, 25) )
UNION
(SELECT "events"."user_id", "events"."time", 3 AS event
FROM events_table as "events"
WHERE event_type IN (26, 27, 28, 29, 30, 13))) t1
GROUP BY "t1"."user_id") AS t) "q"
) as final_query
GROUP BY types
ORDER BY types;
-- again excatly the same query with top level wrapper removed
SELECT ("q"."event_types") as types, count(*) AS sumOfEventType
FROM
( SELECT "t"."user_id",
"t"."time",
unnest("t"."collected_events") AS "event_types"
FROM
( SELECT "t1"."user_id",
min("t1"."time") AS "time",
array_agg(("t1"."event")
ORDER BY TIME ASC, event DESC) AS collected_events
FROM (
(SELECT "events"."user_id",
"events"."time",
0 AS event
FROM events_table as "events"
WHERE event_type IN (10, 11, 12, 13, 14, 15)
)
UNION
(SELECT "events"."user_id", "events"."time", 1 AS event
FROM events_table as "events"
WHERE event_type IN (15, 16, 17, 18, 19) )
UNION
(SELECT "events"."user_id", "events"."time", 2 AS event
FROM events_table as "events"
WHERE event_type IN (20, 21, 22, 23, 24, 25) )
UNION
(SELECT "events"."user_id", "events"."time", 3 AS event
FROM events_table as "events"
WHERE event_type IN (26, 27, 28, 29, 30, 13))) t1
GROUP BY "t1"."user_id") AS t) "q"
GROUP BY types
ORDER BY types;
-- again same query but with only two top level empty queries (i.e., no group bys)
SELECT *
FROM
( SELECT *
FROM
( SELECT "t1"."user_id"
FROM (
(SELECT "events"."user_id",
"events"."time",
0 AS event
FROM events_table as "events"
WHERE event_type IN (10, 11, 12, 13, 14, 15)
)
UNION
(SELECT "events"."user_id", "events"."time", 1 AS event
FROM events_table as "events"
WHERE event_type IN (15, 16, 17, 18, 19) )
UNION
(SELECT "events"."user_id", "events"."time", 2 AS event
FROM events_table as "events"
WHERE event_type IN (20, 21, 22, 23, 24, 25) )
UNION
(SELECT "events"."user_id", "events"."time", 3 AS event
FROM events_table as "events"
WHERE event_type IN (26, 27, 28, 29, 30, 13))) t1
) AS t) "q" ORDER BY 1 LIMIT 5;
-- a very similar query UNION ALL
SELECT ("q"."event_types") as types, count(*) AS sumOfEventType
FROM
( SELECT "t"."user_id",
"t"."time",
unnest("t"."collected_events") AS "event_types"
FROM
( SELECT "t1"."user_id",
min("t1"."time") AS "time",
array_agg(("t1"."event")
ORDER BY TIME ASC, event DESC) AS collected_events
FROM (
(SELECT "events"."user_id",
"events"."time",
0 AS event
FROM events_table as "events"
WHERE event_type IN (10, 11, 12, 13, 14, 15)
)
UNION ALL
(SELECT "events"."user_id", "events"."time", 1 AS event
FROM events_table as "events"
WHERE event_type IN (15, 16, 17, 18, 19) )
UNION ALL
(SELECT "events"."user_id", "events"."time", 2 AS event
FROM events_table as "events"
WHERE event_type IN (20, 21, 22, 23, 24, 25) )
UNION ALL
(SELECT "events"."user_id", "events"."time", 3 AS event
FROM events_table as "events"
WHERE event_type IN (26, 27, 28, 29, 30, 13))) t1
GROUP BY "t1"."user_id") AS t) "q"
GROUP BY types
ORDER BY types;
-- some UNION ALL queries that are going to be pulled up
SELECT
count(*)
FROM
(
(SELECT user_id FROM users_table)
UNION ALL
(SELECT user_id FROM events_table)
) b;
-- similar query without top level agg
SELECT
user_id
FROM
(
(SELECT user_id FROM users_table)
UNION ALL
(SELECT user_id FROM events_table)
) b
ORDER BY 1 DESC
LIMIT 5;
-- similar query with multiple target list entries
SELECT
user_id, value_3
FROM
(
(SELECT value_3, user_id FROM users_table)
UNION ALL
(SELECT value_3, user_id FROM events_table)
) b
ORDER BY 1 DESC, 2 DESC
LIMIT 5;
-- similar query group by inside the subqueries
SELECT
user_id, value_3_sum
FROM
(
(SELECT sum(value_3) as value_3_sum, user_id FROM users_table GROUP BY user_id)
UNION ALL
(SELECT sum(value_3) as value_3_sum, user_id FROM users_table GROUP BY user_id)
) b
ORDER BY 2 DESC, 1 DESC
LIMIT 5;
-- similar query top level group by
SELECT
user_id, sum(value_3)
FROM
(
(SELECT value_3, user_id FROM users_table)
UNION ALL
(SELECT value_3, user_id FROM events_table)
) b
GROUP BY 1
ORDER BY 2 DESC, 1 DESC
LIMIT 5;
-- a long set operation list
SELECT
user_id, value_3
FROM
(
(SELECT value_3, user_id FROM events_table where event_type IN (1, 2, 3, 4, 5))
UNION ALL
(SELECT value_3, user_id FROM events_table where event_type IN (6, 7, 8, 9, 10))
UNION ALL
(SELECT value_3, user_id FROM events_table where event_type IN (11, 12, 13, 14, 15))
UNION ALL
(SELECT value_3, user_id FROM events_table where event_type IN (16, 17, 18, 19, 20))
UNION ALL
(SELECT value_3, user_id FROM events_table where event_type IN (21, 22, 23, 24, 25))
UNION ALL
(SELECT value_3, user_id FROM events_table where event_type IN (26, 27, 28, 29, 30))
) b
ORDER BY 1 DESC, 2 DESC
LIMIT 5;
-- no partition key on the top
SELECT
max(value_3)
FROM
(
(SELECT value_3, user_id FROM events_table where event_type IN (1, 2, 3, 4, 5))
UNION ALL
(SELECT value_3, user_id FROM events_table where event_type IN (6, 7, 8, 9, 10))
UNION ALL
(SELECT value_3, user_id FROM events_table where event_type IN (11, 12, 13, 14, 15))
UNION ALL
(SELECT value_3, user_id FROM events_table where event_type IN (16, 17, 18, 19, 20))
UNION ALL
(SELECT value_3, user_id FROM events_table where event_type IN (21, 22, 23, 24, 25))
UNION ALL
(SELECT value_3, user_id FROM events_table where event_type IN (26, 27, 28, 29, 30))
) b
GROUP BY user_id
ORDER BY 1 DESC
LIMIT 5;
-- now lets also have some unsupported queries
-- group by is not on the partition key
SELECT user_id, sum(counter)
FROM (
SELECT user_id, sum(value_2) AS counter FROM events_table GROUP BY user_id
UNION
SELECT value_1 as user_id, sum(value_2) AS counter FROM users_table GROUP BY value_1
) user_id
GROUP BY user_id;
-- partition key is not selected
SELECT sum(counter)
FROM (
SELECT user_id, sum(value_2) AS counter FROM users_table where value_1 < 20 GROUP BY user_id HAVING sum(value_2) > 500
UNION
SELECT user_id, sum(value_2) AS counter FROM users_table where value_1 < 40 and value_1 < 60 GROUP BY user_id HAVING sum(value_2) > 500
UNION
SELECT user_id, sum(value_2) AS counter FROM users_table where value_1 < 60 and value_1 < 80 GROUP BY user_id HAVING sum(value_2) > 500
UNION
SELECT user_id, sum(value_2) AS counter FROM users_table where value_1 < 80 and value_1 < 100 GROUP BY user_id HAVING sum(value_2) > 500
UNION
SELECT 2 * user_id, sum(value_2) AS counter FROM users_table where value_1 < 100 and value_1 < 120 GROUP BY user_id HAVING sum(value_2) > 500
) user_id
GROUP BY user_id ORDER BY 1 DESC LIMIT 5;
-- excepts within unions are not supported
SELECT * FROM
(
(
SELECT user_id, sum(counter)
FROM (
SELECT user_id, sum(value_2) AS counter FROM users_table GROUP BY user_id
UNION
SELECT user_id, sum(value_2) AS counter FROM events_table GROUP BY user_id
) user_id_1
GROUP BY user_id
)
UNION
(
SELECT user_id, sum(counter)
FROM (
SELECT user_id, sum(value_2) AS counter FROM users_table GROUP BY user_id
EXCEPT
SELECT user_id, sum(value_2) AS counter FROM events_table GROUP BY user_id
) user_id_2
GROUP BY user_id)
) as ftop;
-- joins inside unions are not supported
SELECT user_id, sum(counter)
FROM (
SELECT user_id, sum(value_2) AS counter FROM users_table GROUP BY user_id
UNION
SELECT events_table.user_id, sum(events_table.value_2) AS counter FROM events_table, users_table WHERE users_table.user_id > events_table.user_id GROUP BY 1
) user_id
GROUP BY user_id;
-- joins inside unions are not supported -- slightly more comlex than the above
SELECT * FROM
(
(
SELECT user_id, sum(counter)
FROM (
SELECT user_id, sum(value_2) AS counter FROM users_table GROUP BY user_id
UNION
SELECT user_id, sum(value_2) AS counter FROM events_table GROUP BY user_id
) user_id_1
GROUP BY user_id
)
UNION
(
SELECT user_id, sum(counter)
FROM (
SELECT user_id, sum(value_2) AS counter FROM users_table GROUP BY user_id
UNION
SELECT events_table.user_id, sum(events_table.value_2) AS counter FROM events_table, users_table WHERE (events_table.user_id = users_table.user_id) GROUP BY events_table.user_id
) user_id_2
GROUP BY user_id)
) as ftop;
-- offset inside the union
SELECT user_id, sum(counter)
FROM (
SELECT user_id, sum(value_2) AS counter FROM events_table GROUP BY user_id
UNION
SELECT user_id, sum(value_2) AS counter FROM users_table GROUP BY user_id OFFSET 4
) user_id
GROUP BY user_id;
-- lower level union does not return partition key with the other relations
SELECT *
FROM (
( SELECT user_id,
sum(counter)
FROM
( SELECT user_id,
sum(value_2) AS counter
FROM users_table
GROUP BY user_id
UNION
SELECT user_id,
sum(value_2) AS counter
FROM events_table
GROUP BY user_id ) user_id_1
GROUP BY user_id)
UNION
( SELECT user_id,
sum(counter)
FROM
( SELECT sum(value_2) AS counter,
user_id
FROM users_table
GROUP BY user_id
UNION
SELECT user_id,
sum(value_2) AS counter
FROM events_table
GROUP BY user_id) user_id_2
GROUP BY user_id)) AS ftop;
-- some UNION all queries that are going to be pulled up
SELECT
count(*)
FROM
(
(SELECT user_id FROM users_table)
UNION ALL
(SELECT 2 * user_id FROM events_table)
) b;
-- last query does not have partition key
SELECT
user_id, value_3
FROM
(
(SELECT value_3, user_id FROM events_table where event_type IN (1, 2, 3, 4, 5))
UNION ALL
(SELECT value_3, user_id FROM events_table where event_type IN (6, 7, 8, 9, 10))
UNION ALL
(SELECT value_3, user_id FROM events_table where event_type IN (11, 12, 13, 14, 15))
UNION ALL
(SELECT value_3, user_id FROM events_table where event_type IN (16, 17, 18, 19, 20))
UNION ALL
(SELECT value_3, user_id FROM events_table where event_type IN (21, 22, 23, 24, 25))
UNION ALL
(SELECT value_3, value_2 FROM events_table where event_type IN (26, 27, 28, 29, 30))
) b
ORDER BY 1 DESC, 2 DESC
LIMIT 5;
-- we don't allow joins within unions
SELECT
count(*)
FROM
(
(SELECT user_id FROM users_table)
UNION ALL
(SELECT users_table.user_id FROM events_table, users_table WHERE events_table.user_id = users_table.user_id)
) b;
SET citus.subquery_pushdown TO false;
SET citus.enable_router_execution TO true;