diff --git a/src/backend/distributed/commands/call.c b/src/backend/distributed/commands/call.c index fde187ded..4ec2ad4a2 100644 --- a/src/backend/distributed/commands/call.c +++ b/src/backend/distributed/commands/call.c @@ -164,7 +164,7 @@ CallFuncExprRemotely(CallStmt *callStmt, DistObjectCacheEntry *procedure, TupleDesc tupleDesc = CallStmtResultDesc(callStmt); TupleTableSlot *slot = MakeSingleTupleTableSlotCompat(tupleDesc, &TTSOpsMinimalTuple); - bool hasReturning = true; + bool expectResults = true; Task *task = CitusMakeNode(Task); task->jobId = INVALID_JOB_ID; @@ -196,7 +196,7 @@ CallFuncExprRemotely(CallStmt *callStmt, DistObjectCacheEntry *procedure, ); executionParams->tupleStore = tupleStore; executionParams->tupleDescriptor = tupleDesc; - executionParams->hasReturning = hasReturning; + executionParams->expectResults = expectResults; executionParams->xactProperties = xactProperties; ExecuteTaskListExtended(executionParams); diff --git a/src/backend/distributed/commands/function.c b/src/backend/distributed/commands/function.c index 458b9a00d..763578b9f 100644 --- a/src/backend/distributed/commands/function.c +++ b/src/backend/distributed/commands/function.c @@ -300,7 +300,7 @@ GetDistributionArgIndex(Oid functionOid, char *distributionArgumentName, } /* - * The user didn't provid "$paramIndex" but potentially the name of the paramater. + * The user didn't provid "$paramIndex" but potentially the name of the parameter. * So, loop over the arguments and try to find the argument name that matches * the parameter that user provided. */ diff --git a/src/backend/distributed/executor/adaptive_executor.c b/src/backend/distributed/executor/adaptive_executor.c index e03a0330b..b3952d2b8 100644 --- a/src/backend/distributed/executor/adaptive_executor.c +++ b/src/backend/distributed/executor/adaptive_executor.c @@ -187,8 +187,11 @@ typedef struct DistributedExecution List *remoteTaskList; List *localTaskList; - /* the corresponding distributed plan has RETURNING */ - bool hasReturning; + /* + * Corresponding distributed plan returns results, + * either because it is a SELECT or has RETURNING. + */ + bool expectResults; /* Parameters for parameterized plans. Can be NULL. */ ParamListInfo paramListInfo; @@ -548,7 +551,7 @@ typedef struct TaskPlacementExecution /* local functions */ static DistributedExecution * CreateDistributedExecution(RowModifyLevel modLevel, List *taskList, - bool hasReturning, + bool expectResults, ParamListInfo paramListInfo, TupleDesc tupleDescriptor, Tuplestorestate *tupleStore, @@ -579,7 +582,7 @@ static bool IsMultiShardModification(RowModifyLevel modLevel, List *taskList); static bool TaskListModifiesDatabase(RowModifyLevel modLevel, List *taskList); static bool DistributedExecutionRequiresRollback(List *taskList); static bool TaskListRequires2PC(List *taskList); -static bool SelectForUpdateOnReferenceTable(RowModifyLevel modLevel, List *taskList); +static bool SelectForUpdateOnReferenceTable(List *taskList); static void AssignTasksToConnectionsOrWorkerPool(DistributedExecution *execution); static void UnclaimAllSessionConnections(List *sessionList); static bool UseConnectionPerPlacement(void); @@ -697,7 +700,7 @@ AdaptiveExecutor(CitusScanState *scanState) DistributedExecution *execution = CreateDistributedExecution( distributedPlan->modLevel, taskList, - distributedPlan->hasReturning, + distributedPlan->expectResults, paramListInfo, tupleDescriptor, scanState->tuplestorestate, @@ -729,7 +732,7 @@ AdaptiveExecutor(CitusScanState *scanState) RunDistributedExecution(execution); } - if (distributedPlan->modLevel != ROW_MODIFY_READONLY) + if (job->jobQuery->commandType != CMD_SELECT) { if (list_length(execution->localTaskList) == 0) { @@ -756,7 +759,8 @@ AdaptiveExecutor(CitusScanState *scanState) DoRepartitionCleanup(jobIdList); } - if (SortReturning && distributedPlan->hasReturning) + if (SortReturning && distributedPlan->expectResults && + job->jobQuery->commandType != CMD_SELECT) { SortTupleStore(scanState); } @@ -887,7 +891,7 @@ ExecuteTaskList(RowModifyLevel modLevel, List *taskList, uint64 ExecuteTaskListIntoTupleStore(RowModifyLevel modLevel, List *taskList, TupleDesc tupleDescriptor, Tuplestorestate *tupleStore, - bool hasReturning) + bool expectResults) { int targetPoolSize = MaxAdaptiveExecutorPoolSize; bool localExecutionSupported = true; @@ -897,7 +901,7 @@ ExecuteTaskListIntoTupleStore(RowModifyLevel modLevel, List *taskList, executionParams->xactProperties = DecideTransactionPropertiesForTaskList( modLevel, taskList, false); - executionParams->hasReturning = hasReturning; + executionParams->expectResults = expectResults; executionParams->tupleStore = tupleStore; executionParams->tupleDescriptor = tupleDescriptor; @@ -960,7 +964,7 @@ ExecuteTaskListExtended(ExecutionParams *executionParams) DistributedExecution *execution = CreateDistributedExecution( executionParams->modLevel, remoteTaskList, - executionParams->hasReturning, paramListInfo, + executionParams->expectResults, paramListInfo, executionParams->tupleDescriptor, executionParams->tupleStore, executionParams->targetPoolSize, &executionParams->xactProperties, executionParams->jobIdList); @@ -991,7 +995,7 @@ CreateBasicExecutionParams(RowModifyLevel modLevel, executionParams->tupleStore = NULL; executionParams->tupleDescriptor = NULL; - executionParams->hasReturning = false; + executionParams->expectResults = false; executionParams->isUtilityCommand = false; executionParams->jobIdList = NIL; @@ -1005,7 +1009,7 @@ CreateBasicExecutionParams(RowModifyLevel modLevel, */ static DistributedExecution * CreateDistributedExecution(RowModifyLevel modLevel, List *taskList, - bool hasReturning, + bool expectResults, ParamListInfo paramListInfo, TupleDesc tupleDescriptor, Tuplestorestate *tupleStore, int targetPoolSize, TransactionProperties *xactProperties, List *jobIdList) @@ -1015,7 +1019,7 @@ CreateDistributedExecution(RowModifyLevel modLevel, List *taskList, execution->modLevel = modLevel; execution->tasksToExecute = taskList; - execution->hasReturning = hasReturning; + execution->expectResults = expectResults; execution->transactionProperties = xactProperties; execution->localTaskList = NIL; @@ -1426,7 +1430,7 @@ ReadOnlyTask(TaskType taskType) { switch (taskType) { - case SELECT_TASK: + case READ_TASK: case MAP_OUTPUT_FETCH_TASK: case MAP_TASK: case MERGE_TASK: @@ -1449,16 +1453,11 @@ ReadOnlyTask(TaskType taskType) /* * SelectForUpdateOnReferenceTable returns true if the input task - * that contains FOR UPDATE clause that locks any reference tables. + * contains a FOR UPDATE clause that locks any reference tables. */ static bool -SelectForUpdateOnReferenceTable(RowModifyLevel modLevel, List *taskList) +SelectForUpdateOnReferenceTable(List *taskList) { - if (modLevel != ROW_MODIFY_READONLY) - { - return false; - } - if (list_length(taskList) != 1) { /* we currently do not support SELECT FOR UPDATE on multi task queries */ @@ -1529,7 +1528,7 @@ AcquireExecutorShardLocksForExecution(DistributedExecution *execution) List *taskList = execution->tasksToExecute; if (modLevel <= ROW_MODIFY_READONLY && - !SelectForUpdateOnReferenceTable(modLevel, taskList)) + !SelectForUpdateOnReferenceTable(taskList)) { /* * Executor locks only apply to DML commands and SELECT FOR UPDATE queries @@ -1676,7 +1675,7 @@ UnclaimAllSessionConnections(List *sessionList) /* - * AssignTasksToConnectionsOrWorkerPool goes through the list of tasks to determine whether any + * AssignTasksToConnectionsOrWorkerPool goes through the list of tasks to determine whether any * task placements need to be assigned to particular connections because of preceding * operations in the transaction. It then adds those connections to the pool and adds * the task placement executions to the assigned task queue of the connection. @@ -1686,7 +1685,7 @@ AssignTasksToConnectionsOrWorkerPool(DistributedExecution *execution) { RowModifyLevel modLevel = execution->modLevel; List *taskList = execution->tasksToExecute; - bool hasReturning = execution->hasReturning; + bool expectResults = execution->expectResults; int32 localGroupId = GetLocalGroupId(); @@ -1711,8 +1710,7 @@ AssignTasksToConnectionsOrWorkerPool(DistributedExecution *execution) shardCommandExecution->placementExecutionCount = placementExecutionCount; shardCommandExecution->expectResults = - (hasReturning && !task->partiallyLocalOrRemote) || - modLevel == ROW_MODIFY_READONLY; + expectResults && !task->partiallyLocalOrRemote; ShardPlacement *taskPlacement = NULL; foreach_ptr(taskPlacement, task->taskPlacementList) @@ -1877,7 +1875,7 @@ ExecutionOrderForTask(RowModifyLevel modLevel, Task *task) { switch (task->taskType) { - case SELECT_TASK: + case READ_TASK: { return EXECUTION_ORDER_ANY; } diff --git a/src/backend/distributed/executor/distributed_execution_locks.c b/src/backend/distributed/executor/distributed_execution_locks.c index 43ec15298..fba4ce1d2 100644 --- a/src/backend/distributed/executor/distributed_execution_locks.c +++ b/src/backend/distributed/executor/distributed_execution_locks.c @@ -358,7 +358,7 @@ AcquireExecutorShardLocksForRelationRowLockList(List *relationRowLockList) } /* - * If lock clause exists and it effects any reference table, we need to get + * If lock clause exists and it affects any reference table, we need to get * lock on shard resource. Type of lock is determined by the type of row lock * given in the query. If the type of row lock is either FOR NO KEY UPDATE or * FOR UPDATE we get ExclusiveLock on shard resource. We get ShareLock if it diff --git a/src/backend/distributed/executor/distributed_intermediate_results.c b/src/backend/distributed/executor/distributed_intermediate_results.c index 0392880b8..adeac51c7 100644 --- a/src/backend/distributed/executor/distributed_intermediate_results.c +++ b/src/backend/distributed/executor/distributed_intermediate_results.c @@ -402,7 +402,7 @@ static Tuplestorestate * ExecuteSelectTasksIntoTupleStore(List *taskList, TupleDesc resultDescriptor, bool errorOnAnyFailure) { - bool hasReturning = true; + bool expectResults = true; int targetPoolSize = MaxAdaptiveExecutorPoolSize; bool randomAccess = true; bool interTransactions = false; @@ -428,7 +428,7 @@ ExecuteSelectTasksIntoTupleStore(List *taskList, TupleDesc resultDescriptor, executionParams->tupleDescriptor = resultDescriptor; executionParams->tupleStore = resultStore; executionParams->xactProperties = xactProperties; - executionParams->hasReturning = hasReturning; + executionParams->expectResults = expectResults; ExecuteTaskListExtended(executionParams); @@ -556,7 +556,7 @@ FragmentTransferTaskList(List *fragmentListTransfers) SetPlacementNodeMetadata(targetPlacement, workerNode); Task *task = CitusMakeNode(Task); - task->taskType = SELECT_TASK; + task->taskType = READ_TASK; SetTaskQueryString(task, QueryStringForFragmentsTransfer(fragmentsTransfer)); task->taskPlacementList = list_make1(targetPlacement); diff --git a/src/backend/distributed/executor/insert_select_executor.c b/src/backend/distributed/executor/insert_select_executor.c index 1b8ed2fc7..2ef5c6f2b 100644 --- a/src/backend/distributed/executor/insert_select_executor.c +++ b/src/backend/distributed/executor/insert_select_executor.c @@ -133,7 +133,7 @@ CoordinatorInsertSelectExecScanInternal(CustomScanState *node) RangeTblEntry *insertRte = ExtractResultRelationRTE(insertSelectQuery); Oid targetRelationId = insertRte->relid; char *intermediateResultIdPrefix = distributedPlan->intermediateResultIdPrefix; - bool hasReturning = distributedPlan->hasReturning; + bool hasReturning = distributedPlan->expectResults; HTAB *shardStateHash = NULL; /* select query to execute */ diff --git a/src/backend/distributed/executor/multi_task_tracker_executor.c b/src/backend/distributed/executor/multi_task_tracker_executor.c index 4ece5b34d..a6482c1b8 100644 --- a/src/backend/distributed/executor/multi_task_tracker_executor.c +++ b/src/backend/distributed/executor/multi_task_tracker_executor.c @@ -635,7 +635,7 @@ TopLevelTask(Task *task) * SQL tasks can only appear at the top level in our query tree. Further, no * other task type can appear at the top level in our tree. */ - if (task->taskType == SELECT_TASK) + if (task->taskType == READ_TASK) { topLevelTask = true; } @@ -1063,7 +1063,7 @@ ManageTaskExecution(TaskTracker *taskTracker, TaskTracker *sourceTaskTracker, * We finally queue this task for execution. Note that we queue sql and * other tasks slightly differently. */ - if (taskType == SELECT_TASK) + if (taskType == READ_TASK) { TrackerQueueSqlTask(taskTracker, task); } @@ -1253,7 +1253,7 @@ ManageTransmitExecution(TaskTracker *transmitTracker, TransmitExecStatus *transmitStatusArray = taskExecution->transmitStatusArray; TransmitExecStatus currentTransmitStatus = transmitStatusArray[currentNodeIndex]; TransmitExecStatus nextTransmitStatus = EXEC_TRANSMIT_INVALID_FIRST; - Assert(task->taskType == SELECT_TASK); + Assert(task->taskType == READ_TASK); switch (currentTransmitStatus) { @@ -1852,7 +1852,7 @@ ConstrainedNonMergeTaskList(List *taskAndExecutionList, Task *task) List *dependentTaskList = NIL; TaskType taskType = task->taskType; - if (taskType == SELECT_TASK || taskType == MAP_TASK) + if (taskType == READ_TASK || taskType == MAP_TASK) { upstreamTask = task; dependentTaskList = upstreamTask->dependentTaskList; @@ -1928,7 +1928,7 @@ ConstrainedMergeTaskList(List *taskAndExecutionList, Task *task) * given task is a SQL or map task, we simply need to find its merge task * dependencies -- if any. */ - if (taskType == SELECT_TASK || taskType == MAP_TASK) + if (taskType == READ_TASK || taskType == MAP_TASK) { constrainedMergeTaskList = MergeTaskList(task->dependentTaskList); } @@ -2008,7 +2008,7 @@ ReassignTaskList(List *taskList) TaskExecution *taskExecution = task->taskExecution; bool transmitCompleted = TransmitExecutionCompleted(taskExecution); - if ((task->taskType == SELECT_TASK) && transmitCompleted) + if ((task->taskType == READ_TASK) && transmitCompleted) { completedTaskList = lappend(completedTaskList, task); } diff --git a/src/backend/distributed/planner/distributed_planner.c b/src/backend/distributed/planner/distributed_planner.c index 68b3a17b7..861cbc70f 100644 --- a/src/backend/distributed/planner/distributed_planner.c +++ b/src/backend/distributed/planner/distributed_planner.c @@ -574,17 +574,6 @@ IsUpdateOrDelete(Query *query) } -/* - * IsModifyDistributedPlan returns true if the multi plan performs modifications, - * false otherwise. - */ -bool -IsModifyDistributedPlan(DistributedPlan *distributedPlan) -{ - return distributedPlan->modLevel > ROW_MODIFY_READONLY; -} - - /* * PlanFastPathDistributedStmt creates a distributed planned statement using * the FastPathPlanner. @@ -805,7 +794,7 @@ InlineCtesAndCreateDistributedPlannedStmt(uint64 planId, /* after inlining, we shouldn't have any inlinable CTEs */ Assert(!QueryTreeContainsInlinableCTE(copyOfOriginalQuery)); - #if PG_VERSION_NUM < PG_VERSION_12 +#if PG_VERSION_NUM < PG_VERSION_12 Query *query = planContext->query; /* diff --git a/src/backend/distributed/planner/function_call_delegation.c b/src/backend/distributed/planner/function_call_delegation.c index 06605800f..0fdb06043 100644 --- a/src/backend/distributed/planner/function_call_delegation.c +++ b/src/backend/distributed/planner/function_call_delegation.c @@ -367,7 +367,7 @@ TryToDelegateFunctionCall(DistributedPlanningContext *planContext) ereport(DEBUG1, (errmsg("pushing down the function call"))); task = CitusMakeNode(Task); - task->taskType = SELECT_TASK; + task->taskType = READ_TASK; task->taskPlacementList = placementList; SetTaskQueryIfShouldLazyDeparse(task, planContext->query); task->anchorShardId = shardInterval->shardId; @@ -382,7 +382,7 @@ TryToDelegateFunctionCall(DistributedPlanningContext *planContext) distributedPlan->workerJob = job; distributedPlan->masterQuery = NULL; distributedPlan->routerExecutable = true; - distributedPlan->hasReturning = false; + distributedPlan->expectResults = true; /* worker will take care of any necessary locking, treat query as read-only */ distributedPlan->modLevel = ROW_MODIFY_READONLY; diff --git a/src/backend/distributed/planner/insert_select_planner.c b/src/backend/distributed/planner/insert_select_planner.c index e55209e33..303b54d63 100644 --- a/src/backend/distributed/planner/insert_select_planner.c +++ b/src/backend/distributed/planner/insert_select_planner.c @@ -297,14 +297,9 @@ CreateDistributedInsertSelectPlan(Query *originalQuery, distributedPlan->workerJob = workerJob; distributedPlan->masterQuery = NULL; distributedPlan->routerExecutable = true; - distributedPlan->hasReturning = false; + distributedPlan->expectResults = originalQuery->returningList != NIL; distributedPlan->targetRelationId = targetRelationId; - if (originalQuery->returningList != NIL) - { - distributedPlan->hasReturning = true; - } - return distributedPlan; } @@ -1135,7 +1130,7 @@ CreateCoordinatorInsertSelectPlan(uint64 planId, Query *parse) } distributedPlan->insertSelectQuery = insertSelectQuery; - distributedPlan->hasReturning = insertSelectQuery->returningList != NIL; + distributedPlan->expectResults = insertSelectQuery->returningList != NIL; distributedPlan->intermediateResultIdPrefix = InsertSelectResultIdPrefix(planId); distributedPlan->targetRelationId = targetRelationId; diff --git a/src/backend/distributed/planner/multi_physical_planner.c b/src/backend/distributed/planner/multi_physical_planner.c index 528b2e812..6d5bfedff 100644 --- a/src/backend/distributed/planner/multi_physical_planner.c +++ b/src/backend/distributed/planner/multi_physical_planner.c @@ -263,6 +263,7 @@ CreatePhysicalDistributedPlan(MultiTreeRoot *multiTree, distributedPlan->masterQuery = masterQuery; distributedPlan->routerExecutable = DistributedPlanRouterExecutable(distributedPlan); distributedPlan->modLevel = ROW_MODIFY_READONLY; + distributedPlan->expectResults = true; return distributedPlan; } @@ -2216,7 +2217,7 @@ BuildJobTreeTaskList(Job *jobTree, PlannerRestrictionContext *plannerRestriction sqlTaskList = QueryPushdownSqlTaskList(job->jobQuery, job->jobId, plannerRestrictionContext-> relationRestrictionContext, - prunedRelationShardList, SELECT_TASK, + prunedRelationShardList, READ_TASK, false); } else @@ -2249,7 +2250,7 @@ BuildJobTreeTaskList(Job *jobTree, PlannerRestrictionContext *plannerRestriction Task *assignedSqlTask = (Task *) lfirst(assignedSqlTaskCell); /* we don't support parameters in the physical planner */ - if (assignedSqlTask->taskType == SELECT_TASK) + if (assignedSqlTask->taskType == READ_TASK) { assignedSqlTask->parametersInQueryStringResolved = job->parametersInJobQueryResolved; @@ -2679,7 +2680,7 @@ QueryPushdownTaskCreate(Query *originalQuery, int shardIndex, Task *subqueryTask = CreateBasicTask(jobId, taskId, taskType, NULL); if ((taskType == MODIFY_TASK && !modifyRequiresMasterEvaluation) || - taskType == SELECT_TASK) + taskType == READ_TASK) { pg_get_query_def(taskQuery, queryString); ereport(DEBUG4, (errmsg("distributed statement: %s", @@ -2973,7 +2974,7 @@ SqlTaskList(Job *job) StringInfo sqlQueryString = makeStringInfo(); pg_get_query_def(taskQuery, sqlQueryString); - Task *sqlTask = CreateBasicTask(jobId, taskIdIndex, SELECT_TASK, + Task *sqlTask = CreateBasicTask(jobId, taskIdIndex, READ_TASK, sqlQueryString->data); sqlTask->dependentTaskList = dataFetchTaskList; sqlTask->relationShardList = BuildRelationShardList(fragmentRangeTableList, @@ -5794,7 +5795,7 @@ AssignDataFetchDependencies(List *taskList) ListCell *dependentTaskCell = NULL; Assert(task->taskPlacementList != NIL); - Assert(task->taskType == SELECT_TASK || task->taskType == MERGE_TASK); + Assert(task->taskType == READ_TASK || task->taskType == MERGE_TASK); foreach(dependentTaskCell, dependentTaskList) { diff --git a/src/backend/distributed/planner/multi_router_planner.c b/src/backend/distributed/planner/multi_router_planner.c index 80c963c6b..afc6212d0 100644 --- a/src/backend/distributed/planner/multi_router_planner.c +++ b/src/backend/distributed/planner/multi_router_planner.c @@ -118,11 +118,11 @@ bool EnableRouterExecution = true; /* planner functions forward declarations */ -static void CreateSingleTaskRouterPlan(DistributedPlan *distributedPlan, - Query *originalQuery, - Query *query, - PlannerRestrictionContext * - plannerRestrictionContext); +static void CreateSingleTaskRouterSelectPlan(DistributedPlan *distributedPlan, + Query *originalQuery, + Query *query, + PlannerRestrictionContext * + plannerRestrictionContext); static Oid ResultRelationOidForQuery(Query *query); static bool IsTidColumn(Node *node); static DeferredErrorMessage * MultiShardModifyQuerySupported(Query *originalQuery, @@ -184,8 +184,8 @@ CreateRouterPlan(Query *originalQuery, Query *query, if (distributedPlan->planningError == NULL) { - CreateSingleTaskRouterPlan(distributedPlan, originalQuery, query, - plannerRestrictionContext); + CreateSingleTaskRouterSelectPlan(distributedPlan, originalQuery, query, + plannerRestrictionContext); } distributedPlan->fastPathRouterPlan = @@ -208,6 +208,8 @@ CreateModifyPlan(Query *originalQuery, Query *query, DistributedPlan *distributedPlan = CitusMakeNode(DistributedPlan); bool multiShardQuery = false; + Assert(originalQuery->commandType != CMD_SELECT); + distributedPlan->modLevel = RowModifyLevelForQuery(query); distributedPlan->planningError = ModifyQuerySupported(query, originalQuery, @@ -238,14 +240,9 @@ CreateModifyPlan(Query *originalQuery, Query *query, distributedPlan->workerJob = job; distributedPlan->masterQuery = NULL; distributedPlan->routerExecutable = true; - distributedPlan->hasReturning = false; + distributedPlan->expectResults = originalQuery->returningList != NIL; distributedPlan->targetRelationId = ResultRelationOidForQuery(query); - if (list_length(originalQuery->returningList) > 0) - { - distributedPlan->hasReturning = true; - } - distributedPlan->fastPathRouterPlan = plannerRestrictionContext->fastPathRestrictionContext->fastPathRouterQuery; @@ -255,20 +252,19 @@ CreateModifyPlan(Query *originalQuery, Query *query, /* - * CreateSingleTaskRouterPlan creates a physical plan for given query. The created plan is - * either a modify task that changes a single shard, or a router task that returns - * query results from a single worker. Supported modify queries (insert/update/delete) - * are router plannable by default. If query is not router plannable the returned plan - * has planningError set to a description of the problem. + * CreateSingleTaskRouterPlan creates a physical plan for given SELECT query. + * The returned plan is a router task that returns query results from a single worker. + * If not router plannable, the returned plan's planningError describes the problem. */ static void -CreateSingleTaskRouterPlan(DistributedPlan *distributedPlan, Query *originalQuery, - Query *query, - PlannerRestrictionContext *plannerRestrictionContext) +CreateSingleTaskRouterSelectPlan(DistributedPlan *distributedPlan, Query *originalQuery, + Query *query, + PlannerRestrictionContext *plannerRestrictionContext) { + Assert(query->commandType == CMD_SELECT); + distributedPlan->modLevel = RowModifyLevelForQuery(query); - /* we cannot have multi shard update/delete query via this code path */ Job *job = RouterJob(originalQuery, plannerRestrictionContext, &distributedPlan->planningError); @@ -283,7 +279,7 @@ CreateSingleTaskRouterPlan(DistributedPlan *distributedPlan, Query *originalQuer distributedPlan->workerJob = job; distributedPlan->masterQuery = NULL; distributedPlan->routerExecutable = true; - distributedPlan->hasReturning = false; + distributedPlan->expectResults = true; } @@ -1414,6 +1410,8 @@ TargetEntryChangesValue(TargetEntry *targetEntry, Var *column, FromExpr *joinTre static Job * RouterInsertJob(Query *originalQuery) { + Assert(originalQuery->commandType == CMD_INSERT); + bool isMultiRowInsert = IsMultiRowInsert(originalQuery); if (isMultiRowInsert) { @@ -1809,7 +1807,7 @@ SingleShardSelectTaskList(Query *query, uint64 jobId, List *relationShardList, List *placementList, uint64 shardId, bool parametersInQueryResolved) { - Task *task = CreateTask(SELECT_TASK); + Task *task = CreateTask(READ_TASK); List *relationRowLockList = NIL; RowLocksOnRelations((Node *) query, &relationRowLockList); @@ -3116,7 +3114,7 @@ ExtractInsertPartitionKeyValue(Query *query) /* * MultiRouterPlannableQuery checks if given select query is router plannable, * setting distributedPlan->planningError if not. - * The query is router plannable if it is a modify query, or if its is a select + * The query is router plannable if it is a modify query, or if it is a select * query issued on a hash partitioned distributed table. Router plannable checks * for select queries can be turned off by setting citus.enable_router_execution * flag to false. @@ -3177,7 +3175,7 @@ MultiRouterPlannableQuery(Query *query) /* * Currently, we don't support tables with replication factor > 1, - * except reference tables with SELECT ... FOR UDPATE queries. It is + * except reference tables with SELECT ... FOR UPDATE queries. It is * also not supported from MX nodes. */ if (query->hasForUpdate) diff --git a/src/backend/distributed/transaction/relation_access_tracking.c b/src/backend/distributed/transaction/relation_access_tracking.c index e2a40677e..261257906 100644 --- a/src/backend/distributed/transaction/relation_access_tracking.c +++ b/src/backend/distributed/transaction/relation_access_tracking.c @@ -288,7 +288,7 @@ RecordParallelRelationAccessForTaskList(List *taskList) */ Task *firstTask = linitial(taskList); - if (firstTask->taskType == SELECT_TASK) + if (firstTask->taskType == READ_TASK) { RecordRelationParallelSelectAccessForTask(firstTask); } diff --git a/src/backend/distributed/utils/citus_clauses.c b/src/backend/distributed/utils/citus_clauses.c index 17e076503..27779fc67 100644 --- a/src/backend/distributed/utils/citus_clauses.c +++ b/src/backend/distributed/utils/citus_clauses.c @@ -71,7 +71,7 @@ ExecuteMasterEvaluableFunctionsAndParameters(Query *query, PlanState *planState) /* - * ExecuteMasterEvaluableParameters evaluates external paramaters that can be + * ExecuteMasterEvaluableParameters evaluates external parameters that can be * resolved to a constant. */ void @@ -87,7 +87,7 @@ ExecuteMasterEvaluableParameters(Query *query, PlanState *planState) /* - * PartiallyEvaluateExpression descend into an expression tree to evaluate + * PartiallyEvaluateExpression descends into an expression tree to evaluate * expressions that can be resolved to a constant on the master. Expressions * containing a Var are skipped, since the value of the Var is not known * on the master. diff --git a/src/backend/distributed/utils/citus_copyfuncs.c b/src/backend/distributed/utils/citus_copyfuncs.c index 1d2819309..80afd89b2 100644 --- a/src/backend/distributed/utils/citus_copyfuncs.c +++ b/src/backend/distributed/utils/citus_copyfuncs.c @@ -120,7 +120,7 @@ CopyNodeDistributedPlan(COPYFUNC_ARGS) COPY_SCALAR_FIELD(planId); COPY_SCALAR_FIELD(modLevel); - COPY_SCALAR_FIELD(hasReturning); + COPY_SCALAR_FIELD(expectResults); COPY_SCALAR_FIELD(routerExecutable); COPY_NODE_FIELD(workerJob); diff --git a/src/backend/distributed/utils/citus_outfuncs.c b/src/backend/distributed/utils/citus_outfuncs.c index a6515f599..68c63cc27 100644 --- a/src/backend/distributed/utils/citus_outfuncs.c +++ b/src/backend/distributed/utils/citus_outfuncs.c @@ -184,7 +184,7 @@ OutDistributedPlan(OUTFUNC_ARGS) WRITE_UINT64_FIELD(planId); WRITE_ENUM_FIELD(modLevel, RowModifyLevel); - WRITE_BOOL_FIELD(hasReturning); + WRITE_BOOL_FIELD(expectResults); WRITE_BOOL_FIELD(routerExecutable); WRITE_NODE_FIELD(workerJob); diff --git a/src/include/distributed/distributed_planner.h b/src/include/distributed/distributed_planner.h index f08ebd094..859193199 100644 --- a/src/include/distributed/distributed_planner.h +++ b/src/include/distributed/distributed_planner.h @@ -198,7 +198,6 @@ extern void multi_join_restriction_hook(PlannerInfo *root, JoinPathExtraData *extra); extern bool HasUnresolvedExternParamsWalker(Node *expression, ParamListInfo boundParams); extern bool IsModifyCommand(Query *query); -extern bool IsModifyDistributedPlan(struct DistributedPlan *distributedPlan); extern void EnsurePartitionTableNotReplicated(Oid relationId); extern Node * ResolveExternalParams(Node *inputNode, ParamListInfo boundParams); extern bool IsMultiTaskPlan(struct DistributedPlan *distributedPlan); diff --git a/src/include/distributed/multi_executor.h b/src/include/distributed/multi_executor.h index 9670b0a6d..03ac8ccdb 100644 --- a/src/include/distributed/multi_executor.h +++ b/src/include/distributed/multi_executor.h @@ -93,8 +93,8 @@ typedef struct ExecutionParams /* tupleStore is where the results will be stored for this execution */ Tuplestorestate *tupleStore; - /* hasReturning is true if this execution will return some result. */ - bool hasReturning; + /* expectResults is true if this execution will return some result. */ + bool expectResults; /* targetPoolSize is the maximum amount of connections per worker */ int targetPoolSize; diff --git a/src/include/distributed/multi_physical_planner.h b/src/include/distributed/multi_physical_planner.h index dca343e27..78faf41c3 100644 --- a/src/include/distributed/multi_physical_planner.h +++ b/src/include/distributed/multi_physical_planner.h @@ -81,7 +81,7 @@ typedef enum typedef enum { TASK_TYPE_INVALID_FIRST, - SELECT_TASK, + READ_TASK, MAP_TASK, MERGE_TASK, MAP_OUTPUT_FETCH_TASK, @@ -364,8 +364,11 @@ typedef struct DistributedPlan /* specifies nature of modifications in query */ RowModifyLevel modLevel; - /* specifies whether a DML command has a RETURNING */ - bool hasReturning; + /* + * specifies whether plan returns results, + * either as a SELECT or a DML which has RETURNING. + */ + bool expectResults; /* a router executable query is executed entirely on a worker */ bool routerExecutable;