Merge pull request #1532 from citusdata/subquery_pushdown_on_reference_tables

Subquery pushdown on reference tables
pull/1503/head
Burak Velioglu 2017-08-11 10:32:22 +03:00 committed by GitHub
commit c4eb6c5153
22 changed files with 2233 additions and 153 deletions

View File

@ -464,6 +464,13 @@ RouterModifyTaskForShardInterval(Query *originalQuery, ShardInterval *shardInter
}
shardOpExpressions = ShardIntervalOpExpressions(shardInterval, rteIndex);
/* means it is a reference table and do not add any shard interval information */
if (shardOpExpressions == NIL)
{
continue;
}
shardRestrictionList = make_simple_restrictinfo((Expr *) shardOpExpressions);
extendedBaseRestrictInfo = lappend(extendedBaseRestrictInfo,
shardRestrictionList);

View File

@ -85,6 +85,8 @@ static FieldSelect * CompositeFieldRecursive(Expr *expression, Query *query);
static bool FullCompositeFieldList(List *compositeFieldList);
static MultiNode * MultiPlanTree(Query *queryTree);
static void ErrorIfQueryNotSupported(Query *queryTree);
static bool HasUnsupportedReferenceTableJoin(
PlannerRestrictionContext *plannerRestrictionContext);
static bool HasUnsupportedJoinWalker(Node *node, void *context);
static bool ErrorHintRequired(const char *errorHint, Query *queryTree);
static DeferredErrorMessage * DeferErrorIfUnsupportedSubqueryRepartition(Query *
@ -94,6 +96,8 @@ static bool HasOuterJoin(Query *queryTree);
static bool HasOuterJoinWalker(Node *node, void *maxJoinLevel);
static bool HasComplexJoinOrder(Query *queryTree);
static bool HasComplexRangeTableType(Query *queryTree);
static bool RelationInfoHasReferenceTable(PlannerInfo *plannerInfo,
RelOptInfo *relationInfo);
static void ValidateClauseList(List *clauseList);
static void ValidateSubqueryPushdownClauseList(List *clauseList);
static bool ExtractFromExpressionWalker(Node *node,
@ -188,7 +192,6 @@ MultiLogicalPlanCreate(Query *originalQuery, Query *queryTree,
{
originalQuery = (Query *) ResolveExternalParams((Node *) originalQuery,
boundParams);
multiQueryNode = MultiSubqueryPlanTree(originalQuery, queryTree,
plannerRestrictionContext);
}
@ -539,6 +542,14 @@ DeferErrorIfUnsupportedSubqueryPushdown(Query *originalQuery,
"one another relation using distribution keys and "
"equality operator.", NULL);
}
else if (HasUnsupportedReferenceTableJoin(plannerRestrictionContext))
{
return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
"cannot pushdown the subquery",
"There exist a reference table in the outer part of the "
"outer join",
NULL);
}
/*
* We first extract all the queries that appear in the original query. Later,
@ -871,6 +882,10 @@ DeferErrorIfUnsupportedUnionQuery(Query *subqueryTree,
{
SetOperationStmt *setOperation =
(SetOperationStmt *) lfirst(setOperationStatmentCell);
Node *leftArg = setOperation->larg;
Node *rightArg = setOperation->rarg;
int leftArgRTI = 0;
int rightArgRTI = 0;
if (setOperation->op != SETOP_UNION)
{
@ -878,6 +893,36 @@ DeferErrorIfUnsupportedUnionQuery(Query *subqueryTree,
"cannot push down this subquery",
"Intersect and Except are currently unsupported", NULL);
}
if (IsA(leftArg, RangeTblRef))
{
Node *leftArgSubquery = NULL;
leftArgRTI = ((RangeTblRef *) leftArg)->rtindex;
leftArgSubquery = (Node *) rt_fetch(leftArgRTI,
subqueryTree->rtable)->subquery;
if (HasReferenceTable(leftArgSubquery))
{
return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
"cannot push down this subquery ",
"Reference tables are not supported with union"
" operator", NULL);
}
}
if (IsA(rightArg, RangeTblRef))
{
Node *rightArgSubquery = NULL;
rightArgRTI = ((RangeTblRef *) rightArg)->rtindex;
rightArgSubquery = (Node *) rt_fetch(rightArgRTI,
subqueryTree->rtable)->subquery;
if (HasReferenceTable(rightArgSubquery))
{
return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
"cannot push down this subquery",
"Reference tables are not supported with union"
" operator", NULL);
}
}
}
return NULL;
@ -982,7 +1027,7 @@ DeferErrorIfUnsupportedTableCombination(Query *queryTree)
/*
* TargetListOnPartitionColumn checks if at least one target list entry is on
* partition column.
* partition column or the table is a reference table.
*/
static bool
TargetListOnPartitionColumn(Query *query, List *targetEntryList)
@ -997,6 +1042,23 @@ TargetListOnPartitionColumn(Query *query, List *targetEntryList)
Expr *targetExpression = targetEntry->expr;
bool isPartitionColumn = IsPartitionColumn(targetExpression, query);
Oid relationId = InvalidOid;
Var *column = NULL;
FindReferencedTableColumn(targetExpression, NIL, query, &relationId, &column);
/*
* If the expression belongs to a reference table, directly return true.
* We can assume that a target list entry is always on the partition
* column for reference tables.
*/
if (IsDistributedTable(relationId) && PartitionMethod(relationId) ==
DISTRIBUTE_BY_NONE)
{
targetListOnPartitionColumn = true;
break;
}
if (isPartitionColumn)
{
FieldSelect *compositeField = CompositeFieldRecursive(targetExpression,
@ -1363,6 +1425,112 @@ MultiPlanTree(Query *queryTree)
}
/*
 * HasUnsupportedReferenceTableJoin walks over the join restrictions gathered
 * in the given planner restriction context and returns true as soon as it
 * finds a join that places a reference table on a side that cannot be pushed
 * down. The rules are:
 *
 * - JOIN_SEMI / JOIN_ANTI: the outer relation must not contain a reference
 *   table. Duplicate results cannot actually occur when the join checks
 *   equality between a reference table column and the partition column of a
 *   distributed table, but the check is kept anyway because a reference table
 *   in the outer part of a semi or anti join is not very common.
 * - JOIN_LEFT: the outer relation must not contain a reference table, since
 *   that would definitely produce duplicate rows. (PostgreSQL converts right
 *   joins to left joins and switches innerrel and outerrel while doing so.)
 * - JOIN_FULL: neither side may contain a reference table, for the same
 *   duplicate-rows reason.
 *
 * Joins of any other type are not inspected. The function returns false when
 * no offending join is found.
 */
static bool
HasUnsupportedReferenceTableJoin(PlannerRestrictionContext *plannerRestrictionContext)
{
	List *restrictionList =
		plannerRestrictionContext->joinRestrictionContext->joinRestrictionList;
	ListCell *restrictionCell = NULL;

	foreach(restrictionCell, restrictionList)
	{
		JoinRestriction *restriction = (JoinRestriction *) lfirst(restrictionCell);
		PlannerInfo *root = restriction->plannerInfo;
		bool unsupportedJoin = false;

		switch (restriction->joinType)
		{
			case JOIN_SEMI:
			case JOIN_ANTI:
			case JOIN_LEFT:
			{
				/* only the outer side matters for these join types */
				unsupportedJoin =
					RelationInfoHasReferenceTable(root, restriction->outerrel);
				break;
			}

			case JOIN_FULL:
			{
				/* a reference table on either side is rejected */
				unsupportedJoin =
					RelationInfoHasReferenceTable(root, restriction->innerrel) ||
					RelationInfoHasReferenceTable(root, restriction->outerrel);
				break;
			}

			default:
			{
				/* remaining join types are not restricted here */
				break;
			}
		}

		if (unsupportedJoin)
		{
			return true;
		}
	}

	return false;
}
/*
 * RelationInfoHasReferenceTable checks whether any range table entry covered
 * by the given relationInfo contains a reference table. The members of
 * relationInfo->relids are used as indexes into the simple_rte_array of the
 * planner info, which is why the planner info is passed as well.
 *
 * The relids set is traversed with bms_next_member(), which leaves the set
 * untouched. That avoids taking a scratch copy of the set (required by the
 * destructive bms_first_member()) that would otherwise never be freed.
 *
 * Returns true on the first entry for which HasReferenceTable() is true,
 * false if there is no such entry.
 */
static bool
RelationInfoHasReferenceTable(PlannerInfo *plannerInfo, RelOptInfo *relationInfo)
{
	Relids relids = relationInfo->relids;
	int rteIndex = -1;

	while ((rteIndex = bms_next_member(relids, rteIndex)) >= 0)
	{
		/* relationInfo covers this range table entry */
		RangeTblEntry *rangeTableEntry = plannerInfo->simple_rte_array[rteIndex];

		if (HasReferenceTable((Node *) rangeTableEntry))
		{
			return true;
		}
	}

	return false;
}
/*
 * HasReferenceTable returns true if at least one relation found under the
 * given node is a reference table, that is, a distributed table whose
 * partition method is DISTRIBUTE_BY_NONE. It returns false otherwise,
 * including when no relation is found at all.
 */
bool
HasReferenceTable(Node *node)
{
	List *rteList = NIL;
	ListCell *rteCell = NULL;
	bool referenceTableFound = false;

	/* gather the relation range table entries reachable from the node */
	ExtractRangeTableRelationWalkerWithRTEExpand(node, &rteList);

	foreach(rteCell, rteList)
	{
		Oid tableOid = ((RangeTblEntry *) lfirst(rteCell))->relid;

		/* the partition method is only looked up for distributed tables */
		if (!IsDistributedTable(tableOid))
		{
			continue;
		}

		if (PartitionMethod(tableOid) == DISTRIBUTE_BY_NONE)
		{
			referenceTableFound = true;
			break;
		}
	}

	return referenceTableFound;
}
/*
* ErrorIfQueryNotSupported checks that we can perform distributed planning for
* the given query. The checks in this function will be removed as we support
@ -2728,6 +2896,46 @@ ExtractRangeTableRelationWalker(Node *node, List **rangeTableRelationList)
}
/*
 * ExtractRangeTableRelationWalkerWithRTEExpand obtains the list of relations
 * from the given node and appends them to *rangeTableRelationList. Unlike
 * ExtractRangeTableRelationWalker, this function also accepts a range table
 * entry as its input and recursively walks into it when it can:
 *
 * - a NULL node contributes nothing and false is returned,
 * - an RTE_RELATION entry is appended to the list directly,
 * - any other kind of range table entry is handed to range_table_walker()
 *   with this function as the callback,
 * - every other node type is delegated to ExtractRangeTableRelationWalker().
 *
 * The return value follows the walker convention used by the delegated calls;
 * false is returned for the cases handled directly here.
 */
bool
ExtractRangeTableRelationWalkerWithRTEExpand(Node *node, List **rangeTableRelationList)
{
	RangeTblEntry *rangeTableEntry = NULL;

	if (node == NULL)
	{
		return false;
	}

	if (!IsA(node, RangeTblEntry))
	{
		return ExtractRangeTableRelationWalker(node, rangeTableRelationList);
	}

	rangeTableEntry = (RangeTblEntry *) node;
	if (rangeTableEntry->rtekind == RTE_RELATION)
	{
		(*rangeTableRelationList) = lappend(*rangeTableRelationList, rangeTableEntry);
		return false;
	}

	/*
	 * range_table_walker() operates on a list of range table entries, so the
	 * single entry is wrapped into a list only on this path, where it is needed.
	 */
	return range_table_walker(list_make1(rangeTableEntry),
							  ExtractRangeTableRelationWalkerWithRTEExpand,
							  rangeTableRelationList, 0);
}
/*
* ExtractRangeTableEntryWalker walks over a query tree, and finds all range
* table entries. For recursing into the query tree, this function uses the

View File

@ -2061,7 +2061,16 @@ SubquerySqlTaskList(Job *job, PlannerRestrictionContext *plannerRestrictionConte
break;
}
Assert(targetCacheEntry != NULL);
/*
* That means all tables are reference tables and we can pick any of them
* as an anchor table.
*/
if (targetCacheEntry == NULL)
{
RangeTblEntry *rangeTableEntry = (RangeTblEntry *) linitial(rangeTableList);
relationId = rangeTableEntry->relid;
targetCacheEntry = DistributedTableCacheEntry(relationId);
}
shardCount = targetCacheEntry->shardIntervalArrayLength;
for (shardOffset = 0; shardOffset < shardCount; shardOffset++)
@ -2102,6 +2111,7 @@ ErrorIfUnsupportedShardDistribution(Query *query)
{
Oid firstTableRelationId = InvalidOid;
List *relationIdList = RelationIdList(query);
List *nonReferenceRelations = NIL;
ListCell *relationIdCell = NULL;
uint32 relationIndex = 0;
uint32 rangeDistributedRelationCount = 0;
@ -2114,17 +2124,26 @@ ErrorIfUnsupportedShardDistribution(Query *query)
if (partitionMethod == DISTRIBUTE_BY_RANGE)
{
rangeDistributedRelationCount++;
nonReferenceRelations = lappend_oid(nonReferenceRelations,
relationId);
}
else if (partitionMethod == DISTRIBUTE_BY_HASH)
{
hashDistributedRelationCount++;
nonReferenceRelations = lappend_oid(nonReferenceRelations,
relationId);
}
else if (partitionMethod == DISTRIBUTE_BY_NONE)
{
/* do not need to handle reference tables */
continue;
}
else
{
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("cannot push down this subquery"),
errdetail("Currently range and hash partitioned "
"relations are supported")));
errdetail("Currently append partitioned relations "
"are not supported")));
}
}
@ -2136,7 +2155,7 @@ ErrorIfUnsupportedShardDistribution(Query *query)
"partitioned relations are unsupported")));
}
foreach(relationIdCell, relationIdList)
foreach(relationIdCell, nonReferenceRelations)
{
Oid relationId = lfirst_oid(relationIdCell);
bool coPartitionedTables = false;
@ -2298,9 +2317,6 @@ SubqueryTaskCreate(Query *originalQuery, ShardInterval *shardInterval,
RestrictInfo *shardRestrictionList = NULL;
DeferredErrorMessage *planningError = NULL;
/* such queries should go through router planner */
Assert(!restrictionContext->allReferenceTables);
/*
* Add the restriction qual parameter value in all baserestrictinfos.
* Note that this has to be done on a copy, as the originals are needed
@ -2315,6 +2331,12 @@ SubqueryTaskCreate(Query *originalQuery, ShardInterval *shardInterval,
shardOpExpressions = ShardIntervalOpExpressions(shardInterval, rteIndex);
/* means it is a reference table and do not add any shard interval info */
if (shardOpExpressions == NIL)
{
continue;
}
shardRestrictionList = make_simple_restrictinfo((Expr *) shardOpExpressions);
extendedBaseRestrictInfo = lappend(extendedBaseRestrictInfo,
shardRestrictionList);
@ -2369,7 +2391,7 @@ SubqueryTaskCreate(Query *originalQuery, ShardInterval *shardInterval,
subqueryTask = CreateBasicTask(jobId, taskId, SQL_TASK, queryString->data);
subqueryTask->dependedTaskList = NULL;
subqueryTask->anchorShardId = selectAnchorShardId;
subqueryTask->anchorShardId = shardInterval->shardId;
subqueryTask->taskPlacementList = selectPlacementList;
subqueryTask->upsertQuery = false;
subqueryTask->relationShardList = relationShardList;

View File

@ -740,6 +740,8 @@ multi_join_restriction_hook(PlannerInfo *root,
joinRestriction->joinType = jointype;
joinRestriction->joinRestrictInfoList = restrictInfoList;
joinRestriction->plannerInfo = root;
joinRestriction->innerrel = innerrel;
joinRestriction->outerrel = outerrel;
joinRestrictionContext->joinRestrictionList =
lappend(joinRestrictionContext->joinRestrictionList, joinRestriction);

View File

@ -275,7 +275,7 @@ CreateSingleTaskRouterPlan(Query *originalQuery, Query *query,
* The function returns hashed columns generated by MakeInt4Column() for the hash
* partitioned tables in place of partition columns.
*
* The function errors out if the given shard interval does not belong to a hash,
* The function returns NIL if shard interval does not belong to a hash,
* range and append distributed tables.
*
* NB: If you update this, also look at PrunableExpressionsWalker().
@ -300,10 +300,8 @@ ShardIntervalOpExpressions(ShardInterval *shardInterval, Index rteIndex)
}
else
{
ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("cannot create shard interval operator expression for "
"distributed relations other than hash, range and append distributed "
"relations")));
/* do not add any shard range interval for reference tables */
return NIL;
}
/* build the base expression for constraint */

View File

@ -67,7 +67,6 @@ static Var * FindTranslatedVar(List *appendRelList, Oid relationOid,
static bool EquivalenceListContainsRelationsEquality(List *attributeEquivalenceList,
RelationRestrictionContext *
restrictionContext);
static uint32 ReferenceRelationCount(RelationRestrictionContext *restrictionContext);
static List * GenerateAttributeEquivalencesForRelationRestrictions(
RelationRestrictionContext *restrictionContext);
static AttributeEquivalenceClass * AttributeEquivalenceClassForEquivalenceClass(
@ -125,7 +124,8 @@ static Index RelationRestrictionPartitionKeyIndex(RelationRestriction *
/*
* SafeToPushdownUnionSubquery returns true if all the relations are returns
* partition keys in the same ordinal position.
* partition keys in the same ordinal position and there is no reference table
* exists.
*
* Note that the function expects (and asserts) the input query to be a top
* level union query defined by TopLevelUnionQuery().
@ -154,6 +154,7 @@ SafeToPushdownUnionSubquery(RelationRestrictionContext *restrictionContext)
foreach(relationRestrictionCell, restrictionContext->relationRestrictionList)
{
RelationRestriction *relationRestriction = lfirst(relationRestrictionCell);
Oid relationId = relationRestriction->relationId;
Index partitionKeyIndex = InvalidAttrNumber;
PlannerInfo *relationPlannerRoot = relationRestriction->plannerInfo;
List *targetList = relationPlannerRoot->parse->targetList;
@ -161,6 +162,20 @@ SafeToPushdownUnionSubquery(RelationRestrictionContext *restrictionContext)
Var *varToBeAdded = NULL;
TargetEntry *targetEntryToAdd = NULL;
/*
* Although this is not the best place to error out when encountering reference
* tables, we decided to error out here. Otherwise, we would need to add an
* equality for each reference table, which is more complex to implement. In a
* future implementation all checks will be gathered at a single point.
*/
if (PartitionMethod(relationId) == DISTRIBUTE_BY_NONE)
{
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("cannot pushdown this query"),
errdetail(
"Reference tables are not allowed with set operations")));
}
/*
* We first check whether UNION ALLs are pulled up or not. Note that Postgres
* planner creates AppendRelInfos per each UNION ALL query that is pulled up.
@ -314,10 +329,9 @@ FindTranslatedVar(List *appendRelList, Oid relationOid, Index relationRteIndex,
* joined on their partition keys.
*
* The function returns true if all relations are joined on their partition keys.
* Otherwise, the function returns false. Since reference tables do not have partition
* keys, we skip processing them. Also, if the query includes only a single non-reference
* distributed relation, the function returns true since it doesn't make sense to check
* for partition key equality in that case.
* Otherwise, the function returns false. In order to support reference tables
* with subqueries, equality between attributes of reference tables and partition
* key of distributed tables are also considered.
*
* In order to do that, we invented a new equivalence class namely:
* AttributeEquivalenceClass. In very simple words, a AttributeEquivalenceClass is
@ -350,24 +364,15 @@ RestrictionEquivalenceForPartitionKeys(PlannerRestrictionContext *
List *relationRestrictionAttributeEquivalenceList = NIL;
List *joinRestrictionAttributeEquivalenceList = NIL;
List *allAttributeEquivalenceList = NIL;
uint32 referenceRelationCount = ReferenceRelationCount(restrictionContext);
uint32 totalRelationCount = list_length(restrictionContext->relationRestrictionList);
uint32 nonReferenceRelationCount = totalRelationCount - referenceRelationCount;
/*
* If the query includes a single relation which is not a reference table,
* we should not check the partition column equality.
* Consider two example cases:
* (i) The query includes only a single colocated relation
* (ii) A colocated relation is joined with a (or multiple) reference
* table(s) where colocated relation is not joined on the partition key
*
* For the above two cases, we don't need to execute the partition column equality
* algorithm. The reason is that the essence of this function is to ensure that the
* tasks that are going to be created should not need data from other tasks. In both
* cases mentioned above, the necessary data per task would be on available.
* If the query includes only one relation, we should not check the partition
* column equality. A single table should not need to fetch data from nodes
* other than its own node(s).
*/
if (nonReferenceRelationCount <= 1)
if (totalRelationCount == 1)
{
return true;
}
@ -429,8 +434,7 @@ EquivalenceListContainsRelationsEquality(List *attributeEquivalenceList,
(RelationRestriction *) lfirst(relationRestrictionCell);
int rteIdentity = GetRTEIdentity(relationRestriction->rte);
if (DistPartitionKey(relationRestriction->relationId) &&
!bms_is_member(rteIdentity, commonRteIdentities))
if (!bms_is_member(rteIdentity, commonRteIdentities))
{
return false;
}
@ -440,31 +444,6 @@ EquivalenceListContainsRelationsEquality(List *attributeEquivalenceList,
}
/*
* ReferenceRelationCount iterates over the relations and returns the reference table
* relation count.
*/
static uint32
ReferenceRelationCount(RelationRestrictionContext *restrictionContext)
{
ListCell *relationRestrictionCell = NULL;
uint32 referenceRelationCount = 0;
foreach(relationRestrictionCell, restrictionContext->relationRestrictionList)
{
RelationRestriction *relationRestriction =
(RelationRestriction *) lfirst(relationRestrictionCell);
if (PartitionMethod(relationRestriction->relationId) == DISTRIBUTE_BY_NONE)
{
referenceRelationCount++;
}
}
return referenceRelationCount;
}
/*
* GenerateAttributeEquivalencesForRelationRestrictions gets a relation restriction
* context and returns a list of AttributeEquivalenceClass.
@ -642,7 +621,7 @@ GetVarFromAssignedParam(List *parentPlannerParamList, Param *plannerParam)
/*
* GenerateCommonEquivalence gets a list of unrelated AttributeEquiavalanceClass
* whose all members are partition keys.
* whose all members are partition keys or a column of reference table.
*
* With the equivalence classes, the function follows the algorithm
* outlined below:
@ -1092,9 +1071,6 @@ AddUnionSetOperationsToAttributeEquivalenceClass(AttributeEquivalenceClass **
* class using the rteIdentity provided by the rangeTableEntry. Note that
* rteIdentities are only assigned to RTE_RELATIONs and this function asserts
* the input rte to be an RTE_RELATION.
*
* Note that this function only adds partition keys to the attributeEquivalanceClass.
* This implies that there wouldn't be any columns for reference tables.
*/
static void
AddRteRelationToAttributeEquivalenceClass(AttributeEquivalenceClass **
@ -1103,19 +1079,13 @@ AddRteRelationToAttributeEquivalenceClass(AttributeEquivalenceClass **
Var *varToBeAdded)
{
AttributeEquivalenceClassMember *attributeEqMember = NULL;
Oid relationId = InvalidOid;
Var *relationPartitionKey = NULL;
Oid relationId = rangeTableEntry->relid;
Var *relationPartitionKey = DistPartitionKey(relationId);
Assert(rangeTableEntry->rtekind == RTE_RELATION);
relationId = rangeTableEntry->relid;
if (PartitionMethod(relationId) == DISTRIBUTE_BY_NONE)
{
return;
}
relationPartitionKey = DistPartitionKey(relationId);
if (relationPartitionKey->varattno != varToBeAdded->varattno)
if (PartitionMethod(relationId) != DISTRIBUTE_BY_NONE &&
relationPartitionKey->varattno != varToBeAdded->varattno)
{
return;
}

View File

@ -206,6 +206,8 @@ extern List * TableEntryList(List *rangeTableList);
extern List * UsedTableEntryList(Query *query);
extern bool ExtractRangeTableRelationWalker(Node *node, List **rangeTableList);
extern bool ExtractRangeTableEntryWalker(Node *node, List **rangeTableList);
extern bool ExtractRangeTableRelationWalkerWithRTEExpand(Node *node,
List **rangeTableList);
extern List * pull_var_clause_default(Node *node);
extern bool OperatorImplementsEquality(Oid opno);

View File

@ -286,6 +286,7 @@ extern Const * MakeInt4Constant(Datum constantValue);
extern int CompareShardPlacements(const void *leftElement, const void *rightElement);
extern bool ShardIntervalsOverlap(ShardInterval *firstInterval,
ShardInterval *secondInterval);
extern bool HasReferenceTable(Node *node);
/* function declarations for Task and Task list operations */
extern bool TasksEqual(const Task *a, const Task *b);

View File

@ -52,6 +52,8 @@ typedef struct JoinRestriction
JoinType joinType;
List *joinRestrictInfoList;
PlannerInfo *plannerInfo;
RelOptInfo *innerrel;
RelOptInfo *outerrel;
} JoinRestriction;
typedef struct PlannerRestrictionContext

View File

@ -446,15 +446,16 @@ Aggregate
Node: host=localhost port=57637 dbname=regression
-> Aggregate
-> GroupAggregate
Group Key: (((NULL::user_composite_type)).tenant_id), (((NULL::user_composite_type)).user_id)
Group Key: ((users.composite_id).tenant_id), ((users.composite_id).user_id)
-> Sort
Sort Key: (((NULL::user_composite_type)).tenant_id), (((NULL::user_composite_type)).user_id)
-> Nested Loop
Join Filter: ((NULL::user_composite_type) = events.composite_id)
-> Result
One-Time Filter: false
-> Seq Scan on events_1400027 events
Filter: ((event_type)::text = ANY ('{click,submit,pay}'::text[]))
Sort Key: ((users.composite_id).tenant_id), ((users.composite_id).user_id)
-> Hash Join
Hash Cond: (users.composite_id = events.composite_id)
-> Seq Scan on users_1400029 users
Filter: ((composite_id >= '(1,-9223372036854775808)'::user_composite_type) AND (composite_id <= '(1,9223372036854775807)'::user_composite_type))
-> Hash
-> Seq Scan on events_1400025 events
Filter: ((event_type)::text = ANY ('{click,submit,pay}'::text[]))
-- Union and left join subquery pushdown
EXPLAIN (COSTS OFF)
SELECT
@ -531,33 +532,34 @@ HashAggregate
Sort Key: subquery_top.hasdone
-> Subquery Scan on subquery_top
-> GroupAggregate
Group Key: (((NULL::user_composite_type)).tenant_id), (((NULL::user_composite_type)).user_id), subquery_2.hasdone
Group Key: ((users.composite_id).tenant_id), ((users.composite_id).user_id), subquery_2.hasdone
-> Sort
Sort Key: (((NULL::user_composite_type)).tenant_id), (((NULL::user_composite_type)).user_id), subquery_2.hasdone
Sort Key: ((users.composite_id).tenant_id), ((users.composite_id).user_id), subquery_2.hasdone
-> Hash Left Join
Hash Cond: ((NULL::user_composite_type) = subquery_2.composite_id)
-> Unique
-> Sort
Sort Key: (((NULL::user_composite_type)).tenant_id), (((NULL::user_composite_type)).user_id), (NULL::user_composite_type), ('action=>1'::text), events.event_time
-> Append
-> Nested Loop
Join Filter: ((NULL::user_composite_type) = events.composite_id)
-> Result
One-Time Filter: false
-> Seq Scan on events_1400027 events
Hash Cond: (users.composite_id = subquery_2.composite_id)
-> HashAggregate
Group Key: ((users.composite_id).tenant_id), ((users.composite_id).user_id), users.composite_id, ('action=>1'::text), events.event_time
-> Append
-> Hash Join
Hash Cond: (users.composite_id = events.composite_id)
-> Seq Scan on users_1400029 users
Filter: ((composite_id >= '(1,-9223372036854775808)'::user_composite_type) AND (composite_id <= '(1,9223372036854775807)'::user_composite_type))
-> Hash
-> Seq Scan on events_1400025 events
Filter: ((event_type)::text = 'click'::text)
-> Nested Loop
Join Filter: ((NULL::user_composite_type) = events_1.composite_id)
-> Result
One-Time Filter: false
-> Seq Scan on events_1400027 events_1
-> Hash Join
Hash Cond: (users_1.composite_id = events_1.composite_id)
-> Seq Scan on users_1400029 users_1
Filter: ((composite_id >= '(1,-9223372036854775808)'::user_composite_type) AND (composite_id <= '(1,9223372036854775807)'::user_composite_type))
-> Hash
-> Seq Scan on events_1400025 events_1
Filter: ((event_type)::text = 'submit'::text)
-> Hash
-> Subquery Scan on subquery_2
-> Unique
-> Sort
Sort Key: ((events_2.composite_id).tenant_id), ((events_2.composite_id).user_id)
-> Seq Scan on events_1400027 events_2
-> Seq Scan on events_1400025 events_2
Filter: ((composite_id >= '(1,-9223372036854775808)'::user_composite_type) AND (composite_id <= '(1,9223372036854775807)'::user_composite_type) AND ((event_type)::text = 'pay'::text))
-- Union, left join and having subquery pushdown
EXPLAIN (COSTS OFF)
@ -703,13 +705,12 @@ Limit
-> Limit
-> Sort
Sort Key: users.lastseen DESC
-> Subquery Scan on users
-> Result
One-Time Filter: false
-> Seq Scan on users_1400029 users
Filter: ((composite_id >= '(1,-9223372036854775808)'::user_composite_type) AND (composite_id <= '(1,9223372036854775807)'::user_composite_type))
-> Limit
-> Sort
Sort Key: events.event_time DESC
-> Seq Scan on events_1400027 events
-> Seq Scan on events_1400025 events
Filter: (composite_id = users.composite_id)
-- Test all tasks output
SET citus.explain_all_tasks TO on;

View File

@ -446,15 +446,16 @@ Aggregate
Node: host=localhost port=57637 dbname=regression
-> Aggregate
-> GroupAggregate
Group Key: (((NULL::user_composite_type)).tenant_id), (((NULL::user_composite_type)).user_id)
Group Key: ((users.composite_id).tenant_id), ((users.composite_id).user_id)
-> Sort
Sort Key: (((NULL::user_composite_type)).tenant_id), (((NULL::user_composite_type)).user_id)
-> Nested Loop
Join Filter: ((NULL::user_composite_type) = events.composite_id)
-> Result
One-Time Filter: false
-> Seq Scan on events_1400027 events
Filter: ((event_type)::text = ANY ('{click,submit,pay}'::text[]))
Sort Key: ((users.composite_id).tenant_id), ((users.composite_id).user_id)
-> Hash Join
Hash Cond: (users.composite_id = events.composite_id)
-> Seq Scan on users_1400029 users
Filter: ((composite_id >= '(1,-9223372036854775808)'::user_composite_type) AND (composite_id <= '(1,9223372036854775807)'::user_composite_type))
-> Hash
-> Seq Scan on events_1400025 events
Filter: ((event_type)::text = ANY ('{click,submit,pay}'::text[]))
-- Union and left join subquery pushdown
EXPLAIN (COSTS OFF)
SELECT
@ -531,33 +532,34 @@ HashAggregate
Sort Key: subquery_top.hasdone
-> Subquery Scan on subquery_top
-> GroupAggregate
Group Key: (((NULL::user_composite_type)).tenant_id), (((NULL::user_composite_type)).user_id), subquery_2.hasdone
Group Key: ((users.composite_id).tenant_id), ((users.composite_id).user_id), subquery_2.hasdone
-> Sort
Sort Key: (((NULL::user_composite_type)).tenant_id), (((NULL::user_composite_type)).user_id), subquery_2.hasdone
Sort Key: ((users.composite_id).tenant_id), ((users.composite_id).user_id), subquery_2.hasdone
-> Hash Left Join
Hash Cond: ((NULL::user_composite_type) = subquery_2.composite_id)
-> Unique
-> Sort
Sort Key: (((NULL::user_composite_type)).tenant_id), (((NULL::user_composite_type)).user_id), (NULL::user_composite_type), ('action=>1'::text), events.event_time
-> Append
-> Nested Loop
Join Filter: ((NULL::user_composite_type) = events.composite_id)
-> Result
One-Time Filter: false
-> Seq Scan on events_1400027 events
Hash Cond: (users.composite_id = subquery_2.composite_id)
-> HashAggregate
Group Key: ((users.composite_id).tenant_id), ((users.composite_id).user_id), users.composite_id, ('action=>1'::text), events.event_time
-> Append
-> Hash Join
Hash Cond: (users.composite_id = events.composite_id)
-> Seq Scan on users_1400029 users
Filter: ((composite_id >= '(1,-9223372036854775808)'::user_composite_type) AND (composite_id <= '(1,9223372036854775807)'::user_composite_type))
-> Hash
-> Seq Scan on events_1400025 events
Filter: ((event_type)::text = 'click'::text)
-> Nested Loop
Join Filter: ((NULL::user_composite_type) = events_1.composite_id)
-> Result
One-Time Filter: false
-> Seq Scan on events_1400027 events_1
-> Hash Join
Hash Cond: (users_1.composite_id = events_1.composite_id)
-> Seq Scan on users_1400029 users_1
Filter: ((composite_id >= '(1,-9223372036854775808)'::user_composite_type) AND (composite_id <= '(1,9223372036854775807)'::user_composite_type))
-> Hash
-> Seq Scan on events_1400025 events_1
Filter: ((event_type)::text = 'submit'::text)
-> Hash
-> Subquery Scan on subquery_2
-> Unique
-> Sort
Sort Key: ((events_2.composite_id).tenant_id), ((events_2.composite_id).user_id)
-> Seq Scan on events_1400027 events_2
-> Seq Scan on events_1400025 events_2
Filter: ((composite_id >= '(1,-9223372036854775808)'::user_composite_type) AND (composite_id <= '(1,9223372036854775807)'::user_composite_type) AND ((event_type)::text = 'pay'::text))
-- Union, left join and having subquery pushdown
EXPLAIN (COSTS OFF)
@ -703,13 +705,12 @@ Limit
-> Limit
-> Sort
Sort Key: users.lastseen DESC
-> Subquery Scan on users
-> Result
One-Time Filter: false
-> Seq Scan on users_1400029 users
Filter: ((composite_id >= '(1,-9223372036854775808)'::user_composite_type) AND (composite_id <= '(1,9223372036854775807)'::user_composite_type))
-> Limit
-> Sort
Sort Key: events.event_time DESC
-> Seq Scan on events_1400027 events
-> Seq Scan on events_1400025 events
Filter: (composite_id = users.composite_id)
-- Test all tasks output
SET citus.explain_all_tasks TO on;

View File

@ -1122,8 +1122,7 @@ WHERE
colocated_table_test.value_1 = reference_table_test.value_1;
DEBUG: only reference tables may be queried when targeting a reference table with distributed INSERT ... SELECT
DEBUG: Collecting INSERT ... SELECT results on coordinator
-- now, insert into the hash partitioned table and use reference
-- tables in the SELECT queries
-- not pushable due to lack of equality between partition column and column of reference table
INSERT INTO
colocated_table_test (value_1, value_2)
SELECT
@ -1133,12 +1132,8 @@ FROM
WHERE
colocated_table_test_2.value_4 = reference_table_test.value_4
RETURNING value_1, value_2;
value_1 | value_2
---------+---------
1 | 1
2 | 2
(2 rows)
ERROR: cannot perform distributed planning for the given modification
DETAIL: Select query cannot be pushed down to the worker.
-- some more complex queries (Note that there are more complex queries in multi_insert_select.sql)
INSERT INTO
colocated_table_test (value_1, value_2)
@ -1149,12 +1144,8 @@ FROM
WHERE
colocated_table_test_2.value_2 = reference_table_test.value_2
RETURNING value_1, value_2;
value_1 | value_2
---------+---------
1 | 1
2 | 2
(2 rows)
ERROR: cannot perform distributed planning for the given modification
DETAIL: Select query cannot be pushed down to the worker.
-- partition column value comes from reference table, goes via coordinator
INSERT INTO
colocated_table_test (value_1, value_2)
@ -1615,7 +1606,7 @@ INSERT INTO reference_table_test VALUES (2, 2.0, '2', '2016-12-02');
SELECT master_modify_multiple_shards('DELETE FROM colocated_table_test');
master_modify_multiple_shards
-------------------------------
10
6
(1 row)
ROLLBACK;

View File

@ -0,0 +1,665 @@
--
-- multi subquery complex queries aims to expand existing subquery pushdown
-- regression tests to cover more cases
-- the tables that are used depend on multi_insert_select_behavioral_analytics_create_table.sql
--
-- We don't need shard id sequence here, so commented out to prevent conflicts with concurrent tests
-- ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 1400000;
ALTER SEQUENCE pg_catalog.pg_dist_jobid_seq RESTART 1400000;
SET citus.enable_router_execution TO FALSE;
CREATE TABLE user_buy_test_table(user_id int, item_id int, buy_count int);
SELECT create_distributed_table('user_buy_test_table', 'user_id');
create_distributed_table
--------------------------
(1 row)
INSERT INTO user_buy_test_table VALUES(1,2,1);
INSERT INTO user_buy_test_table VALUES(2,3,4);
INSERT INTO user_buy_test_table VALUES(3,4,2);
INSERT INTO user_buy_test_table VALUES(7,5,2);
CREATE TABLE users_return_test_table(user_id int, item_id int, buy_count int);
SELECT create_distributed_table('users_return_test_table', 'user_id');
create_distributed_table
--------------------------
(1 row)
INSERT INTO users_return_test_table VALUES(4,1,1);
INSERT INTO users_return_test_table VALUES(1,3,1);
INSERT INTO users_return_test_table VALUES(3,2,2);
CREATE TABLE users_ref_test_table(id int, it_name varchar(25), k_no int);
SELECT create_reference_table('users_ref_test_table');
create_reference_table
------------------------
(1 row)
INSERT INTO users_ref_test_table VALUES(1,'User_1',45);
INSERT INTO users_ref_test_table VALUES(2,'User_2',46);
INSERT INTO users_ref_test_table VALUES(3,'User_3',47);
INSERT INTO users_ref_test_table VALUES(4,'User_4',48);
INSERT INTO users_ref_test_table VALUES(5,'User_5',49);
INSERT INTO users_ref_test_table VALUES(6,'User_6',50);
-- Simple Join test with reference table
SELECT count(*) FROM
(SELECT random() FROM user_buy_test_table JOIN users_ref_test_table
ON user_buy_test_table.user_id = users_ref_test_table.id) subquery_1;
count
-------
3
(1 row)
-- Should work, reference table at the inner side is allowed
SELECT count(*) FROM
(SELECT random(), k_no FROM user_buy_test_table LEFT JOIN users_ref_test_table
ON user_buy_test_table.user_id = users_ref_test_table.id) subquery_1 WHERE k_no = 47;
count
-------
1
(1 row)
-- Should not work, no equality between partition column and reference table
SELECT * FROM
(SELECT random() FROM user_buy_test_table LEFT JOIN users_ref_test_table
ON user_buy_test_table.item_id = users_ref_test_table.id) subquery_1;
ERROR: cannot pushdown the subquery since all relations are not joined using distribution keys
DETAIL: Each relation should be joined with at least one another relation using distribution keys and equality operator.
-- Should not work, no equality between partition column and reference table
SELECT * FROM
(SELECT random() FROM user_buy_test_table LEFT JOIN users_ref_test_table
ON user_buy_test_table.user_id > users_ref_test_table.id) subquery_1;
ERROR: cannot pushdown the subquery since all relations are not joined using distribution keys
DETAIL: Each relation should be joined with at least one another relation using distribution keys and equality operator.
-- Shouldn't work, reference table at the outer side is not allowed
SELECT * FROM
(SELECT random() FROM users_ref_test_table LEFT JOIN user_buy_test_table
ON users_ref_test_table.id = user_buy_test_table.user_id) subquery_1;
ERROR: cannot pushdown the subquery
DETAIL: There exist a reference table in the outer part of the outer join
-- Should work, reference table at the inner side is allowed
SELECT count(*) FROM
(SELECT random() FROM users_ref_test_table RIGHT JOIN user_buy_test_table
ON user_buy_test_table.user_id = users_ref_test_table.id) subquery_1;
count
-------
4
(1 row)
-- Shouldn't work, reference table at the outer side is not allowed
SELECT * FROM
(SELECT random() FROM user_buy_test_table RIGHT JOIN users_ref_test_table
ON user_buy_test_table.user_id = users_ref_test_table.id) subquery_1;
ERROR: cannot pushdown the subquery
DETAIL: There exist a reference table in the outer part of the outer join
-- Should pass since reference table is located in the inner part of each left join
SELECT count(*) FROM
(SELECT tt1.user_id, random() FROM user_buy_test_table AS tt1 JOIN users_return_test_table as tt2
ON tt1.user_id = tt2.user_id) subquery_1
LEFT JOIN
(SELECT tt1.user_id, random() FROM user_buy_test_table as tt1 LEFT JOIN users_ref_test_table as ref
ON tt1.user_id = ref.id) subquery_2 ON subquery_1.user_id = subquery_2.user_id;
count
-------
2
(1 row)
-- Should not pass since reference table is located in the outer part of right join
SELECT * FROM
(SELECT tt1.user_id, random() FROM user_buy_test_table AS tt1 JOIN users_return_test_table as tt2
ON tt1.user_id = tt2.user_id) subquery_1
RIGHT JOIN
(SELECT tt1.user_id, random() FROM user_buy_test_table as tt1 JOIN users_ref_test_table as ref
ON tt1.user_id = ref.id) subquery_2 ON subquery_1.user_id = subquery_2.user_id;
ERROR: cannot pushdown the subquery
DETAIL: There exist a reference table in the outer part of the outer join
-- LATERAL JOINs used with INNER JOINs with reference tables
SET citus.subquery_pushdown to ON;
SELECT user_id, lastseen
FROM
(SELECT
"some_users_data".user_id, lastseen
FROM
(SELECT
filter_users_1.user_id, time AS lastseen
FROM
(SELECT
user_where_1_1.user_id
FROM
(SELECT
"users"."user_id"
FROM
users_reference_table as "users"
WHERE
user_id > 12 and user_id < 16 and value_1 > 20) user_where_1_1
INNER JOIN
(SELECT
"users"."user_id"
FROM
users_reference_table as "users"
WHERE
user_id > 12 and user_id < 16 and value_2 > 60) user_where_1_join_1
ON ("user_where_1_1".user_id = "user_where_1_join_1".user_id))
filter_users_1
JOIN LATERAL
(SELECT
user_id, time
FROM
events_reference_table as "events"
WHERE
user_id > 12 and user_id < 16 AND
user_id = filter_users_1.user_id
ORDER BY
time DESC
LIMIT 1) "last_events_1"
ON TRUE
ORDER BY
time DESC
LIMIT 10) "some_recent_users"
JOIN LATERAL
(SELECT
"users".user_id
FROM
users_reference_table as "users"
WHERE
"users"."user_id" = "some_recent_users"."user_id" AND
"users"."value_2" > 70
LIMIT 1) "some_users_data"
ON TRUE
ORDER BY
lastseen DESC
LIMIT 10) "some_users"
ORDER BY
user_id DESC
LIMIT 10;
user_id | lastseen
---------+---------------------------------
14 | Tue Jan 21 05:46:51.286381 2014
14 | Tue Jan 21 05:46:51.286381 2014
14 | Tue Jan 21 05:46:51.286381 2014
14 | Tue Jan 21 05:46:51.286381 2014
14 | Tue Jan 21 05:46:51.286381 2014
14 | Tue Jan 21 05:46:51.286381 2014
14 | Tue Jan 21 05:46:51.286381 2014
14 | Tue Jan 21 05:46:51.286381 2014
14 | Tue Jan 21 05:46:51.286381 2014
14 | Tue Jan 21 05:46:51.286381 2014
(10 rows)
SET citus.subquery_pushdown to OFF;
-- NESTED INNER JOINs with reference tables
SELECT
count(*) AS value, "generated_group_field"
FROM
(SELECT
DISTINCT "pushedDownQuery"."real_user_id", "generated_group_field"
FROM
(SELECT
"eventQuery"."real_user_id", "eventQuery"."time", random(), ("eventQuery"."value_2") AS "generated_group_field"
FROM
(SELECT
*
FROM
(SELECT
"events"."time", "events"."user_id", "events"."value_2"
FROM
events_reference_table as "events"
WHERE
user_id > 10 and user_id < 40 AND event_type IN (40, 41, 42, 43, 44, 45) ) "temp_data_queries"
INNER JOIN
(SELECT
user_where_1_1.real_user_id
FROM
(SELECT
"users"."user_id" as real_user_id
FROM
users_reference_table as "users"
WHERE
user_id > 10 and user_id < 40 and value_2 > 50 ) user_where_1_1
INNER JOIN
(SELECT
"users"."user_id"
FROM
users_reference_table as "users"
WHERE
user_id > 10 and user_id < 40 and value_3 > 50 ) user_where_1_join_1
ON ("user_where_1_1".real_user_id = "user_where_1_join_1".user_id)) "user_filters_1"
ON ("temp_data_queries".user_id = "user_filters_1".real_user_id)) "eventQuery") "pushedDownQuery") "pushedDownQuery"
GROUP BY
"generated_group_field"
ORDER BY
generated_group_field DESC, value DESC;
value | generated_group_field
-------+-----------------------
1 | 966
1 | 917
1 | 905
1 | 868
1 | 836
1 | 791
1 | 671
1 | 642
1 | 358
1 | 317
1 | 307
1 | 302
1 | 214
1 | 166
1 | 116
1 | 1
(16 rows)
-- single level inner joins with reference tables
SELECT
"value_3", count(*) AS cnt
FROM
(SELECT
"value_3", "user_id", random()
FROM
(SELECT
users_in_segment_1.user_id, value_3
FROM
(SELECT
user_id, value_3 * 2 as value_3
FROM
(SELECT
user_id, value_3
FROM
(SELECT
"users"."user_id", value_3
FROM
users_reference_table as "users"
WHERE
user_id > 10 and user_id < 40 and value_2 > 30
) simple_user_where_1
) all_buckets_1
) users_in_segment_1
JOIN
(SELECT
"users"."user_id"
FROM
users_reference_table as "users"
WHERE
user_id > 10 and user_id < 40 and value_2 > 60
) some_users_data
ON ("users_in_segment_1".user_id = "some_users_data".user_id)
) segmentalias_1) "tempQuery"
GROUP BY "value_3"
ORDER BY cnt, value_3 DESC LIMIT 10;
value_3 | cnt
---------+-----
556 | 75
228 | 75
146 | 75
70 | 75
1442 | 79
1232 | 79
1090 | 79
1012 | 79
886 | 79
674 | 79
(10 rows)
-- nested LATERAL JOINs with reference tables
SET citus.subquery_pushdown to ON;
SELECT *
FROM
(SELECT "some_users_data".user_id, "some_recent_users".value_3
FROM
(SELECT
filter_users_1.user_id, value_3
FROM
(SELECT
"users"."user_id"
FROM
users_reference_table as "users"
WHERE
user_id > 20 and user_id < 70 and users.value_2 = 200) filter_users_1
JOIN LATERAL
(SELECT
user_id, value_3
FROM
events_reference_table as "events"
WHERE
user_id > 20 and user_id < 70 AND
("events".user_id = "filter_users_1".user_id)
ORDER BY
value_3 DESC
LIMIT 1) "last_events_1" ON true
ORDER BY value_3 DESC
LIMIT 10) "some_recent_users"
JOIN LATERAL
(SELECT
"users".user_id
FROM
users_reference_table as "users"
WHERE
"users"."user_id" = "some_recent_users"."user_id" AND
users.value_2 > 200
LIMIT 1) "some_users_data" ON true
ORDER BY
value_3 DESC
LIMIT 10) "some_users"
ORDER BY
value_3 DESC
LIMIT 10;
user_id | value_3
---------+---------
44 | 998
65 | 996
66 | 996
37 | 995
57 | 989
21 | 985
(6 rows)
SET citus.subquery_pushdown to OFF;
-- LEFT JOINs used with INNER JOINs should error out since reference table exists on the
-- left side of the LEFT JOIN.
SELECT
count(*) AS cnt, "generated_group_field"
FROM
(SELECT
"eventQuery"."user_id", random(), generated_group_field
FROM
(SELECT
"multi_group_wrapper_1".*, generated_group_field, random()
FROM
(SELECT *
FROM
(SELECT
"events"."time", "events"."user_id" as event_user_id
FROM
events_table as "events"
WHERE
user_id > 80) "temp_data_queries"
INNER JOIN
(SELECT
"users"."user_id"
FROM
users_reference_table as "users"
WHERE
user_id > 80 and value_2 = 5) "user_filters_1"
ON ("temp_data_queries".event_user_id = "user_filters_1".user_id)) AS "multi_group_wrapper_1"
LEFT JOIN
(SELECT
"users"."user_id" AS "user_id", value_2 AS "generated_group_field"
FROM
users_table as "users") "left_group_by_1"
ON ("left_group_by_1".user_id = "multi_group_wrapper_1".event_user_id)) "eventQuery") "pushedDownQuery"
group BY
"generated_group_field"
ORDER BY
cnt DESC, generated_group_field ASC
LIMIT 10;
ERROR: cannot pushdown the subquery
DETAIL: There exist a reference table in the outer part of the outer join
-- RIGHT JOINs used with INNER JOINs should error out since reference table exists on the
-- right side of the RIGHT JOIN.
SELECT
count(*) AS cnt, "generated_group_field"
FROM
(SELECT
"eventQuery"."user_id", random(), generated_group_field
FROM
(SELECT
"multi_group_wrapper_1".*, generated_group_field, random()
FROM
(SELECT *
FROM
(SELECT
"events"."time", "events"."user_id" as event_user_id
FROM
events_table as "events"
WHERE
user_id > 80) "temp_data_queries"
INNER JOIN
(SELECT
"users"."user_id"
FROM
users_table as "users"
WHERE
user_id > 80 and value_2 = 5) "user_filters_1"
ON ("temp_data_queries".event_user_id = "user_filters_1".user_id)) AS "multi_group_wrapper_1"
RIGHT JOIN
(SELECT
"users"."user_id" AS "user_id", value_2 AS "generated_group_field"
FROM
users_reference_table as "users") "right_group_by_1"
ON ("right_group_by_1".user_id = "multi_group_wrapper_1".event_user_id)) "eventQuery") "pushedDownQuery"
group BY
"generated_group_field"
ORDER BY
cnt DESC, generated_group_field ASC
LIMIT 10;
ERROR: cannot pushdown the subquery
DETAIL: There exist a reference table in the outer part of the outer join
-- Outer subquery with reference table
SELECT "some_users_data".user_id, lastseen
FROM
(SELECT user_id, max(time) AS lastseen
FROM
(SELECT user_id, time
FROM
(SELECT
user_id, time
FROM
events_reference_table as "events"
WHERE
user_id > 10 and user_id < 40) "events_1"
ORDER BY
time DESC) "recent_events_1"
GROUP BY
user_id
ORDER BY
max(TIME) DESC) "some_recent_users"
FULL JOIN
(SELECT
"users".user_id
FROM
users_table as "users"
WHERE
users.value_2 > 50 and users.value_2 < 55) "some_users_data"
ON "some_users_data"."user_id" = "some_recent_users"."user_id"
ORDER BY
user_id
limit 50;
ERROR: cannot pushdown the subquery
DETAIL: There exist a reference table in the outer part of the outer join
--
-- UNIONs and JOINs with reference tables, should error out
--
SELECT ("final_query"."event_types") as types, count(*) AS sumOfEventType
FROM
( SELECT *, random()
FROM
( SELECT "t"."user_id", "t"."time", unnest("t"."collected_events") AS "event_types"
FROM
( SELECT "t1"."user_id", min("t1"."time") AS "time", array_agg(("t1"."event") ORDER BY TIME ASC, event DESC) AS collected_events
FROM (
(SELECT
*
FROM
(SELECT
"events"."user_id", "events"."time", 0 AS event
FROM
events_reference_table as "events"
WHERE
event_type IN (10, 11, 12, 13, 14, 15) ) events_subquery_1)
UNION
(SELECT
*
FROM
(SELECT
"events"."user_id", "events"."time", 1 AS event
FROM
events_table as "events"
WHERE
event_type IN (15, 16, 17, 18, 19) ) events_subquery_2)
UNION
(SELECT
*
FROM
(SELECT
"events"."user_id", "events"."time", 2 AS event
FROM
events_table as "events"
WHERE
event_type IN (20, 21, 22, 23, 24, 25) ) events_subquery_3)
UNION
(SELECT
*
FROM
(SELECT
"events"."user_id", "events"."time", 3 AS event
FROM
events_table as "events"
WHERE
event_type IN (26, 27, 28, 29, 30, 13)) events_subquery_4)) t1
GROUP BY "t1"."user_id") AS t) "q"
INNER JOIN
(SELECT
"users"."user_id"
FROM
users_table as "users"
WHERE
value_1 > 50 and value_1 < 70) AS t
ON (t.user_id = q.user_id)) as final_query
GROUP BY
types
ORDER BY
types;
ERROR: cannot push down this subquery
DETAIL: Reference tables are not supported with union operator
-- reference table exist in the subquery of union, should error out
SELECT ("final_query"."event_types") as types, count(*) AS sumOfEventType
FROM
( SELECT
*, random()
FROM
(SELECT
"t"."user_id", "t"."time", unnest("t"."collected_events") AS "event_types"
FROM
( SELECT
"t1"."user_id", min("t1"."time") AS "time", array_agg(("t1"."event") ORDER BY TIME ASC, event DESC) AS collected_events
FROM (
(SELECT
*
FROM
(SELECT
"events"."time", 0 AS event, "events"."user_id"
FROM
events_table as "events"
WHERE
event_type IN (10, 11, 12, 13, 14, 15) ) events_subquery_1)
UNION
(SELECT *
FROM
(
SELECT * FROM
(
SELECT
max("events"."time"),
0 AS event,
"events"."user_id"
FROM
events_reference_table as "events", users_table as "users"
WHERE
events.user_id = users.user_id AND
event_type IN (10, 11, 12, 13, 14, 15)
GROUP BY "events"."user_id"
) as events_subquery_5
) events_subquery_2)
UNION
(SELECT *
FROM
(SELECT
"events"."time", 2 AS event, "events"."user_id"
FROM
events_table as "events"
WHERE
event_type IN (20, 21, 22, 23, 24, 25) ) events_subquery_3)
UNION
(SELECT *
FROM
(SELECT
"events"."time", 3 AS event, "events"."user_id"
FROM
events_table as "events"
WHERE
event_type IN (26, 27, 28, 29, 30, 13)) events_subquery_4)
) t1
GROUP BY "t1"."user_id") AS t) "q"
INNER JOIN
(SELECT
"users"."user_id"
FROM
users_table as "users"
WHERE
value_1 > 50 and value_1 < 70) AS t
ON (t.user_id = q.user_id)) as final_query
GROUP BY
types
ORDER BY
types;
ERROR: cannot push down this subquery
DETAIL: Reference tables are not supported with union operator
--
-- Should error out with UNION ALL Queries on reference tables
--
SELECT ("final_query"."event_types") as types, count(*) AS sumOfEventType
FROM
( SELECT *, random()
FROM
( SELECT "t"."user_id", "t"."time", unnest("t"."collected_events") AS "event_types"
FROM
( SELECT "t1"."user_id", min("t1"."time") AS "time", array_agg(("t1"."event") ORDER BY TIME ASC, event DESC) AS collected_events
FROM (
(SELECT *
FROM
(SELECT
"events"."user_id", "events"."time", 0 AS event
FROM
events_table as "events"
WHERE
event_type IN (10, 11, 12, 13, 14, 15) ) events_subquery_1)
UNION ALL
(SELECT *
FROM
(SELECT
"events"."user_id", "events"."time", 1 AS event
FROM
events_table as "events"
WHERE
event_type IN (15, 16, 17, 18, 19) ) events_subquery_2)
UNION ALL
(SELECT *
FROM
(SELECT
"events"."user_id", "events"."time", 2 AS event
FROM
events_reference_table as "events"
WHERE
event_type IN (20, 21, 22, 23, 24, 25) ) events_subquery_3)
UNION ALL
(SELECT *
FROM
(SELECT
"events"."user_id", "events"."time", 3 AS event
FROM
events_table as "events"
WHERE
event_type IN (26, 27, 28, 29, 30, 13)) events_subquery_4)) t1
GROUP BY "t1"."user_id") AS t) "q"
INNER JOIN
(SELECT "users"."user_id"
FROM users_table as "users"
WHERE value_1 > 50 and value_1 < 70) AS t ON (t.user_id = q.user_id)) as final_query
GROUP BY types
ORDER BY types;
ERROR: cannot push down this subquery
DETAIL: Reference tables are not supported with union operator
DROP TABLE user_buy_test_table;
DROP TABLE users_ref_test_table;
DROP TABLE users_return_test_table;

View File

@ -0,0 +1,226 @@
--
-- queries to test the subquery pushdown on reference tables
-- subqueries in WHERE with greater operator
SELECT
user_id
FROM
users_table
WHERE
value_2 >
(SELECT
max(value_2)
FROM
events_reference_table
WHERE
users_table.user_id = events_reference_table.user_id AND event_type = 50
GROUP BY
user_id
)
GROUP BY user_id
HAVING count(*) > 66
ORDER BY user_id
LIMIT 5;
user_id
---------
49
55
56
63
(4 rows)
-- subqueries in WHERE with IN operator
SELECT
user_id
FROM
users_table
WHERE
value_2 IN
(SELECT
value_2
FROM
events_reference_table
WHERE
users_table.user_id = events_reference_table.user_id
)
GROUP BY user_id
ORDER BY user_id
LIMIT 3;
user_id
---------
0
1
2
(3 rows)
-- subqueries in WHERE with NOT EXISTS operator, should work since
-- reference table is in the inner part of the join
SELECT
user_id
FROM
users_table
WHERE
NOT EXISTS
(SELECT
value_2
FROM
events_reference_table
WHERE
users_table.user_id = events_reference_table.user_id
)
GROUP BY user_id
ORDER BY user_id
LIMIT 3;
user_id
---------
(0 rows)
-- subqueries in WHERE with NOT EXISTS operator, should not work
-- there is a reference table in the outer part of the join
SELECT
user_id
FROM
users_reference_table
WHERE
NOT EXISTS
(SELECT
value_2
FROM
events_table
WHERE
users_reference_table.user_id = events_table.user_id
)
LIMIT 3;
ERROR: cannot pushdown the subquery
DETAIL: There exist a reference table in the outer part of the outer join
-- subqueries in WHERE with IN operator without equality
SELECT
user_id
FROM
users_table
WHERE
value_2 IN
(SELECT
value_2
FROM
events_reference_table
WHERE
users_table.user_id > events_reference_table.user_id
)
GROUP BY user_id
ORDER BY user_id
LIMIT 3;
ERROR: cannot pushdown the subquery since all relations are not joined using distribution keys
DETAIL: Each relation should be joined with at least one another relation using distribution keys and equality operator.
-- have reference table without any equality, should error out
SELECT
user_id
FROM
users_table
WHERE
value_2 >
(SELECT
max(value_2)
FROM
events_reference_table
WHERE
event_type = 50
GROUP BY
user_id
)
GROUP BY user_id
HAVING count(*) > 66
ORDER BY user_id
LIMIT 5;
ERROR: cannot pushdown the subquery since all relations are not joined using distribution keys
DETAIL: Each relation should be joined with at least one another relation using distribution keys and equality operator.
-- users that appeared more than 118 times, should run since the reference table
-- is on the right side of the semi join
SELECT
user_id
FROM
users_table
WHERE 118 <=
(SELECT
count(*)
FROM
events_reference_table
WHERE
users_table.user_id = events_reference_table.user_id
GROUP BY
user_id)
GROUP BY
user_id
ORDER BY
user_id;
user_id
---------
13
17
23
25
(4 rows)
-- should error out since reference table exists on the left side
-- of the left lateral join
SELECT user_id, value_2 FROM users_table WHERE
value_1 > 101 AND value_1 < 110
AND value_2 >= 5
AND user_id IN
(
SELECT
e1.user_id
FROM (
-- Get the first time each user viewed the homepage.
SELECT
user_id,
1 AS view_homepage,
min(time) AS view_homepage_time
FROM events_reference_table
WHERE
event_type IN (10, 20, 30, 40, 50, 60, 70, 80, 90)
GROUP BY user_id
) e1 LEFT JOIN LATERAL (
SELECT
user_id,
1 AS use_demo,
time AS use_demo_time
FROM events_reference_table
WHERE
user_id = e1.user_id AND
event_type IN (11, 21, 31, 41, 51, 61, 71, 81, 91)
ORDER BY time
) e2 ON true LEFT JOIN LATERAL (
SELECT
user_id,
1 AS enter_credit_card,
time AS enter_credit_card_time
FROM events_reference_table
WHERE
user_id = e2.user_id AND
event_type IN (12, 22, 32, 42, 52, 62, 72, 82, 92)
ORDER BY time
) e3 ON true LEFT JOIN LATERAL (
SELECT
1 AS submit_card_info,
user_id,
time AS enter_credit_card_time
FROM events_reference_table
WHERE
user_id = e3.user_id AND
event_type IN (13, 23, 33, 43, 53, 63, 73, 83, 93)
ORDER BY time
) e4 ON true LEFT JOIN LATERAL (
SELECT
1 AS see_bought_screen
FROM events_reference_table
WHERE
user_id = e4.user_id AND
event_type IN (14, 24, 34, 44, 54, 64, 74, 84, 94)
ORDER BY time
) e5 ON true
group by e1.user_id
HAVING sum(submit_card_info) > 0
)
ORDER BY 1, 2;
ERROR: cannot pushdown the subquery
DETAIL: There exist a reference table in the outer part of the outer join

View File

@ -23,6 +23,17 @@ LIMIT 5;
20 | 9
(5 rows)
-- a very simple union query with reference table
SELECT user_id, counter
FROM (
SELECT user_id, value_2 % 10 AS counter FROM events_table WHERE event_type IN (1, 2, 3, 4, 5)
UNION
SELECT user_id, value_2 % 10 AS counter FROM events_reference_table WHERE event_type IN (5, 6, 7, 8, 9, 10)
) user_id
ORDER BY 2 DESC,1
LIMIT 5;
ERROR: cannot pushdown this query
DETAIL: Reference tables are not allowed with set operations
-- the same query with union all
SELECT user_id, counter
FROM (
@ -41,6 +52,17 @@ LIMIT 5;
15 | 9
(5 rows)
-- the same query with union all and reference table
SELECT user_id, counter
FROM (
SELECT user_id, value_2 % 10 AS counter FROM events_table WHERE event_type IN (1, 2, 3, 4, 5)
UNION ALL
SELECT user_id, value_2 % 10 AS counter FROM events_reference_table WHERE event_type IN (5, 6, 7, 8, 9, 10)
) user_id
ORDER BY 2 DESC,1
LIMIT 5;
ERROR: cannot pushdown this query
DETAIL: Reference tables are not allowed with set operations
-- the same query with group by
SELECT user_id, sum(counter)
FROM (
@ -162,6 +184,22 @@ GROUP BY user_id ORDER BY 1 DESC LIMIT 5;
23508
(5 rows)
-- similar query this time more subqueries with reference table and target list contains a resjunk entry
SELECT sum(counter)
FROM (
SELECT user_id, sum(value_2) AS counter FROM users_table where value_1 < 20 GROUP BY user_id HAVING sum(value_2) > 500
UNION
SELECT user_id, sum(value_2) AS counter FROM users_table where value_1 < 40 and value_1 < 60 GROUP BY user_id HAVING sum(value_2) > 500
UNION
SELECT user_id, sum(value_2) AS counter FROM users_reference_table where value_1 < 60 and value_1 < 80 GROUP BY user_id HAVING sum(value_2) > 500
UNION
SELECT user_id, sum(value_2) AS counter FROM users_table where value_1 < 80 and value_1 < 100 GROUP BY user_id HAVING sum(value_2) > 500
UNION
SELECT user_id, sum(value_2) AS counter FROM users_table where value_1 < 100 and value_1 < 120 GROUP BY user_id HAVING sum(value_2) > 500
) user_id
GROUP BY user_id ORDER BY 1 DESC LIMIT 5;
ERROR: cannot pushdown this query
DETAIL: Reference tables are not allowed with set operations
-- similar query as above, with UNION ALL
SELECT sum(counter)
FROM (
@ -236,6 +274,50 @@ LIMIT 5;
90 | 115843
(5 rows)
-- unions within unions with reference table
SELECT *
FROM (
( SELECT user_id,
sum(counter)
FROM
(SELECT
user_id, sum(value_2) AS counter
FROM
users_table
GROUP BY
user_id
UNION
SELECT
user_id, sum(value_2) AS counter
FROM
events_reference_table
GROUP BY
user_id) user_id_1
GROUP BY
user_id)
UNION
(SELECT
user_id, sum(counter)
FROM
(SELECT
user_id, sum(value_2) AS counter
FROM
users_table
GROUP BY
user_id
UNION
SELECT
user_id, sum(value_2) AS counter
FROM
events_table
GROUP BY
user_id) user_id_2
GROUP BY
user_id)) AS ftop
ORDER BY 2 DESC, 1 DESC
LIMIT 5;
ERROR: cannot pushdown this query
DETAIL: Reference tables are not allowed with set operations
-- top level unions are wrapped into top level aggregations
SELECT ("final_query"."event_types") as types, count(*) AS sumOfEventType
FROM
@ -493,6 +575,17 @@ FROM
20002
(1 row)
-- some UNION ALL queries that are going to be pulled up with reference table
SELECT
count(*)
FROM
(
(SELECT user_id FROM users_table)
UNION ALL
(SELECT user_id FROM events_reference_table)
) b;
ERROR: cannot pushdown this query
DETAIL: Reference tables are not allowed with set operations
-- similar query without top level agg
SELECT
user_id
@ -899,3 +992,5 @@ ORDER BY types;
ERROR: cannot push down this subquery
DETAIL: Subqueries without relations are unsupported
SET citus.enable_router_execution TO true;
DROP TABLE events_reference_table;
DROP TABLE users_reference_table;

View File

@ -353,3 +353,11 @@ SET citus.shard_max_size TO "1MB";
\copy orders_subquery FROM '@abs_srcdir@/data/orders.1.data' with delimiter '|'
\copy orders_subquery FROM '@abs_srcdir@/data/orders.2.data' with delimiter '|'
CREATE TABLE events_reference_table (like events_table including all);
SELECT create_reference_table('events_reference_table');
INSERT INTO events_reference_table SELECT * FROM events_table;
CREATE TABLE users_reference_table (like users_table including all);
SELECT create_reference_table('users_reference_table');
INSERT INTO users_reference_table SELECT * FROM users_table;

View File

@ -51,6 +51,8 @@ test: multi_deparse_shard_query multi_distributed_transaction_id
test: multi_basic_queries multi_complex_expressions
test: multi_explain
test: multi_subquery multi_subquery_complex_queries multi_subquery_behavioral_analytics
test: multi_subquery_complex_reference_clause
test: multi_subquery_in_where_reference_clause
test: multi_subquery_union multi_subquery_in_where_clause multi_subquery_misc
test: multi_reference_table
test: multi_outer_join_reference

View File

@ -421,3 +421,19 @@ SET citus.shard_max_size TO "1MB";
\copy lineitem_subquery FROM '@abs_srcdir@/data/lineitem.2.data' with delimiter '|'
\copy orders_subquery FROM '@abs_srcdir@/data/orders.1.data' with delimiter '|'
\copy orders_subquery FROM '@abs_srcdir@/data/orders.2.data' with delimiter '|'
CREATE TABLE events_reference_table (like events_table including all);
SELECT create_reference_table('events_reference_table');
create_reference_table
------------------------
(1 row)
INSERT INTO events_reference_table SELECT * FROM events_table;
CREATE TABLE users_reference_table (like users_table including all);
SELECT create_reference_table('users_reference_table');
create_reference_table
------------------------
(1 row)
INSERT INTO users_reference_table SELECT * FROM users_table;

View File

@ -707,8 +707,7 @@ FROM
WHERE
colocated_table_test.value_1 = reference_table_test.value_1;
-- now, insert into the hash partitioned table and use reference
-- tables in the SELECT queries
-- not pushable due to lack of equality between partition column and column of reference table
INSERT INTO
colocated_table_test (value_1, value_2)
SELECT

View File

@ -0,0 +1,574 @@
--
-- multi subquery complex queries aim to expand the existing subquery pushdown
-- regression tests to cover more cases
-- the tables that are used depend on multi_insert_select_behavioral_analytics_create_table.sql
--
-- We don't need shard id sequence here, so commented out to prevent conflicts with concurrent tests
-- ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 1400000;
ALTER SEQUENCE pg_catalog.pg_dist_jobid_seq RESTART 1400000;
SET citus.enable_router_execution TO FALSE;
-- test fixtures: two small tables distributed on user_id ...
CREATE TABLE user_buy_test_table(user_id int, item_id int, buy_count int);
SELECT create_distributed_table('user_buy_test_table', 'user_id');
INSERT INTO user_buy_test_table VALUES(1,2,1);
INSERT INTO user_buy_test_table VALUES(2,3,4);
INSERT INTO user_buy_test_table VALUES(3,4,2);
INSERT INTO user_buy_test_table VALUES(7,5,2);
CREATE TABLE users_return_test_table(user_id int, item_id int, buy_count int);
SELECT create_distributed_table('users_return_test_table', 'user_id');
INSERT INTO users_return_test_table VALUES(4,1,1);
INSERT INTO users_return_test_table VALUES(1,3,1);
INSERT INTO users_return_test_table VALUES(3,2,2);
-- ... and one reference table; its id values (1-6) match only some of the
-- user_id values inserted above (user_id 7 has no matching id)
CREATE TABLE users_ref_test_table(id int, it_name varchar(25), k_no int);
SELECT create_reference_table('users_ref_test_table');
INSERT INTO users_ref_test_table VALUES(1,'User_1',45);
INSERT INTO users_ref_test_table VALUES(2,'User_2',46);
INSERT INTO users_ref_test_table VALUES(3,'User_3',47);
INSERT INTO users_ref_test_table VALUES(4,'User_4',48);
INSERT INTO users_ref_test_table VALUES(5,'User_5',49);
INSERT INTO users_ref_test_table VALUES(6,'User_6',50);
-- Simple Join test with reference table
SELECT count(*) FROM
(SELECT random() FROM user_buy_test_table JOIN users_ref_test_table
ON user_buy_test_table.user_id = users_ref_test_table.id) subquery_1;
-- Should work, reference table at the inner side is allowed
SELECT count(*) FROM
(SELECT random(), k_no FROM user_buy_test_table LEFT JOIN users_ref_test_table
ON user_buy_test_table.user_id = users_ref_test_table.id) subquery_1 WHERE k_no = 47;
-- Should not work, no equality between partition column and reference table
SELECT * FROM
(SELECT random() FROM user_buy_test_table LEFT JOIN users_ref_test_table
ON user_buy_test_table.item_id = users_ref_test_table.id) subquery_1;
-- Should not work, no equality between partition column and reference table
SELECT * FROM
(SELECT random() FROM user_buy_test_table LEFT JOIN users_ref_test_table
ON user_buy_test_table.user_id > users_ref_test_table.id) subquery_1;
-- Shouldn't work, reference table at the outer side is not allowed
SELECT * FROM
(SELECT random() FROM users_ref_test_table LEFT JOIN user_buy_test_table
ON users_ref_test_table.id = user_buy_test_table.user_id) subquery_1;
-- Should work, reference table at the inner side is allowed
SELECT count(*) FROM
(SELECT random() FROM users_ref_test_table RIGHT JOIN user_buy_test_table
ON user_buy_test_table.user_id = users_ref_test_table.id) subquery_1;
-- Shouldn't work, reference table at the outer side is not allowed
SELECT * FROM
(SELECT random() FROM user_buy_test_table RIGHT JOIN users_ref_test_table
ON user_buy_test_table.user_id = users_ref_test_table.id) subquery_1;
-- Should pass since the reference table is located in the inner part of each left join
SELECT count(*) FROM
(SELECT tt1.user_id, random() FROM user_buy_test_table AS tt1 JOIN users_return_test_table as tt2
ON tt1.user_id = tt2.user_id) subquery_1
LEFT JOIN
(SELECT tt1.user_id, random() FROM user_buy_test_table as tt1 LEFT JOIN users_ref_test_table as ref
ON tt1.user_id = ref.id) subquery_2 ON subquery_1.user_id = subquery_2.user_id;
-- Should not pass since the reference table is located in the outer part of the right join
SELECT * FROM
(SELECT tt1.user_id, random() FROM user_buy_test_table AS tt1 JOIN users_return_test_table as tt2
ON tt1.user_id = tt2.user_id) subquery_1
RIGHT JOIN
(SELECT tt1.user_id, random() FROM user_buy_test_table as tt1 JOIN users_ref_test_table as ref
ON tt1.user_id = ref.id) subquery_2 ON subquery_1.user_id = subquery_2.user_id;
-- LATERAL JOINs used with INNER JOINs with reference tables
SET citus.subquery_pushdown to ON;
SELECT user_id, lastseen
FROM
(SELECT
"some_users_data".user_id, lastseen
FROM
(SELECT
filter_users_1.user_id, time AS lastseen
FROM
(SELECT
user_where_1_1.user_id
FROM
(SELECT
"users"."user_id"
FROM
users_reference_table as "users"
WHERE
user_id > 12 and user_id < 16 and value_1 > 20) user_where_1_1
INNER JOIN
(SELECT
"users"."user_id"
FROM
users_reference_table as "users"
WHERE
user_id > 12 and user_id < 16 and value_2 > 60) user_where_1_join_1
ON ("user_where_1_1".user_id = "user_where_1_join_1".user_id))
filter_users_1
JOIN LATERAL
(SELECT
user_id, time
FROM
events_reference_table as "events"
WHERE
user_id > 12 and user_id < 16 AND
user_id = filter_users_1.user_id
ORDER BY
time DESC
LIMIT 1) "last_events_1"
ON TRUE
ORDER BY
time DESC
LIMIT 10) "some_recent_users"
JOIN LATERAL
(SELECT
"users".user_id
FROM
users_reference_table as "users"
WHERE
"users"."user_id" = "some_recent_users"."user_id" AND
"users"."value_2" > 70
LIMIT 1) "some_users_data"
ON TRUE
ORDER BY
lastseen DESC
LIMIT 10) "some_users"
ORDER BY
user_id DESC
LIMIT 10;
SET citus.subquery_pushdown to OFF;
-- NESTED INNER JOINs with reference tables
SELECT
count(*) AS value, "generated_group_field"
FROM
(SELECT
DISTINCT "pushedDownQuery"."real_user_id", "generated_group_field"
FROM
(SELECT
"eventQuery"."real_user_id", "eventQuery"."time", random(), ("eventQuery"."value_2") AS "generated_group_field"
FROM
(SELECT
*
FROM
(SELECT
"events"."time", "events"."user_id", "events"."value_2"
FROM
events_reference_table as "events"
WHERE
user_id > 10 and user_id < 40 AND event_type IN (40, 41, 42, 43, 44, 45) ) "temp_data_queries"
INNER JOIN
(SELECT
user_where_1_1.real_user_id
FROM
(SELECT
"users"."user_id" as real_user_id
FROM
users_reference_table as "users"
WHERE
user_id > 10 and user_id < 40 and value_2 > 50 ) user_where_1_1
INNER JOIN
(SELECT
"users"."user_id"
FROM
users_reference_table as "users"
WHERE
user_id > 10 and user_id < 40 and value_3 > 50 ) user_where_1_join_1
ON ("user_where_1_1".real_user_id = "user_where_1_join_1".user_id)) "user_filters_1"
ON ("temp_data_queries".user_id = "user_filters_1".real_user_id)) "eventQuery") "pushedDownQuery") "pushedDownQuery"
GROUP BY
"generated_group_field"
ORDER BY
generated_group_field DESC, value DESC;
-- single level inner joins with reference tables
SELECT
"value_3", count(*) AS cnt
FROM
(SELECT
"value_3", "user_id", random()
FROM
(SELECT
users_in_segment_1.user_id, value_3
FROM
(SELECT
user_id, value_3 * 2 as value_3
FROM
(SELECT
user_id, value_3
FROM
(SELECT
"users"."user_id", value_3
FROM
users_reference_table as "users"
WHERE
user_id > 10 and user_id < 40 and value_2 > 30
) simple_user_where_1
) all_buckets_1
) users_in_segment_1
JOIN
(SELECT
"users"."user_id"
FROM
users_reference_table as "users"
WHERE
user_id > 10 and user_id < 40 and value_2 > 60
) some_users_data
ON ("users_in_segment_1".user_id = "some_users_data".user_id)
) segmentalias_1) "tempQuery"
GROUP BY "value_3"
ORDER BY cnt, value_3 DESC LIMIT 10;
-- nested LATERAL JOINs with reference tables
SET citus.subquery_pushdown to ON;
SELECT *
FROM
(SELECT "some_users_data".user_id, "some_recent_users".value_3
FROM
(SELECT
filter_users_1.user_id, value_3
FROM
(SELECT
"users"."user_id"
FROM
users_reference_table as "users"
WHERE
user_id > 20 and user_id < 70 and users.value_2 = 200) filter_users_1
JOIN LATERAL
(SELECT
user_id, value_3
FROM
events_reference_table as "events"
WHERE
user_id > 20 and user_id < 70 AND
("events".user_id = "filter_users_1".user_id)
ORDER BY
value_3 DESC
LIMIT 1) "last_events_1" ON true
ORDER BY value_3 DESC
LIMIT 10) "some_recent_users"
JOIN LATERAL
(SELECT
"users".user_id
FROM
users_reference_table as "users"
WHERE
"users"."user_id" = "some_recent_users"."user_id" AND
users.value_2 > 200
LIMIT 1) "some_users_data" ON true
ORDER BY
value_3 DESC
LIMIT 10) "some_users"
ORDER BY
value_3 DESC
LIMIT 10;
SET citus.subquery_pushdown to OFF;
-- LEFT JOINs used with INNER JOINs should error out since a reference table exists on the
-- left side of the LEFT JOIN.
SELECT
count(*) AS cnt, "generated_group_field"
FROM
(SELECT
"eventQuery"."user_id", random(), generated_group_field
FROM
(SELECT
"multi_group_wrapper_1".*, generated_group_field, random()
FROM
(SELECT *
FROM
(SELECT
"events"."time", "events"."user_id" as event_user_id
FROM
events_table as "events"
WHERE
user_id > 80) "temp_data_queries"
INNER JOIN
(SELECT
"users"."user_id"
FROM
users_reference_table as "users"
WHERE
user_id > 80 and value_2 = 5) "user_filters_1"
ON ("temp_data_queries".event_user_id = "user_filters_1".user_id)) AS "multi_group_wrapper_1"
LEFT JOIN
(SELECT
"users"."user_id" AS "user_id", value_2 AS "generated_group_field"
FROM
users_table as "users") "left_group_by_1"
ON ("left_group_by_1".user_id = "multi_group_wrapper_1".event_user_id)) "eventQuery") "pushedDownQuery"
group BY
"generated_group_field"
ORDER BY
cnt DESC, generated_group_field ASC
LIMIT 10;
-- RIGHT JOINs used with INNER JOINs should error out since a reference table exists on the
-- right side of the RIGHT JOIN.
SELECT
count(*) AS cnt, "generated_group_field"
FROM
(SELECT
"eventQuery"."user_id", random(), generated_group_field
FROM
(SELECT
"multi_group_wrapper_1".*, generated_group_field, random()
FROM
(SELECT *
FROM
(SELECT
"events"."time", "events"."user_id" as event_user_id
FROM
events_table as "events"
WHERE
user_id > 80) "temp_data_queries"
INNER JOIN
(SELECT
"users"."user_id"
FROM
users_table as "users"
WHERE
user_id > 80 and value_2 = 5) "user_filters_1"
ON ("temp_data_queries".event_user_id = "user_filters_1".user_id)) AS "multi_group_wrapper_1"
RIGHT JOIN
(SELECT
"users"."user_id" AS "user_id", value_2 AS "generated_group_field"
FROM
users_reference_table as "users") "right_group_by_1"
ON ("right_group_by_1".user_id = "multi_group_wrapper_1".event_user_id)) "eventQuery") "pushedDownQuery"
group BY
"generated_group_field"
ORDER BY
cnt DESC, generated_group_field ASC
LIMIT 10;
-- Outer subquery with reference table
SELECT "some_users_data".user_id, lastseen
FROM
(SELECT user_id, max(time) AS lastseen
FROM
(SELECT user_id, time
FROM
(SELECT
user_id, time
FROM
events_reference_table as "events"
WHERE
user_id > 10 and user_id < 40) "events_1"
ORDER BY
time DESC) "recent_events_1"
GROUP BY
user_id
ORDER BY
max(TIME) DESC) "some_recent_users"
FULL JOIN
(SELECT
"users".user_id
FROM
users_table as "users"
WHERE
users.value_2 > 50 and users.value_2 < 55) "some_users_data"
ON "some_users_data"."user_id" = "some_recent_users"."user_id"
ORDER BY
user_id
limit 50;
--
-- UNIONs and JOINs with reference tables, should error out
--
SELECT ("final_query"."event_types") as types, count(*) AS sumOfEventType
FROM
( SELECT *, random()
FROM
( SELECT "t"."user_id", "t"."time", unnest("t"."collected_events") AS "event_types"
FROM
( SELECT "t1"."user_id", min("t1"."time") AS "time", array_agg(("t1"."event") ORDER BY TIME ASC, event DESC) AS collected_events
FROM (
(SELECT
*
FROM
(SELECT
"events"."user_id", "events"."time", 0 AS event
FROM
events_reference_table as "events"
WHERE
event_type IN (10, 11, 12, 13, 14, 15) ) events_subquery_1)
UNION
(SELECT
*
FROM
(SELECT
"events"."user_id", "events"."time", 1 AS event
FROM
events_table as "events"
WHERE
event_type IN (15, 16, 17, 18, 19) ) events_subquery_2)
UNION
(SELECT
*
FROM
(SELECT
"events"."user_id", "events"."time", 2 AS event
FROM
events_table as "events"
WHERE
event_type IN (20, 21, 22, 23, 24, 25) ) events_subquery_3)
UNION
(SELECT
*
FROM
(SELECT
"events"."user_id", "events"."time", 3 AS event
FROM
events_table as "events"
WHERE
event_type IN (26, 27, 28, 29, 30, 13)) events_subquery_4)) t1
GROUP BY "t1"."user_id") AS t) "q"
INNER JOIN
(SELECT
"users"."user_id"
FROM
users_table as "users"
WHERE
value_1 > 50 and value_1 < 70) AS t
ON (t.user_id = q.user_id)) as final_query
GROUP BY
types
ORDER BY
types;
-- a reference table exists in a subquery of the union, should error out
SELECT ("final_query"."event_types") as types, count(*) AS sumOfEventType
FROM
( SELECT
*, random()
FROM
(SELECT
"t"."user_id", "t"."time", unnest("t"."collected_events") AS "event_types"
FROM
( SELECT
"t1"."user_id", min("t1"."time") AS "time", array_agg(("t1"."event") ORDER BY TIME ASC, event DESC) AS collected_events
FROM (
(SELECT
*
FROM
(SELECT
"events"."time", 0 AS event, "events"."user_id"
FROM
events_table as "events"
WHERE
event_type IN (10, 11, 12, 13, 14, 15) ) events_subquery_1)
UNION
(SELECT *
FROM
(
SELECT * FROM
(
SELECT
max("events"."time"),
0 AS event,
"events"."user_id"
FROM
events_reference_table as "events", users_table as "users"
WHERE
events.user_id = users.user_id AND
event_type IN (10, 11, 12, 13, 14, 15)
GROUP BY "events"."user_id"
) as events_subquery_5
) events_subquery_2)
UNION
(SELECT *
FROM
(SELECT
"events"."time", 2 AS event, "events"."user_id"
FROM
events_table as "events"
WHERE
event_type IN (20, 21, 22, 23, 24, 25) ) events_subquery_3)
UNION
(SELECT *
FROM
(SELECT
"events"."time", 3 AS event, "events"."user_id"
FROM
events_table as "events"
WHERE
event_type IN (26, 27, 28, 29, 30, 13)) events_subquery_4)
) t1
GROUP BY "t1"."user_id") AS t) "q"
INNER JOIN
(SELECT
"users"."user_id"
FROM
users_table as "users"
WHERE
value_1 > 50 and value_1 < 70) AS t
ON (t.user_id = q.user_id)) as final_query
GROUP BY
types
ORDER BY
types;
--
-- Should error out with UNION ALL Queries on reference tables
--
SELECT ("final_query"."event_types") as types, count(*) AS sumOfEventType
FROM
( SELECT *, random()
FROM
( SELECT "t"."user_id", "t"."time", unnest("t"."collected_events") AS "event_types"
FROM
( SELECT "t1"."user_id", min("t1"."time") AS "time", array_agg(("t1"."event") ORDER BY TIME ASC, event DESC) AS collected_events
FROM (
(SELECT *
FROM
(SELECT
"events"."user_id", "events"."time", 0 AS event
FROM
events_table as "events"
WHERE
event_type IN (10, 11, 12, 13, 14, 15) ) events_subquery_1)
UNION ALL
(SELECT *
FROM
(SELECT
"events"."user_id", "events"."time", 1 AS event
FROM
events_table as "events"
WHERE
event_type IN (15, 16, 17, 18, 19) ) events_subquery_2)
UNION ALL
(SELECT *
FROM
(SELECT
"events"."user_id", "events"."time", 2 AS event
FROM
events_reference_table as "events"
WHERE
event_type IN (20, 21, 22, 23, 24, 25) ) events_subquery_3)
UNION ALL
(SELECT *
FROM
(SELECT
"events"."user_id", "events"."time", 3 AS event
FROM
events_table as "events"
WHERE
event_type IN (26, 27, 28, 29, 30, 13)) events_subquery_4)) t1
GROUP BY "t1"."user_id") AS t) "q"
INNER JOIN
(SELECT "users"."user_id"
FROM users_table as "users"
WHERE value_1 > 50 and value_1 < 70) AS t ON (t.user_id = q.user_id)) as final_query
GROUP BY types
ORDER BY types;
-- clean up the three test tables created at the top of this file
DROP TABLE user_buy_test_table;
DROP TABLE users_ref_test_table;
DROP TABLE users_return_test_table;

View File

@ -0,0 +1,199 @@
--
-- queries to test the subquery pushdown on reference tables
-- subqueries in WHERE with greater operator
SELECT
user_id
FROM
users_table
WHERE
value_2 >
(SELECT
max(value_2)
FROM
events_reference_table
WHERE
users_table.user_id = events_reference_table.user_id AND event_type = 50
GROUP BY
user_id
)
GROUP BY user_id
HAVING count(*) > 66
ORDER BY user_id
LIMIT 5;
-- subqueries in WHERE with IN operator
SELECT
user_id
FROM
users_table
WHERE
value_2 IN
(SELECT
value_2
FROM
events_reference_table
WHERE
users_table.user_id = events_reference_table.user_id
)
GROUP BY user_id
ORDER BY user_id
LIMIT 3;
-- subqueries in WHERE with NOT EXISTS operator, should work since
-- the reference table is in the inner part of the join
SELECT
user_id
FROM
users_table
WHERE
NOT EXISTS
(SELECT
value_2
FROM
events_reference_table
WHERE
users_table.user_id = events_reference_table.user_id
)
GROUP BY user_id
ORDER BY user_id
LIMIT 3;
-- subqueries in WHERE with NOT EXISTS operator, should not work
-- there is a reference table in the outer part of the join
SELECT
user_id
FROM
users_reference_table
WHERE
NOT EXISTS
(SELECT
value_2
FROM
events_table
WHERE
users_reference_table.user_id = events_table.user_id
)
LIMIT 3;
-- subqueries in WHERE with IN operator without equality
SELECT
user_id
FROM
users_table
WHERE
value_2 IN
(SELECT
value_2
FROM
events_reference_table
WHERE
users_table.user_id > events_reference_table.user_id
)
GROUP BY user_id
ORDER BY user_id
LIMIT 3;
-- the subquery on the reference table has no equality with the outer query, should error out
SELECT
user_id
FROM
users_table
WHERE
value_2 >
(SELECT
max(value_2)
FROM
events_reference_table
WHERE
event_type = 50
GROUP BY
user_id
)
GROUP BY user_id
HAVING count(*) > 66
ORDER BY user_id
LIMIT 5;
-- users that appeared at least 118 times, should run since the reference table
-- is on the right side of the semi join
SELECT
user_id
FROM
users_table
WHERE 118 <=
(SELECT
count(*)
FROM
events_reference_table
WHERE
users_table.user_id = events_reference_table.user_id
GROUP BY
user_id)
GROUP BY
user_id
ORDER BY
user_id;
-- should error out since a reference table exists on the left side
-- of the left lateral join
SELECT user_id, value_2 FROM users_table WHERE
value_1 > 101 AND value_1 < 110
AND value_2 >= 5
AND user_id IN
(
SELECT
e1.user_id
FROM (
-- Get the first time each user viewed the homepage.
SELECT
user_id,
1 AS view_homepage,
min(time) AS view_homepage_time
FROM events_reference_table
WHERE
event_type IN (10, 20, 30, 40, 50, 60, 70, 80, 90)
GROUP BY user_id
) e1 LEFT JOIN LATERAL (
SELECT
user_id,
1 AS use_demo,
time AS use_demo_time
FROM events_reference_table
WHERE
user_id = e1.user_id AND
event_type IN (11, 21, 31, 41, 51, 61, 71, 81, 91)
ORDER BY time
) e2 ON true LEFT JOIN LATERAL (
SELECT
user_id,
1 AS enter_credit_card,
time AS enter_credit_card_time
FROM events_reference_table
WHERE
user_id = e2.user_id AND
event_type IN (12, 22, 32, 42, 52, 62, 72, 82, 92)
ORDER BY time
) e3 ON true LEFT JOIN LATERAL (
SELECT
1 AS submit_card_info,
user_id,
time AS enter_credit_card_time
FROM events_reference_table
WHERE
user_id = e3.user_id AND
event_type IN (13, 23, 33, 43, 53, 63, 73, 83, 93)
ORDER BY time
) e4 ON true LEFT JOIN LATERAL (
SELECT
1 AS see_bought_screen
FROM events_reference_table
WHERE
user_id = e4.user_id AND
event_type IN (14, 24, 34, 44, 54, 64, 74, 84, 94)
ORDER BY time
) e5 ON true
group by e1.user_id
HAVING sum(submit_card_info) > 0
)
ORDER BY 1, 2;

View File

@ -17,6 +17,16 @@ FROM (
ORDER BY 2 DESC,1
LIMIT 5;
-- a very simple union query with reference table
SELECT user_id, counter
FROM (
SELECT user_id, value_2 % 10 AS counter FROM events_table WHERE event_type IN (1, 2, 3, 4, 5)
UNION
SELECT user_id, value_2 % 10 AS counter FROM events_reference_table WHERE event_type IN (5, 6, 7, 8, 9, 10)
) user_id
ORDER BY 2 DESC,1
LIMIT 5;
-- the same query with union all
SELECT user_id, counter
FROM (
@ -27,6 +37,16 @@ FROM (
ORDER BY 2 DESC,1
LIMIT 5;
-- the same query with union all and reference table
SELECT user_id, counter
FROM (
SELECT user_id, value_2 % 10 AS counter FROM events_table WHERE event_type IN (1, 2, 3, 4, 5)
UNION ALL
SELECT user_id, value_2 % 10 AS counter FROM events_reference_table WHERE event_type IN (5, 6, 7, 8, 9, 10)
) user_id
ORDER BY 2 DESC,1
LIMIT 5;
-- the same query with group by
SELECT user_id, sum(counter)
FROM (
@ -102,6 +122,21 @@ FROM (
) user_id
GROUP BY user_id ORDER BY 1 DESC LIMIT 5;
-- similar query, this time with more subqueries, one of them on a reference table; the target list contains a resjunk entry
SELECT sum(counter)
FROM (
SELECT user_id, sum(value_2) AS counter FROM users_table where value_1 < 20 GROUP BY user_id HAVING sum(value_2) > 500
UNION
SELECT user_id, sum(value_2) AS counter FROM users_table where value_1 < 40 and value_1 < 60 GROUP BY user_id HAVING sum(value_2) > 500
UNION
SELECT user_id, sum(value_2) AS counter FROM users_reference_table where value_1 < 60 and value_1 < 80 GROUP BY user_id HAVING sum(value_2) > 500
UNION
SELECT user_id, sum(value_2) AS counter FROM users_table where value_1 < 80 and value_1 < 100 GROUP BY user_id HAVING sum(value_2) > 500
UNION
SELECT user_id, sum(value_2) AS counter FROM users_table where value_1 < 100 and value_1 < 120 GROUP BY user_id HAVING sum(value_2) > 500
) user_id
GROUP BY user_id ORDER BY 1 DESC LIMIT 5;
-- similar query as above, with UNION ALL
SELECT sum(counter)
FROM (
@ -160,6 +195,49 @@ FROM (
ORDER BY 2 DESC, 1 DESC
LIMIT 5;
-- unions within unions with reference table
SELECT *
FROM (
( SELECT user_id,
sum(counter)
FROM
(SELECT
user_id, sum(value_2) AS counter
FROM
users_table
GROUP BY
user_id
UNION
SELECT
user_id, sum(value_2) AS counter
FROM
events_reference_table
GROUP BY
user_id) user_id_1
GROUP BY
user_id)
UNION
(SELECT
user_id, sum(counter)
FROM
(SELECT
user_id, sum(value_2) AS counter
FROM
users_table
GROUP BY
user_id
UNION
SELECT
user_id, sum(value_2) AS counter
FROM
events_table
GROUP BY
user_id) user_id_2
GROUP BY
user_id)) AS ftop
ORDER BY 2 DESC, 1 DESC
LIMIT 5;
-- top level unions are wrapped into top level aggregations
SELECT ("final_query"."event_types") as types, count(*) AS sumOfEventType
FROM
@ -377,6 +455,16 @@ FROM
(SELECT user_id FROM events_table)
) b;
-- some UNION ALL queries that are going to be pulled up with reference table
SELECT
count(*)
FROM
(
(SELECT user_id FROM users_table)
UNION ALL
(SELECT user_id FROM events_reference_table)
) b;
-- similar query without top level agg
SELECT
user_id
@ -724,3 +812,6 @@ GROUP BY types
ORDER BY types;
SET citus.enable_router_execution TO true;
DROP TABLE events_reference_table;
DROP TABLE users_reference_table;