diff --git a/src/backend/distributed/executor/citus_custom_scan.c b/src/backend/distributed/executor/citus_custom_scan.c index a9636a0c7..bca6691c1 100644 --- a/src/backend/distributed/executor/citus_custom_scan.c +++ b/src/backend/distributed/executor/citus_custom_scan.c @@ -398,3 +398,89 @@ ScanStateGetExecutorState(CitusScanState *scanState) { return scanState->customScanState.ss.ps.state; } + + +/* + * FetchCitusCustomScanIfExists traverses a given plan and returns a Citus CustomScan + * if it has any. + */ +CustomScan * +FetchCitusCustomScanIfExists(Plan *plan) +{ + CustomScan *customScan = NULL; + + if (plan == NULL) + { + return NULL; + } + + if (IsCitusCustomScan(plan)) + { + return (CustomScan *) plan; + } + + customScan = FetchCitusCustomScanIfExists(plan->lefttree); + + if (customScan == NULL) + { + customScan = FetchCitusCustomScanIfExists(plan->righttree); + } + + return customScan; +} + + +/* + * IsCitusPlan returns whether a Plan contains a CustomScan generated by Citus + * by recursively walking through the plan tree. + */ +bool +IsCitusPlan(Plan *plan) +{ + if (plan == NULL) + { + return false; + } + + if (IsCitusCustomScan(plan)) + { + return true; + } + + return IsCitusPlan(plan->lefttree) || IsCitusPlan(plan->righttree); +} + + +/* + * IsCitusCustomScan returns whether Plan node is a CustomScan generated by Citus. + */ +bool +IsCitusCustomScan(Plan *plan) +{ + CustomScan *customScan = NULL; + Node *privateNode = NULL; + + if (plan == NULL) + { + return false; + } + + if (!IsA(plan, CustomScan)) + { + return false; + } + + customScan = (CustomScan *) plan; + if (list_length(customScan->custom_private) == 0) + { + return false; + } + + privateNode = (Node *) linitial(customScan->custom_private); + if (!CitusIsA(privateNode, DistributedPlan)) + { + return false; + } + + return true; +} diff --git a/src/backend/distributed/executor/multi_executor.c b/src/backend/distributed/executor/multi_executor.c index 19006e196..6ff22c1ef 100644 --- a/src/backend/distributed/executor/multi_executor.c +++ b/src/backend/distributed/executor/multi_executor.c @@ -58,8 +58,6 @@ bool SortReturning = false; /* local function forward declarations */ -static bool IsCitusPlan(Plan *plan); -static bool IsCitusCustomScan(Plan *plan); static Relation StubRelation(TupleDesc tupleDescriptor); static bool AlterTableConstraintCheck(QueryDesc *queryDesc); static bool IsLocalReferenceTableJoinPlan(PlannedStmt *plan); @@ -189,72 +187,6 @@ CitusExecutorRun(QueryDesc *queryDesc, } -/* - * IsCitusPlan returns whether a Plan contains a CustomScan generated by Citus - * by recursively walking through the plan tree. - */ -static bool -IsCitusPlan(Plan *plan) -{ - if (plan == NULL) - { - return false; - } - - if (IsCitusCustomScan(plan)) - { - return true; - } - - if (plan->lefttree != NULL && IsCitusPlan(plan->lefttree)) - { - return true; - } - - if (plan->righttree != NULL && IsCitusPlan(plan->righttree)) - { - return true; - } - - return false; -} - - -/* - * IsCitusCustomScan returns whether Plan node is a CustomScan generated by Citus. 
- */ -static bool -IsCitusCustomScan(Plan *plan) -{ - CustomScan *customScan = NULL; - Node *privateNode = NULL; - - if (plan == NULL) - { - return false; - } - - if (!IsA(plan, CustomScan)) - { - return false; - } - - customScan = (CustomScan *) plan; - if (list_length(customScan->custom_private) == 0) - { - return false; - } - - privateNode = (Node *) linitial(customScan->custom_private); - if (!CitusIsA(privateNode, DistributedPlan)) - { - return false; - } - - return true; -} - - /* * ReturnTupleFromTuplestore reads the next tuple from the tuple store of the * given Citus scan node and returns it. It returns null if all tuples are read diff --git a/src/backend/distributed/executor/subplan_execution.c b/src/backend/distributed/executor/subplan_execution.c index 81525d4f1..6f3059d10 100644 --- a/src/backend/distributed/executor/subplan_execution.c +++ b/src/backend/distributed/executor/subplan_execution.c @@ -10,6 +10,7 @@ #include "postgres.h" +#include "distributed/intermediate_result_pruning.h" #include "distributed/intermediate_results.h" #include "distributed/multi_executor.h" #include "distributed/multi_physical_planner.h" @@ -35,12 +36,7 @@ ExecuteSubPlans(DistributedPlan *distributedPlan) uint64 planId = distributedPlan->planId; List *subPlanList = distributedPlan->subPlanList; ListCell *subPlanCell = NULL; - List *nodeList = NIL; - - /* If you're not a worker node, you should write local file to make sure - * you have the data too */ - bool writeLocalFile = GetLocalGroupId() == 0; - + HTAB *intermediateResultsHash = NULL; if (subPlanList == NIL) { @@ -48,6 +44,10 @@ ExecuteSubPlans(DistributedPlan *distributedPlan) return; } + intermediateResultsHash = MakeIntermediateResultHTAB(); + RecordSubplanExecutionsOnNodes(intermediateResultsHash, distributedPlan); + + /* * Make sure that this transaction has a distributed transaction ID. * @@ -56,8 +56,6 @@ ExecuteSubPlans(DistributedPlan *distributedPlan) */ BeginOrContinueCoordinatedTransaction(); - nodeList = ActiveReadableWorkerNodeList(); - foreach(subPlanCell, subPlanList) { DistributedSubPlan *subPlan = (DistributedSubPlan *) lfirst(subPlanCell); @@ -66,14 +64,39 @@ ExecuteSubPlans(DistributedPlan *distributedPlan) DestReceiver *copyDest = NULL; ParamListInfo params = NULL; EState *estate = NULL; - + bool writeLocalFile = false; char *resultId = GenerateResultId(planId, subPlanId); + List *workerNodeList = + FindAllWorkerNodesUsingSubplan(intermediateResultsHash, resultId); + + /* + * Write intermediate results to local file only if there is no worker + * node that receives them. + * + * This could happen in two cases: + * (a) Subquery in the having + * (b) The intermediate result is not used, such as RETURNING of a + * modifying CTE is not used + * + * For SELECT, Postgres/Citus is clever enough to not execute the CTE + * if it is not used at all, but for modifications we have to execute + * the queries. 
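+		 *
+		 * As a purely illustrative (hypothetical) example of case (b):
+		 *   WITH ins AS (INSERT INTO dist_table VALUES (1) RETURNING *) SELECT 1;
+		 * here the RETURNING result of "ins" is never read by the outer query,
+		 * so no worker node needs to receive it and it is only written locally.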
+ */ + if (workerNodeList == NIL) + { + writeLocalFile = true; + + if ((LogIntermediateResults && IsLoggableLevel(DEBUG1)) || + IsLoggableLevel(DEBUG4)) + { + elog(DEBUG1, "Subplan %s will be written to local file", resultId); + } + } SubPlanLevel++; estate = CreateExecutorState(); - copyDest = (DestReceiver *) CreateRemoteFileDestReceiver(resultId, estate, - nodeList, - writeLocalFile); + copyDest = CreateRemoteFileDestReceiver(resultId, estate, workerNodeList, + writeLocalFile); ExecutePlanIntoDestReceiver(plannedStmt, params, copyDest); diff --git a/src/backend/distributed/planner/distributed_planner.c b/src/backend/distributed/planner/distributed_planner.c index f592d851e..97d208903 100644 --- a/src/backend/distributed/planner/distributed_planner.c +++ b/src/backend/distributed/planner/distributed_planner.c @@ -20,11 +20,13 @@ #include "distributed/citus_nodes.h" #include "distributed/function_call_delegation.h" #include "distributed/insert_select_planner.h" +#include "distributed/intermediate_result_pruning.h" #include "distributed/intermediate_results.h" #include "distributed/master_protocol.h" #include "distributed/metadata_cache.h" #include "distributed/multi_executor.h" #include "distributed/distributed_planner.h" +#include "distributed/query_pushdown_planning.h" #include "distributed/multi_logical_optimizer.h" #include "distributed/multi_logical_planner.h" #include "distributed/multi_partitioning_utils.h" @@ -73,6 +75,8 @@ static DistributedPlan * CreateDistributedPlan(uint64 planId, Query *originalQue bool hasUnresolvedParams, PlannerRestrictionContext * plannerRestrictionContext); +static void FinalizeDistributedPlan(DistributedPlan *plan, Query *originalQuery); +static void RecordSubPlansUsedInPlan(DistributedPlan *plan, Query *originalQuery); static DeferredErrorMessage * DeferErrorIfPartitionTableNotSingleReplicated(Oid relationId); @@ -617,6 +621,7 @@ CreateDistributedPlan(uint64 planId, Query *originalQuery, Query *query, ParamLi List *subPlanList = NIL; bool hasCtes = originalQuery->cteList != NIL; + if (IsModifyCommand(originalQuery)) { Oid targetRelationId = InvalidOid; @@ -652,6 +657,8 @@ CreateDistributedPlan(uint64 planId, Query *originalQuery, Query *query, ParamLi if (distributedPlan->planningError == NULL) { + FinalizeDistributedPlan(distributedPlan, originalQuery); + return distributedPlan; } else @@ -672,7 +679,8 @@ CreateDistributedPlan(uint64 planId, Query *originalQuery, Query *query, ParamLi plannerRestrictionContext); if (distributedPlan->planningError == NULL) { - /* successfully created a router plan */ + FinalizeDistributedPlan(distributedPlan, originalQuery); + return distributedPlan; } else @@ -765,6 +773,8 @@ CreateDistributedPlan(uint64 planId, Query *originalQuery, Query *query, ParamLi plannerRestrictionContext); distributedPlan->subPlanList = subPlanList; + FinalizeDistributedPlan(distributedPlan, originalQuery); + return distributedPlan; } @@ -775,6 +785,8 @@ CreateDistributedPlan(uint64 planId, Query *originalQuery, Query *query, ParamLi */ if (IsModifyCommand(originalQuery)) { + FinalizeDistributedPlan(distributedPlan, originalQuery); + return distributedPlan; } @@ -806,10 +818,54 @@ CreateDistributedPlan(uint64 planId, Query *originalQuery, Query *query, ParamLi /* distributed plan currently should always succeed or error out */ Assert(distributedPlan && distributedPlan->planningError == NULL); + FinalizeDistributedPlan(distributedPlan, originalQuery); + return distributedPlan; } +/* + * FinalizeDistributedPlan is the final step of distributed 
planning. The function
+ * currently only implements some optimizations for intermediate result(s) pruning.
+ */
+static void
+FinalizeDistributedPlan(DistributedPlan *plan, Query *originalQuery)
+{
+	RecordSubPlansUsedInPlan(plan, originalQuery);
+}
+
+
+/*
+ * RecordSubPlansUsedInPlan gets a distributed plan and a queryTree, and
+ * updates the usedSubPlanNodeList of the distributed plan.
+ *
+ * The function simply pulls all the subPlans that are used in the queryTree,
+ * with one exception: subPlans in the HAVING clause. The reason is explained
+ * in the function.
+ */
+static void
+RecordSubPlansUsedInPlan(DistributedPlan *plan, Query *originalQuery)
+{
+	/* first, get all the subplans in the query */
+	plan->usedSubPlanNodeList = FindSubPlansUsedInNode((Node *) originalQuery);
+
+	/*
+	 * Later, remove the subplans used in the HAVING clause, because they
+	 * are only required on the coordinator. Including them in the
+	 * usedSubPlanNodeList would prevent the intermediate results from being
+	 * sent only to the coordinator.
+	 */
+	if (originalQuery->hasSubLinks &&
+		FindNodeCheck(originalQuery->havingQual, IsNodeSubquery))
+	{
+		List *subplansInHaving = FindSubPlansUsedInNode(originalQuery->havingQual);
+
+		plan->usedSubPlanNodeList =
+			list_difference(plan->usedSubPlanNodeList, subplansInHaving);
+	}
+}
+
+
 /*
  * EnsurePartitionTableNotReplicated errors out if the infput relation is
  * a partition table and the table has a replication factor greater than
diff --git a/src/backend/distributed/planner/intermediate_result_pruning.c b/src/backend/distributed/planner/intermediate_result_pruning.c
new file mode 100644
index 000000000..8a0909bb1
--- /dev/null
+++ b/src/backend/distributed/planner/intermediate_result_pruning.c
@@ -0,0 +1,258 @@
+/*-------------------------------------------------------------------------
+ *
+ * intermediate_result_pruning.c
+ *   Functions for pruning intermediate result broadcasting.
+ *
+ * We only send intermediate results of subqueries and CTEs to worker nodes
+ * that use them in the remainder of the distributed plan to avoid unnecessary
+ * network traffic.
+ *
+ * Copyright (c) Citus Data, Inc.
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#include "distributed/citus_custom_scan.h"
+#include "distributed/intermediate_result_pruning.h"
+#include "distributed/listutils.h"
+#include "distributed/log_utils.h"
+#include "distributed/metadata_cache.h"
+#include "distributed/query_utils.h"
+#include "distributed/worker_manager.h"
+#include "utils/builtins.h"
+
+/* controlled via GUC, used mostly for testing */
+bool LogIntermediateResults = false;
+
+static List * AppendAllAccessedWorkerNodes(List *workerNodeList,
+										   DistributedPlan *distributedPlan,
+										   int workerNodeCount);
+static IntermediateResultsHashEntry * SearchIntermediateResult(HTAB
+															   *intermediateResultsHash,
+															   char *resultId);
+
+
+/*
+ * FindSubPlansUsedInNode finds all the subplans used in the given node by
+ * traversing its range table entries.
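+ *
+ * After recursive planning, a subquery or CTE that was planned separately is
+ * referenced via a read_intermediate_result() call, which appears as a
+ * function RTE; those are the entries this walker looks for.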
+ */ +List * +FindSubPlansUsedInNode(Node *node) +{ + List *rangeTableList = NIL; + ListCell *rangeTableCell = NULL; + List *subPlanList = NIL; + + ExtractRangeTableEntryWalker(node, &rangeTableList); + + foreach(rangeTableCell, rangeTableList) + { + RangeTblEntry *rangeTableEntry = lfirst(rangeTableCell); + if (rangeTableEntry->rtekind == RTE_FUNCTION) + { + char *resultId = + FindIntermediateResultIdIfExists(rangeTableEntry); + Value *resultIdValue = NULL; + + if (resultId == NULL) + { + continue; + } + + /* + * Use a Value to be able to use list_append_unique and store + * the result ID in the DistributedPlan. + */ + resultIdValue = makeString(resultId); + subPlanList = list_append_unique(subPlanList, resultIdValue); + } + } + + return subPlanList; +} + + +/* + * RecordSubplanExecutionsOnNodes iterates over the usedSubPlanNodeList, + * and for each entry, record the workerNodes that are accessed by + * the distributed plan. + * + * Later, we'll use this information while we broadcast the intermediate + * results to the worker nodes. The idea is that the intermediate result + * should only be broadcasted to the worker nodes that are accessed by + * the distributedPlan(s) that the subPlan is used in. + * + * Finally, the function recursively descends into the actual subplans + * of the input distributedPlan as well. + */ +void +RecordSubplanExecutionsOnNodes(HTAB *intermediateResultsHash, + DistributedPlan *distributedPlan) +{ + Value *usedSubPlanIdValue = NULL; + List *usedSubPlanNodeList = distributedPlan->usedSubPlanNodeList; + List *subPlanList = distributedPlan->subPlanList; + ListCell *subPlanCell = NULL; + int workerNodeCount = GetWorkerNodeCount(); + + foreach_ptr(usedSubPlanIdValue, usedSubPlanNodeList) + { + char *resultId = strVal(usedSubPlanIdValue); + + IntermediateResultsHashEntry *entry = SearchIntermediateResult( + intermediateResultsHash, resultId); + + /* no need to traverse the whole plan if all the workers are hit */ + if (list_length(entry->nodeIdList) == workerNodeCount) + { + elog(DEBUG4, "Subplan %s is used in all workers", resultId); + + break; + } + else + { + /* + * traverse the plan and add find all worker nodes + * + * If we have reference tables in the distributed plan, all the + * workers will be in the node list. We can improve intermediate result + * pruning by deciding which reference table shard will be accessed earlier + */ + entry->nodeIdList = AppendAllAccessedWorkerNodes(entry->nodeIdList, + distributedPlan, + workerNodeCount); + + elog(DEBUG4, "Subplan %s is used in %lu", resultId, distributedPlan->planId); + } + } + + /* descend into the subPlans */ + foreach(subPlanCell, subPlanList) + { + DistributedSubPlan *subPlan = (DistributedSubPlan *) lfirst(subPlanCell); + CustomScan *customScan = FetchCitusCustomScanIfExists(subPlan->plan->planTree); + if (customScan) + { + DistributedPlan *distributedPlanOfSubPlan = GetDistributedPlan(customScan); + RecordSubplanExecutionsOnNodes(intermediateResultsHash, + distributedPlanOfSubPlan); + } + } +} + + +/* + * AppendAllAccessedWorkerNodes iterates over all the tasks in a distributed plan + * to create the list of worker nodes that can be accessed when this plan is executed. + * + * If there are multiple placements of a Shard, all of them are considered and + * all the workers with placements are appended to the list. This effectively + * means that if there is a reference table access in the distributed plan, all + * the workers will be in the resulting list. 
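+ *
+ * The caller passes workerNodeCount so that the scan can stop early once
+ * every worker node has already been added to the list.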
+ */ +static List * +AppendAllAccessedWorkerNodes(List *workerNodeList, DistributedPlan *distributedPlan, int + workerNodeCount) +{ + List *taskList = distributedPlan->workerJob->taskList; + ListCell *taskCell = NULL; + + foreach(taskCell, taskList) + { + Task *task = lfirst(taskCell); + ListCell *placementCell = NULL; + foreach(placementCell, task->taskPlacementList) + { + ShardPlacement *placement = lfirst(placementCell); + workerNodeList = list_append_unique_int(workerNodeList, placement->nodeId); + + /* early return if all the workers are accessed */ + if (list_length(workerNodeList) == workerNodeCount) + { + return workerNodeList; + } + } + } + + return workerNodeList; +} + + +/* + * MakeIntermediateResultHTAB is a helper method that creates a Hash Table that + * stores information on the intermediate result. + */ +HTAB * +MakeIntermediateResultHTAB() +{ + HTAB *intermediateResultsHash = NULL; + uint32 hashFlags = 0; + HASHCTL info = { 0 }; + int initialNumberOfElements = 16; + + info.keysize = NAMEDATALEN; + info.entrysize = sizeof(IntermediateResultsHashEntry); + info.hash = string_hash; + info.hcxt = CurrentMemoryContext; + hashFlags = (HASH_ELEM | HASH_FUNCTION | HASH_CONTEXT); + + intermediateResultsHash = hash_create("Intermediate results hash", + initialNumberOfElements, &info, hashFlags); + + return intermediateResultsHash; +} + + +/* + * FindAllWorkerNodesUsingSubplan creates a list of worker nodes that + * may need to access subplan results. + */ +List * +FindAllWorkerNodesUsingSubplan(HTAB *intermediateResultsHash, + char *resultId) +{ + List *workerNodeList = NIL; + IntermediateResultsHashEntry *entry = + SearchIntermediateResult(intermediateResultsHash, resultId); + + ListCell *nodeIdCell = NULL; + foreach(nodeIdCell, entry->nodeIdList) + { + WorkerNode *workerNode = LookupNodeByNodeId(lfirst_int(nodeIdCell)); + + workerNodeList = lappend(workerNodeList, workerNode); + + if ((LogIntermediateResults && IsLoggableLevel(DEBUG1)) || + IsLoggableLevel(DEBUG4)) + { + elog(DEBUG1, "Subplan %s will be sent to %s:%d", resultId, + workerNode->workerName, workerNode->workerPort); + } + } + + return workerNodeList; +} + + +/* + * SearchIntermediateResult searches through intermediateResultsHash for a given + * intermediate result id. + * + * If an entry is not found, creates a new entry with sane defaults. 
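+ * Currently the only such default is an empty (NIL) nodeIdList.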
+ */ +static IntermediateResultsHashEntry * +SearchIntermediateResult(HTAB *intermediateResultsHash, char *resultId) +{ + IntermediateResultsHashEntry *entry = NULL; + bool found = false; + + entry = hash_search(intermediateResultsHash, resultId, HASH_ENTER, &found); + + /* use sane defaults */ + if (!found) + { + entry->nodeIdList = NIL; + } + + return entry; +} diff --git a/src/backend/distributed/planner/multi_logical_planner.c b/src/backend/distributed/planner/multi_logical_planner.c index bd8519d48..e2a305b10 100644 --- a/src/backend/distributed/planner/multi_logical_planner.c +++ b/src/backend/distributed/planner/multi_logical_planner.c @@ -46,6 +46,7 @@ #include "optimizer/prep.h" #include "optimizer/tlist.h" #include "parser/parsetree.h" +#include "utils/builtins.h" #include "utils/datum.h" #include "utils/lsyscache.h" #include "utils/syscache.h" @@ -808,6 +809,38 @@ IsReadIntermediateResultFunction(Node *node) } +/* + * FindIntermediateResultIdIfExists extracts the id of the intermediate result + * if the given RTE contains a read_intermediate_results function, NULL otherwise + */ +char * +FindIntermediateResultIdIfExists(RangeTblEntry *rte) +{ + List *functionList = NULL; + RangeTblFunction *rangeTblfunction = NULL; + FuncExpr *funcExpr = NULL; + char *resultId = NULL; + + Assert(rte->rtekind == RTE_FUNCTION); + + functionList = rte->functions; + rangeTblfunction = (RangeTblFunction *) linitial(functionList); + funcExpr = (FuncExpr *) rangeTblfunction->funcexpr; + + if (IsReadIntermediateResultFunction((Node *) funcExpr)) + { + Const *resultIdConst = linitial(funcExpr->args); + + if (!resultIdConst->constisnull) + { + resultId = TextDatumGetCString(resultIdConst->constvalue); + } + } + + return resultId; +} + + /* * ErrorIfQueryNotSupported checks that we can perform distributed planning for * the given query. The checks in this function will be removed as we support diff --git a/src/backend/distributed/planner/query_pushdown_planning.c b/src/backend/distributed/planner/query_pushdown_planning.c index 0e8452793..e24368ad1 100644 --- a/src/backend/distributed/planner/query_pushdown_planning.c +++ b/src/backend/distributed/planner/query_pushdown_planning.c @@ -69,7 +69,6 @@ bool SubqueryPushdown = false; /* is subquery pushdown enabled */ /* Local functions forward declarations */ static bool JoinTreeContainsSubqueryWalker(Node *joinTreeNode, void *context); static bool IsFunctionRTE(Node *node); -static bool IsNodeSubquery(Node *node); static bool IsOuterJoinExpr(Node *node); static bool WindowPartitionOnDistributionColumn(Query *query); static DeferredErrorMessage * DeferErrorIfFromClauseRecurs(Query *queryTree); @@ -344,7 +343,7 @@ IsFunctionRTE(Node *node) * (select 1) are converted to init plans in the rewritten query. In this case * the only thing left in the query tree is a Param node with type PARAM_EXEC. 
*/ -static bool +bool IsNodeSubquery(Node *node) { if (node == NULL) diff --git a/src/backend/distributed/shared_library_init.c b/src/backend/distributed/shared_library_init.c index fa53748df..dec92955e 100644 --- a/src/backend/distributed/shared_library_init.c +++ b/src/backend/distributed/shared_library_init.c @@ -26,6 +26,7 @@ #include "distributed/commands/utility_hook.h" #include "distributed/connection_management.h" #include "distributed/distributed_deadlock_detection.h" +#include "distributed/intermediate_result_pruning.h" #include "distributed/local_executor.h" #include "distributed/maintenanced.h" #include "distributed/master_metadata_utility.h" @@ -524,6 +525,16 @@ RegisterCitusConfigVariables(void) GUC_NO_SHOW_ALL, NULL, NULL, NULL); + DefineCustomBoolVariable( + "citus.log_intermediate_results", + gettext_noop("Log intermediate results sent to other nodes"), + NULL, + &LogIntermediateResults, + false, + PGC_USERSET, + GUC_NO_SHOW_ALL, + NULL, NULL, NULL); + DefineCustomBoolVariable( "citus.log_distributed_deadlock_detection", gettext_noop("Log distributed deadlock detection related processing in " diff --git a/src/backend/distributed/utils/citus_copyfuncs.c b/src/backend/distributed/utils/citus_copyfuncs.c index a2fdc8c10..1f2888f9c 100644 --- a/src/backend/distributed/utils/citus_copyfuncs.c +++ b/src/backend/distributed/utils/citus_copyfuncs.c @@ -118,6 +118,7 @@ CopyNodeDistributedPlan(COPYFUNC_ARGS) COPY_STRING_FIELD(intermediateResultIdPrefix); COPY_NODE_FIELD(subPlanList); + COPY_NODE_FIELD(usedSubPlanNodeList); COPY_NODE_FIELD(planningError); } diff --git a/src/backend/distributed/utils/citus_outfuncs.c b/src/backend/distributed/utils/citus_outfuncs.c index 364a3ffb7..726e27753 100644 --- a/src/backend/distributed/utils/citus_outfuncs.c +++ b/src/backend/distributed/utils/citus_outfuncs.c @@ -196,6 +196,7 @@ OutDistributedPlan(OUTFUNC_ARGS) WRITE_STRING_FIELD(intermediateResultIdPrefix); WRITE_NODE_FIELD(subPlanList); + WRITE_NODE_FIELD(usedSubPlanNodeList); WRITE_NODE_FIELD(planningError); } diff --git a/src/backend/distributed/utils/citus_readfuncs.c b/src/backend/distributed/utils/citus_readfuncs.c index f5d1a3862..b178b6afc 100644 --- a/src/backend/distributed/utils/citus_readfuncs.c +++ b/src/backend/distributed/utils/citus_readfuncs.c @@ -224,6 +224,7 @@ ReadDistributedPlan(READFUNC_ARGS) READ_STRING_FIELD(intermediateResultIdPrefix); READ_NODE_FIELD(subPlanList); + READ_NODE_FIELD(usedSubPlanNodeList); READ_NODE_FIELD(planningError); diff --git a/src/backend/distributed/utils/metadata_cache.c b/src/backend/distributed/utils/metadata_cache.c index 1d89db6f1..86c0da214 100644 --- a/src/backend/distributed/utils/metadata_cache.c +++ b/src/backend/distributed/utils/metadata_cache.c @@ -2978,6 +2978,20 @@ GetWorkerNodeHash(void) } +/* + * GetWorkerNodeCount returns the number of worker nodes + * + * If the Worker Node cache is not (yet) valid, it is first rebuilt. + */ +int +GetWorkerNodeCount(void) +{ + PrepareWorkerNodeCache(); + + return WorkerNodeCount; +} + + /* * PrepareWorkerNodeCache makes sure the worker node data from pg_dist_node is cached, * if it is not already cached. 
diff --git a/src/include/distributed/citus_custom_scan.h b/src/include/distributed/citus_custom_scan.h index 59e7881ca..efd93147d 100644 --- a/src/include/distributed/citus_custom_scan.h +++ b/src/include/distributed/citus_custom_scan.h @@ -39,4 +39,7 @@ extern void CitusExplainScan(CustomScanState *node, List *ancestors, struct extern TupleDesc ScanStateGetTupleDescriptor(CitusScanState *scanState); extern EState * ScanStateGetExecutorState(CitusScanState *scanState); +extern CustomScan * FetchCitusCustomScanIfExists(Plan *plan); +extern bool IsCitusPlan(Plan *plan); +extern bool IsCitusCustomScan(Plan *plan); #endif /* CITUS_CUSTOM_SCAN_H */ diff --git a/src/include/distributed/intermediate_result_pruning.h b/src/include/distributed/intermediate_result_pruning.h new file mode 100644 index 000000000..08f75e24c --- /dev/null +++ b/src/include/distributed/intermediate_result_pruning.h @@ -0,0 +1,26 @@ +/*------------------------------------------------------------------------- + * + * intermediate_result_pruning.h + * Functions for pruning intermediate result broadcasting. + * + * Copyright (c) Citus Data, Inc. + * + *------------------------------------------------------------------------- + */ + +#ifndef INTERMEDIATE_RESULT_PRUNING_H +#define INTERMEDIATE_RESULT_PRUNING_H + +#include "distributed/subplan_execution.h" + +extern bool LogIntermediateResults; + +extern List * FindSubPlansUsedInNode(Node *node); +extern List * FindAllWorkerNodesUsingSubplan(HTAB *intermediateResultsHash, + char *resultId); +extern HTAB * MakeIntermediateResultHTAB(void); +extern void RecordSubplanExecutionsOnNodes(HTAB *intermediateResultsHash, + DistributedPlan *distributedPlan); + + +#endif /* INTERMEDIATE_RESULT_PRUNING_H */ diff --git a/src/include/distributed/intermediate_results.h b/src/include/distributed/intermediate_results.h index 8042b3bba..e020a8635 100644 --- a/src/include/distributed/intermediate_results.h +++ b/src/include/distributed/intermediate_results.h @@ -18,6 +18,7 @@ #include "nodes/execnodes.h" #include "nodes/pg_list.h" #include "tcop/dest.h" +#include "utils/builtins.h" #include "utils/palloc.h" diff --git a/src/include/distributed/metadata_cache.h b/src/include/distributed/metadata_cache.h index 2b78af180..25b04c022 100644 --- a/src/include/distributed/metadata_cache.h +++ b/src/include/distributed/metadata_cache.h @@ -150,6 +150,7 @@ extern void EnsureModificationsCanRun(void); /* access WorkerNodeHash */ extern HTAB * GetWorkerNodeHash(void); +extern int GetWorkerNodeCount(void); extern WorkerNode * LookupNodeByNodeId(uint32 nodeId); extern WorkerNode * LookupNodeForGroup(int32 groupId); diff --git a/src/include/distributed/multi_logical_planner.h b/src/include/distributed/multi_logical_planner.h index 33b54bef2..eb0a56a58 100644 --- a/src/include/distributed/multi_logical_planner.h +++ b/src/include/distributed/multi_logical_planner.h @@ -193,6 +193,7 @@ extern bool FindNodeCheckInRangeTableList(List *rtable, bool (*check)(Node *)); extern bool IsDistributedTableRTE(Node *node); extern bool QueryContainsDistributedTableRTE(Query *query); extern bool ContainsReadIntermediateResultFunction(Node *node); +extern char * FindIntermediateResultIdIfExists(RangeTblEntry *rte); extern MultiNode * ParentNode(MultiNode *multiNode); extern MultiNode * ChildNode(MultiUnaryNode *multiNode); extern MultiNode * GrandChildNode(MultiUnaryNode *multiNode); diff --git a/src/include/distributed/multi_physical_planner.h b/src/include/distributed/multi_physical_planner.h index f44609b6b..beb9a6320 100644 
--- a/src/include/distributed/multi_physical_planner.h
+++ b/src/include/distributed/multi_physical_planner.h
@@ -304,6 +304,16 @@ typedef struct DistributedPlan
 	/* list of subplans to execute before the distributed query */
 	List *subPlanList;
 
+	/*
+	 * List of subPlans that are used in the DistributedPlan.
+	 * Note that this is different from the "subPlanList" field, which
+	 * contains the subplans generated as part of the DistributedPlan.
+	 *
+	 * On the other hand, usedSubPlanNodeList keeps track of which subPlans
+	 * are actually referenced within this distributed plan.
+	 */
+	List *usedSubPlanNodeList;
+
 	/*
 	 * NULL if this a valid plan, an error description otherwise. This will
 	 * e.g. be set if SQL features are present that a planner doesn't support,
diff --git a/src/include/distributed/query_pushdown_planning.h b/src/include/distributed/query_pushdown_planning.h
index 8ba3341fb..22524b3e4 100644
--- a/src/include/distributed/query_pushdown_planning.h
+++ b/src/include/distributed/query_pushdown_planning.h
@@ -26,6 +26,7 @@ extern bool SubqueryPushdown;
 extern bool ShouldUseSubqueryPushDown(Query *originalQuery, Query *rewrittenQuery,
 									  PlannerRestrictionContext *plannerRestrictionContext);
 extern bool JoinTreeContainsSubquery(Query *query);
+extern bool IsNodeSubquery(Node *node);
 extern bool HasEmptyJoinTree(Query *query);
 extern bool WhereOrHavingClauseContainsSubquery(Query *query);
 extern bool TargetListContainsSubquery(Query *query);
diff --git a/src/include/distributed/subplan_execution.h b/src/include/distributed/subplan_execution.h
index 91c6f580e..10e088c8e 100644
--- a/src/include/distributed/subplan_execution.h
+++ b/src/include/distributed/subplan_execution.h
@@ -19,5 +19,20 @@ extern int SubPlanLevel;
 
 extern void ExecuteSubPlans(DistributedPlan *distributedPlan);
 
+/*
+ * IntermediateResultsHashEntry is used to store which nodes need to receive
+ * intermediate results. Given an intermediate result name, you can look up
+ * the list of nodes that can possibly run a query that will use the
+ * intermediate results.
+ *
+ * The nodeIdList contains a set of unique WorkerNode ids that have placements
+ * that can be used in non-colocated subquery joins with the intermediate result
+ * given in the key.
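+ *
+ * The key is the intermediate result id produced by GenerateResultId()
+ * (e.g. "5_1" for plan 5, subplan 1), stored in a NAMEDATALEN-sized buffer.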
+ */ +typedef struct IntermediateResultsHashEntry +{ + char key[NAMEDATALEN]; + List *nodeIdList; +} IntermediateResultsHashEntry; #endif /* SUBPLAN_EXECUTION_H */ diff --git a/src/include/distributed/worker_manager.h b/src/include/distributed/worker_manager.h index 9e951fce7..65d0e212b 100644 --- a/src/include/distributed/worker_manager.h +++ b/src/include/distributed/worker_manager.h @@ -77,7 +77,6 @@ extern List * DistributedTablePlacementNodeList(LOCKMODE lockMode); extern uint32 ActiveReadableWorkerNodeCount(void); extern List * ActiveReadableWorkerNodeList(void); extern List * ActiveReadableNodeList(void); -extern WorkerNode * GetWorkerNodeByNodeId(int nodeId); extern WorkerNode * FindWorkerNode(char *nodeName, int32 nodePort); extern WorkerNode * FindWorkerNodeAnyCluster(const char *nodeName, int32 nodePort); extern List * ReadDistNode(bool includeNodesFromOtherClusters); diff --git a/src/test/regress/expected/intermediate_result_pruning.out b/src/test/regress/expected/intermediate_result_pruning.out new file mode 100644 index 000000000..f9850c7d8 --- /dev/null +++ b/src/test/regress/expected/intermediate_result_pruning.out @@ -0,0 +1,935 @@ +CREATE SCHEMA intermediate_result_pruning; +SET search_path TO intermediate_result_pruning; +SET citus.log_intermediate_results TO TRUE; +SET citus.shard_count TO 4; +SET citus.next_shard_id TO 1480000; +SET citus.shard_replication_factor = 1; +CREATE TABLE table_1 (key int, value text); +SELECT create_distributed_table('table_1', 'key'); + create_distributed_table +-------------------------- + +(1 row) + +CREATE TABLE table_2 (key int, value text); +SELECT create_distributed_table('table_2', 'key'); + create_distributed_table +-------------------------- + +(1 row) + +CREATE TABLE table_3 (key int, value text); +SELECT create_distributed_table('table_3', 'key'); + create_distributed_table +-------------------------- + +(1 row) + +CREATE TABLE ref_table (key int, value text); +SELECT create_reference_table('ref_table'); + create_reference_table +------------------------ + +(1 row) + +-- load some data +INSERT INTO table_1 VALUES (1, '1'), (2, '2'), (3, '3'), (4, '4'); +INSERT INTO table_2 VALUES (3, '3'), (4, '4'), (5, '5'), (6, '6'); +INSERT INTO table_3 VALUES (3, '3'), (4, '4'), (5, '5'), (6, '6'); +INSERT INTO ref_table VALUES (1, '1'), (2, '2'), (3, '3'), (4, '4'), (5, '5'), (6, '6'); +-- see which workers are hit for intermediate results +SET client_min_messages TO DEBUG1; +-- a very basic case, where the intermediate result +-- should go to both workers +WITH some_values_1 AS + (SELECT key FROM table_1 WHERE value IN ('3', '4')) +SELECT + count(*) +FROM + some_values_1 JOIN table_2 USING (key); +DEBUG: generating subplan 5_1 for CTE some_values_1: SELECT key FROM intermediate_result_pruning.table_1 WHERE (value OPERATOR(pg_catalog.=) ANY (ARRAY['3'::text, '4'::text])) +DEBUG: Plan 5 query after replacing subqueries and CTEs: SELECT count(*) AS count FROM ((SELECT intermediate_result.key FROM read_intermediate_result('5_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer)) some_values_1 JOIN intermediate_result_pruning.table_2 USING (key)) +DEBUG: Subplan 5_1 will be sent to localhost:57637 +DEBUG: Subplan 5_1 will be sent to localhost:57638 + count +------- + 2 +(1 row) + +-- a very basic case, where the intermediate result +-- should only go to one worker because the final query is a router +-- we use random() to prevent postgres inline the CTE(s) +WITH some_values_1 AS + (SELECT key, random() FROM table_1 WHERE value 
IN ('3', '4')) +SELECT + count(*) +FROM + some_values_1 JOIN table_2 USING (key) WHERE table_2.key = 1; +DEBUG: generating subplan 7_1 for CTE some_values_1: SELECT key, random() AS random FROM intermediate_result_pruning.table_1 WHERE (value OPERATOR(pg_catalog.=) ANY (ARRAY['3'::text, '4'::text])) +DEBUG: Plan 7 query after replacing subqueries and CTEs: SELECT count(*) AS count FROM ((SELECT intermediate_result.key, intermediate_result.random FROM read_intermediate_result('7_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, random double precision)) some_values_1 JOIN intermediate_result_pruning.table_2 USING (key)) WHERE (table_2.key OPERATOR(pg_catalog.=) 1) +DEBUG: Subplan 7_1 will be sent to localhost:57637 + count +------- + 0 +(1 row) + +-- a similar query, but with a reference table now +-- given that reference tables are replicated to all nodes +-- we have to broadcast to all nodes +WITH some_values_1 AS + (SELECT key, random() FROM table_1 WHERE value IN ('3', '4')) +SELECT + count(*) +FROM + some_values_1 JOIN ref_table USING (key); +DEBUG: generating subplan 9_1 for CTE some_values_1: SELECT key, random() AS random FROM intermediate_result_pruning.table_1 WHERE (value OPERATOR(pg_catalog.=) ANY (ARRAY['3'::text, '4'::text])) +DEBUG: Plan 9 query after replacing subqueries and CTEs: SELECT count(*) AS count FROM ((SELECT intermediate_result.key, intermediate_result.random FROM read_intermediate_result('9_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, random double precision)) some_values_1 JOIN intermediate_result_pruning.ref_table USING (key)) +DEBUG: Subplan 9_1 will be sent to localhost:57637 +DEBUG: Subplan 9_1 will be sent to localhost:57638 + count +------- + 2 +(1 row) + +-- a similar query as above, but this time use the CTE inside +-- another CTE +WITH some_values_1 AS + (SELECT key, random() FROM table_1 WHERE value IN ('3', '4')), + some_values_2 AS + (SELECT key, random() FROM some_values_1) +SELECT + count(*) +FROM + some_values_2 JOIN table_2 USING (key) WHERE table_2.key = 1; +DEBUG: generating subplan 11_1 for CTE some_values_1: SELECT key, random() AS random FROM intermediate_result_pruning.table_1 WHERE (value OPERATOR(pg_catalog.=) ANY (ARRAY['3'::text, '4'::text])) +DEBUG: generating subplan 11_2 for CTE some_values_2: SELECT key, random() AS random FROM (SELECT intermediate_result.key, intermediate_result.random FROM read_intermediate_result('11_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, random double precision)) some_values_1 +DEBUG: Plan 11 query after replacing subqueries and CTEs: SELECT count(*) AS count FROM ((SELECT intermediate_result.key, intermediate_result.random FROM read_intermediate_result('11_2'::text, 'binary'::citus_copy_format) intermediate_result(key integer, random double precision)) some_values_2 JOIN intermediate_result_pruning.table_2 USING (key)) WHERE (table_2.key OPERATOR(pg_catalog.=) 1) +DEBUG: Subplan 11_1 will be sent to localhost:57638 +DEBUG: Subplan 11_2 will be sent to localhost:57637 + count +------- + 0 +(1 row) + +-- the second CTE does a join with a distributed table +-- and the final query is a router query +WITH some_values_1 AS + (SELECT key, random() FROM table_1 WHERE value IN ('3', '4')), + some_values_2 AS + (SELECT key, random() FROM some_values_1 JOIN table_2 USING (key)) +SELECT + count(*) +FROM + some_values_2 JOIN table_2 USING (key) WHERE table_2.key = 3; +DEBUG: generating subplan 14_1 for CTE some_values_1: SELECT key, random() 
AS random FROM intermediate_result_pruning.table_1 WHERE (value OPERATOR(pg_catalog.=) ANY (ARRAY['3'::text, '4'::text])) +DEBUG: generating subplan 14_2 for CTE some_values_2: SELECT some_values_1.key, random() AS random FROM ((SELECT intermediate_result.key, intermediate_result.random FROM read_intermediate_result('14_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, random double precision)) some_values_1 JOIN intermediate_result_pruning.table_2 USING (key)) +DEBUG: Plan 14 query after replacing subqueries and CTEs: SELECT count(*) AS count FROM ((SELECT intermediate_result.key, intermediate_result.random FROM read_intermediate_result('14_2'::text, 'binary'::citus_copy_format) intermediate_result(key integer, random double precision)) some_values_2 JOIN intermediate_result_pruning.table_2 USING (key)) WHERE (table_2.key OPERATOR(pg_catalog.=) 3) +DEBUG: Subplan 14_1 will be sent to localhost:57637 +DEBUG: Subplan 14_1 will be sent to localhost:57638 +DEBUG: Subplan 14_2 will be sent to localhost:57638 + count +------- + 1 +(1 row) + +-- the first CTE is used both within second CTE and the final query +-- the second CTE does a join with a distributed table +-- and the final query is a router query +WITH some_values_1 AS + (SELECT key, random() FROM table_1 WHERE value IN ('3', '4')), + some_values_2 AS + (SELECT key, random() FROM some_values_1 JOIN table_2 USING (key)) +SELECT + count(*) +FROM + (some_values_2 JOIN table_2 USING (key)) JOIN some_values_1 USING (key) WHERE table_2.key = 3; +DEBUG: generating subplan 17_1 for CTE some_values_1: SELECT key, random() AS random FROM intermediate_result_pruning.table_1 WHERE (value OPERATOR(pg_catalog.=) ANY (ARRAY['3'::text, '4'::text])) +DEBUG: generating subplan 17_2 for CTE some_values_2: SELECT some_values_1.key, random() AS random FROM ((SELECT intermediate_result.key, intermediate_result.random FROM read_intermediate_result('17_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, random double precision)) some_values_1 JOIN intermediate_result_pruning.table_2 USING (key)) +DEBUG: Plan 17 query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (((SELECT intermediate_result.key, intermediate_result.random FROM read_intermediate_result('17_2'::text, 'binary'::citus_copy_format) intermediate_result(key integer, random double precision)) some_values_2 JOIN intermediate_result_pruning.table_2 USING (key)) JOIN (SELECT intermediate_result.key, intermediate_result.random FROM read_intermediate_result('17_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, random double precision)) some_values_1 USING (key)) WHERE (table_2.key OPERATOR(pg_catalog.=) 3) +DEBUG: Subplan 17_1 will be sent to localhost:57638 +DEBUG: Subplan 17_1 will be sent to localhost:57637 +DEBUG: Subplan 17_2 will be sent to localhost:57638 + count +------- + 1 +(1 row) + +-- the first CTE is used both within second CTE and the final query +-- the second CTE does a join with a distributed table but a router query on a worker +-- and the final query is another router query on another worker +WITH some_values_1 AS + (SELECT key, random() FROM table_1 WHERE value IN ('3', '4')), + some_values_2 AS + (SELECT key, random() FROM some_values_1 JOIN table_2 USING (key) WHERE table_2.key = 1) +SELECT + count(*) +FROM + (some_values_2 JOIN table_2 USING (key)) JOIN some_values_1 USING (key) WHERE table_2.key = 3; +DEBUG: generating subplan 20_1 for CTE some_values_1: SELECT key, random() AS random FROM 
intermediate_result_pruning.table_1 WHERE (value OPERATOR(pg_catalog.=) ANY (ARRAY['3'::text, '4'::text])) +DEBUG: generating subplan 20_2 for CTE some_values_2: SELECT some_values_1.key, random() AS random FROM ((SELECT intermediate_result.key, intermediate_result.random FROM read_intermediate_result('20_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, random double precision)) some_values_1 JOIN intermediate_result_pruning.table_2 USING (key)) WHERE (table_2.key OPERATOR(pg_catalog.=) 1) +DEBUG: Plan 20 query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (((SELECT intermediate_result.key, intermediate_result.random FROM read_intermediate_result('20_2'::text, 'binary'::citus_copy_format) intermediate_result(key integer, random double precision)) some_values_2 JOIN intermediate_result_pruning.table_2 USING (key)) JOIN (SELECT intermediate_result.key, intermediate_result.random FROM read_intermediate_result('20_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, random double precision)) some_values_1 USING (key)) WHERE (table_2.key OPERATOR(pg_catalog.=) 3) +DEBUG: Subplan 20_1 will be sent to localhost:57638 +DEBUG: Subplan 20_1 will be sent to localhost:57637 +DEBUG: Subplan 20_2 will be sent to localhost:57638 + count +------- + 0 +(1 row) + +-- the first CTE is used both within second CTE and the final query +-- the second CTE does a join with a distributed table but a router query on a worker +-- and the final query is a router query on the same worker, so the first result is only +-- broadcasted to a single node +WITH some_values_1 AS + (SELECT key, random() FROM table_1 WHERE value IN ('3', '4')), + some_values_2 AS + (SELECT key, random() FROM some_values_1 JOIN table_2 USING (key) WHERE table_2.key = 1) +SELECT + count(*) +FROM + (some_values_2 JOIN table_2 USING (key)) JOIN some_values_1 USING (key) WHERE table_2.key = 1; +DEBUG: generating subplan 23_1 for CTE some_values_1: SELECT key, random() AS random FROM intermediate_result_pruning.table_1 WHERE (value OPERATOR(pg_catalog.=) ANY (ARRAY['3'::text, '4'::text])) +DEBUG: generating subplan 23_2 for CTE some_values_2: SELECT some_values_1.key, random() AS random FROM ((SELECT intermediate_result.key, intermediate_result.random FROM read_intermediate_result('23_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, random double precision)) some_values_1 JOIN intermediate_result_pruning.table_2 USING (key)) WHERE (table_2.key OPERATOR(pg_catalog.=) 1) +DEBUG: Plan 23 query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (((SELECT intermediate_result.key, intermediate_result.random FROM read_intermediate_result('23_2'::text, 'binary'::citus_copy_format) intermediate_result(key integer, random double precision)) some_values_2 JOIN intermediate_result_pruning.table_2 USING (key)) JOIN (SELECT intermediate_result.key, intermediate_result.random FROM read_intermediate_result('23_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, random double precision)) some_values_1 USING (key)) WHERE (table_2.key OPERATOR(pg_catalog.=) 1) +DEBUG: Subplan 23_1 will be sent to localhost:57637 +DEBUG: Subplan 23_2 will be sent to localhost:57637 + count +------- + 0 +(1 row) + +-- the same query with the above, but the final query is hitting all shards +WITH some_values_1 AS + (SELECT key, random() FROM table_1 WHERE value IN ('3', '4')), + some_values_2 AS + (SELECT key, random() FROM some_values_1 JOIN table_2 USING (key)) 
+SELECT + count(*) +FROM + (some_values_2 JOIN table_2 USING (key)) JOIN some_values_1 USING (key) WHERE table_2.key != 3; +DEBUG: generating subplan 26_1 for CTE some_values_1: SELECT key, random() AS random FROM intermediate_result_pruning.table_1 WHERE (value OPERATOR(pg_catalog.=) ANY (ARRAY['3'::text, '4'::text])) +DEBUG: generating subplan 26_2 for CTE some_values_2: SELECT some_values_1.key, random() AS random FROM ((SELECT intermediate_result.key, intermediate_result.random FROM read_intermediate_result('26_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, random double precision)) some_values_1 JOIN intermediate_result_pruning.table_2 USING (key)) +DEBUG: Plan 26 query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (((SELECT intermediate_result.key, intermediate_result.random FROM read_intermediate_result('26_2'::text, 'binary'::citus_copy_format) intermediate_result(key integer, random double precision)) some_values_2 JOIN intermediate_result_pruning.table_2 USING (key)) JOIN (SELECT intermediate_result.key, intermediate_result.random FROM read_intermediate_result('26_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, random double precision)) some_values_1 USING (key)) WHERE (table_2.key OPERATOR(pg_catalog.<>) 3) +DEBUG: Subplan 26_1 will be sent to localhost:57637 +DEBUG: Subplan 26_1 will be sent to localhost:57638 +DEBUG: Subplan 26_2 will be sent to localhost:57637 +DEBUG: Subplan 26_2 will be sent to localhost:57638 + count +------- + 1 +(1 row) + +-- even if we add a filter on the first query and make it a router query, +-- the first intermediate result still hits all workers because of the final +-- join is hitting all workers +WITH some_values_1 AS + (SELECT key, random() FROM table_1 WHERE value IN ('3', '4')), + some_values_2 AS + (SELECT key, random() FROM some_values_1 JOIN table_2 USING (key) WHERE table_2.key = 3) +SELECT + count(*) +FROM + (some_values_2 JOIN table_2 USING (key)) JOIN some_values_1 USING (key) WHERE table_2.key != 3; +DEBUG: generating subplan 29_1 for CTE some_values_1: SELECT key, random() AS random FROM intermediate_result_pruning.table_1 WHERE (value OPERATOR(pg_catalog.=) ANY (ARRAY['3'::text, '4'::text])) +DEBUG: generating subplan 29_2 for CTE some_values_2: SELECT some_values_1.key, random() AS random FROM ((SELECT intermediate_result.key, intermediate_result.random FROM read_intermediate_result('29_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, random double precision)) some_values_1 JOIN intermediate_result_pruning.table_2 USING (key)) WHERE (table_2.key OPERATOR(pg_catalog.=) 3) +DEBUG: Plan 29 query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (((SELECT intermediate_result.key, intermediate_result.random FROM read_intermediate_result('29_2'::text, 'binary'::citus_copy_format) intermediate_result(key integer, random double precision)) some_values_2 JOIN intermediate_result_pruning.table_2 USING (key)) JOIN (SELECT intermediate_result.key, intermediate_result.random FROM read_intermediate_result('29_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, random double precision)) some_values_1 USING (key)) WHERE (table_2.key OPERATOR(pg_catalog.<>) 3) +DEBUG: Subplan 29_1 will be sent to localhost:57637 +DEBUG: Subplan 29_1 will be sent to localhost:57638 +DEBUG: Subplan 29_2 will be sent to localhost:57637 +DEBUG: Subplan 29_2 will be sent to localhost:57638 + count +------- + 0 +(1 row) + +-- the reference 
table is joined with a distributed table and an intermediate +-- result, but the distributed table hits all shards, so the intermediate +-- result is sent to all nodes +WITH some_values_1 AS + (SELECT key, random() FROM ref_table WHERE value IN ('3', '4')) +SELECT + count(*) +FROM + (some_values_1 JOIN ref_table USING (key)) JOIN table_2 USING (key); +DEBUG: generating subplan 32_1 for CTE some_values_1: SELECT key, random() AS random FROM intermediate_result_pruning.ref_table WHERE (value OPERATOR(pg_catalog.=) ANY (ARRAY['3'::text, '4'::text])) +DEBUG: Plan 32 query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (((SELECT intermediate_result.key, intermediate_result.random FROM read_intermediate_result('32_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, random double precision)) some_values_1 JOIN intermediate_result_pruning.ref_table USING (key)) JOIN intermediate_result_pruning.table_2 USING (key)) +DEBUG: Subplan 32_1 will be sent to localhost:57637 +DEBUG: Subplan 32_1 will be sent to localhost:57638 + count +------- + 2 +(1 row) + +-- similar query as above, but this time the whole query is a router +-- query, so no intermediate results +WITH some_values_1 AS + (SELECT key, random() FROM ref_table WHERE value IN ('3', '4')) +SELECT + count(*) +FROM + (some_values_1 JOIN ref_table USING (key)) JOIN table_2 USING (key) WHERE table_2.key = 1; + count +------- + 0 +(1 row) + +-- now, the second CTE has a single shard join with a distributed table +-- so the first CTE should only be broadcasted to that node +-- since the final query doesn't have a join, it should simply be broadcasted +-- to one node +WITH some_values_1 AS + (SELECT key, random() FROM table_1 WHERE value IN ('3', '4')), + some_values_2 AS + (SELECT key, random() FROM some_values_1 JOIN table_2 USING (key) WHERE key = 1) +SELECT + count(*) +FROM + some_values_2; +DEBUG: generating subplan 35_1 for CTE some_values_1: SELECT key, random() AS random FROM intermediate_result_pruning.table_1 WHERE (value OPERATOR(pg_catalog.=) ANY (ARRAY['3'::text, '4'::text])) +DEBUG: generating subplan 35_2 for CTE some_values_2: SELECT some_values_1.key, random() AS random FROM ((SELECT intermediate_result.key, intermediate_result.random FROM read_intermediate_result('35_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, random double precision)) some_values_1 JOIN intermediate_result_pruning.table_2 USING (key)) WHERE (some_values_1.key OPERATOR(pg_catalog.=) 1) +DEBUG: Plan 35 query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (SELECT intermediate_result.key, intermediate_result.random FROM read_intermediate_result('35_2'::text, 'binary'::citus_copy_format) intermediate_result(key integer, random double precision)) some_values_2 +DEBUG: Subplan 35_1 will be sent to localhost:57637 +DEBUG: Subplan 35_2 will be sent to localhost:57637 + count +------- + 0 +(1 row) + +-- the same query inlined inside a CTE, and the final query has a +-- join with a distributed table +WITH top_cte as ( + WITH some_values_1 AS + (SELECT key, random() FROM table_1 WHERE value IN ('3', '4')), + some_values_2 AS + (SELECT key, random() FROM some_values_1 JOIN table_2 USING (key) WHERE key = 1) + SELECT + DISTINCT key + FROM + some_values_2 +) +SELECT + count(*) +FROM + top_cte JOIN table_2 USING (key); +DEBUG: generating subplan 38_1 for CTE top_cte: WITH some_values_1 AS (SELECT table_1.key, random() AS random FROM intermediate_result_pruning.table_1 WHERE (table_1.value 
OPERATOR(pg_catalog.=) ANY (ARRAY['3'::text, '4'::text]))), some_values_2 AS (SELECT some_values_1.key, random() AS random FROM (some_values_1 JOIN intermediate_result_pruning.table_2 USING (key)) WHERE (some_values_1.key OPERATOR(pg_catalog.=) 1)) SELECT DISTINCT key FROM some_values_2 +DEBUG: generating subplan 39_1 for CTE some_values_1: SELECT key, random() AS random FROM intermediate_result_pruning.table_1 WHERE (value OPERATOR(pg_catalog.=) ANY (ARRAY['3'::text, '4'::text])) +DEBUG: generating subplan 39_2 for CTE some_values_2: SELECT some_values_1.key, random() AS random FROM ((SELECT intermediate_result.key, intermediate_result.random FROM read_intermediate_result('39_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, random double precision)) some_values_1 JOIN intermediate_result_pruning.table_2 USING (key)) WHERE (some_values_1.key OPERATOR(pg_catalog.=) 1) +DEBUG: Plan 39 query after replacing subqueries and CTEs: SELECT DISTINCT key FROM (SELECT intermediate_result.key, intermediate_result.random FROM read_intermediate_result('39_2'::text, 'binary'::citus_copy_format) intermediate_result(key integer, random double precision)) some_values_2 +DEBUG: Plan 38 query after replacing subqueries and CTEs: SELECT count(*) AS count FROM ((SELECT intermediate_result.key FROM read_intermediate_result('38_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer)) top_cte JOIN intermediate_result_pruning.table_2 USING (key)) +DEBUG: Subplan 38_1 will be sent to localhost:57637 +DEBUG: Subplan 38_1 will be sent to localhost:57638 +DEBUG: Subplan 39_1 will be sent to localhost:57637 +DEBUG: Subplan 39_2 will be sent to localhost:57638 + count +------- + 0 +(1 row) + +-- very much the same query, but this time the top query is also a router query +-- on a single worker, so all intermediate results only hit a single node +WITH top_cte as ( + WITH some_values_1 AS + (SELECT key, random() FROM table_1 WHERE value IN ('3', '4')), + some_values_2 AS + (SELECT key, random() FROM some_values_1 JOIN table_2 USING (key) WHERE key = 1) + SELECT + DISTINCT key + FROM + some_values_2 +) +SELECT + count(*) +FROM + top_cte JOIN table_2 USING (key) WHERE table_2.key = 2; +DEBUG: generating subplan 42_1 for CTE top_cte: WITH some_values_1 AS (SELECT table_1.key, random() AS random FROM intermediate_result_pruning.table_1 WHERE (table_1.value OPERATOR(pg_catalog.=) ANY (ARRAY['3'::text, '4'::text]))), some_values_2 AS (SELECT some_values_1.key, random() AS random FROM (some_values_1 JOIN intermediate_result_pruning.table_2 USING (key)) WHERE (some_values_1.key OPERATOR(pg_catalog.=) 1)) SELECT DISTINCT key FROM some_values_2 +DEBUG: generating subplan 43_1 for CTE some_values_1: SELECT key, random() AS random FROM intermediate_result_pruning.table_1 WHERE (value OPERATOR(pg_catalog.=) ANY (ARRAY['3'::text, '4'::text])) +DEBUG: generating subplan 43_2 for CTE some_values_2: SELECT some_values_1.key, random() AS random FROM ((SELECT intermediate_result.key, intermediate_result.random FROM read_intermediate_result('43_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, random double precision)) some_values_1 JOIN intermediate_result_pruning.table_2 USING (key)) WHERE (some_values_1.key OPERATOR(pg_catalog.=) 1) +DEBUG: Plan 43 query after replacing subqueries and CTEs: SELECT DISTINCT key FROM (SELECT intermediate_result.key, intermediate_result.random FROM read_intermediate_result('43_2'::text, 'binary'::citus_copy_format) intermediate_result(key integer, 
random double precision)) some_values_2 +DEBUG: Plan 42 query after replacing subqueries and CTEs: SELECT count(*) AS count FROM ((SELECT intermediate_result.key FROM read_intermediate_result('42_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer)) top_cte JOIN intermediate_result_pruning.table_2 USING (key)) WHERE (table_2.key OPERATOR(pg_catalog.=) 2) +DEBUG: Subplan 42_1 will be sent to localhost:57638 +DEBUG: Subplan 43_1 will be sent to localhost:57637 +DEBUG: Subplan 43_2 will be sent to localhost:57637 + count +------- + 0 +(1 row) + +-- some_values_1 is first used by a single shard-query, and than with a multi-shard +-- CTE, finally a cartesian product join +WITH some_values_1 AS + (SELECT key, random() FROM table_1 WHERE value IN ('3', '4')), + some_values_2 AS + (SELECT key, random() FROM some_values_1 JOIN table_2 USING (key) WHERE key = 1), + some_values_3 AS + (SELECT key FROM (some_values_2 JOIN table_2 USING (key)) JOIN some_values_1 USING (key)) +SELECT * FROM some_values_3 JOIN ref_table ON (true); +DEBUG: generating subplan 46_1 for CTE some_values_1: SELECT key, random() AS random FROM intermediate_result_pruning.table_1 WHERE (value OPERATOR(pg_catalog.=) ANY (ARRAY['3'::text, '4'::text])) +DEBUG: generating subplan 46_2 for CTE some_values_2: SELECT some_values_1.key, random() AS random FROM ((SELECT intermediate_result.key, intermediate_result.random FROM read_intermediate_result('46_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, random double precision)) some_values_1 JOIN intermediate_result_pruning.table_2 USING (key)) WHERE (some_values_1.key OPERATOR(pg_catalog.=) 1) +DEBUG: generating subplan 46_3 for CTE some_values_3: SELECT some_values_2.key FROM (((SELECT intermediate_result.key, intermediate_result.random FROM read_intermediate_result('46_2'::text, 'binary'::citus_copy_format) intermediate_result(key integer, random double precision)) some_values_2 JOIN intermediate_result_pruning.table_2 USING (key)) JOIN (SELECT intermediate_result.key, intermediate_result.random FROM read_intermediate_result('46_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, random double precision)) some_values_1 USING (key)) +DEBUG: Plan 46 query after replacing subqueries and CTEs: SELECT some_values_3.key, ref_table.key, ref_table.value FROM ((SELECT intermediate_result.key FROM read_intermediate_result('46_3'::text, 'binary'::citus_copy_format) intermediate_result(key integer)) some_values_3 JOIN intermediate_result_pruning.ref_table ON (true)) +DEBUG: Subplan 46_1 will be sent to localhost:57637 +DEBUG: Subplan 46_1 will be sent to localhost:57638 +DEBUG: Subplan 46_2 will be sent to localhost:57637 +DEBUG: Subplan 46_2 will be sent to localhost:57638 +DEBUG: Subplan 46_3 will be sent to localhost:57637 +DEBUG: Subplan 46_3 will be sent to localhost:57638 + key | key | value +-----+-----+------- +(0 rows) + +-- join on intermediate results, so should only +-- go to a single node +WITH some_values_1 AS + (SELECT key, random() FROM table_1 WHERE value IN ('3', '4')), + some_values_2 AS + (SELECT key, random() FROM table_2 WHERE value IN ('3', '4')) +SELECT count(*) FROM some_values_2 JOIN some_values_1 USING (key); +DEBUG: generating subplan 50_1 for CTE some_values_1: SELECT key, random() AS random FROM intermediate_result_pruning.table_1 WHERE (value OPERATOR(pg_catalog.=) ANY (ARRAY['3'::text, '4'::text])) +DEBUG: generating subplan 50_2 for CTE some_values_2: SELECT key, random() AS random FROM 
intermediate_result_pruning.table_2 WHERE (value OPERATOR(pg_catalog.=) ANY (ARRAY['3'::text, '4'::text])) +DEBUG: Plan 50 query after replacing subqueries and CTEs: SELECT count(*) AS count FROM ((SELECT intermediate_result.key, intermediate_result.random FROM read_intermediate_result('50_2'::text, 'binary'::citus_copy_format) intermediate_result(key integer, random double precision)) some_values_2 JOIN (SELECT intermediate_result.key, intermediate_result.random FROM read_intermediate_result('50_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, random double precision)) some_values_1 USING (key)) +DEBUG: Subplan 50_1 will be sent to localhost:57638 +DEBUG: Subplan 50_2 will be sent to localhost:57638 + count +------- + 2 +(1 row) + +-- same query with WHERE false make sure that we're not broken +-- for such edge cases +WITH some_values_1 AS + (SELECT key, random() FROM table_1 WHERE value IN ('3', '4')), + some_values_2 AS + (SELECT key, random() FROM table_2 WHERE value IN ('3', '4')) +SELECT count(*) FROM some_values_2 JOIN some_values_1 USING (key) WHERE false; +DEBUG: generating subplan 53_1 for CTE some_values_1: SELECT key, random() AS random FROM intermediate_result_pruning.table_1 WHERE (value OPERATOR(pg_catalog.=) ANY (ARRAY['3'::text, '4'::text])) +DEBUG: generating subplan 53_2 for CTE some_values_2: SELECT key, random() AS random FROM intermediate_result_pruning.table_2 WHERE (value OPERATOR(pg_catalog.=) ANY (ARRAY['3'::text, '4'::text])) +DEBUG: Plan 53 query after replacing subqueries and CTEs: SELECT count(*) AS count FROM ((SELECT intermediate_result.key, intermediate_result.random FROM read_intermediate_result('53_2'::text, 'binary'::citus_copy_format) intermediate_result(key integer, random double precision)) some_values_2 JOIN (SELECT intermediate_result.key, intermediate_result.random FROM read_intermediate_result('53_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, random double precision)) some_values_1 USING (key)) WHERE false +DEBUG: Subplan 53_1 will be sent to localhost:57637 +DEBUG: Subplan 53_2 will be sent to localhost:57637 + count +------- + 0 +(1 row) + +-- do not use some_values_2 at all, so only 2 intermediate results are +-- broadcasted +WITH some_values_1 AS + (SELECT key, random() FROM table_1 WHERE value IN ('3', '4')), + some_values_2 AS + (SELECT key, random() FROM some_values_1), + some_values_3 AS + (SELECT key, random() FROM some_values_1) +SELECT + count(*) +FROM + some_values_3; +DEBUG: generating subplan 56_1 for CTE some_values_1: SELECT key, random() AS random FROM intermediate_result_pruning.table_1 WHERE (value OPERATOR(pg_catalog.=) ANY (ARRAY['3'::text, '4'::text])) +DEBUG: generating subplan 56_2 for CTE some_values_3: SELECT key, random() AS random FROM (SELECT intermediate_result.key, intermediate_result.random FROM read_intermediate_result('56_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, random double precision)) some_values_1 +DEBUG: Plan 56 query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (SELECT intermediate_result.key, intermediate_result.random FROM read_intermediate_result('56_2'::text, 'binary'::citus_copy_format) intermediate_result(key integer, random double precision)) some_values_3 +DEBUG: Subplan 56_1 will be sent to localhost:57638 +DEBUG: Subplan 56_2 will be sent to localhost:57637 + count +------- + 2 +(1 row) + +-- lets have some deeper intermediate results +-- the inner most two results and the final query (which 
contains only intermediate results) +-- hitting single worker, others hitting all workers +-- (see below query where all intermediate results hit a single node) +SELECT count(*) FROM +( + SELECT avg(min::int) FROM + ( + SELECT min(table_1.value) FROM + ( + SELECT avg(value::int) as avg_ev_type FROM + ( + SELECT max(value) as mx_val_1 FROM + ( + SELECT avg(value::int) as avg FROM + ( + SELECT cnt FROM + ( + SELECT count(*) as cnt, value + FROM table_1 + WHERE key = 1 + GROUP BY value + ) as level_1, table_1 + WHERE table_1.key = level_1.cnt AND key = 3 + ) as level_2, table_2 + WHERE table_2.key = level_2.cnt AND key = 5 + GROUP BY level_2.cnt + ) as level_3, table_1 + WHERE value::numeric = level_3.avg AND key = 6 + GROUP BY level_3.avg + ) as level_4, table_2 + WHERE level_4.mx_val_1::int = table_2.key + GROUP BY level_4.mx_val_1 + ) as level_5, table_1 + WHERE level_5.avg_ev_type = table_1.key AND key > 111 + GROUP BY level_5.avg_ev_type + ) as level_6, table_1 WHERE table_1.key::int = level_6.min::int + GROUP BY table_1.value +) as bar; +DEBUG: generating subplan 59_1 for subquery SELECT count(*) AS cnt, value FROM intermediate_result_pruning.table_1 WHERE (key OPERATOR(pg_catalog.=) 1) GROUP BY value +DEBUG: generating subplan 59_2 for subquery SELECT avg((table_2.value)::integer) AS avg FROM (SELECT level_1.cnt FROM (SELECT intermediate_result.cnt, intermediate_result.value FROM read_intermediate_result('59_1'::text, 'binary'::citus_copy_format) intermediate_result(cnt bigint, value text)) level_1, intermediate_result_pruning.table_1 WHERE ((table_1.key OPERATOR(pg_catalog.=) level_1.cnt) AND (table_1.key OPERATOR(pg_catalog.=) 3))) level_2, intermediate_result_pruning.table_2 WHERE ((table_2.key OPERATOR(pg_catalog.=) level_2.cnt) AND (table_2.key OPERATOR(pg_catalog.=) 5)) GROUP BY level_2.cnt +DEBUG: generating subplan 59_3 for subquery SELECT max(table_1.value) AS mx_val_1 FROM (SELECT intermediate_result.avg FROM read_intermediate_result('59_2'::text, 'binary'::citus_copy_format) intermediate_result(avg numeric)) level_3, intermediate_result_pruning.table_1 WHERE (((table_1.value)::numeric OPERATOR(pg_catalog.=) level_3.avg) AND (table_1.key OPERATOR(pg_catalog.=) 6)) GROUP BY level_3.avg +DEBUG: generating subplan 59_4 for subquery SELECT avg((table_2.value)::integer) AS avg_ev_type FROM (SELECT intermediate_result.mx_val_1 FROM read_intermediate_result('59_3'::text, 'binary'::citus_copy_format) intermediate_result(mx_val_1 text)) level_4, intermediate_result_pruning.table_2 WHERE ((level_4.mx_val_1)::integer OPERATOR(pg_catalog.=) table_2.key) GROUP BY level_4.mx_val_1 +DEBUG: generating subplan 59_5 for subquery SELECT min(table_1.value) AS min FROM (SELECT intermediate_result.avg_ev_type FROM read_intermediate_result('59_4'::text, 'binary'::citus_copy_format) intermediate_result(avg_ev_type numeric)) level_5, intermediate_result_pruning.table_1 WHERE ((level_5.avg_ev_type OPERATOR(pg_catalog.=) (table_1.key)::numeric) AND (table_1.key OPERATOR(pg_catalog.>) 111)) GROUP BY level_5.avg_ev_type +DEBUG: generating subplan 59_6 for subquery SELECT avg((level_6.min)::integer) AS avg FROM (SELECT intermediate_result.min FROM read_intermediate_result('59_5'::text, 'binary'::citus_copy_format) intermediate_result(min text)) level_6, intermediate_result_pruning.table_1 WHERE (table_1.key OPERATOR(pg_catalog.=) (level_6.min)::integer) GROUP BY table_1.value +DEBUG: Plan 59 query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (SELECT intermediate_result.avg FROM 
read_intermediate_result('59_6'::text, 'binary'::citus_copy_format) intermediate_result(avg numeric)) bar +DEBUG: Subplan 59_1 will be sent to localhost:57638 +DEBUG: Subplan 59_2 will be sent to localhost:57637 +DEBUG: Subplan 59_3 will be sent to localhost:57637 +DEBUG: Subplan 59_3 will be sent to localhost:57638 +DEBUG: Subplan 59_4 will be sent to localhost:57637 +DEBUG: Subplan 59_4 will be sent to localhost:57638 +DEBUG: Subplan 59_5 will be sent to localhost:57637 +DEBUG: Subplan 59_5 will be sent to localhost:57638 +DEBUG: Subplan 59_6 will be sent to localhost:57637 + count +------- + 0 +(1 row) + +-- the same query where all intermediate results hits one +-- worker because each and every query is a router query -- but on different nodes +SELECT count(*) FROM +( + SELECT avg(min::int) FROM + ( + SELECT min(table_1.value) FROM + ( + SELECT avg(value::int) as avg_ev_type FROM + ( + SELECT max(value) as mx_val_1 FROM + ( + SELECT avg(value::int) as avg FROM + ( + SELECT cnt FROM + ( + SELECT count(*) as cnt, value + FROM table_1 + WHERE key = 1 + GROUP BY value + ) as level_1, table_1 + WHERE table_1.key = level_1.cnt AND key = 3 + ) as level_2, table_2 + WHERE table_2.key = level_2.cnt AND key = 5 + GROUP BY level_2.cnt + ) as level_3, table_1 + WHERE value::numeric = level_3.avg AND key = 6 + GROUP BY level_3.avg + ) as level_4, table_2 + WHERE level_4.mx_val_1::int = table_2.key AND table_2.key = 1 + GROUP BY level_4.mx_val_1 + ) as level_5, table_1 + WHERE level_5.avg_ev_type = table_1.key AND key = 111 + GROUP BY level_5.avg_ev_type + ) as level_6, table_1 + WHERE table_1.key::int = level_6.min::int AND table_1.key = 4 + GROUP BY table_1.value +) as bar; +DEBUG: generating subplan 66_1 for subquery SELECT count(*) AS cnt, value FROM intermediate_result_pruning.table_1 WHERE (key OPERATOR(pg_catalog.=) 1) GROUP BY value +DEBUG: generating subplan 66_2 for subquery SELECT avg((table_2.value)::integer) AS avg FROM (SELECT level_1.cnt FROM (SELECT intermediate_result.cnt, intermediate_result.value FROM read_intermediate_result('66_1'::text, 'binary'::citus_copy_format) intermediate_result(cnt bigint, value text)) level_1, intermediate_result_pruning.table_1 WHERE ((table_1.key OPERATOR(pg_catalog.=) level_1.cnt) AND (table_1.key OPERATOR(pg_catalog.=) 3))) level_2, intermediate_result_pruning.table_2 WHERE ((table_2.key OPERATOR(pg_catalog.=) level_2.cnt) AND (table_2.key OPERATOR(pg_catalog.=) 5)) GROUP BY level_2.cnt +DEBUG: generating subplan 66_3 for subquery SELECT max(table_1.value) AS mx_val_1 FROM (SELECT intermediate_result.avg FROM read_intermediate_result('66_2'::text, 'binary'::citus_copy_format) intermediate_result(avg numeric)) level_3, intermediate_result_pruning.table_1 WHERE (((table_1.value)::numeric OPERATOR(pg_catalog.=) level_3.avg) AND (table_1.key OPERATOR(pg_catalog.=) 6)) GROUP BY level_3.avg +DEBUG: generating subplan 66_4 for subquery SELECT avg((table_2.value)::integer) AS avg_ev_type FROM (SELECT intermediate_result.mx_val_1 FROM read_intermediate_result('66_3'::text, 'binary'::citus_copy_format) intermediate_result(mx_val_1 text)) level_4, intermediate_result_pruning.table_2 WHERE (((level_4.mx_val_1)::integer OPERATOR(pg_catalog.=) table_2.key) AND (table_2.key OPERATOR(pg_catalog.=) 1)) GROUP BY level_4.mx_val_1 +DEBUG: generating subplan 66_5 for subquery SELECT min(table_1.value) AS min FROM (SELECT intermediate_result.avg_ev_type FROM read_intermediate_result('66_4'::text, 'binary'::citus_copy_format) intermediate_result(avg_ev_type numeric)) 
level_5, intermediate_result_pruning.table_1 WHERE ((level_5.avg_ev_type OPERATOR(pg_catalog.=) (table_1.key)::numeric) AND (table_1.key OPERATOR(pg_catalog.=) 111)) GROUP BY level_5.avg_ev_type +DEBUG: generating subplan 66_6 for subquery SELECT avg((level_6.min)::integer) AS avg FROM (SELECT intermediate_result.min FROM read_intermediate_result('66_5'::text, 'binary'::citus_copy_format) intermediate_result(min text)) level_6, intermediate_result_pruning.table_1 WHERE ((table_1.key OPERATOR(pg_catalog.=) (level_6.min)::integer) AND (table_1.key OPERATOR(pg_catalog.=) 4)) GROUP BY table_1.value +DEBUG: Plan 66 query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (SELECT intermediate_result.avg FROM read_intermediate_result('66_6'::text, 'binary'::citus_copy_format) intermediate_result(avg numeric)) bar +DEBUG: Subplan 66_1 will be sent to localhost:57638 +DEBUG: Subplan 66_2 will be sent to localhost:57637 +DEBUG: Subplan 66_3 will be sent to localhost:57637 +DEBUG: Subplan 66_4 will be sent to localhost:57638 +DEBUG: Subplan 66_5 will be sent to localhost:57638 +DEBUG: Subplan 66_6 will be sent to localhost:57637 + count +------- + 0 +(1 row) + +-- sanity checks for set operations +-- the intermediate results should just hit a single worker +(SELECT key FROM table_1 WHERE key = 1) +INTERSECT +(SELECT key FROM table_1 WHERE key = 2); +DEBUG: generating subplan 73_1 for subquery SELECT key FROM intermediate_result_pruning.table_1 WHERE (key OPERATOR(pg_catalog.=) 1) +DEBUG: generating subplan 73_2 for subquery SELECT key FROM intermediate_result_pruning.table_1 WHERE (key OPERATOR(pg_catalog.=) 2) +DEBUG: Plan 73 query after replacing subqueries and CTEs: SELECT intermediate_result.key FROM read_intermediate_result('73_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer) INTERSECT SELECT intermediate_result.key FROM read_intermediate_result('73_2'::text, 'binary'::citus_copy_format) intermediate_result(key integer) +DEBUG: Subplan 73_1 will be sent to localhost:57638 +DEBUG: Subplan 73_2 will be sent to localhost:57638 + key +----- +(0 rows) + +-- the intermediate results should just hit a single worker +WITH cte_1 AS +( + (SELECT key FROM table_1 WHERE key = 1) + INTERSECT + (SELECT key FROM table_1 WHERE key = 2) +), +cte_2 AS +( + (SELECT key FROM table_1 WHERE key = 3) + INTERSECT + (SELECT key FROM table_1 WHERE key = 4) +) +SELECT * FROM cte_1 + UNION +SELECT * FROM cte_2; +DEBUG: generating subplan 76_1 for CTE cte_1: SELECT table_1.key FROM intermediate_result_pruning.table_1 WHERE (table_1.key OPERATOR(pg_catalog.=) 1) INTERSECT SELECT table_1.key FROM intermediate_result_pruning.table_1 WHERE (table_1.key OPERATOR(pg_catalog.=) 2) +DEBUG: generating subplan 77_1 for subquery SELECT key FROM intermediate_result_pruning.table_1 WHERE (key OPERATOR(pg_catalog.=) 1) +DEBUG: generating subplan 77_2 for subquery SELECT key FROM intermediate_result_pruning.table_1 WHERE (key OPERATOR(pg_catalog.=) 2) +DEBUG: Plan 77 query after replacing subqueries and CTEs: SELECT intermediate_result.key FROM read_intermediate_result('77_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer) INTERSECT SELECT intermediate_result.key FROM read_intermediate_result('77_2'::text, 'binary'::citus_copy_format) intermediate_result(key integer) +DEBUG: generating subplan 76_2 for CTE cte_2: SELECT table_1.key FROM intermediate_result_pruning.table_1 WHERE (table_1.key OPERATOR(pg_catalog.=) 3) INTERSECT SELECT table_1.key FROM 
intermediate_result_pruning.table_1 WHERE (table_1.key OPERATOR(pg_catalog.=) 4) +DEBUG: Plan 76 query after replacing subqueries and CTEs: SELECT cte_1.key FROM (SELECT intermediate_result.key FROM read_intermediate_result('76_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer)) cte_1 UNION SELECT cte_2.key FROM (SELECT intermediate_result.key FROM read_intermediate_result('76_2'::text, 'binary'::citus_copy_format) intermediate_result(key integer)) cte_2 +DEBUG: Subplan 76_1 will be sent to localhost:57638 +DEBUG: Subplan 77_1 will be sent to localhost:57637 +DEBUG: Subplan 77_2 will be sent to localhost:57637 +DEBUG: Subplan 76_2 will be sent to localhost:57638 + key +----- +(0 rows) + +-- one final test with SET operations, where +-- we join the results with distributed tables +-- so cte_1 should hit all workers, but still the +-- others should hit single worker each +WITH cte_1 AS +( + (SELECT key FROM table_1 WHERE key = 1) + INTERSECT + (SELECT key FROM table_1 WHERE key = 2) +), +cte_2 AS +( + SELECT count(*) FROM table_1 JOIN cte_1 USING (key) +) +SELECT * FROM cte_2; +DEBUG: generating subplan 81_1 for CTE cte_1: SELECT table_1.key FROM intermediate_result_pruning.table_1 WHERE (table_1.key OPERATOR(pg_catalog.=) 1) INTERSECT SELECT table_1.key FROM intermediate_result_pruning.table_1 WHERE (table_1.key OPERATOR(pg_catalog.=) 2) +DEBUG: generating subplan 82_1 for subquery SELECT key FROM intermediate_result_pruning.table_1 WHERE (key OPERATOR(pg_catalog.=) 1) +DEBUG: generating subplan 82_2 for subquery SELECT key FROM intermediate_result_pruning.table_1 WHERE (key OPERATOR(pg_catalog.=) 2) +DEBUG: Plan 82 query after replacing subqueries and CTEs: SELECT intermediate_result.key FROM read_intermediate_result('82_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer) INTERSECT SELECT intermediate_result.key FROM read_intermediate_result('82_2'::text, 'binary'::citus_copy_format) intermediate_result(key integer) +DEBUG: generating subplan 81_2 for CTE cte_2: SELECT count(*) AS count FROM (intermediate_result_pruning.table_1 JOIN (SELECT intermediate_result.key FROM read_intermediate_result('81_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer)) cte_1 USING (key)) +DEBUG: Plan 81 query after replacing subqueries and CTEs: SELECT count FROM (SELECT intermediate_result.count FROM read_intermediate_result('81_2'::text, 'binary'::citus_copy_format) intermediate_result(count bigint)) cte_2 +DEBUG: Subplan 81_1 will be sent to localhost:57637 +DEBUG: Subplan 81_1 will be sent to localhost:57638 +DEBUG: Subplan 82_1 will be sent to localhost:57637 +DEBUG: Subplan 82_2 will be sent to localhost:57637 +DEBUG: Subplan 81_2 will be sent to localhost:57638 + count +------- + 0 +(1 row) + +-- sanity checks for non-colocated subquery joins +-- the recursively planned subquery (bar) should hit all +-- nodes +SELECT + count(*) +FROM + (SELECT key, random() FROM table_1) as foo, + (SELECT key, random() FROM table_2) as bar +WHERE + foo.key != bar.key; +DEBUG: generating subplan 86_1 for subquery SELECT key, random() AS random FROM intermediate_result_pruning.table_2 +DEBUG: Plan 86 query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (SELECT table_1.key, random() AS random FROM intermediate_result_pruning.table_1) foo, (SELECT intermediate_result.key, intermediate_result.random FROM read_intermediate_result('86_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, random double precision)) bar WHERE 
(foo.key OPERATOR(pg_catalog.<>) bar.key) +DEBUG: Subplan 86_1 will be sent to localhost:57637 +DEBUG: Subplan 86_1 will be sent to localhost:57638 + count +------- + 14 +(1 row) + +-- the recursively planned subquery (bar) should hit one +-- node because foo goes to a single node +SELECT + count(*) +FROM + (SELECT key, random() FROM table_1 WHERE key = 1) as foo, + (SELECT key, random() FROM table_2) as bar +WHERE + foo.key != bar.key; +DEBUG: generating subplan 88_1 for subquery SELECT key, random() AS random FROM intermediate_result_pruning.table_2 +DEBUG: Plan 88 query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (SELECT table_1.key, random() AS random FROM intermediate_result_pruning.table_1 WHERE (table_1.key OPERATOR(pg_catalog.=) 1)) foo, (SELECT intermediate_result.key, intermediate_result.random FROM read_intermediate_result('88_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, random double precision)) bar WHERE (foo.key OPERATOR(pg_catalog.<>) bar.key) +DEBUG: Subplan 88_1 will be sent to localhost:57637 + count +------- + 4 +(1 row) + +-- sanity checks for modification queries +-- select_data goes to a single node, because it is used in another subquery +-- raw_data is also the final router query, so hits a single shard +-- however, the subquery in WHERE clause of the DELETE query is broadcasted to all +-- nodes +BEGIN; +WITH select_data AS ( + SELECT * FROM table_1 +), +raw_data AS ( + DELETE FROM table_2 WHERE key >= (SELECT min(key) FROM select_data WHERE key > 1) RETURNING * +) +SELECT * FROM raw_data; +DEBUG: generating subplan 90_1 for CTE select_data: SELECT key, value FROM intermediate_result_pruning.table_1 +DEBUG: generating subplan 90_2 for CTE raw_data: DELETE FROM intermediate_result_pruning.table_2 WHERE (key OPERATOR(pg_catalog.>=) (SELECT min(select_data.key) AS min FROM (SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('90_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) select_data WHERE (select_data.key OPERATOR(pg_catalog.>) 1))) RETURNING key, value +DEBUG: generating subplan 92_1 for subquery SELECT min(key) AS min FROM (SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('90_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) select_data WHERE (key OPERATOR(pg_catalog.>) 1) +DEBUG: Plan 92 query after replacing subqueries and CTEs: DELETE FROM intermediate_result_pruning.table_2 WHERE (key OPERATOR(pg_catalog.>=) (SELECT intermediate_result.min FROM read_intermediate_result('92_1'::text, 'binary'::citus_copy_format) intermediate_result(min integer))) RETURNING key, value +DEBUG: Plan 90 query after replacing subqueries and CTEs: SELECT key, value FROM (SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('90_2'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) raw_data +DEBUG: Subplan 90_1 will be sent to localhost:57637 +DEBUG: Subplan 90_2 will be sent to localhost:57638 +DEBUG: Subplan 92_1 will be sent to localhost:57637 +DEBUG: Subplan 92_1 will be sent to localhost:57638 + key | value +-----+------- + 3 | 3 + 4 | 4 + 5 | 5 + 6 | 6 +(4 rows) + +ROLLBACK; +-- select_data goes to a single node, because it is used in another subquery +-- raw_data is also the final router query, so hits a single shard +-- however, the subquery in WHERE clause of the DELETE query is broadcasted to all +-- nodes +BEGIN; 
+WITH select_data AS ( + SELECT * FROM table_1 +), +raw_data AS ( + DELETE FROM table_2 WHERE value::int >= (SELECT min(key) FROM select_data WHERE key > 1 + random()) RETURNING * +) +SELECT * FROM raw_data; +DEBUG: generating subplan 94_1 for CTE select_data: SELECT key, value FROM intermediate_result_pruning.table_1 +DEBUG: generating subplan 94_2 for CTE raw_data: DELETE FROM intermediate_result_pruning.table_2 WHERE ((value)::integer OPERATOR(pg_catalog.>=) (SELECT min(select_data.key) AS min FROM (SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('94_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) select_data WHERE ((select_data.key)::double precision OPERATOR(pg_catalog.>) ((1)::double precision OPERATOR(pg_catalog.+) random())))) RETURNING key, value +DEBUG: generating subplan 96_1 for subquery SELECT min(key) AS min FROM (SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('94_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) select_data WHERE ((key)::double precision OPERATOR(pg_catalog.>) ((1)::double precision OPERATOR(pg_catalog.+) random())) +DEBUG: Plan 96 query after replacing subqueries and CTEs: DELETE FROM intermediate_result_pruning.table_2 WHERE ((value)::integer OPERATOR(pg_catalog.>=) (SELECT intermediate_result.min FROM read_intermediate_result('96_1'::text, 'binary'::citus_copy_format) intermediate_result(min integer))) RETURNING key, value +DEBUG: Plan 94 query after replacing subqueries and CTEs: SELECT key, value FROM (SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('94_2'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) raw_data +DEBUG: Subplan 94_1 will be sent to localhost:57637 +DEBUG: Subplan 94_2 will be sent to localhost:57638 +DEBUG: Subplan 96_1 will be sent to localhost:57637 +DEBUG: Subplan 96_1 will be sent to localhost:57638 + key | value +-----+------- + 3 | 3 + 4 | 4 + 5 | 5 + 6 | 6 +(4 rows) + +ROLLBACK; +-- now, we need only two intermediate results as the subquery in WHERE clause is +-- router plannable +BEGIN; +WITH select_data AS ( + SELECT * FROM table_1 +), +raw_data AS ( + DELETE FROM table_2 WHERE value::int >= (SELECT min(key) FROM table_1 WHERE key > random()) AND key = 6 RETURNING * +) +SELECT * FROM raw_data; +DEBUG: generating subplan 98_1 for CTE raw_data: DELETE FROM intermediate_result_pruning.table_2 WHERE (((value)::integer OPERATOR(pg_catalog.>=) (SELECT min(table_1.key) AS min FROM intermediate_result_pruning.table_1 WHERE ((table_1.key)::double precision OPERATOR(pg_catalog.>) random()))) AND (key OPERATOR(pg_catalog.=) 6)) RETURNING key, value +DEBUG: generating subplan 99_1 for subquery SELECT min(key) AS min FROM intermediate_result_pruning.table_1 WHERE ((key)::double precision OPERATOR(pg_catalog.>) random()) +DEBUG: Plan 99 query after replacing subqueries and CTEs: DELETE FROM intermediate_result_pruning.table_2 WHERE (((value)::integer OPERATOR(pg_catalog.>=) (SELECT intermediate_result.min FROM read_intermediate_result('99_1'::text, 'binary'::citus_copy_format) intermediate_result(min integer))) AND (key OPERATOR(pg_catalog.=) 6)) RETURNING key, value +DEBUG: Plan 98 query after replacing subqueries and CTEs: SELECT key, value FROM (SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('98_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value 
text)) raw_data +DEBUG: Subplan 98_1 will be sent to localhost:57637 +DEBUG: Subplan 99_1 will be sent to localhost:57637 + key | value +-----+------- + 6 | 6 +(1 row) + +ROLLBACK; +-- test with INSERT SELECT via coordinator +-- INSERT .. SELECT via coordinator that doesn't have any intermediate results +INSERT INTO table_1 + SELECT * FROM table_2 OFFSET 0; +DEBUG: OFFSET clauses are not allowed in distributed INSERT ... SELECT queries +DEBUG: Collecting INSERT ... SELECT results on coordinator +-- INSERT .. SELECT via coordinator which has intermediate result, +-- and can be pruned to a single worker because the final query is on +-- single shard via filter in key +INSERT INTO table_1 + SELECT * FROM table_2 where value IN (SELECT value FROM table_1 WHERE random() > 1) AND key = 1; +DEBUG: volatile functions are not allowed in distributed INSERT ... SELECT queries +DEBUG: Collecting INSERT ... SELECT results on coordinator +DEBUG: generating subplan 104_1 for subquery SELECT value FROM intermediate_result_pruning.table_1 WHERE (random() OPERATOR(pg_catalog.>) (1)::double precision) +DEBUG: Plan 104 query after replacing subqueries and CTEs: SELECT key, value FROM intermediate_result_pruning.table_2 WHERE ((value OPERATOR(pg_catalog.=) ANY (SELECT intermediate_result.value FROM read_intermediate_result('104_1'::text, 'binary'::citus_copy_format) intermediate_result(value text))) AND (key OPERATOR(pg_catalog.=) 1)) +DEBUG: Subplan 104_1 will be sent to localhost:57637 +-- a similar query, with more complex subquery +INSERT INTO table_1 + SELECT * FROM table_2 where key = 1 AND + value::int IN + (WITH cte_1 AS + ( + (SELECT key FROM table_1 WHERE key = 1) + INTERSECT + (SELECT key FROM table_1 WHERE key = 2) + ), + cte_2 AS + ( + (SELECT key FROM table_1 WHERE key = 3) + INTERSECT + (SELECT key FROM table_1 WHERE key = 4) + ) + SELECT * FROM cte_1 + UNION + SELECT * FROM cte_2); +DEBUG: Set operations are not allowed in distributed INSERT ... SELECT queries +DEBUG: Collecting INSERT ... 
SELECT results on coordinator +DEBUG: generating subplan 107_1 for CTE cte_1: SELECT table_1.key FROM intermediate_result_pruning.table_1 WHERE (table_1.key OPERATOR(pg_catalog.=) 1) INTERSECT SELECT table_1.key FROM intermediate_result_pruning.table_1 WHERE (table_1.key OPERATOR(pg_catalog.=) 2) +DEBUG: generating subplan 108_1 for subquery SELECT key FROM intermediate_result_pruning.table_1 WHERE (key OPERATOR(pg_catalog.=) 1) +DEBUG: generating subplan 108_2 for subquery SELECT key FROM intermediate_result_pruning.table_1 WHERE (key OPERATOR(pg_catalog.=) 2) +DEBUG: Plan 108 query after replacing subqueries and CTEs: SELECT intermediate_result.key FROM read_intermediate_result('108_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer) INTERSECT SELECT intermediate_result.key FROM read_intermediate_result('108_2'::text, 'binary'::citus_copy_format) intermediate_result(key integer) +DEBUG: generating subplan 107_2 for CTE cte_2: SELECT table_1.key FROM intermediate_result_pruning.table_1 WHERE (table_1.key OPERATOR(pg_catalog.=) 3) INTERSECT SELECT table_1.key FROM intermediate_result_pruning.table_1 WHERE (table_1.key OPERATOR(pg_catalog.=) 4) +DEBUG: generating subplan 107_3 for subquery SELECT cte_1.key FROM (SELECT intermediate_result.key FROM read_intermediate_result('107_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer)) cte_1 UNION SELECT cte_2.key FROM (SELECT intermediate_result.key FROM read_intermediate_result('107_2'::text, 'binary'::citus_copy_format) intermediate_result(key integer)) cte_2 +DEBUG: Plan 107 query after replacing subqueries and CTEs: SELECT key, value FROM intermediate_result_pruning.table_2 WHERE ((key OPERATOR(pg_catalog.=) 1) AND ((value)::integer OPERATOR(pg_catalog.=) ANY (SELECT intermediate_result.key FROM read_intermediate_result('107_3'::text, 'binary'::citus_copy_format) intermediate_result(key integer)))) +DEBUG: Subplan 107_1 will be sent to localhost:57637 +DEBUG: Subplan 108_1 will be sent to localhost:57638 +DEBUG: Subplan 108_2 will be sent to localhost:57638 +DEBUG: Subplan 107_2 will be sent to localhost:57637 +DEBUG: Subplan 107_3 will be sent to localhost:57637 +-- same query, cte is on the FROM clause +-- and this time the final query (and top-level intermediate result) +-- hits all the shards because table_2.key != 1 +INSERT INTO table_1 + SELECT table_2.* FROM table_2, + (WITH cte_1 AS + ( + (SELECT key FROM table_1 WHERE key = 1) + INTERSECT + (SELECT key FROM table_1 WHERE key = 2) + ), + cte_2 AS + ( + (SELECT key FROM table_1 WHERE key = 3) + INTERSECT + (SELECT key FROM table_1 WHERE key = 4) + ) + SELECT * FROM cte_1 + UNION + SELECT * FROM cte_2 + ) foo + where table_2.key != 1 AND + foo.key = table_2.value::int; +DEBUG: Set operations are not allowed in distributed INSERT ... SELECT queries +DEBUG: Collecting INSERT ... 
SELECT results on coordinator +DEBUG: generating subplan 114_1 for CTE cte_1: SELECT table_1.key FROM intermediate_result_pruning.table_1 WHERE (table_1.key OPERATOR(pg_catalog.=) 1) INTERSECT SELECT table_1.key FROM intermediate_result_pruning.table_1 WHERE (table_1.key OPERATOR(pg_catalog.=) 2) +DEBUG: generating subplan 115_1 for subquery SELECT key FROM intermediate_result_pruning.table_1 WHERE (key OPERATOR(pg_catalog.=) 1) +DEBUG: generating subplan 115_2 for subquery SELECT key FROM intermediate_result_pruning.table_1 WHERE (key OPERATOR(pg_catalog.=) 2) +DEBUG: Plan 115 query after replacing subqueries and CTEs: SELECT intermediate_result.key FROM read_intermediate_result('115_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer) INTERSECT SELECT intermediate_result.key FROM read_intermediate_result('115_2'::text, 'binary'::citus_copy_format) intermediate_result(key integer) +DEBUG: generating subplan 114_2 for CTE cte_2: SELECT table_1.key FROM intermediate_result_pruning.table_1 WHERE (table_1.key OPERATOR(pg_catalog.=) 3) INTERSECT SELECT table_1.key FROM intermediate_result_pruning.table_1 WHERE (table_1.key OPERATOR(pg_catalog.=) 4) +DEBUG: generating subplan 114_3 for subquery SELECT cte_1.key FROM (SELECT intermediate_result.key FROM read_intermediate_result('114_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer)) cte_1 UNION SELECT cte_2.key FROM (SELECT intermediate_result.key FROM read_intermediate_result('114_2'::text, 'binary'::citus_copy_format) intermediate_result(key integer)) cte_2 +DEBUG: Plan 114 query after replacing subqueries and CTEs: SELECT table_2.key, table_2.value FROM intermediate_result_pruning.table_2, (SELECT intermediate_result.key FROM read_intermediate_result('114_3'::text, 'binary'::citus_copy_format) intermediate_result(key integer)) foo WHERE ((table_2.key OPERATOR(pg_catalog.<>) 1) AND (foo.key OPERATOR(pg_catalog.=) (table_2.value)::integer)) +DEBUG: Subplan 114_1 will be sent to localhost:57637 +DEBUG: Subplan 115_1 will be sent to localhost:57638 +DEBUG: Subplan 115_2 will be sent to localhost:57638 +DEBUG: Subplan 114_2 will be sent to localhost:57637 +DEBUG: Subplan 114_3 will be sent to localhost:57637 +DEBUG: Subplan 114_3 will be sent to localhost:57638 +-- append partitioned/heap-type +SET citus.replication_model TO statement; +-- do not print out 'building index pg_toast_xxxxx_index' messages +SET client_min_messages TO DEFAULT; +CREATE TABLE range_partitioned(range_column text, data int); +SET client_min_messages TO DEBUG1; +SELECT create_distributed_table('range_partitioned', 'range_column', 'range'); + create_distributed_table +-------------------------- + +(1 row) + +SELECT master_create_empty_shard('range_partitioned'); + master_create_empty_shard +--------------------------- + 1480013 +(1 row) + +SELECT master_create_empty_shard('range_partitioned'); + master_create_empty_shard +--------------------------- + 1480014 +(1 row) + +SELECT master_create_empty_shard('range_partitioned'); + master_create_empty_shard +--------------------------- + 1480015 +(1 row) + +SELECT master_create_empty_shard('range_partitioned'); + master_create_empty_shard +--------------------------- + 1480016 +(1 row) + +SELECT master_create_empty_shard('range_partitioned'); + master_create_empty_shard +--------------------------- + 1480017 +(1 row) + +UPDATE pg_dist_shard SET shardminvalue = 'A', shardmaxvalue = 'D' WHERE shardid = 1480013; +UPDATE pg_dist_shard SET shardminvalue = 'D', shardmaxvalue = 'G' WHERE shardid 
= 1480014; +UPDATE pg_dist_shard SET shardminvalue = 'G', shardmaxvalue = 'K' WHERE shardid = 1480015; +UPDATE pg_dist_shard SET shardminvalue = 'K', shardmaxvalue = 'O' WHERE shardid = 1480016; +UPDATE pg_dist_shard SET shardminvalue = 'O', shardmaxvalue = 'Z' WHERE shardid = 1480017; +-- final query goes to a single shard +SELECT + count(*) +FROM + range_partitioned +WHERE + range_column = 'A' AND + data IN (SELECT data FROM range_partitioned); +DEBUG: generating subplan 120_1 for subquery SELECT data FROM intermediate_result_pruning.range_partitioned +DEBUG: Plan 120 query after replacing subqueries and CTEs: SELECT count(*) AS count FROM intermediate_result_pruning.range_partitioned WHERE ((range_column OPERATOR(pg_catalog.=) 'A'::text) AND (data OPERATOR(pg_catalog.=) ANY (SELECT intermediate_result.data FROM read_intermediate_result('120_1'::text, 'binary'::citus_copy_format) intermediate_result(data integer)))) +DEBUG: Subplan 120_1 will be sent to localhost:57637 + count +------- + 0 +(1 row) + +-- final query goes to three shards, so multiple workers +SELECT + count(*) +FROM + range_partitioned +WHERE + range_column >= 'A' AND range_column <= 'K' AND + data IN (SELECT data FROM range_partitioned); +DEBUG: generating subplan 122_1 for subquery SELECT data FROM intermediate_result_pruning.range_partitioned +DEBUG: Plan 122 query after replacing subqueries and CTEs: SELECT count(*) AS count FROM intermediate_result_pruning.range_partitioned WHERE ((range_column OPERATOR(pg_catalog.>=) 'A'::text) AND (range_column OPERATOR(pg_catalog.<=) 'K'::text) AND (data OPERATOR(pg_catalog.=) ANY (SELECT intermediate_result.data FROM read_intermediate_result('122_1'::text, 'binary'::citus_copy_format) intermediate_result(data integer)))) +DEBUG: Subplan 122_1 will be sent to localhost:57637 +DEBUG: Subplan 122_1 will be sent to localhost:57638 + count +------- + 0 +(1 row) + +-- two shards, both of which are on the first node +WITH some_data AS ( + SELECT data FROM range_partitioned +) +SELECT + count(*) +FROM + range_partitioned +WHERE + range_column IN ('A', 'E') AND + range_partitioned.data IN (SELECT data FROM some_data); +DEBUG: generating subplan 124_1 for CTE some_data: SELECT data FROM intermediate_result_pruning.range_partitioned +DEBUG: Plan 124 query after replacing subqueries and CTEs: SELECT count(*) AS count FROM intermediate_result_pruning.range_partitioned WHERE ((range_column OPERATOR(pg_catalog.=) ANY (ARRAY['A'::text, 'E'::text])) AND (data OPERATOR(pg_catalog.=) ANY (SELECT some_data.data FROM (SELECT intermediate_result.data FROM read_intermediate_result('124_1'::text, 'binary'::citus_copy_format) intermediate_result(data integer)) some_data))) +DEBUG: Subplan 124_1 will be sent to localhost:57637 +DEBUG: Subplan 124_1 will be sent to localhost:57638 + count +------- + 0 +(1 row) + +SET client_min_messages TO DEFAULT; +DROP TABLE table_1, table_2, table_3, ref_table, range_partitioned; +DROP SCHEMA intermediate_result_pruning; diff --git a/src/test/regress/multi_schedule b/src/test/regress/multi_schedule index 91699fd7e..3da84736e 100644 --- a/src/test/regress/multi_schedule +++ b/src/test/regress/multi_schedule @@ -82,7 +82,7 @@ test: multi_array_agg multi_limit_clause multi_orderby_limit_pushdown test: multi_jsonb_agg multi_jsonb_object_agg multi_json_agg multi_json_object_agg bool_agg ch_bench_having ch_bench_subquery_repartition chbenchmark_all_queries expression_reference_join test: multi_agg_type_conversion multi_count_type_conversion test: multi_partition_pruning 
single_hash_repartition_join -test: multi_join_pruning multi_hash_pruning +test: multi_join_pruning multi_hash_pruning intermediate_result_pruning test: multi_null_minmax_value_pruning test: multi_query_directory_cleanup test: multi_task_assignment_policy multi_cross_shard diff --git a/src/test/regress/sql/intermediate_result_pruning.sql b/src/test/regress/sql/intermediate_result_pruning.sql new file mode 100644 index 000000000..22a61d4e4 --- /dev/null +++ b/src/test/regress/sql/intermediate_result_pruning.sql @@ -0,0 +1,554 @@ +CREATE SCHEMA intermediate_result_pruning; +SET search_path TO intermediate_result_pruning; +SET citus.log_intermediate_results TO TRUE; + +SET citus.shard_count TO 4; +SET citus.next_shard_id TO 1480000; +SET citus.shard_replication_factor = 1; + +CREATE TABLE table_1 (key int, value text); +SELECT create_distributed_table('table_1', 'key'); + +CREATE TABLE table_2 (key int, value text); +SELECT create_distributed_table('table_2', 'key'); + + +CREATE TABLE table_3 (key int, value text); +SELECT create_distributed_table('table_3', 'key'); + +CREATE TABLE ref_table (key int, value text); +SELECT create_reference_table('ref_table'); + + +-- load some data +INSERT INTO table_1 VALUES (1, '1'), (2, '2'), (3, '3'), (4, '4'); +INSERT INTO table_2 VALUES (3, '3'), (4, '4'), (5, '5'), (6, '6'); +INSERT INTO table_3 VALUES (3, '3'), (4, '4'), (5, '5'), (6, '6'); +INSERT INTO ref_table VALUES (1, '1'), (2, '2'), (3, '3'), (4, '4'), (5, '5'), (6, '6'); + +-- see which workers are hit for intermediate results +SET client_min_messages TO DEBUG1; + +-- a very basic case, where the intermediate result +-- should go to both workers +WITH some_values_1 AS + (SELECT key FROM table_1 WHERE value IN ('3', '4')) +SELECT + count(*) +FROM + some_values_1 JOIN table_2 USING (key); + + +-- a very basic case, where the intermediate result +-- should only go to one worker because the final query is a router +-- we use random() to prevent postgres inline the CTE(s) +WITH some_values_1 AS + (SELECT key, random() FROM table_1 WHERE value IN ('3', '4')) +SELECT + count(*) +FROM + some_values_1 JOIN table_2 USING (key) WHERE table_2.key = 1; + +-- a similar query, but with a reference table now +-- given that reference tables are replicated to all nodes +-- we have to broadcast to all nodes +WITH some_values_1 AS + (SELECT key, random() FROM table_1 WHERE value IN ('3', '4')) +SELECT + count(*) +FROM + some_values_1 JOIN ref_table USING (key); + + +-- a similar query as above, but this time use the CTE inside +-- another CTE +WITH some_values_1 AS + (SELECT key, random() FROM table_1 WHERE value IN ('3', '4')), + some_values_2 AS + (SELECT key, random() FROM some_values_1) +SELECT + count(*) +FROM + some_values_2 JOIN table_2 USING (key) WHERE table_2.key = 1; + +-- the second CTE does a join with a distributed table +-- and the final query is a router query +WITH some_values_1 AS + (SELECT key, random() FROM table_1 WHERE value IN ('3', '4')), + some_values_2 AS + (SELECT key, random() FROM some_values_1 JOIN table_2 USING (key)) +SELECT + count(*) +FROM + some_values_2 JOIN table_2 USING (key) WHERE table_2.key = 3; + +-- the first CTE is used both within second CTE and the final query +-- the second CTE does a join with a distributed table +-- and the final query is a router query +WITH some_values_1 AS + (SELECT key, random() FROM table_1 WHERE value IN ('3', '4')), + some_values_2 AS + (SELECT key, random() FROM some_values_1 JOIN table_2 USING (key)) +SELECT + count(*) +FROM + 
(some_values_2 JOIN table_2 USING (key)) JOIN some_values_1 USING (key) WHERE table_2.key = 3; + +-- the first CTE is used both within second CTE and the final query +-- the second CTE does a join with a distributed table but a router query on a worker +-- and the final query is another router query on another worker +WITH some_values_1 AS + (SELECT key, random() FROM table_1 WHERE value IN ('3', '4')), + some_values_2 AS + (SELECT key, random() FROM some_values_1 JOIN table_2 USING (key) WHERE table_2.key = 1) +SELECT + count(*) +FROM + (some_values_2 JOIN table_2 USING (key)) JOIN some_values_1 USING (key) WHERE table_2.key = 3; + +-- the first CTE is used both within second CTE and the final query +-- the second CTE does a join with a distributed table but a router query on a worker +-- and the final query is a router query on the same worker, so the first result is only +-- broadcasted to a single node +WITH some_values_1 AS + (SELECT key, random() FROM table_1 WHERE value IN ('3', '4')), + some_values_2 AS + (SELECT key, random() FROM some_values_1 JOIN table_2 USING (key) WHERE table_2.key = 1) +SELECT + count(*) +FROM + (some_values_2 JOIN table_2 USING (key)) JOIN some_values_1 USING (key) WHERE table_2.key = 1; + +-- the same query with the above, but the final query is hitting all shards +WITH some_values_1 AS + (SELECT key, random() FROM table_1 WHERE value IN ('3', '4')), + some_values_2 AS + (SELECT key, random() FROM some_values_1 JOIN table_2 USING (key)) +SELECT + count(*) +FROM + (some_values_2 JOIN table_2 USING (key)) JOIN some_values_1 USING (key) WHERE table_2.key != 3; + +-- even if we add a filter on the first query and make it a router query, +-- the first intermediate result still hits all workers because of the final +-- join is hitting all workers +WITH some_values_1 AS + (SELECT key, random() FROM table_1 WHERE value IN ('3', '4')), + some_values_2 AS + (SELECT key, random() FROM some_values_1 JOIN table_2 USING (key) WHERE table_2.key = 3) +SELECT + count(*) +FROM + (some_values_2 JOIN table_2 USING (key)) JOIN some_values_1 USING (key) WHERE table_2.key != 3; + +-- the reference table is joined with a distributed table and an intermediate +-- result, but the distributed table hits all shards, so the intermediate +-- result is sent to all nodes +WITH some_values_1 AS + (SELECT key, random() FROM ref_table WHERE value IN ('3', '4')) +SELECT + count(*) +FROM + (some_values_1 JOIN ref_table USING (key)) JOIN table_2 USING (key); + +-- similar query as above, but this time the whole query is a router +-- query, so no intermediate results +WITH some_values_1 AS + (SELECT key, random() FROM ref_table WHERE value IN ('3', '4')) +SELECT + count(*) +FROM + (some_values_1 JOIN ref_table USING (key)) JOIN table_2 USING (key) WHERE table_2.key = 1; + + +-- now, the second CTE has a single shard join with a distributed table +-- so the first CTE should only be broadcasted to that node +-- since the final query doesn't have a join, it should simply be broadcasted +-- to one node +WITH some_values_1 AS + (SELECT key, random() FROM table_1 WHERE value IN ('3', '4')), + some_values_2 AS + (SELECT key, random() FROM some_values_1 JOIN table_2 USING (key) WHERE key = 1) +SELECT + count(*) +FROM + some_values_2; + + +-- the same query inlined inside a CTE, and the final query has a +-- join with a distributed table +WITH top_cte as ( + WITH some_values_1 AS + (SELECT key, random() FROM table_1 WHERE value IN ('3', '4')), + some_values_2 AS + (SELECT key, random() FROM 
some_values_1 JOIN table_2 USING (key) WHERE key = 1) + SELECT + DISTINCT key + FROM + some_values_2 +) +SELECT + count(*) +FROM + top_cte JOIN table_2 USING (key); + + +-- very much the same query, but this time the top query is also a router query +-- on a single worker, so all intermediate results only hit a single node +WITH top_cte as ( + WITH some_values_1 AS + (SELECT key, random() FROM table_1 WHERE value IN ('3', '4')), + some_values_2 AS + (SELECT key, random() FROM some_values_1 JOIN table_2 USING (key) WHERE key = 1) + SELECT + DISTINCT key + FROM + some_values_2 +) +SELECT + count(*) +FROM + top_cte JOIN table_2 USING (key) WHERE table_2.key = 2; + + +-- some_values_1 is first used by a single shard-query, and than with a multi-shard +-- CTE, finally a cartesian product join +WITH some_values_1 AS + (SELECT key, random() FROM table_1 WHERE value IN ('3', '4')), + some_values_2 AS + (SELECT key, random() FROM some_values_1 JOIN table_2 USING (key) WHERE key = 1), + some_values_3 AS + (SELECT key FROM (some_values_2 JOIN table_2 USING (key)) JOIN some_values_1 USING (key)) +SELECT * FROM some_values_3 JOIN ref_table ON (true); + + + +-- join on intermediate results, so should only +-- go to a single node +WITH some_values_1 AS + (SELECT key, random() FROM table_1 WHERE value IN ('3', '4')), + some_values_2 AS + (SELECT key, random() FROM table_2 WHERE value IN ('3', '4')) +SELECT count(*) FROM some_values_2 JOIN some_values_1 USING (key); + +-- same query with WHERE false make sure that we're not broken +-- for such edge cases +WITH some_values_1 AS + (SELECT key, random() FROM table_1 WHERE value IN ('3', '4')), + some_values_2 AS + (SELECT key, random() FROM table_2 WHERE value IN ('3', '4')) +SELECT count(*) FROM some_values_2 JOIN some_values_1 USING (key) WHERE false; + + +-- do not use some_values_2 at all, so only 2 intermediate results are +-- broadcasted +WITH some_values_1 AS + (SELECT key, random() FROM table_1 WHERE value IN ('3', '4')), + some_values_2 AS + (SELECT key, random() FROM some_values_1), + some_values_3 AS + (SELECT key, random() FROM some_values_1) +SELECT + count(*) +FROM + some_values_3; + +-- lets have some deeper intermediate results +-- the inner most two results and the final query (which contains only intermediate results) +-- hitting single worker, others hitting all workers +-- (see below query where all intermediate results hit a single node) +SELECT count(*) FROM +( + SELECT avg(min::int) FROM + ( + SELECT min(table_1.value) FROM + ( + SELECT avg(value::int) as avg_ev_type FROM + ( + SELECT max(value) as mx_val_1 FROM + ( + SELECT avg(value::int) as avg FROM + ( + SELECT cnt FROM + ( + SELECT count(*) as cnt, value + FROM table_1 + WHERE key = 1 + GROUP BY value + ) as level_1, table_1 + WHERE table_1.key = level_1.cnt AND key = 3 + ) as level_2, table_2 + WHERE table_2.key = level_2.cnt AND key = 5 + GROUP BY level_2.cnt + ) as level_3, table_1 + WHERE value::numeric = level_3.avg AND key = 6 + GROUP BY level_3.avg + ) as level_4, table_2 + WHERE level_4.mx_val_1::int = table_2.key + GROUP BY level_4.mx_val_1 + ) as level_5, table_1 + WHERE level_5.avg_ev_type = table_1.key AND key > 111 + GROUP BY level_5.avg_ev_type + ) as level_6, table_1 WHERE table_1.key::int = level_6.min::int + GROUP BY table_1.value +) as bar; +-- the same query where all intermediate results hits one +-- worker because each and every query is a router query -- but on different nodes +SELECT count(*) FROM +( + SELECT avg(min::int) FROM + ( + SELECT min(table_1.value) 
FROM + ( + SELECT avg(value::int) as avg_ev_type FROM + ( + SELECT max(value) as mx_val_1 FROM + ( + SELECT avg(value::int) as avg FROM + ( + SELECT cnt FROM + ( + SELECT count(*) as cnt, value + FROM table_1 + WHERE key = 1 + GROUP BY value + ) as level_1, table_1 + WHERE table_1.key = level_1.cnt AND key = 3 + ) as level_2, table_2 + WHERE table_2.key = level_2.cnt AND key = 5 + GROUP BY level_2.cnt + ) as level_3, table_1 + WHERE value::numeric = level_3.avg AND key = 6 + GROUP BY level_3.avg + ) as level_4, table_2 + WHERE level_4.mx_val_1::int = table_2.key AND table_2.key = 1 + GROUP BY level_4.mx_val_1 + ) as level_5, table_1 + WHERE level_5.avg_ev_type = table_1.key AND key = 111 + GROUP BY level_5.avg_ev_type + ) as level_6, table_1 + WHERE table_1.key::int = level_6.min::int AND table_1.key = 4 + GROUP BY table_1.value +) as bar; + + +-- sanity checks for set operations + +-- the intermediate results should just hit a single worker +(SELECT key FROM table_1 WHERE key = 1) +INTERSECT +(SELECT key FROM table_1 WHERE key = 2); + +-- the intermediate results should just hit a single worker +WITH cte_1 AS +( + (SELECT key FROM table_1 WHERE key = 1) + INTERSECT + (SELECT key FROM table_1 WHERE key = 2) +), +cte_2 AS +( + (SELECT key FROM table_1 WHERE key = 3) + INTERSECT + (SELECT key FROM table_1 WHERE key = 4) +) +SELECT * FROM cte_1 + UNION +SELECT * FROM cte_2; + +-- one final test with SET operations, where +-- we join the results with distributed tables +-- so cte_1 should hit all workers, but still the +-- others should hit single worker each +WITH cte_1 AS +( + (SELECT key FROM table_1 WHERE key = 1) + INTERSECT + (SELECT key FROM table_1 WHERE key = 2) +), +cte_2 AS +( + SELECT count(*) FROM table_1 JOIN cte_1 USING (key) +) +SELECT * FROM cte_2; + + +-- sanity checks for non-colocated subquery joins +-- the recursively planned subquery (bar) should hit all +-- nodes +SELECT + count(*) +FROM + (SELECT key, random() FROM table_1) as foo, + (SELECT key, random() FROM table_2) as bar +WHERE + foo.key != bar.key; + +-- the recursively planned subquery (bar) should hit one +-- node because foo goes to a single node +SELECT + count(*) +FROM + (SELECT key, random() FROM table_1 WHERE key = 1) as foo, + (SELECT key, random() FROM table_2) as bar +WHERE + foo.key != bar.key; + + +-- sanity checks for modification queries + +-- select_data goes to a single node, because it is used in another subquery +-- raw_data is also the final router query, so hits a single shard +-- however, the subquery in WHERE clause of the DELETE query is broadcasted to all +-- nodes +BEGIN; +WITH select_data AS ( + SELECT * FROM table_1 +), +raw_data AS ( + DELETE FROM table_2 WHERE key >= (SELECT min(key) FROM select_data WHERE key > 1) RETURNING * +) +SELECT * FROM raw_data; +ROLLBACK; + +-- select_data goes to a single node, because it is used in another subquery +-- raw_data is also the final router query, so hits a single shard +-- however, the subquery in WHERE clause of the DELETE query is broadcasted to all +-- nodes +BEGIN; +WITH select_data AS ( + SELECT * FROM table_1 +), +raw_data AS ( + DELETE FROM table_2 WHERE value::int >= (SELECT min(key) FROM select_data WHERE key > 1 + random()) RETURNING * +) +SELECT * FROM raw_data; +ROLLBACK; + +-- now, we need only two intermediate results as the subquery in WHERE clause is +-- router plannable +BEGIN; +WITH select_data AS ( + SELECT * FROM table_1 +), +raw_data AS ( + DELETE FROM table_2 WHERE value::int >= (SELECT min(key) FROM table_1 WHERE key > 
random()) AND key = 6 RETURNING * +) +SELECT * FROM raw_data; +ROLLBACK; + +-- test with INSERT SELECT via coordinator + +-- INSERT .. SELECT via coordinator that doesn't have any intermediate results +INSERT INTO table_1 + SELECT * FROM table_2 OFFSET 0; + +-- INSERT .. SELECT via coordinator which has intermediate result, +-- and can be pruned to a single worker because the final query is on +-- single shard via filter in key +INSERT INTO table_1 + SELECT * FROM table_2 where value IN (SELECT value FROM table_1 WHERE random() > 1) AND key = 1; + +-- a similar query, with more complex subquery +INSERT INTO table_1 + SELECT * FROM table_2 where key = 1 AND + value::int IN + (WITH cte_1 AS + ( + (SELECT key FROM table_1 WHERE key = 1) + INTERSECT + (SELECT key FROM table_1 WHERE key = 2) + ), + cte_2 AS + ( + (SELECT key FROM table_1 WHERE key = 3) + INTERSECT + (SELECT key FROM table_1 WHERE key = 4) + ) + SELECT * FROM cte_1 + UNION + SELECT * FROM cte_2); + +-- same query, cte is on the FROM clause +-- and this time the final query (and top-level intermediate result) +-- hits all the shards because table_2.key != 1 +INSERT INTO table_1 + SELECT table_2.* FROM table_2, + (WITH cte_1 AS + ( + (SELECT key FROM table_1 WHERE key = 1) + INTERSECT + (SELECT key FROM table_1 WHERE key = 2) + ), + cte_2 AS + ( + (SELECT key FROM table_1 WHERE key = 3) + INTERSECT + (SELECT key FROM table_1 WHERE key = 4) + ) + SELECT * FROM cte_1 + UNION + SELECT * FROM cte_2 + ) foo + where table_2.key != 1 AND + foo.key = table_2.value::int; + + + +-- append partitioned/heap-type +SET citus.replication_model TO statement; + +-- do not print out 'building index pg_toast_xxxxx_index' messages +SET client_min_messages TO DEFAULT; +CREATE TABLE range_partitioned(range_column text, data int); +SET client_min_messages TO DEBUG1; + +SELECT create_distributed_table('range_partitioned', 'range_column', 'range'); +SELECT master_create_empty_shard('range_partitioned'); +SELECT master_create_empty_shard('range_partitioned'); +SELECT master_create_empty_shard('range_partitioned'); +SELECT master_create_empty_shard('range_partitioned'); +SELECT master_create_empty_shard('range_partitioned'); + + +UPDATE pg_dist_shard SET shardminvalue = 'A', shardmaxvalue = 'D' WHERE shardid = 1480013; +UPDATE pg_dist_shard SET shardminvalue = 'D', shardmaxvalue = 'G' WHERE shardid = 1480014; +UPDATE pg_dist_shard SET shardminvalue = 'G', shardmaxvalue = 'K' WHERE shardid = 1480015; +UPDATE pg_dist_shard SET shardminvalue = 'K', shardmaxvalue = 'O' WHERE shardid = 1480016; +UPDATE pg_dist_shard SET shardminvalue = 'O', shardmaxvalue = 'Z' WHERE shardid = 1480017; + +-- final query goes to a single shard +SELECT + count(*) +FROM + range_partitioned +WHERE + range_column = 'A' AND + data IN (SELECT data FROM range_partitioned); + + +-- final query goes to three shards, so multiple workers +SELECT + count(*) +FROM + range_partitioned +WHERE + range_column >= 'A' AND range_column <= 'K' AND + data IN (SELECT data FROM range_partitioned); + +-- two shards, both of which are on the first node +WITH some_data AS ( + SELECT data FROM range_partitioned +) +SELECT + count(*) +FROM + range_partitioned +WHERE + range_column IN ('A', 'E') AND + range_partitioned.data IN (SELECT data FROM some_data); + +SET client_min_messages TO DEFAULT; +DROP TABLE table_1, table_2, table_3, ref_table, range_partitioned; +DROP SCHEMA intermediate_result_pruning; +
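+
+-- For reference: the node pruning exercised throughout this file can also be observed
+-- interactively on any Citus cluster with the same settings used above. The sketch
+-- below is illustrative only (the table name pruning_demo is hypothetical and is not
+-- part of the schedule above):
+--
+--   SET citus.log_intermediate_results TO true;
+--   SET client_min_messages TO DEBUG1;
+--   CREATE TABLE pruning_demo (key int, value text);
+--   SELECT create_distributed_table('pruning_demo', 'key');
+--   -- random() prevents postgres from inlining the CTE, forcing an intermediate result
+--   WITH cte AS (SELECT key, random() FROM pruning_demo WHERE value IN ('3', '4'))
+--   SELECT count(*) FROM cte JOIN pruning_demo USING (key) WHERE pruning_demo.key = 1;
+--   -- the final query is a router query on a single shard, so DEBUG1 should report a
+--   -- single "Subplan ... will be sent to ..." line rather than one line per worker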