Unify extendedOpNode Processing

Before this commit, the code paths that create the master and the worker
extended op nodes had diverged from each other.

This commit moves the related parts into a single place
and allows the creation of master/worker extended op nodes to
share a common data structure.
pull/2070/head
Onder Kalaci 2018-03-27 15:47:26 +03:00
parent 966f01fad3
commit ee748d9140
5 changed files with 333 additions and 237 deletions

View File

@ -40,6 +40,7 @@ OBJS = src/backend/distributed/shared_library_init.o \
src/backend/distributed/metadata/metadata_sync.o \ src/backend/distributed/metadata/metadata_sync.o \
src/backend/distributed/planner/deparse_shard_query.o \ src/backend/distributed/planner/deparse_shard_query.o \
src/backend/distributed/planner/distributed_planner.o \ src/backend/distributed/planner/distributed_planner.o \
src/backend/distributed/planner/extended_op_node_utils.o \
src/backend/distributed/planner/insert_select_planner.o \ src/backend/distributed/planner/insert_select_planner.o \
src/backend/distributed/planner/multi_explain.o \ src/backend/distributed/planner/multi_explain.o \
src/backend/distributed/planner/multi_join_order.o \ src/backend/distributed/planner/multi_join_order.o \

View File

@ -0,0 +1,262 @@
/*-------------------------------------------------------------------------
*
* extended_op_node_utils.c implements the logic for building the necessary
* information that is shared among both the worker and master extended
* op nodes.
*
* Copyright (c) 2018, Citus Data, Inc.
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include "distributed/extended_op_node_utils.h"
#include "distributed/metadata_cache.h"
#include "distributed/multi_logical_optimizer.h"
#include "distributed/pg_dist_partition.h"
#include "optimizer/var.h"
#include "nodes/pg_list.h"
/*
 * Forward declarations for the file-local helpers that compute the individual
 * properties combined by BuildExtendedOpNodeProperties().
 */
static bool GroupedByDisjointPartitionColumn(List *tableNodeList,
MultiExtendedOp *opNode);
static bool ExtendedOpNodeContainsRepartitionSubquery(MultiExtendedOp *originalOpNode);
static bool HasNonPartitionColumnDistinctAgg(List *targetEntryList, Node *havingQual,
List *tableNodeList);
static bool PartitionColumnInTableList(Var *column, List *tableNodeList);
static bool ShouldPullDistinctColumn(bool repartitionSubquery,
bool groupedByDisjointPartitionColumn,
bool hasNonPartitionColumnDistinctAgg);
/*
 * BuildExtendedOpNodeProperties gathers, in one pass, the flags that both the
 * master and the worker extended op node builders need for the given extended
 * op node. The struct is returned by value and callers are expected to treat
 * it as read-only.
 */
ExtendedOpNodeProperties
BuildExtendedOpNodeProperties(MultiExtendedOp *extendedOpNode)
{
	ExtendedOpNodeProperties properties;
	List *multiTableList = FindNodesOfType((MultiNode *) extendedOpNode, T_MultiTable);

	properties.groupedByDisjointPartitionColumn =
		GroupedByDisjointPartitionColumn(multiTableList, extendedOpNode);

	properties.repartitionSubquery =
		ExtendedOpNodeContainsRepartitionSubquery(extendedOpNode);

	properties.hasNonPartitionColumnDistinctAgg =
		HasNonPartitionColumnDistinctAgg(extendedOpNode->targetList,
										 extendedOpNode->havingQual,
										 multiTableList);

	/* the pull decision is derived purely from the three flags computed above */
	properties.pullDistinctColumns =
		ShouldPullDistinctColumn(properties.repartitionSubquery,
								 properties.groupedByDisjointPartitionColumn,
								 properties.hasNonPartitionColumnDistinctAgg);

	/*
	 * TODO: Only window functions that can be pushed down reach here, thus,
	 * using hasWindowFuncs is safe for now. However, this should be fixed
	 * when we support pull-to-master window functions.
	 */
	properties.pushDownWindowFunctions = extendedOpNode->hasWindowFuncs;

	return properties;
}
/*
 * GroupedByDisjointPartitionColumn returns true if the query is grouped by the
 * partition column of at least one table in tableNodeList whose shards hold
 * disjoint sets of partition values, i.e. a hash or range distributed table.
 * Subquery entries and non-distributed relations are ignored.
 */
static bool
GroupedByDisjointPartitionColumn(List *tableNodeList, MultiExtendedOp *opNode)
{
	ListCell *multiTableCell = NULL;

	foreach(multiTableCell, tableNodeList)
	{
		MultiTable *multiTable = (MultiTable *) lfirst(multiTableCell);
		Oid tableId = multiTable->relationId;

		/* only real distributed tables have a partition method to look up */
		if (tableId != SUBQUERY_RELATION_ID && IsDistributedTable(tableId))
		{
			char distributionMethod = PartitionMethod(tableId);
			bool shardsAreDisjoint = (distributionMethod == DISTRIBUTE_BY_RANGE ||
									  distributionMethod == DISTRIBUTE_BY_HASH);

			if (shardsAreDisjoint &&
				GroupedByColumn(opNode->groupClauseList, opNode->targetList,
								multiTable->partitionColumn))
			{
				return true;
			}
		}
	}

	return false;
}
/*
 * ExtendedOpNodeContainsRepartitionSubquery returns true if the given extended
 * op node sits directly below a MultiTable node and directly above a
 * MultiCollect node, which is the shape this file treats as a repartition
 * subquery.
 */
static bool
ExtendedOpNodeContainsRepartitionSubquery(MultiExtendedOp *originalOpNode)
{
	MultiNode *nodeAbove = ParentNode((MultiNode *) originalOpNode);
	MultiNode *nodeBelow = ChildNode((MultiUnaryNode *) originalOpNode);

	return CitusIsA(nodeAbove, MultiTable) && CitusIsA(nodeBelow, MultiCollect);
}
/*
 * HasNonPartitionColumnDistinctAgg returns true if any DISTINCT aggregate in
 * the target list or in the having qualifier references a column that is not
 * a partition column of one of the given tables. Only the args subfield of
 * each Aggref is inspected; FILTER and ORDER BY clauses of the aggregate are
 * not checked.
 */
static bool
HasNonPartitionColumnDistinctAgg(List *targetEntryList, Node *havingQual,
								 List *tableNodeList)
{
	List *candidateList = NIL;
	ListCell *candidateCell = NULL;

	/* aggregates may hide inside window functions in the target list */
	candidateList = pull_var_clause((Node *) targetEntryList,
									PVC_INCLUDE_AGGREGATES |
									PVC_RECURSE_WINDOWFUNCS);

	/* having clause can't have window functions, no need to recurse for that */
	candidateList = list_concat(candidateList,
								pull_var_clause((Node *) havingQual,
												PVC_INCLUDE_AGGREGATES));

	foreach(candidateCell, candidateList)
	{
		Node *candidate = lfirst(candidateCell);
		Aggref *aggregate = NULL;
		List *argumentVarList = NIL;
		ListCell *argumentVarCell = NULL;

		/* plain columns outside of aggregates are irrelevant here */
		if (IsA(candidate, Var))
		{
			continue;
		}

		Assert(IsA(candidate, Aggref));
		aggregate = (Aggref *) candidate;

		/* only DISTINCT aggregates can force pulling columns */
		if (aggregate->aggdistinct != NIL)
		{
			argumentVarList = pull_var_clause_default((Node *) aggregate->args);

			foreach(argumentVarCell, argumentVarList)
			{
				Node *argumentVar = (Node *) lfirst(argumentVarCell);

				Assert(IsA(argumentVar, Var));

				if (!PartitionColumnInTableList((Var *) argumentVar, tableNodeList))
				{
					return true;
				}
			}
		}
	}

	return false;
}
/*
 * PartitionColumnInTableList returns true if the given column matches (by
 * varno and varattno) the partition column of some table in tableNodeList
 * and that table is not append distributed. A match against an append
 * distributed table does not count, so the function returns false when that
 * is the only match.
 */
static bool
PartitionColumnInTableList(Var *column, List *tableNodeList)
{
	ListCell *multiTableCell = NULL;

	foreach(multiTableCell, tableNodeList)
	{
		MultiTable *multiTable = lfirst(multiTableCell);
		Var *distributionColumn = multiTable->partitionColumn;

		/* skip tables without a partition column */
		if (distributionColumn == NULL)
		{
			continue;
		}

		/* skip tables whose partition column is a different column */
		if (distributionColumn->varno != column->varno ||
			distributionColumn->varattno != column->varattno)
		{
			continue;
		}

		Assert(distributionColumn->varno == multiTable->rangeTableId);

		/* append distributed tables do not qualify, keep looking */
		if (PartitionMethod(multiTable->relationId) == DISTRIBUTE_BY_APPEND)
		{
			continue;
		}

		return true;
	}

	return false;
}
/*
 * ShouldPullDistinctColumn decides whether a distinct aggregate should pull
 * the individual columns from the workers and be evaluated on the master.
 *
 * The answer is true when:
 * - the extended op node belongs to a repartition subquery, or
 * - the query is not grouped by a disjoint partition column and it has a
 *   distinct aggregate on a non-partition column.
 *
 * In every other case the function returns false.
 */
static bool
ShouldPullDistinctColumn(bool repartitionSubquery,
						 bool groupedByDisjointPartitionColumn,
						 bool hasNonPartitionColumnDistinctAgg)
{
	/* repartition subqueries always pull, regardless of the other flags */
	if (repartitionSubquery)
	{
		return true;
	}

	return !groupedByDisjointPartitionColumn && hasNonPartitionColumnDistinctAgg;
}

View File

@ -28,6 +28,7 @@
#include "distributed/citus_nodes.h" #include "distributed/citus_nodes.h"
#include "distributed/citus_ruleutils.h" #include "distributed/citus_ruleutils.h"
#include "distributed/colocation_utils.h" #include "distributed/colocation_utils.h"
#include "distributed/extended_op_node_utils.h"
#include "distributed/metadata_cache.h" #include "distributed/metadata_cache.h"
#include "distributed/multi_logical_optimizer.h" #include "distributed/multi_logical_optimizer.h"
#include "distributed/multi_logical_planner.h" #include "distributed/multi_logical_planner.h"
@ -109,17 +110,15 @@ static void RemoveUnaryNode(MultiUnaryNode *unaryNode);
static void PullUpUnaryNode(MultiUnaryNode *unaryNode); static void PullUpUnaryNode(MultiUnaryNode *unaryNode);
static void ParentSetNewChild(MultiNode *parentNode, MultiNode *oldChildNode, static void ParentSetNewChild(MultiNode *parentNode, MultiNode *oldChildNode,
MultiNode *newChildNode); MultiNode *newChildNode);
static bool GroupedByDisjointPartitionColumn(List *tableNodeList,
MultiExtendedOp *opNode);
/* Local functions forward declarations for aggregate expressions */ /* Local functions forward declarations for aggregate expressions */
static void ApplyExtendedOpNodes(MultiExtendedOp *originalNode, static void ApplyExtendedOpNodes(MultiExtendedOp *originalNode,
MultiExtendedOp *masterNode, MultiExtendedOp *masterNode,
MultiExtendedOp *workerNode); MultiExtendedOp *workerNode);
static void TransformSubqueryNode(MultiTable *subqueryNode, List *tableNodeList); static void TransformSubqueryNode(MultiTable *subqueryNode);
static MultiExtendedOp * MasterExtendedOpNode(MultiExtendedOp *originalOpNode, static MultiExtendedOp * MasterExtendedOpNode(MultiExtendedOp *originalOpNode,
bool groupedByDisjointPartitionColumn, ExtendedOpNodeProperties *
List *tableNodeList); extendedOpNodeProperties);
static Node * MasterAggregateMutator(Node *originalNode, static Node * MasterAggregateMutator(Node *originalNode,
MasterAggregateWalkerContext *walkerContext); MasterAggregateWalkerContext *walkerContext);
static Expr * MasterAggregateExpression(Aggref *originalAggregate, static Expr * MasterAggregateExpression(Aggref *originalAggregate,
@ -128,14 +127,8 @@ static Expr * MasterAverageExpression(Oid sumAggregateType, Oid countAggregateTy
AttrNumber *columnId); AttrNumber *columnId);
static Expr * AddTypeConversion(Node *originalAggregate, Node *newExpression); static Expr * AddTypeConversion(Node *originalAggregate, Node *newExpression);
static MultiExtendedOp * WorkerExtendedOpNode(MultiExtendedOp *originalOpNode, static MultiExtendedOp * WorkerExtendedOpNode(MultiExtendedOp *originalOpNode,
bool groupedByDisjointPartitionColumn, ExtendedOpNodeProperties *
List *tableNodeList); extendedOpNodeProperties);
static bool HasNonPartitionColumnDistinctAgg(List *targetEntryList, Node *havingQual,
List *tableNodeList);
static bool ShouldPullDistinctColumn(bool repartitionSubquery,
bool groupedByDisjointPartitionColumn,
bool hasNonPartitionColumnDistinctAgg);
static bool PartitionColumnInTableList(Var *column, List *tableNodeList);
static bool WorkerAggregateWalker(Node *node, static bool WorkerAggregateWalker(Node *node,
WorkerAggregateWalkerContext *walkerContext); WorkerAggregateWalkerContext *walkerContext);
static List * WorkerAggregateExpressionList(Aggref *originalAggregate, static List * WorkerAggregateExpressionList(Aggref *originalAggregate,
@ -165,7 +158,6 @@ static Var * AggregateDistinctColumn(Aggref *aggregateExpression);
static bool TablePartitioningSupportsDistinct(List *tableNodeList, static bool TablePartitioningSupportsDistinct(List *tableNodeList,
MultiExtendedOp *opNode, MultiExtendedOp *opNode,
Var *distinctColumn); Var *distinctColumn);
static bool GroupedByColumn(List *groupClauseList, List *targetList, Var *column);
/* Local functions forward declarations for limit clauses */ /* Local functions forward declarations for limit clauses */
static Node * WorkerLimitCount(MultiExtendedOp *originalOpNode, static Node * WorkerLimitCount(MultiExtendedOp *originalOpNode,
@ -200,7 +192,6 @@ void
MultiLogicalPlanOptimize(MultiTreeRoot *multiLogicalPlan) MultiLogicalPlanOptimize(MultiTreeRoot *multiLogicalPlan)
{ {
bool hasOrderByHllType = false; bool hasOrderByHllType = false;
bool groupedByDisjointPartitionColumn = false;
List *selectNodeList = NIL; List *selectNodeList = NIL;
List *projectNodeList = NIL; List *projectNodeList = NIL;
List *collectNodeList = NIL; List *collectNodeList = NIL;
@ -212,6 +203,7 @@ MultiLogicalPlanOptimize(MultiTreeRoot *multiLogicalPlan)
MultiExtendedOp *extendedOpNode = NULL; MultiExtendedOp *extendedOpNode = NULL;
MultiExtendedOp *masterExtendedOpNode = NULL; MultiExtendedOp *masterExtendedOpNode = NULL;
MultiExtendedOp *workerExtendedOpNode = NULL; MultiExtendedOp *workerExtendedOpNode = NULL;
ExtendedOpNodeProperties extendedOpNodeProperties;
MultiNode *logicalPlanNode = (MultiNode *) multiLogicalPlan; MultiNode *logicalPlanNode = (MultiNode *) multiLogicalPlan;
/* check that we can optimize aggregates in the plan */ /* check that we can optimize aggregates in the plan */
@ -275,26 +267,23 @@ MultiLogicalPlanOptimize(MultiTreeRoot *multiLogicalPlan)
extendedOpNodeList = FindNodesOfType(logicalPlanNode, T_MultiExtendedOp); extendedOpNodeList = FindNodesOfType(logicalPlanNode, T_MultiExtendedOp);
extendedOpNode = (MultiExtendedOp *) linitial(extendedOpNodeList); extendedOpNode = (MultiExtendedOp *) linitial(extendedOpNodeList);
tableNodeList = FindNodesOfType(logicalPlanNode, T_MultiTable); extendedOpNodeProperties = BuildExtendedOpNodeProperties(extendedOpNode);
groupedByDisjointPartitionColumn = GroupedByDisjointPartitionColumn(tableNodeList,
extendedOpNode);
masterExtendedOpNode = MasterExtendedOpNode(extendedOpNode, masterExtendedOpNode =
groupedByDisjointPartitionColumn, MasterExtendedOpNode(extendedOpNode, &extendedOpNodeProperties);
tableNodeList); workerExtendedOpNode =
workerExtendedOpNode = WorkerExtendedOpNode(extendedOpNode, WorkerExtendedOpNode(extendedOpNode, &extendedOpNodeProperties);
groupedByDisjointPartitionColumn,
tableNodeList);
ApplyExtendedOpNodes(extendedOpNode, masterExtendedOpNode, workerExtendedOpNode); ApplyExtendedOpNodes(extendedOpNode, masterExtendedOpNode, workerExtendedOpNode);
tableNodeList = FindNodesOfType(logicalPlanNode, T_MultiTable);
foreach(tableNodeCell, tableNodeList) foreach(tableNodeCell, tableNodeList)
{ {
MultiTable *tableNode = (MultiTable *) lfirst(tableNodeCell); MultiTable *tableNode = (MultiTable *) lfirst(tableNodeCell);
if (tableNode->relationId == SUBQUERY_RELATION_ID) if (tableNode->relationId == SUBQUERY_RELATION_ID)
{ {
ErrorIfContainsUnsupportedAggregate((MultiNode *) tableNode); ErrorIfContainsUnsupportedAggregate((MultiNode *) tableNode);
TransformSubqueryNode(tableNode, tableNodeList); TransformSubqueryNode(tableNode);
} }
} }
@ -1181,27 +1170,28 @@ ApplyExtendedOpNodes(MultiExtendedOp *originalNode, MultiExtendedOp *masterNode,
* operator node. * operator node.
*/ */
static void static void
TransformSubqueryNode(MultiTable *subqueryNode, List *tableNodeList) TransformSubqueryNode(MultiTable *subqueryNode)
{ {
MultiExtendedOp *extendedOpNode = MultiExtendedOp *extendedOpNode =
(MultiExtendedOp *) ChildNode((MultiUnaryNode *) subqueryNode); (MultiExtendedOp *) ChildNode((MultiUnaryNode *) subqueryNode);
MultiNode *collectNode = ChildNode((MultiUnaryNode *) extendedOpNode); MultiNode *collectNode = ChildNode((MultiUnaryNode *) extendedOpNode);
MultiNode *collectChildNode = ChildNode((MultiUnaryNode *) collectNode); MultiNode *collectChildNode = ChildNode((MultiUnaryNode *) collectNode);
bool groupedByDisjointPartitionColumn =
GroupedByDisjointPartitionColumn(tableNodeList, extendedOpNode); ExtendedOpNodeProperties extendedOpNodeProperties =
BuildExtendedOpNodeProperties(extendedOpNode);
MultiExtendedOp *masterExtendedOpNode = MultiExtendedOp *masterExtendedOpNode =
MasterExtendedOpNode(extendedOpNode, groupedByDisjointPartitionColumn, MasterExtendedOpNode(extendedOpNode, &extendedOpNodeProperties);
tableNodeList);
MultiExtendedOp *workerExtendedOpNode = MultiExtendedOp *workerExtendedOpNode =
WorkerExtendedOpNode(extendedOpNode, groupedByDisjointPartitionColumn, WorkerExtendedOpNode(extendedOpNode, &extendedOpNodeProperties);
tableNodeList);
MultiPartition *partitionNode = CitusMakeNode(MultiPartition);
List *groupClauseList = extendedOpNode->groupClauseList; List *groupClauseList = extendedOpNode->groupClauseList;
List *targetEntryList = extendedOpNode->targetList; List *targetEntryList = extendedOpNode->targetList;
List *groupTargetEntryList = GroupTargetEntryList(groupClauseList, targetEntryList); List *groupTargetEntryList = GroupTargetEntryList(groupClauseList, targetEntryList);
TargetEntry *groupByTargetEntry = (TargetEntry *) linitial(groupTargetEntryList); TargetEntry *groupByTargetEntry = (TargetEntry *) linitial(groupTargetEntryList);
Expr *groupByExpression = groupByTargetEntry->expr; Expr *groupByExpression = groupByTargetEntry->expr;
MultiPartition *partitionNode = CitusMakeNode(MultiPartition);
/* /*
* If group by is on a function expression, then we create a new column from * If group by is on a function expression, then we create a new column from
* function expression result type. Because later while creating partition * function expression result type. Because later while creating partition
@ -1257,8 +1247,7 @@ TransformSubqueryNode(MultiTable *subqueryNode, List *tableNodeList)
*/ */
static MultiExtendedOp * static MultiExtendedOp *
MasterExtendedOpNode(MultiExtendedOp *originalOpNode, MasterExtendedOpNode(MultiExtendedOp *originalOpNode,
bool groupedByDisjointPartitionColumn, ExtendedOpNodeProperties *extendedOpNodeProperties)
List *tableNodeList)
{ {
MultiExtendedOp *masterExtendedOpNode = NULL; MultiExtendedOp *masterExtendedOpNode = NULL;
List *targetEntryList = originalOpNode->targetList; List *targetEntryList = originalOpNode->targetList;
@ -1266,26 +1255,11 @@ MasterExtendedOpNode(MultiExtendedOp *originalOpNode,
ListCell *targetEntryCell = NULL; ListCell *targetEntryCell = NULL;
Node *originalHavingQual = originalOpNode->havingQual; Node *originalHavingQual = originalOpNode->havingQual;
Node *newHavingQual = NULL; Node *newHavingQual = NULL;
MultiNode *parentNode = ParentNode((MultiNode *) originalOpNode);
MultiNode *childNode = ChildNode((MultiUnaryNode *) originalOpNode);
MasterAggregateWalkerContext *walkerContext = palloc0( MasterAggregateWalkerContext *walkerContext = palloc0(
sizeof(MasterAggregateWalkerContext)); sizeof(MasterAggregateWalkerContext));
bool hasNonPartitionColumnDistinctAgg = false;
bool repartitionSubquery = false;
walkerContext->columnId = 1; walkerContext->columnId = 1;
walkerContext->pullDistinctColumns = extendedOpNodeProperties->pullDistinctColumns;
if (CitusIsA(parentNode, MultiTable) && CitusIsA(childNode, MultiCollect))
{
repartitionSubquery = true;
}
hasNonPartitionColumnDistinctAgg = HasNonPartitionColumnDistinctAgg(targetEntryList,
originalHavingQual,
tableNodeList);
walkerContext->pullDistinctColumns =
ShouldPullDistinctColumn(repartitionSubquery, groupedByDisjointPartitionColumn,
hasNonPartitionColumnDistinctAgg);
/* iterate over original target entries */ /* iterate over original target entries */
foreach(targetEntryCell, targetEntryList) foreach(targetEntryCell, targetEntryList)
@ -1844,12 +1818,9 @@ AddTypeConversion(Node *originalAggregate, Node *newExpression)
*/ */
static MultiExtendedOp * static MultiExtendedOp *
WorkerExtendedOpNode(MultiExtendedOp *originalOpNode, WorkerExtendedOpNode(MultiExtendedOp *originalOpNode,
bool groupedByDisjointPartitionColumn, ExtendedOpNodeProperties *extendedOpNodeProperties)
List *tableNodeList)
{ {
MultiExtendedOp *workerExtendedOpNode = NULL; MultiExtendedOp *workerExtendedOpNode = NULL;
MultiNode *parentNode = ParentNode((MultiNode *) originalOpNode);
MultiNode *childNode = ChildNode((MultiUnaryNode *) originalOpNode);
List *targetEntryList = originalOpNode->targetList; List *targetEntryList = originalOpNode->targetList;
ListCell *targetEntryCell = NULL; ListCell *targetEntryCell = NULL;
List *newTargetEntryList = NIL; List *newTargetEntryList = NIL;
@ -1863,13 +1834,12 @@ WorkerExtendedOpNode(MultiExtendedOp *originalOpNode,
bool distinctClauseSupersetofGroupClause = false; bool distinctClauseSupersetofGroupClause = false;
bool distinctPreventsLimitPushdown = false; bool distinctPreventsLimitPushdown = false;
bool createdNewGroupByClause = false; bool createdNewGroupByClause = false;
bool hasNonPartitionColumnDistinctAgg = false; bool groupedByDisjointPartitionColumn =
bool repartitionSubquery = false; extendedOpNodeProperties->groupedByDisjointPartitionColumn;
bool pushDownWindowFunction = extendedOpNodeProperties->pushDownWindowFunctions;
/* only window functions that can be pushed down reach here */
bool pushDownWindowFunction = originalOpNode->hasWindowFuncs;
walkerContext->expressionList = NIL; walkerContext->expressionList = NIL;
walkerContext->pullDistinctColumns = extendedOpNodeProperties->pullDistinctColumns;
/* find max of sort group ref index */ /* find max of sort group ref index */
foreach(targetEntryCell, targetEntryList) foreach(targetEntryCell, targetEntryList)
@ -1884,18 +1854,6 @@ WorkerExtendedOpNode(MultiExtendedOp *originalOpNode,
/* next group ref index starts from max group ref index + 1 */ /* next group ref index starts from max group ref index + 1 */
nextSortGroupRefIndex++; nextSortGroupRefIndex++;
if (CitusIsA(parentNode, MultiTable) && CitusIsA(childNode, MultiCollect))
{
repartitionSubquery = true;
}
hasNonPartitionColumnDistinctAgg = HasNonPartitionColumnDistinctAgg(targetEntryList,
havingQual,
tableNodeList);
walkerContext->pullDistinctColumns =
ShouldPullDistinctColumn(repartitionSubquery, groupedByDisjointPartitionColumn,
hasNonPartitionColumnDistinctAgg);
/* iterate over original target entries */ /* iterate over original target entries */
foreach(targetEntryCell, targetEntryList) foreach(targetEntryCell, targetEntryList)
{ {
@ -2152,130 +2110,6 @@ WorkerExtendedOpNode(MultiExtendedOp *originalOpNode,
} }
/*
* HasNonPartitionColumnDistinctAgg returns true if target entry or having qualifier
* has non-partition column reference in aggregate (distinct) definition. Note that,
* it only checks aggs subfield of Aggref, it does not check FILTER or SORT clauses.
*/
static bool
HasNonPartitionColumnDistinctAgg(List *targetEntryList, Node *havingQual,
List *tableNodeList)
{
List *targetVarList = pull_var_clause((Node *) targetEntryList,
PVC_INCLUDE_AGGREGATES |
PVC_RECURSE_WINDOWFUNCS);
/* having clause can't have window functions, no need to recurse for that */
List *havingVarList = pull_var_clause((Node *) havingQual, PVC_INCLUDE_AGGREGATES);
List *aggregateCheckList = list_concat(targetVarList, havingVarList);
ListCell *aggregateCheckCell = NULL;
foreach(aggregateCheckCell, aggregateCheckList)
{
Node *targetNode = lfirst(aggregateCheckCell);
Aggref *targetAgg = NULL;
List *varList = NIL;
ListCell *varCell = NULL;
bool isPartitionColumn = false;
if (IsA(targetNode, Var))
{
continue;
}
Assert(IsA(targetNode, Aggref));
targetAgg = (Aggref *) targetNode;
if (targetAgg->aggdistinct == NIL)
{
continue;
}
varList = pull_var_clause_default((Node *) targetAgg->args);
foreach(varCell, varList)
{
Node *targetVar = (Node *) lfirst(varCell);
Assert(IsA(targetVar, Var));
isPartitionColumn =
PartitionColumnInTableList((Var *) targetVar, tableNodeList);
if (!isPartitionColumn)
{
return true;
}
}
}
return false;
}
/*
* ShouldPullDistinctColumn returns true if distinct aggregate should pull
* individual columns from worker to master and evaluate aggregate operation
* at master.
*
* Pull cases are:
* - repartition subqueries
* - query has count distinct on a non-partition column on at least one target
* - count distinct is on a non-partition column and query is not
* grouped on partition column
*/
static bool
ShouldPullDistinctColumn(bool repartitionSubquery,
bool groupedByDisjointPartitionColumn,
bool hasNonPartitionColumnDistinctAgg)
{
if (repartitionSubquery)
{
return true;
}
if (groupedByDisjointPartitionColumn)
{
return false;
}
else if (!groupedByDisjointPartitionColumn && hasNonPartitionColumnDistinctAgg)
{
return true;
}
return false;
}
/*
* PartitionColumnInTableList returns true if provided column is a partition
* column from provided table node list. It also returns false if a column is
* partition column of an append distributed table.
*/
static bool
PartitionColumnInTableList(Var *column, List *tableNodeList)
{
ListCell *tableNodeCell = NULL;
foreach(tableNodeCell, tableNodeList)
{
MultiTable *tableNode = lfirst(tableNodeCell);
Var *partitionColumn = tableNode->partitionColumn;
if (partitionColumn != NULL &&
partitionColumn->varno == column->varno &&
partitionColumn->varattno == column->varattno)
{
Assert(partitionColumn->varno == tableNode->rangeTableId);
if (PartitionMethod(tableNode->relationId) != DISTRIBUTE_BY_APPEND)
{
return true;
}
}
}
return false;
}
/* /*
* WorkerAggregateWalker walks over the original target entry expression, and * WorkerAggregateWalker walks over the original target entry expression, and
* creates the list of expression trees (potentially more than one) to execute * creates the list of expression trees (potentially more than one) to execute
@ -2632,46 +2466,6 @@ TypeOid(Oid schemaId, const char *typeName)
} }
/*
* GroupedByDisjointPartitionColumn returns true if the query is grouped by the
* partition column of a table whose shards have disjoint sets of partition values.
*/
static bool
GroupedByDisjointPartitionColumn(List *tableNodeList, MultiExtendedOp *opNode)
{
bool result = false;
ListCell *tableNodeCell = NULL;
foreach(tableNodeCell, tableNodeList)
{
MultiTable *tableNode = (MultiTable *) lfirst(tableNodeCell);
Oid relationId = tableNode->relationId;
char partitionMethod = 0;
if (relationId == SUBQUERY_RELATION_ID || !IsDistributedTable(relationId))
{
continue;
}
partitionMethod = PartitionMethod(relationId);
if (partitionMethod != DISTRIBUTE_BY_RANGE &&
partitionMethod != DISTRIBUTE_BY_HASH)
{
continue;
}
if (GroupedByColumn(opNode->groupClauseList, opNode->targetList,
tableNode->partitionColumn))
{
result = true;
break;
}
}
return result;
}
/* /*
* CreateSortGroupClause creates SortGroupClause for a given column Var. * CreateSortGroupClause creates SortGroupClause for a given column Var.
* The caller should set tleSortGroupRef field and respective * The caller should set tleSortGroupRef field and respective
@ -3213,7 +3007,7 @@ TablePartitioningSupportsDistinct(List *tableNodeList, MultiExtendedOp *opNode,
* GroupedByColumn walks over group clauses in the given list, and checks if any * GroupedByColumn walks over group clauses in the given list, and checks if any
* of the group clauses is on the given column. * of the group clauses is on the given column.
*/ */
static bool bool
GroupedByColumn(List *groupClauseList, List *targetList, Var *column) GroupedByColumn(List *groupClauseList, List *targetList, Var *column)
{ {
bool groupedByColumn = false; bool groupedByColumn = false;

View File

@ -0,0 +1,38 @@
/*-------------------------------------------------------------------------
*
* extended_op_node_utils.h
* General Citus planner code.
*
* Copyright (c) 2018, Citus Data, Inc.
*-------------------------------------------------------------------------
*/
#ifndef EXTENDED_OP_NODE_UTILS_H_
#define EXTENDED_OP_NODE_UTILS_H_
#include "distributed/multi_logical_planner.h"
/*
 * ExtendedOpNodeProperties is a helper structure that is used to
 * share the common information among the worker and master extended
 * op nodes.
 *
 * It is designed to be a read-only singleton object per extended op node
 * generation and processing. All fields are filled in by
 * BuildExtendedOpNodeProperties().
 */
typedef struct ExtendedOpNodeProperties
{
/* query is grouped by the partition column of a hash or range distributed table */
bool groupedByDisjointPartitionColumn;
/* op node's parent is a MultiTable node and its child is a MultiCollect node */
bool repartitionSubquery;
/* a DISTINCT aggregate in the target list or having qual uses a non-partition column */
bool hasNonPartitionColumnDistinctAgg;
/* distinct aggregates should pull columns to the master; derived from the flags above */
bool pullDistinctColumns;
/* copied from the op node's hasWindowFuncs flag */
bool pushDownWindowFunctions;
} ExtendedOpNodeProperties;
/*
 * Builds the properties shared by the master and worker extended op nodes
 * for the given extended op node; the result is returned by value.
 */
extern ExtendedOpNodeProperties BuildExtendedOpNodeProperties(MultiExtendedOp *
extendedOpNode);
#endif /* EXTENDED_OP_NODE_UTILS_H_ */

View File

@ -131,6 +131,7 @@ extern Oid FunctionOid(const char *schemaName, const char *functionName,
int argumentCount); int argumentCount);
/* Function declaration for helper functions in subquery pushdown */ /* Function declaration for helper functions in subquery pushdown */
extern bool GroupedByColumn(List *groupClauseList, List *targetList, Var *column);
extern List * SubqueryMultiTableList(MultiNode *multiNode); extern List * SubqueryMultiTableList(MultiNode *multiNode);
extern List * GroupTargetEntryList(List *groupClauseList, List *targetEntryList); extern List * GroupTargetEntryList(List *groupClauseList, List *targetEntryList);
extern bool ExtractQueryWalker(Node *node, List **queryList); extern bool ExtractQueryWalker(Node *node, List **queryList);