mirror of https://github.com/citusdata/citus.git
Improve the representation of used sub plans (#3411)
Previously, we identified the usedSubPlans only by looking at the subPlanId. With this commit, we're expanding the representation to also include information on the location of the subPlan. This is useful to distinguish the cases where the subPlan is used only in HAVING from the cases where it is used in both HAVING and another part of the query.
pull/3425/head
parent
87e6352d5b
commit
4519d3411d
|
@ -1112,23 +1112,49 @@ FinalizeDistributedPlan(DistributedPlan *plan, Query *originalQuery)
|
||||||
static void
|
static void
|
||||||
RecordSubPlansUsedInPlan(DistributedPlan *plan, Query *originalQuery)
|
RecordSubPlansUsedInPlan(DistributedPlan *plan, Query *originalQuery)
|
||||||
{
|
{
|
||||||
/* first, get all the subplans in the query */
|
Node *havingQual = originalQuery->havingQual;
|
||||||
plan->usedSubPlanNodeList = FindSubPlansUsedInNode((Node *) originalQuery);
|
|
||||||
|
/* temporarily set to NULL, we're going to restore before the function returns */
|
||||||
|
originalQuery->havingQual = NULL;
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Later, remove the subplans used in the HAVING clause, because they
|
* Mark the subplans as needed on remote side. Note that this decision is revisited
|
||||||
* are only required in the coordinator. Including them in the
|
* on execution, when the query only consists of intermediate results.
|
||||||
* usedSubPlanNodeList prevents the intermediate results to be sent to the
|
|
||||||
* coordinator only.
|
|
||||||
*/
|
*/
|
||||||
if (originalQuery->hasSubLinks &&
|
List *subplansExceptHaving = FindSubPlansUsedInNode((Node *) originalQuery);
|
||||||
FindNodeCheck(originalQuery->havingQual, IsNodeSubquery))
|
UpdateUsedPlanListLocation(subplansExceptHaving, SUBPLAN_ACCESS_REMOTE);
|
||||||
{
|
|
||||||
List *subplansInHaving = FindSubPlansUsedInNode(originalQuery->havingQual);
|
|
||||||
|
|
||||||
plan->usedSubPlanNodeList =
|
/* do the same for HAVING part of the query */
|
||||||
list_difference(plan->usedSubPlanNodeList, subplansInHaving);
|
List *subplansInHaving = NIL;
|
||||||
|
if (originalQuery->hasSubLinks &&
|
||||||
|
FindNodeCheck(havingQual, IsNodeSubquery))
|
||||||
|
{
|
||||||
|
subplansInHaving = FindSubPlansUsedInNode(havingQual);
|
||||||
|
if (plan->masterQuery)
|
||||||
|
{
|
||||||
|
/*
|
||||||
|
* If we have the master query, we're sure that the result is needed locally.
|
||||||
|
* Otherwise, such as router queries, the plan may not be required locally.
|
||||||
|
* Note that if the query consists of only intermediate results, the executor
|
||||||
|
* may still prefer to write locally.
|
||||||
|
*
|
||||||
|
* If any of the subplansInHaving is used in other parts of the query,
|
||||||
|
* we'll later merge it with those subPlans and send it to remote.
|
||||||
|
*/
|
||||||
|
UpdateUsedPlanListLocation(subplansInHaving, SUBPLAN_ACCESS_LOCAL);
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
UpdateUsedPlanListLocation(subplansInHaving, SUBPLAN_ACCESS_REMOTE);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/* set back the havingQual and the calculated subplans */
|
||||||
|
originalQuery->havingQual = havingQual;
|
||||||
|
|
||||||
|
/* merge the used subplans */
|
||||||
|
plan->usedSubPlanNodeList =
|
||||||
|
MergeUsedSubPlanLists(subplansExceptHaving, subplansInHaving);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -31,23 +31,29 @@ static List * FindAllRemoteWorkerNodesUsingSubplan(IntermediateResultsHashEntry
|
||||||
static List * RemoveLocalNodeFromWorkerList(List *workerNodeList);
|
static List * RemoveLocalNodeFromWorkerList(List *workerNodeList);
|
||||||
static void LogIntermediateResultMulticastSummary(IntermediateResultsHashEntry *entry,
|
static void LogIntermediateResultMulticastSummary(IntermediateResultsHashEntry *entry,
|
||||||
List *workerNodeList);
|
List *workerNodeList);
|
||||||
|
static bool UsedSubPlansEqual(UsedDistributedSubPlan *left,
|
||||||
|
UsedDistributedSubPlan *right);
|
||||||
|
static UsedDistributedSubPlan * UsedSubPlanListMember(List *list,
|
||||||
|
UsedDistributedSubPlan *usedPlan);
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* FindSubPlansUsedInPlan finds all the subplans used by the plan by traversing
|
* FindSubPlansUsedInNode finds all the subplans used by the plan by traversing
|
||||||
* the range table entries in the plan.
|
* the input node.
|
||||||
*/
|
*/
|
||||||
List *
|
List *
|
||||||
FindSubPlansUsedInNode(Node *node)
|
FindSubPlansUsedInNode(Node *node)
|
||||||
{
|
{
|
||||||
List *rangeTableList = NIL;
|
List *rangeTableList = NIL;
|
||||||
ListCell *rangeTableCell = NULL;
|
ListCell *rangeTableCell = NULL;
|
||||||
List *subPlanList = NIL;
|
List *usedSubPlanList = NIL;
|
||||||
|
|
||||||
ExtractRangeTableEntryWalker(node, &rangeTableList);
|
ExtractRangeTableEntryWalker(node, &rangeTableList);
|
||||||
|
|
||||||
foreach(rangeTableCell, rangeTableList)
|
foreach(rangeTableCell, rangeTableList)
|
||||||
{
|
{
|
||||||
RangeTblEntry *rangeTableEntry = lfirst(rangeTableCell);
|
RangeTblEntry *rangeTableEntry = lfirst(rangeTableCell);
|
||||||
|
|
||||||
if (rangeTableEntry->rtekind == RTE_FUNCTION)
|
if (rangeTableEntry->rtekind == RTE_FUNCTION)
|
||||||
{
|
{
|
||||||
char *resultId =
|
char *resultId =
|
||||||
|
@ -62,12 +68,21 @@ FindSubPlansUsedInNode(Node *node)
|
||||||
* Use a Value to be able to use list_append_unique and store
|
* Use a UsedDistributedSubPlan node (de-duplicated by subPlanId) to store
|
||||||
* the result ID in the DistributedPlan.
|
* the result ID in the DistributedPlan.
|
||||||
*/
|
*/
|
||||||
Value *resultIdValue = makeString(resultId);
|
UsedDistributedSubPlan *usedPlan = CitusMakeNode(UsedDistributedSubPlan);
|
||||||
subPlanList = list_append_unique(subPlanList, resultIdValue);
|
|
||||||
|
usedPlan->subPlanId = pstrdup(resultId);
|
||||||
|
|
||||||
|
/* the callers are responsible for setting the accurate location */
|
||||||
|
usedPlan->locationMask = SUBPLAN_ACCESS_NONE;
|
||||||
|
|
||||||
|
if (!UsedSubPlanListMember(usedSubPlanList, usedPlan))
|
||||||
|
{
|
||||||
|
usedSubPlanList = lappend(usedSubPlanList, usedPlan);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return subPlanList;
|
return usedSubPlanList;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@ -88,26 +103,36 @@ void
|
||||||
RecordSubplanExecutionsOnNodes(HTAB *intermediateResultsHash,
|
RecordSubplanExecutionsOnNodes(HTAB *intermediateResultsHash,
|
||||||
DistributedPlan *distributedPlan)
|
DistributedPlan *distributedPlan)
|
||||||
{
|
{
|
||||||
Value *usedSubPlanIdValue = NULL;
|
|
||||||
List *usedSubPlanNodeList = distributedPlan->usedSubPlanNodeList;
|
List *usedSubPlanNodeList = distributedPlan->usedSubPlanNodeList;
|
||||||
List *subPlanList = distributedPlan->subPlanList;
|
List *subPlanList = distributedPlan->subPlanList;
|
||||||
ListCell *subPlanCell = NULL;
|
ListCell *subPlanCell = NULL;
|
||||||
int workerNodeCount = GetWorkerNodeCount();
|
int workerNodeCount = GetWorkerNodeCount();
|
||||||
|
|
||||||
foreach_ptr(usedSubPlanIdValue, usedSubPlanNodeList)
|
foreach(subPlanCell, usedSubPlanNodeList)
|
||||||
{
|
{
|
||||||
char *resultId = strVal(usedSubPlanIdValue);
|
UsedDistributedSubPlan *usedPlan = lfirst(subPlanCell);
|
||||||
|
|
||||||
|
char *resultId = usedPlan->subPlanId;
|
||||||
|
|
||||||
IntermediateResultsHashEntry *entry = SearchIntermediateResult(
|
IntermediateResultsHashEntry *entry = SearchIntermediateResult(
|
||||||
intermediateResultsHash, resultId);
|
intermediateResultsHash, resultId);
|
||||||
|
|
||||||
/* no need to traverse the whole plan if all the workers are hit */
|
if (usedPlan->locationMask & SUBPLAN_ACCESS_LOCAL)
|
||||||
|
{
|
||||||
|
/* subPlan needs to be written locally as the planner decided */
|
||||||
|
entry->writeLocalFile = true;
|
||||||
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
* There is no need to traverse the whole plan if the intermediate result
|
||||||
|
* will be written to a local file and sent to all nodes
|
||||||
|
*/
|
||||||
if (list_length(entry->nodeIdList) == workerNodeCount && entry->writeLocalFile)
|
if (list_length(entry->nodeIdList) == workerNodeCount && entry->writeLocalFile)
|
||||||
{
|
{
|
||||||
elog(DEBUG4, "Subplan %s is used in all workers", resultId);
|
elog(DEBUG4, "Subplan %s is used in all workers", resultId);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
else
|
else if (usedPlan->locationMask & SUBPLAN_ACCESS_REMOTE)
|
||||||
{
|
{
|
||||||
/*
|
/*
|
||||||
* traverse the plan and add find all worker nodes
|
* traverse the plan and find all worker nodes
|
||||||
|
@ -221,20 +246,6 @@ FindAllWorkerNodesUsingSubplan(HTAB *intermediateResultsHash,
|
||||||
SearchIntermediateResult(intermediateResultsHash, resultId);
|
SearchIntermediateResult(intermediateResultsHash, resultId);
|
||||||
|
|
||||||
List *remoteWorkerNodes = FindAllRemoteWorkerNodesUsingSubplan(entry);
|
List *remoteWorkerNodes = FindAllRemoteWorkerNodesUsingSubplan(entry);
|
||||||
if (remoteWorkerNodes == NIL)
|
|
||||||
{
|
|
||||||
/*
|
|
||||||
* This could happen in two cases:
|
|
||||||
* (a) Subquery in the having
|
|
||||||
* (b) The intermediate result is not used, such as RETURNING of a
|
|
||||||
* modifying CTE is not used
|
|
||||||
*
|
|
||||||
* For SELECT, Postgres/Citus is clever enough to not execute the CTE
|
|
||||||
* if it is not used at all, but for modifications we have to execute
|
|
||||||
* the queries.
|
|
||||||
*/
|
|
||||||
entry->writeLocalFile = true;
|
|
||||||
}
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Don't include the current worker if the result will be written to local
|
* Don't include the current worker if the result will be written to local
|
||||||
|
@ -369,3 +380,97 @@ SearchIntermediateResult(HTAB *intermediateResultsHash, char *resultId)
|
||||||
|
|
||||||
return entry;
|
return entry;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* MergeUsedSubPlanLists is a utility function that merges the two input
|
||||||
|
* UsedSubPlan lists. Existence of the items of the rightSubPlanList
|
||||||
|
* checked in leftSubPlanList. If not found, items are added to
|
||||||
|
* leftSubPlanList. If found, the locationMask fields are merged.
|
||||||
|
*
|
||||||
|
* Finally, the in-place modified leftSubPlanList is returned.
|
||||||
|
*/
|
||||||
|
List *
|
||||||
|
MergeUsedSubPlanLists(List *leftSubPlanList, List *rightSubPlanList)
|
||||||
|
{
|
||||||
|
ListCell *rightListCell;
|
||||||
|
|
||||||
|
foreach(rightListCell, rightSubPlanList)
|
||||||
|
{
|
||||||
|
UsedDistributedSubPlan *memberOnRightList = lfirst(rightListCell);
|
||||||
|
UsedDistributedSubPlan *memberOnLeftList =
|
||||||
|
UsedSubPlanListMember(leftSubPlanList, memberOnRightList);
|
||||||
|
|
||||||
|
if (memberOnLeftList == NULL)
|
||||||
|
{
|
||||||
|
leftSubPlanList = lappend(leftSubPlanList, memberOnRightList);
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
memberOnLeftList->locationMask |= memberOnRightList->locationMask;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return leftSubPlanList;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* UsedSubPlanListMember is a utility function inspired from list_member(),
|
||||||
|
* but operating on UsedDistributedSubPlan struct, which doesn't have equal()
|
||||||
|
* function defined (similar to all Citus node types).
|
||||||
|
*/
|
||||||
|
static UsedDistributedSubPlan *
|
||||||
|
UsedSubPlanListMember(List *usedSubPlanList, UsedDistributedSubPlan *usedPlan)
|
||||||
|
{
|
||||||
|
const ListCell *usedSubPlanCell;
|
||||||
|
|
||||||
|
foreach(usedSubPlanCell, usedSubPlanList)
|
||||||
|
{
|
||||||
|
if (UsedSubPlansEqual(lfirst(usedSubPlanCell), usedPlan))
|
||||||
|
{
|
||||||
|
return lfirst(usedSubPlanCell);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* UpdateUsedPlanListLocation is a utility function which iterates over the list
|
||||||
|
* and updates the subPlanLocation to the input location.
|
||||||
|
*/
|
||||||
|
void
|
||||||
|
UpdateUsedPlanListLocation(List *subPlanList, int locationMask)
|
||||||
|
{
|
||||||
|
ListCell *subPlanCell = NULL;
|
||||||
|
foreach(subPlanCell, subPlanList)
|
||||||
|
{
|
||||||
|
UsedDistributedSubPlan *subPlan = lfirst(subPlanCell);
|
||||||
|
|
||||||
|
subPlan->locationMask |= locationMask;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* UsedSubPlansEqual is a utility function inspired from equal(),
|
||||||
|
* but operating on UsedDistributedSubPlan struct, which doesn't have equal()
|
||||||
|
* function defined (similar to all Citus node types).
|
||||||
|
*/
|
||||||
|
static bool
|
||||||
|
UsedSubPlansEqual(UsedDistributedSubPlan *left, UsedDistributedSubPlan *right)
|
||||||
|
{
|
||||||
|
if (left == NULL || right == NULL)
|
||||||
|
{
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (strncmp(left->subPlanId, right->subPlanId, NAMEDATALEN) == 0)
|
||||||
|
{
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
|
@ -133,6 +133,16 @@ CopyNodeDistributedSubPlan(COPYFUNC_ARGS)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
 * CopyNodeUsedDistributedSubPlan is the copy function for
 * UsedDistributedSubPlan nodes. It copies the two fields of the node:
 * subPlanId via COPY_STRING_FIELD and locationMask via COPY_SCALAR_FIELD.
 */
void
|
||||||
|
CopyNodeUsedDistributedSubPlan(COPYFUNC_ARGS)
|
||||||
|
{
|
||||||
|
DECLARE_FROM_AND_NEW_NODE(UsedDistributedSubPlan);
|
||||||
|
|
||||||
|
COPY_STRING_FIELD(subPlanId);
|
||||||
|
COPY_SCALAR_FIELD(locationMask);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
void
|
void
|
||||||
CopyNodeShardInterval(COPYFUNC_ARGS)
|
CopyNodeShardInterval(COPYFUNC_ARGS)
|
||||||
{
|
{
|
||||||
|
|
|
@ -35,6 +35,7 @@ static const char *CitusNodeTagNamesD[] = {
|
||||||
"MapMergeJob",
|
"MapMergeJob",
|
||||||
"DistributedPlan",
|
"DistributedPlan",
|
||||||
"DistributedSubPlan",
|
"DistributedSubPlan",
|
||||||
|
"UsedDistributedSubPlan",
|
||||||
"Task",
|
"Task",
|
||||||
"LocalPlannedStatement",
|
"LocalPlannedStatement",
|
||||||
"TaskExecution",
|
"TaskExecution",
|
||||||
|
@ -382,6 +383,7 @@ const ExtensibleNodeMethods nodeMethods[] =
|
||||||
{
|
{
|
||||||
DEFINE_NODE_METHODS(DistributedPlan),
|
DEFINE_NODE_METHODS(DistributedPlan),
|
||||||
DEFINE_NODE_METHODS(DistributedSubPlan),
|
DEFINE_NODE_METHODS(DistributedSubPlan),
|
||||||
|
DEFINE_NODE_METHODS(UsedDistributedSubPlan),
|
||||||
DEFINE_NODE_METHODS(Job),
|
DEFINE_NODE_METHODS(Job),
|
||||||
DEFINE_NODE_METHODS(ShardInterval),
|
DEFINE_NODE_METHODS(ShardInterval),
|
||||||
DEFINE_NODE_METHODS(MapMergeJob),
|
DEFINE_NODE_METHODS(MapMergeJob),
|
||||||
|
|
|
@ -212,6 +212,17 @@ OutDistributedSubPlan(OUTFUNC_ARGS)
|
||||||
WRITE_NODE_FIELD(plan);
|
WRITE_NODE_FIELD(plan);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/*
 * OutUsedDistributedSubPlan is the output function for
 * UsedDistributedSubPlan nodes. It writes the node type label
 * "USEDDISTRIBUTEDSUBPLAN" followed by the subPlanId (string) and
 * locationMask (int) fields.
 */
void
|
||||||
|
OutUsedDistributedSubPlan(OUTFUNC_ARGS)
|
||||||
|
{
|
||||||
|
WRITE_LOCALS(UsedDistributedSubPlan);
|
||||||
|
|
||||||
|
WRITE_NODE_TYPE("USEDDISTRIBUTEDSUBPLAN");
|
||||||
|
|
||||||
|
WRITE_STRING_FIELD(subPlanId);
|
||||||
|
WRITE_INT_FIELD(locationMask);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
void
|
void
|
||||||
OutMultiProject(OUTFUNC_ARGS)
|
OutMultiProject(OUTFUNC_ARGS)
|
||||||
|
|
|
@ -244,6 +244,16 @@ ReadDistributedSubPlan(READFUNC_ARGS)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
 * ReadUsedDistributedSubPlan is the read function for
 * UsedDistributedSubPlan nodes. It reads the subPlanId (string) and
 * locationMask (int) fields, in the same order in which
 * OutUsedDistributedSubPlan writes them.
 */
READFUNC_RET
|
||||||
|
ReadUsedDistributedSubPlan(READFUNC_ARGS)
|
||||||
|
{
|
||||||
|
READ_LOCALS(UsedDistributedSubPlan);
|
||||||
|
|
||||||
|
READ_STRING_FIELD(subPlanId);
|
||||||
|
READ_INT_FIELD(locationMask);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
READFUNC_RET
|
READFUNC_RET
|
||||||
ReadShardInterval(READFUNC_ARGS)
|
ReadShardInterval(READFUNC_ARGS)
|
||||||
{
|
{
|
||||||
|
|
|
@ -45,6 +45,7 @@ extern void RegisterNodes(void);
|
||||||
extern READFUNC_RET ReadJob(READFUNC_ARGS);
|
extern READFUNC_RET ReadJob(READFUNC_ARGS);
|
||||||
extern READFUNC_RET ReadDistributedPlan(READFUNC_ARGS);
|
extern READFUNC_RET ReadDistributedPlan(READFUNC_ARGS);
|
||||||
extern READFUNC_RET ReadDistributedSubPlan(READFUNC_ARGS);
|
extern READFUNC_RET ReadDistributedSubPlan(READFUNC_ARGS);
|
||||||
|
extern READFUNC_RET ReadUsedDistributedSubPlan(READFUNC_ARGS);
|
||||||
extern READFUNC_RET ReadShardInterval(READFUNC_ARGS);
|
extern READFUNC_RET ReadShardInterval(READFUNC_ARGS);
|
||||||
extern READFUNC_RET ReadMapMergeJob(READFUNC_ARGS);
|
extern READFUNC_RET ReadMapMergeJob(READFUNC_ARGS);
|
||||||
extern READFUNC_RET ReadShardPlacement(READFUNC_ARGS);
|
extern READFUNC_RET ReadShardPlacement(READFUNC_ARGS);
|
||||||
|
@ -61,6 +62,7 @@ extern READFUNC_RET ReadUnsupportedCitusNode(READFUNC_ARGS);
|
||||||
extern void OutJob(OUTFUNC_ARGS);
|
extern void OutJob(OUTFUNC_ARGS);
|
||||||
extern void OutDistributedPlan(OUTFUNC_ARGS);
|
extern void OutDistributedPlan(OUTFUNC_ARGS);
|
||||||
extern void OutDistributedSubPlan(OUTFUNC_ARGS);
|
extern void OutDistributedSubPlan(OUTFUNC_ARGS);
|
||||||
|
extern void OutUsedDistributedSubPlan(OUTFUNC_ARGS);
|
||||||
extern void OutShardInterval(OUTFUNC_ARGS);
|
extern void OutShardInterval(OUTFUNC_ARGS);
|
||||||
extern void OutMapMergeJob(OUTFUNC_ARGS);
|
extern void OutMapMergeJob(OUTFUNC_ARGS);
|
||||||
extern void OutShardPlacement(OUTFUNC_ARGS);
|
extern void OutShardPlacement(OUTFUNC_ARGS);
|
||||||
|
@ -86,6 +88,7 @@ extern void OutMultiExtendedOp(OUTFUNC_ARGS);
|
||||||
extern void CopyNodeJob(COPYFUNC_ARGS);
|
extern void CopyNodeJob(COPYFUNC_ARGS);
|
||||||
extern void CopyNodeDistributedPlan(COPYFUNC_ARGS);
|
extern void CopyNodeDistributedPlan(COPYFUNC_ARGS);
|
||||||
extern void CopyNodeDistributedSubPlan(COPYFUNC_ARGS);
|
extern void CopyNodeDistributedSubPlan(COPYFUNC_ARGS);
|
||||||
|
extern void CopyNodeUsedDistributedSubPlan(COPYFUNC_ARGS);
|
||||||
extern void CopyNodeShardInterval(COPYFUNC_ARGS);
|
extern void CopyNodeShardInterval(COPYFUNC_ARGS);
|
||||||
extern void CopyNodeMapMergeJob(COPYFUNC_ARGS);
|
extern void CopyNodeMapMergeJob(COPYFUNC_ARGS);
|
||||||
extern void CopyNodeShardPlacement(COPYFUNC_ARGS);
|
extern void CopyNodeShardPlacement(COPYFUNC_ARGS);
|
||||||
|
|
|
@ -57,6 +57,7 @@ typedef enum CitusNodeTag
|
||||||
T_MapMergeJob,
|
T_MapMergeJob,
|
||||||
T_DistributedPlan,
|
T_DistributedPlan,
|
||||||
T_DistributedSubPlan,
|
T_DistributedSubPlan,
|
||||||
|
T_UsedDistributedSubPlan,
|
||||||
T_Task,
|
T_Task,
|
||||||
T_LocalPlannedStatement,
|
T_LocalPlannedStatement,
|
||||||
T_TaskExecution,
|
T_TaskExecution,
|
||||||
|
|
|
@ -29,5 +29,8 @@ extern void RecordSubplanExecutionsOnNodes(HTAB *intermediateResultsHash,
|
||||||
extern IntermediateResultsHashEntry * SearchIntermediateResult(HTAB *resultsHash,
|
extern IntermediateResultsHashEntry * SearchIntermediateResult(HTAB *resultsHash,
|
||||||
char *resultId);
|
char *resultId);
|
||||||
|
|
||||||
|
/* utility functions related to UsedSubPlans */
|
||||||
|
extern List * MergeUsedSubPlanLists(List *leftSubPlanList, List *rightSubPlanList);
|
||||||
|
/* ORs locationMask into the locationMask of every UsedDistributedSubPlan in subPlanList */
extern void UpdateUsedPlanListLocation(List *subPlanList, int locationMask);
|
||||||
|
|
||||||
#endif /* INTERMEDIATE_RESULT_PRUNING_H */
|
#endif /* INTERMEDIATE_RESULT_PRUNING_H */
|
||||||
|
|
|
@ -44,6 +44,10 @@
|
||||||
#define MERGE_FILES_AND_RUN_QUERY_COMMAND \
|
#define MERGE_FILES_AND_RUN_QUERY_COMMAND \
|
||||||
"SELECT worker_merge_files_and_run_query(" UINT64_FORMAT ", %d, %s, %s)"
|
"SELECT worker_merge_files_and_run_query(" UINT64_FORMAT ", %d, %s, %s)"
|
||||||
|
|
||||||
|
/*
 * Bit flags stored in UsedDistributedSubPlan.locationMask. LOCAL and REMOTE
 * may be OR-ed together when a subplan is needed in both places.
 */
/* no location has been assigned yet; callers are expected to set one */
#define SUBPLAN_ACCESS_NONE 0
|
||||||
|
/* the result of the subplan must be written to a local file */
#define SUBPLAN_ACCESS_LOCAL 1
|
||||||
|
/* the result of the subplan is needed on the remote side */
#define SUBPLAN_ACCESS_REMOTE 2
|
||||||
|
|
||||||
|
|
||||||
typedef enum CitusRTEKind
|
typedef enum CitusRTEKind
|
||||||
{
|
{
|
||||||
|
@ -388,6 +392,19 @@ typedef struct DistributedSubPlan
|
||||||
} DistributedSubPlan;
|
} DistributedSubPlan;
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* UsedDistributedSubPlan contains information about a subPlan that is used in a
|
||||||
|
* distributed plan.
|
||||||
|
*/
|
||||||
|
typedef struct UsedDistributedSubPlan
|
||||||
|
{
|
||||||
|
CitusNode type;
|
||||||
|
|
||||||
|
/* result ID of the subplan; also the key used to compare two entries */
char *subPlanId;
|
||||||
|
/* bitwise OR of SUBPLAN_ACCESS_* flags telling where the result is needed */
int locationMask;
|
||||||
|
} UsedDistributedSubPlan;
|
||||||
|
|
||||||
|
|
||||||
/* OperatorCacheEntry contains information for each element in OperatorCache */
|
/* OperatorCacheEntry contains information for each element in OperatorCache */
|
||||||
typedef struct OperatorCacheEntry
|
typedef struct OperatorCacheEntry
|
||||||
{
|
{
|
||||||
|
|
|
@ -225,18 +225,22 @@ NOTICE: executing the command locally: SELECT count(*) AS count FROM (SELECT in
|
||||||
|
|
||||||
-- this time the same CTE is both joined with a distributed
|
-- this time the same CTE is both joined with a distributed
|
||||||
-- table and used in HAVING
|
-- table and used in HAVING
|
||||||
-- TODO: fixed by #3396
|
WITH a AS (SELECT * FROM table_1 ORDER BY 1,2 DESC LIMIT 1)
|
||||||
WITH a AS (SELECT * FROM table_1)
|
|
||||||
SELECT count(*),
|
SELECT count(*),
|
||||||
key
|
key
|
||||||
FROM a JOIN table_2 USING (key)
|
FROM a JOIN table_2 USING (key)
|
||||||
GROUP BY key
|
GROUP BY key
|
||||||
HAVING (max(table_2.value) > (SELECT value FROM a));
|
HAVING (max(table_2.value) > (SELECT value FROM a));
|
||||||
DEBUG: generating subplan XXX_1 for CTE a: SELECT key, value FROM locally_execute_intermediate_results.table_1
|
DEBUG: generating subplan XXX_1 for CTE a: SELECT key, value FROM locally_execute_intermediate_results.table_1 ORDER BY key, value DESC LIMIT 1
|
||||||
|
DEBUG: push down of limit count: 1
|
||||||
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count, a.key FROM ((SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) a JOIN locally_execute_intermediate_results.table_2 USING (key)) GROUP BY a.key HAVING (max(table_2.value) OPERATOR(pg_catalog.>) (SELECT a_1.value FROM (SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) a_1))
|
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count, a.key FROM ((SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) a JOIN locally_execute_intermediate_results.table_2 USING (key)) GROUP BY a.key HAVING (max(table_2.value) OPERATOR(pg_catalog.>) (SELECT a_1.value FROM (SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) a_1))
|
||||||
DEBUG: Subplan XXX_1 will be written to local file
|
DEBUG: Subplan XXX_1 will be written to local file
|
||||||
ERROR: result "31_1" does not exist
|
DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx
|
||||||
CONTEXT: while executing command on localhost:xxxxx
|
DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx
|
||||||
|
count | key
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
(0 rows)
|
||||||
|
|
||||||
-- this time the same CTE is both joined with a distributed
|
-- this time the same CTE is both joined with a distributed
|
||||||
-- table and used in HAVING -- but used in another subquery/aggregate
|
-- table and used in HAVING -- but used in another subquery/aggregate
|
||||||
-- so one more level of recursive planning
|
-- so one more level of recursive planning
|
||||||
|
@ -330,6 +334,130 @@ NOTICE: executing the command locally: SELECT max(key) AS key FROM (SELECT inte
|
||||||
4 | 4
|
4 | 4
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
|
-- some cases around router queries
|
||||||
|
-- a router query, but the having has two cte joins
|
||||||
|
WITH cte_1 AS (SELECT max(value) FROM table_1),
|
||||||
|
cte_2 AS (SELECT max(value) FROM table_1),
|
||||||
|
cte_3 AS (SELECT * FROM table_2)
|
||||||
|
SELECT count(*)
|
||||||
|
FROM table_2
|
||||||
|
WHERE KEY = 3
|
||||||
|
GROUP BY KEY
|
||||||
|
HAVING max(value) >
|
||||||
|
(SELECT MAX FROM cte_1 JOIN cte_2 USING (MAX));
|
||||||
|
DEBUG: generating subplan XXX_1 for CTE cte_1: SELECT max(value) AS max FROM locally_execute_intermediate_results.table_1
|
||||||
|
DEBUG: generating subplan XXX_2 for CTE cte_2: SELECT max(value) AS max FROM locally_execute_intermediate_results.table_1
|
||||||
|
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM locally_execute_intermediate_results.table_2 WHERE (key OPERATOR(pg_catalog.=) 3) GROUP BY key HAVING (max(value) OPERATOR(pg_catalog.>) (SELECT cte_1.max FROM ((SELECT intermediate_result.max FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(max text)) cte_1 JOIN (SELECT intermediate_result.max FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(max text)) cte_2 USING (max))))
|
||||||
|
DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx
|
||||||
|
DEBUG: Subplan XXX_2 will be sent to localhost:xxxxx
|
||||||
|
count
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
(0 rows)
|
||||||
|
|
||||||
|
-- a router query, but the having has two cte joins
|
||||||
|
-- and the jointree has a join with another cte
|
||||||
|
WITH cte_1 AS (SELECT max(value) FROM table_1),
|
||||||
|
cte_2 AS (SELECT max(value) FROM table_1),
|
||||||
|
cte_3 AS (SELECT * FROM table_2)
|
||||||
|
SELECT count(*)
|
||||||
|
FROM table_2 JOIN cte_3 USING(key)
|
||||||
|
WHERE KEY = 3
|
||||||
|
GROUP BY table_2.KEY
|
||||||
|
HAVING max(table_2.value) >
|
||||||
|
(SELECT MAX FROM cte_1 JOIN cte_2 USING (MAX));
|
||||||
|
DEBUG: generating subplan XXX_1 for CTE cte_1: SELECT max(value) AS max FROM locally_execute_intermediate_results.table_1
|
||||||
|
DEBUG: generating subplan XXX_2 for CTE cte_2: SELECT max(value) AS max FROM locally_execute_intermediate_results.table_1
|
||||||
|
DEBUG: generating subplan XXX_3 for CTE cte_3: SELECT key, value FROM locally_execute_intermediate_results.table_2
|
||||||
|
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (locally_execute_intermediate_results.table_2 JOIN (SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_3'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) cte_3 USING (key)) WHERE (table_2.key OPERATOR(pg_catalog.=) 3) GROUP BY table_2.key HAVING (max(table_2.value) OPERATOR(pg_catalog.>) (SELECT cte_1.max FROM ((SELECT intermediate_result.max FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(max text)) cte_1 JOIN (SELECT intermediate_result.max FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(max text)) cte_2 USING (max))))
|
||||||
|
DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx
|
||||||
|
DEBUG: Subplan XXX_2 will be sent to localhost:xxxxx
|
||||||
|
DEBUG: Subplan XXX_3 will be sent to localhost:xxxxx
|
||||||
|
count
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
(0 rows)
|
||||||
|
|
||||||
|
-- a router query, but the having has two cte joins
|
||||||
|
-- and the jointree has a join with the same CTEs
|
||||||
|
WITH cte_1 AS (SELECT max(value) FROM table_1),
|
||||||
|
cte_2 AS (SELECT max(value) FROM table_1),
|
||||||
|
cte_3 AS (SELECT * FROM table_2)
|
||||||
|
SELECT count(*)
|
||||||
|
FROM table_2 JOIN cte_3 USING(key) JOIN cte_2 ON (key = MAX::int) JOIN cte_1 USING(MAX)
|
||||||
|
WHERE KEY = 3
|
||||||
|
GROUP BY table_2.KEY
|
||||||
|
HAVING max(table_2.value) >
|
||||||
|
(SELECT MAX FROM cte_1 JOIN cte_2 USING (MAX));
|
||||||
|
DEBUG: generating subplan XXX_1 for CTE cte_1: SELECT max(value) AS max FROM locally_execute_intermediate_results.table_1
|
||||||
|
DEBUG: generating subplan XXX_2 for CTE cte_2: SELECT max(value) AS max FROM locally_execute_intermediate_results.table_1
|
||||||
|
DEBUG: generating subplan XXX_3 for CTE cte_3: SELECT key, value FROM locally_execute_intermediate_results.table_2
|
||||||
|
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (((locally_execute_intermediate_results.table_2 JOIN (SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_3'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) cte_3 USING (key)) JOIN (SELECT intermediate_result.max FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(max text)) cte_2 ON ((table_2.key OPERATOR(pg_catalog.=) (cte_2.max)::integer))) JOIN (SELECT intermediate_result.max FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(max text)) cte_1 USING (max)) WHERE (table_2.key OPERATOR(pg_catalog.=) 3) GROUP BY table_2.key HAVING (max(table_2.value) OPERATOR(pg_catalog.>) (SELECT cte_1_1.max FROM ((SELECT intermediate_result.max FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(max text)) cte_1_1 JOIN (SELECT intermediate_result.max FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(max text)) cte_2_1 USING (max))))
|
||||||
|
DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx
|
||||||
|
DEBUG: Subplan XXX_2 will be sent to localhost:xxxxx
|
||||||
|
DEBUG: Subplan XXX_3 will be sent to localhost:xxxxx
|
||||||
|
count
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
(0 rows)
|
||||||
|
|
||||||
|
-- subPlans needed remotely as the subquery is pushed down
|
||||||
|
WITH cte_1 AS (SELECT max(value) FROM table_1),
|
||||||
|
cte_2 AS (SELECT max(value) FROM table_2)
|
||||||
|
SELECT * FROM
|
||||||
|
(SELECT key FROM table_1 GROUP BY key HAVING max(value) > (SELECT * FROM cte_1)) as foo,
|
||||||
|
(SELECT key FROM table_2 GROUP BY key HAVING max(value) > (SELECT * FROM cte_2)) as bar
|
||||||
|
WHERE foo.key = bar.key;
|
||||||
|
DEBUG: generating subplan XXX_1 for CTE cte_1: SELECT max(value) AS max FROM locally_execute_intermediate_results.table_1
|
||||||
|
DEBUG: generating subplan XXX_2 for CTE cte_2: SELECT max(value) AS max FROM locally_execute_intermediate_results.table_2
|
||||||
|
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT foo.key, bar.key FROM (SELECT table_1.key FROM locally_execute_intermediate_results.table_1 GROUP BY table_1.key HAVING (max(table_1.value) OPERATOR(pg_catalog.>) (SELECT cte_1.max FROM (SELECT intermediate_result.max FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(max text)) cte_1))) foo, (SELECT table_2.key FROM locally_execute_intermediate_results.table_2 GROUP BY table_2.key HAVING (max(table_2.value) OPERATOR(pg_catalog.>) (SELECT cte_2.max FROM (SELECT intermediate_result.max FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(max text)) cte_2))) bar WHERE (foo.key OPERATOR(pg_catalog.=) bar.key)
|
||||||
|
DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx
|
||||||
|
DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx
|
||||||
|
DEBUG: Subplan XXX_2 will be sent to localhost:xxxxx
|
||||||
|
DEBUG: Subplan XXX_2 will be sent to localhost:xxxxx
|
||||||
|
key | key
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
(0 rows)
|
||||||
|
|
||||||
|
-- the second subquery needs to be recursively planned due to non-colocated subquery join
|
||||||
|
-- so cte_2 becomes part of master query of that recursive subquery planning
|
||||||
|
WITH cte_1 AS (SELECT max(value) FROM table_1),
|
||||||
|
cte_2 AS (SELECT max(value) FROM table_2)
|
||||||
|
SELECT * FROM
|
||||||
|
(SELECT key FROM table_1 GROUP BY key HAVING max(value) > (SELECT * FROM cte_1)) as foo,
|
||||||
|
(SELECT key FROM table_2 GROUP BY key HAVING max(value) > (SELECT * FROM cte_2)) as bar
|
||||||
|
WHERE foo.key != bar.key;
|
||||||
|
DEBUG: generating subplan XXX_1 for CTE cte_1: SELECT max(value) AS max FROM locally_execute_intermediate_results.table_1
|
||||||
|
DEBUG: generating subplan XXX_2 for CTE cte_2: SELECT max(value) AS max FROM locally_execute_intermediate_results.table_2
|
||||||
|
DEBUG: generating subplan XXX_3 for subquery SELECT key FROM locally_execute_intermediate_results.table_2 GROUP BY key HAVING (max(value) OPERATOR(pg_catalog.>) (SELECT cte_2.max FROM (SELECT intermediate_result.max FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(max text)) cte_2))
|
||||||
|
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT foo.key, bar.key FROM (SELECT table_1.key FROM locally_execute_intermediate_results.table_1 GROUP BY table_1.key HAVING (max(table_1.value) OPERATOR(pg_catalog.>) (SELECT cte_1.max FROM (SELECT intermediate_result.max FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(max text)) cte_1))) foo, (SELECT intermediate_result.key FROM read_intermediate_result('XXX_3'::text, 'binary'::citus_copy_format) intermediate_result(key integer)) bar WHERE (foo.key OPERATOR(pg_catalog.<>) bar.key)
|
||||||
|
DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx
|
||||||
|
DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx
|
||||||
|
DEBUG: Subplan XXX_2 will be written to local file
|
||||||
|
DEBUG: Subplan XXX_3 will be sent to localhost:xxxxx
|
||||||
|
DEBUG: Subplan XXX_3 will be sent to localhost:xxxxx
|
||||||
|
key | key
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
(0 rows)
|
||||||
|
|
||||||
|
-- now, forcing all subqueries to be on the local node
|
||||||
|
WITH cte_1 AS (SELECT max(value) FROM table_1),
|
||||||
|
cte_2 AS (SELECT max(value) FROM table_2)
|
||||||
|
SELECT * FROM
|
||||||
|
(SELECT key FROM table_1 GROUP BY key HAVING max(value) > (SELECT * FROM cte_1) LIMIT 1) as foo,
|
||||||
|
(SELECT key FROM table_2 GROUP BY key HAVING max(value) > (SELECT * FROM cte_2) LIMIT 1) as bar
|
||||||
|
WHERE foo.key != bar.key;
|
||||||
|
DEBUG: generating subplan XXX_1 for CTE cte_1: SELECT max(value) AS max FROM locally_execute_intermediate_results.table_1
|
||||||
|
DEBUG: generating subplan XXX_2 for CTE cte_2: SELECT max(value) AS max FROM locally_execute_intermediate_results.table_2
|
||||||
|
DEBUG: generating subplan XXX_3 for subquery SELECT key FROM locally_execute_intermediate_results.table_1 GROUP BY key HAVING (max(value) OPERATOR(pg_catalog.>) (SELECT cte_1.max FROM (SELECT intermediate_result.max FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(max text)) cte_1)) LIMIT 1
|
||||||
|
DEBUG: generating subplan XXX_4 for subquery SELECT key FROM locally_execute_intermediate_results.table_2 GROUP BY key HAVING (max(value) OPERATOR(pg_catalog.>) (SELECT cte_2.max FROM (SELECT intermediate_result.max FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(max text)) cte_2)) LIMIT 1
|
||||||
|
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT foo.key, bar.key FROM (SELECT intermediate_result.key FROM read_intermediate_result('XXX_3'::text, 'binary'::citus_copy_format) intermediate_result(key integer)) foo, (SELECT intermediate_result.key FROM read_intermediate_result('XXX_4'::text, 'binary'::citus_copy_format) intermediate_result(key integer)) bar WHERE (foo.key OPERATOR(pg_catalog.<>) bar.key)
|
||||||
|
DEBUG: Subplan XXX_1 will be written to local file
|
||||||
|
DEBUG: Subplan XXX_2 will be written to local file
|
||||||
|
DEBUG: Subplan XXX_3 will be written to local file
|
||||||
|
DEBUG: Subplan XXX_4 will be written to local file
|
||||||
|
NOTICE: executing the command locally: SELECT foo.key, bar.key FROM (SELECT intermediate_result.key FROM read_intermediate_result('XXX_3'::text, 'binary'::citus_copy_format) intermediate_result(key integer)) foo, (SELECT intermediate_result.key FROM read_intermediate_result('XXX_4'::text, 'binary'::citus_copy_format) intermediate_result(key integer)) bar WHERE (foo.key OPERATOR(pg_catalog.<>) bar.key)
|
||||||
|
key | key
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
(0 rows)
|
||||||
|
|
||||||
\c - - - :worker_1_port
|
\c - - - :worker_1_port
|
||||||
-- now use the same queries on a worker
|
-- now use the same queries on a worker
|
||||||
SET search_path TO locally_execute_intermediate_results;
|
SET search_path TO locally_execute_intermediate_results;
|
||||||
|
@ -524,18 +652,21 @@ DEBUG: Subplan XXX_3 will be written to local file
|
||||||
|
|
||||||
-- this time the same CTE is both joined with a distributed
|
-- this time the same CTE is both joined with a distributed
|
||||||
-- table and used in HAVING
|
-- table and used in HAVING
|
||||||
-- TODO: fixed by #3396
|
WITH a AS (SELECT * FROM table_1 ORDER BY 1,2 DESC LIMIT 1)
|
||||||
WITH a AS (SELECT * FROM table_1)
|
|
||||||
SELECT count(*),
|
SELECT count(*),
|
||||||
key
|
key
|
||||||
FROM a JOIN table_2 USING (key)
|
FROM a JOIN table_2 USING (key)
|
||||||
GROUP BY key
|
GROUP BY key
|
||||||
HAVING (max(table_2.value) > (SELECT value FROM a));
|
HAVING (max(table_2.value) > (SELECT value FROM a));
|
||||||
DEBUG: generating subplan XXX_1 for CTE a: SELECT key, value FROM locally_execute_intermediate_results.table_1
|
DEBUG: generating subplan XXX_1 for CTE a: SELECT key, value FROM locally_execute_intermediate_results.table_1 ORDER BY key, value DESC LIMIT 1
|
||||||
|
DEBUG: push down of limit count: 1
|
||||||
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count, a.key FROM ((SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) a JOIN locally_execute_intermediate_results.table_2 USING (key)) GROUP BY a.key HAVING (max(table_2.value) OPERATOR(pg_catalog.>) (SELECT a_1.value FROM (SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) a_1))
|
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count, a.key FROM ((SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) a JOIN locally_execute_intermediate_results.table_2 USING (key)) GROUP BY a.key HAVING (max(table_2.value) OPERATOR(pg_catalog.>) (SELECT a_1.value FROM (SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) a_1))
|
||||||
DEBUG: Subplan XXX_1 will be written to local file
|
DEBUG: Subplan XXX_1 will be written to local file
|
||||||
ERROR: result "28_1" does not exist
|
DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx
|
||||||
CONTEXT: while executing command on localhost:xxxxx
|
count | key
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
(0 rows)
|
||||||
|
|
||||||
-- this time the same CTE is both joined with a distributed
|
-- this time the same CTE is both joined with a distributed
|
||||||
-- table and used in HAVING -- but used in another subquery/aggregate
|
-- table and used in HAVING -- but used in another subquery/aggregate
|
||||||
-- so one more level of recursive planning
|
-- so one more level of recursive planning
|
||||||
|
@ -642,6 +773,128 @@ DEBUG: Subplan XXX_3 will be sent to localhost:xxxxx
|
||||||
4 | 4
|
4 | 4
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
|
-- some cases around router queries
|
||||||
|
-- a router query, but the having has two cte joins
|
||||||
|
WITH cte_1 AS (SELECT max(value) FROM table_1),
|
||||||
|
cte_2 AS (SELECT max(value) FROM table_1),
|
||||||
|
cte_3 AS (SELECT * FROM table_2)
|
||||||
|
SELECT count(*)
|
||||||
|
FROM table_2
|
||||||
|
WHERE KEY = 3
|
||||||
|
GROUP BY KEY
|
||||||
|
HAVING max(value) >
|
||||||
|
(SELECT MAX FROM cte_1 JOIN cte_2 USING (MAX));
|
||||||
|
DEBUG: generating subplan XXX_1 for CTE cte_1: SELECT max(value) AS max FROM locally_execute_intermediate_results.table_1
|
||||||
|
DEBUG: generating subplan XXX_2 for CTE cte_2: SELECT max(value) AS max FROM locally_execute_intermediate_results.table_1
|
||||||
|
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM locally_execute_intermediate_results.table_2 WHERE (key OPERATOR(pg_catalog.=) 3) GROUP BY key HAVING (max(value) OPERATOR(pg_catalog.>) (SELECT cte_1.max FROM ((SELECT intermediate_result.max FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(max text)) cte_1 JOIN (SELECT intermediate_result.max FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(max text)) cte_2 USING (max))))
|
||||||
|
DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx
|
||||||
|
DEBUG: Subplan XXX_2 will be sent to localhost:xxxxx
|
||||||
|
count
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
(0 rows)
|
||||||
|
|
||||||
|
-- a router query, but the having has two cte joins
|
||||||
|
-- and the jointree has a join with another cte
|
||||||
|
WITH cte_1 AS (SELECT max(value) FROM table_1),
|
||||||
|
cte_2 AS (SELECT max(value) FROM table_1),
|
||||||
|
cte_3 AS (SELECT * FROM table_2)
|
||||||
|
SELECT count(*)
|
||||||
|
FROM table_2 JOIN cte_3 USING(key)
|
||||||
|
WHERE KEY = 3
|
||||||
|
GROUP BY table_2.KEY
|
||||||
|
HAVING max(table_2.value) >
|
||||||
|
(SELECT MAX FROM cte_1 JOIN cte_2 USING (MAX));
|
||||||
|
DEBUG: generating subplan XXX_1 for CTE cte_1: SELECT max(value) AS max FROM locally_execute_intermediate_results.table_1
|
||||||
|
DEBUG: generating subplan XXX_2 for CTE cte_2: SELECT max(value) AS max FROM locally_execute_intermediate_results.table_1
|
||||||
|
DEBUG: generating subplan XXX_3 for CTE cte_3: SELECT key, value FROM locally_execute_intermediate_results.table_2
|
||||||
|
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (locally_execute_intermediate_results.table_2 JOIN (SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_3'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) cte_3 USING (key)) WHERE (table_2.key OPERATOR(pg_catalog.=) 3) GROUP BY table_2.key HAVING (max(table_2.value) OPERATOR(pg_catalog.>) (SELECT cte_1.max FROM ((SELECT intermediate_result.max FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(max text)) cte_1 JOIN (SELECT intermediate_result.max FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(max text)) cte_2 USING (max))))
|
||||||
|
DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx
|
||||||
|
DEBUG: Subplan XXX_2 will be sent to localhost:xxxxx
|
||||||
|
DEBUG: Subplan XXX_3 will be sent to localhost:xxxxx
|
||||||
|
count
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
(0 rows)
|
||||||
|
|
||||||
|
-- a router query, but the having has two cte joins
|
||||||
|
-- and the jointree has a join with the same CTEs
|
||||||
|
WITH cte_1 AS (SELECT max(value) FROM table_1),
|
||||||
|
cte_2 AS (SELECT max(value) FROM table_1),
|
||||||
|
cte_3 AS (SELECT * FROM table_2)
|
||||||
|
SELECT count(*)
|
||||||
|
FROM table_2 JOIN cte_3 USING(key) JOIN cte_2 ON (key = MAX::int) JOIN cte_1 USING(MAX)
|
||||||
|
WHERE KEY = 3
|
||||||
|
GROUP BY table_2.KEY
|
||||||
|
HAVING max(table_2.value) > (SELECT MAX FROM cte_1 JOIN cte_2 USING (MAX));
|
||||||
|
DEBUG: generating subplan XXX_1 for CTE cte_1: SELECT max(value) AS max FROM locally_execute_intermediate_results.table_1
|
||||||
|
DEBUG: generating subplan XXX_2 for CTE cte_2: SELECT max(value) AS max FROM locally_execute_intermediate_results.table_1
|
||||||
|
DEBUG: generating subplan XXX_3 for CTE cte_3: SELECT key, value FROM locally_execute_intermediate_results.table_2
|
||||||
|
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (((locally_execute_intermediate_results.table_2 JOIN (SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_3'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) cte_3 USING (key)) JOIN (SELECT intermediate_result.max FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(max text)) cte_2 ON ((table_2.key OPERATOR(pg_catalog.=) (cte_2.max)::integer))) JOIN (SELECT intermediate_result.max FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(max text)) cte_1 USING (max)) WHERE (table_2.key OPERATOR(pg_catalog.=) 3) GROUP BY table_2.key HAVING (max(table_2.value) OPERATOR(pg_catalog.>) (SELECT cte_1_1.max FROM ((SELECT intermediate_result.max FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(max text)) cte_1_1 JOIN (SELECT intermediate_result.max FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(max text)) cte_2_1 USING (max))))
|
||||||
|
DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx
|
||||||
|
DEBUG: Subplan XXX_2 will be sent to localhost:xxxxx
|
||||||
|
DEBUG: Subplan XXX_3 will be sent to localhost:xxxxx
|
||||||
|
count
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
(0 rows)
|
||||||
|
|
||||||
|
-- subPlans needed remotely as the subquery is pushed down
|
||||||
|
WITH cte_1 AS (SELECT max(value) FROM table_1),
|
||||||
|
cte_2 AS (SELECT max(value) FROM table_2)
|
||||||
|
SELECT * FROM
|
||||||
|
(SELECT key FROM table_1 GROUP BY key HAVING max(value) > (SELECT * FROM cte_1)) as foo,
|
||||||
|
(SELECT key FROM table_2 GROUP BY key HAVING max(value) > (SELECT * FROM cte_2)) as bar
|
||||||
|
WHERE foo.key = bar.key;
|
||||||
|
DEBUG: generating subplan XXX_1 for CTE cte_1: SELECT max(value) AS max FROM locally_execute_intermediate_results.table_1
|
||||||
|
DEBUG: generating subplan XXX_2 for CTE cte_2: SELECT max(value) AS max FROM locally_execute_intermediate_results.table_2
|
||||||
|
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT foo.key, bar.key FROM (SELECT table_1.key FROM locally_execute_intermediate_results.table_1 GROUP BY table_1.key HAVING (max(table_1.value) OPERATOR(pg_catalog.>) (SELECT cte_1.max FROM (SELECT intermediate_result.max FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(max text)) cte_1))) foo, (SELECT table_2.key FROM locally_execute_intermediate_results.table_2 GROUP BY table_2.key HAVING (max(table_2.value) OPERATOR(pg_catalog.>) (SELECT cte_2.max FROM (SELECT intermediate_result.max FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(max text)) cte_2))) bar WHERE (foo.key OPERATOR(pg_catalog.=) bar.key)
|
||||||
|
DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx
|
||||||
|
DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx
|
||||||
|
DEBUG: Subplan XXX_2 will be sent to localhost:xxxxx
|
||||||
|
DEBUG: Subplan XXX_2 will be sent to localhost:xxxxx
|
||||||
|
key | key
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
(0 rows)
|
||||||
|
|
||||||
|
-- the second subquery needs to be recursively planned due to non-colocated subquery join
|
||||||
|
-- so cte_2 becomes part of master query of that recursive subquery planning
|
||||||
|
WITH cte_1 AS (SELECT max(value) FROM table_1),
|
||||||
|
cte_2 AS (SELECT max(value) FROM table_2)
|
||||||
|
SELECT * FROM
|
||||||
|
(SELECT key FROM table_1 GROUP BY key HAVING max(value) > (SELECT * FROM cte_1)) as foo,
|
||||||
|
(SELECT key FROM table_2 GROUP BY key HAVING max(value) > (SELECT * FROM cte_2)) as bar
|
||||||
|
WHERE foo.key != bar.key;
|
||||||
|
DEBUG: generating subplan XXX_1 for CTE cte_1: SELECT max(value) AS max FROM locally_execute_intermediate_results.table_1
|
||||||
|
DEBUG: generating subplan XXX_2 for CTE cte_2: SELECT max(value) AS max FROM locally_execute_intermediate_results.table_2
|
||||||
|
DEBUG: generating subplan XXX_3 for subquery SELECT key FROM locally_execute_intermediate_results.table_2 GROUP BY key HAVING (max(value) OPERATOR(pg_catalog.>) (SELECT cte_2.max FROM (SELECT intermediate_result.max FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(max text)) cte_2))
|
||||||
|
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT foo.key, bar.key FROM (SELECT table_1.key FROM locally_execute_intermediate_results.table_1 GROUP BY table_1.key HAVING (max(table_1.value) OPERATOR(pg_catalog.>) (SELECT cte_1.max FROM (SELECT intermediate_result.max FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(max text)) cte_1))) foo, (SELECT intermediate_result.key FROM read_intermediate_result('XXX_3'::text, 'binary'::citus_copy_format) intermediate_result(key integer)) bar WHERE (foo.key OPERATOR(pg_catalog.<>) bar.key)
|
||||||
|
DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx
|
||||||
|
DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx
|
||||||
|
DEBUG: Subplan XXX_2 will be written to local file
|
||||||
|
DEBUG: Subplan XXX_3 will be sent to localhost:xxxxx
|
||||||
|
DEBUG: Subplan XXX_3 will be sent to localhost:xxxxx
|
||||||
|
key | key
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
(0 rows)
|
||||||
|
|
||||||
|
-- now, forcing all subqueries to be on the local node
|
||||||
|
WITH cte_1 AS (SELECT max(value) FROM table_1),
|
||||||
|
cte_2 AS (SELECT max(value) FROM table_2)
|
||||||
|
SELECT * FROM
|
||||||
|
(SELECT key FROM table_1 GROUP BY key HAVING max(value) > (SELECT * FROM cte_1) LIMIT 1) as foo,
|
||||||
|
(SELECT key FROM table_2 GROUP BY key HAVING max(value) > (SELECT * FROM cte_2) LIMIT 1) as bar
|
||||||
|
WHERE foo.key != bar.key;
|
||||||
|
DEBUG: generating subplan XXX_1 for CTE cte_1: SELECT max(value) AS max FROM locally_execute_intermediate_results.table_1
|
||||||
|
DEBUG: generating subplan XXX_2 for CTE cte_2: SELECT max(value) AS max FROM locally_execute_intermediate_results.table_2
|
||||||
|
DEBUG: generating subplan XXX_3 for subquery SELECT key FROM locally_execute_intermediate_results.table_1 GROUP BY key HAVING (max(value) OPERATOR(pg_catalog.>) (SELECT cte_1.max FROM (SELECT intermediate_result.max FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(max text)) cte_1)) LIMIT 1
|
||||||
|
DEBUG: generating subplan XXX_4 for subquery SELECT key FROM locally_execute_intermediate_results.table_2 GROUP BY key HAVING (max(value) OPERATOR(pg_catalog.>) (SELECT cte_2.max FROM (SELECT intermediate_result.max FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(max text)) cte_2)) LIMIT 1
|
||||||
|
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT foo.key, bar.key FROM (SELECT intermediate_result.key FROM read_intermediate_result('XXX_3'::text, 'binary'::citus_copy_format) intermediate_result(key integer)) foo, (SELECT intermediate_result.key FROM read_intermediate_result('XXX_4'::text, 'binary'::citus_copy_format) intermediate_result(key integer)) bar WHERE (foo.key OPERATOR(pg_catalog.<>) bar.key)
|
||||||
|
DEBUG: Subplan XXX_1 will be written to local file
|
||||||
|
DEBUG: Subplan XXX_2 will be written to local file
|
||||||
|
DEBUG: Subplan XXX_3 will be written to local file
|
||||||
|
DEBUG: Subplan XXX_4 will be written to local file
|
||||||
|
key | key
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
(0 rows)
|
||||||
|
|
||||||
-- finally, use round-robin policy on the workers with same set of queries
|
-- finally, use round-robin policy on the workers with same set of queries
|
||||||
set citus.task_assignment_policy TO "round-robin" ;
|
set citus.task_assignment_policy TO "round-robin" ;
|
||||||
-- the query cannot be executed locally, but still because of
|
-- the query cannot be executed locally, but still because of
|
||||||
|
@ -727,7 +980,7 @@ DEBUG: generating subplan XXX_1 for CTE cte_1: SELECT max(value) AS max FROM lo
|
||||||
DEBUG: generating subplan XXX_2 for CTE cte_2: SELECT max(key) AS max FROM locally_execute_intermediate_results.table_2
|
DEBUG: generating subplan XXX_2 for CTE cte_2: SELECT max(key) AS max FROM locally_execute_intermediate_results.table_2
|
||||||
DEBUG: generating subplan XXX_3 for CTE cte_3: SELECT key, value FROM locally_execute_intermediate_results.table_2
|
DEBUG: generating subplan XXX_3 for CTE cte_3: SELECT key, value FROM locally_execute_intermediate_results.table_2
|
||||||
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_3'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) cte_3 WHERE (key OPERATOR(pg_catalog.>) (SELECT cte_2.max FROM (SELECT intermediate_result.max FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(max integer)) cte_2)) GROUP BY key HAVING (max(value) OPERATOR(pg_catalog.>) (SELECT cte_1.max FROM (SELECT intermediate_result.max FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(max text)) cte_1))
|
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_3'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) cte_3 WHERE (key OPERATOR(pg_catalog.>) (SELECT cte_2.max FROM (SELECT intermediate_result.max FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(max integer)) cte_2)) GROUP BY key HAVING (max(value) OPERATOR(pg_catalog.>) (SELECT cte_1.max FROM (SELECT intermediate_result.max FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(max text)) cte_1))
|
||||||
DEBUG: Subplan XXX_1 will be written to local file
|
DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx
|
||||||
DEBUG: Subplan XXX_2 will be sent to localhost:xxxxx
|
DEBUG: Subplan XXX_2 will be sent to localhost:xxxxx
|
||||||
DEBUG: Subplan XXX_3 will be sent to localhost:xxxxx
|
DEBUG: Subplan XXX_3 will be sent to localhost:xxxxx
|
||||||
count
|
count
|
||||||
|
@ -758,7 +1011,6 @@ DEBUG: Subplan XXX_2 will be written to local file
|
||||||
-- multiple CTEs are joined inside HAVING, so written to file
|
-- multiple CTEs are joined inside HAVING, so written to file
|
||||||
-- locally, also the join tree contains only another CTE, so should be
|
-- locally, also the join tree contains only another CTE, so should be
|
||||||
-- executed locally, but not on an Citus MX worker
|
-- executed locally, but not on an Citus MX worker
|
||||||
-- TODO: fixed by #3396
|
|
||||||
WITH cte_1 AS (SELECT max(value) FROM table_1),
|
WITH cte_1 AS (SELECT max(value) FROM table_1),
|
||||||
cte_2 AS (SELECT max(value) FROM table_1),
|
cte_2 AS (SELECT max(value) FROM table_1),
|
||||||
cte_3 AS (SELECT * FROM table_2)
|
cte_3 AS (SELECT * FROM table_2)
|
||||||
|
@ -772,11 +1024,15 @@ DEBUG: generating subplan XXX_1 for CTE cte_1: SELECT max(value) AS max FROM lo
|
||||||
DEBUG: generating subplan XXX_2 for CTE cte_2: SELECT max(value) AS max FROM locally_execute_intermediate_results.table_1
|
DEBUG: generating subplan XXX_2 for CTE cte_2: SELECT max(value) AS max FROM locally_execute_intermediate_results.table_1
|
||||||
DEBUG: generating subplan XXX_3 for CTE cte_3: SELECT key, value FROM locally_execute_intermediate_results.table_2
|
DEBUG: generating subplan XXX_3 for CTE cte_3: SELECT key, value FROM locally_execute_intermediate_results.table_2
|
||||||
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_3'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) cte_3 GROUP BY key HAVING (max(value) OPERATOR(pg_catalog.>) (SELECT cte_1.max FROM ((SELECT intermediate_result.max FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(max text)) cte_1 JOIN (SELECT intermediate_result.max FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(max text)) cte_2 USING (max))))
|
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_3'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) cte_3 GROUP BY key HAVING (max(value) OPERATOR(pg_catalog.>) (SELECT cte_1.max FROM ((SELECT intermediate_result.max FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(max text)) cte_1 JOIN (SELECT intermediate_result.max FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(max text)) cte_2 USING (max))))
|
||||||
DEBUG: Subplan XXX_1 will be written to local file
|
DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx
|
||||||
DEBUG: Subplan XXX_2 will be written to local file
|
DEBUG: Subplan XXX_2 will be sent to localhost:xxxxx
|
||||||
DEBUG: Subplan XXX_3 will be sent to localhost:xxxxx
|
DEBUG: Subplan XXX_3 will be sent to localhost:xxxxx
|
||||||
ERROR: result "66_1" does not exist
|
count
|
||||||
CONTEXT: while executing command on localhost:xxxxx
|
---------------------------------------------------------------------
|
||||||
|
1
|
||||||
|
1
|
||||||
|
(2 rows)
|
||||||
|
|
||||||
-- now, the CTE is going to be written locally,
|
-- now, the CTE is going to be written locally,
|
||||||
-- plus that could have been read locally on the coordinator
|
-- plus that could have been read locally on the coordinator
|
||||||
-- because of the aggragate over the cte in HAVING
|
-- because of the aggragate over the cte in HAVING
|
||||||
|
@ -815,7 +1071,7 @@ DEBUG: generating subplan XXX_3 for subquery SELECT max(max) AS max FROM (SELEC
|
||||||
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) cte_2 GROUP BY key HAVING (max(value) OPERATOR(pg_catalog.<) (SELECT intermediate_result.max FROM read_intermediate_result('XXX_3'::text, 'binary'::citus_copy_format) intermediate_result(max text)))
|
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) cte_2 GROUP BY key HAVING (max(value) OPERATOR(pg_catalog.<) (SELECT intermediate_result.max FROM read_intermediate_result('XXX_3'::text, 'binary'::citus_copy_format) intermediate_result(max text)))
|
||||||
DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx
|
DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx
|
||||||
DEBUG: Subplan XXX_2 will be sent to localhost:xxxxx
|
DEBUG: Subplan XXX_2 will be sent to localhost:xxxxx
|
||||||
DEBUG: Subplan XXX_3 will be written to local file
|
DEBUG: Subplan XXX_3 will be sent to localhost:xxxxx
|
||||||
count
|
count
|
||||||
---------------------------------------------------------------------
|
---------------------------------------------------------------------
|
||||||
1
|
1
|
||||||
|
@ -825,18 +1081,21 @@ DEBUG: Subplan XXX_3 will be written to local file
|
||||||
|
|
||||||
-- this time the same CTE is both joined with a distributed
|
-- this time the same CTE is both joined with a distributed
|
||||||
-- table and used in HAVING
|
-- table and used in HAVING
|
||||||
-- TODO: fixed by #3396
|
WITH a AS (SELECT * FROM table_1 ORDER BY 1,2 DESC LIMIT 1)
|
||||||
WITH a AS (SELECT * FROM table_1)
|
|
||||||
SELECT count(*),
|
SELECT count(*),
|
||||||
key
|
key
|
||||||
FROM a JOIN table_2 USING (key)
|
FROM a JOIN table_2 USING (key)
|
||||||
GROUP BY key
|
GROUP BY key
|
||||||
HAVING (max(table_2.value) > (SELECT value FROM a));
|
HAVING (max(table_2.value) > (SELECT value FROM a));
|
||||||
DEBUG: generating subplan XXX_1 for CTE a: SELECT key, value FROM locally_execute_intermediate_results.table_1
|
DEBUG: generating subplan XXX_1 for CTE a: SELECT key, value FROM locally_execute_intermediate_results.table_1 ORDER BY key, value DESC LIMIT 1
|
||||||
|
DEBUG: push down of limit count: 1
|
||||||
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count, a.key FROM ((SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) a JOIN locally_execute_intermediate_results.table_2 USING (key)) GROUP BY a.key HAVING (max(table_2.value) OPERATOR(pg_catalog.>) (SELECT a_1.value FROM (SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) a_1))
|
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count, a.key FROM ((SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) a JOIN locally_execute_intermediate_results.table_2 USING (key)) GROUP BY a.key HAVING (max(table_2.value) OPERATOR(pg_catalog.>) (SELECT a_1.value FROM (SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) a_1))
|
||||||
DEBUG: Subplan XXX_1 will be written to local file
|
DEBUG: Subplan XXX_1 will be written to local file
|
||||||
ERROR: result "77_1" does not exist
|
DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx
|
||||||
CONTEXT: while executing command on localhost:xxxxx
|
count | key
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
(0 rows)
|
||||||
|
|
||||||
-- this time the same CTE is both joined with a distributed
|
-- this time the same CTE is both joined with a distributed
|
||||||
-- table and used in HAVING -- but used in another subquery/aggregate
|
-- table and used in HAVING -- but used in another subquery/aggregate
|
||||||
-- so one more level of recursive planning
|
-- so one more level of recursive planning
|
||||||
|
@ -943,6 +1202,140 @@ DEBUG: Subplan XXX_3 will be sent to localhost:xxxxx
|
||||||
4 | 4
|
4 | 4
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
|
-- some cases around router queries
|
||||||
|
-- a router query, but the having has two cte joins
|
||||||
|
WITH cte_1 AS (SELECT max(value) FROM table_1),
|
||||||
|
cte_2 AS (SELECT max(value) FROM table_1),
|
||||||
|
cte_3 AS (SELECT * FROM table_2)
|
||||||
|
SELECT count(*)
|
||||||
|
FROM table_2
|
||||||
|
WHERE KEY = 3
|
||||||
|
GROUP BY KEY
|
||||||
|
HAVING max(value) >
|
||||||
|
(SELECT MAX FROM cte_1 JOIN cte_2 USING (MAX));
|
||||||
|
DEBUG: generating subplan XXX_1 for CTE cte_1: SELECT max(value) AS max FROM locally_execute_intermediate_results.table_1
|
||||||
|
DEBUG: generating subplan XXX_2 for CTE cte_2: SELECT max(value) AS max FROM locally_execute_intermediate_results.table_1
|
||||||
|
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM locally_execute_intermediate_results.table_2 WHERE (key OPERATOR(pg_catalog.=) 3) GROUP BY key HAVING (max(value) OPERATOR(pg_catalog.>) (SELECT cte_1.max FROM ((SELECT intermediate_result.max FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(max text)) cte_1 JOIN (SELECT intermediate_result.max FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(max text)) cte_2 USING (max))))
|
||||||
|
DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx
|
||||||
|
DEBUG: Subplan XXX_2 will be sent to localhost:xxxxx
|
||||||
|
count
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
(0 rows)
|
||||||
|
|
||||||
|
-- a router query, but the having has two cte joins
|
||||||
|
-- and the jointree has a join with another cte
|
||||||
|
WITH cte_1 AS (SELECT max(value) FROM table_1),
|
||||||
|
cte_2 AS (SELECT max(value) FROM table_1),
|
||||||
|
cte_3 AS (SELECT * FROM table_2)
|
||||||
|
SELECT count(*)
|
||||||
|
FROM table_2 JOIN cte_3 USING(key)
|
||||||
|
WHERE KEY = 3
|
||||||
|
GROUP BY table_2.KEY
|
||||||
|
HAVING max(table_2.value) >
|
||||||
|
(SELECT MAX FROM cte_1 JOIN cte_2 USING (MAX));
|
||||||
|
DEBUG: generating subplan XXX_1 for CTE cte_1: SELECT max(value) AS max FROM locally_execute_intermediate_results.table_1
|
||||||
|
DEBUG: generating subplan XXX_2 for CTE cte_2: SELECT max(value) AS max FROM locally_execute_intermediate_results.table_1
|
||||||
|
DEBUG: generating subplan XXX_3 for CTE cte_3: SELECT key, value FROM locally_execute_intermediate_results.table_2
|
||||||
|
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (locally_execute_intermediate_results.table_2 JOIN (SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_3'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) cte_3 USING (key)) WHERE (table_2.key OPERATOR(pg_catalog.=) 3) GROUP BY table_2.key HAVING (max(table_2.value) OPERATOR(pg_catalog.>) (SELECT cte_1.max FROM ((SELECT intermediate_result.max FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(max text)) cte_1 JOIN (SELECT intermediate_result.max FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(max text)) cte_2 USING (max))))
|
||||||
|
DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx
|
||||||
|
DEBUG: Subplan XXX_2 will be sent to localhost:xxxxx
|
||||||
|
DEBUG: Subplan XXX_3 will be sent to localhost:xxxxx
|
||||||
|
count
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
(0 rows)
|
||||||
|
|
||||||
|
-- the same query as above, try to hit local node with either of the queries
|
||||||
|
WITH cte_1 AS (SELECT max(value) FROM table_1),
|
||||||
|
cte_2 AS (SELECT max(value) FROM table_1),
|
||||||
|
cte_3 AS (SELECT * FROM table_2)
|
||||||
|
SELECT count(*)
|
||||||
|
FROM table_2 JOIN cte_3 USING(key)
|
||||||
|
WHERE KEY = 3
|
||||||
|
GROUP BY table_2.KEY
|
||||||
|
HAVING max(table_2.value) >
|
||||||
|
(SELECT MAX FROM cte_1 JOIN cte_2 USING (MAX));
|
||||||
|
DEBUG: generating subplan XXX_1 for CTE cte_1: SELECT max(value) AS max FROM locally_execute_intermediate_results.table_1
|
||||||
|
DEBUG: generating subplan XXX_2 for CTE cte_2: SELECT max(value) AS max FROM locally_execute_intermediate_results.table_1
|
||||||
|
DEBUG: generating subplan XXX_3 for CTE cte_3: SELECT key, value FROM locally_execute_intermediate_results.table_2
|
||||||
|
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (locally_execute_intermediate_results.table_2 JOIN (SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_3'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) cte_3 USING (key)) WHERE (table_2.key OPERATOR(pg_catalog.=) 3) GROUP BY table_2.key HAVING (max(table_2.value) OPERATOR(pg_catalog.>) (SELECT cte_1.max FROM ((SELECT intermediate_result.max FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(max text)) cte_1 JOIN (SELECT intermediate_result.max FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(max text)) cte_2 USING (max))))
|
||||||
|
DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx
|
||||||
|
DEBUG: Subplan XXX_2 will be sent to localhost:xxxxx
|
||||||
|
DEBUG: Subplan XXX_3 will be sent to localhost:xxxxx
|
||||||
|
count
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
(0 rows)
|
||||||
|
|
||||||
|
-- a router query, but the having has two cte joins
|
||||||
|
-- and the jointree has a join with the same CTEs
|
||||||
|
WITH cte_1 AS (SELECT max(value) FROM table_1),
|
||||||
|
cte_2 AS (SELECT max(value) FROM table_1),
|
||||||
|
cte_3 AS (SELECT * FROM table_2)
|
||||||
|
SELECT count(*)
|
||||||
|
FROM table_2 JOIN cte_3 USING(key) JOIN cte_2 ON (key = MAX::int) JOIN cte_1 USING(MAX)
|
||||||
|
WHERE KEY = 3
|
||||||
|
GROUP BY table_2.KEY
|
||||||
|
HAVING max(table_2.value) >
|
||||||
|
(SELECT MAX FROM cte_1 JOIN cte_2 USING (MAX));
|
||||||
|
DEBUG: generating subplan XXX_1 for CTE cte_1: SELECT max(value) AS max FROM locally_execute_intermediate_results.table_1
|
||||||
|
DEBUG: generating subplan XXX_2 for CTE cte_2: SELECT max(value) AS max FROM locally_execute_intermediate_results.table_1
|
||||||
|
DEBUG: generating subplan XXX_3 for CTE cte_3: SELECT key, value FROM locally_execute_intermediate_results.table_2
|
||||||
|
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (((locally_execute_intermediate_results.table_2 JOIN (SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_3'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) cte_3 USING (key)) JOIN (SELECT intermediate_result.max FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(max text)) cte_2 ON ((table_2.key OPERATOR(pg_catalog.=) (cte_2.max)::integer))) JOIN (SELECT intermediate_result.max FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(max text)) cte_1 USING (max)) WHERE (table_2.key OPERATOR(pg_catalog.=) 3) GROUP BY table_2.key HAVING (max(table_2.value) OPERATOR(pg_catalog.>) (SELECT cte_1_1.max FROM ((SELECT intermediate_result.max FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(max text)) cte_1_1 JOIN (SELECT intermediate_result.max FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(max text)) cte_2_1 USING (max))))
|
||||||
|
DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx
|
||||||
|
DEBUG: Subplan XXX_2 will be sent to localhost:xxxxx
|
||||||
|
DEBUG: Subplan XXX_3 will be sent to localhost:xxxxx
|
||||||
|
count
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
(0 rows)
|
||||||
|
|
||||||
|
- subPlans needed remotely as the subquery is pushed down
|
||||||
|
WITH cte_1 AS (SELECT max(value) FROM table_1),
|
||||||
|
cte_2 AS (SELECT max(value) FROM table_2)
|
||||||
|
SELECT * FROM
|
||||||
|
(SELECT key FROM table_1 GROUP BY key HAVING max(value) > (SELECT * FROM cte_1)) as foo,
|
||||||
|
(SELECT key FROM table_2 GROUP BY key HAVING max(value) > (SELECT * FROM cte_2)) as bar
|
||||||
|
WHERE foo.key = bar.key;
|
||||||
|
ERROR: syntax error at or near "-"
|
||||||
|
-- the second subquery needs to be recursively planned due to non-colocated subquery join
|
||||||
|
-- so cte_2 becomes part of master query of that recursive subquery planning
|
||||||
|
WITH cte_1 AS (SELECT max(value) FROM table_1),
|
||||||
|
cte_2 AS (SELECT max(value) FROM table_2)
|
||||||
|
SELECT * FROM
|
||||||
|
(SELECT key FROM table_1 GROUP BY key HAVING max(value) > (SELECT * FROM cte_1)) as foo,
|
||||||
|
(SELECT key FROM table_2 GROUP BY key HAVING max(value) > (SELECT * FROM cte_2)) as bar
|
||||||
|
WHERE foo.key != bar.key;
|
||||||
|
DEBUG: generating subplan XXX_1 for CTE cte_1: SELECT max(value) AS max FROM locally_execute_intermediate_results.table_1
|
||||||
|
DEBUG: generating subplan XXX_2 for CTE cte_2: SELECT max(value) AS max FROM locally_execute_intermediate_results.table_2
|
||||||
|
DEBUG: generating subplan XXX_3 for subquery SELECT key FROM locally_execute_intermediate_results.table_2 GROUP BY key HAVING (max(value) OPERATOR(pg_catalog.>) (SELECT cte_2.max FROM (SELECT intermediate_result.max FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(max text)) cte_2))
|
||||||
|
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT foo.key, bar.key FROM (SELECT table_1.key FROM locally_execute_intermediate_results.table_1 GROUP BY table_1.key HAVING (max(table_1.value) OPERATOR(pg_catalog.>) (SELECT cte_1.max FROM (SELECT intermediate_result.max FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(max text)) cte_1))) foo, (SELECT intermediate_result.key FROM read_intermediate_result('XXX_3'::text, 'binary'::citus_copy_format) intermediate_result(key integer)) bar WHERE (foo.key OPERATOR(pg_catalog.<>) bar.key)
|
||||||
|
DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx
|
||||||
|
DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx
|
||||||
|
DEBUG: Subplan XXX_2 will be written to local file
|
||||||
|
DEBUG: Subplan XXX_3 will be sent to localhost:xxxxx
|
||||||
|
DEBUG: Subplan XXX_3 will be sent to localhost:xxxxx
|
||||||
|
key | key
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
(0 rows)
|
||||||
|
|
||||||
|
-- now, forcing all subqueries to be on the local node
|
||||||
|
WITH cte_1 AS (SELECT max(value) FROM table_1),
|
||||||
|
cte_2 AS (SELECT max(value) FROM table_2)
|
||||||
|
SELECT * FROM
|
||||||
|
(SELECT key FROM table_1 GROUP BY key HAVING max(value) > (SELECT * FROM cte_1) LIMIT 1) as foo,
|
||||||
|
(SELECT key FROM table_2 GROUP BY key HAVING max(value) > (SELECT * FROM cte_2) LIMIT 1) as bar
|
||||||
|
WHERE foo.key != bar.key;
|
||||||
|
DEBUG: generating subplan XXX_1 for CTE cte_1: SELECT max(value) AS max FROM locally_execute_intermediate_results.table_1
|
||||||
|
DEBUG: generating subplan XXX_2 for CTE cte_2: SELECT max(value) AS max FROM locally_execute_intermediate_results.table_2
|
||||||
|
DEBUG: generating subplan XXX_3 for subquery SELECT key FROM locally_execute_intermediate_results.table_1 GROUP BY key HAVING (max(value) OPERATOR(pg_catalog.>) (SELECT cte_1.max FROM (SELECT intermediate_result.max FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(max text)) cte_1)) LIMIT 1
|
||||||
|
DEBUG: generating subplan XXX_4 for subquery SELECT key FROM locally_execute_intermediate_results.table_2 GROUP BY key HAVING (max(value) OPERATOR(pg_catalog.>) (SELECT cte_2.max FROM (SELECT intermediate_result.max FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(max text)) cte_2)) LIMIT 1
|
||||||
|
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT foo.key, bar.key FROM (SELECT intermediate_result.key FROM read_intermediate_result('XXX_3'::text, 'binary'::citus_copy_format) intermediate_result(key integer)) foo, (SELECT intermediate_result.key FROM read_intermediate_result('XXX_4'::text, 'binary'::citus_copy_format) intermediate_result(key integer)) bar WHERE (foo.key OPERATOR(pg_catalog.<>) bar.key)
|
||||||
|
DEBUG: Subplan XXX_1 will be written to local file
|
||||||
|
DEBUG: Subplan XXX_2 will be written to local file
|
||||||
|
DEBUG: Subplan XXX_3 will be sent to localhost:xxxxx
|
||||||
|
DEBUG: Subplan XXX_4 will be sent to localhost:xxxxx
|
||||||
|
key | key
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
(0 rows)
|
||||||
|
|
||||||
\c - - - :master_port
|
\c - - - :master_port
|
||||||
SET client_min_messages TO ERROR;
|
SET client_min_messages TO ERROR;
|
||||||
DROP SCHEMA locally_execute_intermediate_results CASCADE;
|
DROP SCHEMA locally_execute_intermediate_results CASCADE;
|
||||||
|
|
|
@ -125,8 +125,7 @@ HAVING max(value) < (SELECT max(max) FROM cte_1);
|
||||||
|
|
||||||
-- this time the same CTE is both joined with a distributed
|
-- this time the same CTE is both joined with a distributed
|
||||||
-- table and used in HAVING
|
-- table and used in HAVING
|
||||||
-- TODO: fixed by #3396
|
WITH a AS (SELECT * FROM table_1 ORDER BY 1,2 DESC LIMIT 1)
|
||||||
WITH a AS (SELECT * FROM table_1)
|
|
||||||
SELECT count(*),
|
SELECT count(*),
|
||||||
key
|
key
|
||||||
FROM a JOIN table_2 USING (key)
|
FROM a JOIN table_2 USING (key)
|
||||||
|
@ -172,6 +171,67 @@ cte_2 AS (SELECT * FROM cte_1),
|
||||||
cte_3 AS (SELECT max(key) as key FROM cte_2)
|
cte_3 AS (SELECT max(key) as key FROM cte_2)
|
||||||
SELECT * FROM cte_3 JOIN ref_table USING (key);
|
SELECT * FROM cte_3 JOIN ref_table USING (key);
|
||||||
|
|
||||||
|
-- some cases around router queries
|
||||||
|
-- a router query, but the having has two cte joins
|
||||||
|
WITH cte_1 AS (SELECT max(value) FROM table_1),
|
||||||
|
cte_2 AS (SELECT max(value) FROM table_1),
|
||||||
|
cte_3 AS (SELECT * FROM table_2)
|
||||||
|
SELECT count(*)
|
||||||
|
FROM table_2
|
||||||
|
WHERE KEY = 3
|
||||||
|
GROUP BY KEY
|
||||||
|
HAVING max(value) >
|
||||||
|
(SELECT MAX FROM cte_1 JOIN cte_2 USING (MAX));
|
||||||
|
|
||||||
|
-- a router query, but the having has two cte joins
|
||||||
|
-- and the jointree has a join with another cte
|
||||||
|
WITH cte_1 AS (SELECT max(value) FROM table_1),
|
||||||
|
cte_2 AS (SELECT max(value) FROM table_1),
|
||||||
|
cte_3 AS (SELECT * FROM table_2)
|
||||||
|
SELECT count(*)
|
||||||
|
FROM table_2 JOIN cte_3 USING(key)
|
||||||
|
WHERE KEY = 3
|
||||||
|
GROUP BY table_2.KEY
|
||||||
|
HAVING max(table_2.value) >
|
||||||
|
(SELECT MAX FROM cte_1 JOIN cte_2 USING (MAX));
|
||||||
|
|
||||||
|
-- a router query, but the having has two cte joins
|
||||||
|
-- and the jointree has a join with the same CTEs
|
||||||
|
WITH cte_1 AS (SELECT max(value) FROM table_1),
|
||||||
|
cte_2 AS (SELECT max(value) FROM table_1),
|
||||||
|
cte_3 AS (SELECT * FROM table_2)
|
||||||
|
SELECT count(*)
|
||||||
|
FROM table_2 JOIN cte_3 USING(key) JOIN cte_2 ON (key = MAX::int) JOIN cte_1 USING(MAX)
|
||||||
|
WHERE KEY = 3
|
||||||
|
GROUP BY table_2.KEY
|
||||||
|
HAVING max(table_2.value) >
|
||||||
|
(SELECT MAX FROM cte_1 JOIN cte_2 USING (MAX));
|
||||||
|
|
||||||
|
-- subPlans needed remotely as the subquery is pushed down
|
||||||
|
WITH cte_1 AS (SELECT max(value) FROM table_1),
|
||||||
|
cte_2 AS (SELECT max(value) FROM table_2)
|
||||||
|
SELECT * FROM
|
||||||
|
(SELECT key FROM table_1 GROUP BY key HAVING max(value) > (SELECT * FROM cte_1)) as foo,
|
||||||
|
(SELECT key FROM table_2 GROUP BY key HAVING max(value) > (SELECT * FROM cte_2)) as bar
|
||||||
|
WHERE foo.key = bar.key;
|
||||||
|
|
||||||
|
-- the second subquery needs to be recursively planned due to non-colocated subquery join
|
||||||
|
-- so cte_2 becomes part of master query of that recursive subquery planning
|
||||||
|
WITH cte_1 AS (SELECT max(value) FROM table_1),
|
||||||
|
cte_2 AS (SELECT max(value) FROM table_2)
|
||||||
|
SELECT * FROM
|
||||||
|
(SELECT key FROM table_1 GROUP BY key HAVING max(value) > (SELECT * FROM cte_1)) as foo,
|
||||||
|
(SELECT key FROM table_2 GROUP BY key HAVING max(value) > (SELECT * FROM cte_2)) as bar
|
||||||
|
WHERE foo.key != bar.key;
|
||||||
|
|
||||||
|
|
||||||
|
-- now, forcing all subqueries to be on the local node
|
||||||
|
WITH cte_1 AS (SELECT max(value) FROM table_1),
|
||||||
|
cte_2 AS (SELECT max(value) FROM table_2)
|
||||||
|
SELECT * FROM
|
||||||
|
(SELECT key FROM table_1 GROUP BY key HAVING max(value) > (SELECT * FROM cte_1) LIMIT 1) as foo,
|
||||||
|
(SELECT key FROM table_2 GROUP BY key HAVING max(value) > (SELECT * FROM cte_2) LIMIT 1) as bar
|
||||||
|
WHERE foo.key != bar.key;
|
||||||
|
|
||||||
\c - - - :worker_1_port
|
\c - - - :worker_1_port
|
||||||
|
|
||||||
|
@ -287,8 +347,7 @@ HAVING max(value) < (SELECT max(max) FROM cte_1);
|
||||||
|
|
||||||
-- this time the same CTE is both joined with a distributed
|
-- this time the same CTE is both joined with a distributed
|
||||||
-- table and used in HAVING
|
-- table and used in HAVING
|
||||||
-- TODO: fixed by #3396
|
WITH a AS (SELECT * FROM table_1 ORDER BY 1,2 DESC LIMIT 1)
|
||||||
WITH a AS (SELECT * FROM table_1)
|
|
||||||
SELECT count(*),
|
SELECT count(*),
|
||||||
key
|
key
|
||||||
FROM a JOIN table_2 USING (key)
|
FROM a JOIN table_2 USING (key)
|
||||||
|
@ -346,6 +405,67 @@ cte_3 AS (SELECT max(key) as key FROM cte_2)
|
||||||
SELECT * FROM cte_3 JOIN ref_table USING (key);
|
SELECT * FROM cte_3 JOIN ref_table USING (key);
|
||||||
|
|
||||||
|
|
||||||
|
-- some cases around router queries
|
||||||
|
-- a router query, but the having has two cte joins
|
||||||
|
WITH cte_1 AS (SELECT max(value) FROM table_1),
|
||||||
|
cte_2 AS (SELECT max(value) FROM table_1),
|
||||||
|
cte_3 AS (SELECT * FROM table_2)
|
||||||
|
SELECT count(*)
|
||||||
|
FROM table_2
|
||||||
|
WHERE KEY = 3
|
||||||
|
GROUP BY KEY
|
||||||
|
HAVING max(value) >
|
||||||
|
(SELECT MAX FROM cte_1 JOIN cte_2 USING (MAX));
|
||||||
|
|
||||||
|
-- a router query, but the having has two cte joins
|
||||||
|
-- and the jointree has a join with another cte
|
||||||
|
WITH cte_1 AS (SELECT max(value) FROM table_1),
|
||||||
|
cte_2 AS (SELECT max(value) FROM table_1),
|
||||||
|
cte_3 AS (SELECT * FROM table_2)
|
||||||
|
SELECT count(*)
|
||||||
|
FROM table_2 JOIN cte_3 USING(key)
|
||||||
|
WHERE KEY = 3
|
||||||
|
GROUP BY table_2.KEY
|
||||||
|
HAVING max(table_2.value) >
|
||||||
|
(SELECT MAX FROM cte_1 JOIN cte_2 USING (MAX));
|
||||||
|
|
||||||
|
-- a router query, but the having has two cte joins
|
||||||
|
-- and the jointree has a join with the same CTEs
|
||||||
|
WITH cte_1 AS (SELECT max(value) FROM table_1),
|
||||||
|
cte_2 AS (SELECT max(value) FROM table_1),
|
||||||
|
cte_3 AS (SELECT * FROM table_2)
|
||||||
|
SELECT count(*)
|
||||||
|
FROM table_2 JOIN cte_3 USING(key) JOIN cte_2 ON (key = MAX::int) JOIN cte_1 USING(MAX)
|
||||||
|
WHERE KEY = 3
|
||||||
|
GROUP BY table_2.KEY
|
||||||
|
HAVING max(table_2.value) > (SELECT MAX FROM cte_1 JOIN cte_2 USING (MAX));
|
||||||
|
|
||||||
|
-- subPlans needed remotely as the subquery is pushed down
|
||||||
|
WITH cte_1 AS (SELECT max(value) FROM table_1),
|
||||||
|
cte_2 AS (SELECT max(value) FROM table_2)
|
||||||
|
SELECT * FROM
|
||||||
|
(SELECT key FROM table_1 GROUP BY key HAVING max(value) > (SELECT * FROM cte_1)) as foo,
|
||||||
|
(SELECT key FROM table_2 GROUP BY key HAVING max(value) > (SELECT * FROM cte_2)) as bar
|
||||||
|
WHERE foo.key = bar.key;
|
||||||
|
|
||||||
|
-- the second subquery needs to be recursively planned due to non-colocated subquery join
|
||||||
|
-- so cte_2 becomes part of master query of that recursive subquery planning
|
||||||
|
WITH cte_1 AS (SELECT max(value) FROM table_1),
|
||||||
|
cte_2 AS (SELECT max(value) FROM table_2)
|
||||||
|
SELECT * FROM
|
||||||
|
(SELECT key FROM table_1 GROUP BY key HAVING max(value) > (SELECT * FROM cte_1)) as foo,
|
||||||
|
(SELECT key FROM table_2 GROUP BY key HAVING max(value) > (SELECT * FROM cte_2)) as bar
|
||||||
|
WHERE foo.key != bar.key;
|
||||||
|
|
||||||
|
|
||||||
|
-- now, forcing all subqueries to be on the local node
|
||||||
|
WITH cte_1 AS (SELECT max(value) FROM table_1),
|
||||||
|
cte_2 AS (SELECT max(value) FROM table_2)
|
||||||
|
SELECT * FROM
|
||||||
|
(SELECT key FROM table_1 GROUP BY key HAVING max(value) > (SELECT * FROM cte_1) LIMIT 1) as foo,
|
||||||
|
(SELECT key FROM table_2 GROUP BY key HAVING max(value) > (SELECT * FROM cte_2) LIMIT 1) as bar
|
||||||
|
WHERE foo.key != bar.key;
|
||||||
|
|
||||||
-- finally, use round-robin policy on the workers with same set of queries
|
-- finally, use round-robin policy on the workers with same set of queries
|
||||||
set citus.task_assignment_policy TO "round-robin" ;
|
set citus.task_assignment_policy TO "round-robin" ;
|
||||||
-- the query cannot be executed locally, but still because of
|
-- the query cannot be executed locally, but still because of
|
||||||
|
@ -413,7 +533,6 @@ HAVING max(value) > (SELECT max FROM cte_1 JOIN cte_2 USING (max));
|
||||||
-- multiple CTEs are joined inside HAVING, so written to file
|
-- multiple CTEs are joined inside HAVING, so written to file
|
||||||
-- locally, also the join tree contains only another CTE, so should be
|
-- locally, also the join tree contains only another CTE, so should be
|
||||||
-- executed locally, but not on a Citus MX worker
|
-- executed locally, but not on a Citus MX worker
|
||||||
-- TODO: fixed by #3396
|
|
||||||
WITH cte_1 AS (SELECT max(value) FROM table_1),
|
WITH cte_1 AS (SELECT max(value) FROM table_1),
|
||||||
cte_2 AS (SELECT max(value) FROM table_1),
|
cte_2 AS (SELECT max(value) FROM table_1),
|
||||||
cte_3 AS (SELECT * FROM table_2)
|
cte_3 AS (SELECT * FROM table_2)
|
||||||
|
@ -450,8 +569,7 @@ HAVING max(value) < (SELECT max(max) FROM cte_1);
|
||||||
|
|
||||||
-- this time the same CTE is both joined with a distributed
|
-- this time the same CTE is both joined with a distributed
|
||||||
-- table and used in HAVING
|
-- table and used in HAVING
|
||||||
-- TODO: fixed by #3396
|
WITH a AS (SELECT * FROM table_1 ORDER BY 1,2 DESC LIMIT 1)
|
||||||
WITH a AS (SELECT * FROM table_1)
|
|
||||||
SELECT count(*),
|
SELECT count(*),
|
||||||
key
|
key
|
||||||
FROM a JOIN table_2 USING (key)
|
FROM a JOIN table_2 USING (key)
|
||||||
|
@ -509,6 +627,80 @@ cte_3 AS (SELECT max(key) as key FROM cte_2)
|
||||||
SELECT * FROM cte_3 JOIN ref_table USING (key);
|
SELECT * FROM cte_3 JOIN ref_table USING (key);
|
||||||
|
|
||||||
|
|
||||||
|
-- some cases around router queries
|
||||||
|
-- a router query, but the having has two cte joins
|
||||||
|
WITH cte_1 AS (SELECT max(value) FROM table_1),
|
||||||
|
cte_2 AS (SELECT max(value) FROM table_1),
|
||||||
|
cte_3 AS (SELECT * FROM table_2)
|
||||||
|
SELECT count(*)
|
||||||
|
FROM table_2
|
||||||
|
WHERE KEY = 3
|
||||||
|
GROUP BY KEY
|
||||||
|
HAVING max(value) >
|
||||||
|
(SELECT MAX FROM cte_1 JOIN cte_2 USING (MAX));
|
||||||
|
|
||||||
|
-- a router query, but the having has two cte joins
|
||||||
|
-- and the jointree has a join with another cte
|
||||||
|
WITH cte_1 AS (SELECT max(value) FROM table_1),
|
||||||
|
cte_2 AS (SELECT max(value) FROM table_1),
|
||||||
|
cte_3 AS (SELECT * FROM table_2)
|
||||||
|
SELECT count(*)
|
||||||
|
FROM table_2 JOIN cte_3 USING(key)
|
||||||
|
WHERE KEY = 3
|
||||||
|
GROUP BY table_2.KEY
|
||||||
|
HAVING max(table_2.value) >
|
||||||
|
(SELECT MAX FROM cte_1 JOIN cte_2 USING (MAX));
|
||||||
|
|
||||||
|
-- the same query as above, try to hit local node with either of the queries
|
||||||
|
WITH cte_1 AS (SELECT max(value) FROM table_1),
|
||||||
|
cte_2 AS (SELECT max(value) FROM table_1),
|
||||||
|
cte_3 AS (SELECT * FROM table_2)
|
||||||
|
SELECT count(*)
|
||||||
|
FROM table_2 JOIN cte_3 USING(key)
|
||||||
|
WHERE KEY = 3
|
||||||
|
GROUP BY table_2.KEY
|
||||||
|
HAVING max(table_2.value) >
|
||||||
|
(SELECT MAX FROM cte_1 JOIN cte_2 USING (MAX));
|
||||||
|
|
||||||
|
|
||||||
|
-- a router query, but the having has two cte joins
|
||||||
|
-- and the jointree has a join with the same CTEs
|
||||||
|
WITH cte_1 AS (SELECT max(value) FROM table_1),
|
||||||
|
cte_2 AS (SELECT max(value) FROM table_1),
|
||||||
|
cte_3 AS (SELECT * FROM table_2)
|
||||||
|
SELECT count(*)
|
||||||
|
FROM table_2 JOIN cte_3 USING(key) JOIN cte_2 ON (key = MAX::int) JOIN cte_1 USING(MAX)
|
||||||
|
WHERE KEY = 3
|
||||||
|
GROUP BY table_2.KEY
|
||||||
|
HAVING max(table_2.value) >
|
||||||
|
(SELECT MAX FROM cte_1 JOIN cte_2 USING (MAX));
|
||||||
|
|
||||||
|
- subPlans needed remotely as the subquery is pushed down
|
||||||
|
WITH cte_1 AS (SELECT max(value) FROM table_1),
|
||||||
|
cte_2 AS (SELECT max(value) FROM table_2)
|
||||||
|
SELECT * FROM
|
||||||
|
(SELECT key FROM table_1 GROUP BY key HAVING max(value) > (SELECT * FROM cte_1)) as foo,
|
||||||
|
(SELECT key FROM table_2 GROUP BY key HAVING max(value) > (SELECT * FROM cte_2)) as bar
|
||||||
|
WHERE foo.key = bar.key;
|
||||||
|
|
||||||
|
-- the second subquery needs to be recursively planned due to non-colocated subquery join
|
||||||
|
-- so cte_2 becomes part of master query of that recursive subquery planning
|
||||||
|
WITH cte_1 AS (SELECT max(value) FROM table_1),
|
||||||
|
cte_2 AS (SELECT max(value) FROM table_2)
|
||||||
|
SELECT * FROM
|
||||||
|
(SELECT key FROM table_1 GROUP BY key HAVING max(value) > (SELECT * FROM cte_1)) as foo,
|
||||||
|
(SELECT key FROM table_2 GROUP BY key HAVING max(value) > (SELECT * FROM cte_2)) as bar
|
||||||
|
WHERE foo.key != bar.key;
|
||||||
|
|
||||||
|
|
||||||
|
-- now, forcing all subqueries to be on the local node
|
||||||
|
WITH cte_1 AS (SELECT max(value) FROM table_1),
|
||||||
|
cte_2 AS (SELECT max(value) FROM table_2)
|
||||||
|
SELECT * FROM
|
||||||
|
(SELECT key FROM table_1 GROUP BY key HAVING max(value) > (SELECT * FROM cte_1) LIMIT 1) as foo,
|
||||||
|
(SELECT key FROM table_2 GROUP BY key HAVING max(value) > (SELECT * FROM cte_2) LIMIT 1) as bar
|
||||||
|
WHERE foo.key != bar.key;
|
||||||
|
|
||||||
\c - - - :master_port
|
\c - - - :master_port
|
||||||
|
|
||||||
SET client_min_messages TO ERROR;
|
SET client_min_messages TO ERROR;
|
||||||
|
|
Loading…
Reference in New Issue