diff --git a/src/backend/distributed/citus.control b/src/backend/distributed/citus.control index 3f25eba14..a393c7114 100644 --- a/src/backend/distributed/citus.control +++ b/src/backend/distributed/citus.control @@ -1,6 +1,6 @@ # Citus extension comment = 'Citus distributed database' -default_version = '9.2-1' +default_version = '9.2-2' module_pathname = '$libdir/citus' relocatable = false schema = pg_catalog diff --git a/src/backend/distributed/commands/multi_copy.c b/src/backend/distributed/commands/multi_copy.c index 07004df60..4d82ab87b 100644 --- a/src/backend/distributed/commands/multi_copy.c +++ b/src/backend/distributed/commands/multi_copy.c @@ -1828,7 +1828,7 @@ CitusCopyDestReceiverStartup(DestReceiver *dest, int operation, const char *nullPrintCharacter = "\\N"; /* Citus currently doesn't know how to handle COPY command locally */ - ErrorIfLocalExecutionHappened(); + ErrorIfTransactionAccessedPlacementsLocally(); /* look up table properties */ Relation distributedRelation = heap_open(tableId, RowExclusiveLock); @@ -2910,6 +2910,11 @@ CopyGetPlacementConnection(ShardPlacement *placement, bool stopOnFailure) ClaimConnectionExclusively(connection); } + if (!TransactionConnectedToLocalGroup && placement->groupId == GetLocalGroupId()) + { + TransactionConnectedToLocalGroup = true; + } + return connection; } diff --git a/src/backend/distributed/commands/utility_hook.c b/src/backend/distributed/commands/utility_hook.c index 7d3bde2b7..48956641a 100644 --- a/src/backend/distributed/commands/utility_hook.c +++ b/src/backend/distributed/commands/utility_hook.c @@ -187,7 +187,7 @@ multi_ProcessUtility(PlannedStmt *pstmt, * Since we cannot execute EXPLAIN ANALYZE locally, we * cannot continue. */ - ErrorIfLocalExecutionHappened(); + ErrorIfTransactionAccessedPlacementsLocally(); } } diff --git a/src/backend/distributed/connection/placement_connection.c b/src/backend/distributed/connection/placement_connection.c index 7198b1dba..ee9aa0c50 100644 --- a/src/backend/distributed/connection/placement_connection.c +++ b/src/backend/distributed/connection/placement_connection.c @@ -1168,19 +1168,3 @@ ColocatedPlacementsHashCompare(const void *a, const void *b, Size keysize) return 0; } } - - -/* - * AnyConnectionAccessedPlacements simply checks the number of entries in - * ConnectionPlacementHash. This is useful to detect whether we're in a - * distirbuted transaction and already executed at least one command that - * accessed to a placement. - */ -bool -AnyConnectionAccessedPlacements(void) -{ - /* this is initialized on PG_INIT */ - Assert(ConnectionPlacementHash != NULL); - - return hash_get_num_entries(ConnectionPlacementHash) > 0; -} diff --git a/src/backend/distributed/executor/adaptive_executor.c b/src/backend/distributed/executor/adaptive_executor.c index f77774403..2b271a14e 100644 --- a/src/backend/distributed/executor/adaptive_executor.c +++ b/src/backend/distributed/executor/adaptive_executor.c @@ -764,8 +764,6 @@ RunLocalExecution(CitusScanState *scanState, DistributedExecution *execution) { uint64 rowsProcessed = ExecuteLocalTaskList(scanState, execution->localTaskList); - LocalExecutionHappened = true; - /* * We're deliberately not setting execution->rowsProceessed here. The main reason * is that modifications to reference tables would end-up setting it both here @@ -882,7 +880,7 @@ ExecuteTaskListExtended(RowModifyLevel modLevel, List *taskList, * The code-paths that rely on this function do not know how execute * commands locally. 
*/ - ErrorIfLocalExecutionHappened(); + ErrorIfTransactionAccessedPlacementsLocally(); if (MultiShardConnectionType == SEQUENTIAL_CONNECTION) { @@ -1009,7 +1007,7 @@ DecideTransactionPropertiesForTaskList(RowModifyLevel modLevel, List *taskList, return xactProperties; } - if (LocalExecutionHappened) + if (TransactionAccessedLocalPlacement) { /* * In case localExecutionHappened, we force the executor to use 2PC. @@ -1730,6 +1728,12 @@ AssignTasksToConnections(DistributedExecution *execution) */ placementExecutionReady = false; } + + if (!TransactionConnectedToLocalGroup && taskPlacement->groupId == + GetLocalGroupId()) + { + TransactionConnectedToLocalGroup = true; + } } } diff --git a/src/backend/distributed/executor/local_executor.c b/src/backend/distributed/executor/local_executor.c index 9e1cab457..075d87374 100644 --- a/src/backend/distributed/executor/local_executor.c +++ b/src/backend/distributed/executor/local_executor.c @@ -97,7 +97,8 @@ bool EnableLocalExecution = true; bool LogLocalCommands = false; -bool LocalExecutionHappened = false; +bool TransactionAccessedLocalPlacement = false; +bool TransactionConnectedToLocalGroup = false; static void SplitLocalAndRemotePlacements(List *taskPlacementList, @@ -144,6 +145,17 @@ ExecuteLocalTaskList(CitusScanState *scanState, List *taskList) { Task *task = (Task *) lfirst(taskCell); + /* + * If we have a valid shard id, a distributed table will be accessed + * during execution. Record it to apply the restrictions related to + * local execution. + */ + if (!TransactionAccessedLocalPlacement && + task->anchorShardId != INVALID_SHARD_ID) + { + TransactionAccessedLocalPlacement = true; + } + PlannedStmt *localPlan = GetCachedLocalPlan(task, distributedPlan); /* @@ -395,7 +407,7 @@ ShouldExecuteTasksLocally(List *taskList) return false; } - if (LocalExecutionHappened) + if (TransactionAccessedLocalPlacement) { /* * For various reasons, including the transaction visibility @@ -436,7 +448,7 @@ ShouldExecuteTasksLocally(List *taskList) * has happened because that'd break transaction visibility rules and * many other things. */ - return !AnyConnectionAccessedPlacements(); + return !TransactionConnectedToLocalGroup; } if (!singleTask) @@ -447,7 +459,7 @@ ShouldExecuteTasksLocally(List *taskList) * execution is happening one task at a time (e.g., similar to sequential * distributed execution). */ - Assert(!LocalExecutionHappened); + Assert(!TransactionAccessedLocalPlacement); return false; } @@ -481,20 +493,20 @@ TaskAccessesLocalNode(Task *task) /* - * ErrorIfLocalExecutionHappened() errors out if a local query has already been executed - * in the same transaction. + * ErrorIfTransactionAccessedPlacementsLocally() errors out if a local query on any shard + * has already been executed in the same transaction. * * This check is required because Citus currently hasn't implemented local execution * infrastructure for all the commands/executors. As we implement local execution for * the command/executor that this function call exists, we should simply remove the check. 
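A minimal sketch of the scenario this guard rejects, assuming the distributed_table setup from local_shard_execution.out further down in this diff (the statements and messages mirror that test file):

BEGIN;
-- runs on the local shard placement via local execution
DELETE FROM distributed_table WHERE key = 1;
-- a later command that needs parallel connections is now rejected
COPY distributed_table FROM STDIN WITH CSV;
-- ERROR:  cannot execute command because a local execution has accessed a placement in the transaction
-- HINT:   Try re-running the transaction with "SET LOCAL citus.enable_local_execution TO OFF;"
ROLLBACK;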
*/ void -ErrorIfLocalExecutionHappened(void) +ErrorIfTransactionAccessedPlacementsLocally(void) { - if (LocalExecutionHappened) + if (TransactionAccessedLocalPlacement) { ereport(ERROR, (errmsg("cannot execute command because a local execution has " - "already been done in the transaction"), + "accessed a placement in the transaction"), errhint("Try re-running the transaction with " "\"SET LOCAL citus.enable_local_execution TO OFF;\""), errdetail("Some parallel commands cannot be executed if a " diff --git a/src/backend/distributed/executor/multi_task_tracker_executor.c b/src/backend/distributed/executor/multi_task_tracker_executor.c index 6ab4b2064..0e94340e6 100644 --- a/src/backend/distributed/executor/multi_task_tracker_executor.c +++ b/src/backend/distributed/executor/multi_task_tracker_executor.c @@ -2921,7 +2921,7 @@ TaskTrackerExecScan(CustomScanState *node) Job *workerJob = distributedPlan->workerJob; Query *jobQuery = workerJob->jobQuery; - ErrorIfLocalExecutionHappened(); + ErrorIfTransactionAccessedPlacementsLocally(); DisableLocalExecution(); if (ContainsReadIntermediateResultFunction((Node *) jobQuery)) diff --git a/src/backend/distributed/executor/subplan_execution.c b/src/backend/distributed/executor/subplan_execution.c index ef62f7b03..93a1087a5 100644 --- a/src/backend/distributed/executor/subplan_execution.c +++ b/src/backend/distributed/executor/subplan_execution.c @@ -46,7 +46,6 @@ ExecuteSubPlans(DistributedPlan *distributedPlan) HTAB *intermediateResultsHash = MakeIntermediateResultHTAB(); RecordSubplanExecutionsOnNodes(intermediateResultsHash, distributedPlan); - /* * Make sure that this transaction has a distributed transaction ID. * @@ -61,40 +60,18 @@ ExecuteSubPlans(DistributedPlan *distributedPlan) PlannedStmt *plannedStmt = subPlan->plan; uint32 subPlanId = subPlan->subPlanId; ParamListInfo params = NULL; - bool writeLocalFile = false; char *resultId = GenerateResultId(planId, subPlanId); - List *workerNodeList = + List *remoteWorkerNodeList = FindAllWorkerNodesUsingSubplan(intermediateResultsHash, resultId); - /* - * Write intermediate results to local file only if there is no worker - * node that receives them. - * - * This could happen in two cases: - * (a) Subquery in the having - * (b) The intermediate result is not used, such as RETURNING of a - * modifying CTE is not used - * - * For SELECT, Postgres/Citus is clever enough to not execute the CTE - * if it is not used at all, but for modifications we have to execute - * the queries. 
- */ - if (workerNodeList == NIL) - { - writeLocalFile = true; - - if ((LogIntermediateResults && IsLoggableLevel(DEBUG1)) || - IsLoggableLevel(DEBUG4)) - { - elog(DEBUG1, "Subplan %s will be written to local file", resultId); - } - } + IntermediateResultsHashEntry *entry = + SearchIntermediateResult(intermediateResultsHash, resultId); SubPlanLevel++; EState *estate = CreateExecutorState(); - DestReceiver *copyDest = CreateRemoteFileDestReceiver(resultId, estate, - workerNodeList, - writeLocalFile); + DestReceiver *copyDest = + CreateRemoteFileDestReceiver(resultId, estate, remoteWorkerNodeList, + entry->writeLocalFile); ExecutePlanIntoDestReceiver(plannedStmt, params, copyDest); diff --git a/src/backend/distributed/planner/intermediate_result_pruning.c b/src/backend/distributed/planner/intermediate_result_pruning.c index f1b75f754..5acab9df5 100644 --- a/src/backend/distributed/planner/intermediate_result_pruning.c +++ b/src/backend/distributed/planner/intermediate_result_pruning.c @@ -24,13 +24,13 @@ /* controlled via GUC, used mostly for testing */ bool LogIntermediateResults = false; -static List * AppendAllAccessedWorkerNodes(List *workerNodeList, - DistributedPlan *distributedPlan, - int workerNodeCount); -static IntermediateResultsHashEntry * SearchIntermediateResult(HTAB - *intermediateResultsHash, - char *resultId); - +static void AppendAllAccessedWorkerNodes(IntermediateResultsHashEntry *entry, + DistributedPlan *distributedPlan, + int workerNodeCount); +static List * FindAllRemoteWorkerNodesUsingSubplan(IntermediateResultsHashEntry *entry); +static List * RemoveLocalNodeFromWorkerList(List *workerNodeList); +static void LogIntermediateResultMulticastSummary(IntermediateResultsHashEntry *entry, + List *workerNodeList); /* * FindSubPlansUsedInPlan finds all the subplans used by the plan by traversing @@ -102,10 +102,9 @@ RecordSubplanExecutionsOnNodes(HTAB *intermediateResultsHash, intermediateResultsHash, resultId); /* no need to traverse the whole plan if all the workers are hit */ - if (list_length(entry->nodeIdList) == workerNodeCount) + if (list_length(entry->nodeIdList) == workerNodeCount && entry->writeLocalFile) { elog(DEBUG4, "Subplan %s is used in all workers", resultId); - break; } else @@ -117,9 +116,7 @@ RecordSubplanExecutionsOnNodes(HTAB *intermediateResultsHash, * workers will be in the node list. We can improve intermediate result * pruning by deciding which reference table shard will be accessed earlier */ - entry->nodeIdList = AppendAllAccessedWorkerNodes(entry->nodeIdList, - distributedPlan, - workerNodeCount); + AppendAllAccessedWorkerNodes(entry, distributedPlan, workerNodeCount); elog(DEBUG4, "Subplan %s is used in %lu", resultId, distributedPlan->planId); } @@ -142,16 +139,19 @@ RecordSubplanExecutionsOnNodes(HTAB *intermediateResultsHash, /* * AppendAllAccessedWorkerNodes iterates over all the tasks in a distributed plan - * to create the list of worker nodes that can be accessed when this plan is executed. + * to update, in the given entry, the list of worker nodes that can be accessed + * when this plan is executed. Depending on the plan, the function may also decide + * that the results should be written to a local file. * * If there are multiple placements of a Shard, all of them are considered and * all the workers with placements are appended to the list. This effectively * means that if there is a reference table access in the distributed plan, all * the workers will be in the resulting list.
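A minimal sketch of the resulting pruning behavior, reusing the table_1/table_2 queries from intermediate_result_pruning.out later in this diff: a subplan consumed only by the coordinator is now written to a local file, while subplans consumed by remote shards are still multicast to those workers.

SET client_min_messages TO DEBUG1;
SET citus.log_intermediate_results TO true;
WITH some_values_1 AS (SELECT key, random() FROM table_1 WHERE value IN ('3', '4')),
     some_values_2 AS (SELECT key, random() FROM some_values_1)
SELECT count(*) FROM some_values_2 JOIN table_2 USING (key) WHERE table_2.key = 1;
-- DEBUG:  Subplan XXX_1 will be written to local file
-- DEBUG:  Subplan XXX_2 will be sent to localhost:xxxxx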
*/ -static List * -AppendAllAccessedWorkerNodes(List *workerNodeList, DistributedPlan *distributedPlan, int - workerNodeCount) +static void +AppendAllAccessedWorkerNodes(IntermediateResultsHashEntry *entry, + DistributedPlan *distributedPlan, + int workerNodeCount) { List *taskList = distributedPlan->workerJob->taskList; ListCell *taskCell = NULL; @@ -163,17 +163,24 @@ AppendAllAccessedWorkerNodes(List *workerNodeList, DistributedPlan *distributedP foreach(placementCell, task->taskPlacementList) { ShardPlacement *placement = lfirst(placementCell); - workerNodeList = list_append_unique_int(workerNodeList, placement->nodeId); + + if (placement->nodeId == LOCAL_NODE_ID) + { + entry->writeLocalFile = true; + continue; + } + + entry->nodeIdList = + list_append_unique_int(entry->nodeIdList, placement->nodeId); /* early return if all the workers are accessed */ - if (list_length(workerNodeList) == workerNodeCount) + if (list_length(entry->nodeIdList) == workerNodeCount && + entry->writeLocalFile) { - return workerNodeList; + return; } } } - - return workerNodeList; } @@ -203,28 +210,67 @@ MakeIntermediateResultHTAB() /* * FindAllWorkerNodesUsingSubplan creates a list of worker nodes that - * may need to access subplan results. + * may need to access subplan results. The function also sets the entry's + * writeLocalFile flag if the result should also be written to a local file. */ List * FindAllWorkerNodesUsingSubplan(HTAB *intermediateResultsHash, char *resultId) { - List *workerNodeList = NIL; IntermediateResultsHashEntry *entry = SearchIntermediateResult(intermediateResultsHash, resultId); + List *remoteWorkerNodes = FindAllRemoteWorkerNodesUsingSubplan(entry); + if (remoteWorkerNodes == NIL) + { + /* + * This could happen in two cases: + * (a) Subquery in the HAVING clause + * (b) The intermediate result is not used, such as when the RETURNING + * of a modifying CTE is not used + * + * For SELECT, Postgres/Citus is clever enough to not execute the CTE + * if it is not used at all, but for modifications we have to execute + * the queries. + */ + entry->writeLocalFile = true; + } + + /* + * Don't include the current worker if the result will be written to a local + * file, as that would be very inefficient and could lead to race + * conditions while trying to write the same file twice. + */ + if (entry->writeLocalFile) + { + remoteWorkerNodes = RemoveLocalNodeFromWorkerList(remoteWorkerNodes); + } + + LogIntermediateResultMulticastSummary(entry, remoteWorkerNodes); + + return remoteWorkerNodes; } + + +/* + * FindAllRemoteWorkerNodesUsingSubplan goes over the nodeIdList of the + * intermediate result entry, and returns a list of workerNodes that the + * entry should be multicast to. The aim of the function is to filter + * out nodes with LOCAL_NODE_ID.
*/ +static List * +FindAllRemoteWorkerNodesUsingSubplan(IntermediateResultsHashEntry *entry) +{ + List *workerNodeList = NIL; + ListCell *nodeIdCell = NULL; foreach(nodeIdCell, entry->nodeIdList) { - WorkerNode *workerNode = LookupNodeByNodeId(lfirst_int(nodeIdCell)); - - workerNodeList = lappend(workerNodeList, workerNode); - - if ((LogIntermediateResults && IsLoggableLevel(DEBUG1)) || - IsLoggableLevel(DEBUG4)) + uint32 nodeId = lfirst_int(nodeIdCell); + WorkerNode *workerNode = LookupNodeByNodeId(nodeId); + if (workerNode != NULL) { - elog(DEBUG1, "Subplan %s will be sent to %s:%d", resultId, - workerNode->workerName, workerNode->workerPort); + workerNodeList = lappend(workerNodeList, workerNode); } } @@ -232,13 +278,81 @@ FindAllWorkerNodesUsingSubplan(HTAB *intermediateResultsHash, } +/* + * RemoveLocalNodeFromWorkerList goes over the input workerNode list and + * removes the worker node with the local group id, and returns a new list. + */ +static List * +RemoveLocalNodeFromWorkerList(List *workerNodeList) +{ + int32 localGroupId = GetLocalGroupId(); + + ListCell *workerNodeCell = NULL; + ListCell *prev = NULL; + foreach(workerNodeCell, workerNodeList) + { + WorkerNode *workerNode = (WorkerNode *) lfirst(workerNodeCell); + if (workerNode->groupId == localGroupId) + { + return list_delete_cell(workerNodeList, workerNodeCell, prev); + } + + prev = workerNodeCell; + } + + return workerNodeList; +} + + +/* + * LogIntermediateResultMulticastSummary is a utility function to DEBUG-log + * the decisions made about which intermediate result should be sent to which node. + * + * For details, see the comments within the function. + */ +static void +LogIntermediateResultMulticastSummary(IntermediateResultsHashEntry *entry, + List *workerNodeList) +{ + char *resultId = entry->key; + + /* + * Log a summary of decisions made for intermediate result multicast. By default + * we log at level DEBUG4. When the user has set citus.log_intermediate_results + * we change the log level to DEBUG1. This is mostly useful in regression tests + * where we specifically want to debug these decisions, but not all DEBUG4 messages. + */ + int logLevel = DEBUG4; + + if (LogIntermediateResults) + { + logLevel = DEBUG1; + } + + if (IsLoggableLevel(logLevel)) + { + if (entry->writeLocalFile) + { + elog(logLevel, "Subplan %s will be written to local file", resultId); + } + + WorkerNode *workerNode = NULL; + foreach_ptr(workerNode, workerNodeList) + { + elog(logLevel, "Subplan %s will be sent to %s:%d", resultId, + workerNode->workerName, workerNode->workerPort); + } + } +} + + /* * SearchIntermediateResult searches through intermediateResultsHash for a given * intermediate result id. * * If an entry is not found, it creates a new entry with sane defaults.
*/ -static IntermediateResultsHashEntry * +IntermediateResultsHashEntry * SearchIntermediateResult(HTAB *intermediateResultsHash, char *resultId) { bool found = false; @@ -250,6 +364,7 @@ SearchIntermediateResult(HTAB *intermediateResultsHash, char *resultId) if (!found) { entry->nodeIdList = NIL; + entry->writeLocalFile = false; } return entry; diff --git a/src/backend/distributed/planner/multi_router_planner.c b/src/backend/distributed/planner/multi_router_planner.c index b4ba6c237..223bc79d4 100644 --- a/src/backend/distributed/planner/multi_router_planner.c +++ b/src/backend/distributed/planner/multi_router_planner.c @@ -27,6 +27,7 @@ #include "distributed/errormessage.h" #include "distributed/log_utils.h" #include "distributed/insert_select_planner.h" +#include "distributed/intermediate_result_pruning.h" #include "distributed/master_metadata_utility.h" #include "distributed/master_protocol.h" #include "distributed/metadata_cache.h" @@ -68,6 +69,7 @@ #include "optimizer/restrictinfo.h" #include "parser/parsetree.h" #include "parser/parse_oper.h" +#include "postmaster/postmaster.h" #include "storage/lock.h" #include "utils/builtins.h" #include "utils/elog.h" @@ -150,6 +152,7 @@ static DeferredErrorMessage * MultiRouterPlannableQuery(Query *query); static DeferredErrorMessage * ErrorIfQueryHasModifyingCTE(Query *queryTree); static RangeTblEntry * GetUpdateOrDeleteRTE(Query *query); static bool SelectsFromDistributedTable(List *rangeTableList, Query *query); +static ShardPlacement * CreateDummyPlacement(void); static List * get_all_actual_clauses(List *restrictinfo_list); static int CompareInsertValuesByShardId(const void *leftElement, const void *rightElement); @@ -2214,8 +2217,6 @@ List * FindRouterWorkerList(List *shardIntervalList, bool shardsPresent, bool replacePrunedQueryWithDummy) { - static uint32 zeroShardQueryRoundRobin = 0; - List *workerList = NIL; /* @@ -2230,23 +2231,10 @@ FindRouterWorkerList(List *shardIntervalList, bool shardsPresent, } else if (replacePrunedQueryWithDummy) { - List *workerNodeList = ActiveReadableWorkerNodeList(); - if (workerNodeList != NIL) + ShardPlacement *dummyPlacement = CreateDummyPlacement(); + if (dummyPlacement != NULL) { - int workerNodeCount = list_length(workerNodeList); - int workerNodeIndex = zeroShardQueryRoundRobin % workerNodeCount; - WorkerNode *workerNode = (WorkerNode *) list_nth(workerNodeList, - workerNodeIndex); - ShardPlacement *dummyPlacement = - (ShardPlacement *) CitusMakeNode(ShardPlacement); - dummyPlacement->nodeName = workerNode->workerName; - dummyPlacement->nodePort = workerNode->workerPort; - dummyPlacement->nodeId = workerNode->nodeId; - dummyPlacement->groupId = workerNode->groupId; - workerList = lappend(workerList, dummyPlacement); - - zeroShardQueryRoundRobin++; } } @@ -2254,6 +2242,52 @@ FindRouterWorkerList(List *shardIntervalList, bool shardsPresent, } +/* + * CreateDummyPlacement creates a dummy placement that can be used for queries + * that don't involve any shards. The typical examples are: + * (a) queries that consist of only intermediate results + * (b) queries that hit zero shards (... WHERE false;) + * + * If round robin policy is set, the placement could be on any node in pg_dist_node. + * Else, the local node is set for the placement. 
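A minimal sketch of the user-visible difference, assuming the modify_fast_path table and the command-logging settings used in fast_path_router_modify.out; the task_assignment_policy statements are an illustration, not part of the patch:

-- default policy: the dummy placement stays on the local node,
-- so the zero-shard query can run via local execution
SELECT count(*) FROM modify_fast_path WHERE key = 1 AND false;
-- NOTICE:  executing the command locally: SELECT count(*) AS count FROM ...

-- round-robin policy: the dummy placement rotates over pg_dist_node
SET citus.task_assignment_policy TO 'round-robin';
SELECT count(*) FROM modify_fast_path WHERE key = 1 AND false;
-- NOTICE:  issuing SELECT count(*) AS count FROM ... on a worker node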
+ */ +static ShardPlacement * +CreateDummyPlacement(void) +{ + static uint32 zeroShardQueryRoundRobin = 0; + ShardPlacement *dummyPlacement = CitusMakeNode(ShardPlacement); + + if (TaskAssignmentPolicy == TASK_ASSIGNMENT_ROUND_ROBIN) + { + List *workerNodeList = ActiveReadableWorkerNodeList(); + if (workerNodeList == NIL) + { + return NULL; + } + + int workerNodeCount = list_length(workerNodeList); + int workerNodeIndex = zeroShardQueryRoundRobin % workerNodeCount; + WorkerNode *workerNode = (WorkerNode *) list_nth(workerNodeList, + workerNodeIndex); + dummyPlacement->nodeName = workerNode->workerName; + dummyPlacement->nodePort = workerNode->workerPort; + dummyPlacement->nodeId = workerNode->nodeId; + dummyPlacement->groupId = workerNode->groupId; + + zeroShardQueryRoundRobin++; + } + else + { + dummyPlacement->nodeId = LOCAL_NODE_ID; + dummyPlacement->nodeName = LOCAL_HOST_NAME; + dummyPlacement->nodePort = PostPortNumber; + dummyPlacement->groupId = GetLocalGroupId(); + } + + return dummyPlacement; +} + + /* * RelationShardListForShardIntervalList is a utility function which gets a list of * shardInterval, and returns a list of RelationShard. diff --git a/src/backend/distributed/sql/citus--9.2-1--9.2-2.sql b/src/backend/distributed/sql/citus--9.2-1--9.2-2.sql new file mode 100644 index 000000000..6576b11b5 --- /dev/null +++ b/src/backend/distributed/sql/citus--9.2-1--9.2-2.sql @@ -0,0 +1,2 @@ +-- reserve UINT32_MAX (4294967295) for a special node +ALTER SEQUENCE pg_catalog.pg_dist_node_nodeid_seq MAXVALUE 4294967294; diff --git a/src/backend/distributed/transaction/transaction_management.c b/src/backend/distributed/transaction/transaction_management.c index b94057da6..b5bedc3ac 100644 --- a/src/backend/distributed/transaction/transaction_management.c +++ b/src/backend/distributed/transaction/transaction_management.c @@ -248,7 +248,8 @@ CoordinatedTransactionCallback(XactEvent event, void *arg) CurrentCoordinatedTransactionState = COORD_TRANS_NONE; XactModificationLevel = XACT_MODIFICATION_NONE; - LocalExecutionHappened = false; + TransactionAccessedLocalPlacement = false; + TransactionConnectedToLocalGroup = false; dlist_init(&InProgressTransactions); activeSetStmts = NULL; CoordinatedTransactionUses2PC = false; @@ -302,7 +303,8 @@ CoordinatedTransactionCallback(XactEvent event, void *arg) CurrentCoordinatedTransactionState = COORD_TRANS_NONE; XactModificationLevel = XACT_MODIFICATION_NONE; - LocalExecutionHappened = false; + TransactionAccessedLocalPlacement = false; + TransactionConnectedToLocalGroup = false; dlist_init(&InProgressTransactions); activeSetStmts = NULL; CoordinatedTransactionUses2PC = false; diff --git a/src/include/distributed/intermediate_result_pruning.h b/src/include/distributed/intermediate_result_pruning.h index 08f75e24c..b3d8906f7 100644 --- a/src/include/distributed/intermediate_result_pruning.h +++ b/src/include/distributed/intermediate_result_pruning.h @@ -13,6 +13,11 @@ #include "distributed/subplan_execution.h" +/* + * UINT32_MAX is reserved in pg_dist_node, so we can use it safely. 
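A quick way to confirm the reservation once the 9.2-2 upgrade script has run; the catalog query below is illustrative (PostgreSQL 10+), not part of the patch:

SELECT seqmax
FROM pg_sequence
WHERE seqrelid = 'pg_catalog.pg_dist_node_nodeid_seq'::regclass;
--    seqmax
-- ------------
--  4294967294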
*/ +#define LOCAL_NODE_ID UINT32_MAX + extern bool LogIntermediateResults; extern List * FindSubPlansUsedInNode(Node *node); @@ -21,6 +26,8 @@ extern List * FindAllWorkerNodesUsingSubplan(HTAB *intermediateResultsHash, extern HTAB * MakeIntermediateResultHTAB(void); extern void RecordSubplanExecutionsOnNodes(HTAB *intermediateResultsHash, DistributedPlan *distributedPlan); +extern IntermediateResultsHashEntry * SearchIntermediateResult(HTAB *resultsHash, + char *resultId); #endif /* INTERMEDIATE_RESULT_PRUNING_H */ diff --git a/src/include/distributed/local_executor.h b/src/include/distributed/local_executor.h index 464d87e0e..1a0d8ff24 100644 --- a/src/include/distributed/local_executor.h +++ b/src/include/distributed/local_executor.h @@ -17,13 +17,14 @@ extern bool EnableLocalExecution; extern bool LogLocalCommands; -extern bool LocalExecutionHappened; +extern bool TransactionAccessedLocalPlacement; +extern bool TransactionConnectedToLocalGroup; extern uint64 ExecuteLocalTaskList(CitusScanState *scanState, List *taskList); extern void ExtractLocalAndRemoteTasks(bool readOnlyPlan, List *taskList, List **localTaskList, List **remoteTaskList); extern bool ShouldExecuteTasksLocally(List *taskList); -extern void ErrorIfLocalExecutionHappened(void); +extern void ErrorIfTransactionAccessedPlacementsLocally(void); extern void SetTaskQueryAndPlacementList(Task *task, Query *query, List *placementList); extern char * TaskQueryString(Task *task); extern bool TaskAccessesLocalNode(Task *task); diff --git a/src/include/distributed/placement_connection.h b/src/include/distributed/placement_connection.h index bfd1f119a..02beed1f5 100644 --- a/src/include/distributed/placement_connection.h +++ b/src/include/distributed/placement_connection.h @@ -39,7 +39,6 @@ extern void CloseShardPlacementAssociation(struct MultiConnection *connection); extern void ResetShardPlacementAssociation(struct MultiConnection *connection); extern void InitPlacementConnectionManagement(void); -extern bool AnyConnectionAccessedPlacements(void); extern bool ConnectionModifiedPlacement(MultiConnection *connection); extern bool ConnectionUsedForAnyPlacements(MultiConnection *connection); diff --git a/src/include/distributed/subplan_execution.h b/src/include/distributed/subplan_execution.h index 10e088c8e..d68db43ce 100644 --- a/src/include/distributed/subplan_execution.h +++ b/src/include/distributed/subplan_execution.h @@ -28,11 +28,16 @@ extern void ExecuteSubPlans(DistributedPlan *distributedPlan); * The nodeIdList contains a set of unique WorkerNode ids that have placements * that can be used in non-colocated subquery joins with the intermediate result * given in the key. + * + * writeLocalFile indicates if the intermediate result is accessed during local + * execution. Note that there may also be an item for the local node in + * nodeIdList.
*/ typedef struct IntermediateResultsHashEntry { char key[NAMEDATALEN]; List *nodeIdList; + bool writeLocalFile; } IntermediateResultsHashEntry; #endif /* SUBPLAN_EXECUTION_H */ diff --git a/src/test/regress/expected/coordinator_shouldhaveshards.out b/src/test/regress/expected/coordinator_shouldhaveshards.out index 8084d101f..54571f077 100644 --- a/src/test/regress/expected/coordinator_shouldhaveshards.out +++ b/src/test/regress/expected/coordinator_shouldhaveshards.out @@ -107,7 +107,7 @@ NOTICE: executing the command locally: SELECT y FROM coordinator_shouldhaveshar (1 row) ALTER TABLE test DROP COLUMN z; -ERROR: cannot execute command because a local execution has already been done in the transaction +ERROR: cannot execute command because a local execution has accessed a placement in the transaction DETAIL: Some parallel commands cannot be executed if a previous command has already been executed locally HINT: Try re-running the transaction with "SET LOCAL citus.enable_local_execution TO OFF;" ROLLBACK; diff --git a/src/test/regress/expected/failure_cte_subquery.out b/src/test/regress/expected/failure_cte_subquery.out index ac1dc5aa2..bad22eeed 100644 --- a/src/test/regress/expected/failure_cte_subquery.out +++ b/src/test/regress/expected/failure_cte_subquery.out @@ -6,6 +6,8 @@ SET citus.next_shard_id TO 16000000; -- CTE inlining should not happen because -- the tests rely on intermediate results SET citus.enable_cte_inlining TO false; +-- prevent locally executing the intermediate results +SET citus.task_assignment_policy TO "round-robin"; SELECT pg_backend_pid() as pid \gset CREATE TABLE users_table (user_id int, user_name text); CREATE TABLE events_table(user_id int, event_id int, event_type int); diff --git a/src/test/regress/expected/fast_path_router_modify.out b/src/test/regress/expected/fast_path_router_modify.out index e155206d3..0780e20ce 100644 --- a/src/test/regress/expected/fast_path_router_modify.out +++ b/src/test/regress/expected/fast_path_router_modify.out @@ -368,8 +368,7 @@ EXECUTE prepared_zero_shard_select(1); DEBUG: Deferred pruning for a fast-path router query DEBUG: Creating router plan DEBUG: Plan is router executable -NOTICE: issuing SELECT count(*) AS count FROM (SELECT NULL::integer AS key, NULL::integer AS value_1, NULL::text AS value_2 WHERE false) modify_fast_path(key, value_1, value_2) WHERE ((key OPERATOR(pg_catalog.=) 1) AND false) -DETAIL: on server postgres@localhost:xxxxx connectionId: 4 +NOTICE: executing the command locally: SELECT count(*) AS count FROM (SELECT NULL::integer AS key, NULL::integer AS value_1, NULL::text AS value_2 WHERE false) modify_fast_path(key, value_1, value_2) WHERE ((key OPERATOR(pg_catalog.=) 1) AND false) count --------------------------------------------------------------------- 0 (1 row) EXECUTE prepared_zero_shard_select(2); DEBUG: Deferred pruning for a fast-path router query DEBUG: Creating router plan DEBUG: Plan is router executable -NOTICE: issuing SELECT count(*) AS count FROM (SELECT NULL::integer AS key, NULL::integer AS value_1, NULL::text AS value_2 WHERE false) modify_fast_path(key, value_1, value_2) WHERE ((key OPERATOR(pg_catalog.=) 2) AND false) -DETAIL: on server postgres@localhost:xxxxx connectionId: 3 +NOTICE: executing the command locally: SELECT count(*) AS count FROM (SELECT NULL::integer AS key, NULL::integer AS value_1, NULL::text AS value_2 WHERE false) modify_fast_path(key, value_1, value_2) WHERE ((key OPERATOR(pg_catalog.=) 2) AND false) count
--------------------------------------------------------------------- 0 @@ -390,8 +388,7 @@ EXECUTE prepared_zero_shard_select(3); DEBUG: Deferred pruning for a fast-path router query DEBUG: Creating router plan DEBUG: Plan is router executable -NOTICE: issuing SELECT count(*) AS count FROM (SELECT NULL::integer AS key, NULL::integer AS value_1, NULL::text AS value_2 WHERE false) modify_fast_path(key, value_1, value_2) WHERE ((key OPERATOR(pg_catalog.=) 3) AND false) -DETAIL: on server postgres@localhost:xxxxx connectionId: 4 +NOTICE: executing the command locally: SELECT count(*) AS count FROM (SELECT NULL::integer AS key, NULL::integer AS value_1, NULL::text AS value_2 WHERE false) modify_fast_path(key, value_1, value_2) WHERE ((key OPERATOR(pg_catalog.=) 3) AND false) count --------------------------------------------------------------------- 0 @@ -401,8 +398,7 @@ EXECUTE prepared_zero_shard_select(4); DEBUG: Deferred pruning for a fast-path router query DEBUG: Creating router plan DEBUG: Plan is router executable -NOTICE: issuing SELECT count(*) AS count FROM (SELECT NULL::integer AS key, NULL::integer AS value_1, NULL::text AS value_2 WHERE false) modify_fast_path(key, value_1, value_2) WHERE ((key OPERATOR(pg_catalog.=) 4) AND false) -DETAIL: on server postgres@localhost:xxxxx connectionId: 3 +NOTICE: executing the command locally: SELECT count(*) AS count FROM (SELECT NULL::integer AS key, NULL::integer AS value_1, NULL::text AS value_2 WHERE false) modify_fast_path(key, value_1, value_2) WHERE ((key OPERATOR(pg_catalog.=) 4) AND false) count --------------------------------------------------------------------- 0 @@ -412,8 +408,7 @@ EXECUTE prepared_zero_shard_select(5); DEBUG: Deferred pruning for a fast-path router query DEBUG: Creating router plan DEBUG: Plan is router executable -NOTICE: issuing SELECT count(*) AS count FROM (SELECT NULL::integer AS key, NULL::integer AS value_1, NULL::text AS value_2 WHERE false) modify_fast_path(key, value_1, value_2) WHERE ((key OPERATOR(pg_catalog.=) 5) AND false) -DETAIL: on server postgres@localhost:xxxxx connectionId: 4 +NOTICE: executing the command locally: SELECT count(*) AS count FROM (SELECT NULL::integer AS key, NULL::integer AS value_1, NULL::text AS value_2 WHERE false) modify_fast_path(key, value_1, value_2) WHERE ((key OPERATOR(pg_catalog.=) 5) AND false) count --------------------------------------------------------------------- 0 @@ -423,16 +418,14 @@ EXECUTE prepared_zero_shard_select(6); DEBUG: Deferred pruning for a fast-path router query DEBUG: Creating router plan DEBUG: Plan is router executable -NOTICE: issuing SELECT count(*) AS count FROM (SELECT NULL::integer AS key, NULL::integer AS value_1, NULL::text AS value_2 WHERE false) modify_fast_path(key, value_1, value_2) WHERE ((key OPERATOR(pg_catalog.=) 6) AND false) -DETAIL: on server postgres@localhost:xxxxx connectionId: 3 +NOTICE: executing the command locally: SELECT count(*) AS count FROM (SELECT NULL::integer AS key, NULL::integer AS value_1, NULL::text AS value_2 WHERE false) modify_fast_path(key, value_1, value_2) WHERE ((key OPERATOR(pg_catalog.=) 6) AND false) count --------------------------------------------------------------------- 0 (1 row) EXECUTE prepared_zero_shard_select(7); -NOTICE: issuing SELECT count(*) AS count FROM (SELECT NULL::integer AS key, NULL::integer AS value_1, NULL::text AS value_2 WHERE false) modify_fast_path(key, value_1, value_2) WHERE ((key OPERATOR(pg_catalog.=) 7) AND false) -DETAIL: on server postgres@localhost:xxxxx 
connectionId: 4 +NOTICE: executing the command locally: SELECT count(*) AS count FROM (SELECT NULL::integer AS key, NULL::integer AS value_1, NULL::text AS value_2 WHERE false) modify_fast_path(key, value_1, value_2) WHERE ((key OPERATOR(pg_catalog.=) 7) AND false) count --------------------------------------------------------------------- 0 @@ -466,16 +459,14 @@ EXECUTE prepared_zero_shard_update(7); -- same test with fast-path disabled SET citus.enable_fast_path_router_planner TO FALSE; EXECUTE prepared_zero_shard_select(1); -NOTICE: issuing SELECT count(*) AS count FROM (SELECT NULL::integer AS key, NULL::integer AS value_1, NULL::text AS value_2 WHERE false) modify_fast_path(key, value_1, value_2) WHERE ((key OPERATOR(pg_catalog.=) 1) AND false) -DETAIL: on server postgres@localhost:xxxxx connectionId: 4 +NOTICE: executing the command locally: SELECT count(*) AS count FROM (SELECT NULL::integer AS key, NULL::integer AS value_1, NULL::text AS value_2 WHERE false) modify_fast_path(key, value_1, value_2) WHERE ((key OPERATOR(pg_catalog.=) 1) AND false) count --------------------------------------------------------------------- 0 (1 row) EXECUTE prepared_zero_shard_select(2); -NOTICE: issuing SELECT count(*) AS count FROM (SELECT NULL::integer AS key, NULL::integer AS value_1, NULL::text AS value_2 WHERE false) modify_fast_path(key, value_1, value_2) WHERE ((key OPERATOR(pg_catalog.=) 2) AND false) -DETAIL: on server postgres@localhost:xxxxx connectionId: 3 +NOTICE: executing the command locally: SELECT count(*) AS count FROM (SELECT NULL::integer AS key, NULL::integer AS value_1, NULL::text AS value_2 WHERE false) modify_fast_path(key, value_1, value_2) WHERE ((key OPERATOR(pg_catalog.=) 2) AND false) count --------------------------------------------------------------------- 0 @@ -490,8 +481,7 @@ PREPARE prepared_zero_shard_update(int) AS UPDATE modify_fast_path SET value_1 = EXECUTE prepared_zero_shard_select(1); DEBUG: Creating router plan DEBUG: Plan is router executable -NOTICE: issuing SELECT count(*) AS count FROM (SELECT NULL::integer AS key, NULL::integer AS value_1, NULL::text AS value_2 WHERE false) modify_fast_path(key, value_1, value_2) WHERE ((key OPERATOR(pg_catalog.=) $1) AND false) -DETAIL: on server postgres@localhost:xxxxx connectionId: 4 +NOTICE: executing the command locally: SELECT count(*) AS count FROM (SELECT NULL::integer AS key, NULL::integer AS value_1, NULL::text AS value_2 WHERE false) modify_fast_path(key, value_1, value_2) WHERE ((key OPERATOR(pg_catalog.=) $1) AND false) count --------------------------------------------------------------------- 0 @@ -500,8 +490,7 @@ DETAIL: on server postgres@localhost:xxxxx connectionId: 4 EXECUTE prepared_zero_shard_select(2); DEBUG: Creating router plan DEBUG: Plan is router executable -NOTICE: issuing SELECT count(*) AS count FROM (SELECT NULL::integer AS key, NULL::integer AS value_1, NULL::text AS value_2 WHERE false) modify_fast_path(key, value_1, value_2) WHERE ((key OPERATOR(pg_catalog.=) $1) AND false) -DETAIL: on server postgres@localhost:xxxxx connectionId: 3 +NOTICE: executing the command locally: SELECT count(*) AS count FROM (SELECT NULL::integer AS key, NULL::integer AS value_1, NULL::text AS value_2 WHERE false) modify_fast_path(key, value_1, value_2) WHERE ((key OPERATOR(pg_catalog.=) $1) AND false) count --------------------------------------------------------------------- 0 @@ -510,8 +499,7 @@ DETAIL: on server postgres@localhost:xxxxx connectionId: 3 EXECUTE prepared_zero_shard_select(3); DEBUG: 
Creating router plan DEBUG: Plan is router executable -NOTICE: issuing SELECT count(*) AS count FROM (SELECT NULL::integer AS key, NULL::integer AS value_1, NULL::text AS value_2 WHERE false) modify_fast_path(key, value_1, value_2) WHERE ((key OPERATOR(pg_catalog.=) $1) AND false) -DETAIL: on server postgres@localhost:xxxxx connectionId: 4 +NOTICE: executing the command locally: SELECT count(*) AS count FROM (SELECT NULL::integer AS key, NULL::integer AS value_1, NULL::text AS value_2 WHERE false) modify_fast_path(key, value_1, value_2) WHERE ((key OPERATOR(pg_catalog.=) $1) AND false) count --------------------------------------------------------------------- 0 @@ -520,8 +508,7 @@ DETAIL: on server postgres@localhost:xxxxx connectionId: 4 EXECUTE prepared_zero_shard_select(4); DEBUG: Creating router plan DEBUG: Plan is router executable -NOTICE: issuing SELECT count(*) AS count FROM (SELECT NULL::integer AS key, NULL::integer AS value_1, NULL::text AS value_2 WHERE false) modify_fast_path(key, value_1, value_2) WHERE ((key OPERATOR(pg_catalog.=) $1) AND false) -DETAIL: on server postgres@localhost:xxxxx connectionId: 3 +NOTICE: executing the command locally: SELECT count(*) AS count FROM (SELECT NULL::integer AS key, NULL::integer AS value_1, NULL::text AS value_2 WHERE false) modify_fast_path(key, value_1, value_2) WHERE ((key OPERATOR(pg_catalog.=) $1) AND false) count --------------------------------------------------------------------- 0 @@ -530,8 +517,7 @@ DETAIL: on server postgres@localhost:xxxxx connectionId: 3 EXECUTE prepared_zero_shard_select(5); DEBUG: Creating router plan DEBUG: Plan is router executable -NOTICE: issuing SELECT count(*) AS count FROM (SELECT NULL::integer AS key, NULL::integer AS value_1, NULL::text AS value_2 WHERE false) modify_fast_path(key, value_1, value_2) WHERE ((key OPERATOR(pg_catalog.=) $1) AND false) -DETAIL: on server postgres@localhost:xxxxx connectionId: 4 +NOTICE: executing the command locally: SELECT count(*) AS count FROM (SELECT NULL::integer AS key, NULL::integer AS value_1, NULL::text AS value_2 WHERE false) modify_fast_path(key, value_1, value_2) WHERE ((key OPERATOR(pg_catalog.=) $1) AND false) count --------------------------------------------------------------------- 0 @@ -540,16 +526,14 @@ DETAIL: on server postgres@localhost:xxxxx connectionId: 4 EXECUTE prepared_zero_shard_select(6); DEBUG: Creating router plan DEBUG: Plan is router executable -NOTICE: issuing SELECT count(*) AS count FROM (SELECT NULL::integer AS key, NULL::integer AS value_1, NULL::text AS value_2 WHERE false) modify_fast_path(key, value_1, value_2) WHERE ((key OPERATOR(pg_catalog.=) $1) AND false) -DETAIL: on server postgres@localhost:xxxxx connectionId: 3 +NOTICE: executing the command locally: SELECT count(*) AS count FROM (SELECT NULL::integer AS key, NULL::integer AS value_1, NULL::text AS value_2 WHERE false) modify_fast_path(key, value_1, value_2) WHERE ((key OPERATOR(pg_catalog.=) $1) AND false) count --------------------------------------------------------------------- 0 (1 row) EXECUTE prepared_zero_shard_select(7); -NOTICE: issuing SELECT count(*) AS count FROM (SELECT NULL::integer AS key, NULL::integer AS value_1, NULL::text AS value_2 WHERE false) modify_fast_path(key, value_1, value_2) WHERE ((key OPERATOR(pg_catalog.=) $1) AND false) -DETAIL: on server postgres@localhost:xxxxx connectionId: 3 +NOTICE: executing the command locally: SELECT count(*) AS count FROM (SELECT NULL::integer AS key, NULL::integer AS value_1, NULL::text AS value_2 WHERE 
false) modify_fast_path(key, value_1, value_2) WHERE ((key OPERATOR(pg_catalog.=) $1) AND false) count --------------------------------------------------------------------- 0 diff --git a/src/test/regress/expected/intermediate_result_pruning.out b/src/test/regress/expected/intermediate_result_pruning.out index fb20c941b..697d2ce21 100644 --- a/src/test/regress/expected/intermediate_result_pruning.out +++ b/src/test/regress/expected/intermediate_result_pruning.out @@ -106,7 +106,7 @@ FROM DEBUG: generating subplan XXX_1 for CTE some_values_1: SELECT key, random() AS random FROM intermediate_result_pruning.table_1 WHERE (value OPERATOR(pg_catalog.=) ANY (ARRAY['3'::text, '4'::text])) DEBUG: generating subplan XXX_2 for CTE some_values_2: SELECT key, random() AS random FROM (SELECT intermediate_result.key, intermediate_result.random FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, random double precision)) some_values_1 DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM ((SELECT intermediate_result.key, intermediate_result.random FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(key integer, random double precision)) some_values_2 JOIN intermediate_result_pruning.table_2 USING (key)) WHERE (table_2.key OPERATOR(pg_catalog.=) 1) -DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx +DEBUG: Subplan XXX_1 will be written to local file DEBUG: Subplan XXX_2 will be sent to localhost:xxxxx count --------------------------------------------------------------------- @@ -291,7 +291,7 @@ DEBUG: generating subplan XXX_1 for CTE some_values_1: SELECT key, random() AS DEBUG: generating subplan XXX_2 for CTE some_values_2: SELECT some_values_1.key, random() AS random FROM ((SELECT intermediate_result.key, intermediate_result.random FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, random double precision)) some_values_1 JOIN intermediate_result_pruning.table_2 USING (key)) WHERE (some_values_1.key OPERATOR(pg_catalog.=) 1) DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (SELECT intermediate_result.key, intermediate_result.random FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(key integer, random double precision)) some_values_2 DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx -DEBUG: Subplan XXX_2 will be sent to localhost:xxxxx +DEBUG: Subplan XXX_2 will be written to local file count --------------------------------------------------------------------- 0 @@ -321,7 +321,7 @@ DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS c DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx -DEBUG: Subplan XXX_2 will be sent to localhost:xxxxx +DEBUG: Subplan XXX_2 will be written to local file count --------------------------------------------------------------------- 0 @@ -350,7 +350,7 @@ DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT DISTINCT key DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM ((SELECT intermediate_result.key FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer)) top_cte JOIN intermediate_result_pruning.table_2 USING (key)) WHERE (table_2.key OPERATOR(pg_catalog.=) 2) 
DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx -DEBUG: Subplan XXX_2 will be sent to localhost:xxxxx +DEBUG: Subplan XXX_2 will be written to local file count --------------------------------------------------------------------- 0 @@ -389,8 +389,8 @@ SELECT count(*) FROM some_values_2 JOIN some_values_1 USING (key); DEBUG: generating subplan XXX_1 for CTE some_values_1: SELECT key, random() AS random FROM intermediate_result_pruning.table_1 WHERE (value OPERATOR(pg_catalog.=) ANY (ARRAY['3'::text, '4'::text])) DEBUG: generating subplan XXX_2 for CTE some_values_2: SELECT key, random() AS random FROM intermediate_result_pruning.table_2 WHERE (value OPERATOR(pg_catalog.=) ANY (ARRAY['3'::text, '4'::text])) DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM ((SELECT intermediate_result.key, intermediate_result.random FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(key integer, random double precision)) some_values_2 JOIN (SELECT intermediate_result.key, intermediate_result.random FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, random double precision)) some_values_1 USING (key)) -DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx -DEBUG: Subplan XXX_2 will be sent to localhost:xxxxx +DEBUG: Subplan XXX_1 will be written to local file +DEBUG: Subplan XXX_2 will be written to local file count --------------------------------------------------------------------- 2 @@ -406,8 +406,8 @@ SELECT count(*) FROM some_values_2 JOIN some_values_1 USING (key) WHERE false; DEBUG: generating subplan XXX_1 for CTE some_values_1: SELECT key, random() AS random FROM intermediate_result_pruning.table_1 WHERE (value OPERATOR(pg_catalog.=) ANY (ARRAY['3'::text, '4'::text])) DEBUG: generating subplan XXX_2 for CTE some_values_2: SELECT key, random() AS random FROM intermediate_result_pruning.table_2 WHERE (value OPERATOR(pg_catalog.=) ANY (ARRAY['3'::text, '4'::text])) DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM ((SELECT intermediate_result.key, intermediate_result.random FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(key integer, random double precision)) some_values_2 JOIN (SELECT intermediate_result.key, intermediate_result.random FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, random double precision)) some_values_1 USING (key)) WHERE false -DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx -DEBUG: Subplan XXX_2 will be sent to localhost:xxxxx +DEBUG: Subplan XXX_1 will be written to local file +DEBUG: Subplan XXX_2 will be written to local file count --------------------------------------------------------------------- 0 @@ -428,8 +428,8 @@ FROM DEBUG: generating subplan XXX_1 for CTE some_values_1: SELECT key, random() AS random FROM intermediate_result_pruning.table_1 WHERE (value OPERATOR(pg_catalog.=) ANY (ARRAY['3'::text, '4'::text])) DEBUG: generating subplan XXX_2 for CTE some_values_3: SELECT key, random() AS random FROM (SELECT intermediate_result.key, intermediate_result.random FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, random double precision)) some_values_1 DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (SELECT intermediate_result.key, 
intermediate_result.random FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(key integer, random double precision)) some_values_3 -DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx -DEBUG: Subplan XXX_2 will be sent to localhost:xxxxx +DEBUG: Subplan XXX_1 will be written to local file +DEBUG: Subplan XXX_2 will be written to local file count --------------------------------------------------------------------- 2 @@ -481,7 +481,7 @@ DEBUG: generating subplan XXX_4 for subquery SELECT avg((table_2.value)::intege DEBUG: generating subplan XXX_5 for subquery SELECT min(table_1.value) AS min FROM (SELECT intermediate_result.avg_ev_type FROM read_intermediate_result('XXX_4'::text, 'binary'::citus_copy_format) intermediate_result(avg_ev_type numeric)) level_5, intermediate_result_pruning.table_1 WHERE ((level_5.avg_ev_type OPERATOR(pg_catalog.=) (table_1.key)::numeric) AND (table_1.key OPERATOR(pg_catalog.>) 111)) GROUP BY level_5.avg_ev_type DEBUG: generating subplan XXX_6 for subquery SELECT avg((level_6.min)::integer) AS avg FROM (SELECT intermediate_result.min FROM read_intermediate_result('XXX_5'::text, 'binary'::citus_copy_format) intermediate_result(min text)) level_6, intermediate_result_pruning.table_1 WHERE (table_1.key OPERATOR(pg_catalog.=) (level_6.min)::integer) GROUP BY table_1.value DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (SELECT intermediate_result.avg FROM read_intermediate_result('XXX_6'::text, 'binary'::citus_copy_format) intermediate_result(avg numeric)) bar -DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx +DEBUG: Subplan XXX_1 will be written to local file DEBUG: Subplan XXX_2 will be sent to localhost:xxxxx DEBUG: Subplan XXX_3 will be sent to localhost:xxxxx DEBUG: Subplan XXX_3 will be sent to localhost:xxxxx @@ -489,7 +489,7 @@ DEBUG: Subplan XXX_4 will be sent to localhost:xxxxx DEBUG: Subplan XXX_4 will be sent to localhost:xxxxx DEBUG: Subplan XXX_5 will be sent to localhost:xxxxx DEBUG: Subplan XXX_5 will be sent to localhost:xxxxx -DEBUG: Subplan XXX_6 will be sent to localhost:xxxxx +DEBUG: Subplan XXX_6 will be written to local file count --------------------------------------------------------------------- 0 @@ -540,12 +540,12 @@ DEBUG: generating subplan XXX_4 for subquery SELECT avg((table_2.value)::intege DEBUG: generating subplan XXX_5 for subquery SELECT min(table_1.value) AS min FROM (SELECT intermediate_result.avg_ev_type FROM read_intermediate_result('XXX_4'::text, 'binary'::citus_copy_format) intermediate_result(avg_ev_type numeric)) level_5, intermediate_result_pruning.table_1 WHERE ((level_5.avg_ev_type OPERATOR(pg_catalog.=) (table_1.key)::numeric) AND (table_1.key OPERATOR(pg_catalog.=) 111)) GROUP BY level_5.avg_ev_type DEBUG: generating subplan XXX_6 for subquery SELECT avg((level_6.min)::integer) AS avg FROM (SELECT intermediate_result.min FROM read_intermediate_result('XXX_5'::text, 'binary'::citus_copy_format) intermediate_result(min text)) level_6, intermediate_result_pruning.table_1 WHERE ((table_1.key OPERATOR(pg_catalog.=) (level_6.min)::integer) AND (table_1.key OPERATOR(pg_catalog.=) 4)) GROUP BY table_1.value DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (SELECT intermediate_result.avg FROM read_intermediate_result('XXX_6'::text, 'binary'::citus_copy_format) intermediate_result(avg numeric)) bar -DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx +DEBUG: Subplan XXX_1 will be written to local 
file DEBUG: Subplan XXX_2 will be sent to localhost:xxxxx DEBUG: Subplan XXX_3 will be sent to localhost:xxxxx DEBUG: Subplan XXX_4 will be sent to localhost:xxxxx DEBUG: Subplan XXX_5 will be sent to localhost:xxxxx -DEBUG: Subplan XXX_6 will be sent to localhost:xxxxx +DEBUG: Subplan XXX_6 will be written to local file count --------------------------------------------------------------------- 0 @@ -559,8 +559,8 @@ INTERSECT DEBUG: generating subplan XXX_1 for subquery SELECT key FROM intermediate_result_pruning.table_1 WHERE (key OPERATOR(pg_catalog.=) 1) DEBUG: generating subplan XXX_2 for subquery SELECT key FROM intermediate_result_pruning.table_1 WHERE (key OPERATOR(pg_catalog.=) 2) DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT intermediate_result.key FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer) INTERSECT SELECT intermediate_result.key FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(key integer) -DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx -DEBUG: Subplan XXX_2 will be sent to localhost:xxxxx +DEBUG: Subplan XXX_1 will be written to local file +DEBUG: Subplan XXX_2 will be written to local file key --------------------------------------------------------------------- (0 rows) @@ -587,10 +587,10 @@ DEBUG: generating subplan XXX_2 for subquery SELECT key FROM intermediate_resul DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT intermediate_result.key FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer) INTERSECT SELECT intermediate_result.key FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(key integer) DEBUG: generating subplan XXX_2 for CTE cte_2: SELECT table_1.key FROM intermediate_result_pruning.table_1 WHERE (table_1.key OPERATOR(pg_catalog.=) 3) INTERSECT SELECT table_1.key FROM intermediate_result_pruning.table_1 WHERE (table_1.key OPERATOR(pg_catalog.=) 4) DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT cte_1.key FROM (SELECT intermediate_result.key FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer)) cte_1 UNION SELECT cte_2.key FROM (SELECT intermediate_result.key FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(key integer)) cte_2 -DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx -DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx -DEBUG: Subplan XXX_2 will be sent to localhost:xxxxx -DEBUG: Subplan XXX_2 will be sent to localhost:xxxxx +DEBUG: Subplan XXX_1 will be written to local file +DEBUG: Subplan XXX_1 will be written to local file +DEBUG: Subplan XXX_2 will be written to local file +DEBUG: Subplan XXX_2 will be written to local file key --------------------------------------------------------------------- (0 rows) @@ -618,9 +618,9 @@ DEBUG: generating subplan XXX_2 for CTE cte_2: SELECT count(*) AS count FROM (i DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count FROM (SELECT intermediate_result.count FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(count bigint)) cte_2 DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx -DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx -DEBUG: Subplan XXX_2 will be sent to localhost:xxxxx -DEBUG: Subplan XXX_2 will be sent to localhost:xxxxx 
+DEBUG: Subplan XXX_1 will be written to local file +DEBUG: Subplan XXX_2 will be written to local file +DEBUG: Subplan XXX_2 will be written to local file count --------------------------------------------------------------------- 0 @@ -680,8 +680,8 @@ DEBUG: generating subplan XXX_2 for CTE raw_data: DELETE FROM intermediate_resu DEBUG: generating subplan XXX_1 for subquery SELECT min(key) AS min FROM (SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) select_data WHERE (key OPERATOR(pg_catalog.>) 1) DEBUG: Plan XXX query after replacing subqueries and CTEs: DELETE FROM intermediate_result_pruning.table_2 WHERE (key OPERATOR(pg_catalog.>=) (SELECT intermediate_result.min FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(min integer))) RETURNING key, value DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT key, value FROM (SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) raw_data -DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx -DEBUG: Subplan XXX_2 will be sent to localhost:xxxxx +DEBUG: Subplan XXX_1 will be written to local file +DEBUG: Subplan XXX_2 will be written to local file DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx key | value @@ -710,8 +710,8 @@ DEBUG: generating subplan XXX_2 for CTE raw_data: DELETE FROM intermediate_resu DEBUG: generating subplan XXX_1 for subquery SELECT min(key) AS min FROM (SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) select_data WHERE ((key)::double precision OPERATOR(pg_catalog.>) ((1)::double precision OPERATOR(pg_catalog.+) random())) DEBUG: Plan XXX query after replacing subqueries and CTEs: DELETE FROM intermediate_result_pruning.table_2 WHERE ((value)::integer OPERATOR(pg_catalog.>=) (SELECT intermediate_result.min FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(min integer))) RETURNING key, value DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT key, value FROM (SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) raw_data -DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx -DEBUG: Subplan XXX_2 will be sent to localhost:xxxxx +DEBUG: Subplan XXX_1 will be written to local file +DEBUG: Subplan XXX_2 will be written to local file DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx key | value @@ -737,7 +737,7 @@ DEBUG: generating subplan XXX_1 for CTE raw_data: DELETE FROM intermediate_resu DEBUG: generating subplan XXX_1 for subquery SELECT min(key) AS min FROM intermediate_result_pruning.table_1 WHERE ((key)::double precision OPERATOR(pg_catalog.>) random()) DEBUG: Plan XXX query after replacing subqueries and CTEs: DELETE FROM intermediate_result_pruning.table_2 WHERE (((value)::integer OPERATOR(pg_catalog.>=) (SELECT intermediate_result.min FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(min integer))) AND (key OPERATOR(pg_catalog.=) 6)) RETURNING key, 
value DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT key, value FROM (SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) raw_data -DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx +DEBUG: Subplan XXX_1 will be written to local file DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx key | value --------------------------------------------------------------------- @@ -789,10 +789,10 @@ DEBUG: generating subplan XXX_2 for CTE cte_2: SELECT table_1.key FROM intermed DEBUG: generating subplan XXX_3 for subquery SELECT cte_1.key FROM (SELECT intermediate_result.key FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer)) cte_1 UNION SELECT cte_2.key FROM (SELECT intermediate_result.key FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(key integer)) cte_2 DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT key, value FROM intermediate_result_pruning.table_2 WHERE ((key OPERATOR(pg_catalog.=) 1) AND ((value)::integer OPERATOR(pg_catalog.=) ANY (SELECT intermediate_result.key FROM read_intermediate_result('XXX_3'::text, 'binary'::citus_copy_format) intermediate_result(key integer)))) DEBUG: Collecting INSERT ... SELECT results on coordinator -DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx -DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx -DEBUG: Subplan XXX_2 will be sent to localhost:xxxxx -DEBUG: Subplan XXX_2 will be sent to localhost:xxxxx +DEBUG: Subplan XXX_1 will be written to local file +DEBUG: Subplan XXX_1 will be written to local file +DEBUG: Subplan XXX_2 will be written to local file +DEBUG: Subplan XXX_2 will be written to local file DEBUG: Subplan XXX_3 will be sent to localhost:xxxxx -- same query, cte is on the FROM clause -- and this time the final query (and top-level intermediate result) @@ -826,10 +826,10 @@ DEBUG: generating subplan XXX_2 for CTE cte_2: SELECT table_1.key FROM intermed DEBUG: generating subplan XXX_3 for subquery SELECT cte_1.key FROM (SELECT intermediate_result.key FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer)) cte_1 UNION SELECT cte_2.key FROM (SELECT intermediate_result.key FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(key integer)) cte_2 DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT table_2.key, table_2.value FROM intermediate_result_pruning.table_2, (SELECT intermediate_result.key FROM read_intermediate_result('XXX_3'::text, 'binary'::citus_copy_format) intermediate_result(key integer)) foo WHERE ((table_2.key OPERATOR(pg_catalog.<>) 1) AND (foo.key OPERATOR(pg_catalog.=) (table_2.value)::integer)) DEBUG: performing repartitioned INSERT ... 
SELECT -DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx -DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx -DEBUG: Subplan XXX_2 will be sent to localhost:xxxxx -DEBUG: Subplan XXX_2 will be sent to localhost:xxxxx +DEBUG: Subplan XXX_1 will be written to local file +DEBUG: Subplan XXX_1 will be written to local file +DEBUG: Subplan XXX_2 will be written to local file +DEBUG: Subplan XXX_2 will be written to local file DEBUG: Subplan XXX_3 will be sent to localhost:xxxxx DEBUG: Subplan XXX_3 will be sent to localhost:xxxxx -- append partitioned/heap-type diff --git a/src/test/regress/expected/local_shard_execution.out b/src/test/regress/expected/local_shard_execution.out index 2dd3247e0..2275661bf 100644 --- a/src/test/regress/expected/local_shard_execution.out +++ b/src/test/regress/expected/local_shard_execution.out @@ -528,7 +528,7 @@ NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shar TRUNCATE distributed_table CASCADE; NOTICE: truncate cascades to table "second_distributed_table" -ERROR: cannot execute command because a local execution has already been done in the transaction +ERROR: cannot execute command because a local execution has accessed a placement in the transaction DETAIL: Some parallel commands cannot be executed if a previous command has already been executed locally HINT: Try re-running the transaction with "SET LOCAL citus.enable_local_execution TO OFF;" ROLLBACK; @@ -543,7 +543,7 @@ NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shar -- even no need to supply any data COPY distributed_table FROM STDIN WITH CSV; -ERROR: cannot execute command because a local execution has already been done in the transaction +ERROR: cannot execute command because a local execution has accessed a placement in the transaction DETAIL: Some parallel commands cannot be executed if a previous command has already been executed locally HINT: Try re-running the transaction with "SET LOCAL citus.enable_local_execution TO OFF;" ROLLBACK; @@ -557,7 +557,7 @@ NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shar (1 row) INSERT INTO distributed_table (key) SELECT i FROM generate_series(1,10)i; -ERROR: cannot execute command because a local execution has already been done in the transaction +ERROR: cannot execute command because a local execution has accessed a placement in the transaction DETAIL: Some parallel commands cannot be executed if a previous command has already been executed locally HINT: Try re-running the transaction with "SET LOCAL citus.enable_local_execution TO OFF;" ROLLBACK; @@ -571,7 +571,7 @@ NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shar (1 row) INSERT INTO distributed_table (key) SELECT key+1 FROM distributed_table; -ERROR: cannot execute command because a local execution has already been done in the transaction +ERROR: cannot execute command because a local execution has accessed a placement in the transaction DETAIL: Some parallel commands cannot be executed if a previous command has already been executed locally HINT: Try re-running the transaction with "SET LOCAL citus.enable_local_execution TO OFF;" ROLLBACK; @@ -586,7 +586,7 @@ BEGIN; DELETE FROM distributed_table WHERE key = 1; NOTICE: executing the command locally: DELETE FROM local_shard_execution.distributed_table_1470001 distributed_table WHERE (key OPERATOR(pg_catalog.=) 1) EXPLAIN ANALYZE DELETE FROM distributed_table WHERE key = 1; -ERROR: cannot execute command because a local execution has 
already been done in the transaction +ERROR: cannot execute command because a local execution has accessed a placement in the transaction DETAIL: Some parallel commands cannot be executed if a previous command has already been executed locally HINT: Try re-running the transaction with "SET LOCAL citus.enable_local_execution TO OFF;" ROLLBACK; @@ -686,6 +686,7 @@ distributed_local_mixed AS (SELECT * FROM reference_table WHERE key IN (SELECT k SELECT * FROM local_insert, distributed_local_mixed; NOTICE: executing the command locally: INSERT INTO local_shard_execution.distributed_table_1470001 AS citus_table_alias (key, value, age) VALUES (1, '11'::text, 21) ON CONFLICT(key) DO UPDATE SET value = '29'::text RETURNING citus_table_alias.key, citus_table_alias.value, citus_table_alias.age NOTICE: executing the command locally: SELECT key FROM local_shard_execution.reference_table_1470000 reference_table WHERE (key OPERATOR(pg_catalog.=) ANY (SELECT local_insert.key FROM (SELECT intermediate_result.key, intermediate_result.value, intermediate_result.age FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text, age bigint)) local_insert)) +NOTICE: executing the command locally: SELECT local_insert.key, local_insert.value, local_insert.age, distributed_local_mixed.key FROM (SELECT intermediate_result.key, intermediate_result.value, intermediate_result.age FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text, age bigint)) local_insert, (SELECT intermediate_result.key FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(key integer)) distributed_local_mixed key | value | age | key --------------------------------------------------------------------- 1 | 11 | 21 | 1 @@ -732,6 +733,7 @@ WHERE distributed_table.value = all_data.value AND distributed_table.key = 1 ORDER BY 1 DESC; +NOTICE: executing the command locally: SELECT distributed_table.key FROM local_shard_execution.distributed_table_1470001 distributed_table, (SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) all_data WHERE ((distributed_table.value OPERATOR(pg_catalog.=) all_data.value) AND (distributed_table.key OPERATOR(pg_catalog.=) 1)) ORDER BY distributed_table.key DESC key --------------------------------------------------------------------- 1 @@ -1225,7 +1227,7 @@ BEGIN; NOTICE: executing the command locally: DELETE FROM local_shard_execution.distributed_table_1470003 distributed_table WHERE (key OPERATOR(pg_catalog.=) 500) SET LOCAL citus.task_executor_type = 'task-tracker'; SELECT count(*) FROM distributed_table; -ERROR: cannot execute command because a local execution has already been done in the transaction +ERROR: cannot execute command because a local execution has accessed a placement in the transaction DETAIL: Some parallel commands cannot be executed if a previous command has already been executed locally HINT: Try re-running the transaction with "SET LOCAL citus.enable_local_execution TO OFF;" ROLLBACK;
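The renamed error above fires once a transaction has accessed a shard placement through local execution and a later command cannot do the same. A minimal sketch of the hinted workaround, assuming the same distributed_table as in these tests; note that the GUC has to be set at the start of the transaction, since flipping it after the error does not clear the already-recorded local access:

BEGIN;
-- make the first command use a regular connection instead of executing
-- directly on the local shard placement
SET LOCAL citus.enable_local_execution TO OFF;
DELETE FROM distributed_table WHERE key = 1;
-- parallel multi-shard commands are now allowed in this transaction
SELECT count(*) FROM distributed_table;
COMMIT;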
@@ -1434,6 +1436,7 @@ EXECUTE serial_prepared_local; -- Citus currently doesn't allow using task_assignment_policy for intermediate results WITH distributed_local_mixed AS (INSERT INTO reference_table VALUES (1000) RETURNING *) SELECT * FROM distributed_local_mixed; NOTICE: executing the command locally: INSERT INTO local_shard_execution.reference_table_1470000 (key) VALUES (1000) RETURNING key +NOTICE: executing the command locally: SELECT key FROM (SELECT intermediate_result.key FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer)) distributed_local_mixed key --------------------------------------------------------------------- 1000 diff --git a/src/test/regress/expected/locally_execute_intermediate_results.out b/src/test/regress/expected/locally_execute_intermediate_results.out new file mode 100644 index 000000000..af257a55d --- /dev/null +++ b/src/test/regress/expected/locally_execute_intermediate_results.out @@ -0,0 +1,948 @@ +CREATE SCHEMA locally_execute_intermediate_results; +SET search_path TO locally_execute_intermediate_results; +SET citus.log_intermediate_results TO TRUE; +SET citus.log_local_commands TO TRUE; +SET client_min_messages TO DEBUG1; +SET citus.shard_count TO 4; +SET citus.next_shard_id TO 1580000; +SET citus.shard_replication_factor TO 1; +SET citus.replication_model TO 'streaming'; +CREATE TABLE table_1 (key int, value text); +DEBUG: building index "pg_toast_xxxxx_index" on table "pg_toast_xxxxx" serially +SELECT create_distributed_table('table_1', 'key'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +CREATE TABLE table_2 (key int, value text); +DEBUG: building index "pg_toast_xxxxx_index" on table "pg_toast_xxxxx" serially +SELECT create_distributed_table('table_2', 'key'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +CREATE TABLE ref_table (key int, value text); +DEBUG: building index "pg_toast_xxxxx_index" on table "pg_toast_xxxxx" serially +SELECT create_reference_table('ref_table'); + create_reference_table +--------------------------------------------------------------------- + +(1 row) + +-- load some data +INSERT INTO table_1 VALUES (1, '1'), (2, '2'), (3, '3'), (4, '4'); +INSERT INTO table_2 VALUES (3, '3'), (4, '4'), (5, '5'), (6, '6'); +INSERT INTO ref_table VALUES (1, '1'), (2, '2'), (3, '3'), (4, '4'), (5, '5'), (6, '6'); +-- prevent PG 11 - PG 12 outputs from diverging +-- and have a lot more CTEs recursively planned for the +-- sake of increasing the test coverage +SET citus.enable_cte_inlining TO false; +-- the query cannot be executed locally, but still, because of +-- HAVING, the intermediate result is written to a local file as well +WITH cte_1 AS (SELECT max(value) FROM table_1) +SELECT +count(*) +FROM +table_2 +GROUP BY key +HAVING max(value) > (SELECT max FROM
cte_1); +DEBUG: generating subplan XXX_1 for CTE cte_1: SELECT max(value) AS max FROM locally_execute_intermediate_results.table_1 +DEBUG: generating subplan XXX_2 for CTE cte_2: SELECT key, value FROM locally_execute_intermediate_results.table_2 +DEBUG: generating subplan XXX_3 for subquery SELECT key FROM (SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) cte_2 ORDER BY key LIMIT 1 +DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM locally_execute_intermediate_results.table_2 WHERE (key OPERATOR(pg_catalog.>) (SELECT intermediate_result.key FROM read_intermediate_result('XXX_3'::text, 'binary'::citus_copy_format) intermediate_result(key integer))) GROUP BY key HAVING (max(value) OPERATOR(pg_catalog.>) (SELECT cte_1.max FROM (SELECT intermediate_result.max FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(max text)) cte_1)) +DEBUG: Subplan XXX_1 will be written to local file +DEBUG: Subplan XXX_2 will be written to local file +DEBUG: Subplan XXX_3 will be sent to localhost:xxxxx +DEBUG: Subplan XXX_3 will be sent to localhost:xxxxx +NOTICE: executing the command locally: SELECT key FROM (SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) cte_2 ORDER BY key LIMIT 1 + count +--------------------------------------------------------------------- + 1 + 1 +(2 rows) + +-- subquery in the WHERE part of the query should not be executed locally +-- because it can be pushed down with the jointree +WITH cte_1 AS (SELECT max(value) FROM table_1), +cte_2 AS (SELECT max(key) FROM table_2) +SELECT +count(*) +FROM +table_2 +WHERE +key > (SELECT max FROM cte_2) +GROUP BY key +HAVING max(value) > (SELECT max FROM cte_1); +DEBUG: generating subplan XXX_1 for CTE cte_1: SELECT max(value) AS max FROM locally_execute_intermediate_results.table_1 +DEBUG: generating subplan XXX_2 for CTE cte_2: SELECT max(key) AS max FROM locally_execute_intermediate_results.table_2 +DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM locally_execute_intermediate_results.table_2 WHERE (key OPERATOR(pg_catalog.>) (SELECT cte_2.max FROM (SELECT intermediate_result.max FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(max integer)) cte_2)) GROUP BY key HAVING (max(value) OPERATOR(pg_catalog.>) (SELECT cte_1.max FROM (SELECT intermediate_result.max FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(max text)) cte_1)) +DEBUG: Subplan XXX_1 will be written to local file +DEBUG: Subplan XXX_2 will be sent to localhost:xxxxx +DEBUG: Subplan XXX_2 will be sent to localhost:xxxxx + count +--------------------------------------------------------------------- +(0 rows) + +-- now all the intermediate results are safe to be in local files +WITH cte_1 AS (SELECT max(value) FROM table_1), +cte_2 AS (SELECT max(key) FROM table_2), +cte_3 AS (SELECT * FROM table_2) +SELECT +count(*) +FROM +cte_3 +WHERE +key > (SELECT max FROM cte_2) +GROUP BY key +HAVING max(value) > (SELECT max FROM cte_1); +DEBUG: generating subplan XXX_1 for CTE cte_1: SELECT max(value) AS max FROM locally_execute_intermediate_results.table_1 +DEBUG: generating subplan XXX_2 for CTE cte_2: SELECT max(key) AS max FROM 
locally_execute_intermediate_results.table_2 +DEBUG: generating subplan XXX_3 for CTE cte_3: SELECT key, value FROM locally_execute_intermediate_results.table_2 +DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_3'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) cte_3 WHERE (key OPERATOR(pg_catalog.>) (SELECT cte_2.max FROM (SELECT intermediate_result.max FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(max integer)) cte_2)) GROUP BY key HAVING (max(value) OPERATOR(pg_catalog.>) (SELECT cte_1.max FROM (SELECT intermediate_result.max FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(max text)) cte_1)) +DEBUG: Subplan XXX_1 will be written to local file +DEBUG: Subplan XXX_2 will be written to local file +DEBUG: Subplan XXX_3 will be written to local file +NOTICE: executing the command locally: SELECT count(*) AS count FROM (SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_3'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) cte_3 WHERE (key OPERATOR(pg_catalog.>) (SELECT cte_2.max FROM (SELECT intermediate_result.max FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(max integer)) cte_2)) GROUP BY key HAVING (max(value) OPERATOR(pg_catalog.>) (SELECT cte_1.max FROM (SELECT intermediate_result.max FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(max text)) cte_1)) + count +--------------------------------------------------------------------- +(0 rows) + +-- multiple CTEs are joined inside HAVING, so written to file +-- locally, but nothing executed locally +WITH cte_1 AS (SELECT max(value) FROM table_1), +cte_2 AS (SELECT max(value) FROM table_1) +SELECT +count(*) +FROM +table_2 +GROUP BY key +HAVING max(value) > (SELECT max FROM cte_1 JOIN cte_2 USING (max)); +DEBUG: generating subplan XXX_1 for CTE cte_1: SELECT max(value) AS max FROM locally_execute_intermediate_results.table_1 +DEBUG: generating subplan XXX_2 for CTE cte_2: SELECT max(value) AS max FROM locally_execute_intermediate_results.table_1 +DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM locally_execute_intermediate_results.table_2 GROUP BY key HAVING (max(value) OPERATOR(pg_catalog.>) (SELECT cte_1.max FROM ((SELECT intermediate_result.max FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(max text)) cte_1 JOIN (SELECT intermediate_result.max FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(max text)) cte_2 USING (max)))) +DEBUG: Subplan XXX_1 will be written to local file +DEBUG: Subplan XXX_2 will be written to local file + count +--------------------------------------------------------------------- + 1 + 1 +(2 rows) + +-- multiple CTEs are joined inside HAVING, so written to file +-- locally, also the join tree contains only another CTE, so should be +-- executed locally +WITH cte_1 AS (SELECT max(value) FROM table_1), +cte_2 AS (SELECT max(value) FROM table_1), +cte_3 AS (SELECT * FROM table_2) +SELECT +count(*) +FROM +cte_3 +GROUP BY key +HAVING max(value) > (SELECT max FROM cte_1 JOIN cte_2 USING (max)); +DEBUG: generating subplan XXX_1 for CTE cte_1: SELECT max(value) AS max FROM 
locally_execute_intermediate_results.table_1 +DEBUG: generating subplan XXX_2 for CTE cte_2: SELECT max(value) AS max FROM locally_execute_intermediate_results.table_1 +DEBUG: generating subplan XXX_3 for CTE cte_3: SELECT key, value FROM locally_execute_intermediate_results.table_2 +DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_3'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) cte_3 GROUP BY key HAVING (max(value) OPERATOR(pg_catalog.>) (SELECT cte_1.max FROM ((SELECT intermediate_result.max FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(max text)) cte_1 JOIN (SELECT intermediate_result.max FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(max text)) cte_2 USING (max)))) +DEBUG: Subplan XXX_1 will be written to local file +DEBUG: Subplan XXX_2 will be written to local file +DEBUG: Subplan XXX_3 will be written to local file +NOTICE: executing the command locally: SELECT count(*) AS count FROM (SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_3'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) cte_3 GROUP BY key HAVING (max(value) OPERATOR(pg_catalog.>) (SELECT cte_1.max FROM ((SELECT intermediate_result.max FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(max text)) cte_1 JOIN (SELECT intermediate_result.max FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(max text)) cte_2 USING (max)))) + count +--------------------------------------------------------------------- + 1 + 1 +(2 rows) + +-- now, the CTE is going to be written locally, +-- plus that is going to be read locally because +-- of the aggregate over the CTE in HAVING +WITH cte_1 AS (SELECT max(value) FROM table_1) +SELECT +count(*) +FROM +table_2 +GROUP BY key +HAVING max(value) > (SELECT max(max) FROM cte_1); +DEBUG: generating subplan XXX_1 for CTE cte_1: SELECT max(value) AS max FROM locally_execute_intermediate_results.table_1 +DEBUG: generating subplan XXX_2 for subquery SELECT max(max) AS max FROM (SELECT intermediate_result.max FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(max text)) cte_1 +DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM locally_execute_intermediate_results.table_2 GROUP BY key HAVING (max(value) OPERATOR(pg_catalog.>) (SELECT intermediate_result.max FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(max text))) +DEBUG: Subplan XXX_1 will be written to local file +DEBUG: Subplan XXX_2 will be written to local file +NOTICE: executing the command locally: SELECT max(max) AS max FROM (SELECT intermediate_result.max FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(max text)) cte_1 + count +--------------------------------------------------------------------- + 1 + 1 +(2 rows) + +-- two CTEs are going to be written locally and executed locally +WITH cte_1 AS (SELECT max(value) FROM table_1), +cte_2 AS (SELECT * FROM table_1) +SELECT +count(*) +FROM +cte_2 +GROUP BY key +HAVING max(value) < (SELECT max(max) FROM cte_1); +DEBUG: generating subplan XXX_1 for CTE cte_1: SELECT max(value) AS max FROM
locally_execute_intermediate_results.table_1 +DEBUG: generating subplan XXX_2 for CTE cte_2: SELECT key, value FROM locally_execute_intermediate_results.table_1 +DEBUG: generating subplan XXX_3 for subquery SELECT max(max) AS max FROM (SELECT intermediate_result.max FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(max text)) cte_1 +DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) cte_2 GROUP BY key HAVING (max(value) OPERATOR(pg_catalog.<) (SELECT intermediate_result.max FROM read_intermediate_result('XXX_3'::text, 'binary'::citus_copy_format) intermediate_result(max text))) +DEBUG: Subplan XXX_1 will be written to local file +DEBUG: Subplan XXX_2 will be written to local file +DEBUG: Subplan XXX_3 will be written to local file +NOTICE: executing the command locally: SELECT max(max) AS max FROM (SELECT intermediate_result.max FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(max text)) cte_1 +NOTICE: executing the command locally: SELECT count(*) AS count FROM (SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) cte_2 GROUP BY key HAVING (max(value) OPERATOR(pg_catalog.<) (SELECT intermediate_result.max FROM read_intermediate_result('XXX_3'::text, 'binary'::citus_copy_format) intermediate_result(max text))) + count +--------------------------------------------------------------------- + 1 + 1 + 1 +(3 rows) + +-- this time the same CTE is both joined with a distributed +-- table and used in HAVING +-- TODO: fixed by #3396 +WITH a AS (SELECT * FROM table_1) +SELECT count(*), +key +FROM a JOIN table_2 USING (key) +GROUP BY key +HAVING (max(table_2.value) > (SELECT value FROM a)); +DEBUG: generating subplan XXX_1 for CTE a: SELECT key, value FROM locally_execute_intermediate_results.table_1 +DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count, a.key FROM ((SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) a JOIN locally_execute_intermediate_results.table_2 USING (key)) GROUP BY a.key HAVING (max(table_2.value) OPERATOR(pg_catalog.>) (SELECT a_1.value FROM (SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) a_1)) +DEBUG: Subplan XXX_1 will be written to local file +ERROR: result "31_1" does not exist +CONTEXT: while executing command on localhost:xxxxx +-- this time the same CTE is both joined with a distributed +-- table and used in HAVING -- but used in another subquery/aggregate +-- so one more level of recursive planning +WITH a AS (SELECT * FROM table_1) +SELECT count(*), +key +FROM a JOIN table_2 USING (key) +GROUP BY key +HAVING (max(table_2.value) = (SELECT max(value) FROM a)); +DEBUG: generating subplan XXX_1 for CTE a: SELECT key, value FROM locally_execute_intermediate_results.table_1 +DEBUG: generating subplan XXX_2 for subquery SELECT max(value) AS max FROM (SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 
'binary'::citus_copy_format) intermediate_result(key integer, value text)) a +DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count, a.key FROM ((SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) a JOIN locally_execute_intermediate_results.table_2 USING (key)) GROUP BY a.key HAVING (max(table_2.value) OPERATOR(pg_catalog.=) (SELECT intermediate_result.max FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(max text))) +DEBUG: Subplan XXX_1 will be written to local file +DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx +DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx +DEBUG: Subplan XXX_2 will be written to local file +NOTICE: executing the command locally: SELECT max(value) AS max FROM (SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) a + count | key +--------------------------------------------------------------------- + 1 | 4 +(1 row) + +-- same query as the above, without the aggregate +WITH a AS (SELECT max(key) as key, max(value) as value FROM ref_table) +SELECT count(*), +key +FROM a JOIN ref_table USING (key) +GROUP BY key +HAVING (max(ref_table.value) <= (SELECT value FROM a)); + count | key +--------------------------------------------------------------------- + 1 | 6 +(1 row) + +-- some edge cases around CTEs used inside other CTEs +-- everything can be executed locally +WITH cte_1 as (SELECT * FROM table_1), +cte_2 AS (SELECT * FROM cte_1), +cte_3 AS (SELECT max(key) FROM cte_2) +SELECT * FROM cte_3; +DEBUG: generating subplan XXX_1 for CTE cte_1: SELECT key, value FROM locally_execute_intermediate_results.table_1 +DEBUG: generating subplan XXX_2 for CTE cte_2: SELECT key, value FROM (SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) cte_1 +DEBUG: generating subplan XXX_3 for CTE cte_3: SELECT max(key) AS max FROM (SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) cte_2 +DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT max FROM (SELECT intermediate_result.max FROM read_intermediate_result('XXX_3'::text, 'binary'::citus_copy_format) intermediate_result(max integer)) cte_3 +DEBUG: Subplan XXX_1 will be written to local file +DEBUG: Subplan XXX_2 will be written to local file +NOTICE: executing the command locally: SELECT key, value FROM (SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) cte_1 +DEBUG: Subplan XXX_3 will be written to local file +NOTICE: executing the command locally: SELECT max(key) AS max FROM (SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) cte_2 +NOTICE: executing the command locally: SELECT max FROM (SELECT intermediate_result.max FROM read_intermediate_result('XXX_3'::text, 'binary'::citus_copy_format) intermediate_result(max integer)) cte_3 + max
+--------------------------------------------------------------------- + 4 +(1 row) + +-- the join between cte_3 and table_2 has to happen remotely +WITH cte_1 as (SELECT * FROM table_1), +cte_2 AS (SELECT * FROM cte_1), +cte_3 AS (SELECT max(key) as key FROM cte_2) +SELECT * FROM cte_3 JOIN table_2 USING (key) WHERE table_2.key = 1; +DEBUG: generating subplan XXX_1 for CTE cte_1: SELECT key, value FROM locally_execute_intermediate_results.table_1 +DEBUG: generating subplan XXX_2 for CTE cte_2: SELECT key, value FROM (SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) cte_1 +DEBUG: generating subplan XXX_3 for CTE cte_3: SELECT max(key) AS key FROM (SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) cte_2 +DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT cte_3.key, table_2.value FROM ((SELECT intermediate_result.key FROM read_intermediate_result('XXX_3'::text, 'binary'::citus_copy_format) intermediate_result(key integer)) cte_3 JOIN locally_execute_intermediate_results.table_2 USING (key)) WHERE (table_2.key OPERATOR(pg_catalog.=) 1) +DEBUG: Subplan XXX_1 will be written to local file +DEBUG: Subplan XXX_2 will be written to local file +NOTICE: executing the command locally: SELECT key, value FROM (SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) cte_1 +DEBUG: Subplan XXX_3 will be sent to localhost:xxxxx +NOTICE: executing the command locally: SELECT max(key) AS key FROM (SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) cte_2 + key | value +--------------------------------------------------------------------- +(0 rows) + +-- the join between cte_3 and table_2 has to happen remotely +WITH cte_1 as (SELECT * FROM table_1), +cte_2 AS (SELECT * FROM cte_1), +cte_3 AS (SELECT max(key) as key FROM cte_2) +SELECT * FROM cte_3 JOIN ref_table USING (key); +DEBUG: generating subplan XXX_1 for CTE cte_1: SELECT key, value FROM locally_execute_intermediate_results.table_1 +DEBUG: generating subplan XXX_2 for CTE cte_2: SELECT key, value FROM (SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) cte_1 +DEBUG: generating subplan XXX_3 for CTE cte_3: SELECT max(key) AS key FROM (SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) cte_2 +DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT cte_3.key, ref_table.value FROM ((SELECT intermediate_result.key FROM read_intermediate_result('XXX_3'::text, 'binary'::citus_copy_format) intermediate_result(key integer)) cte_3 JOIN locally_execute_intermediate_results.ref_table USING (key)) +DEBUG: Subplan XXX_1 will be written to local file +DEBUG: Subplan XXX_2 will be written to local file +NOTICE: executing the command locally: SELECT key, value FROM (SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) 
intermediate_result(key integer, value text)) cte_1 +DEBUG: Subplan XXX_3 will be sent to localhost:xxxxx +DEBUG: Subplan XXX_3 will be sent to localhost:xxxxx +NOTICE: executing the command locally: SELECT max(key) AS key FROM (SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) cte_2 + key | value +--------------------------------------------------------------------- + 4 | 4 +(1 row) + +\c - - - :worker_1_port +-- now use the same queries on a worker +SET search_path TO locally_execute_intermediate_results; +SET citus.log_intermediate_results TO TRUE; +SET citus.log_local_commands TO TRUE; +SET client_min_messages TO DEBUG1; +-- prevent PG 11 - PG 12 outputs from diverging +-- and have a lot more CTEs recursively planned for the +-- sake of increasing the test coverage +SET citus.enable_cte_inlining TO false; +-- the query cannot be executed locally, but still, because of +-- HAVING, the intermediate result is written to a local file as well +WITH cte_1 AS (SELECT max(value) FROM table_1) +SELECT +count(*) +FROM +table_2 +GROUP BY key +HAVING max(value) > (SELECT max FROM cte_1); +DEBUG: generating subplan XXX_1 for CTE cte_1: SELECT max(value) AS max FROM locally_execute_intermediate_results.table_1 +DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM locally_execute_intermediate_results.table_2 GROUP BY key HAVING (max(value) OPERATOR(pg_catalog.>) (SELECT cte_1.max FROM (SELECT intermediate_result.max FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(max text)) cte_1)) +DEBUG: Subplan XXX_1 will be written to local file + count +--------------------------------------------------------------------- + 1 + 1 +(2 rows) + +-- In the non-MX case, the subquery in the WHERE part of the query can be executed locally +-- however, on Citus MX we have this limitation where the query cannot be executed locally +WITH cte_1 AS (SELECT max(value) FROM table_1), +cte_2 AS (SELECT * FROM table_2) +SELECT +count(*) +FROM +table_2 +WHERE +key > (SELECT key FROM cte_2 ORDER BY 1 LIMIT 1) +GROUP BY key +HAVING max(value) > (SELECT max FROM cte_1); +DEBUG: generating subplan XXX_1 for CTE cte_1: SELECT max(value) AS max FROM locally_execute_intermediate_results.table_1 +DEBUG: generating subplan XXX_2 for CTE cte_2: SELECT key, value FROM locally_execute_intermediate_results.table_2 +DEBUG: generating subplan XXX_3 for subquery SELECT key FROM (SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) cte_2 ORDER BY key LIMIT 1 +DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM locally_execute_intermediate_results.table_2 WHERE (key OPERATOR(pg_catalog.>) (SELECT intermediate_result.key FROM read_intermediate_result('XXX_3'::text, 'binary'::citus_copy_format) intermediate_result(key integer))) GROUP BY key HAVING (max(value) OPERATOR(pg_catalog.>) (SELECT cte_1.max FROM (SELECT intermediate_result.max FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(max text)) cte_1)) +DEBUG: Subplan XXX_1 will be written to local file +DEBUG: Subplan XXX_2 will be written to local file +DEBUG: Subplan XXX_3 will be sent to localhost:xxxxx +DEBUG: Subplan XXX_3 will be sent to localhost:xxxxx + count
+--------------------------------------------------------------------- + 1 + 1 +(2 rows) + +-- subquery in the WHERE part of the query should not be executed locally +-- because it can be pushed down with the jointree +WITH cte_1 AS (SELECT max(value) FROM table_1), +cte_2 AS (SELECT max(key) FROM table_2) +SELECT +count(*) +FROM +table_2 +WHERE +key > (SELECT max FROM cte_2) +GROUP BY key +HAVING max(value) > (SELECT max FROM cte_1); +DEBUG: generating subplan XXX_1 for CTE cte_1: SELECT max(value) AS max FROM locally_execute_intermediate_results.table_1 +DEBUG: generating subplan XXX_2 for CTE cte_2: SELECT max(key) AS max FROM locally_execute_intermediate_results.table_2 +DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM locally_execute_intermediate_results.table_2 WHERE (key OPERATOR(pg_catalog.>) (SELECT cte_2.max FROM (SELECT intermediate_result.max FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(max integer)) cte_2)) GROUP BY key HAVING (max(value) OPERATOR(pg_catalog.>) (SELECT cte_1.max FROM (SELECT intermediate_result.max FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(max text)) cte_1)) +DEBUG: Subplan XXX_1 will be written to local file +DEBUG: Subplan XXX_2 will be sent to localhost:xxxxx +DEBUG: Subplan XXX_2 will be sent to localhost:xxxxx + count +--------------------------------------------------------------------- +(0 rows) + +-- although all the intermediate results are safe to be in local files +-- we currently do not support it on Citus MX +WITH cte_1 AS (SELECT max(value) FROM table_1), +cte_2 AS (SELECT max(key) FROM table_2), +cte_3 AS (SELECT * FROM table_2) +SELECT +count(*) +FROM +cte_3 +WHERE +key > (SELECT max FROM cte_2) +GROUP BY key +HAVING max(value) > (SELECT max FROM cte_1); +DEBUG: generating subplan XXX_1 for CTE cte_1: SELECT max(value) AS max FROM locally_execute_intermediate_results.table_1 +DEBUG: generating subplan XXX_2 for CTE cte_2: SELECT max(key) AS max FROM locally_execute_intermediate_results.table_2 +DEBUG: generating subplan XXX_3 for CTE cte_3: SELECT key, value FROM locally_execute_intermediate_results.table_2 +DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_3'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) cte_3 WHERE (key OPERATOR(pg_catalog.>) (SELECT cte_2.max FROM (SELECT intermediate_result.max FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(max integer)) cte_2)) GROUP BY key HAVING (max(value) OPERATOR(pg_catalog.>) (SELECT cte_1.max FROM (SELECT intermediate_result.max FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(max text)) cte_1)) +DEBUG: Subplan XXX_1 will be written to local file +DEBUG: Subplan XXX_2 will be written to local file +DEBUG: Subplan XXX_3 will be written to local file + count +--------------------------------------------------------------------- +(0 rows) + +-- multiple CTEs are joined inside HAVING, so written to file +-- locally, but nothing executed locally +WITH cte_1 AS (SELECT max(value) FROM table_1), +cte_2 AS (SELECT max(value) FROM table_1) +SELECT +count(*) +FROM +table_2 +GROUP BY key +HAVING max(value) > (SELECT max FROM cte_1 JOIN cte_2 USING (max)); +DEBUG: generating subplan XXX_1 for CTE cte_1: SELECT 
max(value) AS max FROM locally_execute_intermediate_results.table_1 +DEBUG: generating subplan XXX_2 for CTE cte_2: SELECT max(value) AS max FROM locally_execute_intermediate_results.table_1 +DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM locally_execute_intermediate_results.table_2 GROUP BY key HAVING (max(value) OPERATOR(pg_catalog.>) (SELECT cte_1.max FROM ((SELECT intermediate_result.max FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(max text)) cte_1 JOIN (SELECT intermediate_result.max FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(max text)) cte_2 USING (max)))) +DEBUG: Subplan XXX_1 will be written to local file +DEBUG: Subplan XXX_2 will be written to local file + count +--------------------------------------------------------------------- + 1 + 1 +(2 rows) + +-- multiple CTEs are joined inside HAVING, so written to file +-- locally, also the join tree contains only another CTE, so should be +-- executed locally, but not on a Citus MX worker +WITH cte_1 AS (SELECT max(value) FROM table_1), +cte_2 AS (SELECT max(value) FROM table_1), +cte_3 AS (SELECT * FROM table_2) +SELECT +count(*) +FROM +cte_3 +GROUP BY key +HAVING max(value) > (SELECT max FROM cte_1 JOIN cte_2 USING (max)); +DEBUG: generating subplan XXX_1 for CTE cte_1: SELECT max(value) AS max FROM locally_execute_intermediate_results.table_1 +DEBUG: generating subplan XXX_2 for CTE cte_2: SELECT max(value) AS max FROM locally_execute_intermediate_results.table_1 +DEBUG: generating subplan XXX_3 for CTE cte_3: SELECT key, value FROM locally_execute_intermediate_results.table_2 +DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_3'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) cte_3 GROUP BY key HAVING (max(value) OPERATOR(pg_catalog.>) (SELECT cte_1.max FROM ((SELECT intermediate_result.max FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(max text)) cte_1 JOIN (SELECT intermediate_result.max FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(max text)) cte_2 USING (max)))) +DEBUG: Subplan XXX_1 will be written to local file +DEBUG: Subplan XXX_2 will be written to local file +DEBUG: Subplan XXX_3 will be written to local file + count +--------------------------------------------------------------------- + 1 + 1 +(2 rows) +
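The coordinator-versus-MX-worker distinction these comments keep returning to comes down to the node's group id, which the executor consults before reading a subplan from a local file. For illustration, a quick way to see which kind of node the current session is on, using the single-row Citus metadata table:

SELECT groupid FROM pg_dist_local_group;
-- 0 on the coordinator, the worker's own positive group id on an MX worker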
+-- now, the CTE is going to be written locally, +-- plus that could have been read locally on the coordinator +-- because of the aggregate over the CTE in HAVING +-- but not on Citus MX +WITH cte_1 AS (SELECT max(value) FROM table_1) +SELECT +count(*) +FROM +table_2 +GROUP BY key +HAVING max(value) > (SELECT max(max) FROM cte_1); +DEBUG: generating subplan XXX_1 for CTE cte_1: SELECT max(value) AS max FROM locally_execute_intermediate_results.table_1 +DEBUG: generating subplan XXX_2 for subquery SELECT max(max) AS max FROM (SELECT intermediate_result.max FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(max text)) cte_1 +DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM locally_execute_intermediate_results.table_2 GROUP BY key HAVING (max(value) OPERATOR(pg_catalog.>) (SELECT intermediate_result.max FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(max text))) +DEBUG: Subplan XXX_1 will be written to local file +DEBUG: Subplan XXX_2 will be written to local file + count +--------------------------------------------------------------------- + 1 + 1 +(2 rows) + +-- two CTEs could have been written locally and executed locally +-- on the coordinator, but not on the workers +WITH cte_1 AS (SELECT max(value) FROM table_1), +cte_2 AS (SELECT * FROM table_1) +SELECT +count(*) +FROM +cte_2 +GROUP BY key +HAVING max(value) < (SELECT max(max) FROM cte_1); +DEBUG: generating subplan XXX_1 for CTE cte_1: SELECT max(value) AS max FROM locally_execute_intermediate_results.table_1 +DEBUG: generating subplan XXX_2 for CTE cte_2: SELECT key, value FROM locally_execute_intermediate_results.table_1 +DEBUG: generating subplan XXX_3 for subquery SELECT max(max) AS max FROM (SELECT intermediate_result.max FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(max text)) cte_1 +DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) cte_2 GROUP BY key HAVING (max(value) OPERATOR(pg_catalog.<) (SELECT intermediate_result.max FROM read_intermediate_result('XXX_3'::text, 'binary'::citus_copy_format) intermediate_result(max text))) +DEBUG: Subplan XXX_1 will be written to local file +DEBUG: Subplan XXX_2 will be written to local file +DEBUG: Subplan XXX_3 will be written to local file + count +--------------------------------------------------------------------- + 1 + 1 + 1 +(3 rows) + +-- this time the same CTE is both joined with a distributed +-- table and used in HAVING +-- TODO: fixed by #3396 +WITH a AS (SELECT * FROM table_1) +SELECT count(*), +key +FROM a JOIN table_2 USING (key) +GROUP BY key +HAVING (max(table_2.value) > (SELECT value FROM a)); +DEBUG: generating subplan XXX_1 for CTE a: SELECT key, value FROM locally_execute_intermediate_results.table_1 +DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count, a.key FROM ((SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) a JOIN locally_execute_intermediate_results.table_2 USING (key)) GROUP BY a.key HAVING (max(table_2.value) OPERATOR(pg_catalog.>) (SELECT a_1.value FROM (SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) a_1)) +DEBUG: Subplan XXX_1 will be written to local file +ERROR: result "28_1" does not exist +CONTEXT: while executing command on localhost:xxxxx +-- this time the same CTE is both joined with a distributed +-- table and used in HAVING -- but used in another subquery/aggregate +-- so one more level of recursive planning +WITH a AS (SELECT * FROM table_1) +SELECT count(*), +key +FROM a JOIN table_2 USING (key) +GROUP BY key +HAVING (max(table_2.value) = (SELECT max(value) FROM a)); +DEBUG: generating subplan XXX_1 for CTE a: SELECT key, value FROM locally_execute_intermediate_results.table_1 +DEBUG: generating subplan XXX_2 for subquery SELECT max(value) AS max FROM (SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_1'::text,
'binary'::citus_copy_format) intermediate_result(key integer, value text)) a +DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count, a.key FROM ((SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) a JOIN locally_execute_intermediate_results.table_2 USING (key)) GROUP BY a.key HAVING (max(table_2.value) OPERATOR(pg_catalog.=) (SELECT intermediate_result.max FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(max text))) +DEBUG: Subplan XXX_1 will be written to local file +DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx +DEBUG: Subplan XXX_2 will be written to local file + count | key +--------------------------------------------------------------------- + 1 | 4 +(1 row) + +-- same query as the above, without the aggregate +WITH a AS (SELECT max(key) as key, max(value) as value FROM ref_table) +SELECT count(*), +key +FROM a JOIN ref_table USING (key) +GROUP BY key +HAVING (max(ref_table.value) <= (SELECT value FROM a)); +NOTICE: executing the command locally: WITH a AS (SELECT max(ref_table_1.key) AS key, max(ref_table_1.value) AS value FROM locally_execute_intermediate_results.ref_table_1580008 ref_table_1) SELECT count(*) AS count, a.key FROM (a JOIN locally_execute_intermediate_results.ref_table_1580008 ref_table(key, value) USING (key)) GROUP BY a.key HAVING (max(ref_table.value) OPERATOR(pg_catalog.<=) (SELECT a_1.value FROM a a_1)) + count | key +--------------------------------------------------------------------- + 1 | 6 +(1 row) + +-- some edge cases around CTEs used inside other CTEs +-- everything could be executed locally on the coordinator, +-- but not on the worker +WITH cte_1 as (SELECT * FROM table_1), +cte_2 AS (SELECT * FROM cte_1), +cte_3 AS (SELECT max(key) FROM cte_2) +SELECT * FROM cte_3; +DEBUG: generating subplan XXX_1 for CTE cte_1: SELECT key, value FROM locally_execute_intermediate_results.table_1 +DEBUG: generating subplan XXX_2 for CTE cte_2: SELECT key, value FROM (SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) cte_1 +DEBUG: generating subplan XXX_3 for CTE cte_3: SELECT max(key) AS max FROM (SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) cte_2 +DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT max FROM (SELECT intermediate_result.max FROM read_intermediate_result('XXX_3'::text, 'binary'::citus_copy_format) intermediate_result(max integer)) cte_3 +DEBUG: Subplan XXX_1 will be written to local file +DEBUG: Subplan XXX_2 will be written to local file +DEBUG: Subplan XXX_3 will be written to local file + max +--------------------------------------------------------------------- + 4 +(1 row) +
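Every DEBUG line in this file exercises the same primitive: a subplan is materialized once under a result id and re-read through read_intermediate_result(). A minimal, self-contained sketch of that primitive using the helper functions shipped with Citus (the result id 'squares' is arbitrary, and the result only survives until the end of the transaction):

BEGIN;
-- materialize a five-row result under an explicit result id
SELECT create_intermediate_result('squares', 'SELECT s, s*s FROM generate_series(1,5) s');
-- read it back; the column definition list supplies the result's schema
SELECT * FROM read_intermediate_result('squares', 'binary') AS res (x int, x2 int);
COMMIT;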
-- the join between cte_3 and table_2 could have happened +-- locally since the key = 1 resides on this node +-- but because of the current implementation limitations we can't +WITH cte_1 as (SELECT * FROM table_1), +cte_2 AS (SELECT * FROM cte_1), +cte_3 AS (SELECT max(key) as key FROM cte_2) +SELECT * FROM cte_3 JOIN table_2 USING (key) WHERE table_2.key = 1; +DEBUG: generating subplan XXX_1 for CTE cte_1: SELECT key, value FROM locally_execute_intermediate_results.table_1 +DEBUG: generating subplan XXX_2 for CTE cte_2: SELECT key, value FROM (SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) cte_1 +DEBUG: generating subplan XXX_3 for CTE cte_3: SELECT max(key) AS key FROM (SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) cte_2 +DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT cte_3.key, table_2.value FROM ((SELECT intermediate_result.key FROM read_intermediate_result('XXX_3'::text, 'binary'::citus_copy_format) intermediate_result(key integer)) cte_3 JOIN locally_execute_intermediate_results.table_2 USING (key)) WHERE (table_2.key OPERATOR(pg_catalog.=) 1) +DEBUG: Subplan XXX_1 will be written to local file +DEBUG: Subplan XXX_2 will be written to local file +DEBUG: Subplan XXX_3 will be sent to localhost:xxxxx + key | value +--------------------------------------------------------------------- +(0 rows) + +-- the join between cte_3 and table_2 cannot happen +-- locally because the key = 2 resides on a remote node +WITH cte_1 as (SELECT * FROM table_1), +cte_2 AS (SELECT * FROM cte_1), +cte_3 AS (SELECT max(key) as key FROM cte_2) +SELECT * FROM cte_3 JOIN table_2 USING (key) WHERE table_2.key = 2; +DEBUG: generating subplan XXX_1 for CTE cte_1: SELECT key, value FROM locally_execute_intermediate_results.table_1 +DEBUG: generating subplan XXX_2 for CTE cte_2: SELECT key, value FROM (SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) cte_1 +DEBUG: generating subplan XXX_3 for CTE cte_3: SELECT max(key) AS key FROM (SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) cte_2 +DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT cte_3.key, table_2.value FROM ((SELECT intermediate_result.key FROM read_intermediate_result('XXX_3'::text, 'binary'::citus_copy_format) intermediate_result(key integer)) cte_3 JOIN locally_execute_intermediate_results.table_2 USING (key)) WHERE (table_2.key OPERATOR(pg_catalog.=) 2) +DEBUG: Subplan XXX_1 will be written to local file +DEBUG: Subplan XXX_2 will be written to local file +DEBUG: Subplan XXX_3 will be sent to localhost:xxxxx + key | value +--------------------------------------------------------------------- +(0 rows) +
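The difference between the key = 1 and key = 2 variants above is purely shard placement. For illustration, the node holding the shard for a given distribution-column value can be looked up from the Citus metadata:

SELECT nodename, nodeport
FROM pg_dist_placement JOIN pg_dist_node USING (groupid)
WHERE shardid = get_shard_id_for_distribution_column('table_2', 2);
-- a remote node here means the subplan has to be shipped to it, hence
-- "Subplan XXX_3 will be sent to localhost:xxxxx" instead of a local read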
-- the join between cte_3 and ref_table could have happened locally +-- but because of the current implementation limitations we can't +WITH cte_1 as (SELECT * FROM table_1), +cte_2 AS (SELECT * FROM cte_1), +cte_3 AS (SELECT max(key) as key FROM cte_2) +SELECT * FROM cte_3 JOIN ref_table USING (key); +DEBUG: generating subplan XXX_1 for CTE cte_1: SELECT key, value FROM locally_execute_intermediate_results.table_1 +DEBUG: generating subplan XXX_2 for CTE cte_2: SELECT key, value FROM (SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) cte_1 +DEBUG: generating subplan XXX_3 for CTE cte_3: SELECT max(key) AS key FROM (SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) cte_2 +DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT cte_3.key, ref_table.value FROM ((SELECT intermediate_result.key FROM read_intermediate_result('XXX_3'::text, 'binary'::citus_copy_format) intermediate_result(key integer)) cte_3 JOIN locally_execute_intermediate_results.ref_table USING (key)) +DEBUG: Subplan XXX_1 will be written to local file +DEBUG: Subplan XXX_2 will be written to local file +DEBUG: Subplan XXX_3 will be sent to localhost:xxxxx +DEBUG: Subplan XXX_3 will be sent to localhost:xxxxx + key | value +--------------------------------------------------------------------- + 4 | 4 +(1 row) + +-- finally, use round-robin policy on the workers with the same set of queries +set citus.task_assignment_policy TO "round-robin" ; +-- the query cannot be executed locally, but still, because of +-- HAVING, the intermediate result is written to a local file as well +WITH cte_1 AS (SELECT max(value) FROM table_1) +SELECT +count(*) +FROM +table_2 +GROUP BY key +HAVING max(value) > (SELECT max FROM cte_1); +DEBUG: generating subplan XXX_1 for CTE cte_1: SELECT max(value) AS max FROM locally_execute_intermediate_results.table_1 +DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM locally_execute_intermediate_results.table_2 GROUP BY key HAVING (max(value) OPERATOR(pg_catalog.>) (SELECT cte_1.max FROM (SELECT intermediate_result.max FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(max text)) cte_1)) +DEBUG: Subplan XXX_1 will be written to local file + count +--------------------------------------------------------------------- + 1 + 1 +(2 rows) + +-- In the non-MX case, the subquery in the WHERE part of the query can be executed locally +-- however, on Citus MX we have this limitation where the query cannot be executed locally +WITH cte_1 AS (SELECT max(value) FROM table_1), +cte_2 AS (SELECT * FROM table_2) +SELECT +count(*) +FROM +table_2 +WHERE +key > (SELECT key FROM cte_2 ORDER BY 1 LIMIT 1) +GROUP BY key +HAVING max(value) > (SELECT max FROM cte_1); +DEBUG: generating subplan XXX_1 for CTE cte_1: SELECT max(value) AS max FROM locally_execute_intermediate_results.table_1 +DEBUG: generating subplan XXX_2 for CTE cte_2: SELECT key, value FROM locally_execute_intermediate_results.table_2 +DEBUG: generating subplan XXX_3 for subquery SELECT key FROM (SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) cte_2 ORDER BY key LIMIT 1 +DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM locally_execute_intermediate_results.table_2 WHERE (key OPERATOR(pg_catalog.>) (SELECT intermediate_result.key FROM read_intermediate_result('XXX_3'::text, 'binary'::citus_copy_format) intermediate_result(key integer))) GROUP BY key HAVING (max(value) OPERATOR(pg_catalog.>) (SELECT cte_1.max FROM (SELECT intermediate_result.max FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(max text)) cte_1)) +DEBUG: Subplan XXX_1 will be written to local file +DEBUG: Subplan XXX_2 will be sent to localhost:xxxxx +DEBUG: Subplan XXX_3 will be sent to localhost:xxxxx +DEBUG: Subplan XXX_3 will be sent to localhost:xxxxx + count +--------------------------------------------------------------------- + 1 + 1 +(2 rows) + +-- subquery in the WHERE part of the query should not be executed locally
+-- because it can be pushed down with the jointree +WITH cte_1 AS (SELECT max(value) FROM table_1), +cte_2 AS (SELECT max(key) FROM table_2) +SELECT +count(*) +FROM +table_2 +WHERE +key > (SELECT max FROM cte_2) +GROUP BY key +HAVING max(value) > (SELECT max FROM cte_1); +DEBUG: generating subplan XXX_1 for CTE cte_1: SELECT max(value) AS max FROM locally_execute_intermediate_results.table_1 +DEBUG: generating subplan XXX_2 for CTE cte_2: SELECT max(key) AS max FROM locally_execute_intermediate_results.table_2 +DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM locally_execute_intermediate_results.table_2 WHERE (key OPERATOR(pg_catalog.>) (SELECT cte_2.max FROM (SELECT intermediate_result.max FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(max integer)) cte_2)) GROUP BY key HAVING (max(value) OPERATOR(pg_catalog.>) (SELECT cte_1.max FROM (SELECT intermediate_result.max FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(max text)) cte_1)) +DEBUG: Subplan XXX_1 will be written to local file +DEBUG: Subplan XXX_2 will be sent to localhost:xxxxx +DEBUG: Subplan XXX_2 will be sent to localhost:xxxxx + count +--------------------------------------------------------------------- +(0 rows) + +-- although all the intermediate results are safe to be in local files +-- we currently do not support it on Citus MX +WITH cte_1 AS (SELECT max(value) FROM table_1), +cte_2 AS (SELECT max(key) FROM table_2), +cte_3 AS (SELECT * FROM table_2) +SELECT +count(*) +FROM +cte_3 +WHERE +key > (SELECT max FROM cte_2) +GROUP BY key +HAVING max(value) > (SELECT max FROM cte_1); +DEBUG: generating subplan XXX_1 for CTE cte_1: SELECT max(value) AS max FROM locally_execute_intermediate_results.table_1 +DEBUG: generating subplan XXX_2 for CTE cte_2: SELECT max(key) AS max FROM locally_execute_intermediate_results.table_2 +DEBUG: generating subplan XXX_3 for CTE cte_3: SELECT key, value FROM locally_execute_intermediate_results.table_2 +DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_3'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) cte_3 WHERE (key OPERATOR(pg_catalog.>) (SELECT cte_2.max FROM (SELECT intermediate_result.max FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(max integer)) cte_2)) GROUP BY key HAVING (max(value) OPERATOR(pg_catalog.>) (SELECT cte_1.max FROM (SELECT intermediate_result.max FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(max text)) cte_1)) +DEBUG: Subplan XXX_1 will be written to local file +DEBUG: Subplan XXX_2 will be sent to localhost:xxxxx +DEBUG: Subplan XXX_3 will be sent to localhost:xxxxx + count +--------------------------------------------------------------------- +(0 rows) + +-- multiple CTEs are joined inside HAVING, so written to file +-- locally, but nothing executed locally +WITH cte_1 AS (SELECT max(value) FROM table_1), +cte_2 AS (SELECT max(value) FROM table_1) +SELECT +count(*) +FROM +table_2 +GROUP BY key +HAVING max(value) > (SELECT max FROM cte_1 JOIN cte_2 USING (max)); +DEBUG: generating subplan XXX_1 for CTE cte_1: SELECT max(value) AS max FROM locally_execute_intermediate_results.table_1 +DEBUG: generating subplan XXX_2 for CTE cte_2: SELECT max(value) AS max FROM 
locally_execute_intermediate_results.table_1 +DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM locally_execute_intermediate_results.table_2 GROUP BY key HAVING (max(value) OPERATOR(pg_catalog.>) (SELECT cte_1.max FROM ((SELECT intermediate_result.max FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(max text)) cte_1 JOIN (SELECT intermediate_result.max FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(max text)) cte_2 USING (max)))) +DEBUG: Subplan XXX_1 will be written to local file +DEBUG: Subplan XXX_2 will be written to local file + count +--------------------------------------------------------------------- + 1 + 1 +(2 rows) + +-- multiple CTEs are joined inside HAVING, so written to file +-- locally, also the join tree contains only another CTE, so should be +-- executed locally, but not on a Citus MX worker +-- TODO: fixed by #3396 +WITH cte_1 AS (SELECT max(value) FROM table_1), +cte_2 AS (SELECT max(value) FROM table_1), +cte_3 AS (SELECT * FROM table_2) +SELECT +count(*) +FROM +cte_3 +GROUP BY key +HAVING max(value) > (SELECT max FROM cte_1 JOIN cte_2 USING (max)); +DEBUG: generating subplan XXX_1 for CTE cte_1: SELECT max(value) AS max FROM locally_execute_intermediate_results.table_1 +DEBUG: generating subplan XXX_2 for CTE cte_2: SELECT max(value) AS max FROM locally_execute_intermediate_results.table_1 +DEBUG: generating subplan XXX_3 for CTE cte_3: SELECT key, value FROM locally_execute_intermediate_results.table_2 +DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_3'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) cte_3 GROUP BY key HAVING (max(value) OPERATOR(pg_catalog.>) (SELECT cte_1.max FROM ((SELECT intermediate_result.max FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(max text)) cte_1 JOIN (SELECT intermediate_result.max FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(max text)) cte_2 USING (max)))) +DEBUG: Subplan XXX_1 will be written to local file +DEBUG: Subplan XXX_2 will be written to local file +DEBUG: Subplan XXX_3 will be sent to localhost:xxxxx +ERROR: result "66_1" does not exist +CONTEXT: while executing command on localhost:xxxxx +-- now, the CTE is going to be written locally, +-- plus it could have been read locally on the coordinator +-- because of the aggregate over the cte in HAVING +-- but not on Citus MX +WITH cte_1 AS (SELECT max(value) FROM table_1) +SELECT +count(*) +FROM +table_2 +GROUP BY key +HAVING max(value) > (SELECT max(max) FROM cte_1); +DEBUG: generating subplan XXX_1 for CTE cte_1: SELECT max(value) AS max FROM locally_execute_intermediate_results.table_1 +DEBUG: generating subplan XXX_2 for subquery SELECT max(max) AS max FROM (SELECT intermediate_result.max FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(max text)) cte_1 +DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM locally_execute_intermediate_results.table_2 GROUP BY key HAVING (max(value) OPERATOR(pg_catalog.>) (SELECT intermediate_result.max FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(max text))) +DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx +DEBUG:
Subplan XXX_2 will be written to local file + count +--------------------------------------------------------------------- + 1 + 1 +(2 rows) + +-- the two CTEs could have been written locally and executed locally +-- on the coordinator, but not on the workers +WITH cte_1 AS (SELECT max(value) FROM table_1), +cte_2 AS (SELECT * FROM table_1) +SELECT +count(*) +FROM +cte_2 +GROUP BY key +HAVING max(value) < (SELECT max(max) FROM cte_1); +DEBUG: generating subplan XXX_1 for CTE cte_1: SELECT max(value) AS max FROM locally_execute_intermediate_results.table_1 +DEBUG: generating subplan XXX_2 for CTE cte_2: SELECT key, value FROM locally_execute_intermediate_results.table_1 +DEBUG: generating subplan XXX_3 for subquery SELECT max(max) AS max FROM (SELECT intermediate_result.max FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(max text)) cte_1 +DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) cte_2 GROUP BY key HAVING (max(value) OPERATOR(pg_catalog.<) (SELECT intermediate_result.max FROM read_intermediate_result('XXX_3'::text, 'binary'::citus_copy_format) intermediate_result(max text))) +DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx +DEBUG: Subplan XXX_2 will be sent to localhost:xxxxx +DEBUG: Subplan XXX_3 will be written to local file + count +--------------------------------------------------------------------- + 1 + 1 + 1 +(3 rows) + +-- this time the same CTE is both joined with a distributed +-- table and used in HAVING +-- TODO: fixed by #3396 +WITH a AS (SELECT * FROM table_1) +SELECT count(*), +key +FROM a JOIN table_2 USING (key) +GROUP BY key +HAVING (max(table_2.value) > (SELECT value FROM a)); +DEBUG: generating subplan XXX_1 for CTE a: SELECT key, value FROM locally_execute_intermediate_results.table_1 +DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count, a.key FROM ((SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) a JOIN locally_execute_intermediate_results.table_2 USING (key)) GROUP BY a.key HAVING (max(table_2.value) OPERATOR(pg_catalog.>) (SELECT a_1.value FROM (SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) a_1)) +DEBUG: Subplan XXX_1 will be written to local file +ERROR: result "77_1" does not exist +CONTEXT: while executing command on localhost:xxxxx +-- this time the same CTE is both joined with a distributed +-- table and used in HAVING -- but used in another subquery/aggregate +-- so one more level of recursive planning +WITH a AS (SELECT * FROM table_1) +SELECT count(*), +key +FROM a JOIN table_2 USING (key) +GROUP BY key +HAVING (max(table_2.value) = (SELECT max(value) FROM a)); +DEBUG: generating subplan XXX_1 for CTE a: SELECT key, value FROM locally_execute_intermediate_results.table_1 +DEBUG: generating subplan XXX_2 for subquery SELECT max(value) AS max FROM (SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) a +DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count,
a.key FROM ((SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) a JOIN locally_execute_intermediate_results.table_2 USING (key)) GROUP BY a.key HAVING (max(table_2.value) OPERATOR(pg_catalog.=) (SELECT intermediate_result.max FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(max text))) +DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx +DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx +DEBUG: Subplan XXX_2 will be written to local file + count | key +--------------------------------------------------------------------- + 1 | 4 +(1 row) + +-- same query as the above, without the aggregate +WITH a AS (SELECT max(key) as key, max(value) as value FROM ref_table) +SELECT count(*), +key +FROM a JOIN ref_table USING (key) +GROUP BY key +HAVING (max(ref_table.value) <= (SELECT value FROM a)); +NOTICE: executing the command locally: WITH a AS (SELECT max(ref_table_1.key) AS key, max(ref_table_1.value) AS value FROM locally_execute_intermediate_results.ref_table_1580008 ref_table_1) SELECT count(*) AS count, a.key FROM (a JOIN locally_execute_intermediate_results.ref_table_1580008 ref_table(key, value) USING (key)) GROUP BY a.key HAVING (max(ref_table.value) OPERATOR(pg_catalog.<=) (SELECT a_1.value FROM a a_1)) + count | key +--------------------------------------------------------------------- + 1 | 6 +(1 row) + +-- some edge cases around CTEs used inside other CTEs +-- everything could be executed locally on the coordinator, +-- but not on the worker +WITH cte_1 as (SELECT * FROM table_1), +cte_2 AS (SELECT * FROM cte_1), +cte_3 AS (SELECT max(key) FROM cte_2) +SELECT * FROM cte_3; +DEBUG: generating subplan XXX_1 for CTE cte_1: SELECT key, value FROM locally_execute_intermediate_results.table_1 +DEBUG: generating subplan XXX_2 for CTE cte_2: SELECT key, value FROM (SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) cte_1 +DEBUG: generating subplan XXX_3 for CTE cte_3: SELECT max(key) AS max FROM (SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) cte_2 +DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT max FROM (SELECT intermediate_result.max FROM read_intermediate_result('XXX_3'::text, 'binary'::citus_copy_format) intermediate_result(max integer)) cte_3 +DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx +DEBUG: Subplan XXX_2 will be sent to localhost:xxxxx +DEBUG: Subplan XXX_3 will be sent to localhost:xxxxx + max +--------------------------------------------------------------------- + 4 +(1 row) + +-- the join between cte_3 and table_2 could have happened +-- locally since the key = 1 resides on this node +-- but because of the current implementation limitations we can't +WITH cte_1 as (SELECT * FROM table_1), +cte_2 AS (SELECT * FROM cte_1), +cte_3 AS (SELECT max(key) as key FROM cte_2) +SELECT * FROM cte_3 JOIN table_2 USING (key) WHERE table_2.key = 1; +DEBUG: generating subplan XXX_1 for CTE cte_1: SELECT key, value FROM locally_execute_intermediate_results.table_1 +DEBUG: generating subplan XXX_2 for CTE cte_2: SELECT key, value FROM (SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_1'::text,
'binary'::citus_copy_format) intermediate_result(key integer, value text)) cte_1 +DEBUG: generating subplan XXX_3 for CTE cte_3: SELECT max(key) AS key FROM (SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) cte_2 +DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT cte_3.key, table_2.value FROM ((SELECT intermediate_result.key FROM read_intermediate_result('XXX_3'::text, 'binary'::citus_copy_format) intermediate_result(key integer)) cte_3 JOIN locally_execute_intermediate_results.table_2 USING (key)) WHERE (table_2.key OPERATOR(pg_catalog.=) 1) +DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx +DEBUG: Subplan XXX_2 will be sent to localhost:xxxxx +DEBUG: Subplan XXX_3 will be sent to localhost:xxxxx + key | value +--------------------------------------------------------------------- +(0 rows) + +-- the join between cte_3 and table_2 cannot happen +-- locally because the key = 2 resides on a remote node +WITH cte_1 as (SELECT * FROM table_1), +cte_2 AS (SELECT * FROM cte_1), +cte_3 AS (SELECT max(key) as key FROM cte_2) +SELECT * FROM cte_3 JOIN table_2 USING (key) WHERE table_2.key = 2; +DEBUG: generating subplan XXX_1 for CTE cte_1: SELECT key, value FROM locally_execute_intermediate_results.table_1 +DEBUG: generating subplan XXX_2 for CTE cte_2: SELECT key, value FROM (SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) cte_1 +DEBUG: generating subplan XXX_3 for CTE cte_3: SELECT max(key) AS key FROM (SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) cte_2 +DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT cte_3.key, table_2.value FROM ((SELECT intermediate_result.key FROM read_intermediate_result('XXX_3'::text, 'binary'::citus_copy_format) intermediate_result(key integer)) cte_3 JOIN locally_execute_intermediate_results.table_2 USING (key)) WHERE (table_2.key OPERATOR(pg_catalog.=) 2) +DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx +DEBUG: Subplan XXX_2 will be sent to localhost:xxxxx +DEBUG: Subplan XXX_3 will be sent to localhost:xxxxx + key | value +--------------------------------------------------------------------- +(0 rows) + +-- the join between cte_3 and ref_table could have happened locally +-- but because of the current implementation limitations we can't +WITH cte_1 as (SELECT * FROM table_1), +cte_2 AS (SELECT * FROM cte_1), +cte_3 AS (SELECT max(key) as key FROM cte_2) +SELECT * FROM cte_3 JOIN ref_table USING (key); +DEBUG: generating subplan XXX_1 for CTE cte_1: SELECT key, value FROM locally_execute_intermediate_results.table_1 +DEBUG: generating subplan XXX_2 for CTE cte_2: SELECT key, value FROM (SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) cte_1 +DEBUG: generating subplan XXX_3 for CTE cte_3: SELECT max(key) AS key FROM (SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) cte_2 +DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT cte_3.key, ref_table.value FROM ((SELECT
intermediate_result.key FROM read_intermediate_result('XXX_3'::text, 'binary'::citus_copy_format) intermediate_result(key integer)) cte_3 JOIN locally_execute_intermediate_results.ref_table USING (key)) +DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx +DEBUG: Subplan XXX_2 will be sent to localhost:xxxxx +DEBUG: Subplan XXX_3 will be sent to localhost:xxxxx +DEBUG: Subplan XXX_3 will be sent to localhost:xxxxx + key | value +--------------------------------------------------------------------- + 4 | 4 +(1 row) + +\c - - - :master_port +SET client_min_messages TO ERROR; +DROP SCHEMA locally_execute_intermediate_results CASCADE; diff --git a/src/test/regress/expected/multi_router_planner.out b/src/test/regress/expected/multi_router_planner.out index a4db688af..b982e583e 100644 --- a/src/test/regress/expected/multi_router_planner.out +++ b/src/test/regress/expected/multi_router_planner.out @@ -1732,7 +1732,7 @@ SELECT * FROM articles_range where author_id = 1; DEBUG: Creating router plan DEBUG: Plan is router executable NOTICE: issuing SELECT id, author_id, title, word_count FROM public.articles_range_840012 articles_range WHERE (author_id OPERATOR(pg_catalog.=) 1) -DETAIL: on server postgres@localhost:xxxxx connectionId: 1 +DETAIL: on server postgres@localhost:xxxxx connectionId: 2 id | author_id | title | word_count --------------------------------------------------------------------- (0 rows) @@ -1741,7 +1741,7 @@ SELECT * FROM articles_range where author_id = 1 or author_id = 5; DEBUG: Creating router plan DEBUG: Plan is router executable NOTICE: issuing SELECT id, author_id, title, word_count FROM public.articles_range_840012 articles_range WHERE ((author_id OPERATOR(pg_catalog.=) 1) OR (author_id OPERATOR(pg_catalog.=) 5)) -DETAIL: on server postgres@localhost:xxxxx connectionId: 1 +DETAIL: on server postgres@localhost:xxxxx connectionId: 2 id | author_id | title | word_count --------------------------------------------------------------------- (0 rows) @@ -1750,8 +1750,7 @@ DETAIL: on server postgres@localhost:xxxxx connectionId: 1 SELECT * FROM articles_range where author_id = 1 and author_id = 2; DEBUG: Creating router plan DEBUG: Plan is router executable -NOTICE: issuing SELECT id, author_id, title, word_count FROM (SELECT NULL::bigint AS id, NULL::bigint AS author_id, NULL::character varying(20) AS title, NULL::integer AS word_count WHERE false) articles_range(id, author_id, title, word_count) WHERE ((author_id OPERATOR(pg_catalog.=) 1) AND (author_id OPERATOR(pg_catalog.=) 2)) -DETAIL: on server postgres@localhost:xxxxx connectionId: 2 +NOTICE: executing the command locally: SELECT id, author_id, title, word_count FROM (SELECT NULL::bigint AS id, NULL::bigint AS author_id, NULL::character varying(20) AS title, NULL::integer AS word_count WHERE false) articles_range(id, author_id, title, word_count) WHERE ((author_id OPERATOR(pg_catalog.=) 1) AND (author_id OPERATOR(pg_catalog.=) 2)) id | author_id | title | word_count --------------------------------------------------------------------- (0 rows) @@ -1762,7 +1761,7 @@ SELECT * FROM articles_range ar join authors_range au on (ar.author_id = au.id) DEBUG: Creating router plan DEBUG: Plan is router executable NOTICE: issuing SELECT ar.id, ar.author_id, ar.title, ar.word_count, au.name, au.id FROM (public.articles_range_840012 ar JOIN public.authors_range_840008 au ON ((ar.author_id OPERATOR(pg_catalog.=) au.id))) WHERE (ar.author_id OPERATOR(pg_catalog.=) 1) -DETAIL: on server postgres@localhost:xxxxx connectionId: 1 +DETAIL: on 
server postgres@localhost:xxxxx connectionId: 2 id | author_id | title | word_count | name | id --------------------------------------------------------------------- (0 rows) @@ -1772,8 +1771,7 @@ SELECT * FROM articles_range ar join authors_range au on (ar.author_id = au.id) WHERE ar.author_id = 1 and au.id = 2; DEBUG: Creating router plan DEBUG: Plan is router executable -NOTICE: issuing SELECT ar.id, ar.author_id, ar.title, ar.word_count, au.name, au.id FROM ((SELECT NULL::bigint AS id, NULL::bigint AS author_id, NULL::character varying(20) AS title, NULL::integer AS word_count WHERE false) ar(id, author_id, title, word_count) JOIN (SELECT NULL::character varying(20) AS name, NULL::bigint AS id WHERE false) au(name, id) ON ((ar.author_id OPERATOR(pg_catalog.=) au.id))) WHERE ((ar.author_id OPERATOR(pg_catalog.=) 1) AND (au.id OPERATOR(pg_catalog.=) 2)) -DETAIL: on server postgres@localhost:xxxxx connectionId: 1 +NOTICE: executing the command locally: SELECT ar.id, ar.author_id, ar.title, ar.word_count, au.name, au.id FROM ((SELECT NULL::bigint AS id, NULL::bigint AS author_id, NULL::character varying(20) AS title, NULL::integer AS word_count WHERE false) ar(id, author_id, title, word_count) JOIN (SELECT NULL::character varying(20) AS name, NULL::bigint AS id WHERE false) au(name, id) ON ((ar.author_id OPERATOR(pg_catalog.=) au.id))) WHERE ((ar.author_id OPERATOR(pg_catalog.=) 1) AND (au.id OPERATOR(pg_catalog.=) 2)) id | author_id | title | word_count | name | id --------------------------------------------------------------------- (0 rows) diff --git a/src/test/regress/expected/set_operation_and_local_tables.out b/src/test/regress/expected/set_operation_and_local_tables.out index e4b847654..c0b9c27c9 100644 --- a/src/test/regress/expected/set_operation_and_local_tables.out +++ b/src/test/regress/expected/set_operation_and_local_tables.out @@ -81,7 +81,6 @@ DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT intermediate_ DEBUG: Creating router plan DEBUG: Plan is router executable ERROR: division by zero -CONTEXT: while executing command on localhost:xxxxx -- we should be able to run set operations with generate series and local tables as well ((SELECT x FROM local_test) UNION ALL (SELECT x FROM test)) INTERSECT (SELECT i FROM generate_series(0, 100) i) ORDER BY 1 DESC; DEBUG: Local tables cannot be used in distributed queries. 
diff --git a/src/test/regress/expected/subquery_complex_target_list.out b/src/test/regress/expected/subquery_complex_target_list.out index 8090384f7..55152b68d 100644 --- a/src/test/regress/expected/subquery_complex_target_list.out +++ b/src/test/regress/expected/subquery_complex_target_list.out @@ -5,6 +5,9 @@ CREATE SCHEMA subquery_complex; SET search_path TO subquery_complex, public; SET client_min_messages TO DEBUG1; +-- the logs are enabled and they sometimes +-- lead to flaky outputs when jit is enabled +SET jit_above_cost TO -1; -- COUNT DISTINCT at the top level query SELECT event_type, count(distinct value_2) diff --git a/src/test/regress/expected/with_basics.out b/src/test/regress/expected/with_basics.out index 9b553fd80..153ec2651 100644 --- a/src/test/regress/expected/with_basics.out +++ b/src/test/regress/expected/with_basics.out @@ -101,7 +101,6 @@ WITH cte AS ( ) SELECT (SELECT * FROM cte); ERROR: more than one row returned by a subquery used as an expression -CONTEXT: while executing command on localhost:xxxxx WITH cte_basic AS ( SELECT user_id FROM users_table WHERE user_id = 1 ) diff --git a/src/test/regress/multi_mx_schedule b/src/test/regress/multi_mx_schedule index 8ee00809f..6cdadd3c3 100644 --- a/src/test/regress/multi_mx_schedule +++ b/src/test/regress/multi_mx_schedule @@ -44,6 +44,7 @@ test: multi_mx_modifying_xacts test: multi_mx_explain test: multi_mx_reference_table test: multi_mx_insert_select_repartition +test: locally_execute_intermediate_results # test that no tests leaked intermediate results. This should always be last test: ensure_no_intermediate_data_leak diff --git a/src/test/regress/sql/failure_cte_subquery.sql b/src/test/regress/sql/failure_cte_subquery.sql index c4341ef4b..63f79b00b 100644 --- a/src/test/regress/sql/failure_cte_subquery.sql +++ b/src/test/regress/sql/failure_cte_subquery.sql @@ -9,6 +9,9 @@ SET citus.next_shard_id TO 16000000; -- the tests rely on intermediate results SET citus.enable_cte_inlining TO false; +-- prevent executing the intermediate results locally +SET citus.task_assignment_policy TO "round-robin"; + SELECT pg_backend_pid() as pid \gset CREATE TABLE users_table (user_id int, user_name text); diff --git a/src/test/regress/sql/locally_execute_intermediate_results.sql b/src/test/regress/sql/locally_execute_intermediate_results.sql new file mode 100644 index 000000000..c7f785b07 --- /dev/null +++ b/src/test/regress/sql/locally_execute_intermediate_results.sql @@ -0,0 +1,515 @@ +CREATE SCHEMA locally_execute_intermediate_results; +SET search_path TO locally_execute_intermediate_results; +SET citus.log_intermediate_results TO TRUE; +SET citus.log_local_commands TO TRUE; +SET client_min_messages TO DEBUG1; + +SET citus.shard_count TO 4; +SET citus.next_shard_id TO 1580000; +SET citus.shard_replication_factor TO 1; +SET citus.replication_model TO 'streaming'; + +CREATE TABLE table_1 (key int, value text); +SELECT create_distributed_table('table_1', 'key'); + +CREATE TABLE table_2 (key int, value text); +SELECT create_distributed_table('table_2', 'key'); + +CREATE TABLE ref_table (key int, value text); +SELECT create_reference_table('ref_table'); + +-- load some data +INSERT INTO table_1 VALUES (1, '1'), (2, '2'), (3, '3'), (4, '4'); +INSERT INTO table_2 VALUES (3, '3'), (4, '4'), (5, '5'), (6, '6'); +INSERT INTO ref_table VALUES (1, '1'), (2, '2'), (3, '3'), (4, '4'), (5, '5'), (6, '6'); + +-- prevent PG 11 - PG 12 outputs from diverging +-- and have a lot more CTEs recursively planned for the +-- sake of increasing the test
coverage +SET citus.enable_cte_inlining TO false; + +-- the query cannot be executed locally, but still because of +-- HAVING the intermediate result is written to a local file as well +WITH cte_1 AS (SELECT max(value) FROM table_1) +SELECT +count(*) +FROM +table_2 +GROUP BY key +HAVING max(value) > (SELECT max FROM cte_1); + +-- subquery in the WHERE part of the query can be executed locally +WITH cte_1 AS (SELECT max(value) FROM table_1), +cte_2 AS (SELECT * FROM table_2) +SELECT +count(*) +FROM +table_2 +WHERE +key > (SELECT key FROM cte_2 ORDER BY 1 LIMIT 1) +GROUP BY key +HAVING max(value) > (SELECT max FROM cte_1); + +-- subquery in the WHERE part of the query should not be executed locally +-- because it can be pushed down with the jointree +WITH cte_1 AS (SELECT max(value) FROM table_1), +cte_2 AS (SELECT max(key) FROM table_2) +SELECT +count(*) +FROM +table_2 +WHERE +key > (SELECT max FROM cte_2) +GROUP BY key +HAVING max(value) > (SELECT max FROM cte_1); + +-- now all the intermediate results are safe to be in local files +WITH cte_1 AS (SELECT max(value) FROM table_1), +cte_2 AS (SELECT max(key) FROM table_2), +cte_3 AS (SELECT * FROM table_2) +SELECT +count(*) +FROM +cte_3 +WHERE +key > (SELECT max FROM cte_2) +GROUP BY key +HAVING max(value) > (SELECT max FROM cte_1); + +-- multiple CTEs are joined inside HAVING, so written to file +-- locally, but nothing executed locally +WITH cte_1 AS (SELECT max(value) FROM table_1), +cte_2 AS (SELECT max(value) FROM table_1) +SELECT +count(*) +FROM +table_2 +GROUP BY key +HAVING max(value) > (SELECT max FROM cte_1 JOIN cte_2 USING (max)); + + +-- multiple CTEs are joined inside HAVING, so written to file +-- locally, also the join tree contains only another CTE, so should be +-- executed locally +WITH cte_1 AS (SELECT max(value) FROM table_1), +cte_2 AS (SELECT max(value) FROM table_1), +cte_3 AS (SELECT * FROM table_2) +SELECT +count(*) +FROM +cte_3 +GROUP BY key +HAVING max(value) > (SELECT max FROM cte_1 JOIN cte_2 USING (max)); + +-- now, the CTE is going to be written locally, +-- plus it is going to be read locally because +-- of the aggregate over the cte in HAVING +WITH cte_1 AS (SELECT max(value) FROM table_1) +SELECT +count(*) +FROM +table_2 +GROUP BY key +HAVING max(value) > (SELECT max(max) FROM cte_1); + + +-- two CTEs are going to be written locally and executed locally +WITH cte_1 AS (SELECT max(value) FROM table_1), +cte_2 AS (SELECT * FROM table_1) +SELECT +count(*) +FROM +cte_2 +GROUP BY key +HAVING max(value) < (SELECT max(max) FROM cte_1); + +-- this time the same CTE is both joined with a distributed +-- table and used in HAVING +-- TODO: fixed by #3396 +WITH a AS (SELECT * FROM table_1) +SELECT count(*), +key +FROM a JOIN table_2 USING (key) +GROUP BY key +HAVING (max(table_2.value) > (SELECT value FROM a)); + +-- this time the same CTE is both joined with a distributed +-- table and used in HAVING -- but used in another subquery/aggregate +-- so one more level of recursive planning +WITH a AS (SELECT * FROM table_1) +SELECT count(*), +key +FROM a JOIN table_2 USING (key) +GROUP BY key +HAVING (max(table_2.value) = (SELECT max(value) FROM a)); + +-- same query as the above, without the aggregate +WITH a AS (SELECT max(key) as key, max(value) as value FROM ref_table) +SELECT count(*), +key +FROM a JOIN ref_table USING (key) +GROUP BY key +HAVING (max(ref_table.value) <= (SELECT value FROM a)); + + +-- some edge cases around CTEs used inside other CTEs + +-- everything can be executed locally +WITH cte_1 as (SELECT *
FROM table_1), +cte_2 AS (SELECT * FROM cte_1), +cte_3 AS (SELECT max(key) FROM cte_2) +SELECT * FROM cte_3; + +-- the join between cte_3 and table_2 has to happen remotely +WITH cte_1 as (SELECT * FROM table_1), +cte_2 AS (SELECT * FROM cte_1), +cte_3 AS (SELECT max(key) as key FROM cte_2) +SELECT * FROM cte_3 JOIN table_2 USING (key) WHERE table_2.key = 1; + +-- the join between cte_3 and ref_table has to happen remotely +WITH cte_1 as (SELECT * FROM table_1), +cte_2 AS (SELECT * FROM cte_1), +cte_3 AS (SELECT max(key) as key FROM cte_2) +SELECT * FROM cte_3 JOIN ref_table USING (key); + + +\c - - - :worker_1_port + +-- now use the same queries on a worker +SET search_path TO locally_execute_intermediate_results; +SET citus.log_intermediate_results TO TRUE; +SET citus.log_local_commands TO TRUE; +SET client_min_messages TO DEBUG1; + +-- prevent PG 11 - PG 12 outputs from diverging +-- and have a lot more CTEs recursively planned for the +-- sake of increasing the test coverage +SET citus.enable_cte_inlining TO false; + +-- the query cannot be executed locally, but still because of +-- HAVING the intermediate result is written to a local file as well +WITH cte_1 AS (SELECT max(value) FROM table_1) +SELECT +count(*) +FROM +table_2 +GROUP BY key +HAVING max(value) > (SELECT max FROM cte_1); + +-- In the non-MX case the subquery in the WHERE part of the query can be executed locally +-- however, on Citus MX we have a limitation where the query cannot be executed locally +WITH cte_1 AS (SELECT max(value) FROM table_1), +cte_2 AS (SELECT * FROM table_2) +SELECT +count(*) +FROM +table_2 +WHERE +key > (SELECT key FROM cte_2 ORDER BY 1 LIMIT 1) +GROUP BY key +HAVING max(value) > (SELECT max FROM cte_1); + +-- subquery in the WHERE part of the query should not be executed locally +-- because it can be pushed down with the jointree +WITH cte_1 AS (SELECT max(value) FROM table_1), +cte_2 AS (SELECT max(key) FROM table_2) +SELECT +count(*) +FROM +table_2 +WHERE +key > (SELECT max FROM cte_2) +GROUP BY key +HAVING max(value) > (SELECT max FROM cte_1); + +-- although all the intermediate results are safe to be in local files +-- we currently do not support it on Citus MX +WITH cte_1 AS (SELECT max(value) FROM table_1), +cte_2 AS (SELECT max(key) FROM table_2), +cte_3 AS (SELECT * FROM table_2) +SELECT +count(*) +FROM +cte_3 +WHERE +key > (SELECT max FROM cte_2) +GROUP BY key +HAVING max(value) > (SELECT max FROM cte_1); + +-- multiple CTEs are joined inside HAVING, so written to file +-- locally, but nothing executed locally +WITH cte_1 AS (SELECT max(value) FROM table_1), +cte_2 AS (SELECT max(value) FROM table_1) +SELECT +count(*) +FROM +table_2 +GROUP BY key +HAVING max(value) > (SELECT max FROM cte_1 JOIN cte_2 USING (max)); + + +-- multiple CTEs are joined inside HAVING, so written to file +-- locally, also the join tree contains only another CTE, so should be +-- executed locally, but not on a Citus MX worker +WITH cte_1 AS (SELECT max(value) FROM table_1), +cte_2 AS (SELECT max(value) FROM table_1), +cte_3 AS (SELECT * FROM table_2) +SELECT +count(*) +FROM +cte_3 +GROUP BY key +HAVING max(value) > (SELECT max FROM cte_1 JOIN cte_2 USING (max)); + +-- now, the CTE is going to be written locally, +-- plus it could have been read locally on the coordinator +-- because of the aggregate over the cte in HAVING +-- but not on Citus MX +WITH cte_1 AS (SELECT max(value) FROM table_1) +SELECT +count(*) +FROM +table_2 +GROUP BY key +HAVING max(value) > (SELECT max(max) FROM cte_1); + + +-- the two CTEs could have been
written locally and executed locally +-- on the coordinator, but not on the workers +WITH cte_1 AS (SELECT max(value) FROM table_1), +cte_2 AS (SELECT * FROM table_1) +SELECT +count(*) +FROM +cte_2 +GROUP BY key +HAVING max(value) < (SELECT max(max) FROM cte_1); + +-- this time the same CTE is both joined with a distributed +-- table and used in HAVING +-- TODO: fixed by #3396 +WITH a AS (SELECT * FROM table_1) +SELECT count(*), +key +FROM a JOIN table_2 USING (key) +GROUP BY key +HAVING (max(table_2.value) > (SELECT value FROM a)); + +-- this time the same CTE is both joined with a distributed +-- table and used in HAVING -- but used in another subquery/aggregate +-- so one more level of recursive planning +WITH a AS (SELECT * FROM table_1) +SELECT count(*), +key +FROM a JOIN table_2 USING (key) +GROUP BY key +HAVING (max(table_2.value) = (SELECT max(value) FROM a)); + +-- same query as the above, without the aggregate +WITH a AS (SELECT max(key) as key, max(value) as value FROM ref_table) +SELECT count(*), +key +FROM a JOIN ref_table USING (key) +GROUP BY key +HAVING (max(ref_table.value) <= (SELECT value FROM a)); + + +-- some edge cases around CTEs used inside other CTEs + +-- everything could be executed locally on the coordinator, +-- but not on the worker +WITH cte_1 as (SELECT * FROM table_1), +cte_2 AS (SELECT * FROM cte_1), +cte_3 AS (SELECT max(key) FROM cte_2) +SELECT * FROM cte_3; + +-- the join between cte_3 and table_2 could have happened +-- locally since the key = 1 resides on this node +-- but because of the current implementation limitations we can't +WITH cte_1 as (SELECT * FROM table_1), +cte_2 AS (SELECT * FROM cte_1), +cte_3 AS (SELECT max(key) as key FROM cte_2) +SELECT * FROM cte_3 JOIN table_2 USING (key) WHERE table_2.key = 1; + +-- the join between cte_3 and table_2 cannot happen +-- locally because the key = 2 resides on a remote node +WITH cte_1 as (SELECT * FROM table_1), +cte_2 AS (SELECT * FROM cte_1), +cte_3 AS (SELECT max(key) as key FROM cte_2) +SELECT * FROM cte_3 JOIN table_2 USING (key) WHERE table_2.key = 2; + +-- the join between cte_3 and ref_table could have happened locally +-- but because of the current implementation limitations we can't +WITH cte_1 as (SELECT * FROM table_1), +cte_2 AS (SELECT * FROM cte_1), +cte_3 AS (SELECT max(key) as key FROM cte_2) +SELECT * FROM cte_3 JOIN ref_table USING (key); + + +-- finally, use round-robin policy on the workers with the same set of queries +SET citus.task_assignment_policy TO "round-robin"; +-- the query cannot be executed locally, but still because of +-- HAVING the intermediate result is written to a local file as well +WITH cte_1 AS (SELECT max(value) FROM table_1) +SELECT +count(*) +FROM +table_2 +GROUP BY key +HAVING max(value) > (SELECT max FROM cte_1); + +-- In the non-MX case the subquery in the WHERE part of the query can be executed locally +-- however, on Citus MX we have a limitation where the query cannot be executed locally +WITH cte_1 AS (SELECT max(value) FROM table_1), +cte_2 AS (SELECT * FROM table_2) +SELECT +count(*) +FROM +table_2 +WHERE +key > (SELECT key FROM cte_2 ORDER BY 1 LIMIT 1) +GROUP BY key +HAVING max(value) > (SELECT max FROM cte_1); + +-- subquery in the WHERE part of the query should not be executed locally +-- because it can be pushed down with the jointree +WITH cte_1 AS (SELECT max(value) FROM table_1), +cte_2 AS (SELECT max(key) FROM table_2) +SELECT +count(*) +FROM +table_2 +WHERE +key > (SELECT max FROM cte_2) +GROUP BY key +HAVING max(value) > (SELECT
max FROM cte_1); + +-- although all the intermediate results are safe to be in local files +-- we currently do not support it on Citus MX +WITH cte_1 AS (SELECT max(value) FROM table_1), +cte_2 AS (SELECT max(key) FROM table_2), +cte_3 AS (SELECT * FROM table_2) +SELECT +count(*) +FROM +cte_3 +WHERE +key > (SELECT max FROM cte_2) +GROUP BY key +HAVING max(value) > (SELECT max FROM cte_1); + +-- multiple CTEs are joined inside HAVING, so written to file +-- locally, but nothing executed locally +WITH cte_1 AS (SELECT max(value) FROM table_1), +cte_2 AS (SELECT max(value) FROM table_1) +SELECT +count(*) +FROM +table_2 +GROUP BY key +HAVING max(value) > (SELECT max FROM cte_1 JOIN cte_2 USING (max)); + + +-- multiple CTEs are joined inside HAVING, so written to file +-- locally, also the join tree contains only another CTE, so should be +-- executed locally, but not on a Citus MX worker +-- TODO: fixed by #3396 +WITH cte_1 AS (SELECT max(value) FROM table_1), +cte_2 AS (SELECT max(value) FROM table_1), +cte_3 AS (SELECT * FROM table_2) +SELECT +count(*) +FROM +cte_3 +GROUP BY key +HAVING max(value) > (SELECT max FROM cte_1 JOIN cte_2 USING (max)); + +-- now, the CTE is going to be written locally, +-- plus it could have been read locally on the coordinator +-- because of the aggregate over the cte in HAVING +-- but not on Citus MX +WITH cte_1 AS (SELECT max(value) FROM table_1) +SELECT +count(*) +FROM +table_2 +GROUP BY key +HAVING max(value) > (SELECT max(max) FROM cte_1); + + +-- the two CTEs could have been written locally and executed locally +-- on the coordinator, but not on the workers +WITH cte_1 AS (SELECT max(value) FROM table_1), +cte_2 AS (SELECT * FROM table_1) +SELECT +count(*) +FROM +cte_2 +GROUP BY key +HAVING max(value) < (SELECT max(max) FROM cte_1); + +-- this time the same CTE is both joined with a distributed +-- table and used in HAVING +-- TODO: fixed by #3396 +WITH a AS (SELECT * FROM table_1) +SELECT count(*), +key +FROM a JOIN table_2 USING (key) +GROUP BY key +HAVING (max(table_2.value) > (SELECT value FROM a)); + +-- this time the same CTE is both joined with a distributed +-- table and used in HAVING -- but used in another subquery/aggregate +-- so one more level of recursive planning +WITH a AS (SELECT * FROM table_1) +SELECT count(*), +key +FROM a JOIN table_2 USING (key) +GROUP BY key +HAVING (max(table_2.value) = (SELECT max(value) FROM a)); + +-- same query as the above, without the aggregate +WITH a AS (SELECT max(key) as key, max(value) as value FROM ref_table) +SELECT count(*), +key +FROM a JOIN ref_table USING (key) +GROUP BY key +HAVING (max(ref_table.value) <= (SELECT value FROM a)); + + +-- some edge cases around CTEs used inside other CTEs + +-- everything could be executed locally on the coordinator, +-- but not on the worker +WITH cte_1 as (SELECT * FROM table_1), +cte_2 AS (SELECT * FROM cte_1), +cte_3 AS (SELECT max(key) FROM cte_2) +SELECT * FROM cte_3; + +-- the join between cte_3 and table_2 could have happened +-- locally since the key = 1 resides on this node +-- but because of the current implementation limitations we can't +WITH cte_1 as (SELECT * FROM table_1), +cte_2 AS (SELECT * FROM cte_1), +cte_3 AS (SELECT max(key) as key FROM cte_2) +SELECT * FROM cte_3 JOIN table_2 USING (key) WHERE table_2.key = 1; + +-- the join between cte_3 and table_2 cannot happen +-- locally because the key = 2 resides on a remote node +WITH cte_1 as (SELECT * FROM table_1), +cte_2 AS (SELECT * FROM cte_1), +cte_3 AS (SELECT max(key) as key FROM
cte_2) +SELECT * FROM cte_3 JOIN table_2 USING (key) WHERE table_2.key = 2; + +-- the join between cte_3 and ref_table could have happened locally +-- but because of the current implementation limitations we can't +WITH cte_1 as (SELECT * FROM table_1), +cte_2 AS (SELECT * FROM cte_1), +cte_3 AS (SELECT max(key) as key FROM cte_2) +SELECT * FROM cte_3 JOIN ref_table USING (key); + + +\c - - - :master_port + +SET client_min_messages TO ERROR; +DROP SCHEMA locally_execute_intermediate_results CASCADE; diff --git a/src/test/regress/sql/subquery_complex_target_list.sql b/src/test/regress/sql/subquery_complex_target_list.sql index d64da5670..777984e9b 100644 --- a/src/test/regress/sql/subquery_complex_target_list.sql +++ b/src/test/regress/sql/subquery_complex_target_list.sql @@ -7,6 +7,10 @@ SET search_path TO subquery_complex, public; SET client_min_messages TO DEBUG1; +-- the logs are enabled and they sometimes +-- lead to flaky outputs when jit is enabled +SET jit_above_cost TO -1; + -- COUNT DISTINCT at the top level query SELECT event_type, count(distinct value_2)
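
For reviewers who want to reproduce the new behavior outside the regression harness, here is a minimal sketch. It is illustrative only and assumes the schema, tables, and data created by the new locally_execute_intermediate_results.sql test above; it only uses GUCs that already appear in this change.

-- surface the subplan routing decisions in the logs
SET citus.log_intermediate_results TO TRUE;
SET citus.log_local_commands TO TRUE;
SET client_min_messages TO DEBUG1;
-- force the CTE to be recursively planned instead of inlined
SET citus.enable_cte_inlining TO false;

-- the CTE is only referenced from HAVING; with this change the node
-- should report "Subplan ... will be written to local file" instead of
-- shipping the result to itself over a loopback connection
WITH cte_1 AS (SELECT max(value) FROM table_1)
SELECT count(*)
FROM table_2
GROUP BY key
HAVING max(value) > (SELECT max FROM cte_1);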