From a66a1a7054f935451ad24417782133f75f3a9d0f Mon Sep 17 00:00:00 2001 From: Onder Kalaci Date: Wed, 29 Mar 2017 15:03:54 +0300 Subject: [PATCH] Add top level union support This commit adds support for UNION/UNION ALL subqueries that are in the following form: .... (Q1 UNION Q2 UNION ...) as union_query .... In other words, Citus supports allow top level unions being wrapped into aggregations queries and/or simple projection queries that only selects some fields from the lower level queries. --- .../planner/multi_logical_optimizer.c | 48 +- .../planner/multi_physical_planner.c | 9 +- .../distributed/planner/multi_planner.c | 12 +- .../planner/multi_router_planner.c | 44 +- .../relation_restriction_equivalence.c | 334 +++++++- .../distributed/multi_logical_optimizer.h | 3 +- .../relation_restriction_equivalence.h | 2 + .../regress/expected/multi_subquery_union.out | 777 ++++++++++++++++++ src/test/regress/multi_schedule | 1 + src/test/regress/sql/multi_subquery_union.sql | 609 ++++++++++++++ 10 files changed, 1795 insertions(+), 44 deletions(-) create mode 100644 src/test/regress/expected/multi_subquery_union.out create mode 100644 src/test/regress/sql/multi_subquery_union.sql diff --git a/src/backend/distributed/planner/multi_logical_optimizer.c b/src/backend/distributed/planner/multi_logical_optimizer.c index 4388dcc94..32a734702 100644 --- a/src/backend/distributed/planner/multi_logical_optimizer.c +++ b/src/backend/distributed/planner/multi_logical_optimizer.c @@ -151,7 +151,8 @@ static bool GroupedByColumn(List *groupClauseList, List *targetList, Var *column /* Local functions forward declarations for subquery pushdown checks */ static void ErrorIfContainsUnsupportedSubquery(MultiNode *logicalPlanNode, PlannerRestrictionContext * - plannerRestrictionContext); + plannerRestrictionContext, + Query *originalQuery); static void ErrorIfCannotPushdownSubquery(Query *subqueryTree, bool outerQueryHasLimit); static void ErrorIfUnsupportedSetOperation(Query *subqueryTree, bool outerQueryHasLimit); static bool ExtractSetOperationStatmentWalker(Node *node, List **setOperationList); @@ -192,14 +193,15 @@ static bool HasOrderByHllType(List *sortClauseList, List *targetList); * function finds the extended operator node, and splits this node into master * and worker extended operator nodes. * - * We also pass plannerRestrictionContext to the optimizer. The context - * is primarily used to decide whether the subquery is safe to pushdown. + * We also pass plannerRestrictionContext and originalQuery to the optimizer. + * These are primarily used to decide whether the subquery is safe to pushdown. * If not, it helps to produce meaningful error messages for subquery * pushdown planning. */ void MultiLogicalPlanOptimize(MultiTreeRoot *multiLogicalPlan, - PlannerRestrictionContext *plannerRestrictionContext) + PlannerRestrictionContext *plannerRestrictionContext, + Query *originalQuery) { bool hasOrderByHllType = false; List *selectNodeList = NIL; @@ -219,7 +221,8 @@ MultiLogicalPlanOptimize(MultiTreeRoot *multiLogicalPlan, ErrorIfContainsUnsupportedAggregate(logicalPlanNode); /* check that we can pushdown subquery in the plan */ - ErrorIfContainsUnsupportedSubquery(logicalPlanNode, plannerRestrictionContext); + ErrorIfContainsUnsupportedSubquery(logicalPlanNode, plannerRestrictionContext, + originalQuery); /* * If a select node exists, we use the idempower property to split the node @@ -2808,20 +2811,22 @@ GroupedByColumn(List *groupClauseList, List *targetList, Var *column) /* * ErrorIfContainsUnsupportedSubquery extracts subquery multi table from the - * logical plan and uses helper functions to check if we can push down subquery - * to worker nodes. These helper functions error out if we cannot push down the - * the subquery. + * logical plan using plannerRestrictionContext and the original query. It uses + * some helper functions to check if we can push down subquery to worker nodes. + * These helper functions error out if we cannot push down the subquery. */ static void ErrorIfContainsUnsupportedSubquery(MultiNode *logicalPlanNode, - PlannerRestrictionContext *plannerRestrictionContext) + PlannerRestrictionContext *plannerRestrictionContext, + Query *originalQuery) { Query *subquery = NULL; List *extendedOpNodeList = NIL; MultiTable *multiTable = NULL; MultiExtendedOp *extendedOpNode = NULL; bool outerQueryHasLimit = false; - bool restrictionEquivalenceForPartitionKeys = false; + RelationRestrictionContext *relationRestrictionContext = + plannerRestrictionContext->relationRestrictionContext; /* check if logical plan includes a subquery */ List *subqueryMultiTableList = SubqueryMultiTableList(logicalPlanNode); @@ -2833,9 +2838,26 @@ ErrorIfContainsUnsupportedSubquery(MultiNode *logicalPlanNode, /* currently in the planner we only allow one subquery in from-clause*/ Assert(list_length(subqueryMultiTableList) == 1); - restrictionEquivalenceForPartitionKeys = - RestrictionEquivalenceForPartitionKeys(plannerRestrictionContext); - if (!restrictionEquivalenceForPartitionKeys) + /* + * We're checking two things here: + * (i) If the query contains a top level union, ensure that all leaves + * return the partition key at the same position + * (ii) Else, check whether all relations joined on the partition key or not + */ + if (ContainsUnionSubquery(originalQuery)) + { + if (!SafeToPushdownUnionSubquery(relationRestrictionContext)) + { + ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("cannot pushdown the subquery since all leaves of " + "the UNION does not include partition key at the " + "same position"), + errdetail("Each leaf query of the UNION should return " + "partition key at the same position on its " + "target list."))); + } + } + else if (!RestrictionEquivalenceForPartitionKeys(plannerRestrictionContext)) { ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmsg("cannot pushdown the subquery since all relations are not " diff --git a/src/backend/distributed/planner/multi_physical_planner.c b/src/backend/distributed/planner/multi_physical_planner.c index c0006ce8b..cb4625e1e 100644 --- a/src/backend/distributed/planner/multi_physical_planner.c +++ b/src/backend/distributed/planner/multi_physical_planner.c @@ -2030,14 +2030,6 @@ SubquerySqlTaskList(Job *job, PlannerRestrictionContext *plannerRestrictionConte DistTableCacheEntry *targetCacheEntry = NULL; RelationRestrictionContext *relationRestrictionContext = plannerRestrictionContext->relationRestrictionContext; - bool restrictionEquivalenceForPartitionKeys PG_USED_FOR_ASSERTS_ONLY = - RestrictionEquivalenceForPartitionKeys(plannerRestrictionContext); - - /* - * If we're going to create tasks for subquery pushdown, all - * relations needs to be joined on the partition key. - */ - Assert(restrictionEquivalenceForPartitionKeys); /* get list of all range tables in subquery tree */ ExtractRangeTableRelationWalker((Node *) subquery, &rangeTableList); @@ -2075,6 +2067,7 @@ SubquerySqlTaskList(Job *job, PlannerRestrictionContext *plannerRestrictionConte subqueryTask = SubqueryTaskCreate(subquery, targetShardInterval, relationRestrictionContext, taskIdIndex); + /* add the task if it could be created */ if (subqueryTask != NULL) { diff --git a/src/backend/distributed/planner/multi_planner.c b/src/backend/distributed/planner/multi_planner.c index 1c124e0df..49a81ee3b 100644 --- a/src/backend/distributed/planner/multi_planner.c +++ b/src/backend/distributed/planner/multi_planner.c @@ -312,7 +312,8 @@ CreateDistributedPlan(PlannedStmt *localPlan, Query *originalQuery, Query *query { /* Create and optimize logical plan */ MultiTreeRoot *logicalPlan = MultiLogicalPlanCreate(originalQuery, query); - MultiLogicalPlanOptimize(logicalPlan, plannerRestrictionContext); + MultiLogicalPlanOptimize(logicalPlan, plannerRestrictionContext, + originalQuery); /* * This check is here to make it likely that all node types used in @@ -767,11 +768,10 @@ CopyPlanParamList(List *originalPlanParamList) /* - * CreateAndPushPlannerRestrictionContext creates a new planner restriction context. - * Later, it creates a relation restriction context and a join restriction - * context, and sets those contexts in the planner restriction context. Finally, - * the planner restriction context is inserted to the beginning of the - * plannerRestrictionContextList and it is returned. + * CreateAndPushPlannerRestrictionContext creates a new relation restriction context + * and a new join context, inserts it to the beginning of the + * plannerRestrictionContextList. Finally, the planner restriction context is + * inserted to the beginning of the plannerRestrictionContextList and it is returned. */ static PlannerRestrictionContext * CreateAndPushPlannerRestrictionContext(void) diff --git a/src/backend/distributed/planner/multi_router_planner.c b/src/backend/distributed/planner/multi_router_planner.c index ec297b70f..7e74e9f39 100644 --- a/src/backend/distributed/planner/multi_router_planner.c +++ b/src/backend/distributed/planner/multi_router_planner.c @@ -88,6 +88,8 @@ static MultiPlan * CreateSingleTaskRouterPlan(Query *originalQuery, static MultiPlan * CreateInsertSelectRouterPlan(Query *originalQuery, PlannerRestrictionContext * plannerRestrictionContext); +static bool SafeToPushDownSubquery(PlannerRestrictionContext *plannerRestrictionContext, + Query *originalQuery); static Task * RouterModifyTaskForShardInterval(Query *originalQuery, ShardInterval *shardInterval, RelationRestrictionContext * @@ -291,7 +293,7 @@ CreateInsertSelectRouterPlan(Query *originalQuery, RelationRestrictionContext *relationRestrictionContext = plannerRestrictionContext->relationRestrictionContext; bool allReferenceTables = relationRestrictionContext->allReferenceTables; - bool restrictionEquivalenceForPartitionKeys = false; + bool safeToPushDownSubquery = false; multiPlan->operation = originalQuery->commandType; @@ -307,8 +309,8 @@ CreateInsertSelectRouterPlan(Query *originalQuery, return multiPlan; } - restrictionEquivalenceForPartitionKeys = - RestrictionEquivalenceForPartitionKeys(plannerRestrictionContext); + safeToPushDownSubquery = SafeToPushDownSubquery(plannerRestrictionContext, + originalQuery); /* * Plan select query for each shard in the target table. Do so by replacing the @@ -328,7 +330,7 @@ CreateInsertSelectRouterPlan(Query *originalQuery, modifyTask = RouterModifyTaskForShardInterval(originalQuery, targetShardInterval, relationRestrictionContext, taskIdIndex, - restrictionEquivalenceForPartitionKeys); + safeToPushDownSubquery); /* add the task if it could be created */ if (modifyTask != NULL) @@ -374,6 +376,36 @@ CreateInsertSelectRouterPlan(Query *originalQuery, } +/* + * SafeToPushDownSubquery returns true if either + * (i) there exists join in the query and all relations joined on their + * partition keys + * (ii) there exists only union set operations and all relations has + * partition keys in the same ordinal position in the query + */ +static bool +SafeToPushDownSubquery(PlannerRestrictionContext *plannerRestrictionContext, + Query *originalQuery) +{ + RelationRestrictionContext *relationRestrictionContext = + plannerRestrictionContext->relationRestrictionContext; + bool restrictionEquivalenceForPartitionKeys = + RestrictionEquivalenceForPartitionKeys(plannerRestrictionContext); + + if (restrictionEquivalenceForPartitionKeys) + { + return true; + } + + if (ContainsUnionSubquery(originalQuery)) + { + return SafeToPushdownUnionSubquery(relationRestrictionContext); + } + + return false; +} + + /* * RouterModifyTaskForShardInterval creates a modify task by * replacing the partitioning qual parameter added in multi_planner() @@ -389,7 +421,7 @@ static Task * RouterModifyTaskForShardInterval(Query *originalQuery, ShardInterval *shardInterval, RelationRestrictionContext *restrictionContext, uint32 taskIdIndex, - bool allRelationsJoinedOnPartitionKey) + bool safeToPushdownSubquery) { Query *copiedQuery = copyObject(originalQuery); RangeTblEntry *copiedInsertRte = ExtractInsertRangeTableEntry(copiedQuery); @@ -433,7 +465,7 @@ RouterModifyTaskForShardInterval(Query *originalQuery, ShardInterval *shardInter List *extendedBaseRestrictInfo = originalBaseRestrictInfo; Index rteIndex = restriction->index; - if (!allRelationsJoinedOnPartitionKey || allReferenceTables) + if (!safeToPushdownSubquery || allReferenceTables) { continue; } diff --git a/src/backend/distributed/planner/relation_restriction_equivalence.c b/src/backend/distributed/planner/relation_restriction_equivalence.c index 7d7992cda..3af5056a2 100644 --- a/src/backend/distributed/planner/relation_restriction_equivalence.c +++ b/src/backend/distributed/planner/relation_restriction_equivalence.c @@ -12,6 +12,7 @@ #include "distributed/multi_planner.h" #include "distributed/multi_logical_planner.h" +#include "distributed/multi_logical_optimizer.h" #include "distributed/pg_dist_partition.h" #include "distributed/relation_restriction_equivalence.h" #include "nodes/nodeFuncs.h" @@ -61,6 +62,11 @@ typedef struct AttributeEquivalenceClassMember } AttributeEquivalenceClassMember; +static Var * FindTranslatedVar(List *appendRelList, Oid relationOid, + Index relationRteIndex, Index *partitionKeyIndex); +static bool EquivalenceListContainsRelationsEquality(List *attributeEquivalenceList, + RelationRestrictionContext * + restrictionContext); static uint32 ReferenceRelationCount(RelationRestrictionContext *restrictionContext); static List * GenerateAttributeEquivalencesForRelationRestrictions( RelationRestrictionContext *restrictionContext); @@ -113,6 +119,193 @@ static void ListConcatUniqueAttributeClassMemberLists(AttributeEquivalenceClass firstClass, AttributeEquivalenceClass * secondClass); +static Index RelationRestrictionPartitionKeyIndex(RelationRestriction * + relationRestriction); + + +/* + * SafeToPushdownUnionSubquery returns true if all the relations are returns + * partition keys in the same ordinal position. + * + * Note that the function expects (and asserts) the input query to be a top + * level union query defined by TopLevelUnionQuery(). + * + * Lastly, the function fails to produce correct output if the target lists contains + * multiple partition keys on the target list such as the following: + * + * select count(*) from ( + * select user_id, user_id from users_table + * union + * select 2, user_id from users_table) u; + * + * For the above query, although the second item in the target list make this query + * safe to push down, the function would fail to return true. + */ +bool +SafeToPushdownUnionSubquery(RelationRestrictionContext *restrictionContext) +{ + Index unionQueryPartitionKeyIndex = 0; + AttributeEquivalenceClass *attributeEquivalance = + palloc0(sizeof(AttributeEquivalenceClass)); + ListCell *relationRestrictionCell = NULL; + + attributeEquivalance->equivalenceId = attributeEquivalenceId++; + + foreach(relationRestrictionCell, restrictionContext->relationRestrictionList) + { + RelationRestriction *relationRestriction = lfirst(relationRestrictionCell); + Index partitionKeyIndex = InvalidAttrNumber; + PlannerInfo *relationPlannerRoot = relationRestriction->plannerInfo; + List *targetList = relationPlannerRoot->parse->targetList; + List *appendRelList = relationPlannerRoot->append_rel_list; + Var *varToBeAdded = NULL; + TargetEntry *targetEntryToAdd = NULL; + + /* + * We first check whether UNION ALLs are pulled up or not. Note that Postgres + * planner creates AppendRelInfos per each UNION ALL query that is pulled up. + * Then, postgres stores the related information in the append_rel_list on the + * plannerInfo struct. + */ + if (appendRelList != NULL) + { + varToBeAdded = FindTranslatedVar(appendRelList, + relationRestriction->relationId, + relationRestriction->index, + &partitionKeyIndex); + + /* union does not have partition key in the target list */ + if (partitionKeyIndex == 0) + { + return false; + } + } + else + { + partitionKeyIndex = + RelationRestrictionPartitionKeyIndex(relationRestriction); + + /* union does not have partition key in the target list */ + if (partitionKeyIndex == 0) + { + return false; + } + + targetEntryToAdd = list_nth(targetList, partitionKeyIndex - 1); + if (!IsA(targetEntryToAdd->expr, Var)) + { + return false; + } + + varToBeAdded = (Var *) targetEntryToAdd->expr; + } + + /* + * If the first relation doesn't have partition key on the target + * list of the query that the relation in, simply not allow to push down + * the query. + */ + if (partitionKeyIndex == InvalidAttrNumber) + { + return false; + } + + /* + * We find the first relations partition key index in the target list. Later, + * we check whether all the relations have partition keys in the + * same position. + */ + if (unionQueryPartitionKeyIndex == InvalidAttrNumber) + { + unionQueryPartitionKeyIndex = partitionKeyIndex; + } + else if (unionQueryPartitionKeyIndex != partitionKeyIndex) + { + return false; + } + + AddToAttributeEquivalenceClass(&attributeEquivalance, relationPlannerRoot, + varToBeAdded); + } + + return EquivalenceListContainsRelationsEquality(list_make1(attributeEquivalance), + restrictionContext); +} + + +/* + * FindTranslatedVar iterates on the appendRelList and tries to find a translated + * child var identified by the relation id and the relation rte index. + * + * Note that postgres translates UNION ALL target list elements into translated_vars + * list on the corresponding AppendRelInfo struct. For details, see the related + * structs. + * + * The function returns NULL if it cannot find a translated var. + */ +static Var * +FindTranslatedVar(List *appendRelList, Oid relationOid, Index relationRteIndex, + Index *partitionKeyIndex) +{ + ListCell *appendRelCell = NULL; + AppendRelInfo *targetAppendRelInfo = NULL; + ListCell *translatedVarCell = NULL; + AttrNumber childAttrNumber = 0; + Var *relationPartitionKey = NULL; + List *translaterVars = NULL; + + *partitionKeyIndex = 0; + + /* iterate on the queries that are part of UNION ALL subselects */ + foreach(appendRelCell, appendRelList) + { + AppendRelInfo *appendRelInfo = (AppendRelInfo *) lfirst(appendRelCell); + + /* + * We're only interested in the child rel that is equal to the + * relation we're investigating. + */ + if (appendRelInfo->child_relid == relationRteIndex) + { + targetAppendRelInfo = appendRelInfo; + break; + } + } + + /* we couldn't find the necessary append rel info */ + if (targetAppendRelInfo == NULL) + { + return NULL; + } + + relationPartitionKey = PartitionKey(relationOid); + + translaterVars = targetAppendRelInfo->translated_vars; + foreach(translatedVarCell, translaterVars) + { + Node *targetNode = (Node *) lfirst(translatedVarCell); + Var *targetVar = NULL; + + childAttrNumber++; + + if (!IsA(targetNode, Var)) + { + continue; + } + + targetVar = (Var *) lfirst(translatedVarCell); + if (targetVar->varno == relationRteIndex && + targetVar->varattno == relationPartitionKey->varattno) + { + *partitionKeyIndex = childAttrNumber; + + return targetVar; + } + } + + return NULL; +} + /* * RestrictionEquivalenceForPartitionKeys aims to deduce whether each of the RTE_RELATION @@ -140,14 +333,14 @@ static void ListConcatUniqueAttributeClassMemberLists(AttributeEquivalenceClass * step, we try generate a common attribute equivalence class that holds as much as * AttributeEquivalenceMembers whose attributes are a partition keys. * - * AllRelationsJoinedOnPartitionKey uses both relation restrictions and join restrictions + * RestrictionEquivalenceForPartitionKeys uses both relation restrictions and join restrictions * to find as much as information that Postgres planner provides to extensions. For the * details of the usage, please see GenerateAttributeEquivalencesForRelationRestrictions() * and GenerateAttributeEquivalencesForJoinRestrictions() */ bool -RestrictionEquivalenceForPartitionKeys( - PlannerRestrictionContext *plannerRestrictionContext) +RestrictionEquivalenceForPartitionKeys(PlannerRestrictionContext * + plannerRestrictionContext) { RelationRestrictionContext *restrictionContext = plannerRestrictionContext->relationRestrictionContext; @@ -157,13 +350,9 @@ RestrictionEquivalenceForPartitionKeys( List *relationRestrictionAttributeEquivalenceList = NIL; List *joinRestrictionAttributeEquivalenceList = NIL; List *allAttributeEquivalenceList = NIL; - AttributeEquivalenceClass *commonEquivalenceClass = NULL; uint32 referenceRelationCount = ReferenceRelationCount(restrictionContext); uint32 totalRelationCount = list_length(restrictionContext->relationRestrictionList); uint32 nonReferenceRelationCount = totalRelationCount - referenceRelationCount; - ListCell *commonEqClassCell = NULL; - ListCell *relationRestrictionCell = NULL; - Relids commonRteIdentities = NULL; /* * If the query includes a single relation which is not a reference table, @@ -195,12 +384,33 @@ RestrictionEquivalenceForPartitionKeys( list_concat(relationRestrictionAttributeEquivalenceList, joinRestrictionAttributeEquivalenceList); + return EquivalenceListContainsRelationsEquality(allAttributeEquivalenceList, + restrictionContext); +} + + +/* + * EquivalenceListContainsRelationsEquality gets a list of attributed equivalence + * list and a relation restriction context. The function first generates a common + * equivalence class out of the attributeEquivalenceList. Later, the function checks + * whether all the relations exists in the common equivalence class. + * + */ +static bool +EquivalenceListContainsRelationsEquality(List *attributeEquivalenceList, + RelationRestrictionContext *restrictionContext) +{ + AttributeEquivalenceClass *commonEquivalenceClass = NULL; + ListCell *commonEqClassCell = NULL; + ListCell *relationRestrictionCell = NULL; + Relids commonRteIdentities = NULL; + /* * In general we're trying to expand existing the equivalence classes to find a * common equivalence class. The main goal is to test whether this main class * contains all partition keys of the existing relations. */ - commonEquivalenceClass = GenerateCommonEquivalence(allAttributeEquivalenceList); + commonEquivalenceClass = GenerateCommonEquivalence(attributeEquivalenceList); /* add the rte indexes of relations to a bitmap */ foreach(commonEqClassCell, commonEquivalenceClass->equivalentAttributes) @@ -823,7 +1033,7 @@ AddUnionAllSetOperationsToAttributeEquivalenceClass(AttributeEquivalenceClass ** List *appendRelList = root->append_rel_list; ListCell *appendRelCell = NULL; - /* iterate on the queries that are part of UNION ALL subselects */ + /* iterate on the queries that are part of UNION ALL subqueries */ foreach(appendRelCell, appendRelList) { AppendRelInfo *appendRelInfo = (AppendRelInfo *) lfirst(appendRelCell); @@ -1022,7 +1232,6 @@ AttributeEquivalancesAreEqual(AttributeEquivalenceClass *firstAttributeEquivalan return false; } - foreach(firstAttributeEquivalanceCell, firstEquivalenceMemberList) { AttributeEquivalenceClassMember *firstEqMember = @@ -1052,3 +1261,108 @@ AttributeEquivalancesAreEqual(AttributeEquivalenceClass *firstAttributeEquivalan return true; } + + +/* + * ContainsUnionSubquery gets a queryTree and returns true if the query + * contains + * - a subquery with UNION set operation + * - no joins above the UNION set operation in the query tree + * + * Note that the function allows top level unions being wrapped into aggregations + * queries and/or simple projection queries that only selects some fields from + * the lower level queries. + * + * If there exists joins before the set operations, the function returns false. + * Similarly, if the query does not contain any union set operations, the + * function returns false. + */ +bool +ContainsUnionSubquery(Query *queryTree) +{ + List *rangeTableList = queryTree->rtable; + Node *setOperations = queryTree->setOperations; + List *joinTreeTableIndexList = NIL; + Index subqueryRteIndex = 0; + uint32 joiningRangeTableCount = 0; + RangeTblEntry *rangeTableEntry = NULL; + Query *subqueryTree = NULL; + + ExtractRangeTableIndexWalker((Node *) queryTree->jointree, &joinTreeTableIndexList); + joiningRangeTableCount = list_length(joinTreeTableIndexList); + + /* don't allow joins on top of unions */ + if (joiningRangeTableCount > 1) + { + return false; + } + + subqueryRteIndex = linitial_int(joinTreeTableIndexList); + rangeTableEntry = rt_fetch(subqueryRteIndex, rangeTableList); + if (rangeTableEntry->rtekind != RTE_SUBQUERY) + { + return false; + } + + subqueryTree = rangeTableEntry->subquery; + setOperations = subqueryTree->setOperations; + if (setOperations != NULL) + { + SetOperationStmt *setOperationStatement = (SetOperationStmt *) setOperations; + + /* + * Note that the set operation tree is traversed elsewhere for ensuring + * that we only support UNIONs. + */ + if (setOperationStatement->op != SETOP_UNION) + { + return false; + } + + return true; + } + + return ContainsUnionSubquery(subqueryTree); +} + + +/* + * RelationRestrictionPartitionKeyIndex gets a relation restriction and finds the + * index that the partition key of the relation exists in the query. The query is + * found in the planner info of the relation restriction. + */ +static Index +RelationRestrictionPartitionKeyIndex(RelationRestriction *relationRestriction) +{ + PlannerInfo *relationPlannerRoot = NULL; + Query *relationPlannerParseQuery = NULL; + List *relationTargetList = NIL; + ListCell *targetEntryCell = NULL; + Index partitionKeyTargetAttrIndex = 0; + + relationPlannerRoot = relationRestriction->plannerInfo; + relationPlannerParseQuery = relationPlannerRoot->parse; + relationTargetList = relationPlannerParseQuery->targetList; + + foreach(targetEntryCell, relationTargetList) + { + TargetEntry *targetEntry = (TargetEntry *) lfirst(targetEntryCell); + Expr *targetExpression = targetEntry->expr; + + partitionKeyTargetAttrIndex++; + + if (!targetEntry->resjunk && + IsPartitionColumn(targetExpression, relationPlannerParseQuery) && + IsA(targetExpression, Var)) + { + Var *targetColumn = (Var *) targetExpression; + + if (targetColumn->varno == relationRestriction->index) + { + return partitionKeyTargetAttrIndex; + } + } + } + + return InvalidAttrNumber; +} diff --git a/src/include/distributed/multi_logical_optimizer.h b/src/include/distributed/multi_logical_optimizer.h index a3273e13c..d10a9d1aa 100644 --- a/src/include/distributed/multi_logical_optimizer.h +++ b/src/include/distributed/multi_logical_optimizer.h @@ -108,7 +108,8 @@ extern double CountDistinctErrorRate; /* Function declaration for optimizing logical plans */ extern void MultiLogicalPlanOptimize(MultiTreeRoot *multiTree, - PlannerRestrictionContext *plannerRestrictionContext); + PlannerRestrictionContext *plannerRestrictionContext, + Query *originalQuery); /* Function declaration for getting partition method for the given relation */ extern char PartitionMethod(Oid relationId); diff --git a/src/include/distributed/relation_restriction_equivalence.h b/src/include/distributed/relation_restriction_equivalence.h index be7c25674..f06a707c5 100644 --- a/src/include/distributed/relation_restriction_equivalence.h +++ b/src/include/distributed/relation_restriction_equivalence.h @@ -15,8 +15,10 @@ #include "distributed/multi_planner.h" +extern bool ContainsUnionSubquery(Query *queryTree); extern bool RestrictionEquivalenceForPartitionKeys(PlannerRestrictionContext * plannerRestrictionContext); +extern bool SafeToPushdownUnionSubquery(RelationRestrictionContext *restrictionContext); #endif /* RELATION_RESTRICTION_EQUIVALENCE_H */ diff --git a/src/test/regress/expected/multi_subquery_union.out b/src/test/regress/expected/multi_subquery_union.out new file mode 100644 index 000000000..ee5d58e11 --- /dev/null +++ b/src/test/regress/expected/multi_subquery_union.out @@ -0,0 +1,777 @@ +-- +-- multi subquery toplevel union queries aims to expand existing subquery pushdown +-- regression tests to cover more cases +-- the tables that are used depends to multi_insert_select_behavioral_analytics_create_table.sql +-- We don't need shard id sequence here, so commented out to prevent conflicts with concurrent tests +-- ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 1400000; +SET citus.subquery_pushdown TO true; +SET citus.enable_router_execution TO false; +-- a very simple union query +SELECT user_id, counter +FROM ( + SELECT user_id, value_2 % 10 AS counter FROM events_table WHERE event_type IN (1, 2, 3, 4, 5) + UNION + SELECT user_id, value_2 % 10 AS counter FROM events_table WHERE event_type IN (5, 6, 7, 8, 9, 10) +) user_id +ORDER BY 2 DESC,1 +LIMIT 5; + user_id | counter +---------+--------- + 7 | 9 + 8 | 9 + 15 | 9 + 16 | 9 + 20 | 9 +(5 rows) + +-- the same query with union all +SELECT user_id, counter +FROM ( + SELECT user_id, value_2 % 10 AS counter FROM events_table WHERE event_type IN (1, 2, 3, 4, 5) + UNION ALL + SELECT user_id, value_2 % 10 AS counter FROM events_table WHERE event_type IN (5, 6, 7, 8, 9, 10) +) user_id +ORDER BY 2 DESC,1 +LIMIT 5; + user_id | counter +---------+--------- + 7 | 9 + 7 | 9 + 8 | 9 + 15 | 9 + 15 | 9 +(5 rows) + +-- the same query with group by +SELECT user_id, sum(counter) +FROM ( + SELECT user_id, value_2 % 10 AS counter FROM events_table WHERE event_type IN (1, 2, 3, 4, 5) + UNION + SELECT user_id, value_2 % 10 AS counter FROM events_table WHERE event_type IN (5, 6, 7, 8, 9, 10) +) user_id +GROUP BY 1 +ORDER BY 2 DESC,1 +LIMIT 5; + user_id | sum +---------+----- + 49 | 22 + 15 | 19 + 26 | 17 + 48 | 17 + 61 | 17 +(5 rows) + +-- the same query with UNION ALL clause +SELECT user_id, sum(counter) +FROM ( + SELECT user_id, value_2 % 10 AS counter FROM events_table WHERE event_type IN (1, 2, 3, 4, 5) + UNION ALL + SELECT user_id, value_2 % 10 AS counter FROM events_table WHERE event_type IN (5, 6, 7, 8, 9, 10) +) user_id +GROUP BY 1 +ORDER BY 2 DESC,1 +LIMIT 5; + user_id | sum +---------+----- + 48 | 35 + 61 | 30 + 15 | 28 + 49 | 25 + 80 | 24 +(5 rows) + +-- the same query target list entries shuffled +SELECT user_id, sum(counter) +FROM ( + SELECT value_2 % 10 AS counter, user_id FROM events_table WHERE event_type IN (1, 2, 3, 4, 5) + UNION + SELECT value_2 % 10 AS counter, user_id FROM events_table WHERE event_type IN (5, 6, 7, 8, 9, 10) +) user_id +GROUP BY 1 +ORDER BY 2 DESC,1 +LIMIT 5; + user_id | sum +---------+----- + 49 | 22 + 15 | 19 + 26 | 17 + 48 | 17 + 61 | 17 +(5 rows) + +-- same query with GROUP BY +SELECT user_id, sum(counter) +FROM ( + SELECT user_id, value_2 AS counter FROM events_table WHERE event_type IN (1, 2) + UNION + SELECT user_id, value_2 AS counter FROM events_table WHERE event_type IN (5, 6) +) user_id +GROUP BY + user_id +--HAVING sum(counter) > 900 +ORDER BY 1,2 DESC LIMIT 5; + user_id | sum +---------+------ + 1 | 518 + 2 | 637 + 4 | 343 + 6 | 354 + 7 | 1374 +(5 rows) + +-- the same query target list entries shuffled but this time the subqueries target list +-- is shuffled +SELECT user_id, sum(counter) +FROM ( + SELECT value_2 AS counter, user_id FROM events_table WHERE event_type IN (1, 2) + UNION + SELECT value_2 AS counter, user_id FROM events_table WHERE event_type IN (5, 6) +) user_id +GROUP BY + user_id +--HAVING sum(counter) > 900 +ORDER BY 1,2 DESC LIMIT 5; + user_id | sum +---------+------ + 1 | 518 + 2 | 637 + 4 | 343 + 6 | 354 + 7 | 1374 +(5 rows) + +-- similar query this time more subqueries and target list contains a resjunk entry +SELECT sum(counter) +FROM ( + SELECT user_id, sum(value_2) AS counter FROM users_table where value_1 < 20 GROUP BY user_id HAVING sum(value_2) > 500 + UNION + SELECT user_id, sum(value_2) AS counter FROM users_table where value_1 < 40 and value_1 < 60 GROUP BY user_id HAVING sum(value_2) > 500 + UNION + SELECT user_id, sum(value_2) AS counter FROM users_table where value_1 < 60 and value_1 < 80 GROUP BY user_id HAVING sum(value_2) > 500 + UNION + SELECT user_id, sum(value_2) AS counter FROM users_table where value_1 < 80 and value_1 < 100 GROUP BY user_id HAVING sum(value_2) > 500 + UNION + SELECT user_id, sum(value_2) AS counter FROM users_table where value_1 < 100 and value_1 < 120 GROUP BY user_id HAVING sum(value_2) > 500 +) user_id +GROUP BY user_id ORDER BY 1 DESC LIMIT 5; + sum +------- + 27772 + 25720 + 24993 + 24968 + 23508 +(5 rows) + +-- similar query as above, with UNION ALL +SELECT sum(counter) +FROM ( + SELECT user_id, sum(value_2) AS counter FROM users_table where value_1 < 20 GROUP BY user_id HAVING sum(value_2) > 5000 + UNION ALL + SELECT user_id, sum(value_2) AS counter FROM users_table where value_1 < 40 and value_1 < 60 GROUP BY user_id HAVING sum(value_2) > 500 + UNION ALL + SELECT user_id, sum(value_2) AS counter FROM users_table where value_1 < 60 and value_1 < 80 GROUP BY user_id HAVING sum(value_2) > 500 + UNION ALL + SELECT user_id, sum(value_2) AS counter FROM users_table where value_1 < 80 and value_1 < 100 GROUP BY user_id HAVING sum(value_2) > 500 + UNION ALL + SELECT user_id, sum(value_2) AS counter FROM users_table where value_1 < 100 and value_1 < 120 GROUP BY user_id HAVING sum(value_2) > 500 +) user_id +GROUP BY user_id ORDER BY 1 DESC LIMIT 5; + sum +------- + 27667 + 25080 + 24814 + 24365 + 23508 +(5 rows) + +-- unions within unions +SELECT * +FROM ( + ( SELECT user_id, + sum(counter) + FROM + ( SELECT user_id, + sum(value_2) AS counter + FROM users_table + GROUP BY user_id + + UNION + SELECT user_id, + sum(value_2) AS counter + FROM events_table + GROUP BY user_id ) user_id_1 + GROUP BY user_id) + UNION + ( SELECT user_id, + sum(counter) + FROM + ( SELECT + user_id, sum(value_2) AS counter + FROM users_table + GROUP BY user_id + + UNION + SELECT user_id, sum(value_2) AS counter + + FROM events_table + GROUP BY user_id) user_id_2 + GROUP BY user_id)) AS ftop +ORDER BY 2 DESC, 1 DESC +LIMIT 5; + user_id | sum +---------+-------- + 23 | 126017 + 45 | 117323 + 25 | 116595 + 17 | 116520 + 90 | 115843 +(5 rows) + +-- top level unions are wrapped into top level aggregations +SELECT ("final_query"."event_types") as types, count(*) AS sumOfEventType +FROM + ( SELECT *, random() + FROM + ( SELECT "t"."user_id", + "t"."time", + unnest("t"."collected_events") AS "event_types" + FROM + ( SELECT "t1"."user_id", + min("t1"."time") AS "time", + array_agg(("t1"."event") + ORDER BY TIME ASC, event DESC) AS collected_events + FROM ( + (SELECT * + FROM + (SELECT "events"."user_id", + "events"."time", + 0 AS event + FROM events_table as "events" + WHERE event_type IN (10, 11, 12, 13, 14, 15) ) events_subquery_1) + UNION + (SELECT * + FROM + (SELECT "events"."user_id", "events"."time", 1 AS event + FROM events_table as "events" + WHERE event_type IN (15, 16, 17, 18, 19) ) events_subquery_2) + UNION + (SELECT * + FROM + (SELECT "events"."user_id", "events"."time", 2 AS event + FROM events_table as "events" + WHERE event_type IN (20, 21, 22, 23, 24, 25) ) events_subquery_3) + UNION + (SELECT * + FROM + (SELECT "events"."user_id", "events"."time", 3 AS event + FROM events_table as "events" + WHERE event_type IN (26, 27, 28, 29, 30, 13)) events_subquery_4)) t1 + GROUP BY "t1"."user_id") AS t) "q" +) as final_query +GROUP BY types +ORDER BY types; + types | sumofeventtype +-------+---------------- + 0 | 55 + 1 | 38 + 2 | 70 + 3 | 58 +(4 rows) + +-- exactly the same query +-- but wrapper unions are removed from the inner part of the query +SELECT ("final_query"."event_types") as types, count(*) AS sumOfEventType +FROM + ( SELECT *, random() + FROM + ( SELECT "t"."user_id", + "t"."time", + unnest("t"."collected_events") AS "event_types" + FROM + ( SELECT "t1"."user_id", + min("t1"."time") AS "time", + array_agg(("t1"."event") + ORDER BY TIME ASC, event DESC) AS collected_events + FROM ( + (SELECT "events"."user_id", + "events"."time", + 0 AS event + FROM events_table as "events" + WHERE event_type IN (10, 11, 12, 13, 14, 15)) + UNION + (SELECT "events"."user_id", "events"."time", 1 AS event + FROM events_table as "events" + WHERE event_type IN (15, 16, 17, 18, 19) ) + UNION + + (SELECT "events"."user_id", "events"."time", 2 AS event + FROM events_table as "events" + WHERE event_type IN (20, 21, 22, 23, 24, 25) ) + UNION + + (SELECT "events"."user_id", "events"."time", 3 AS event + FROM events_table as "events" + WHERE event_type IN (26, 27, 28, 29, 30, 13))) t1 + GROUP BY "t1"."user_id") AS t) "q" +) as final_query +GROUP BY types +ORDER BY types; + types | sumofeventtype +-------+---------------- + 0 | 55 + 1 | 38 + 2 | 70 + 3 | 58 +(4 rows) + +-- again excatly the same query with top level wrapper removed +SELECT ("q"."event_types") as types, count(*) AS sumOfEventType +FROM + ( SELECT "t"."user_id", + "t"."time", + unnest("t"."collected_events") AS "event_types" + FROM + ( SELECT "t1"."user_id", + min("t1"."time") AS "time", + array_agg(("t1"."event") + ORDER BY TIME ASC, event DESC) AS collected_events + FROM ( + (SELECT "events"."user_id", + "events"."time", + 0 AS event + FROM events_table as "events" + WHERE event_type IN (10, 11, 12, 13, 14, 15) + ) + UNION + (SELECT "events"."user_id", "events"."time", 1 AS event + FROM events_table as "events" + WHERE event_type IN (15, 16, 17, 18, 19) ) + UNION + + (SELECT "events"."user_id", "events"."time", 2 AS event + FROM events_table as "events" + WHERE event_type IN (20, 21, 22, 23, 24, 25) ) + UNION + + (SELECT "events"."user_id", "events"."time", 3 AS event + FROM events_table as "events" + WHERE event_type IN (26, 27, 28, 29, 30, 13))) t1 + GROUP BY "t1"."user_id") AS t) "q" +GROUP BY types +ORDER BY types; + types | sumofeventtype +-------+---------------- + 0 | 55 + 1 | 38 + 2 | 70 + 3 | 58 +(4 rows) + +-- again same query but with only two top level empty queries (i.e., no group bys) +SELECT * +FROM + ( SELECT * + FROM + ( SELECT "t1"."user_id" + FROM ( + (SELECT "events"."user_id", + "events"."time", + 0 AS event + FROM events_table as "events" + WHERE event_type IN (10, 11, 12, 13, 14, 15) + ) + UNION + (SELECT "events"."user_id", "events"."time", 1 AS event + FROM events_table as "events" + WHERE event_type IN (15, 16, 17, 18, 19) ) + UNION + + (SELECT "events"."user_id", "events"."time", 2 AS event + FROM events_table as "events" + WHERE event_type IN (20, 21, 22, 23, 24, 25) ) + UNION + + (SELECT "events"."user_id", "events"."time", 3 AS event + FROM events_table as "events" + WHERE event_type IN (26, 27, 28, 29, 30, 13))) t1 + ) AS t) "q" ORDER BY 1 LIMIT 5; + user_id +--------- + 0 + 0 + 0 + 1 + 1 +(5 rows) + +-- a very similar query UNION ALL +SELECT ("q"."event_types") as types, count(*) AS sumOfEventType +FROM + ( SELECT "t"."user_id", + "t"."time", + unnest("t"."collected_events") AS "event_types" + FROM + ( SELECT "t1"."user_id", + min("t1"."time") AS "time", + array_agg(("t1"."event") + ORDER BY TIME ASC, event DESC) AS collected_events + FROM ( + (SELECT "events"."user_id", + "events"."time", + 0 AS event + FROM events_table as "events" + WHERE event_type IN (10, 11, 12, 13, 14, 15) + ) + UNION ALL + (SELECT "events"."user_id", "events"."time", 1 AS event + FROM events_table as "events" + WHERE event_type IN (15, 16, 17, 18, 19) ) + UNION ALL + + (SELECT "events"."user_id", "events"."time", 2 AS event + FROM events_table as "events" + WHERE event_type IN (20, 21, 22, 23, 24, 25) ) + UNION ALL + + (SELECT "events"."user_id", "events"."time", 3 AS event + FROM events_table as "events" + WHERE event_type IN (26, 27, 28, 29, 30, 13))) t1 + GROUP BY "t1"."user_id") AS t) "q" +GROUP BY types +ORDER BY types; + types | sumofeventtype +-------+---------------- + 0 | 55 + 1 | 38 + 2 | 70 + 3 | 58 +(4 rows) + +-- some UNION ALL queries that are going to be pulled up +SELECT + count(*) +FROM +( + (SELECT user_id FROM users_table) + UNION ALL + (SELECT user_id FROM events_table) +) b; + count +------- + 20002 +(1 row) + +-- similar query without top level agg +SELECT + user_id +FROM +( + (SELECT user_id FROM users_table) + UNION ALL + (SELECT user_id FROM events_table) +) b +ORDER BY 1 DESC +LIMIT 5; + user_id +--------- + 100 + 100 + 100 + 100 + 100 +(5 rows) + +-- similar query with multiple target list entries +SELECT + user_id, value_3 +FROM +( + (SELECT value_3, user_id FROM users_table) + UNION ALL + (SELECT value_3, user_id FROM events_table) +) b +ORDER BY 1 DESC, 2 DESC +LIMIT 5; + user_id | value_3 +---------+--------- + 100 | 999 + 100 | 997 + 100 | 991 + 100 | 989 + 100 | 988 +(5 rows) + +-- similar query group by inside the subqueries +SELECT + user_id, value_3_sum +FROM +( + (SELECT sum(value_3) as value_3_sum, user_id FROM users_table GROUP BY user_id) + UNION ALL + (SELECT sum(value_3) as value_3_sum, user_id FROM users_table GROUP BY user_id) +) b +ORDER BY 2 DESC, 1 DESC +LIMIT 5; + user_id | value_3_sum +---------+------------- + 10 | 64060 + 10 | 64060 + 62 | 62445 + 62 | 62445 + 26 | 60536 +(5 rows) + +-- similar query top level group by +SELECT + user_id, sum(value_3) +FROM +( + (SELECT value_3, user_id FROM users_table) + UNION ALL + (SELECT value_3, user_id FROM events_table) +) b +GROUP BY 1 +ORDER BY 2 DESC, 1 DESC +LIMIT 5; + user_id | sum +---------+-------- + 23 | 123923 + 25 | 118087 + 69 | 115828 + 26 | 114705 + 3 | 113915 +(5 rows) + +-- a long set operation list +SELECT + user_id, value_3 +FROM +( + (SELECT value_3, user_id FROM events_table where event_type IN (1, 2, 3, 4, 5)) + UNION ALL + (SELECT value_3, user_id FROM events_table where event_type IN (6, 7, 8, 9, 10)) + UNION ALL + (SELECT value_3, user_id FROM events_table where event_type IN (11, 12, 13, 14, 15)) + UNION ALL + (SELECT value_3, user_id FROM events_table where event_type IN (16, 17, 18, 19, 20)) + UNION ALL + (SELECT value_3, user_id FROM events_table where event_type IN (21, 22, 23, 24, 25)) + UNION ALL + (SELECT value_3, user_id FROM events_table where event_type IN (26, 27, 28, 29, 30)) +) b +ORDER BY 1 DESC, 2 DESC +LIMIT 5; + user_id | value_3 +---------+--------- + 100 | 951 + 99 | 558 + 99 | 14 + 98 | 987 + 98 | 577 +(5 rows) + +-- no partition key on the top +SELECT + max(value_3) +FROM +( + (SELECT value_3, user_id FROM events_table where event_type IN (1, 2, 3, 4, 5)) + UNION ALL + (SELECT value_3, user_id FROM events_table where event_type IN (6, 7, 8, 9, 10)) + UNION ALL + (SELECT value_3, user_id FROM events_table where event_type IN (11, 12, 13, 14, 15)) + UNION ALL + (SELECT value_3, user_id FROM events_table where event_type IN (16, 17, 18, 19, 20)) + UNION ALL + (SELECT value_3, user_id FROM events_table where event_type IN (21, 22, 23, 24, 25)) + UNION ALL + (SELECT value_3, user_id FROM events_table where event_type IN (26, 27, 28, 29, 30)) +) b +GROUP BY user_id +ORDER BY 1 DESC +LIMIT 5; + max +----- + 997 + 997 + 996 + 995 + 995 +(5 rows) + +-- now lets also have some unsupported queries +-- group by is not on the partition key +SELECT user_id, sum(counter) +FROM ( + SELECT user_id, sum(value_2) AS counter FROM events_table GROUP BY user_id + UNION + SELECT value_1 as user_id, sum(value_2) AS counter FROM users_table GROUP BY value_1 +) user_id +GROUP BY user_id; +ERROR: cannot pushdown the subquery since all leaves of the UNION does not include partition key at the same position +DETAIL: Each leaf query of the UNION should return partition key at the same position on its target list. +-- partition key is not selected +SELECT sum(counter) +FROM ( + SELECT user_id, sum(value_2) AS counter FROM users_table where value_1 < 20 GROUP BY user_id HAVING sum(value_2) > 500 + UNION + SELECT user_id, sum(value_2) AS counter FROM users_table where value_1 < 40 and value_1 < 60 GROUP BY user_id HAVING sum(value_2) > 500 + UNION + SELECT user_id, sum(value_2) AS counter FROM users_table where value_1 < 60 and value_1 < 80 GROUP BY user_id HAVING sum(value_2) > 500 + UNION + SELECT user_id, sum(value_2) AS counter FROM users_table where value_1 < 80 and value_1 < 100 GROUP BY user_id HAVING sum(value_2) > 500 + UNION + SELECT 2 * user_id, sum(value_2) AS counter FROM users_table where value_1 < 100 and value_1 < 120 GROUP BY user_id HAVING sum(value_2) > 500 +) user_id +GROUP BY user_id ORDER BY 1 DESC LIMIT 5; +ERROR: cannot pushdown the subquery since all leaves of the UNION does not include partition key at the same position +DETAIL: Each leaf query of the UNION should return partition key at the same position on its target list. +-- excepts within unions are not supported +SELECT * FROM +( +( + SELECT user_id, sum(counter) + FROM ( + SELECT user_id, sum(value_2) AS counter FROM users_table GROUP BY user_id + UNION + SELECT user_id, sum(value_2) AS counter FROM events_table GROUP BY user_id + ) user_id_1 + GROUP BY user_id +) +UNION +( + SELECT user_id, sum(counter) + FROM ( + SELECT user_id, sum(value_2) AS counter FROM users_table GROUP BY user_id + EXCEPT + SELECT user_id, sum(value_2) AS counter FROM events_table GROUP BY user_id +) user_id_2 + GROUP BY user_id) +) as ftop; +ERROR: cannot push down this subquery +DETAIL: Intersect and Except are currently unsupported +-- joins inside unions are not supported +SELECT user_id, sum(counter) +FROM ( + SELECT user_id, sum(value_2) AS counter FROM users_table GROUP BY user_id + UNION + SELECT events_table.user_id, sum(events_table.value_2) AS counter FROM events_table, users_table WHERE users_table.user_id > events_table.user_id GROUP BY 1 +) user_id +GROUP BY user_id; +ERROR: cannot pushdown the subquery since all leaves of the UNION does not include partition key at the same position +DETAIL: Each leaf query of the UNION should return partition key at the same position on its target list. +-- joins inside unions are not supported -- slightly more comlex than the above +SELECT * FROM +( +( + SELECT user_id, sum(counter) + FROM ( + SELECT user_id, sum(value_2) AS counter FROM users_table GROUP BY user_id + UNION + SELECT user_id, sum(value_2) AS counter FROM events_table GROUP BY user_id + ) user_id_1 + GROUP BY user_id +) +UNION +( + SELECT user_id, sum(counter) + FROM ( + SELECT user_id, sum(value_2) AS counter FROM users_table GROUP BY user_id + UNION + SELECT events_table.user_id, sum(events_table.value_2) AS counter FROM events_table, users_table WHERE (events_table.user_id = users_table.user_id) GROUP BY events_table.user_id +) user_id_2 + GROUP BY user_id) +) as ftop; +ERROR: cannot pushdown the subquery since all leaves of the UNION does not include partition key at the same position +DETAIL: Each leaf query of the UNION should return partition key at the same position on its target list. +-- offset inside the union +SELECT user_id, sum(counter) +FROM ( + SELECT user_id, sum(value_2) AS counter FROM events_table GROUP BY user_id + UNION + SELECT user_id, sum(value_2) AS counter FROM users_table GROUP BY user_id OFFSET 4 +) user_id +GROUP BY user_id; +ERROR: cannot push down this subquery +DETAIL: Offset clause is currently unsupported +-- lower level union does not return partition key with the other relations +SELECT * +FROM ( + ( SELECT user_id, + sum(counter) + FROM + ( SELECT user_id, + sum(value_2) AS counter + FROM users_table + GROUP BY user_id + + UNION + SELECT user_id, + sum(value_2) AS counter + FROM events_table + GROUP BY user_id ) user_id_1 + GROUP BY user_id) + UNION + ( SELECT user_id, + sum(counter) + FROM + ( SELECT sum(value_2) AS counter, + user_id + FROM users_table + GROUP BY user_id + + UNION + SELECT user_id, + sum(value_2) AS counter + FROM events_table + GROUP BY user_id) user_id_2 + GROUP BY user_id)) AS ftop; +ERROR: cannot pushdown the subquery since all leaves of the UNION does not include partition key at the same position +DETAIL: Each leaf query of the UNION should return partition key at the same position on its target list. +-- some UNION all queries that are going to be pulled up +SELECT + count(*) +FROM +( + (SELECT user_id FROM users_table) + UNION ALL + (SELECT 2 * user_id FROM events_table) +) b; +ERROR: cannot pushdown the subquery since all leaves of the UNION does not include partition key at the same position +DETAIL: Each leaf query of the UNION should return partition key at the same position on its target list. +-- last query does not have partition key +SELECT + user_id, value_3 +FROM +( + (SELECT value_3, user_id FROM events_table where event_type IN (1, 2, 3, 4, 5)) + UNION ALL + (SELECT value_3, user_id FROM events_table where event_type IN (6, 7, 8, 9, 10)) + UNION ALL + (SELECT value_3, user_id FROM events_table where event_type IN (11, 12, 13, 14, 15)) + UNION ALL + (SELECT value_3, user_id FROM events_table where event_type IN (16, 17, 18, 19, 20)) + UNION ALL + (SELECT value_3, user_id FROM events_table where event_type IN (21, 22, 23, 24, 25)) + UNION ALL + (SELECT value_3, value_2 FROM events_table where event_type IN (26, 27, 28, 29, 30)) +) b +ORDER BY 1 DESC, 2 DESC +LIMIT 5; +ERROR: cannot pushdown the subquery since all leaves of the UNION does not include partition key at the same position +DETAIL: Each leaf query of the UNION should return partition key at the same position on its target list. +-- we don't allow joins within unions +SELECT + count(*) +FROM +( + (SELECT user_id FROM users_table) + UNION ALL + (SELECT users_table.user_id FROM events_table, users_table WHERE events_table.user_id = users_table.user_id) +) b; +ERROR: cannot pushdown the subquery since all leaves of the UNION does not include partition key at the same position +DETAIL: Each leaf query of the UNION should return partition key at the same position on its target list. +SET citus.subquery_pushdown TO false; +SET citus.enable_router_execution TO true; diff --git a/src/test/regress/multi_schedule b/src/test/regress/multi_schedule index 4362c4314..1ca71eec6 100644 --- a/src/test/regress/multi_schedule +++ b/src/test/regress/multi_schedule @@ -41,6 +41,7 @@ test: multi_deparse_shard_query test: multi_basic_queries multi_complex_expressions multi_verify_no_subquery test: multi_explain test: multi_subquery multi_subquery_complex_queries multi_subquery_behavioral_analytics +test: multi_subquery_union test: multi_reference_table test: multi_outer_join_reference test: multi_single_relation_subquery diff --git a/src/test/regress/sql/multi_subquery_union.sql b/src/test/regress/sql/multi_subquery_union.sql new file mode 100644 index 000000000..f459abed9 --- /dev/null +++ b/src/test/regress/sql/multi_subquery_union.sql @@ -0,0 +1,609 @@ +-- +-- multi subquery toplevel union queries aims to expand existing subquery pushdown +-- regression tests to cover more cases +-- the tables that are used depends to multi_insert_select_behavioral_analytics_create_table.sql + +-- We don't need shard id sequence here, so commented out to prevent conflicts with concurrent tests +-- ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 1400000; + +SET citus.subquery_pushdown TO true; +SET citus.enable_router_execution TO false; +-- a very simple union query +SELECT user_id, counter +FROM ( + SELECT user_id, value_2 % 10 AS counter FROM events_table WHERE event_type IN (1, 2, 3, 4, 5) + UNION + SELECT user_id, value_2 % 10 AS counter FROM events_table WHERE event_type IN (5, 6, 7, 8, 9, 10) +) user_id +ORDER BY 2 DESC,1 +LIMIT 5; + +-- the same query with union all +SELECT user_id, counter +FROM ( + SELECT user_id, value_2 % 10 AS counter FROM events_table WHERE event_type IN (1, 2, 3, 4, 5) + UNION ALL + SELECT user_id, value_2 % 10 AS counter FROM events_table WHERE event_type IN (5, 6, 7, 8, 9, 10) +) user_id +ORDER BY 2 DESC,1 +LIMIT 5; + +-- the same query with group by +SELECT user_id, sum(counter) +FROM ( + SELECT user_id, value_2 % 10 AS counter FROM events_table WHERE event_type IN (1, 2, 3, 4, 5) + UNION + SELECT user_id, value_2 % 10 AS counter FROM events_table WHERE event_type IN (5, 6, 7, 8, 9, 10) +) user_id +GROUP BY 1 +ORDER BY 2 DESC,1 +LIMIT 5; + +-- the same query with UNION ALL clause +SELECT user_id, sum(counter) +FROM ( + SELECT user_id, value_2 % 10 AS counter FROM events_table WHERE event_type IN (1, 2, 3, 4, 5) + UNION ALL + SELECT user_id, value_2 % 10 AS counter FROM events_table WHERE event_type IN (5, 6, 7, 8, 9, 10) +) user_id +GROUP BY 1 +ORDER BY 2 DESC,1 +LIMIT 5; + +-- the same query target list entries shuffled +SELECT user_id, sum(counter) +FROM ( + SELECT value_2 % 10 AS counter, user_id FROM events_table WHERE event_type IN (1, 2, 3, 4, 5) + UNION + SELECT value_2 % 10 AS counter, user_id FROM events_table WHERE event_type IN (5, 6, 7, 8, 9, 10) +) user_id +GROUP BY 1 +ORDER BY 2 DESC,1 +LIMIT 5; + +-- same query with GROUP BY +SELECT user_id, sum(counter) +FROM ( + SELECT user_id, value_2 AS counter FROM events_table WHERE event_type IN (1, 2) + UNION + SELECT user_id, value_2 AS counter FROM events_table WHERE event_type IN (5, 6) +) user_id +GROUP BY + user_id +--HAVING sum(counter) > 900 +ORDER BY 1,2 DESC LIMIT 5; + + +-- the same query target list entries shuffled but this time the subqueries target list +-- is shuffled +SELECT user_id, sum(counter) +FROM ( + SELECT value_2 AS counter, user_id FROM events_table WHERE event_type IN (1, 2) + UNION + SELECT value_2 AS counter, user_id FROM events_table WHERE event_type IN (5, 6) +) user_id +GROUP BY + user_id +--HAVING sum(counter) > 900 +ORDER BY 1,2 DESC LIMIT 5; + + +-- similar query this time more subqueries and target list contains a resjunk entry +SELECT sum(counter) +FROM ( + SELECT user_id, sum(value_2) AS counter FROM users_table where value_1 < 20 GROUP BY user_id HAVING sum(value_2) > 500 + UNION + SELECT user_id, sum(value_2) AS counter FROM users_table where value_1 < 40 and value_1 < 60 GROUP BY user_id HAVING sum(value_2) > 500 + UNION + SELECT user_id, sum(value_2) AS counter FROM users_table where value_1 < 60 and value_1 < 80 GROUP BY user_id HAVING sum(value_2) > 500 + UNION + SELECT user_id, sum(value_2) AS counter FROM users_table where value_1 < 80 and value_1 < 100 GROUP BY user_id HAVING sum(value_2) > 500 + UNION + SELECT user_id, sum(value_2) AS counter FROM users_table where value_1 < 100 and value_1 < 120 GROUP BY user_id HAVING sum(value_2) > 500 +) user_id +GROUP BY user_id ORDER BY 1 DESC LIMIT 5; + +-- similar query as above, with UNION ALL +SELECT sum(counter) +FROM ( + SELECT user_id, sum(value_2) AS counter FROM users_table where value_1 < 20 GROUP BY user_id HAVING sum(value_2) > 5000 + UNION ALL + SELECT user_id, sum(value_2) AS counter FROM users_table where value_1 < 40 and value_1 < 60 GROUP BY user_id HAVING sum(value_2) > 500 + UNION ALL + SELECT user_id, sum(value_2) AS counter FROM users_table where value_1 < 60 and value_1 < 80 GROUP BY user_id HAVING sum(value_2) > 500 + UNION ALL + SELECT user_id, sum(value_2) AS counter FROM users_table where value_1 < 80 and value_1 < 100 GROUP BY user_id HAVING sum(value_2) > 500 + UNION ALL + SELECT user_id, sum(value_2) AS counter FROM users_table where value_1 < 100 and value_1 < 120 GROUP BY user_id HAVING sum(value_2) > 500 +) user_id +GROUP BY user_id ORDER BY 1 DESC LIMIT 5; + +-- unions within unions +SELECT * +FROM ( + ( SELECT user_id, + sum(counter) + FROM + ( SELECT user_id, + sum(value_2) AS counter + FROM users_table + GROUP BY user_id + + UNION + + SELECT user_id, + sum(value_2) AS counter + FROM events_table + GROUP BY user_id ) user_id_1 + GROUP BY user_id) + UNION + ( SELECT user_id, + sum(counter) + FROM + ( SELECT + user_id, sum(value_2) AS counter + FROM users_table + GROUP BY user_id + + UNION + + SELECT user_id, sum(value_2) AS counter + + FROM events_table + GROUP BY user_id) user_id_2 + GROUP BY user_id)) AS ftop +ORDER BY 2 DESC, 1 DESC +LIMIT 5; + +-- top level unions are wrapped into top level aggregations +SELECT ("final_query"."event_types") as types, count(*) AS sumOfEventType +FROM + ( SELECT *, random() + FROM + ( SELECT "t"."user_id", + "t"."time", + unnest("t"."collected_events") AS "event_types" + FROM + ( SELECT "t1"."user_id", + min("t1"."time") AS "time", + array_agg(("t1"."event") + ORDER BY TIME ASC, event DESC) AS collected_events + FROM ( + (SELECT * + FROM + (SELECT "events"."user_id", + "events"."time", + 0 AS event + FROM events_table as "events" + WHERE event_type IN (10, 11, 12, 13, 14, 15) ) events_subquery_1) + UNION + (SELECT * + FROM + (SELECT "events"."user_id", "events"."time", 1 AS event + FROM events_table as "events" + WHERE event_type IN (15, 16, 17, 18, 19) ) events_subquery_2) + UNION + (SELECT * + FROM + (SELECT "events"."user_id", "events"."time", 2 AS event + FROM events_table as "events" + WHERE event_type IN (20, 21, 22, 23, 24, 25) ) events_subquery_3) + UNION + (SELECT * + FROM + (SELECT "events"."user_id", "events"."time", 3 AS event + FROM events_table as "events" + WHERE event_type IN (26, 27, 28, 29, 30, 13)) events_subquery_4)) t1 + GROUP BY "t1"."user_id") AS t) "q" +) as final_query +GROUP BY types +ORDER BY types; + +-- exactly the same query +-- but wrapper unions are removed from the inner part of the query +SELECT ("final_query"."event_types") as types, count(*) AS sumOfEventType +FROM + ( SELECT *, random() + FROM + ( SELECT "t"."user_id", + "t"."time", + unnest("t"."collected_events") AS "event_types" + FROM + ( SELECT "t1"."user_id", + min("t1"."time") AS "time", + array_agg(("t1"."event") + ORDER BY TIME ASC, event DESC) AS collected_events + FROM ( + (SELECT "events"."user_id", + "events"."time", + 0 AS event + FROM events_table as "events" + WHERE event_type IN (10, 11, 12, 13, 14, 15)) + UNION + (SELECT "events"."user_id", "events"."time", 1 AS event + FROM events_table as "events" + WHERE event_type IN (15, 16, 17, 18, 19) ) + UNION + + (SELECT "events"."user_id", "events"."time", 2 AS event + FROM events_table as "events" + WHERE event_type IN (20, 21, 22, 23, 24, 25) ) + UNION + + (SELECT "events"."user_id", "events"."time", 3 AS event + FROM events_table as "events" + WHERE event_type IN (26, 27, 28, 29, 30, 13))) t1 + GROUP BY "t1"."user_id") AS t) "q" +) as final_query +GROUP BY types +ORDER BY types; + +-- again excatly the same query with top level wrapper removed +SELECT ("q"."event_types") as types, count(*) AS sumOfEventType +FROM + ( SELECT "t"."user_id", + "t"."time", + unnest("t"."collected_events") AS "event_types" + FROM + ( SELECT "t1"."user_id", + min("t1"."time") AS "time", + array_agg(("t1"."event") + ORDER BY TIME ASC, event DESC) AS collected_events + FROM ( + (SELECT "events"."user_id", + "events"."time", + 0 AS event + FROM events_table as "events" + WHERE event_type IN (10, 11, 12, 13, 14, 15) + ) + UNION + (SELECT "events"."user_id", "events"."time", 1 AS event + FROM events_table as "events" + WHERE event_type IN (15, 16, 17, 18, 19) ) + UNION + + (SELECT "events"."user_id", "events"."time", 2 AS event + FROM events_table as "events" + WHERE event_type IN (20, 21, 22, 23, 24, 25) ) + UNION + + (SELECT "events"."user_id", "events"."time", 3 AS event + FROM events_table as "events" + WHERE event_type IN (26, 27, 28, 29, 30, 13))) t1 + GROUP BY "t1"."user_id") AS t) "q" +GROUP BY types +ORDER BY types; + +-- again same query but with only two top level empty queries (i.e., no group bys) +SELECT * +FROM + ( SELECT * + FROM + ( SELECT "t1"."user_id" + FROM ( + (SELECT "events"."user_id", + "events"."time", + 0 AS event + FROM events_table as "events" + WHERE event_type IN (10, 11, 12, 13, 14, 15) + ) + UNION + (SELECT "events"."user_id", "events"."time", 1 AS event + FROM events_table as "events" + WHERE event_type IN (15, 16, 17, 18, 19) ) + UNION + + (SELECT "events"."user_id", "events"."time", 2 AS event + FROM events_table as "events" + WHERE event_type IN (20, 21, 22, 23, 24, 25) ) + UNION + + (SELECT "events"."user_id", "events"."time", 3 AS event + FROM events_table as "events" + WHERE event_type IN (26, 27, 28, 29, 30, 13))) t1 + ) AS t) "q" ORDER BY 1 LIMIT 5; + +-- a very similar query UNION ALL +SELECT ("q"."event_types") as types, count(*) AS sumOfEventType +FROM + ( SELECT "t"."user_id", + "t"."time", + unnest("t"."collected_events") AS "event_types" + FROM + ( SELECT "t1"."user_id", + min("t1"."time") AS "time", + array_agg(("t1"."event") + ORDER BY TIME ASC, event DESC) AS collected_events + FROM ( + (SELECT "events"."user_id", + "events"."time", + 0 AS event + FROM events_table as "events" + WHERE event_type IN (10, 11, 12, 13, 14, 15) + ) + UNION ALL + (SELECT "events"."user_id", "events"."time", 1 AS event + FROM events_table as "events" + WHERE event_type IN (15, 16, 17, 18, 19) ) + UNION ALL + + (SELECT "events"."user_id", "events"."time", 2 AS event + FROM events_table as "events" + WHERE event_type IN (20, 21, 22, 23, 24, 25) ) + UNION ALL + + (SELECT "events"."user_id", "events"."time", 3 AS event + FROM events_table as "events" + WHERE event_type IN (26, 27, 28, 29, 30, 13))) t1 + GROUP BY "t1"."user_id") AS t) "q" +GROUP BY types +ORDER BY types; + +-- some UNION ALL queries that are going to be pulled up +SELECT + count(*) +FROM +( + (SELECT user_id FROM users_table) + UNION ALL + (SELECT user_id FROM events_table) +) b; + +-- similar query without top level agg +SELECT + user_id +FROM +( + (SELECT user_id FROM users_table) + UNION ALL + (SELECT user_id FROM events_table) +) b +ORDER BY 1 DESC +LIMIT 5; + +-- similar query with multiple target list entries +SELECT + user_id, value_3 +FROM +( + (SELECT value_3, user_id FROM users_table) + UNION ALL + (SELECT value_3, user_id FROM events_table) +) b +ORDER BY 1 DESC, 2 DESC +LIMIT 5; + +-- similar query group by inside the subqueries +SELECT + user_id, value_3_sum +FROM +( + (SELECT sum(value_3) as value_3_sum, user_id FROM users_table GROUP BY user_id) + UNION ALL + (SELECT sum(value_3) as value_3_sum, user_id FROM users_table GROUP BY user_id) +) b +ORDER BY 2 DESC, 1 DESC +LIMIT 5; + +-- similar query top level group by +SELECT + user_id, sum(value_3) +FROM +( + (SELECT value_3, user_id FROM users_table) + UNION ALL + (SELECT value_3, user_id FROM events_table) +) b +GROUP BY 1 +ORDER BY 2 DESC, 1 DESC +LIMIT 5; + +-- a long set operation list +SELECT + user_id, value_3 +FROM +( + (SELECT value_3, user_id FROM events_table where event_type IN (1, 2, 3, 4, 5)) + UNION ALL + (SELECT value_3, user_id FROM events_table where event_type IN (6, 7, 8, 9, 10)) + UNION ALL + (SELECT value_3, user_id FROM events_table where event_type IN (11, 12, 13, 14, 15)) + UNION ALL + (SELECT value_3, user_id FROM events_table where event_type IN (16, 17, 18, 19, 20)) + UNION ALL + (SELECT value_3, user_id FROM events_table where event_type IN (21, 22, 23, 24, 25)) + UNION ALL + (SELECT value_3, user_id FROM events_table where event_type IN (26, 27, 28, 29, 30)) +) b +ORDER BY 1 DESC, 2 DESC +LIMIT 5; + +-- no partition key on the top +SELECT + max(value_3) +FROM +( + (SELECT value_3, user_id FROM events_table where event_type IN (1, 2, 3, 4, 5)) + UNION ALL + (SELECT value_3, user_id FROM events_table where event_type IN (6, 7, 8, 9, 10)) + UNION ALL + (SELECT value_3, user_id FROM events_table where event_type IN (11, 12, 13, 14, 15)) + UNION ALL + (SELECT value_3, user_id FROM events_table where event_type IN (16, 17, 18, 19, 20)) + UNION ALL + (SELECT value_3, user_id FROM events_table where event_type IN (21, 22, 23, 24, 25)) + UNION ALL + (SELECT value_3, user_id FROM events_table where event_type IN (26, 27, 28, 29, 30)) +) b +GROUP BY user_id +ORDER BY 1 DESC +LIMIT 5; + + +-- now lets also have some unsupported queries + +-- group by is not on the partition key +SELECT user_id, sum(counter) +FROM ( + SELECT user_id, sum(value_2) AS counter FROM events_table GROUP BY user_id + UNION + SELECT value_1 as user_id, sum(value_2) AS counter FROM users_table GROUP BY value_1 +) user_id +GROUP BY user_id; + +-- partition key is not selected +SELECT sum(counter) +FROM ( + SELECT user_id, sum(value_2) AS counter FROM users_table where value_1 < 20 GROUP BY user_id HAVING sum(value_2) > 500 + UNION + SELECT user_id, sum(value_2) AS counter FROM users_table where value_1 < 40 and value_1 < 60 GROUP BY user_id HAVING sum(value_2) > 500 + UNION + SELECT user_id, sum(value_2) AS counter FROM users_table where value_1 < 60 and value_1 < 80 GROUP BY user_id HAVING sum(value_2) > 500 + UNION + SELECT user_id, sum(value_2) AS counter FROM users_table where value_1 < 80 and value_1 < 100 GROUP BY user_id HAVING sum(value_2) > 500 + UNION + SELECT 2 * user_id, sum(value_2) AS counter FROM users_table where value_1 < 100 and value_1 < 120 GROUP BY user_id HAVING sum(value_2) > 500 +) user_id +GROUP BY user_id ORDER BY 1 DESC LIMIT 5; + +-- excepts within unions are not supported +SELECT * FROM +( +( + SELECT user_id, sum(counter) + FROM ( + SELECT user_id, sum(value_2) AS counter FROM users_table GROUP BY user_id + UNION + SELECT user_id, sum(value_2) AS counter FROM events_table GROUP BY user_id + ) user_id_1 + GROUP BY user_id +) +UNION +( + SELECT user_id, sum(counter) + FROM ( + SELECT user_id, sum(value_2) AS counter FROM users_table GROUP BY user_id + EXCEPT + SELECT user_id, sum(value_2) AS counter FROM events_table GROUP BY user_id +) user_id_2 + GROUP BY user_id) +) as ftop; + +-- joins inside unions are not supported +SELECT user_id, sum(counter) +FROM ( + SELECT user_id, sum(value_2) AS counter FROM users_table GROUP BY user_id + UNION + SELECT events_table.user_id, sum(events_table.value_2) AS counter FROM events_table, users_table WHERE users_table.user_id > events_table.user_id GROUP BY 1 +) user_id +GROUP BY user_id; + +-- joins inside unions are not supported -- slightly more comlex than the above +SELECT * FROM +( +( + SELECT user_id, sum(counter) + FROM ( + SELECT user_id, sum(value_2) AS counter FROM users_table GROUP BY user_id + UNION + SELECT user_id, sum(value_2) AS counter FROM events_table GROUP BY user_id + ) user_id_1 + GROUP BY user_id +) +UNION +( + SELECT user_id, sum(counter) + FROM ( + SELECT user_id, sum(value_2) AS counter FROM users_table GROUP BY user_id + UNION + SELECT events_table.user_id, sum(events_table.value_2) AS counter FROM events_table, users_table WHERE (events_table.user_id = users_table.user_id) GROUP BY events_table.user_id +) user_id_2 + GROUP BY user_id) +) as ftop; + +-- offset inside the union +SELECT user_id, sum(counter) +FROM ( + SELECT user_id, sum(value_2) AS counter FROM events_table GROUP BY user_id + UNION + SELECT user_id, sum(value_2) AS counter FROM users_table GROUP BY user_id OFFSET 4 +) user_id +GROUP BY user_id; + +-- lower level union does not return partition key with the other relations +SELECT * +FROM ( + ( SELECT user_id, + sum(counter) + FROM + ( SELECT user_id, + sum(value_2) AS counter + FROM users_table + GROUP BY user_id + + UNION + + SELECT user_id, + sum(value_2) AS counter + FROM events_table + GROUP BY user_id ) user_id_1 + GROUP BY user_id) + UNION + ( SELECT user_id, + sum(counter) + FROM + ( SELECT sum(value_2) AS counter, + user_id + FROM users_table + GROUP BY user_id + + UNION + + SELECT user_id, + sum(value_2) AS counter + FROM events_table + GROUP BY user_id) user_id_2 + GROUP BY user_id)) AS ftop; + +-- some UNION all queries that are going to be pulled up +SELECT + count(*) +FROM +( + (SELECT user_id FROM users_table) + UNION ALL + (SELECT 2 * user_id FROM events_table) +) b; + +-- last query does not have partition key +SELECT + user_id, value_3 +FROM +( + (SELECT value_3, user_id FROM events_table where event_type IN (1, 2, 3, 4, 5)) + UNION ALL + (SELECT value_3, user_id FROM events_table where event_type IN (6, 7, 8, 9, 10)) + UNION ALL + (SELECT value_3, user_id FROM events_table where event_type IN (11, 12, 13, 14, 15)) + UNION ALL + (SELECT value_3, user_id FROM events_table where event_type IN (16, 17, 18, 19, 20)) + UNION ALL + (SELECT value_3, user_id FROM events_table where event_type IN (21, 22, 23, 24, 25)) + UNION ALL + (SELECT value_3, value_2 FROM events_table where event_type IN (26, 27, 28, 29, 30)) +) b +ORDER BY 1 DESC, 2 DESC +LIMIT 5; + +-- we don't allow joins within unions +SELECT + count(*) +FROM +( + (SELECT user_id FROM users_table) + UNION ALL + (SELECT users_table.user_id FROM events_table, users_table WHERE events_table.user_id = users_table.user_id) +) b; + +SET citus.subquery_pushdown TO false; +SET citus.enable_router_execution TO true;