From 84672c3dbdcd319b0f1f9b042625056817281292 Mon Sep 17 00:00:00 2001 From: Marco Slot Date: Mon, 6 Apr 2020 17:01:08 +0200 Subject: [PATCH 1/2] Simplify intermediate result pruning logic --- .../distributed/planner/distributed_planner.c | 104 +++---------- .../planner/intermediate_result_pruning.c | 144 ++++++------------ .../planner/multi_master_planner.c | 4 - .../planner/multi_physical_planner.c | 5 + .../distributed/intermediate_result_pruning.h | 6 +- src/test/regress/expected/multi_subquery.out | 12 +- .../regress/expected/window_functions.out | 14 ++ src/test/regress/sql/window_functions.sql | 10 ++ 8 files changed, 107 insertions(+), 192 deletions(-) diff --git a/src/backend/distributed/planner/distributed_planner.c b/src/backend/distributed/planner/distributed_planner.c index 321fdacb1..719e7d498 100644 --- a/src/backend/distributed/planner/distributed_planner.c +++ b/src/backend/distributed/planner/distributed_planner.c @@ -22,6 +22,7 @@ #include "catalog/pg_type.h" #include "distributed/citus_nodefuncs.h" #include "distributed/citus_nodes.h" +#include "distributed/citus_ruleutils.h" #include "distributed/cte_inline.h" #include "distributed/function_call_delegation.h" #include "distributed/insert_select_planner.h" @@ -91,8 +92,6 @@ static DistributedPlan * CreateDistributedPlan(uint64 planId, Query *originalQue bool hasUnresolvedParams, PlannerRestrictionContext * plannerRestrictionContext); -static void FinalizeDistributedPlan(DistributedPlan *plan, Query *originalQuery); -static void RecordSubPlansUsedInPlan(DistributedPlan *plan, Query *originalQuery); static DeferredErrorMessage * DeferErrorIfPartitionTableNotSingleReplicated(Oid relationId); @@ -955,8 +954,6 @@ CreateDistributedPlan(uint64 planId, Query *originalQuery, Query *query, ParamLi if (distributedPlan->planningError == NULL) { - FinalizeDistributedPlan(distributedPlan, originalQuery); - return distributedPlan; } else @@ -977,8 +974,6 @@ CreateDistributedPlan(uint64 planId, Query *originalQuery, Query *query, ParamLi plannerRestrictionContext); if (distributedPlan->planningError == NULL) { - FinalizeDistributedPlan(distributedPlan, originalQuery); - return distributedPlan; } else @@ -1075,8 +1070,6 @@ CreateDistributedPlan(uint64 planId, Query *originalQuery, Query *query, ParamLi Assert(distributedPlan != NULL); distributedPlan->subPlanList = subPlanList; - FinalizeDistributedPlan(distributedPlan, originalQuery); - return distributedPlan; } @@ -1087,8 +1080,6 @@ CreateDistributedPlan(uint64 planId, Query *originalQuery, Query *query, ParamLi */ if (IsModifyCommand(originalQuery)) { - FinalizeDistributedPlan(distributedPlan, originalQuery); - return distributedPlan; } @@ -1120,87 +1111,10 @@ CreateDistributedPlan(uint64 planId, Query *originalQuery, Query *query, ParamLi /* distributed plan currently should always succeed or error out */ Assert(distributedPlan && distributedPlan->planningError == NULL); - FinalizeDistributedPlan(distributedPlan, originalQuery); - return distributedPlan; } -/* - * FinalizeDistributedPlan is the final step of distributed planning. The function - * currently only implements some optimizations for intermediate result(s) pruning. - */ -static void -FinalizeDistributedPlan(DistributedPlan *plan, Query *originalQuery) -{ - /* - * Fast path queries, we cannot have any subplans by their definition, - * so skip expensive traversals. 
- */ - if (!plan->fastPathRouterPlan) - { - RecordSubPlansUsedInPlan(plan, originalQuery); - } -} - - -/* - * RecordSubPlansUsedInPlan gets a distributed plan a queryTree, and - * updates the usedSubPlanNodeList of the distributed plan. - * - * The function simply pulls all the subPlans that are used in the queryTree - * with one exception: subPlans in the HAVING clause. The reason is explained - * in the function. - */ -static void -RecordSubPlansUsedInPlan(DistributedPlan *plan, Query *originalQuery) -{ - Node *havingQual = originalQuery->havingQual; - - /* temporarily set to NULL, we're going to restore before the function returns */ - originalQuery->havingQual = NULL; - - /* - * Mark the subplans as needed on remote side. Note that this decision is revisited - * on execution, when the query only consists of intermediate results. - */ - List *subplansExceptHaving = FindSubPlansUsedInNode((Node *) originalQuery); - UpdateUsedPlanListLocation(subplansExceptHaving, SUBPLAN_ACCESS_REMOTE); - - /* do the same for HAVING part of the query */ - List *subplansInHaving = NIL; - if (originalQuery->hasSubLinks && - FindNodeCheck(havingQual, IsNodeSubquery)) - { - subplansInHaving = FindSubPlansUsedInNode(havingQual); - if (plan->masterQuery) - { - /* - * If we have the master query, we're sure that the result is needed locally. - * Otherwise, such as router queries, the plan may not be required locally. - * Note that if the query consists of only intermediate results, the executor - * may still prefer to write locally. - * - * If any of the subplansInHaving is used in other parts of the query, - * we'll later merge those subPlans and send to remote. - */ - UpdateUsedPlanListLocation(subplansInHaving, SUBPLAN_ACCESS_LOCAL); - } - else - { - UpdateUsedPlanListLocation(subplansInHaving, SUBPLAN_ACCESS_REMOTE); - } - } - - /* set back the havingQual and the calculated subplans */ - originalQuery->havingQual = havingQual; - - /* merge the used subplans */ - plan->usedSubPlanNodeList = - MergeUsedSubPlanLists(subplansExceptHaving, subplansInHaving); -} - - /* * EnsurePartitionTableNotReplicated errors out if the infput relation is * a partition table and the table has a replication factor greater than @@ -1426,6 +1340,22 @@ FinalizePlan(PlannedStmt *localPlan, DistributedPlan *distributedPlan) customScan->custom_private = list_make1(distributedPlanData); customScan->flags = CUSTOMPATH_SUPPORT_BACKWARD_SCAN; + /* + * Fast path queries cannot have any subplans by definition, so skip + * expensive traversals. + */ + if (!distributedPlan->fastPathRouterPlan) + { + /* + * Record subplans used by distributed plan to make intermediate result + * pruning easier. + * + * We do this before finalizing the plan, because the masterQuery is + * rewritten by standard_planner in FinalizeNonRouterPlan. 
+ */ + distributedPlan->usedSubPlanNodeList = FindSubPlanUsages(distributedPlan); + } + if (distributedPlan->masterQuery) { finalPlan = FinalizeNonRouterPlan(localPlan, distributedPlan, customScan); diff --git a/src/backend/distributed/planner/intermediate_result_pruning.c b/src/backend/distributed/planner/intermediate_result_pruning.c index f5809d59a..1346f4b95 100644 --- a/src/backend/distributed/planner/intermediate_result_pruning.c +++ b/src/backend/distributed/planner/intermediate_result_pruning.c @@ -13,6 +13,7 @@ */ #include "distributed/citus_custom_scan.h" +#include "distributed/citus_ruleutils.h" #include "distributed/intermediate_result_pruning.h" #include "distributed/listutils.h" #include "distributed/log_utils.h" @@ -24,6 +25,8 @@ /* controlled via GUC, used mostly for testing */ bool LogIntermediateResults = false; + +static List * FindSubPlansUsedInNode(Node *node); static void AppendAllAccessedWorkerNodes(IntermediateResultsHashEntry *entry, DistributedPlan *distributedPlan, int workerNodeCount); @@ -31,17 +34,49 @@ static List * FindAllRemoteWorkerNodesUsingSubplan(IntermediateResultsHashEntry static List * RemoveLocalNodeFromWorkerList(List *workerNodeList); static void LogIntermediateResultMulticastSummary(IntermediateResultsHashEntry *entry, List *workerNodeList); -static bool UsedSubPlansEqual(UsedDistributedSubPlan *left, - UsedDistributedSubPlan *right); -static UsedDistributedSubPlan * UsedSubPlanListMember(List *list, - UsedDistributedSubPlan *usedPlan); +static void UpdateUsedPlanListLocation(List *subPlanList, int locationMask); + + +/* + * FindSubPlanUsages finds the subplans used in the master query and the + * job query and returns them as a combined list of UsedDistributedSubPlan + * structs. + * + * The list may contain duplicates if the subplan is referenced multiple + * times. + */ +List * +FindSubPlanUsages(DistributedPlan *plan) +{ + List *localSubPlans = NIL; + List *remoteSubPlans = NIL; + + if (plan->masterQuery != NULL) + { + localSubPlans = FindSubPlansUsedInNode((Node *) plan->masterQuery); + UpdateUsedPlanListLocation(localSubPlans, SUBPLAN_ACCESS_LOCAL); + } + + if (plan->workerJob != NULL) + { + /* + * Mark the subplans as needed on remote side. Note that this decision is revisited + * on execution, when the query only consists of intermediate results. + */ + remoteSubPlans = FindSubPlansUsedInNode((Node *) plan->workerJob->jobQuery); + UpdateUsedPlanListLocation(remoteSubPlans, SUBPLAN_ACCESS_REMOTE); + } + + /* merge the used subplans */ + return list_concat(localSubPlans, remoteSubPlans); +} /* * FindSubPlansUsedInPlan finds all the subplans used by the plan by traversing * the input node. 
*/ -List * +static List * FindSubPlansUsedInNode(Node *node) { List *rangeTableList = NIL; @@ -75,10 +110,7 @@ FindSubPlansUsedInNode(Node *node) /* the callers are responsible for setting the accurate location */ usedPlan->locationMask = SUBPLAN_ACCESS_NONE; - if (!UsedSubPlanListMember(usedSubPlanList, usedPlan)) - { - usedSubPlanList = lappend(usedSubPlanList, usedPlan); - } + usedSubPlanList = lappend(usedSubPlanList, usedPlan); } } @@ -117,12 +149,6 @@ RecordSubplanExecutionsOnNodes(HTAB *intermediateResultsHash, IntermediateResultsHashEntry *entry = SearchIntermediateResult( intermediateResultsHash, resultId); - if (usedPlan->locationMask & SUBPLAN_ACCESS_LOCAL) - { - /* subPlan needs to be written locally as the planner decided */ - entry->writeLocalFile = true; - } - /* * There is no need to traverse the subplan if the intermediate result * will be written to a local file and sent to all nodes. Note that the @@ -133,7 +159,14 @@ RecordSubplanExecutionsOnNodes(HTAB *intermediateResultsHash, elog(DEBUG4, "Subplan %s is used in all workers", resultId); continue; } - else if (usedPlan->locationMask & SUBPLAN_ACCESS_REMOTE) + + if (usedPlan->locationMask & SUBPLAN_ACCESS_LOCAL) + { + /* subPlan needs to be written locally as the planner decided */ + entry->writeLocalFile = true; + } + + if (usedPlan->locationMask & SUBPLAN_ACCESS_REMOTE) { /* * traverse the plan and add find all worker nodes @@ -383,66 +416,11 @@ SearchIntermediateResult(HTAB *intermediateResultsHash, char *resultId) } -/* - * MergeUsedSubPlanLists is a utility function that merges the two input - * UsedSubPlan lists. Existence of the items of the rightSubPlanList - * checked in leftSubPlanList. If not found, items are added to - * leftSubPlanList. If found, the locationMask fields are merged. - * - * Finally, the in-place modified leftSubPlanList is returned. - */ -List * -MergeUsedSubPlanLists(List *leftSubPlanList, List *rightSubPlanList) -{ - ListCell *rightListCell; - - foreach(rightListCell, rightSubPlanList) - { - UsedDistributedSubPlan *memberOnRightList = lfirst(rightListCell); - UsedDistributedSubPlan *memberOnLeftList = - UsedSubPlanListMember(leftSubPlanList, memberOnRightList); - - if (memberOnLeftList == NULL) - { - leftSubPlanList = lappend(leftSubPlanList, memberOnRightList); - } - else - { - memberOnLeftList->locationMask |= memberOnRightList->locationMask; - } - } - - return leftSubPlanList; -} - - -/* - * UsedSubPlanListMember is a utility function inspired from list_member(), - * but operating on UsedDistributedSubPlan struct, which doesn't have equal() - * function defined (similar to all Citus node types). - */ -static UsedDistributedSubPlan * -UsedSubPlanListMember(List *usedSubPlanList, UsedDistributedSubPlan *usedPlan) -{ - const ListCell *usedSubPlanCell; - - foreach(usedSubPlanCell, usedSubPlanList) - { - if (UsedSubPlansEqual(lfirst(usedSubPlanCell), usedPlan)) - { - return lfirst(usedSubPlanCell); - } - } - - return NULL; -} - - /* * UpdateUsedPlanListLocation is a utility function which iterates over the list * and updates the subPlanLocation to the input location. 
*/ -void +static void UpdateUsedPlanListLocation(List *subPlanList, int locationMask) { ListCell *subPlanCell = NULL; @@ -453,25 +431,3 @@ UpdateUsedPlanListLocation(List *subPlanList, int locationMask) subPlan->locationMask |= locationMask; } } - - -/* - * UsedSubPlansEqual is a utility function inspired from equal(), - * but operating on UsedDistributedSubPlan struct, which doesn't have equal() - * function defined (similar to all Citus node types). - */ -static bool -UsedSubPlansEqual(UsedDistributedSubPlan *left, UsedDistributedSubPlan *right) -{ - if (left == NULL || right == NULL) - { - return false; - } - - if (strncmp(left->subPlanId, right->subPlanId, NAMEDATALEN) == 0) - { - return true; - } - - return false; -} diff --git a/src/backend/distributed/planner/multi_master_planner.c b/src/backend/distributed/planner/multi_master_planner.c index 5c6cc2474..a574ccd7f 100644 --- a/src/backend/distributed/planner/multi_master_planner.c +++ b/src/backend/distributed/planner/multi_master_planner.c @@ -207,10 +207,6 @@ BuildSelectStatementViaStdPlanner(Query *masterQuery, List *masterTargetList, remoteScan->custom_scan_tlist = copyObject(masterTargetList); remoteScan->scan.plan.targetlist = copyObject(masterTargetList); - /* probably want to do this where we add sublinks to the master plan */ - masterQuery->hasSubLinks = checkExprHasSubLink((Node *) masterQuery); - Assert(masterQuery->hasWindowFuncs == contain_window_function((Node *) masterQuery)); - /* * We will overwrite the alias of the rangetable which describes the custom scan. * Ideally we would have set the correct column names and alias on the range table in diff --git a/src/backend/distributed/planner/multi_physical_planner.c b/src/backend/distributed/planner/multi_physical_planner.c index a2f239d33..7f9b52060 100644 --- a/src/backend/distributed/planner/multi_physical_planner.c +++ b/src/backend/distributed/planner/multi_physical_planner.c @@ -732,6 +732,9 @@ BuildJobQuery(MultiNode *multiNode, List *dependentJobList) jobQuery->hasDistinctOn = hasDistinctOn; jobQuery->windowClause = windowClause; jobQuery->hasWindowFuncs = hasWindowFuncs; + jobQuery->hasSubLinks = checkExprHasSubLink((Node *) jobQuery); + + Assert(jobQuery->hasWindowFuncs == contain_window_function((Node *) jobQuery)); return jobQuery; } @@ -1271,6 +1274,7 @@ QueryJoinTree(MultiNode *multiNode, List *dependentJobList, List **rangeTableLis funcColumnTypes, funcColumnTypeMods, funcCollations); + RangeTblRef *rangeTableRef = makeNode(RangeTblRef); rangeTableRef->rtindex = list_length(*rangeTableList) + 1; @@ -1604,6 +1608,7 @@ BuildSubqueryJobQuery(MultiNode *multiNode) jobQuery->distinctClause = distinctClause; jobQuery->hasWindowFuncs = hasWindowFuncs; jobQuery->windowClause = windowClause; + jobQuery->hasSubLinks = checkExprHasSubLink((Node *) jobQuery); return jobQuery; } diff --git a/src/include/distributed/intermediate_result_pruning.h b/src/include/distributed/intermediate_result_pruning.h index 2d1ca3050..5bde93313 100644 --- a/src/include/distributed/intermediate_result_pruning.h +++ b/src/include/distributed/intermediate_result_pruning.h @@ -20,7 +20,7 @@ extern bool LogIntermediateResults; -extern List * FindSubPlansUsedInNode(Node *node); +extern List * FindSubPlanUsages(DistributedPlan *plan); extern List * FindAllWorkerNodesUsingSubplan(HTAB *intermediateResultsHash, char *resultId); extern HTAB * MakeIntermediateResultHTAB(void); @@ -29,8 +29,4 @@ extern void RecordSubplanExecutionsOnNodes(HTAB *intermediateResultsHash, extern 
IntermediateResultsHashEntry * SearchIntermediateResult(HTAB *resultsHash, char *resultId); -/* utility functions related to UsedSubPlans */ -extern List * MergeUsedSubPlanLists(List *leftSubPlanList, List *rightSubPlanList); -extern void UpdateUsedPlanListLocation(List *subPlanList, int localtionMask); - #endif /* INTERMEDIATE_RESULT_PRUNING_H */ diff --git a/src/test/regress/expected/multi_subquery.out b/src/test/regress/expected/multi_subquery.out index 4a0afb2bb..c5b41d5f3 100644 --- a/src/test/regress/expected/multi_subquery.out +++ b/src/test/regress/expected/multi_subquery.out @@ -911,8 +911,16 @@ RESET citus.coordinator_aggregation_strategy; SELECT t1.event_type FROM events_table t1 GROUP BY t1.event_type HAVING t1.event_type > corr(t1.value_3, t1.value_2 + (SELECT t2.value_2 FROM users_table t2 ORDER BY 1 DESC LIMIT 1)) ORDER BY 1; -ERROR: result "68_1" does not exist -CONTEXT: while executing command on localhost:xxxxx + event_type +--------------------------------------------------------------------- + 0 + 1 + 2 + 3 + 4 + 5 +(6 rows) + SELECT t1.event_type FROM events_table t1 GROUP BY t1.event_type HAVING t1.event_type * 5 > sum(distinct t1.value_3) ORDER BY 1; diff --git a/src/test/regress/expected/window_functions.out b/src/test/regress/expected/window_functions.out index 01094c544..e46ac41a5 100644 --- a/src/test/regress/expected/window_functions.out +++ b/src/test/regress/expected/window_functions.out @@ -196,6 +196,20 @@ ORDER BY (6 rows) DROP VIEW users_view, window_view; +-- window functions along with subquery in HAVING +SELECT + user_id, count (user_id) OVER (PARTITION BY user_id) +FROM + users_table +GROUP BY + user_id HAVING avg(value_1) < (SELECT min(k_no) FROM users_ref_test_table) +ORDER BY 1 DESC,2 DESC +LIMIT 1; + user_id | count +--------------------------------------------------------------------- + 6 | 1 +(1 row) + -- window function uses columns from two different tables SELECT DISTINCT ON (events_table.user_id, rnk) events_table.user_id, rank() OVER my_win AS rnk diff --git a/src/test/regress/sql/window_functions.sql b/src/test/regress/sql/window_functions.sql index ed67fe22d..899140bd3 100644 --- a/src/test/regress/sql/window_functions.sql +++ b/src/test/regress/sql/window_functions.sql @@ -110,6 +110,16 @@ ORDER BY DROP VIEW users_view, window_view; +-- window functions along with subquery in HAVING +SELECT + user_id, count (user_id) OVER (PARTITION BY user_id) +FROM + users_table +GROUP BY + user_id HAVING avg(value_1) < (SELECT min(k_no) FROM users_ref_test_table) +ORDER BY 1 DESC,2 DESC +LIMIT 1; + -- window function uses columns from two different tables SELECT DISTINCT ON (events_table.user_id, rnk) events_table.user_id, rank() OVER my_win AS rnk From 2632343f64cb92b3ad0cee72e9b3cb3d77c3cfd1 Mon Sep 17 00:00:00 2001 From: Marco Slot Date: Mon, 6 Apr 2020 17:29:22 +0200 Subject: [PATCH 2/2] Fix intermediate result pruning for INSERT..SELECT --- .../planner/intermediate_result_pruning.c | 84 ++++++++++++------- .../distributed/utils/citus_copyfuncs.c | 2 +- .../distributed/utils/citus_outfuncs.c | 2 +- .../distributed/multi_physical_planner.h | 27 ++++-- .../expected/intermediate_result_pruning.out | 31 +++++++ .../sql/intermediate_result_pruning.sql | 18 ++++ 6 files changed, 123 insertions(+), 41 deletions(-) diff --git a/src/backend/distributed/planner/intermediate_result_pruning.c b/src/backend/distributed/planner/intermediate_result_pruning.c index 1346f4b95..3a318672a 100644 --- a/src/backend/distributed/planner/intermediate_result_pruning.c +++ 
b/src/backend/distributed/planner/intermediate_result_pruning.c @@ -26,15 +26,15 @@ bool LogIntermediateResults = false; -static List * FindSubPlansUsedInNode(Node *node); +static List * FindSubPlansUsedInNode(Node *node, SubPlanAccessType accessType); static void AppendAllAccessedWorkerNodes(IntermediateResultsHashEntry *entry, DistributedPlan *distributedPlan, int workerNodeCount); +static void AppendAllWorkerNodes(IntermediateResultsHashEntry *entry); static List * FindAllRemoteWorkerNodesUsingSubplan(IntermediateResultsHashEntry *entry); static List * RemoveLocalNodeFromWorkerList(List *workerNodeList); static void LogIntermediateResultMulticastSummary(IntermediateResultsHashEntry *entry, List *workerNodeList); -static void UpdateUsedPlanListLocation(List *subPlanList, int locationMask); /* @@ -53,18 +53,33 @@ FindSubPlanUsages(DistributedPlan *plan) if (plan->masterQuery != NULL) { - localSubPlans = FindSubPlansUsedInNode((Node *) plan->masterQuery); - UpdateUsedPlanListLocation(localSubPlans, SUBPLAN_ACCESS_LOCAL); + localSubPlans = FindSubPlansUsedInNode((Node *) plan->masterQuery, + SUBPLAN_ACCESS_LOCAL); } if (plan->workerJob != NULL) { /* - * Mark the subplans as needed on remote side. Note that this decision is revisited - * on execution, when the query only consists of intermediate results. + * Mark the subplans as needed on remote side. Note that this decision is + * revisited on execution, when the query only consists of intermediate + * results. */ - remoteSubPlans = FindSubPlansUsedInNode((Node *) plan->workerJob->jobQuery); - UpdateUsedPlanListLocation(remoteSubPlans, SUBPLAN_ACCESS_REMOTE); + remoteSubPlans = FindSubPlansUsedInNode((Node *) plan->workerJob->jobQuery, + SUBPLAN_ACCESS_REMOTE); + } + + if (plan->insertSelectQuery != NULL) + { + /* INSERT..SELECT plans currently do not have a workerJob */ + Assert(plan->workerJob == NULL); + + /* + * The SELECT in an INSERT..SELECT is not fully planned yet and we cannot + * perform pruning. We therefore require all subplans used in the + * INSERT..SELECT to be available all nodes. + */ + remoteSubPlans = FindSubPlansUsedInNode((Node *) plan->insertSelectQuery, + SUBPLAN_ACCESS_ANYWHERE); } /* merge the used subplans */ @@ -77,7 +92,7 @@ FindSubPlanUsages(DistributedPlan *plan) * the input node. 
*/ static List * -FindSubPlansUsedInNode(Node *node) +FindSubPlansUsedInNode(Node *node, SubPlanAccessType accessType) { List *rangeTableList = NIL; ListCell *rangeTableCell = NULL; @@ -106,9 +121,7 @@ FindSubPlansUsedInNode(Node *node) UsedDistributedSubPlan *usedPlan = CitusMakeNode(UsedDistributedSubPlan); usedPlan->subPlanId = pstrdup(resultId); - - /* the callers are responsible for setting the accurate location */ - usedPlan->locationMask = SUBPLAN_ACCESS_NONE; + usedPlan->accessType = accessType; usedSubPlanList = lappend(usedSubPlanList, usedPlan); } @@ -160,13 +173,12 @@ RecordSubplanExecutionsOnNodes(HTAB *intermediateResultsHash, continue; } - if (usedPlan->locationMask & SUBPLAN_ACCESS_LOCAL) + if (usedPlan->accessType == SUBPLAN_ACCESS_LOCAL) { /* subPlan needs to be written locally as the planner decided */ entry->writeLocalFile = true; } - - if (usedPlan->locationMask & SUBPLAN_ACCESS_REMOTE) + else if (usedPlan->accessType == SUBPLAN_ACCESS_REMOTE) { /* * traverse the plan and add find all worker nodes @@ -179,6 +191,12 @@ RecordSubplanExecutionsOnNodes(HTAB *intermediateResultsHash, elog(DEBUG4, "Subplan %s is used in %lu", resultId, distributedPlan->planId); } + else if (usedPlan->accessType == SUBPLAN_ACCESS_ANYWHERE) + { + /* subplan is needed on all nodes */ + entry->writeLocalFile = true; + AppendAllWorkerNodes(entry); + } } /* descend into the subPlans */ @@ -243,6 +261,25 @@ AppendAllAccessedWorkerNodes(IntermediateResultsHashEntry *entry, } +/* + * AppendAllWorkerNodes appends all node IDs of readable worker nodes to the + * nodeIdList, meaning the corresponding intermediate result should be sent + * to all readable nodes. + */ +static void +AppendAllWorkerNodes(IntermediateResultsHashEntry *entry) +{ + List *workerNodeList = ActiveReadableWorkerNodeList(); + + WorkerNode *workerNode = NULL; + foreach_ptr(workerNode, workerNodeList) + { + entry->nodeIdList = + list_append_unique_int(entry->nodeIdList, workerNode->nodeId); + } +} + + /* * MakeIntermediateResultHTAB is a helper method that creates a Hash Table that * stores information on the intermediate result. @@ -414,20 +451,3 @@ SearchIntermediateResult(HTAB *intermediateResultsHash, char *resultId) return entry; } - - -/* - * UpdateUsedPlanListLocation is a utility function which iterates over the list - * and updates the subPlanLocation to the input location. 
- */ -static void -UpdateUsedPlanListLocation(List *subPlanList, int locationMask) -{ - ListCell *subPlanCell = NULL; - foreach(subPlanCell, subPlanList) - { - UsedDistributedSubPlan *subPlan = lfirst(subPlanCell); - - subPlan->locationMask |= locationMask; - } -} diff --git a/src/backend/distributed/utils/citus_copyfuncs.c b/src/backend/distributed/utils/citus_copyfuncs.c index 1aa27b5bf..38f31ce85 100644 --- a/src/backend/distributed/utils/citus_copyfuncs.c +++ b/src/backend/distributed/utils/citus_copyfuncs.c @@ -141,7 +141,7 @@ CopyNodeUsedDistributedSubPlan(COPYFUNC_ARGS) DECLARE_FROM_AND_NEW_NODE(UsedDistributedSubPlan); COPY_STRING_FIELD(subPlanId); - COPY_SCALAR_FIELD(locationMask); + COPY_SCALAR_FIELD(accessType); } diff --git a/src/backend/distributed/utils/citus_outfuncs.c b/src/backend/distributed/utils/citus_outfuncs.c index 2f11a4cf4..a6515f599 100644 --- a/src/backend/distributed/utils/citus_outfuncs.c +++ b/src/backend/distributed/utils/citus_outfuncs.c @@ -222,7 +222,7 @@ OutUsedDistributedSubPlan(OUTFUNC_ARGS) WRITE_NODE_TYPE("USEDDISTRIBUTEDSUBPLAN"); WRITE_STRING_FIELD(subPlanId); - WRITE_INT_FIELD(locationMask); + WRITE_ENUM_FIELD(accessType, SubPlanAccessType); } diff --git a/src/include/distributed/multi_physical_planner.h b/src/include/distributed/multi_physical_planner.h index 1da038cd7..ba0c00d28 100644 --- a/src/include/distributed/multi_physical_planner.h +++ b/src/include/distributed/multi_physical_planner.h @@ -48,10 +48,6 @@ #define MERGE_FILES_AND_RUN_QUERY_COMMAND \ "SELECT worker_merge_files_and_run_query(" UINT64_FORMAT ", %d, %s, %s)" -#define SUBPLAN_ACCESS_NONE 0 -#define SUBPLAN_ACCESS_LOCAL 1 -#define SUBPLAN_ACCESS_REMOTE 2 - typedef enum CitusRTEKind { @@ -407,10 +403,14 @@ typedef struct DistributedPlan /* * List of subPlans that are used in the DistributedPlan * Note that this is different that "subPlanList" field which - * contains the subplans generated part of the DistributedPlan. + * contains the subplans generated as part of the DistributedPlan. * * On the other hand, usedSubPlanNodeList keeps track of which subPlans - * are used within this distributed plan. + * are used within this distributed plan as a list of + * UsedDistributedSubPlan pointers. + * + * The list may contain duplicates if the subplan is referenced multiple + * times (e.g. a CTE appears in the query tree multiple times). */ List *usedSubPlanNodeList; @@ -444,6 +444,16 @@ typedef struct DistributedSubPlan } DistributedSubPlan; +/* defines how a subplan is used by a distributed query */ +typedef enum SubPlanAccessType +{ + SUBPLAN_ACCESS_NONE, + SUBPLAN_ACCESS_LOCAL, + SUBPLAN_ACCESS_REMOTE, + SUBPLAN_ACCESS_ANYWHERE +} SubPlanAccessType; + + /* * UsedDistributedSubPlan contains information about a subPlan that is used in a * distributed plan. 
@@ -452,8 +462,11 @@ typedef struct UsedDistributedSubPlan { CitusNode type; + /* subplan used by the distributed query */ char *subPlanId; - int locationMask; + + /* how the subplan is used by a distributed query */ + SubPlanAccessType accessType; } UsedDistributedSubPlan; diff --git a/src/test/regress/expected/intermediate_result_pruning.out b/src/test/regress/expected/intermediate_result_pruning.out index e499e4893..c265a7c9b 100644 --- a/src/test/regress/expected/intermediate_result_pruning.out +++ b/src/test/regress/expected/intermediate_result_pruning.out @@ -1031,6 +1031,37 @@ DEBUG: Subplan XXX_4 will be sent to localhost:xxxxx 100 (1 row) +RESET citus.task_assignment_policy; +-- Insert..select is planned differently, make sure we have results everywhere. +-- We put the insert..select in a CTE here to prevent the CTE from being moved +-- into the select, which would follow the regular code path for select. +WITH stats AS ( + SELECT count(key) m FROM table_3 +), +inserts AS ( + INSERT INTO table_2 + SELECT key, count(*) + FROM table_1 + WHERE key > (SELECT m FROM stats) + GROUP BY key + HAVING count(*) < (SELECT m FROM stats) + LIMIT 1 + RETURNING * +) SELECT count(*) FROM inserts; +DEBUG: generating subplan XXX_1 for CTE stats: SELECT count(key) AS m FROM intermediate_result_pruning.table_3 +DEBUG: generating subplan XXX_2 for CTE inserts: INSERT INTO intermediate_result_pruning.table_2 (key, value) SELECT key, count(*) AS count FROM intermediate_result_pruning.table_1 WHERE (key OPERATOR(pg_catalog.>) (SELECT stats.m FROM (SELECT intermediate_result.m FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(m bigint)) stats)) GROUP BY key HAVING (count(*) OPERATOR(pg_catalog.<) (SELECT stats.m FROM (SELECT intermediate_result.m FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(m bigint)) stats)) LIMIT 1 RETURNING table_2.key, table_2.value +DEBUG: LIMIT clauses are not allowed in distributed INSERT ... SELECT queries +DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) inserts +DEBUG: Subplan XXX_1 will be written to local file +DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx +DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx +DEBUG: Subplan XXX_2 will be written to local file +DEBUG: Collecting INSERT ... SELECT results on coordinator + count +--------------------------------------------------------------------- + 1 +(1 row) + SET citus.task_assignment_policy to DEFAULT; SET client_min_messages TO DEFAULT; DROP TABLE table_1, table_2, table_3, ref_table, accounts, stats, range_partitioned; diff --git a/src/test/regress/sql/intermediate_result_pruning.sql b/src/test/regress/sql/intermediate_result_pruning.sql index a0fe32145..b6e17723d 100644 --- a/src/test/regress/sql/intermediate_result_pruning.sql +++ b/src/test/regress/sql/intermediate_result_pruning.sql @@ -607,6 +607,24 @@ FROM FROM accounts_cte INNER JOIN joined_stats_cte_2 USING (account_id) ) inner_query; +RESET citus.task_assignment_policy; + +-- Insert..select is planned differently, make sure we have results everywhere. +-- We put the insert..select in a CTE here to prevent the CTE from being moved +-- into the select, which would follow the regular code path for select. 
+WITH stats AS ( + SELECT count(key) m FROM table_3 +), +inserts AS ( + INSERT INTO table_2 + SELECT key, count(*) + FROM table_1 + WHERE key > (SELECT m FROM stats) + GROUP BY key + HAVING count(*) < (SELECT m FROM stats) + LIMIT 1 + RETURNING * +) SELECT count(*) FROM inserts; SET citus.task_assignment_policy to DEFAULT; SET client_min_messages TO DEFAULT;
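
Note on the access-type classification introduced in the second patch: each UsedDistributedSubPlan now carries a SubPlanAccessType enum value instead of a location bit mask, and RecordSubplanExecutionsOnNodes dispatches on it to decide where the intermediate result file must be placed (coordinator only, the workers that run tasks of the job, or every readable node when pruning is impossible, as for INSERT..SELECT). The following is a minimal, self-contained sketch of that dispatch, not the actual Citus code: AccessEntry and PlaceIntermediateResult are simplified stand-ins for IntermediateResultsHashEntry and the real bookkeeping in RecordSubplanExecutionsOnNodes / AppendAllWorkerNodes, and are not part of the Citus API.

/*
 * Sketch only: illustrates how a subplan's access type maps onto
 * intermediate-result placement, mirroring the logic in this patch series.
 */
#include <stdbool.h>
#include <stdio.h>

typedef enum SubPlanAccessType
{
	SUBPLAN_ACCESS_NONE,     /* subplan is not referenced at all */
	SUBPLAN_ACCESS_LOCAL,    /* read only by the master (coordinator) query */
	SUBPLAN_ACCESS_REMOTE,   /* read only by the distributed worker job */
	SUBPLAN_ACCESS_ANYWHERE  /* placement unknown at planning time, e.g. INSERT..SELECT */
} SubPlanAccessType;

/* hypothetical stand-in for IntermediateResultsHashEntry */
typedef struct AccessEntry
{
	bool writeLocalFile;   /* keep a copy of the result on the coordinator */
	bool sendToAllNodes;   /* broadcast the result to every readable worker */
	bool sendToJobNodes;   /* send only to workers that run tasks of the job */
} AccessEntry;

static void
PlaceIntermediateResult(AccessEntry *entry, SubPlanAccessType accessType)
{
	switch (accessType)
	{
		case SUBPLAN_ACCESS_LOCAL:
		{
			/* only the merged coordinator query reads the result */
			entry->writeLocalFile = true;
			break;
		}

		case SUBPLAN_ACCESS_REMOTE:
		{
			/* only the workers that actually run a task need the result */
			entry->sendToJobNodes = true;
			break;
		}

		case SUBPLAN_ACCESS_ANYWHERE:
		{
			/* pruning is impossible, make the result available everywhere */
			entry->writeLocalFile = true;
			entry->sendToAllNodes = true;
			break;
		}

		default:
		{
			/* SUBPLAN_ACCESS_NONE: an unused subplan needs no placement */
			break;
		}
	}
}

int
main(void)
{
	AccessEntry entry = { false, false, false };

	/* an INSERT..SELECT subplan cannot be pruned, so it goes everywhere */
	PlaceIntermediateResult(&entry, SUBPLAN_ACCESS_ANYWHERE);
	printf("local=%d all=%d job=%d\n",
		   entry.writeLocalFile, entry.sendToAllNodes, entry.sendToJobNodes);

	return 0;
}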