Simplify intermediate result pruning logic

pull/3720/head
Marco Slot 2020-04-06 17:01:08 +02:00
parent a710b3cdc5
commit 84672c3dbd
8 changed files with 107 additions and 192 deletions

View File

@ -22,6 +22,7 @@
#include "catalog/pg_type.h" #include "catalog/pg_type.h"
#include "distributed/citus_nodefuncs.h" #include "distributed/citus_nodefuncs.h"
#include "distributed/citus_nodes.h" #include "distributed/citus_nodes.h"
#include "distributed/citus_ruleutils.h"
#include "distributed/cte_inline.h" #include "distributed/cte_inline.h"
#include "distributed/function_call_delegation.h" #include "distributed/function_call_delegation.h"
#include "distributed/insert_select_planner.h" #include "distributed/insert_select_planner.h"
@ -91,8 +92,6 @@ static DistributedPlan * CreateDistributedPlan(uint64 planId, Query *originalQue
bool hasUnresolvedParams, bool hasUnresolvedParams,
PlannerRestrictionContext * PlannerRestrictionContext *
plannerRestrictionContext); plannerRestrictionContext);
static void FinalizeDistributedPlan(DistributedPlan *plan, Query *originalQuery);
static void RecordSubPlansUsedInPlan(DistributedPlan *plan, Query *originalQuery);
static DeferredErrorMessage * DeferErrorIfPartitionTableNotSingleReplicated(Oid static DeferredErrorMessage * DeferErrorIfPartitionTableNotSingleReplicated(Oid
relationId); relationId);
@ -955,8 +954,6 @@ CreateDistributedPlan(uint64 planId, Query *originalQuery, Query *query, ParamLi
if (distributedPlan->planningError == NULL) if (distributedPlan->planningError == NULL)
{ {
FinalizeDistributedPlan(distributedPlan, originalQuery);
return distributedPlan; return distributedPlan;
} }
else else
@ -977,8 +974,6 @@ CreateDistributedPlan(uint64 planId, Query *originalQuery, Query *query, ParamLi
plannerRestrictionContext); plannerRestrictionContext);
if (distributedPlan->planningError == NULL) if (distributedPlan->planningError == NULL)
{ {
FinalizeDistributedPlan(distributedPlan, originalQuery);
return distributedPlan; return distributedPlan;
} }
else else
@ -1075,8 +1070,6 @@ CreateDistributedPlan(uint64 planId, Query *originalQuery, Query *query, ParamLi
Assert(distributedPlan != NULL); Assert(distributedPlan != NULL);
distributedPlan->subPlanList = subPlanList; distributedPlan->subPlanList = subPlanList;
FinalizeDistributedPlan(distributedPlan, originalQuery);
return distributedPlan; return distributedPlan;
} }
@ -1087,8 +1080,6 @@ CreateDistributedPlan(uint64 planId, Query *originalQuery, Query *query, ParamLi
*/ */
if (IsModifyCommand(originalQuery)) if (IsModifyCommand(originalQuery))
{ {
FinalizeDistributedPlan(distributedPlan, originalQuery);
return distributedPlan; return distributedPlan;
} }
@ -1120,87 +1111,10 @@ CreateDistributedPlan(uint64 planId, Query *originalQuery, Query *query, ParamLi
/* distributed plan currently should always succeed or error out */ /* distributed plan currently should always succeed or error out */
Assert(distributedPlan && distributedPlan->planningError == NULL); Assert(distributedPlan && distributedPlan->planningError == NULL);
FinalizeDistributedPlan(distributedPlan, originalQuery);
return distributedPlan; return distributedPlan;
} }
/*
 * FinalizeDistributedPlan runs as the last step of distributed planning.
 * At the moment its only job is to record the subplans used by the plan,
 * which serves the intermediate result pruning optimizations.
 */
static void
FinalizeDistributedPlan(DistributedPlan *plan, Query *originalQuery)
{
	/*
	 * Fast path queries cannot have any subplans by definition, so there
	 * is nothing to record and the expensive traversal is skipped.
	 */
	if (plan->fastPathRouterPlan)
	{
		return;
	}

	RecordSubPlansUsedInPlan(plan, originalQuery);
}
/*
 * RecordSubPlansUsedInPlan takes a distributed plan and a query tree, and
 * sets the usedSubPlanNodeList of the distributed plan.
 *
 * Subplans referenced outside of the HAVING clause are marked as accessed
 * remotely. Subplans referenced by a subquery in the HAVING clause are
 * collected separately: they are marked as accessed locally when the plan
 * has a master query, and remotely otherwise. Both lists are merged into
 * the plan at the end.
 */
static void
RecordSubPlansUsedInPlan(DistributedPlan *plan, Query *originalQuery)
{
	Node *savedHavingQual = originalQuery->havingQual;
	List *havingSubPlans = NIL;

	/* detach HAVING for the first traversal; it is restored further below */
	originalQuery->havingQual = NULL;

	/*
	 * Mark the subplans as needed on remote side. Note that this decision is
	 * revisited on execution, when the query only consists of intermediate
	 * results.
	 */
	List *nonHavingSubPlans = FindSubPlansUsedInNode((Node *) originalQuery);
	UpdateUsedPlanListLocation(nonHavingSubPlans, SUBPLAN_ACCESS_REMOTE);

	/* now handle the HAVING part of the query on its own */
	if (originalQuery->hasSubLinks &&
		FindNodeCheck(savedHavingQual, IsNodeSubquery))
	{
		havingSubPlans = FindSubPlansUsedInNode(savedHavingQual);

		/*
		 * With a master query we are sure that the result is needed locally.
		 * Without one, such as for router queries, the plan may not be
		 * required locally. Note that if the query consists of only
		 * intermediate results, the executor may still prefer to write
		 * locally.
		 *
		 * If any of the HAVING subplans is also used in other parts of the
		 * query, the merge below combines the two location masks.
		 */
		int havingLocation = (plan->masterQuery != NULL) ?
							 SUBPLAN_ACCESS_LOCAL :
							 SUBPLAN_ACCESS_REMOTE;

		UpdateUsedPlanListLocation(havingSubPlans, havingLocation);
	}

	/* put the HAVING qual back in place */
	originalQuery->havingQual = savedHavingQual;

	/* merge the used subplans and store them in the plan */
	plan->usedSubPlanNodeList =
		MergeUsedSubPlanLists(nonHavingSubPlans, havingSubPlans);
}
/* /*
* EnsurePartitionTableNotReplicated errors out if the input relation is * EnsurePartitionTableNotReplicated errors out if the input relation is
* a partition table and the table has a replication factor greater than * a partition table and the table has a replication factor greater than
@ -1426,6 +1340,22 @@ FinalizePlan(PlannedStmt *localPlan, DistributedPlan *distributedPlan)
customScan->custom_private = list_make1(distributedPlanData); customScan->custom_private = list_make1(distributedPlanData);
customScan->flags = CUSTOMPATH_SUPPORT_BACKWARD_SCAN; customScan->flags = CUSTOMPATH_SUPPORT_BACKWARD_SCAN;
/*
* Fast path queries cannot have any subplans by definition, so skip
* expensive traversals.
*/
if (!distributedPlan->fastPathRouterPlan)
{
/*
* Record subplans used by distributed plan to make intermediate result
* pruning easier.
*
* We do this before finalizing the plan, because the masterQuery is
* rewritten by standard_planner in FinalizeNonRouterPlan.
*/
distributedPlan->usedSubPlanNodeList = FindSubPlanUsages(distributedPlan);
}
if (distributedPlan->masterQuery) if (distributedPlan->masterQuery)
{ {
finalPlan = FinalizeNonRouterPlan(localPlan, distributedPlan, customScan); finalPlan = FinalizeNonRouterPlan(localPlan, distributedPlan, customScan);

View File

@ -13,6 +13,7 @@
*/ */
#include "distributed/citus_custom_scan.h" #include "distributed/citus_custom_scan.h"
#include "distributed/citus_ruleutils.h"
#include "distributed/intermediate_result_pruning.h" #include "distributed/intermediate_result_pruning.h"
#include "distributed/listutils.h" #include "distributed/listutils.h"
#include "distributed/log_utils.h" #include "distributed/log_utils.h"
@ -24,6 +25,8 @@
/* controlled via GUC, used mostly for testing */ /* controlled via GUC, used mostly for testing */
bool LogIntermediateResults = false; bool LogIntermediateResults = false;
static List * FindSubPlansUsedInNode(Node *node);
static void AppendAllAccessedWorkerNodes(IntermediateResultsHashEntry *entry, static void AppendAllAccessedWorkerNodes(IntermediateResultsHashEntry *entry,
DistributedPlan *distributedPlan, DistributedPlan *distributedPlan,
int workerNodeCount); int workerNodeCount);
@ -31,17 +34,49 @@ static List * FindAllRemoteWorkerNodesUsingSubplan(IntermediateResultsHashEntry
static List * RemoveLocalNodeFromWorkerList(List *workerNodeList); static List * RemoveLocalNodeFromWorkerList(List *workerNodeList);
static void LogIntermediateResultMulticastSummary(IntermediateResultsHashEntry *entry, static void LogIntermediateResultMulticastSummary(IntermediateResultsHashEntry *entry,
List *workerNodeList); List *workerNodeList);
static bool UsedSubPlansEqual(UsedDistributedSubPlan *left, static void UpdateUsedPlanListLocation(List *subPlanList, int locationMask);
UsedDistributedSubPlan *right);
static UsedDistributedSubPlan * UsedSubPlanListMember(List *list,
UsedDistributedSubPlan *usedPlan); /*
* FindSubPlanUsages finds the subplans used in the master query and the
* job query and returns them as a combined list of UsedDistributedSubPlan
* structs.
*
* The list may contain duplicates if the subplan is referenced multiple
* times.
*/
List *
FindSubPlanUsages(DistributedPlan *plan)
{
	List *usedByMasterQuery = NIL;
	List *usedByJobQuery = NIL;

	/* subplans referenced by the master query are tagged as local accesses */
	if (plan->masterQuery != NULL)
	{
		Node *masterQueryNode = (Node *) plan->masterQuery;

		usedByMasterQuery = FindSubPlansUsedInNode(masterQueryNode);
		UpdateUsedPlanListLocation(usedByMasterQuery, SUBPLAN_ACCESS_LOCAL);
	}

	/*
	 * Subplans referenced by the job query are tagged as remote accesses.
	 * Note that this decision is revisited on execution, when the query
	 * only consists of intermediate results.
	 */
	if (plan->workerJob != NULL)
	{
		Node *jobQueryNode = (Node *) plan->workerJob->jobQuery;

		usedByJobQuery = FindSubPlansUsedInNode(jobQueryNode);
		UpdateUsedPlanListLocation(usedByJobQuery, SUBPLAN_ACCESS_REMOTE);
	}

	/* hand back both sets as a single list; duplicates are not removed */
	return list_concat(usedByMasterQuery, usedByJobQuery);
}
/* /*
* FindSubPlansUsedInNode finds all the subplans used by the plan by traversing * FindSubPlansUsedInNode finds all the subplans used by the plan by traversing
* the input node. * the input node.
*/ */
List * static List *
FindSubPlansUsedInNode(Node *node) FindSubPlansUsedInNode(Node *node)
{ {
List *rangeTableList = NIL; List *rangeTableList = NIL;
@ -75,10 +110,7 @@ FindSubPlansUsedInNode(Node *node)
/* the callers are responsible for setting the accurate location */ /* the callers are responsible for setting the accurate location */
usedPlan->locationMask = SUBPLAN_ACCESS_NONE; usedPlan->locationMask = SUBPLAN_ACCESS_NONE;
if (!UsedSubPlanListMember(usedSubPlanList, usedPlan)) usedSubPlanList = lappend(usedSubPlanList, usedPlan);
{
usedSubPlanList = lappend(usedSubPlanList, usedPlan);
}
} }
} }
@ -117,12 +149,6 @@ RecordSubplanExecutionsOnNodes(HTAB *intermediateResultsHash,
IntermediateResultsHashEntry *entry = SearchIntermediateResult( IntermediateResultsHashEntry *entry = SearchIntermediateResult(
intermediateResultsHash, resultId); intermediateResultsHash, resultId);
if (usedPlan->locationMask & SUBPLAN_ACCESS_LOCAL)
{
/* subPlan needs to be written locally as the planner decided */
entry->writeLocalFile = true;
}
/* /*
* There is no need to traverse the subplan if the intermediate result * There is no need to traverse the subplan if the intermediate result
* will be written to a local file and sent to all nodes. Note that the * will be written to a local file and sent to all nodes. Note that the
@ -133,7 +159,14 @@ RecordSubplanExecutionsOnNodes(HTAB *intermediateResultsHash,
elog(DEBUG4, "Subplan %s is used in all workers", resultId); elog(DEBUG4, "Subplan %s is used in all workers", resultId);
continue; continue;
} }
else if (usedPlan->locationMask & SUBPLAN_ACCESS_REMOTE)
if (usedPlan->locationMask & SUBPLAN_ACCESS_LOCAL)
{
/* subPlan needs to be written locally as the planner decided */
entry->writeLocalFile = true;
}
if (usedPlan->locationMask & SUBPLAN_ACCESS_REMOTE)
{ {
/* /*
* traverse the plan and add find all worker nodes * traverse the plan and add find all worker nodes
@ -383,66 +416,11 @@ SearchIntermediateResult(HTAB *intermediateResultsHash, char *resultId)
} }
/*
 * MergeUsedSubPlanLists folds rightSubPlanList into leftSubPlanList.
 *
 * Every entry of the right list is looked up in the left list. Entries
 * that are not found are appended to the left list; for entries that are
 * found, the locationMask of the right entry is OR'ed into the matching
 * left entry.
 *
 * The left list is modified in place and returned.
 */
List *
MergeUsedSubPlanLists(List *leftSubPlanList, List *rightSubPlanList)
{
	ListCell *rightCell;

	foreach(rightCell, rightSubPlanList)
	{
		UsedDistributedSubPlan *incoming = lfirst(rightCell);
		UsedDistributedSubPlan *existing =
			UsedSubPlanListMember(leftSubPlanList, incoming);

		if (existing != NULL)
		{
			/* already tracked on the left side: combine the access locations */
			existing->locationMask |= incoming->locationMask;
			continue;
		}

		leftSubPlanList = lappend(leftSubPlanList, incoming);
	}

	return leftSubPlanList;
}
/*
 * UsedSubPlanListMember is modelled after list_member(), but works on
 * UsedDistributedSubPlan structs, which have no equal() function defined
 * (similar to all Citus node types).
 *
 * Returns the first entry of usedSubPlanList that UsedSubPlansEqual()
 * reports as equal to usedPlan, or NULL when there is no such entry.
 */
static UsedDistributedSubPlan *
UsedSubPlanListMember(List *usedSubPlanList, UsedDistributedSubPlan *usedPlan)
{
	const ListCell *cell;

	foreach(cell, usedSubPlanList)
	{
		UsedDistributedSubPlan *candidate = lfirst(cell);

		if (UsedSubPlansEqual(candidate, usedPlan))
		{
			return candidate;
		}
	}

	return NULL;
}
/* /*
* UpdateUsedPlanListLocation is a utility function which iterates over the list * UpdateUsedPlanListLocation is a utility function which iterates over the list
* and updates the subPlanLocation to the input location. * and updates the subPlanLocation to the input location.
*/ */
void static void
UpdateUsedPlanListLocation(List *subPlanList, int locationMask) UpdateUsedPlanListLocation(List *subPlanList, int locationMask)
{ {
ListCell *subPlanCell = NULL; ListCell *subPlanCell = NULL;
@ -453,25 +431,3 @@ UpdateUsedPlanListLocation(List *subPlanList, int locationMask)
subPlan->locationMask |= locationMask; subPlan->locationMask |= locationMask;
} }
} }
/*
 * UsedSubPlansEqual is modelled after equal(), but works on
 * UsedDistributedSubPlan structs, which have no equal() function defined
 * (similar to all Citus node types).
 *
 * Two entries are equal when their subPlanId strings match, comparing at
 * most NAMEDATALEN characters. A NULL argument never compares equal.
 */
static bool
UsedSubPlansEqual(UsedDistributedSubPlan *left, UsedDistributedSubPlan *right)
{
	if (left != NULL && right != NULL)
	{
		return strncmp(left->subPlanId, right->subPlanId, NAMEDATALEN) == 0;
	}

	return false;
}

View File

@ -207,10 +207,6 @@ BuildSelectStatementViaStdPlanner(Query *masterQuery, List *masterTargetList,
remoteScan->custom_scan_tlist = copyObject(masterTargetList); remoteScan->custom_scan_tlist = copyObject(masterTargetList);
remoteScan->scan.plan.targetlist = copyObject(masterTargetList); remoteScan->scan.plan.targetlist = copyObject(masterTargetList);
/* probably want to do this where we add sublinks to the master plan */
masterQuery->hasSubLinks = checkExprHasSubLink((Node *) masterQuery);
Assert(masterQuery->hasWindowFuncs == contain_window_function((Node *) masterQuery));
/* /*
* We will overwrite the alias of the rangetable which describes the custom scan. * We will overwrite the alias of the rangetable which describes the custom scan.
* Ideally we would have set the correct column names and alias on the range table in * Ideally we would have set the correct column names and alias on the range table in

View File

@ -732,6 +732,9 @@ BuildJobQuery(MultiNode *multiNode, List *dependentJobList)
jobQuery->hasDistinctOn = hasDistinctOn; jobQuery->hasDistinctOn = hasDistinctOn;
jobQuery->windowClause = windowClause; jobQuery->windowClause = windowClause;
jobQuery->hasWindowFuncs = hasWindowFuncs; jobQuery->hasWindowFuncs = hasWindowFuncs;
jobQuery->hasSubLinks = checkExprHasSubLink((Node *) jobQuery);
Assert(jobQuery->hasWindowFuncs == contain_window_function((Node *) jobQuery));
return jobQuery; return jobQuery;
} }
@ -1271,6 +1274,7 @@ QueryJoinTree(MultiNode *multiNode, List *dependentJobList, List **rangeTableLis
funcColumnTypes, funcColumnTypes,
funcColumnTypeMods, funcColumnTypeMods,
funcCollations); funcCollations);
RangeTblRef *rangeTableRef = makeNode(RangeTblRef); RangeTblRef *rangeTableRef = makeNode(RangeTblRef);
rangeTableRef->rtindex = list_length(*rangeTableList) + 1; rangeTableRef->rtindex = list_length(*rangeTableList) + 1;
@ -1604,6 +1608,7 @@ BuildSubqueryJobQuery(MultiNode *multiNode)
jobQuery->distinctClause = distinctClause; jobQuery->distinctClause = distinctClause;
jobQuery->hasWindowFuncs = hasWindowFuncs; jobQuery->hasWindowFuncs = hasWindowFuncs;
jobQuery->windowClause = windowClause; jobQuery->windowClause = windowClause;
jobQuery->hasSubLinks = checkExprHasSubLink((Node *) jobQuery);
return jobQuery; return jobQuery;
} }

View File

@ -20,7 +20,7 @@
extern bool LogIntermediateResults; extern bool LogIntermediateResults;
extern List * FindSubPlansUsedInNode(Node *node); extern List * FindSubPlanUsages(DistributedPlan *plan);
extern List * FindAllWorkerNodesUsingSubplan(HTAB *intermediateResultsHash, extern List * FindAllWorkerNodesUsingSubplan(HTAB *intermediateResultsHash,
char *resultId); char *resultId);
extern HTAB * MakeIntermediateResultHTAB(void); extern HTAB * MakeIntermediateResultHTAB(void);
@ -29,8 +29,4 @@ extern void RecordSubplanExecutionsOnNodes(HTAB *intermediateResultsHash,
extern IntermediateResultsHashEntry * SearchIntermediateResult(HTAB *resultsHash, extern IntermediateResultsHashEntry * SearchIntermediateResult(HTAB *resultsHash,
char *resultId); char *resultId);
/* utility functions related to UsedSubPlans */
extern List * MergeUsedSubPlanLists(List *leftSubPlanList, List *rightSubPlanList);
extern void UpdateUsedPlanListLocation(List *subPlanList, int localtionMask);
#endif /* INTERMEDIATE_RESULT_PRUNING_H */ #endif /* INTERMEDIATE_RESULT_PRUNING_H */

View File

@ -911,8 +911,16 @@ RESET citus.coordinator_aggregation_strategy;
SELECT t1.event_type FROM events_table t1 SELECT t1.event_type FROM events_table t1
GROUP BY t1.event_type HAVING t1.event_type > corr(t1.value_3, t1.value_2 + (SELECT t2.value_2 FROM users_table t2 ORDER BY 1 DESC LIMIT 1)) GROUP BY t1.event_type HAVING t1.event_type > corr(t1.value_3, t1.value_2 + (SELECT t2.value_2 FROM users_table t2 ORDER BY 1 DESC LIMIT 1))
ORDER BY 1; ORDER BY 1;
ERROR: result "68_1" does not exist event_type
CONTEXT: while executing command on localhost:xxxxx ---------------------------------------------------------------------
0
1
2
3
4
5
(6 rows)
SELECT t1.event_type FROM events_table t1 SELECT t1.event_type FROM events_table t1
GROUP BY t1.event_type HAVING t1.event_type * 5 > sum(distinct t1.value_3) GROUP BY t1.event_type HAVING t1.event_type * 5 > sum(distinct t1.value_3)
ORDER BY 1; ORDER BY 1;

View File

@ -196,6 +196,20 @@ ORDER BY
(6 rows) (6 rows)
DROP VIEW users_view, window_view; DROP VIEW users_view, window_view;
-- window functions along with subquery in HAVING
SELECT
user_id, count (user_id) OVER (PARTITION BY user_id)
FROM
users_table
GROUP BY
user_id HAVING avg(value_1) < (SELECT min(k_no) FROM users_ref_test_table)
ORDER BY 1 DESC,2 DESC
LIMIT 1;
user_id | count
---------------------------------------------------------------------
6 | 1
(1 row)
-- window function uses columns from two different tables -- window function uses columns from two different tables
SELECT SELECT
DISTINCT ON (events_table.user_id, rnk) events_table.user_id, rank() OVER my_win AS rnk DISTINCT ON (events_table.user_id, rnk) events_table.user_id, rank() OVER my_win AS rnk

View File

@ -110,6 +110,16 @@ ORDER BY
DROP VIEW users_view, window_view; DROP VIEW users_view, window_view;
-- window functions along with subquery in HAVING
SELECT
user_id, count (user_id) OVER (PARTITION BY user_id)
FROM
users_table
GROUP BY
user_id HAVING avg(value_1) < (SELECT min(k_no) FROM users_ref_test_table)
ORDER BY 1 DESC,2 DESC
LIMIT 1;
-- window function uses columns from two different tables -- window function uses columns from two different tables
SELECT SELECT
DISTINCT ON (events_table.user_id, rnk) events_table.user_id, rank() OVER my_win AS rnk DISTINCT ON (events_table.user_id, rnk) events_table.user_id, rank() OVER my_win AS rnk