diff --git a/src/backend/distributed/planner/distributed_planner.c b/src/backend/distributed/planner/distributed_planner.c index a54f0fd1c..5af86bad3 100644 --- a/src/backend/distributed/planner/distributed_planner.c +++ b/src/backend/distributed/planner/distributed_planner.c @@ -1112,23 +1112,49 @@ FinalizeDistributedPlan(DistributedPlan *plan, Query *originalQuery) static void RecordSubPlansUsedInPlan(DistributedPlan *plan, Query *originalQuery) { - /* first, get all the subplans in the query */ - plan->usedSubPlanNodeList = FindSubPlansUsedInNode((Node *) originalQuery); + Node *havingQual = originalQuery->havingQual; + + /* temporarily set to NULL, we're going to restore before the function returns */ + originalQuery->havingQual = NULL; /* - * Later, remove the subplans used in the HAVING clause, because they - * are only required in the coordinator. Including them in the - * usedSubPlanNodeList prevents the intermediate results to be sent to the - * coordinator only. + * Mark the subplans as needed on remote side. Note that this decision is revisited + * on execution, when the query only consists of intermediate results. */ - if (originalQuery->hasSubLinks && - FindNodeCheck(originalQuery->havingQual, IsNodeSubquery)) - { - List *subplansInHaving = FindSubPlansUsedInNode(originalQuery->havingQual); + List *subplansExceptHaving = FindSubPlansUsedInNode((Node *) originalQuery); + UpdateUsedPlanListLocation(subplansExceptHaving, SUBPLAN_ACCESS_REMOTE); - plan->usedSubPlanNodeList = - list_difference(plan->usedSubPlanNodeList, subplansInHaving); + /* do the same for HAVING part of the query */ + List *subplansInHaving = NIL; + if (originalQuery->hasSubLinks && + FindNodeCheck(havingQual, IsNodeSubquery)) + { + subplansInHaving = FindSubPlansUsedInNode(havingQual); + if (plan->masterQuery) + { + /* + * If we have the master query, we're sure that the result is needed locally. + * Otherwise, such as router queries, the plan may not be required locally. + * Note that if the query consists of only intermediate results, the executor + * may still prefer to write locally. + * + * If any of the subplansInHaving is used in other parts of the query, + * we'll later merge it those subPlans and send it to remote. + */ + UpdateUsedPlanListLocation(subplansInHaving, SUBPLAN_ACCESS_LOCAL); + } + else + { + UpdateUsedPlanListLocation(subplansInHaving, SUBPLAN_ACCESS_REMOTE); + } } + + /* set back the havingQual and the calculated subplans */ + originalQuery->havingQual = havingQual; + + /* merge the used subplans */ + plan->usedSubPlanNodeList = + MergeUsedSubPlanLists(subplansExceptHaving, subplansInHaving); } diff --git a/src/backend/distributed/planner/intermediate_result_pruning.c b/src/backend/distributed/planner/intermediate_result_pruning.c index 5acab9df5..f01595d6b 100644 --- a/src/backend/distributed/planner/intermediate_result_pruning.c +++ b/src/backend/distributed/planner/intermediate_result_pruning.c @@ -31,23 +31,29 @@ static List * FindAllRemoteWorkerNodesUsingSubplan(IntermediateResultsHashEntry static List * RemoveLocalNodeFromWorkerList(List *workerNodeList); static void LogIntermediateResultMulticastSummary(IntermediateResultsHashEntry *entry, List *workerNodeList); +static bool UsedSubPlansEqual(UsedDistributedSubPlan *left, + UsedDistributedSubPlan *right); +static UsedDistributedSubPlan * UsedSubPlanListMember(List *list, + UsedDistributedSubPlan *usedPlan); + /* * FindSubPlansUsedInPlan finds all the subplans used by the plan by traversing - * the range table entries in the plan. + * the input node. */ List * FindSubPlansUsedInNode(Node *node) { List *rangeTableList = NIL; ListCell *rangeTableCell = NULL; - List *subPlanList = NIL; + List *usedSubPlanList = NIL; ExtractRangeTableEntryWalker(node, &rangeTableList); foreach(rangeTableCell, rangeTableList) { RangeTblEntry *rangeTableEntry = lfirst(rangeTableCell); + if (rangeTableEntry->rtekind == RTE_FUNCTION) { char *resultId = @@ -62,12 +68,21 @@ FindSubPlansUsedInNode(Node *node) * Use a Value to be able to use list_append_unique and store * the result ID in the DistributedPlan. */ - Value *resultIdValue = makeString(resultId); - subPlanList = list_append_unique(subPlanList, resultIdValue); + UsedDistributedSubPlan *usedPlan = CitusMakeNode(UsedDistributedSubPlan); + + usedPlan->subPlanId = pstrdup(resultId); + + /* the callers are responsible for setting the accurate location */ + usedPlan->locationMask = SUBPLAN_ACCESS_NONE; + + if (!UsedSubPlanListMember(usedSubPlanList, usedPlan)) + { + usedSubPlanList = lappend(usedSubPlanList, usedPlan); + } } } - return subPlanList; + return usedSubPlanList; } @@ -88,26 +103,36 @@ void RecordSubplanExecutionsOnNodes(HTAB *intermediateResultsHash, DistributedPlan *distributedPlan) { - Value *usedSubPlanIdValue = NULL; List *usedSubPlanNodeList = distributedPlan->usedSubPlanNodeList; List *subPlanList = distributedPlan->subPlanList; ListCell *subPlanCell = NULL; int workerNodeCount = GetWorkerNodeCount(); - foreach_ptr(usedSubPlanIdValue, usedSubPlanNodeList) + foreach(subPlanCell, usedSubPlanNodeList) { - char *resultId = strVal(usedSubPlanIdValue); + UsedDistributedSubPlan *usedPlan = lfirst(subPlanCell); + + char *resultId = usedPlan->subPlanId; IntermediateResultsHashEntry *entry = SearchIntermediateResult( intermediateResultsHash, resultId); - /* no need to traverse the whole plan if all the workers are hit */ + if (usedPlan->locationMask & SUBPLAN_ACCESS_LOCAL) + { + /* subPlan needs to be written locally as the planner decided */ + entry->writeLocalFile = true; + } + + /* + * There is no need to traverse the whole plan if the intermediate result + * will be written to a local file and send to all nodes + */ if (list_length(entry->nodeIdList) == workerNodeCount && entry->writeLocalFile) { elog(DEBUG4, "Subplan %s is used in all workers", resultId); break; } - else + else if (usedPlan->locationMask & SUBPLAN_ACCESS_REMOTE) { /* * traverse the plan and add find all worker nodes @@ -221,20 +246,6 @@ FindAllWorkerNodesUsingSubplan(HTAB *intermediateResultsHash, SearchIntermediateResult(intermediateResultsHash, resultId); List *remoteWorkerNodes = FindAllRemoteWorkerNodesUsingSubplan(entry); - if (remoteWorkerNodes == NIL) - { - /* - * This could happen in two cases: - * (a) Subquery in the having - * (b) The intermediate result is not used, such as RETURNING of a - * modifying CTE is not used - * - * For SELECT, Postgres/Citus is clever enough to not execute the CTE - * if it is not used at all, but for modifications we have to execute - * the queries. - */ - entry->writeLocalFile = true; - } /* * Don't include the current worker if the result will be written to local @@ -369,3 +380,97 @@ SearchIntermediateResult(HTAB *intermediateResultsHash, char *resultId) return entry; } + + +/* + * MergeUsedSubPlanLists is a utility function that merges the two input + * UsedSubPlan lists. Existence of the items of the rightSubPlanList + * checked in leftSubPlanList. If not found, items are added to + * leftSubPlanList. If found, the locationMask fields are merged. + * + * Finally, the in-place modified leftSubPlanList is returned. + */ +List * +MergeUsedSubPlanLists(List *leftSubPlanList, List *rightSubPlanList) +{ + ListCell *rightListCell; + + foreach(rightListCell, rightSubPlanList) + { + UsedDistributedSubPlan *memberOnRightList = lfirst(rightListCell); + UsedDistributedSubPlan *memberOnLeftList = + UsedSubPlanListMember(leftSubPlanList, memberOnRightList); + + if (memberOnLeftList == NULL) + { + leftSubPlanList = lappend(leftSubPlanList, memberOnRightList); + } + else + { + memberOnLeftList->locationMask |= memberOnRightList->locationMask; + } + } + + return leftSubPlanList; +} + + +/* + * UsedSubPlanListMember is a utility function inspired from list_member(), + * but operating on UsedDistributedSubPlan struct, which doesn't have equal() + * function defined (similar to all Citus node types). + */ +static UsedDistributedSubPlan * +UsedSubPlanListMember(List *usedSubPlanList, UsedDistributedSubPlan *usedPlan) +{ + const ListCell *usedSubPlanCell; + + foreach(usedSubPlanCell, usedSubPlanList) + { + if (UsedSubPlansEqual(lfirst(usedSubPlanCell), usedPlan)) + { + return lfirst(usedSubPlanCell); + } + } + + return NULL; +} + + +/* + * UpdateUsedPlanListLocation is a utility function which iterates over the list + * and updates the subPlanLocation to the input location. + */ +void +UpdateUsedPlanListLocation(List *subPlanList, int locationMask) +{ + ListCell *subPlanCell = NULL; + foreach(subPlanCell, subPlanList) + { + UsedDistributedSubPlan *subPlan = lfirst(subPlanCell); + + subPlan->locationMask |= locationMask; + } +} + + +/* + * UsedSubPlansEqual is a utility function inspired from equal(), + * but operating on UsedDistributedSubPlan struct, which doesn't have equal() + * function defined (similar to all Citus node types). + */ +static bool +UsedSubPlansEqual(UsedDistributedSubPlan *left, UsedDistributedSubPlan *right) +{ + if (left == NULL || right == NULL) + { + return false; + } + + if (strncmp(left->subPlanId, right->subPlanId, NAMEDATALEN) == 0) + { + return true; + } + + return false; +} diff --git a/src/backend/distributed/utils/citus_copyfuncs.c b/src/backend/distributed/utils/citus_copyfuncs.c index b5243252f..dcac8b8c7 100644 --- a/src/backend/distributed/utils/citus_copyfuncs.c +++ b/src/backend/distributed/utils/citus_copyfuncs.c @@ -133,6 +133,16 @@ CopyNodeDistributedSubPlan(COPYFUNC_ARGS) } +void +CopyNodeUsedDistributedSubPlan(COPYFUNC_ARGS) +{ + DECLARE_FROM_AND_NEW_NODE(UsedDistributedSubPlan); + + COPY_STRING_FIELD(subPlanId); + COPY_SCALAR_FIELD(locationMask); +} + + void CopyNodeShardInterval(COPYFUNC_ARGS) { diff --git a/src/backend/distributed/utils/citus_nodefuncs.c b/src/backend/distributed/utils/citus_nodefuncs.c index 1209874a2..3f266cc4b 100644 --- a/src/backend/distributed/utils/citus_nodefuncs.c +++ b/src/backend/distributed/utils/citus_nodefuncs.c @@ -35,6 +35,7 @@ static const char *CitusNodeTagNamesD[] = { "MapMergeJob", "DistributedPlan", "DistributedSubPlan", + "UsedDistributedSubPlan", "Task", "LocalPlannedStatement", "TaskExecution", @@ -382,6 +383,7 @@ const ExtensibleNodeMethods nodeMethods[] = { DEFINE_NODE_METHODS(DistributedPlan), DEFINE_NODE_METHODS(DistributedSubPlan), + DEFINE_NODE_METHODS(UsedDistributedSubPlan), DEFINE_NODE_METHODS(Job), DEFINE_NODE_METHODS(ShardInterval), DEFINE_NODE_METHODS(MapMergeJob), diff --git a/src/backend/distributed/utils/citus_outfuncs.c b/src/backend/distributed/utils/citus_outfuncs.c index e093739f5..ead94ef78 100644 --- a/src/backend/distributed/utils/citus_outfuncs.c +++ b/src/backend/distributed/utils/citus_outfuncs.c @@ -212,6 +212,17 @@ OutDistributedSubPlan(OUTFUNC_ARGS) WRITE_NODE_FIELD(plan); } +void +OutUsedDistributedSubPlan(OUTFUNC_ARGS) +{ + WRITE_LOCALS(UsedDistributedSubPlan); + + WRITE_NODE_TYPE("USEDDISTRIBUTEDSUBPLAN"); + + WRITE_STRING_FIELD(subPlanId); + WRITE_INT_FIELD(locationMask); +} + void OutMultiProject(OUTFUNC_ARGS) diff --git a/src/backend/distributed/utils/citus_readfuncs.c b/src/backend/distributed/utils/citus_readfuncs.c index b201dfe19..d06f0e715 100644 --- a/src/backend/distributed/utils/citus_readfuncs.c +++ b/src/backend/distributed/utils/citus_readfuncs.c @@ -244,6 +244,16 @@ ReadDistributedSubPlan(READFUNC_ARGS) } +READFUNC_RET +ReadUsedDistributedSubPlan(READFUNC_ARGS) +{ + READ_LOCALS(UsedDistributedSubPlan); + + READ_STRING_FIELD(subPlanId); + READ_INT_FIELD(locationMask); +} + + READFUNC_RET ReadShardInterval(READFUNC_ARGS) { diff --git a/src/include/distributed/citus_nodefuncs.h b/src/include/distributed/citus_nodefuncs.h index e9507bfa8..f04a88aba 100644 --- a/src/include/distributed/citus_nodefuncs.h +++ b/src/include/distributed/citus_nodefuncs.h @@ -45,6 +45,7 @@ extern void RegisterNodes(void); extern READFUNC_RET ReadJob(READFUNC_ARGS); extern READFUNC_RET ReadDistributedPlan(READFUNC_ARGS); extern READFUNC_RET ReadDistributedSubPlan(READFUNC_ARGS); +extern READFUNC_RET ReadUsedDistributedSubPlan(READFUNC_ARGS); extern READFUNC_RET ReadShardInterval(READFUNC_ARGS); extern READFUNC_RET ReadMapMergeJob(READFUNC_ARGS); extern READFUNC_RET ReadShardPlacement(READFUNC_ARGS); @@ -61,6 +62,7 @@ extern READFUNC_RET ReadUnsupportedCitusNode(READFUNC_ARGS); extern void OutJob(OUTFUNC_ARGS); extern void OutDistributedPlan(OUTFUNC_ARGS); extern void OutDistributedSubPlan(OUTFUNC_ARGS); +extern void OutUsedDistributedSubPlan(OUTFUNC_ARGS); extern void OutShardInterval(OUTFUNC_ARGS); extern void OutMapMergeJob(OUTFUNC_ARGS); extern void OutShardPlacement(OUTFUNC_ARGS); @@ -86,6 +88,7 @@ extern void OutMultiExtendedOp(OUTFUNC_ARGS); extern void CopyNodeJob(COPYFUNC_ARGS); extern void CopyNodeDistributedPlan(COPYFUNC_ARGS); extern void CopyNodeDistributedSubPlan(COPYFUNC_ARGS); +extern void CopyNodeUsedDistributedSubPlan(COPYFUNC_ARGS); extern void CopyNodeShardInterval(COPYFUNC_ARGS); extern void CopyNodeMapMergeJob(COPYFUNC_ARGS); extern void CopyNodeShardPlacement(COPYFUNC_ARGS); diff --git a/src/include/distributed/citus_nodes.h b/src/include/distributed/citus_nodes.h index f1816d7c5..27fc8d7b0 100644 --- a/src/include/distributed/citus_nodes.h +++ b/src/include/distributed/citus_nodes.h @@ -57,6 +57,7 @@ typedef enum CitusNodeTag T_MapMergeJob, T_DistributedPlan, T_DistributedSubPlan, + T_UsedDistributedSubPlan, T_Task, T_LocalPlannedStatement, T_TaskExecution, diff --git a/src/include/distributed/intermediate_result_pruning.h b/src/include/distributed/intermediate_result_pruning.h index b3d8906f7..2d1ca3050 100644 --- a/src/include/distributed/intermediate_result_pruning.h +++ b/src/include/distributed/intermediate_result_pruning.h @@ -29,5 +29,8 @@ extern void RecordSubplanExecutionsOnNodes(HTAB *intermediateResultsHash, extern IntermediateResultsHashEntry * SearchIntermediateResult(HTAB *resultsHash, char *resultId); +/* utility functions related to UsedSubPlans */ +extern List * MergeUsedSubPlanLists(List *leftSubPlanList, List *rightSubPlanList); +extern void UpdateUsedPlanListLocation(List *subPlanList, int localtionMask); #endif /* INTERMEDIATE_RESULT_PRUNING_H */ diff --git a/src/include/distributed/multi_physical_planner.h b/src/include/distributed/multi_physical_planner.h index fef1fe60f..36d3f24e3 100644 --- a/src/include/distributed/multi_physical_planner.h +++ b/src/include/distributed/multi_physical_planner.h @@ -44,6 +44,10 @@ #define MERGE_FILES_AND_RUN_QUERY_COMMAND \ "SELECT worker_merge_files_and_run_query(" UINT64_FORMAT ", %d, %s, %s)" +#define SUBPLAN_ACCESS_NONE 0 +#define SUBPLAN_ACCESS_LOCAL 1 +#define SUBPLAN_ACCESS_REMOTE 2 + typedef enum CitusRTEKind { @@ -388,6 +392,19 @@ typedef struct DistributedSubPlan } DistributedSubPlan; +/* + * UsedDistributedSubPlan contains information about a subPlan that is used in a + * distributed plan. + */ +typedef struct UsedDistributedSubPlan +{ + CitusNode type; + + char *subPlanId; + int locationMask; +} UsedDistributedSubPlan; + + /* OperatorCacheEntry contains information for each element in OperatorCache */ typedef struct OperatorCacheEntry { diff --git a/src/test/regress/expected/locally_execute_intermediate_results.out b/src/test/regress/expected/locally_execute_intermediate_results.out index af257a55d..d245c324b 100644 --- a/src/test/regress/expected/locally_execute_intermediate_results.out +++ b/src/test/regress/expected/locally_execute_intermediate_results.out @@ -225,18 +225,22 @@ NOTICE: executing the command locally: SELECT count(*) AS count FROM (SELECT in -- this time the same CTE is both joined with a distributed -- table and used in HAVING --- TODO: fixed by #3396 -WITH a AS (SELECT * FROM table_1) +WITH a AS (SELECT * FROM table_1 ORDER BY 1,2 DESC LIMIT 1) SELECT count(*), key FROM a JOIN table_2 USING (key) GROUP BY key HAVING (max(table_2.value) > (SELECT value FROM a)); -DEBUG: generating subplan XXX_1 for CTE a: SELECT key, value FROM locally_execute_intermediate_results.table_1 +DEBUG: generating subplan XXX_1 for CTE a: SELECT key, value FROM locally_execute_intermediate_results.table_1 ORDER BY key, value DESC LIMIT 1 +DEBUG: push down of limit count: 1 DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count, a.key FROM ((SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) a JOIN locally_execute_intermediate_results.table_2 USING (key)) GROUP BY a.key HAVING (max(table_2.value) OPERATOR(pg_catalog.>) (SELECT a_1.value FROM (SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) a_1)) DEBUG: Subplan XXX_1 will be written to local file -ERROR: result "31_1" does not exist -CONTEXT: while executing command on localhost:xxxxx +DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx +DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx + count | key +--------------------------------------------------------------------- +(0 rows) + -- this time the same CTE is both joined with a distributed -- table and used in HAVING -- but used in another subquery/aggregate -- so one more level of recursive planning @@ -330,6 +334,130 @@ NOTICE: executing the command locally: SELECT max(key) AS key FROM (SELECT inte 4 | 4 (1 row) +-- some cases around router queries +-- a router query, but the having has two cte joins +WITH cte_1 AS (SELECT max(value) FROM table_1), + cte_2 AS (SELECT max(value) FROM table_1), + cte_3 AS (SELECT * FROM table_2) +SELECT count(*) +FROM table_2 +WHERE KEY = 3 +GROUP BY KEY +HAVING max(value) > + (SELECT MAX FROM cte_1 JOIN cte_2 USING (MAX)); +DEBUG: generating subplan XXX_1 for CTE cte_1: SELECT max(value) AS max FROM locally_execute_intermediate_results.table_1 +DEBUG: generating subplan XXX_2 for CTE cte_2: SELECT max(value) AS max FROM locally_execute_intermediate_results.table_1 +DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM locally_execute_intermediate_results.table_2 WHERE (key OPERATOR(pg_catalog.=) 3) GROUP BY key HAVING (max(value) OPERATOR(pg_catalog.>) (SELECT cte_1.max FROM ((SELECT intermediate_result.max FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(max text)) cte_1 JOIN (SELECT intermediate_result.max FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(max text)) cte_2 USING (max)))) +DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx +DEBUG: Subplan XXX_2 will be sent to localhost:xxxxx + count +--------------------------------------------------------------------- +(0 rows) + +-- a router query, but the having has two cte joins +-- and the jointree has a join with another cte +WITH cte_1 AS (SELECT max(value) FROM table_1), + cte_2 AS (SELECT max(value) FROM table_1), + cte_3 AS (SELECT * FROM table_2) +SELECT count(*) +FROM table_2 JOIN cte_3 USING(key) +WHERE KEY = 3 +GROUP BY table_2.KEY +HAVING max(table_2.value) > + (SELECT MAX FROM cte_1 JOIN cte_2 USING (MAX)); +DEBUG: generating subplan XXX_1 for CTE cte_1: SELECT max(value) AS max FROM locally_execute_intermediate_results.table_1 +DEBUG: generating subplan XXX_2 for CTE cte_2: SELECT max(value) AS max FROM locally_execute_intermediate_results.table_1 +DEBUG: generating subplan XXX_3 for CTE cte_3: SELECT key, value FROM locally_execute_intermediate_results.table_2 +DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (locally_execute_intermediate_results.table_2 JOIN (SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_3'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) cte_3 USING (key)) WHERE (table_2.key OPERATOR(pg_catalog.=) 3) GROUP BY table_2.key HAVING (max(table_2.value) OPERATOR(pg_catalog.>) (SELECT cte_1.max FROM ((SELECT intermediate_result.max FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(max text)) cte_1 JOIN (SELECT intermediate_result.max FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(max text)) cte_2 USING (max)))) +DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx +DEBUG: Subplan XXX_2 will be sent to localhost:xxxxx +DEBUG: Subplan XXX_3 will be sent to localhost:xxxxx + count +--------------------------------------------------------------------- +(0 rows) + +-- a router query, but the having has two cte joins +-- and the jointree has a join with the same CTEs +WITH cte_1 AS (SELECT max(value) FROM table_1), + cte_2 AS (SELECT max(value) FROM table_1), + cte_3 AS (SELECT * FROM table_2) +SELECT count(*) +FROM table_2 JOIN cte_3 USING(key) JOIN cte_2 ON (key = MAX::int) JOIN cte_1 USING(MAX) +WHERE KEY = 3 +GROUP BY table_2.KEY +HAVING max(table_2.value) > + (SELECT MAX FROM cte_1 JOIN cte_2 USING (MAX)); +DEBUG: generating subplan XXX_1 for CTE cte_1: SELECT max(value) AS max FROM locally_execute_intermediate_results.table_1 +DEBUG: generating subplan XXX_2 for CTE cte_2: SELECT max(value) AS max FROM locally_execute_intermediate_results.table_1 +DEBUG: generating subplan XXX_3 for CTE cte_3: SELECT key, value FROM locally_execute_intermediate_results.table_2 +DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (((locally_execute_intermediate_results.table_2 JOIN (SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_3'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) cte_3 USING (key)) JOIN (SELECT intermediate_result.max FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(max text)) cte_2 ON ((table_2.key OPERATOR(pg_catalog.=) (cte_2.max)::integer))) JOIN (SELECT intermediate_result.max FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(max text)) cte_1 USING (max)) WHERE (table_2.key OPERATOR(pg_catalog.=) 3) GROUP BY table_2.key HAVING (max(table_2.value) OPERATOR(pg_catalog.>) (SELECT cte_1_1.max FROM ((SELECT intermediate_result.max FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(max text)) cte_1_1 JOIN (SELECT intermediate_result.max FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(max text)) cte_2_1 USING (max)))) +DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx +DEBUG: Subplan XXX_2 will be sent to localhost:xxxxx +DEBUG: Subplan XXX_3 will be sent to localhost:xxxxx + count +--------------------------------------------------------------------- +(0 rows) + +-- subPlans needed remotely as the subquery is pushed down +WITH cte_1 AS (SELECT max(value) FROM table_1), + cte_2 AS (SELECT max(value) FROM table_2) +SELECT * FROM + (SELECT key FROM table_1 GROUP BY key HAVING max(value) > (SELECT * FROM cte_1)) as foo, + (SELECT key FROM table_2 GROUP BY key HAVING max(value) > (SELECT * FROM cte_2)) as bar + WHERE foo.key = bar.key; +DEBUG: generating subplan XXX_1 for CTE cte_1: SELECT max(value) AS max FROM locally_execute_intermediate_results.table_1 +DEBUG: generating subplan XXX_2 for CTE cte_2: SELECT max(value) AS max FROM locally_execute_intermediate_results.table_2 +DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT foo.key, bar.key FROM (SELECT table_1.key FROM locally_execute_intermediate_results.table_1 GROUP BY table_1.key HAVING (max(table_1.value) OPERATOR(pg_catalog.>) (SELECT cte_1.max FROM (SELECT intermediate_result.max FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(max text)) cte_1))) foo, (SELECT table_2.key FROM locally_execute_intermediate_results.table_2 GROUP BY table_2.key HAVING (max(table_2.value) OPERATOR(pg_catalog.>) (SELECT cte_2.max FROM (SELECT intermediate_result.max FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(max text)) cte_2))) bar WHERE (foo.key OPERATOR(pg_catalog.=) bar.key) +DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx +DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx +DEBUG: Subplan XXX_2 will be sent to localhost:xxxxx +DEBUG: Subplan XXX_2 will be sent to localhost:xxxxx + key | key +--------------------------------------------------------------------- +(0 rows) + +-- the second subquery needs to be recursively planned due to non-colocated subquery join +-- so cte_2 becomes part of master query of that recursive subquery planning +WITH cte_1 AS (SELECT max(value) FROM table_1), + cte_2 AS (SELECT max(value) FROM table_2) +SELECT * FROM + (SELECT key FROM table_1 GROUP BY key HAVING max(value) > (SELECT * FROM cte_1)) as foo, + (SELECT key FROM table_2 GROUP BY key HAVING max(value) > (SELECT * FROM cte_2)) as bar + WHERE foo.key != bar.key; +DEBUG: generating subplan XXX_1 for CTE cte_1: SELECT max(value) AS max FROM locally_execute_intermediate_results.table_1 +DEBUG: generating subplan XXX_2 for CTE cte_2: SELECT max(value) AS max FROM locally_execute_intermediate_results.table_2 +DEBUG: generating subplan XXX_3 for subquery SELECT key FROM locally_execute_intermediate_results.table_2 GROUP BY key HAVING (max(value) OPERATOR(pg_catalog.>) (SELECT cte_2.max FROM (SELECT intermediate_result.max FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(max text)) cte_2)) +DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT foo.key, bar.key FROM (SELECT table_1.key FROM locally_execute_intermediate_results.table_1 GROUP BY table_1.key HAVING (max(table_1.value) OPERATOR(pg_catalog.>) (SELECT cte_1.max FROM (SELECT intermediate_result.max FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(max text)) cte_1))) foo, (SELECT intermediate_result.key FROM read_intermediate_result('XXX_3'::text, 'binary'::citus_copy_format) intermediate_result(key integer)) bar WHERE (foo.key OPERATOR(pg_catalog.<>) bar.key) +DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx +DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx +DEBUG: Subplan XXX_2 will be written to local file +DEBUG: Subplan XXX_3 will be sent to localhost:xxxxx +DEBUG: Subplan XXX_3 will be sent to localhost:xxxxx + key | key +--------------------------------------------------------------------- +(0 rows) + +-- now, forcing all subqueries to be on the local node +WITH cte_1 AS (SELECT max(value) FROM table_1), + cte_2 AS (SELECT max(value) FROM table_2) +SELECT * FROM + (SELECT key FROM table_1 GROUP BY key HAVING max(value) > (SELECT * FROM cte_1) LIMIT 1) as foo, + (SELECT key FROM table_2 GROUP BY key HAVING max(value) > (SELECT * FROM cte_2) LIMIT 1) as bar + WHERE foo.key != bar.key; +DEBUG: generating subplan XXX_1 for CTE cte_1: SELECT max(value) AS max FROM locally_execute_intermediate_results.table_1 +DEBUG: generating subplan XXX_2 for CTE cte_2: SELECT max(value) AS max FROM locally_execute_intermediate_results.table_2 +DEBUG: generating subplan XXX_3 for subquery SELECT key FROM locally_execute_intermediate_results.table_1 GROUP BY key HAVING (max(value) OPERATOR(pg_catalog.>) (SELECT cte_1.max FROM (SELECT intermediate_result.max FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(max text)) cte_1)) LIMIT 1 +DEBUG: generating subplan XXX_4 for subquery SELECT key FROM locally_execute_intermediate_results.table_2 GROUP BY key HAVING (max(value) OPERATOR(pg_catalog.>) (SELECT cte_2.max FROM (SELECT intermediate_result.max FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(max text)) cte_2)) LIMIT 1 +DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT foo.key, bar.key FROM (SELECT intermediate_result.key FROM read_intermediate_result('XXX_3'::text, 'binary'::citus_copy_format) intermediate_result(key integer)) foo, (SELECT intermediate_result.key FROM read_intermediate_result('XXX_4'::text, 'binary'::citus_copy_format) intermediate_result(key integer)) bar WHERE (foo.key OPERATOR(pg_catalog.<>) bar.key) +DEBUG: Subplan XXX_1 will be written to local file +DEBUG: Subplan XXX_2 will be written to local file +DEBUG: Subplan XXX_3 will be written to local file +DEBUG: Subplan XXX_4 will be written to local file +NOTICE: executing the command locally: SELECT foo.key, bar.key FROM (SELECT intermediate_result.key FROM read_intermediate_result('XXX_3'::text, 'binary'::citus_copy_format) intermediate_result(key integer)) foo, (SELECT intermediate_result.key FROM read_intermediate_result('XXX_4'::text, 'binary'::citus_copy_format) intermediate_result(key integer)) bar WHERE (foo.key OPERATOR(pg_catalog.<>) bar.key) + key | key +--------------------------------------------------------------------- +(0 rows) + \c - - - :worker_1_port -- now use the same queries on a worker SET search_path TO locally_execute_intermediate_results; @@ -524,18 +652,21 @@ DEBUG: Subplan XXX_3 will be written to local file -- this time the same CTE is both joined with a distributed -- table and used in HAVING --- TODO: fixed by #3396 -WITH a AS (SELECT * FROM table_1) +WITH a AS (SELECT * FROM table_1 ORDER BY 1,2 DESC LIMIT 1) SELECT count(*), key FROM a JOIN table_2 USING (key) GROUP BY key HAVING (max(table_2.value) > (SELECT value FROM a)); -DEBUG: generating subplan XXX_1 for CTE a: SELECT key, value FROM locally_execute_intermediate_results.table_1 +DEBUG: generating subplan XXX_1 for CTE a: SELECT key, value FROM locally_execute_intermediate_results.table_1 ORDER BY key, value DESC LIMIT 1 +DEBUG: push down of limit count: 1 DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count, a.key FROM ((SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) a JOIN locally_execute_intermediate_results.table_2 USING (key)) GROUP BY a.key HAVING (max(table_2.value) OPERATOR(pg_catalog.>) (SELECT a_1.value FROM (SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) a_1)) DEBUG: Subplan XXX_1 will be written to local file -ERROR: result "28_1" does not exist -CONTEXT: while executing command on localhost:xxxxx +DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx + count | key +--------------------------------------------------------------------- +(0 rows) + -- this time the same CTE is both joined with a distributed -- table and used in HAVING -- but used in another subquery/aggregate -- so one more level of recursive planning @@ -642,6 +773,128 @@ DEBUG: Subplan XXX_3 will be sent to localhost:xxxxx 4 | 4 (1 row) +-- some cases around router queries +-- a router query, but the having has two cte joins +WITH cte_1 AS (SELECT max(value) FROM table_1), + cte_2 AS (SELECT max(value) FROM table_1), + cte_3 AS (SELECT * FROM table_2) +SELECT count(*) +FROM table_2 +WHERE KEY = 3 +GROUP BY KEY +HAVING max(value) > + (SELECT MAX FROM cte_1 JOIN cte_2 USING (MAX)); +DEBUG: generating subplan XXX_1 for CTE cte_1: SELECT max(value) AS max FROM locally_execute_intermediate_results.table_1 +DEBUG: generating subplan XXX_2 for CTE cte_2: SELECT max(value) AS max FROM locally_execute_intermediate_results.table_1 +DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM locally_execute_intermediate_results.table_2 WHERE (key OPERATOR(pg_catalog.=) 3) GROUP BY key HAVING (max(value) OPERATOR(pg_catalog.>) (SELECT cte_1.max FROM ((SELECT intermediate_result.max FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(max text)) cte_1 JOIN (SELECT intermediate_result.max FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(max text)) cte_2 USING (max)))) +DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx +DEBUG: Subplan XXX_2 will be sent to localhost:xxxxx + count +--------------------------------------------------------------------- +(0 rows) + +-- a router query, but the having has two cte joins +-- and the jointree has a join with another cte +WITH cte_1 AS (SELECT max(value) FROM table_1), + cte_2 AS (SELECT max(value) FROM table_1), + cte_3 AS (SELECT * FROM table_2) +SELECT count(*) +FROM table_2 JOIN cte_3 USING(key) +WHERE KEY = 3 +GROUP BY table_2.KEY +HAVING max(table_2.value) > + (SELECT MAX FROM cte_1 JOIN cte_2 USING (MAX)); +DEBUG: generating subplan XXX_1 for CTE cte_1: SELECT max(value) AS max FROM locally_execute_intermediate_results.table_1 +DEBUG: generating subplan XXX_2 for CTE cte_2: SELECT max(value) AS max FROM locally_execute_intermediate_results.table_1 +DEBUG: generating subplan XXX_3 for CTE cte_3: SELECT key, value FROM locally_execute_intermediate_results.table_2 +DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (locally_execute_intermediate_results.table_2 JOIN (SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_3'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) cte_3 USING (key)) WHERE (table_2.key OPERATOR(pg_catalog.=) 3) GROUP BY table_2.key HAVING (max(table_2.value) OPERATOR(pg_catalog.>) (SELECT cte_1.max FROM ((SELECT intermediate_result.max FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(max text)) cte_1 JOIN (SELECT intermediate_result.max FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(max text)) cte_2 USING (max)))) +DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx +DEBUG: Subplan XXX_2 will be sent to localhost:xxxxx +DEBUG: Subplan XXX_3 will be sent to localhost:xxxxx + count +--------------------------------------------------------------------- +(0 rows) + +-- a router query, but the having has two cte joins +-- and the jointree has a join with the same CTEs +WITH cte_1 AS (SELECT max(value) FROM table_1), + cte_2 AS (SELECT max(value) FROM table_1), + cte_3 AS (SELECT * FROM table_2) +SELECT count(*) +FROM table_2 JOIN cte_3 USING(key) JOIN cte_2 ON (key = MAX::int) JOIN cte_1 USING(MAX) +WHERE KEY = 3 +GROUP BY table_2.KEY +HAVING max(table_2.value) > (SELECT MAX FROM cte_1 JOIN cte_2 USING (MAX)); +DEBUG: generating subplan XXX_1 for CTE cte_1: SELECT max(value) AS max FROM locally_execute_intermediate_results.table_1 +DEBUG: generating subplan XXX_2 for CTE cte_2: SELECT max(value) AS max FROM locally_execute_intermediate_results.table_1 +DEBUG: generating subplan XXX_3 for CTE cte_3: SELECT key, value FROM locally_execute_intermediate_results.table_2 +DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (((locally_execute_intermediate_results.table_2 JOIN (SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_3'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) cte_3 USING (key)) JOIN (SELECT intermediate_result.max FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(max text)) cte_2 ON ((table_2.key OPERATOR(pg_catalog.=) (cte_2.max)::integer))) JOIN (SELECT intermediate_result.max FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(max text)) cte_1 USING (max)) WHERE (table_2.key OPERATOR(pg_catalog.=) 3) GROUP BY table_2.key HAVING (max(table_2.value) OPERATOR(pg_catalog.>) (SELECT cte_1_1.max FROM ((SELECT intermediate_result.max FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(max text)) cte_1_1 JOIN (SELECT intermediate_result.max FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(max text)) cte_2_1 USING (max)))) +DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx +DEBUG: Subplan XXX_2 will be sent to localhost:xxxxx +DEBUG: Subplan XXX_3 will be sent to localhost:xxxxx + count +--------------------------------------------------------------------- +(0 rows) + +-- subPlans needed remotely as the subquery is pushed down +WITH cte_1 AS (SELECT max(value) FROM table_1), + cte_2 AS (SELECT max(value) FROM table_2) +SELECT * FROM + (SELECT key FROM table_1 GROUP BY key HAVING max(value) > (SELECT * FROM cte_1)) as foo, + (SELECT key FROM table_2 GROUP BY key HAVING max(value) > (SELECT * FROM cte_2)) as bar + WHERE foo.key = bar.key; +DEBUG: generating subplan XXX_1 for CTE cte_1: SELECT max(value) AS max FROM locally_execute_intermediate_results.table_1 +DEBUG: generating subplan XXX_2 for CTE cte_2: SELECT max(value) AS max FROM locally_execute_intermediate_results.table_2 +DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT foo.key, bar.key FROM (SELECT table_1.key FROM locally_execute_intermediate_results.table_1 GROUP BY table_1.key HAVING (max(table_1.value) OPERATOR(pg_catalog.>) (SELECT cte_1.max FROM (SELECT intermediate_result.max FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(max text)) cte_1))) foo, (SELECT table_2.key FROM locally_execute_intermediate_results.table_2 GROUP BY table_2.key HAVING (max(table_2.value) OPERATOR(pg_catalog.>) (SELECT cte_2.max FROM (SELECT intermediate_result.max FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(max text)) cte_2))) bar WHERE (foo.key OPERATOR(pg_catalog.=) bar.key) +DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx +DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx +DEBUG: Subplan XXX_2 will be sent to localhost:xxxxx +DEBUG: Subplan XXX_2 will be sent to localhost:xxxxx + key | key +--------------------------------------------------------------------- +(0 rows) + +-- the second subquery needs to be recursively planned due to non-colocated subquery join +-- so cte_2 becomes part of master query of that recursive subquery planning +WITH cte_1 AS (SELECT max(value) FROM table_1), + cte_2 AS (SELECT max(value) FROM table_2) +SELECT * FROM + (SELECT key FROM table_1 GROUP BY key HAVING max(value) > (SELECT * FROM cte_1)) as foo, + (SELECT key FROM table_2 GROUP BY key HAVING max(value) > (SELECT * FROM cte_2)) as bar + WHERE foo.key != bar.key; +DEBUG: generating subplan XXX_1 for CTE cte_1: SELECT max(value) AS max FROM locally_execute_intermediate_results.table_1 +DEBUG: generating subplan XXX_2 for CTE cte_2: SELECT max(value) AS max FROM locally_execute_intermediate_results.table_2 +DEBUG: generating subplan XXX_3 for subquery SELECT key FROM locally_execute_intermediate_results.table_2 GROUP BY key HAVING (max(value) OPERATOR(pg_catalog.>) (SELECT cte_2.max FROM (SELECT intermediate_result.max FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(max text)) cte_2)) +DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT foo.key, bar.key FROM (SELECT table_1.key FROM locally_execute_intermediate_results.table_1 GROUP BY table_1.key HAVING (max(table_1.value) OPERATOR(pg_catalog.>) (SELECT cte_1.max FROM (SELECT intermediate_result.max FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(max text)) cte_1))) foo, (SELECT intermediate_result.key FROM read_intermediate_result('XXX_3'::text, 'binary'::citus_copy_format) intermediate_result(key integer)) bar WHERE (foo.key OPERATOR(pg_catalog.<>) bar.key) +DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx +DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx +DEBUG: Subplan XXX_2 will be written to local file +DEBUG: Subplan XXX_3 will be sent to localhost:xxxxx +DEBUG: Subplan XXX_3 will be sent to localhost:xxxxx + key | key +--------------------------------------------------------------------- +(0 rows) + +-- now, forcing all subqueries to be on the local node +WITH cte_1 AS (SELECT max(value) FROM table_1), + cte_2 AS (SELECT max(value) FROM table_2) +SELECT * FROM + (SELECT key FROM table_1 GROUP BY key HAVING max(value) > (SELECT * FROM cte_1) LIMIT 1) as foo, + (SELECT key FROM table_2 GROUP BY key HAVING max(value) > (SELECT * FROM cte_2) LIMIT 1) as bar + WHERE foo.key != bar.key; +DEBUG: generating subplan XXX_1 for CTE cte_1: SELECT max(value) AS max FROM locally_execute_intermediate_results.table_1 +DEBUG: generating subplan XXX_2 for CTE cte_2: SELECT max(value) AS max FROM locally_execute_intermediate_results.table_2 +DEBUG: generating subplan XXX_3 for subquery SELECT key FROM locally_execute_intermediate_results.table_1 GROUP BY key HAVING (max(value) OPERATOR(pg_catalog.>) (SELECT cte_1.max FROM (SELECT intermediate_result.max FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(max text)) cte_1)) LIMIT 1 +DEBUG: generating subplan XXX_4 for subquery SELECT key FROM locally_execute_intermediate_results.table_2 GROUP BY key HAVING (max(value) OPERATOR(pg_catalog.>) (SELECT cte_2.max FROM (SELECT intermediate_result.max FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(max text)) cte_2)) LIMIT 1 +DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT foo.key, bar.key FROM (SELECT intermediate_result.key FROM read_intermediate_result('XXX_3'::text, 'binary'::citus_copy_format) intermediate_result(key integer)) foo, (SELECT intermediate_result.key FROM read_intermediate_result('XXX_4'::text, 'binary'::citus_copy_format) intermediate_result(key integer)) bar WHERE (foo.key OPERATOR(pg_catalog.<>) bar.key) +DEBUG: Subplan XXX_1 will be written to local file +DEBUG: Subplan XXX_2 will be written to local file +DEBUG: Subplan XXX_3 will be written to local file +DEBUG: Subplan XXX_4 will be written to local file + key | key +--------------------------------------------------------------------- +(0 rows) + -- finally, use round-robin policy on the workers with same set of queries set citus.task_assignment_policy TO "round-robin" ; -- the query cannot be executed locally, but still because of @@ -727,7 +980,7 @@ DEBUG: generating subplan XXX_1 for CTE cte_1: SELECT max(value) AS max FROM lo DEBUG: generating subplan XXX_2 for CTE cte_2: SELECT max(key) AS max FROM locally_execute_intermediate_results.table_2 DEBUG: generating subplan XXX_3 for CTE cte_3: SELECT key, value FROM locally_execute_intermediate_results.table_2 DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_3'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) cte_3 WHERE (key OPERATOR(pg_catalog.>) (SELECT cte_2.max FROM (SELECT intermediate_result.max FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(max integer)) cte_2)) GROUP BY key HAVING (max(value) OPERATOR(pg_catalog.>) (SELECT cte_1.max FROM (SELECT intermediate_result.max FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(max text)) cte_1)) -DEBUG: Subplan XXX_1 will be written to local file +DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx DEBUG: Subplan XXX_2 will be sent to localhost:xxxxx DEBUG: Subplan XXX_3 will be sent to localhost:xxxxx count @@ -758,7 +1011,6 @@ DEBUG: Subplan XXX_2 will be written to local file -- multiple CTEs are joined inside HAVING, so written to file -- locally, also the join tree contains only another CTE, so should be -- executed locally, but not on an Citus MX worker --- TODO: fixed by #3396 WITH cte_1 AS (SELECT max(value) FROM table_1), cte_2 AS (SELECT max(value) FROM table_1), cte_3 AS (SELECT * FROM table_2) @@ -772,11 +1024,15 @@ DEBUG: generating subplan XXX_1 for CTE cte_1: SELECT max(value) AS max FROM lo DEBUG: generating subplan XXX_2 for CTE cte_2: SELECT max(value) AS max FROM locally_execute_intermediate_results.table_1 DEBUG: generating subplan XXX_3 for CTE cte_3: SELECT key, value FROM locally_execute_intermediate_results.table_2 DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_3'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) cte_3 GROUP BY key HAVING (max(value) OPERATOR(pg_catalog.>) (SELECT cte_1.max FROM ((SELECT intermediate_result.max FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(max text)) cte_1 JOIN (SELECT intermediate_result.max FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(max text)) cte_2 USING (max)))) -DEBUG: Subplan XXX_1 will be written to local file -DEBUG: Subplan XXX_2 will be written to local file +DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx +DEBUG: Subplan XXX_2 will be sent to localhost:xxxxx DEBUG: Subplan XXX_3 will be sent to localhost:xxxxx -ERROR: result "66_1" does not exist -CONTEXT: while executing command on localhost:xxxxx + count +--------------------------------------------------------------------- + 1 + 1 +(2 rows) + -- now, the CTE is going to be written locally, -- plus that could have been read locally on the coordinator -- because of the aggragate over the cte in HAVING @@ -815,7 +1071,7 @@ DEBUG: generating subplan XXX_3 for subquery SELECT max(max) AS max FROM (SELEC DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) cte_2 GROUP BY key HAVING (max(value) OPERATOR(pg_catalog.<) (SELECT intermediate_result.max FROM read_intermediate_result('XXX_3'::text, 'binary'::citus_copy_format) intermediate_result(max text))) DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx DEBUG: Subplan XXX_2 will be sent to localhost:xxxxx -DEBUG: Subplan XXX_3 will be written to local file +DEBUG: Subplan XXX_3 will be sent to localhost:xxxxx count --------------------------------------------------------------------- 1 @@ -825,18 +1081,21 @@ DEBUG: Subplan XXX_3 will be written to local file -- this time the same CTE is both joined with a distributed -- table and used in HAVING --- TODO: fixed by #3396 -WITH a AS (SELECT * FROM table_1) +WITH a AS (SELECT * FROM table_1 ORDER BY 1,2 DESC LIMIT 1) SELECT count(*), key FROM a JOIN table_2 USING (key) GROUP BY key HAVING (max(table_2.value) > (SELECT value FROM a)); -DEBUG: generating subplan XXX_1 for CTE a: SELECT key, value FROM locally_execute_intermediate_results.table_1 +DEBUG: generating subplan XXX_1 for CTE a: SELECT key, value FROM locally_execute_intermediate_results.table_1 ORDER BY key, value DESC LIMIT 1 +DEBUG: push down of limit count: 1 DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count, a.key FROM ((SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) a JOIN locally_execute_intermediate_results.table_2 USING (key)) GROUP BY a.key HAVING (max(table_2.value) OPERATOR(pg_catalog.>) (SELECT a_1.value FROM (SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) a_1)) DEBUG: Subplan XXX_1 will be written to local file -ERROR: result "77_1" does not exist -CONTEXT: while executing command on localhost:xxxxx +DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx + count | key +--------------------------------------------------------------------- +(0 rows) + -- this time the same CTE is both joined with a distributed -- table and used in HAVING -- but used in another subquery/aggregate -- so one more level of recursive planning @@ -943,6 +1202,140 @@ DEBUG: Subplan XXX_3 will be sent to localhost:xxxxx 4 | 4 (1 row) +-- some cases around router queries +-- a router query, but the having has two cte joins +WITH cte_1 AS (SELECT max(value) FROM table_1), + cte_2 AS (SELECT max(value) FROM table_1), + cte_3 AS (SELECT * FROM table_2) +SELECT count(*) +FROM table_2 +WHERE KEY = 3 +GROUP BY KEY +HAVING max(value) > + (SELECT MAX FROM cte_1 JOIN cte_2 USING (MAX)); +DEBUG: generating subplan XXX_1 for CTE cte_1: SELECT max(value) AS max FROM locally_execute_intermediate_results.table_1 +DEBUG: generating subplan XXX_2 for CTE cte_2: SELECT max(value) AS max FROM locally_execute_intermediate_results.table_1 +DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM locally_execute_intermediate_results.table_2 WHERE (key OPERATOR(pg_catalog.=) 3) GROUP BY key HAVING (max(value) OPERATOR(pg_catalog.>) (SELECT cte_1.max FROM ((SELECT intermediate_result.max FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(max text)) cte_1 JOIN (SELECT intermediate_result.max FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(max text)) cte_2 USING (max)))) +DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx +DEBUG: Subplan XXX_2 will be sent to localhost:xxxxx + count +--------------------------------------------------------------------- +(0 rows) + +-- a router query, but the having has two cte joins +-- and the jointree has a join with another cte +WITH cte_1 AS (SELECT max(value) FROM table_1), + cte_2 AS (SELECT max(value) FROM table_1), + cte_3 AS (SELECT * FROM table_2) +SELECT count(*) +FROM table_2 JOIN cte_3 USING(key) +WHERE KEY = 3 +GROUP BY table_2.KEY +HAVING max(table_2.value) > + (SELECT MAX FROM cte_1 JOIN cte_2 USING (MAX)); +DEBUG: generating subplan XXX_1 for CTE cte_1: SELECT max(value) AS max FROM locally_execute_intermediate_results.table_1 +DEBUG: generating subplan XXX_2 for CTE cte_2: SELECT max(value) AS max FROM locally_execute_intermediate_results.table_1 +DEBUG: generating subplan XXX_3 for CTE cte_3: SELECT key, value FROM locally_execute_intermediate_results.table_2 +DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (locally_execute_intermediate_results.table_2 JOIN (SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_3'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) cte_3 USING (key)) WHERE (table_2.key OPERATOR(pg_catalog.=) 3) GROUP BY table_2.key HAVING (max(table_2.value) OPERATOR(pg_catalog.>) (SELECT cte_1.max FROM ((SELECT intermediate_result.max FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(max text)) cte_1 JOIN (SELECT intermediate_result.max FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(max text)) cte_2 USING (max)))) +DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx +DEBUG: Subplan XXX_2 will be sent to localhost:xxxxx +DEBUG: Subplan XXX_3 will be sent to localhost:xxxxx + count +--------------------------------------------------------------------- +(0 rows) + +-- the same query as above, try to hit local node with either of the queries +WITH cte_1 AS (SELECT max(value) FROM table_1), + cte_2 AS (SELECT max(value) FROM table_1), + cte_3 AS (SELECT * FROM table_2) +SELECT count(*) +FROM table_2 JOIN cte_3 USING(key) +WHERE KEY = 3 +GROUP BY table_2.KEY +HAVING max(table_2.value) > + (SELECT MAX FROM cte_1 JOIN cte_2 USING (MAX)); +DEBUG: generating subplan XXX_1 for CTE cte_1: SELECT max(value) AS max FROM locally_execute_intermediate_results.table_1 +DEBUG: generating subplan XXX_2 for CTE cte_2: SELECT max(value) AS max FROM locally_execute_intermediate_results.table_1 +DEBUG: generating subplan XXX_3 for CTE cte_3: SELECT key, value FROM locally_execute_intermediate_results.table_2 +DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (locally_execute_intermediate_results.table_2 JOIN (SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_3'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) cte_3 USING (key)) WHERE (table_2.key OPERATOR(pg_catalog.=) 3) GROUP BY table_2.key HAVING (max(table_2.value) OPERATOR(pg_catalog.>) (SELECT cte_1.max FROM ((SELECT intermediate_result.max FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(max text)) cte_1 JOIN (SELECT intermediate_result.max FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(max text)) cte_2 USING (max)))) +DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx +DEBUG: Subplan XXX_2 will be sent to localhost:xxxxx +DEBUG: Subplan XXX_3 will be sent to localhost:xxxxx + count +--------------------------------------------------------------------- +(0 rows) + +-- a router query, but the having has two cte joins +-- and the jointree has a join with the same CTEs +WITH cte_1 AS (SELECT max(value) FROM table_1), + cte_2 AS (SELECT max(value) FROM table_1), + cte_3 AS (SELECT * FROM table_2) +SELECT count(*) +FROM table_2 JOIN cte_3 USING(key) JOIN cte_2 ON (key = MAX::int) JOIN cte_1 USING(MAX) +WHERE KEY = 3 +GROUP BY table_2.KEY +HAVING max(table_2.value) > + (SELECT MAX FROM cte_1 JOIN cte_2 USING (MAX)); +DEBUG: generating subplan XXX_1 for CTE cte_1: SELECT max(value) AS max FROM locally_execute_intermediate_results.table_1 +DEBUG: generating subplan XXX_2 for CTE cte_2: SELECT max(value) AS max FROM locally_execute_intermediate_results.table_1 +DEBUG: generating subplan XXX_3 for CTE cte_3: SELECT key, value FROM locally_execute_intermediate_results.table_2 +DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (((locally_execute_intermediate_results.table_2 JOIN (SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_3'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) cte_3 USING (key)) JOIN (SELECT intermediate_result.max FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(max text)) cte_2 ON ((table_2.key OPERATOR(pg_catalog.=) (cte_2.max)::integer))) JOIN (SELECT intermediate_result.max FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(max text)) cte_1 USING (max)) WHERE (table_2.key OPERATOR(pg_catalog.=) 3) GROUP BY table_2.key HAVING (max(table_2.value) OPERATOR(pg_catalog.>) (SELECT cte_1_1.max FROM ((SELECT intermediate_result.max FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(max text)) cte_1_1 JOIN (SELECT intermediate_result.max FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(max text)) cte_2_1 USING (max)))) +DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx +DEBUG: Subplan XXX_2 will be sent to localhost:xxxxx +DEBUG: Subplan XXX_3 will be sent to localhost:xxxxx + count +--------------------------------------------------------------------- +(0 rows) + +- subPlans needed remotely as the subquery is pushed down +WITH cte_1 AS (SELECT max(value) FROM table_1), + cte_2 AS (SELECT max(value) FROM table_2) +SELECT * FROM + (SELECT key FROM table_1 GROUP BY key HAVING max(value) > (SELECT * FROM cte_1)) as foo, + (SELECT key FROM table_2 GROUP BY key HAVING max(value) > (SELECT * FROM cte_2)) as bar + WHERE foo.key = bar.key; +ERROR: syntax error at or near "-" +-- the second subquery needs to be recursively planned due to non-colocated subquery join +-- so cte_2 becomes part of master query of that recursive subquery planning +WITH cte_1 AS (SELECT max(value) FROM table_1), + cte_2 AS (SELECT max(value) FROM table_2) +SELECT * FROM + (SELECT key FROM table_1 GROUP BY key HAVING max(value) > (SELECT * FROM cte_1)) as foo, + (SELECT key FROM table_2 GROUP BY key HAVING max(value) > (SELECT * FROM cte_2)) as bar + WHERE foo.key != bar.key; +DEBUG: generating subplan XXX_1 for CTE cte_1: SELECT max(value) AS max FROM locally_execute_intermediate_results.table_1 +DEBUG: generating subplan XXX_2 for CTE cte_2: SELECT max(value) AS max FROM locally_execute_intermediate_results.table_2 +DEBUG: generating subplan XXX_3 for subquery SELECT key FROM locally_execute_intermediate_results.table_2 GROUP BY key HAVING (max(value) OPERATOR(pg_catalog.>) (SELECT cte_2.max FROM (SELECT intermediate_result.max FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(max text)) cte_2)) +DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT foo.key, bar.key FROM (SELECT table_1.key FROM locally_execute_intermediate_results.table_1 GROUP BY table_1.key HAVING (max(table_1.value) OPERATOR(pg_catalog.>) (SELECT cte_1.max FROM (SELECT intermediate_result.max FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(max text)) cte_1))) foo, (SELECT intermediate_result.key FROM read_intermediate_result('XXX_3'::text, 'binary'::citus_copy_format) intermediate_result(key integer)) bar WHERE (foo.key OPERATOR(pg_catalog.<>) bar.key) +DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx +DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx +DEBUG: Subplan XXX_2 will be written to local file +DEBUG: Subplan XXX_3 will be sent to localhost:xxxxx +DEBUG: Subplan XXX_3 will be sent to localhost:xxxxx + key | key +--------------------------------------------------------------------- +(0 rows) + +-- now, forcing all subqueries to be on the local node +WITH cte_1 AS (SELECT max(value) FROM table_1), + cte_2 AS (SELECT max(value) FROM table_2) +SELECT * FROM + (SELECT key FROM table_1 GROUP BY key HAVING max(value) > (SELECT * FROM cte_1) LIMIT 1) as foo, + (SELECT key FROM table_2 GROUP BY key HAVING max(value) > (SELECT * FROM cte_2) LIMIT 1) as bar + WHERE foo.key != bar.key; +DEBUG: generating subplan XXX_1 for CTE cte_1: SELECT max(value) AS max FROM locally_execute_intermediate_results.table_1 +DEBUG: generating subplan XXX_2 for CTE cte_2: SELECT max(value) AS max FROM locally_execute_intermediate_results.table_2 +DEBUG: generating subplan XXX_3 for subquery SELECT key FROM locally_execute_intermediate_results.table_1 GROUP BY key HAVING (max(value) OPERATOR(pg_catalog.>) (SELECT cte_1.max FROM (SELECT intermediate_result.max FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(max text)) cte_1)) LIMIT 1 +DEBUG: generating subplan XXX_4 for subquery SELECT key FROM locally_execute_intermediate_results.table_2 GROUP BY key HAVING (max(value) OPERATOR(pg_catalog.>) (SELECT cte_2.max FROM (SELECT intermediate_result.max FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(max text)) cte_2)) LIMIT 1 +DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT foo.key, bar.key FROM (SELECT intermediate_result.key FROM read_intermediate_result('XXX_3'::text, 'binary'::citus_copy_format) intermediate_result(key integer)) foo, (SELECT intermediate_result.key FROM read_intermediate_result('XXX_4'::text, 'binary'::citus_copy_format) intermediate_result(key integer)) bar WHERE (foo.key OPERATOR(pg_catalog.<>) bar.key) +DEBUG: Subplan XXX_1 will be written to local file +DEBUG: Subplan XXX_2 will be written to local file +DEBUG: Subplan XXX_3 will be sent to localhost:xxxxx +DEBUG: Subplan XXX_4 will be sent to localhost:xxxxx + key | key +--------------------------------------------------------------------- +(0 rows) + \c - - - :master_port SET client_min_messages TO ERROR; DROP SCHEMA locally_execute_intermediate_results CASCADE; diff --git a/src/test/regress/sql/locally_execute_intermediate_results.sql b/src/test/regress/sql/locally_execute_intermediate_results.sql index c7f785b07..e6c0e1949 100644 --- a/src/test/regress/sql/locally_execute_intermediate_results.sql +++ b/src/test/regress/sql/locally_execute_intermediate_results.sql @@ -125,8 +125,7 @@ HAVING max(value) < (SELECT max(max) FROM cte_1); -- this time the same CTE is both joined with a distributed -- table and used in HAVING --- TODO: fixed by #3396 -WITH a AS (SELECT * FROM table_1) +WITH a AS (SELECT * FROM table_1 ORDER BY 1,2 DESC LIMIT 1) SELECT count(*), key FROM a JOIN table_2 USING (key) @@ -172,6 +171,67 @@ cte_2 AS (SELECT * FROM cte_1), cte_3 AS (SELECT max(key) as key FROM cte_2) SELECT * FROM cte_3 JOIN ref_table USING (key); +-- some cases around router queries +-- a router query, but the having has two cte joins +WITH cte_1 AS (SELECT max(value) FROM table_1), + cte_2 AS (SELECT max(value) FROM table_1), + cte_3 AS (SELECT * FROM table_2) +SELECT count(*) +FROM table_2 +WHERE KEY = 3 +GROUP BY KEY +HAVING max(value) > + (SELECT MAX FROM cte_1 JOIN cte_2 USING (MAX)); + +-- a router query, but the having has two cte joins +-- and the jointree has a join with another cte +WITH cte_1 AS (SELECT max(value) FROM table_1), + cte_2 AS (SELECT max(value) FROM table_1), + cte_3 AS (SELECT * FROM table_2) +SELECT count(*) +FROM table_2 JOIN cte_3 USING(key) +WHERE KEY = 3 +GROUP BY table_2.KEY +HAVING max(table_2.value) > + (SELECT MAX FROM cte_1 JOIN cte_2 USING (MAX)); + +-- a router query, but the having has two cte joins +-- and the jointree has a join with the same CTEs +WITH cte_1 AS (SELECT max(value) FROM table_1), + cte_2 AS (SELECT max(value) FROM table_1), + cte_3 AS (SELECT * FROM table_2) +SELECT count(*) +FROM table_2 JOIN cte_3 USING(key) JOIN cte_2 ON (key = MAX::int) JOIN cte_1 USING(MAX) +WHERE KEY = 3 +GROUP BY table_2.KEY +HAVING max(table_2.value) > + (SELECT MAX FROM cte_1 JOIN cte_2 USING (MAX)); + +-- subPlans needed remotely as the subquery is pushed down +WITH cte_1 AS (SELECT max(value) FROM table_1), + cte_2 AS (SELECT max(value) FROM table_2) +SELECT * FROM + (SELECT key FROM table_1 GROUP BY key HAVING max(value) > (SELECT * FROM cte_1)) as foo, + (SELECT key FROM table_2 GROUP BY key HAVING max(value) > (SELECT * FROM cte_2)) as bar + WHERE foo.key = bar.key; + +-- the second subquery needs to be recursively planned due to non-colocated subquery join +-- so cte_2 becomes part of master query of that recursive subquery planning +WITH cte_1 AS (SELECT max(value) FROM table_1), + cte_2 AS (SELECT max(value) FROM table_2) +SELECT * FROM + (SELECT key FROM table_1 GROUP BY key HAVING max(value) > (SELECT * FROM cte_1)) as foo, + (SELECT key FROM table_2 GROUP BY key HAVING max(value) > (SELECT * FROM cte_2)) as bar + WHERE foo.key != bar.key; + + +-- now, forcing all subqueries to be on the local node +WITH cte_1 AS (SELECT max(value) FROM table_1), + cte_2 AS (SELECT max(value) FROM table_2) +SELECT * FROM + (SELECT key FROM table_1 GROUP BY key HAVING max(value) > (SELECT * FROM cte_1) LIMIT 1) as foo, + (SELECT key FROM table_2 GROUP BY key HAVING max(value) > (SELECT * FROM cte_2) LIMIT 1) as bar + WHERE foo.key != bar.key; \c - - - :worker_1_port @@ -287,8 +347,7 @@ HAVING max(value) < (SELECT max(max) FROM cte_1); -- this time the same CTE is both joined with a distributed -- table and used in HAVING --- TODO: fixed by #3396 -WITH a AS (SELECT * FROM table_1) +WITH a AS (SELECT * FROM table_1 ORDER BY 1,2 DESC LIMIT 1) SELECT count(*), key FROM a JOIN table_2 USING (key) @@ -346,6 +405,67 @@ cte_3 AS (SELECT max(key) as key FROM cte_2) SELECT * FROM cte_3 JOIN ref_table USING (key); +-- some cases around router queries +-- a router query, but the having has two cte joins +WITH cte_1 AS (SELECT max(value) FROM table_1), + cte_2 AS (SELECT max(value) FROM table_1), + cte_3 AS (SELECT * FROM table_2) +SELECT count(*) +FROM table_2 +WHERE KEY = 3 +GROUP BY KEY +HAVING max(value) > + (SELECT MAX FROM cte_1 JOIN cte_2 USING (MAX)); + +-- a router query, but the having has two cte joins +-- and the jointree has a join with another cte +WITH cte_1 AS (SELECT max(value) FROM table_1), + cte_2 AS (SELECT max(value) FROM table_1), + cte_3 AS (SELECT * FROM table_2) +SELECT count(*) +FROM table_2 JOIN cte_3 USING(key) +WHERE KEY = 3 +GROUP BY table_2.KEY +HAVING max(table_2.value) > + (SELECT MAX FROM cte_1 JOIN cte_2 USING (MAX)); + +-- a router query, but the having has two cte joins +-- and the jointree has a join with the same CTEs +WITH cte_1 AS (SELECT max(value) FROM table_1), + cte_2 AS (SELECT max(value) FROM table_1), + cte_3 AS (SELECT * FROM table_2) +SELECT count(*) +FROM table_2 JOIN cte_3 USING(key) JOIN cte_2 ON (key = MAX::int) JOIN cte_1 USING(MAX) +WHERE KEY = 3 +GROUP BY table_2.KEY +HAVING max(table_2.value) > (SELECT MAX FROM cte_1 JOIN cte_2 USING (MAX)); + +-- subPlans needed remotely as the subquery is pushed down +WITH cte_1 AS (SELECT max(value) FROM table_1), + cte_2 AS (SELECT max(value) FROM table_2) +SELECT * FROM + (SELECT key FROM table_1 GROUP BY key HAVING max(value) > (SELECT * FROM cte_1)) as foo, + (SELECT key FROM table_2 GROUP BY key HAVING max(value) > (SELECT * FROM cte_2)) as bar + WHERE foo.key = bar.key; + +-- the second subquery needs to be recursively planned due to non-colocated subquery join +-- so cte_2 becomes part of master query of that recursive subquery planning +WITH cte_1 AS (SELECT max(value) FROM table_1), + cte_2 AS (SELECT max(value) FROM table_2) +SELECT * FROM + (SELECT key FROM table_1 GROUP BY key HAVING max(value) > (SELECT * FROM cte_1)) as foo, + (SELECT key FROM table_2 GROUP BY key HAVING max(value) > (SELECT * FROM cte_2)) as bar + WHERE foo.key != bar.key; + + +-- now, forcing all subqueries to be on the local node +WITH cte_1 AS (SELECT max(value) FROM table_1), + cte_2 AS (SELECT max(value) FROM table_2) +SELECT * FROM + (SELECT key FROM table_1 GROUP BY key HAVING max(value) > (SELECT * FROM cte_1) LIMIT 1) as foo, + (SELECT key FROM table_2 GROUP BY key HAVING max(value) > (SELECT * FROM cte_2) LIMIT 1) as bar + WHERE foo.key != bar.key; + -- finally, use round-robin policy on the workers with same set of queries set citus.task_assignment_policy TO "round-robin" ; -- the query cannot be executed locally, but still because of @@ -413,7 +533,6 @@ HAVING max(value) > (SELECT max FROM cte_1 JOIN cte_2 USING (max)); -- multiple CTEs are joined inside HAVING, so written to file -- locally, also the join tree contains only another CTE, so should be -- executed locally, but not on an Citus MX worker --- TODO: fixed by #3396 WITH cte_1 AS (SELECT max(value) FROM table_1), cte_2 AS (SELECT max(value) FROM table_1), cte_3 AS (SELECT * FROM table_2) @@ -450,8 +569,7 @@ HAVING max(value) < (SELECT max(max) FROM cte_1); -- this time the same CTE is both joined with a distributed -- table and used in HAVING --- TODO: fixed by #3396 -WITH a AS (SELECT * FROM table_1) +WITH a AS (SELECT * FROM table_1 ORDER BY 1,2 DESC LIMIT 1) SELECT count(*), key FROM a JOIN table_2 USING (key) @@ -509,6 +627,80 @@ cte_3 AS (SELECT max(key) as key FROM cte_2) SELECT * FROM cte_3 JOIN ref_table USING (key); +-- some cases around router queries +-- a router query, but the having has two cte joins +WITH cte_1 AS (SELECT max(value) FROM table_1), + cte_2 AS (SELECT max(value) FROM table_1), + cte_3 AS (SELECT * FROM table_2) +SELECT count(*) +FROM table_2 +WHERE KEY = 3 +GROUP BY KEY +HAVING max(value) > + (SELECT MAX FROM cte_1 JOIN cte_2 USING (MAX)); + +-- a router query, but the having has two cte joins +-- and the jointree has a join with another cte +WITH cte_1 AS (SELECT max(value) FROM table_1), + cte_2 AS (SELECT max(value) FROM table_1), + cte_3 AS (SELECT * FROM table_2) +SELECT count(*) +FROM table_2 JOIN cte_3 USING(key) +WHERE KEY = 3 +GROUP BY table_2.KEY +HAVING max(table_2.value) > + (SELECT MAX FROM cte_1 JOIN cte_2 USING (MAX)); + +-- the same query as above, try to hit local node with either of the queries +WITH cte_1 AS (SELECT max(value) FROM table_1), + cte_2 AS (SELECT max(value) FROM table_1), + cte_3 AS (SELECT * FROM table_2) +SELECT count(*) +FROM table_2 JOIN cte_3 USING(key) +WHERE KEY = 3 +GROUP BY table_2.KEY +HAVING max(table_2.value) > + (SELECT MAX FROM cte_1 JOIN cte_2 USING (MAX)); + + +-- a router query, but the having has two cte joins +-- and the jointree has a join with the same CTEs +WITH cte_1 AS (SELECT max(value) FROM table_1), + cte_2 AS (SELECT max(value) FROM table_1), + cte_3 AS (SELECT * FROM table_2) +SELECT count(*) +FROM table_2 JOIN cte_3 USING(key) JOIN cte_2 ON (key = MAX::int) JOIN cte_1 USING(MAX) +WHERE KEY = 3 +GROUP BY table_2.KEY +HAVING max(table_2.value) > + (SELECT MAX FROM cte_1 JOIN cte_2 USING (MAX)); + +- subPlans needed remotely as the subquery is pushed down +WITH cte_1 AS (SELECT max(value) FROM table_1), + cte_2 AS (SELECT max(value) FROM table_2) +SELECT * FROM + (SELECT key FROM table_1 GROUP BY key HAVING max(value) > (SELECT * FROM cte_1)) as foo, + (SELECT key FROM table_2 GROUP BY key HAVING max(value) > (SELECT * FROM cte_2)) as bar + WHERE foo.key = bar.key; + +-- the second subquery needs to be recursively planned due to non-colocated subquery join +-- so cte_2 becomes part of master query of that recursive subquery planning +WITH cte_1 AS (SELECT max(value) FROM table_1), + cte_2 AS (SELECT max(value) FROM table_2) +SELECT * FROM + (SELECT key FROM table_1 GROUP BY key HAVING max(value) > (SELECT * FROM cte_1)) as foo, + (SELECT key FROM table_2 GROUP BY key HAVING max(value) > (SELECT * FROM cte_2)) as bar + WHERE foo.key != bar.key; + + +-- now, forcing all subqueries to be on the local node +WITH cte_1 AS (SELECT max(value) FROM table_1), + cte_2 AS (SELECT max(value) FROM table_2) +SELECT * FROM + (SELECT key FROM table_1 GROUP BY key HAVING max(value) > (SELECT * FROM cte_1) LIMIT 1) as foo, + (SELECT key FROM table_2 GROUP BY key HAVING max(value) > (SELECT * FROM cte_2) LIMIT 1) as bar + WHERE foo.key != bar.key; + \c - - - :master_port SET client_min_messages TO ERROR;