mirror of https://github.com/citusdata/citus.git

Merge pull request #3720 from citusdata/fix/intermediate_result_pruning
Simplify and fix issues in intermediate result pruning

commit 225adbc7ac
@@ -22,6 +22,7 @@
 #include "catalog/pg_type.h"
 #include "distributed/citus_nodefuncs.h"
 #include "distributed/citus_nodes.h"
+#include "distributed/citus_ruleutils.h"
 #include "distributed/cte_inline.h"
 #include "distributed/function_call_delegation.h"
 #include "distributed/insert_select_planner.h"
@@ -91,8 +92,6 @@ static DistributedPlan * CreateDistributedPlan(uint64 planId, Query *originalQue
                                                bool hasUnresolvedParams,
                                                PlannerRestrictionContext *
                                                plannerRestrictionContext);
-static void FinalizeDistributedPlan(DistributedPlan *plan, Query *originalQuery);
-static void RecordSubPlansUsedInPlan(DistributedPlan *plan, Query *originalQuery);
 static DeferredErrorMessage * DeferErrorIfPartitionTableNotSingleReplicated(Oid
                                                                             relationId);
 
@@ -955,8 +954,6 @@ CreateDistributedPlan(uint64 planId, Query *originalQuery, Query *query, ParamLi
 
         if (distributedPlan->planningError == NULL)
         {
-            FinalizeDistributedPlan(distributedPlan, originalQuery);
-
             return distributedPlan;
         }
         else
@@ -977,8 +974,6 @@ CreateDistributedPlan(uint64 planId, Query *originalQuery, Query *query, ParamLi
                                                     plannerRestrictionContext);
         if (distributedPlan->planningError == NULL)
         {
-            FinalizeDistributedPlan(distributedPlan, originalQuery);
-
             return distributedPlan;
         }
         else
@@ -1075,8 +1070,6 @@ CreateDistributedPlan(uint64 planId, Query *originalQuery, Query *query, ParamLi
         Assert(distributedPlan != NULL);
         distributedPlan->subPlanList = subPlanList;
 
-        FinalizeDistributedPlan(distributedPlan, originalQuery);
-
         return distributedPlan;
     }
 
@@ -1087,8 +1080,6 @@ CreateDistributedPlan(uint64 planId, Query *originalQuery, Query *query, ParamLi
      */
     if (IsModifyCommand(originalQuery))
     {
-        FinalizeDistributedPlan(distributedPlan, originalQuery);
-
         return distributedPlan;
     }
 
@@ -1120,87 +1111,10 @@ CreateDistributedPlan(uint64 planId, Query *originalQuery, Query *query, ParamLi
     /* distributed plan currently should always succeed or error out */
     Assert(distributedPlan && distributedPlan->planningError == NULL);
 
-    FinalizeDistributedPlan(distributedPlan, originalQuery);
-
     return distributedPlan;
 }
 
 
-/*
- * FinalizeDistributedPlan is the final step of distributed planning. The function
- * currently only implements some optimizations for intermediate result(s) pruning.
- */
-static void
-FinalizeDistributedPlan(DistributedPlan *plan, Query *originalQuery)
-{
-    /*
-     * Fast path queries, we cannot have any subplans by their definition,
-     * so skip expensive traversals.
-     */
-    if (!plan->fastPathRouterPlan)
-    {
-        RecordSubPlansUsedInPlan(plan, originalQuery);
-    }
-}
-
-
-/*
- * RecordSubPlansUsedInPlan gets a distributed plan a queryTree, and
- * updates the usedSubPlanNodeList of the distributed plan.
- *
- * The function simply pulls all the subPlans that are used in the queryTree
- * with one exception: subPlans in the HAVING clause. The reason is explained
- * in the function.
- */
-static void
-RecordSubPlansUsedInPlan(DistributedPlan *plan, Query *originalQuery)
-{
-    Node *havingQual = originalQuery->havingQual;
-
-    /* temporarily set to NULL, we're going to restore before the function returns */
-    originalQuery->havingQual = NULL;
-
-    /*
-     * Mark the subplans as needed on remote side. Note that this decision is revisited
-     * on execution, when the query only consists of intermediate results.
-     */
-    List *subplansExceptHaving = FindSubPlansUsedInNode((Node *) originalQuery);
-    UpdateUsedPlanListLocation(subplansExceptHaving, SUBPLAN_ACCESS_REMOTE);
-
-    /* do the same for HAVING part of the query */
-    List *subplansInHaving = NIL;
-    if (originalQuery->hasSubLinks &&
-        FindNodeCheck(havingQual, IsNodeSubquery))
-    {
-        subplansInHaving = FindSubPlansUsedInNode(havingQual);
-        if (plan->masterQuery)
-        {
-            /*
-             * If we have the master query, we're sure that the result is needed locally.
-             * Otherwise, such as router queries, the plan may not be required locally.
-             * Note that if the query consists of only intermediate results, the executor
-             * may still prefer to write locally.
-             *
-             * If any of the subplansInHaving is used in other parts of the query,
-             * we'll later merge those subPlans and send to remote.
-             */
-            UpdateUsedPlanListLocation(subplansInHaving, SUBPLAN_ACCESS_LOCAL);
-        }
-        else
-        {
-            UpdateUsedPlanListLocation(subplansInHaving, SUBPLAN_ACCESS_REMOTE);
-        }
-    }
-
-    /* set back the havingQual and the calculated subplans */
-    originalQuery->havingQual = havingQual;
-
-    /* merge the used subplans */
-    plan->usedSubPlanNodeList =
-        MergeUsedSubPlanLists(subplansExceptHaving, subplansInHaving);
-}
-
-
 /*
  * EnsurePartitionTableNotReplicated errors out if the infput relation is
  * a partition table and the table has a replication factor greater than
@@ -1426,6 +1340,22 @@ FinalizePlan(PlannedStmt *localPlan, DistributedPlan *distributedPlan)
     customScan->custom_private = list_make1(distributedPlanData);
     customScan->flags = CUSTOMPATH_SUPPORT_BACKWARD_SCAN;
 
+    /*
+     * Fast path queries cannot have any subplans by definition, so skip
+     * expensive traversals.
+     */
+    if (!distributedPlan->fastPathRouterPlan)
+    {
+        /*
+         * Record subplans used by distributed plan to make intermediate result
+         * pruning easier.
+         *
+         * We do this before finalizing the plan, because the masterQuery is
+         * rewritten by standard_planner in FinalizeNonRouterPlan.
+         */
+        distributedPlan->usedSubPlanNodeList = FindSubPlanUsages(distributedPlan);
+    }
+
     if (distributedPlan->masterQuery)
     {
         finalPlan = FinalizeNonRouterPlan(localPlan, distributedPlan, customScan);
@@ -13,6 +13,7 @@
  */
 
 #include "distributed/citus_custom_scan.h"
+#include "distributed/citus_ruleutils.h"
 #include "distributed/intermediate_result_pruning.h"
 #include "distributed/listutils.h"
 #include "distributed/log_utils.h"
@@ -24,25 +25,74 @@
 /* controlled via GUC, used mostly for testing */
 bool LogIntermediateResults = false;
 
 
+static List * FindSubPlansUsedInNode(Node *node, SubPlanAccessType accessType);
 static void AppendAllAccessedWorkerNodes(IntermediateResultsHashEntry *entry,
                                          DistributedPlan *distributedPlan,
                                          int workerNodeCount);
+static void AppendAllWorkerNodes(IntermediateResultsHashEntry *entry);
 static List * FindAllRemoteWorkerNodesUsingSubplan(IntermediateResultsHashEntry *entry);
 static List * RemoveLocalNodeFromWorkerList(List *workerNodeList);
 static void LogIntermediateResultMulticastSummary(IntermediateResultsHashEntry *entry,
                                                   List *workerNodeList);
-static bool UsedSubPlansEqual(UsedDistributedSubPlan *left,
-                              UsedDistributedSubPlan *right);
-static UsedDistributedSubPlan * UsedSubPlanListMember(List *list,
-                                                      UsedDistributedSubPlan *usedPlan);
+
+
+/*
+ * FindSubPlanUsages finds the subplans used in the master query and the
+ * job query and returns them as a combined list of UsedDistributedSubPlan
+ * structs.
+ *
+ * The list may contain duplicates if the subplan is referenced multiple
+ * times.
+ */
+List *
+FindSubPlanUsages(DistributedPlan *plan)
+{
+    List *localSubPlans = NIL;
+    List *remoteSubPlans = NIL;
+
+    if (plan->masterQuery != NULL)
+    {
+        localSubPlans = FindSubPlansUsedInNode((Node *) plan->masterQuery,
+                                               SUBPLAN_ACCESS_LOCAL);
+    }
+
+    if (plan->workerJob != NULL)
+    {
+        /*
+         * Mark the subplans as needed on remote side. Note that this decision is
+         * revisited on execution, when the query only consists of intermediate
+         * results.
+         */
+        remoteSubPlans = FindSubPlansUsedInNode((Node *) plan->workerJob->jobQuery,
+                                                SUBPLAN_ACCESS_REMOTE);
+    }
+
+    if (plan->insertSelectQuery != NULL)
+    {
+        /* INSERT..SELECT plans currently do not have a workerJob */
+        Assert(plan->workerJob == NULL);
+
+        /*
+         * The SELECT in an INSERT..SELECT is not fully planned yet and we cannot
+         * perform pruning. We therefore require all subplans used in the
+         * INSERT..SELECT to be available all nodes.
+         */
+        remoteSubPlans = FindSubPlansUsedInNode((Node *) plan->insertSelectQuery,
+                                                SUBPLAN_ACCESS_ANYWHERE);
+    }
+
+    /* merge the used subplans */
+    return list_concat(localSubPlans, remoteSubPlans);
+}
+
+
 /*
  * FindSubPlansUsedInPlan finds all the subplans used by the plan by traversing
  * the input node.
  */
-List *
-FindSubPlansUsedInNode(Node *node)
+static List *
+FindSubPlansUsedInNode(Node *node, SubPlanAccessType accessType)
 {
     List *rangeTableList = NIL;
     ListCell *rangeTableCell = NULL;
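The FindSubPlanUsages function added above classifies each subplan reference by where its intermediate result will be read: references from the master (coordinator) query become SUBPLAN_ACCESS_LOCAL, references from the worker job query become SUBPLAN_ACCESS_REMOTE, and references from a not-yet-planned INSERT..SELECT become SUBPLAN_ACCESS_ANYWHERE. Below is a minimal standalone sketch of that classification, for illustration only; the ToyPlan struct, ClassifySubPlanUsage helper, and main driver are invented here and are not part of the commit.

    #include <stdio.h>

    /* mirrors the SubPlanAccessType enum added by this commit */
    typedef enum SubPlanAccessType
    {
        SUBPLAN_ACCESS_NONE,
        SUBPLAN_ACCESS_LOCAL,
        SUBPLAN_ACCESS_REMOTE,
        SUBPLAN_ACCESS_ANYWHERE
    } SubPlanAccessType;

    /* toy stand-in for DistributedPlan: which query tree references the subplan */
    typedef struct ToyPlan
    {
        int usedInMasterQuery;    /* coordinator-side merge query */
        int usedInWorkerJobQuery; /* query pushed down to the workers */
        int usedInInsertSelect;   /* INSERT..SELECT, not fully planned yet */
    } ToyPlan;

    static SubPlanAccessType
    ClassifySubPlanUsage(const ToyPlan *plan)
    {
        if (plan->usedInInsertSelect)
        {
            /* pruning is impossible before the SELECT is planned: need it everywhere */
            return SUBPLAN_ACCESS_ANYWHERE;
        }
        if (plan->usedInMasterQuery)
        {
            /* result is consumed on the coordinator */
            return SUBPLAN_ACCESS_LOCAL;
        }
        if (plan->usedInWorkerJobQuery)
        {
            /* result is consumed on worker nodes; revisited at execution time */
            return SUBPLAN_ACCESS_REMOTE;
        }
        return SUBPLAN_ACCESS_NONE;
    }

    int
    main(void)
    {
        ToyPlan havingSubquery = { .usedInMasterQuery = 1 };
        ToyPlan pushedDownCte = { .usedInWorkerJobQuery = 1 };
        ToyPlan insertSelectCte = { .usedInInsertSelect = 1 };

        printf("HAVING subquery -> %d (LOCAL)\n", ClassifySubPlanUsage(&havingSubquery));
        printf("pushed-down CTE -> %d (REMOTE)\n", ClassifySubPlanUsage(&pushedDownCte));
        printf("INSERT..SELECT CTE -> %d (ANYWHERE)\n", ClassifySubPlanUsage(&insertSelectCte));
        return 0;
    }

Unlike this single-valued sketch, the real function emits one UsedDistributedSubPlan record per referencing query tree, so a subplan that is read both on the coordinator and on the workers simply appears twice in the combined list.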
@@ -71,16 +121,11 @@ FindSubPlansUsedInNode(Node *node)
             UsedDistributedSubPlan *usedPlan = CitusMakeNode(UsedDistributedSubPlan);
 
             usedPlan->subPlanId = pstrdup(resultId);
+            usedPlan->accessType = accessType;
 
-            /* the callers are responsible for setting the accurate location */
-            usedPlan->locationMask = SUBPLAN_ACCESS_NONE;
-
-            if (!UsedSubPlanListMember(usedSubPlanList, usedPlan))
-            {
             usedSubPlanList = lappend(usedSubPlanList, usedPlan);
-            }
         }
     }
 
     return usedSubPlanList;
 }
@@ -117,12 +162,6 @@ RecordSubplanExecutionsOnNodes(HTAB *intermediateResultsHash,
         IntermediateResultsHashEntry *entry = SearchIntermediateResult(
             intermediateResultsHash, resultId);
 
-        if (usedPlan->locationMask & SUBPLAN_ACCESS_LOCAL)
-        {
-            /* subPlan needs to be written locally as the planner decided */
-            entry->writeLocalFile = true;
-        }
-
         /*
          * There is no need to traverse the subplan if the intermediate result
          * will be written to a local file and sent to all nodes. Note that the
@@ -133,7 +172,13 @@ RecordSubplanExecutionsOnNodes(HTAB *intermediateResultsHash,
             elog(DEBUG4, "Subplan %s is used in all workers", resultId);
             continue;
         }
-        else if (usedPlan->locationMask & SUBPLAN_ACCESS_REMOTE)
+
+        if (usedPlan->accessType == SUBPLAN_ACCESS_LOCAL)
+        {
+            /* subPlan needs to be written locally as the planner decided */
+            entry->writeLocalFile = true;
+        }
+        else if (usedPlan->accessType == SUBPLAN_ACCESS_REMOTE)
         {
             /*
              * traverse the plan and add find all worker nodes
@@ -146,6 +191,12 @@ RecordSubplanExecutionsOnNodes(HTAB *intermediateResultsHash,
 
             elog(DEBUG4, "Subplan %s is used in %lu", resultId, distributedPlan->planId);
         }
+        else if (usedPlan->accessType == SUBPLAN_ACCESS_ANYWHERE)
+        {
+            /* subplan is needed on all nodes */
+            entry->writeLocalFile = true;
+            AppendAllWorkerNodes(entry);
+        }
     }
 
     /* descend into the subPlans */
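With an explicit access type on every usage record, the executor-side bookkeeping in RecordSubplanExecutionsOnNodes becomes the plain three-way dispatch shown in the two hunks above: LOCAL marks the result for a local file write, REMOTE records only the worker nodes whose tasks read the result, and ANYWHERE does both for every readable node. The following standalone model is for illustration only; the ResultEntry struct, RecordUsage helper, and fixed-size node array are invented here (the real code deduplicates node IDs in a PostgreSQL List via list_append_unique_int).

    #include <stdbool.h>
    #include <stdio.h>

    typedef enum { ACCESS_LOCAL, ACCESS_REMOTE, ACCESS_ANYWHERE } AccessType;

    #define MAX_NODES 8

    /* toy stand-in for IntermediateResultsHashEntry */
    typedef struct ResultEntry
    {
        bool writeLocalFile;
        bool nodeNeedsResult[MAX_NODES]; /* models the deduplicated nodeIdList */
    } ResultEntry;

    /* models one iteration of RecordSubplanExecutionsOnNodes for a usage record */
    static void
    RecordUsage(ResultEntry *entry, AccessType accessType,
                const int *taskNodes, int taskNodeCount, int totalNodeCount)
    {
        if (accessType == ACCESS_LOCAL)
        {
            entry->writeLocalFile = true;
        }
        else if (accessType == ACCESS_REMOTE)
        {
            /* only the nodes that actually run a task need the result */
            for (int i = 0; i < taskNodeCount; i++)
            {
                entry->nodeNeedsResult[taskNodes[i]] = true;
            }
        }
        else /* ACCESS_ANYWHERE */
        {
            entry->writeLocalFile = true;
            for (int nodeId = 0; nodeId < totalNodeCount; nodeId++)
            {
                entry->nodeNeedsResult[nodeId] = true;
            }
        }
    }

    int
    main(void)
    {
        ResultEntry entry = { 0 };
        int taskNodes[] = { 1, 2 };

        /* duplicate usage records are harmless: recording is idempotent */
        RecordUsage(&entry, ACCESS_REMOTE, taskNodes, 2, 4);
        RecordUsage(&entry, ACCESS_REMOTE, taskNodes, 2, 4);
        RecordUsage(&entry, ACCESS_LOCAL, NULL, 0, 4);

        printf("write local file: %d\n", entry.writeLocalFile);
        for (int nodeId = 0; nodeId < 4; nodeId++)
        {
            printf("node %d needs result: %d\n", nodeId, entry.nodeNeedsResult[nodeId]);
        }
        return 0;
    }

Because every branch only sets flags or adds node IDs idempotently, duplicate usage records cannot change the outcome, which is what lets the commit delete the planner-side MergeUsedSubPlanLists machinery.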
@@ -210,6 +261,25 @@ AppendAllAccessedWorkerNodes(IntermediateResultsHashEntry *entry,
 }
 
 
+/*
+ * AppendAllWorkerNodes appends all node IDs of readable worker nodes to the
+ * nodeIdList, meaning the corresponding intermediate result should be sent
+ * to all readable nodes.
+ */
+static void
+AppendAllWorkerNodes(IntermediateResultsHashEntry *entry)
+{
+    List *workerNodeList = ActiveReadableWorkerNodeList();
+
+    WorkerNode *workerNode = NULL;
+    foreach_ptr(workerNode, workerNodeList)
+    {
+        entry->nodeIdList =
+            list_append_unique_int(entry->nodeIdList, workerNode->nodeId);
+    }
+}
+
+
 /*
  * MakeIntermediateResultHTAB is a helper method that creates a Hash Table that
  * stores information on the intermediate result.
@@ -381,97 +451,3 @@ SearchIntermediateResult(HTAB *intermediateResultsHash, char *resultId)
 
     return entry;
 }
-
-
-/*
- * MergeUsedSubPlanLists is a utility function that merges the two input
- * UsedSubPlan lists. Existence of the items of the rightSubPlanList
- * checked in leftSubPlanList. If not found, items are added to
- * leftSubPlanList. If found, the locationMask fields are merged.
- *
- * Finally, the in-place modified leftSubPlanList is returned.
- */
-List *
-MergeUsedSubPlanLists(List *leftSubPlanList, List *rightSubPlanList)
-{
-    ListCell *rightListCell;
-
-    foreach(rightListCell, rightSubPlanList)
-    {
-        UsedDistributedSubPlan *memberOnRightList = lfirst(rightListCell);
-        UsedDistributedSubPlan *memberOnLeftList =
-            UsedSubPlanListMember(leftSubPlanList, memberOnRightList);
-
-        if (memberOnLeftList == NULL)
-        {
-            leftSubPlanList = lappend(leftSubPlanList, memberOnRightList);
-        }
-        else
-        {
-            memberOnLeftList->locationMask |= memberOnRightList->locationMask;
-        }
-    }
-
-    return leftSubPlanList;
-}
-
-
-/*
- * UsedSubPlanListMember is a utility function inspired from list_member(),
- * but operating on UsedDistributedSubPlan struct, which doesn't have equal()
- * function defined (similar to all Citus node types).
- */
-static UsedDistributedSubPlan *
-UsedSubPlanListMember(List *usedSubPlanList, UsedDistributedSubPlan *usedPlan)
-{
-    const ListCell *usedSubPlanCell;
-
-    foreach(usedSubPlanCell, usedSubPlanList)
-    {
-        if (UsedSubPlansEqual(lfirst(usedSubPlanCell), usedPlan))
-        {
-            return lfirst(usedSubPlanCell);
-        }
-    }
-
-    return NULL;
-}
-
-
-/*
- * UpdateUsedPlanListLocation is a utility function which iterates over the list
- * and updates the subPlanLocation to the input location.
- */
-void
-UpdateUsedPlanListLocation(List *subPlanList, int locationMask)
-{
-    ListCell *subPlanCell = NULL;
-    foreach(subPlanCell, subPlanList)
-    {
-        UsedDistributedSubPlan *subPlan = lfirst(subPlanCell);
-
-        subPlan->locationMask |= locationMask;
-    }
-}
-
-
-/*
- * UsedSubPlansEqual is a utility function inspired from equal(),
- * but operating on UsedDistributedSubPlan struct, which doesn't have equal()
- * function defined (similar to all Citus node types).
- */
-static bool
-UsedSubPlansEqual(UsedDistributedSubPlan *left, UsedDistributedSubPlan *right)
-{
-    if (left == NULL || right == NULL)
-    {
-        return false;
-    }
-
-    if (strncmp(left->subPlanId, right->subPlanId, NAMEDATALEN) == 0)
-    {
-        return true;
-    }
-
-    return false;
-}
@@ -207,10 +207,6 @@ BuildSelectStatementViaStdPlanner(Query *masterQuery, List *masterTargetList,
     remoteScan->custom_scan_tlist = copyObject(masterTargetList);
     remoteScan->scan.plan.targetlist = copyObject(masterTargetList);
 
-    /* probably want to do this where we add sublinks to the master plan */
-    masterQuery->hasSubLinks = checkExprHasSubLink((Node *) masterQuery);
-    Assert(masterQuery->hasWindowFuncs == contain_window_function((Node *) masterQuery));
-
     /*
      * We will overwrite the alias of the rangetable which describes the custom scan.
      * Ideally we would have set the correct column names and alias on the range table in
@@ -732,6 +732,9 @@ BuildJobQuery(MultiNode *multiNode, List *dependentJobList)
     jobQuery->hasDistinctOn = hasDistinctOn;
     jobQuery->windowClause = windowClause;
     jobQuery->hasWindowFuncs = hasWindowFuncs;
+    jobQuery->hasSubLinks = checkExprHasSubLink((Node *) jobQuery);
+
+    Assert(jobQuery->hasWindowFuncs == contain_window_function((Node *) jobQuery));
 
     return jobQuery;
 }
@@ -1271,6 +1274,7 @@ QueryJoinTree(MultiNode *multiNode, List *dependentJobList, List **rangeTableLis
                                                   funcColumnTypes,
                                                   funcColumnTypeMods,
                                                   funcCollations);
+
         RangeTblRef *rangeTableRef = makeNode(RangeTblRef);
 
         rangeTableRef->rtindex = list_length(*rangeTableList) + 1;
@@ -1604,6 +1608,7 @@ BuildSubqueryJobQuery(MultiNode *multiNode)
     jobQuery->distinctClause = distinctClause;
     jobQuery->hasWindowFuncs = hasWindowFuncs;
     jobQuery->windowClause = windowClause;
+    jobQuery->hasSubLinks = checkExprHasSubLink((Node *) jobQuery);
 
     return jobQuery;
 }
@@ -141,7 +141,7 @@ CopyNodeUsedDistributedSubPlan(COPYFUNC_ARGS)
     DECLARE_FROM_AND_NEW_NODE(UsedDistributedSubPlan);
 
     COPY_STRING_FIELD(subPlanId);
-    COPY_SCALAR_FIELD(locationMask);
+    COPY_SCALAR_FIELD(accessType);
 }
 
 
@@ -222,7 +222,7 @@ OutUsedDistributedSubPlan(OUTFUNC_ARGS)
     WRITE_NODE_TYPE("USEDDISTRIBUTEDSUBPLAN");
 
     WRITE_STRING_FIELD(subPlanId);
-    WRITE_INT_FIELD(locationMask);
+    WRITE_ENUM_FIELD(accessType, SubPlanAccessType);
 }
 
 
@@ -20,7 +20,7 @@
 
 extern bool LogIntermediateResults;
 
-extern List * FindSubPlansUsedInNode(Node *node);
+extern List * FindSubPlanUsages(DistributedPlan *plan);
 extern List * FindAllWorkerNodesUsingSubplan(HTAB *intermediateResultsHash,
                                              char *resultId);
 extern HTAB * MakeIntermediateResultHTAB(void);
@@ -29,8 +29,4 @@ extern void RecordSubplanExecutionsOnNodes(HTAB *intermediateResultsHash,
 extern IntermediateResultsHashEntry * SearchIntermediateResult(HTAB *resultsHash,
                                                                char *resultId);
 
-/* utility functions related to UsedSubPlans */
-extern List * MergeUsedSubPlanLists(List *leftSubPlanList, List *rightSubPlanList);
-extern void UpdateUsedPlanListLocation(List *subPlanList, int localtionMask);
-
 #endif /* INTERMEDIATE_RESULT_PRUNING_H */
@@ -48,10 +48,6 @@
 #define MERGE_FILES_AND_RUN_QUERY_COMMAND \
     "SELECT worker_merge_files_and_run_query(" UINT64_FORMAT ", %d, %s, %s)"
 
-#define SUBPLAN_ACCESS_NONE 0
-#define SUBPLAN_ACCESS_LOCAL 1
-#define SUBPLAN_ACCESS_REMOTE 2
-
 
 typedef enum CitusRTEKind
 {
@@ -407,10 +403,14 @@ typedef struct DistributedPlan
     /*
      * List of subPlans that are used in the DistributedPlan
      * Note that this is different that "subPlanList" field which
-     * contains the subplans generated part of the DistributedPlan.
+     * contains the subplans generated as part of the DistributedPlan.
      *
     * On the other hand, usedSubPlanNodeList keeps track of which subPlans
-     * are used within this distributed plan.
+     * are used within this distributed plan as a list of
+     * UsedDistributedSubPlan pointers.
+     *
+     * The list may contain duplicates if the subplan is referenced multiple
+     * times (e.g. a CTE appears in the query tree multiple times).
      */
     List *usedSubPlanNodeList;
 
@@ -444,6 +444,16 @@ typedef struct DistributedSubPlan
 } DistributedSubPlan;
 
 
+/* defines how a subplan is used by a distributed query */
+typedef enum SubPlanAccessType
+{
+    SUBPLAN_ACCESS_NONE,
+    SUBPLAN_ACCESS_LOCAL,
+    SUBPLAN_ACCESS_REMOTE,
+    SUBPLAN_ACCESS_ANYWHERE
+} SubPlanAccessType;
+
+
 /*
  * UsedDistributedSubPlan contains information about a subPlan that is used in a
  * distributed plan.
@@ -452,8 +462,11 @@ typedef struct UsedDistributedSubPlan
 {
     CitusNode type;
 
+    /* subplan used by the distributed query */
     char *subPlanId;
-    int locationMask;
+
+    /* how the subplan is used by a distributed query */
+    SubPlanAccessType accessType;
 } UsedDistributedSubPlan;
 
 
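The two struct hunks above are the crux of the simplification: the old int locationMask OR-ed all usages of a subplan into a single deduplicated record, which required the equality and merge helpers deleted earlier in this diff, whereas the new SubPlanAccessType keeps exactly one access kind per usage record and tolerates duplicates. A minimal sketch of the contrast, for illustration only; the MaskedUsage and TypedUsage structs are invented here.

    #include <stdio.h>

    typedef enum { ACCESS_LOCAL = 1, ACCESS_REMOTE = 2 } AccessBit;

    /* old design: one record per subplan id, usages OR-ed into a bitmask */
    typedef struct MaskedUsage
    {
        char subPlanId[16];
        int locationMask;
    } MaskedUsage;

    /* new design: one record per usage, duplicates allowed */
    typedef struct TypedUsage
    {
        char subPlanId[16];
        int accessType;
    } TypedUsage;

    int
    main(void)
    {
        /* old: every append needs a lookup so usages can be merged into one mask */
        MaskedUsage merged = { "5_1", 0 };
        merged.locationMask |= ACCESS_LOCAL;  /* used in the master query */
        merged.locationMask |= ACCESS_REMOTE; /* also used in the job query */
        printf("old: %s mask=%d\n", merged.subPlanId, merged.locationMask);

        /* new: emit two records; the executor applies each one independently */
        TypedUsage usages[] = { { "5_1", ACCESS_LOCAL }, { "5_1", ACCESS_REMOTE } };
        for (size_t i = 0; i < sizeof(usages) / sizeof(usages[0]); i++)
        {
            printf("new: %s accessType=%d\n", usages[i].subPlanId, usages[i].accessType);
        }
        return 0;
    }

The single enum field also serializes more simply, which is why the node copy and out functions earlier in the diff switch from COPY_SCALAR_FIELD/WRITE_INT_FIELD on the mask to an enum field written with WRITE_ENUM_FIELD.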
@@ -1031,6 +1031,37 @@ DEBUG: Subplan XXX_4 will be sent to localhost:xxxxx
  100
 (1 row)
 
+RESET citus.task_assignment_policy;
+-- Insert..select is planned differently, make sure we have results everywhere.
+-- We put the insert..select in a CTE here to prevent the CTE from being moved
+-- into the select, which would follow the regular code path for select.
+WITH stats AS (
+  SELECT count(key) m FROM table_3
+),
+inserts AS (
+  INSERT INTO table_2
+  SELECT key, count(*)
+  FROM table_1
+  WHERE key > (SELECT m FROM stats)
+  GROUP BY key
+  HAVING count(*) < (SELECT m FROM stats)
+  LIMIT 1
+  RETURNING *
+) SELECT count(*) FROM inserts;
+DEBUG: generating subplan XXX_1 for CTE stats: SELECT count(key) AS m FROM intermediate_result_pruning.table_3
+DEBUG: generating subplan XXX_2 for CTE inserts: INSERT INTO intermediate_result_pruning.table_2 (key, value) SELECT key, count(*) AS count FROM intermediate_result_pruning.table_1 WHERE (key OPERATOR(pg_catalog.>) (SELECT stats.m FROM (SELECT intermediate_result.m FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(m bigint)) stats)) GROUP BY key HAVING (count(*) OPERATOR(pg_catalog.<) (SELECT stats.m FROM (SELECT intermediate_result.m FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(m bigint)) stats)) LIMIT 1 RETURNING table_2.key, table_2.value
+DEBUG: LIMIT clauses are not allowed in distributed INSERT ... SELECT queries
+DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) inserts
+DEBUG: Subplan XXX_1 will be written to local file
+DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx
+DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx
+DEBUG: Subplan XXX_2 will be written to local file
+DEBUG: Collecting INSERT ... SELECT results on coordinator
+ count
+---------------------------------------------------------------------
+     1
+(1 row)
+
 SET citus.task_assignment_policy to DEFAULT;
 SET client_min_messages TO DEFAULT;
 DROP TABLE table_1, table_2, table_3, ref_table, accounts, stats, range_partitioned;
@@ -911,8 +911,16 @@ RESET citus.coordinator_aggregation_strategy;
 SELECT t1.event_type FROM events_table t1
 GROUP BY t1.event_type HAVING t1.event_type > corr(t1.value_3, t1.value_2 + (SELECT t2.value_2 FROM users_table t2 ORDER BY 1 DESC LIMIT 1))
 ORDER BY 1;
-ERROR:  result "68_1" does not exist
-CONTEXT:  while executing command on localhost:xxxxx
+ event_type
+---------------------------------------------------------------------
+          0
+          1
+          2
+          3
+          4
+          5
+(6 rows)
+
 SELECT t1.event_type FROM events_table t1
 GROUP BY t1.event_type HAVING t1.event_type * 5 > sum(distinct t1.value_3)
 ORDER BY 1;
@@ -196,6 +196,20 @@ ORDER BY
 (6 rows)
 
 DROP VIEW users_view, window_view;
+-- window functions along with subquery in HAVING
+SELECT
+   user_id, count (user_id) OVER (PARTITION BY user_id)
+FROM
+   users_table
+GROUP BY
+   user_id HAVING avg(value_1) < (SELECT min(k_no) FROM users_ref_test_table)
+ORDER BY 1 DESC,2 DESC
+LIMIT 1;
+ user_id | count
+---------------------------------------------------------------------
+       6 |     1
+(1 row)
+
 -- window function uses columns from two different tables
 SELECT
 DISTINCT ON (events_table.user_id, rnk) events_table.user_id, rank() OVER my_win AS rnk
@@ -607,6 +607,24 @@ FROM
     FROM accounts_cte
     INNER JOIN joined_stats_cte_2 USING (account_id)
 ) inner_query;
+RESET citus.task_assignment_policy;
+
+-- Insert..select is planned differently, make sure we have results everywhere.
+-- We put the insert..select in a CTE here to prevent the CTE from being moved
+-- into the select, which would follow the regular code path for select.
+WITH stats AS (
+  SELECT count(key) m FROM table_3
+),
+inserts AS (
+  INSERT INTO table_2
+  SELECT key, count(*)
+  FROM table_1
+  WHERE key > (SELECT m FROM stats)
+  GROUP BY key
+  HAVING count(*) < (SELECT m FROM stats)
+  LIMIT 1
+  RETURNING *
+) SELECT count(*) FROM inserts;
 
 SET citus.task_assignment_policy to DEFAULT;
 SET client_min_messages TO DEFAULT;
@@ -110,6 +110,16 @@ ORDER BY
 
 DROP VIEW users_view, window_view;
 
+-- window functions along with subquery in HAVING
+SELECT
+   user_id, count (user_id) OVER (PARTITION BY user_id)
+FROM
+   users_table
+GROUP BY
+   user_id HAVING avg(value_1) < (SELECT min(k_no) FROM users_ref_test_table)
+ORDER BY 1 DESC,2 DESC
+LIMIT 1;
+
 -- window function uses columns from two different tables
 SELECT
 DISTINCT ON (events_table.user_id, rnk) events_table.user_id, rank() OVER my_win AS rnk