mirror of https://github.com/citusdata/citus.git
Merge pull request #3834 from citusdata/prep-routing-modifying-ctes
Prep routing modifying ctespull/3746/merge
commit
a3c470b2b8
|
@ -164,7 +164,7 @@ CallFuncExprRemotely(CallStmt *callStmt, DistObjectCacheEntry *procedure,
|
|||
TupleDesc tupleDesc = CallStmtResultDesc(callStmt);
|
||||
TupleTableSlot *slot = MakeSingleTupleTableSlotCompat(tupleDesc,
|
||||
&TTSOpsMinimalTuple);
|
||||
bool hasReturning = true;
|
||||
bool expectResults = true;
|
||||
Task *task = CitusMakeNode(Task);
|
||||
|
||||
task->jobId = INVALID_JOB_ID;
|
||||
|
@ -196,7 +196,7 @@ CallFuncExprRemotely(CallStmt *callStmt, DistObjectCacheEntry *procedure,
|
|||
);
|
||||
executionParams->tupleStore = tupleStore;
|
||||
executionParams->tupleDescriptor = tupleDesc;
|
||||
executionParams->hasReturning = hasReturning;
|
||||
executionParams->expectResults = expectResults;
|
||||
executionParams->xactProperties = xactProperties;
|
||||
ExecuteTaskListExtended(executionParams);
|
||||
|
||||
|
|
|
@ -300,7 +300,7 @@ GetDistributionArgIndex(Oid functionOid, char *distributionArgumentName,
|
|||
}
|
||||
|
||||
/*
|
||||
* The user didn't provid "$paramIndex" but potentially the name of the paramater.
|
||||
* The user didn't provid "$paramIndex" but potentially the name of the parameter.
|
||||
* So, loop over the arguments and try to find the argument name that matches
|
||||
* the parameter that user provided.
|
||||
*/
|
||||
|
|
|
@ -187,8 +187,11 @@ typedef struct DistributedExecution
|
|||
List *remoteTaskList;
|
||||
List *localTaskList;
|
||||
|
||||
/* the corresponding distributed plan has RETURNING */
|
||||
bool hasReturning;
|
||||
/*
|
||||
* Corresponding distributed plan returns results,
|
||||
* either because it is a SELECT or has RETURNING.
|
||||
*/
|
||||
bool expectResults;
|
||||
|
||||
/* Parameters for parameterized plans. Can be NULL. */
|
||||
ParamListInfo paramListInfo;
|
||||
|
@ -548,7 +551,7 @@ typedef struct TaskPlacementExecution
|
|||
/* local functions */
|
||||
static DistributedExecution * CreateDistributedExecution(RowModifyLevel modLevel,
|
||||
List *taskList,
|
||||
bool hasReturning,
|
||||
bool expectResults,
|
||||
ParamListInfo paramListInfo,
|
||||
TupleDesc tupleDescriptor,
|
||||
Tuplestorestate *tupleStore,
|
||||
|
@ -579,7 +582,7 @@ static bool IsMultiShardModification(RowModifyLevel modLevel, List *taskList);
|
|||
static bool TaskListModifiesDatabase(RowModifyLevel modLevel, List *taskList);
|
||||
static bool DistributedExecutionRequiresRollback(List *taskList);
|
||||
static bool TaskListRequires2PC(List *taskList);
|
||||
static bool SelectForUpdateOnReferenceTable(RowModifyLevel modLevel, List *taskList);
|
||||
static bool SelectForUpdateOnReferenceTable(List *taskList);
|
||||
static void AssignTasksToConnectionsOrWorkerPool(DistributedExecution *execution);
|
||||
static void UnclaimAllSessionConnections(List *sessionList);
|
||||
static bool UseConnectionPerPlacement(void);
|
||||
|
@ -697,7 +700,7 @@ AdaptiveExecutor(CitusScanState *scanState)
|
|||
DistributedExecution *execution = CreateDistributedExecution(
|
||||
distributedPlan->modLevel,
|
||||
taskList,
|
||||
distributedPlan->hasReturning,
|
||||
distributedPlan->expectResults,
|
||||
paramListInfo,
|
||||
tupleDescriptor,
|
||||
scanState->tuplestorestate,
|
||||
|
@ -729,7 +732,7 @@ AdaptiveExecutor(CitusScanState *scanState)
|
|||
RunDistributedExecution(execution);
|
||||
}
|
||||
|
||||
if (distributedPlan->modLevel != ROW_MODIFY_READONLY)
|
||||
if (job->jobQuery->commandType != CMD_SELECT)
|
||||
{
|
||||
if (list_length(execution->localTaskList) == 0)
|
||||
{
|
||||
|
@ -756,7 +759,8 @@ AdaptiveExecutor(CitusScanState *scanState)
|
|||
DoRepartitionCleanup(jobIdList);
|
||||
}
|
||||
|
||||
if (SortReturning && distributedPlan->hasReturning)
|
||||
if (SortReturning && distributedPlan->expectResults &&
|
||||
job->jobQuery->commandType != CMD_SELECT)
|
||||
{
|
||||
SortTupleStore(scanState);
|
||||
}
|
||||
|
@ -887,7 +891,7 @@ ExecuteTaskList(RowModifyLevel modLevel, List *taskList,
|
|||
uint64
|
||||
ExecuteTaskListIntoTupleStore(RowModifyLevel modLevel, List *taskList,
|
||||
TupleDesc tupleDescriptor, Tuplestorestate *tupleStore,
|
||||
bool hasReturning)
|
||||
bool expectResults)
|
||||
{
|
||||
int targetPoolSize = MaxAdaptiveExecutorPoolSize;
|
||||
bool localExecutionSupported = true;
|
||||
|
@ -897,7 +901,7 @@ ExecuteTaskListIntoTupleStore(RowModifyLevel modLevel, List *taskList,
|
|||
|
||||
executionParams->xactProperties = DecideTransactionPropertiesForTaskList(
|
||||
modLevel, taskList, false);
|
||||
executionParams->hasReturning = hasReturning;
|
||||
executionParams->expectResults = expectResults;
|
||||
executionParams->tupleStore = tupleStore;
|
||||
executionParams->tupleDescriptor = tupleDescriptor;
|
||||
|
||||
|
@ -960,7 +964,7 @@ ExecuteTaskListExtended(ExecutionParams *executionParams)
|
|||
DistributedExecution *execution =
|
||||
CreateDistributedExecution(
|
||||
executionParams->modLevel, remoteTaskList,
|
||||
executionParams->hasReturning, paramListInfo,
|
||||
executionParams->expectResults, paramListInfo,
|
||||
executionParams->tupleDescriptor, executionParams->tupleStore,
|
||||
executionParams->targetPoolSize, &executionParams->xactProperties,
|
||||
executionParams->jobIdList);
|
||||
|
@ -991,7 +995,7 @@ CreateBasicExecutionParams(RowModifyLevel modLevel,
|
|||
|
||||
executionParams->tupleStore = NULL;
|
||||
executionParams->tupleDescriptor = NULL;
|
||||
executionParams->hasReturning = false;
|
||||
executionParams->expectResults = false;
|
||||
executionParams->isUtilityCommand = false;
|
||||
executionParams->jobIdList = NIL;
|
||||
|
||||
|
@ -1005,7 +1009,7 @@ CreateBasicExecutionParams(RowModifyLevel modLevel,
|
|||
*/
|
||||
static DistributedExecution *
|
||||
CreateDistributedExecution(RowModifyLevel modLevel, List *taskList,
|
||||
bool hasReturning,
|
||||
bool expectResults,
|
||||
ParamListInfo paramListInfo, TupleDesc tupleDescriptor,
|
||||
Tuplestorestate *tupleStore, int targetPoolSize,
|
||||
TransactionProperties *xactProperties, List *jobIdList)
|
||||
|
@ -1015,7 +1019,7 @@ CreateDistributedExecution(RowModifyLevel modLevel, List *taskList,
|
|||
|
||||
execution->modLevel = modLevel;
|
||||
execution->tasksToExecute = taskList;
|
||||
execution->hasReturning = hasReturning;
|
||||
execution->expectResults = expectResults;
|
||||
execution->transactionProperties = xactProperties;
|
||||
|
||||
execution->localTaskList = NIL;
|
||||
|
@ -1426,7 +1430,7 @@ ReadOnlyTask(TaskType taskType)
|
|||
{
|
||||
switch (taskType)
|
||||
{
|
||||
case SELECT_TASK:
|
||||
case READ_TASK:
|
||||
case MAP_OUTPUT_FETCH_TASK:
|
||||
case MAP_TASK:
|
||||
case MERGE_TASK:
|
||||
|
@ -1449,16 +1453,11 @@ ReadOnlyTask(TaskType taskType)
|
|||
|
||||
/*
|
||||
* SelectForUpdateOnReferenceTable returns true if the input task
|
||||
* that contains FOR UPDATE clause that locks any reference tables.
|
||||
* contains a FOR UPDATE clause that locks any reference tables.
|
||||
*/
|
||||
static bool
|
||||
SelectForUpdateOnReferenceTable(RowModifyLevel modLevel, List *taskList)
|
||||
SelectForUpdateOnReferenceTable(List *taskList)
|
||||
{
|
||||
if (modLevel != ROW_MODIFY_READONLY)
|
||||
{
|
||||
return false;
|
||||
}
|
||||
|
||||
if (list_length(taskList) != 1)
|
||||
{
|
||||
/* we currently do not support SELECT FOR UPDATE on multi task queries */
|
||||
|
@ -1529,7 +1528,7 @@ AcquireExecutorShardLocksForExecution(DistributedExecution *execution)
|
|||
List *taskList = execution->tasksToExecute;
|
||||
|
||||
if (modLevel <= ROW_MODIFY_READONLY &&
|
||||
!SelectForUpdateOnReferenceTable(modLevel, taskList))
|
||||
!SelectForUpdateOnReferenceTable(taskList))
|
||||
{
|
||||
/*
|
||||
* Executor locks only apply to DML commands and SELECT FOR UPDATE queries
|
||||
|
@ -1686,7 +1685,7 @@ AssignTasksToConnectionsOrWorkerPool(DistributedExecution *execution)
|
|||
{
|
||||
RowModifyLevel modLevel = execution->modLevel;
|
||||
List *taskList = execution->tasksToExecute;
|
||||
bool hasReturning = execution->hasReturning;
|
||||
bool expectResults = execution->expectResults;
|
||||
|
||||
int32 localGroupId = GetLocalGroupId();
|
||||
|
||||
|
@ -1711,8 +1710,7 @@ AssignTasksToConnectionsOrWorkerPool(DistributedExecution *execution)
|
|||
shardCommandExecution->placementExecutionCount = placementExecutionCount;
|
||||
|
||||
shardCommandExecution->expectResults =
|
||||
(hasReturning && !task->partiallyLocalOrRemote) ||
|
||||
modLevel == ROW_MODIFY_READONLY;
|
||||
expectResults && !task->partiallyLocalOrRemote;
|
||||
|
||||
ShardPlacement *taskPlacement = NULL;
|
||||
foreach_ptr(taskPlacement, task->taskPlacementList)
|
||||
|
@ -1877,7 +1875,7 @@ ExecutionOrderForTask(RowModifyLevel modLevel, Task *task)
|
|||
{
|
||||
switch (task->taskType)
|
||||
{
|
||||
case SELECT_TASK:
|
||||
case READ_TASK:
|
||||
{
|
||||
return EXECUTION_ORDER_ANY;
|
||||
}
|
||||
|
|
|
@ -358,7 +358,7 @@ AcquireExecutorShardLocksForRelationRowLockList(List *relationRowLockList)
|
|||
}
|
||||
|
||||
/*
|
||||
* If lock clause exists and it effects any reference table, we need to get
|
||||
* If lock clause exists and it affects any reference table, we need to get
|
||||
* lock on shard resource. Type of lock is determined by the type of row lock
|
||||
* given in the query. If the type of row lock is either FOR NO KEY UPDATE or
|
||||
* FOR UPDATE we get ExclusiveLock on shard resource. We get ShareLock if it
|
||||
|
|
|
@ -402,7 +402,7 @@ static Tuplestorestate *
|
|||
ExecuteSelectTasksIntoTupleStore(List *taskList, TupleDesc resultDescriptor,
|
||||
bool errorOnAnyFailure)
|
||||
{
|
||||
bool hasReturning = true;
|
||||
bool expectResults = true;
|
||||
int targetPoolSize = MaxAdaptiveExecutorPoolSize;
|
||||
bool randomAccess = true;
|
||||
bool interTransactions = false;
|
||||
|
@ -428,7 +428,7 @@ ExecuteSelectTasksIntoTupleStore(List *taskList, TupleDesc resultDescriptor,
|
|||
executionParams->tupleDescriptor = resultDescriptor;
|
||||
executionParams->tupleStore = resultStore;
|
||||
executionParams->xactProperties = xactProperties;
|
||||
executionParams->hasReturning = hasReturning;
|
||||
executionParams->expectResults = expectResults;
|
||||
|
||||
ExecuteTaskListExtended(executionParams);
|
||||
|
||||
|
@ -556,7 +556,7 @@ FragmentTransferTaskList(List *fragmentListTransfers)
|
|||
SetPlacementNodeMetadata(targetPlacement, workerNode);
|
||||
|
||||
Task *task = CitusMakeNode(Task);
|
||||
task->taskType = SELECT_TASK;
|
||||
task->taskType = READ_TASK;
|
||||
SetTaskQueryString(task, QueryStringForFragmentsTransfer(fragmentsTransfer));
|
||||
task->taskPlacementList = list_make1(targetPlacement);
|
||||
|
||||
|
|
|
@ -133,7 +133,7 @@ CoordinatorInsertSelectExecScanInternal(CustomScanState *node)
|
|||
RangeTblEntry *insertRte = ExtractResultRelationRTE(insertSelectQuery);
|
||||
Oid targetRelationId = insertRte->relid;
|
||||
char *intermediateResultIdPrefix = distributedPlan->intermediateResultIdPrefix;
|
||||
bool hasReturning = distributedPlan->hasReturning;
|
||||
bool hasReturning = distributedPlan->expectResults;
|
||||
HTAB *shardStateHash = NULL;
|
||||
|
||||
/* select query to execute */
|
||||
|
|
|
@ -635,7 +635,7 @@ TopLevelTask(Task *task)
|
|||
* SQL tasks can only appear at the top level in our query tree. Further, no
|
||||
* other task type can appear at the top level in our tree.
|
||||
*/
|
||||
if (task->taskType == SELECT_TASK)
|
||||
if (task->taskType == READ_TASK)
|
||||
{
|
||||
topLevelTask = true;
|
||||
}
|
||||
|
@ -1063,7 +1063,7 @@ ManageTaskExecution(TaskTracker *taskTracker, TaskTracker *sourceTaskTracker,
|
|||
* We finally queue this task for execution. Note that we queue sql and
|
||||
* other tasks slightly differently.
|
||||
*/
|
||||
if (taskType == SELECT_TASK)
|
||||
if (taskType == READ_TASK)
|
||||
{
|
||||
TrackerQueueSqlTask(taskTracker, task);
|
||||
}
|
||||
|
@ -1253,7 +1253,7 @@ ManageTransmitExecution(TaskTracker *transmitTracker,
|
|||
TransmitExecStatus *transmitStatusArray = taskExecution->transmitStatusArray;
|
||||
TransmitExecStatus currentTransmitStatus = transmitStatusArray[currentNodeIndex];
|
||||
TransmitExecStatus nextTransmitStatus = EXEC_TRANSMIT_INVALID_FIRST;
|
||||
Assert(task->taskType == SELECT_TASK);
|
||||
Assert(task->taskType == READ_TASK);
|
||||
|
||||
switch (currentTransmitStatus)
|
||||
{
|
||||
|
@ -1852,7 +1852,7 @@ ConstrainedNonMergeTaskList(List *taskAndExecutionList, Task *task)
|
|||
List *dependentTaskList = NIL;
|
||||
|
||||
TaskType taskType = task->taskType;
|
||||
if (taskType == SELECT_TASK || taskType == MAP_TASK)
|
||||
if (taskType == READ_TASK || taskType == MAP_TASK)
|
||||
{
|
||||
upstreamTask = task;
|
||||
dependentTaskList = upstreamTask->dependentTaskList;
|
||||
|
@ -1928,7 +1928,7 @@ ConstrainedMergeTaskList(List *taskAndExecutionList, Task *task)
|
|||
* given task is a SQL or map task, we simply need to find its merge task
|
||||
* dependencies -- if any.
|
||||
*/
|
||||
if (taskType == SELECT_TASK || taskType == MAP_TASK)
|
||||
if (taskType == READ_TASK || taskType == MAP_TASK)
|
||||
{
|
||||
constrainedMergeTaskList = MergeTaskList(task->dependentTaskList);
|
||||
}
|
||||
|
@ -2008,7 +2008,7 @@ ReassignTaskList(List *taskList)
|
|||
TaskExecution *taskExecution = task->taskExecution;
|
||||
|
||||
bool transmitCompleted = TransmitExecutionCompleted(taskExecution);
|
||||
if ((task->taskType == SELECT_TASK) && transmitCompleted)
|
||||
if ((task->taskType == READ_TASK) && transmitCompleted)
|
||||
{
|
||||
completedTaskList = lappend(completedTaskList, task);
|
||||
}
|
||||
|
|
|
@ -574,17 +574,6 @@ IsUpdateOrDelete(Query *query)
|
|||
}
|
||||
|
||||
|
||||
/*
|
||||
* IsModifyDistributedPlan returns true if the multi plan performs modifications,
|
||||
* false otherwise.
|
||||
*/
|
||||
bool
|
||||
IsModifyDistributedPlan(DistributedPlan *distributedPlan)
|
||||
{
|
||||
return distributedPlan->modLevel > ROW_MODIFY_READONLY;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* PlanFastPathDistributedStmt creates a distributed planned statement using
|
||||
* the FastPathPlanner.
|
||||
|
@ -805,7 +794,7 @@ InlineCtesAndCreateDistributedPlannedStmt(uint64 planId,
|
|||
/* after inlining, we shouldn't have any inlinable CTEs */
|
||||
Assert(!QueryTreeContainsInlinableCTE(copyOfOriginalQuery));
|
||||
|
||||
#if PG_VERSION_NUM < PG_VERSION_12
|
||||
#if PG_VERSION_NUM < PG_VERSION_12
|
||||
Query *query = planContext->query;
|
||||
|
||||
/*
|
||||
|
|
|
@ -367,7 +367,7 @@ TryToDelegateFunctionCall(DistributedPlanningContext *planContext)
|
|||
ereport(DEBUG1, (errmsg("pushing down the function call")));
|
||||
|
||||
task = CitusMakeNode(Task);
|
||||
task->taskType = SELECT_TASK;
|
||||
task->taskType = READ_TASK;
|
||||
task->taskPlacementList = placementList;
|
||||
SetTaskQueryIfShouldLazyDeparse(task, planContext->query);
|
||||
task->anchorShardId = shardInterval->shardId;
|
||||
|
@ -382,7 +382,7 @@ TryToDelegateFunctionCall(DistributedPlanningContext *planContext)
|
|||
distributedPlan->workerJob = job;
|
||||
distributedPlan->masterQuery = NULL;
|
||||
distributedPlan->routerExecutable = true;
|
||||
distributedPlan->hasReturning = false;
|
||||
distributedPlan->expectResults = true;
|
||||
|
||||
/* worker will take care of any necessary locking, treat query as read-only */
|
||||
distributedPlan->modLevel = ROW_MODIFY_READONLY;
|
||||
|
|
|
@ -297,14 +297,9 @@ CreateDistributedInsertSelectPlan(Query *originalQuery,
|
|||
distributedPlan->workerJob = workerJob;
|
||||
distributedPlan->masterQuery = NULL;
|
||||
distributedPlan->routerExecutable = true;
|
||||
distributedPlan->hasReturning = false;
|
||||
distributedPlan->expectResults = originalQuery->returningList != NIL;
|
||||
distributedPlan->targetRelationId = targetRelationId;
|
||||
|
||||
if (originalQuery->returningList != NIL)
|
||||
{
|
||||
distributedPlan->hasReturning = true;
|
||||
}
|
||||
|
||||
return distributedPlan;
|
||||
}
|
||||
|
||||
|
@ -1135,7 +1130,7 @@ CreateCoordinatorInsertSelectPlan(uint64 planId, Query *parse)
|
|||
}
|
||||
|
||||
distributedPlan->insertSelectQuery = insertSelectQuery;
|
||||
distributedPlan->hasReturning = insertSelectQuery->returningList != NIL;
|
||||
distributedPlan->expectResults = insertSelectQuery->returningList != NIL;
|
||||
distributedPlan->intermediateResultIdPrefix = InsertSelectResultIdPrefix(planId);
|
||||
distributedPlan->targetRelationId = targetRelationId;
|
||||
|
||||
|
|
|
@ -263,6 +263,7 @@ CreatePhysicalDistributedPlan(MultiTreeRoot *multiTree,
|
|||
distributedPlan->masterQuery = masterQuery;
|
||||
distributedPlan->routerExecutable = DistributedPlanRouterExecutable(distributedPlan);
|
||||
distributedPlan->modLevel = ROW_MODIFY_READONLY;
|
||||
distributedPlan->expectResults = true;
|
||||
|
||||
return distributedPlan;
|
||||
}
|
||||
|
@ -2216,7 +2217,7 @@ BuildJobTreeTaskList(Job *jobTree, PlannerRestrictionContext *plannerRestriction
|
|||
sqlTaskList = QueryPushdownSqlTaskList(job->jobQuery, job->jobId,
|
||||
plannerRestrictionContext->
|
||||
relationRestrictionContext,
|
||||
prunedRelationShardList, SELECT_TASK,
|
||||
prunedRelationShardList, READ_TASK,
|
||||
false);
|
||||
}
|
||||
else
|
||||
|
@ -2249,7 +2250,7 @@ BuildJobTreeTaskList(Job *jobTree, PlannerRestrictionContext *plannerRestriction
|
|||
Task *assignedSqlTask = (Task *) lfirst(assignedSqlTaskCell);
|
||||
|
||||
/* we don't support parameters in the physical planner */
|
||||
if (assignedSqlTask->taskType == SELECT_TASK)
|
||||
if (assignedSqlTask->taskType == READ_TASK)
|
||||
{
|
||||
assignedSqlTask->parametersInQueryStringResolved =
|
||||
job->parametersInJobQueryResolved;
|
||||
|
@ -2679,7 +2680,7 @@ QueryPushdownTaskCreate(Query *originalQuery, int shardIndex,
|
|||
Task *subqueryTask = CreateBasicTask(jobId, taskId, taskType, NULL);
|
||||
|
||||
if ((taskType == MODIFY_TASK && !modifyRequiresMasterEvaluation) ||
|
||||
taskType == SELECT_TASK)
|
||||
taskType == READ_TASK)
|
||||
{
|
||||
pg_get_query_def(taskQuery, queryString);
|
||||
ereport(DEBUG4, (errmsg("distributed statement: %s",
|
||||
|
@ -2973,7 +2974,7 @@ SqlTaskList(Job *job)
|
|||
StringInfo sqlQueryString = makeStringInfo();
|
||||
pg_get_query_def(taskQuery, sqlQueryString);
|
||||
|
||||
Task *sqlTask = CreateBasicTask(jobId, taskIdIndex, SELECT_TASK,
|
||||
Task *sqlTask = CreateBasicTask(jobId, taskIdIndex, READ_TASK,
|
||||
sqlQueryString->data);
|
||||
sqlTask->dependentTaskList = dataFetchTaskList;
|
||||
sqlTask->relationShardList = BuildRelationShardList(fragmentRangeTableList,
|
||||
|
@ -5794,7 +5795,7 @@ AssignDataFetchDependencies(List *taskList)
|
|||
ListCell *dependentTaskCell = NULL;
|
||||
|
||||
Assert(task->taskPlacementList != NIL);
|
||||
Assert(task->taskType == SELECT_TASK || task->taskType == MERGE_TASK);
|
||||
Assert(task->taskType == READ_TASK || task->taskType == MERGE_TASK);
|
||||
|
||||
foreach(dependentTaskCell, dependentTaskList)
|
||||
{
|
||||
|
|
|
@ -118,7 +118,7 @@ bool EnableRouterExecution = true;
|
|||
|
||||
|
||||
/* planner functions forward declarations */
|
||||
static void CreateSingleTaskRouterPlan(DistributedPlan *distributedPlan,
|
||||
static void CreateSingleTaskRouterSelectPlan(DistributedPlan *distributedPlan,
|
||||
Query *originalQuery,
|
||||
Query *query,
|
||||
PlannerRestrictionContext *
|
||||
|
@ -184,7 +184,7 @@ CreateRouterPlan(Query *originalQuery, Query *query,
|
|||
|
||||
if (distributedPlan->planningError == NULL)
|
||||
{
|
||||
CreateSingleTaskRouterPlan(distributedPlan, originalQuery, query,
|
||||
CreateSingleTaskRouterSelectPlan(distributedPlan, originalQuery, query,
|
||||
plannerRestrictionContext);
|
||||
}
|
||||
|
||||
|
@ -208,6 +208,8 @@ CreateModifyPlan(Query *originalQuery, Query *query,
|
|||
DistributedPlan *distributedPlan = CitusMakeNode(DistributedPlan);
|
||||
bool multiShardQuery = false;
|
||||
|
||||
Assert(originalQuery->commandType != CMD_SELECT);
|
||||
|
||||
distributedPlan->modLevel = RowModifyLevelForQuery(query);
|
||||
|
||||
distributedPlan->planningError = ModifyQuerySupported(query, originalQuery,
|
||||
|
@ -238,14 +240,9 @@ CreateModifyPlan(Query *originalQuery, Query *query,
|
|||
distributedPlan->workerJob = job;
|
||||
distributedPlan->masterQuery = NULL;
|
||||
distributedPlan->routerExecutable = true;
|
||||
distributedPlan->hasReturning = false;
|
||||
distributedPlan->expectResults = originalQuery->returningList != NIL;
|
||||
distributedPlan->targetRelationId = ResultRelationOidForQuery(query);
|
||||
|
||||
if (list_length(originalQuery->returningList) > 0)
|
||||
{
|
||||
distributedPlan->hasReturning = true;
|
||||
}
|
||||
|
||||
distributedPlan->fastPathRouterPlan =
|
||||
plannerRestrictionContext->fastPathRestrictionContext->fastPathRouterQuery;
|
||||
|
||||
|
@ -255,20 +252,19 @@ CreateModifyPlan(Query *originalQuery, Query *query,
|
|||
|
||||
|
||||
/*
|
||||
* CreateSingleTaskRouterPlan creates a physical plan for given query. The created plan is
|
||||
* either a modify task that changes a single shard, or a router task that returns
|
||||
* query results from a single worker. Supported modify queries (insert/update/delete)
|
||||
* are router plannable by default. If query is not router plannable the returned plan
|
||||
* has planningError set to a description of the problem.
|
||||
* CreateSingleTaskRouterPlan creates a physical plan for given SELECT query.
|
||||
* The returned plan is a router task that returns query results from a single worker.
|
||||
* If not router plannable, the returned plan's planningError describes the problem.
|
||||
*/
|
||||
static void
|
||||
CreateSingleTaskRouterPlan(DistributedPlan *distributedPlan, Query *originalQuery,
|
||||
CreateSingleTaskRouterSelectPlan(DistributedPlan *distributedPlan, Query *originalQuery,
|
||||
Query *query,
|
||||
PlannerRestrictionContext *plannerRestrictionContext)
|
||||
{
|
||||
Assert(query->commandType == CMD_SELECT);
|
||||
|
||||
distributedPlan->modLevel = RowModifyLevelForQuery(query);
|
||||
|
||||
/* we cannot have multi shard update/delete query via this code path */
|
||||
Job *job = RouterJob(originalQuery, plannerRestrictionContext,
|
||||
&distributedPlan->planningError);
|
||||
|
||||
|
@ -283,7 +279,7 @@ CreateSingleTaskRouterPlan(DistributedPlan *distributedPlan, Query *originalQuer
|
|||
distributedPlan->workerJob = job;
|
||||
distributedPlan->masterQuery = NULL;
|
||||
distributedPlan->routerExecutable = true;
|
||||
distributedPlan->hasReturning = false;
|
||||
distributedPlan->expectResults = true;
|
||||
}
|
||||
|
||||
|
||||
|
@ -1414,6 +1410,8 @@ TargetEntryChangesValue(TargetEntry *targetEntry, Var *column, FromExpr *joinTre
|
|||
static Job *
|
||||
RouterInsertJob(Query *originalQuery)
|
||||
{
|
||||
Assert(originalQuery->commandType == CMD_INSERT);
|
||||
|
||||
bool isMultiRowInsert = IsMultiRowInsert(originalQuery);
|
||||
if (isMultiRowInsert)
|
||||
{
|
||||
|
@ -1809,7 +1807,7 @@ SingleShardSelectTaskList(Query *query, uint64 jobId, List *relationShardList,
|
|||
List *placementList, uint64 shardId,
|
||||
bool parametersInQueryResolved)
|
||||
{
|
||||
Task *task = CreateTask(SELECT_TASK);
|
||||
Task *task = CreateTask(READ_TASK);
|
||||
List *relationRowLockList = NIL;
|
||||
|
||||
RowLocksOnRelations((Node *) query, &relationRowLockList);
|
||||
|
@ -3116,7 +3114,7 @@ ExtractInsertPartitionKeyValue(Query *query)
|
|||
/*
|
||||
* MultiRouterPlannableQuery checks if given select query is router plannable,
|
||||
* setting distributedPlan->planningError if not.
|
||||
* The query is router plannable if it is a modify query, or if its is a select
|
||||
* The query is router plannable if it is a modify query, or if it is a select
|
||||
* query issued on a hash partitioned distributed table. Router plannable checks
|
||||
* for select queries can be turned off by setting citus.enable_router_execution
|
||||
* flag to false.
|
||||
|
@ -3177,7 +3175,7 @@ MultiRouterPlannableQuery(Query *query)
|
|||
|
||||
/*
|
||||
* Currently, we don't support tables with replication factor > 1,
|
||||
* except reference tables with SELECT ... FOR UDPATE queries. It is
|
||||
* except reference tables with SELECT ... FOR UPDATE queries. It is
|
||||
* also not supported from MX nodes.
|
||||
*/
|
||||
if (query->hasForUpdate)
|
||||
|
|
|
@ -288,7 +288,7 @@ RecordParallelRelationAccessForTaskList(List *taskList)
|
|||
*/
|
||||
Task *firstTask = linitial(taskList);
|
||||
|
||||
if (firstTask->taskType == SELECT_TASK)
|
||||
if (firstTask->taskType == READ_TASK)
|
||||
{
|
||||
RecordRelationParallelSelectAccessForTask(firstTask);
|
||||
}
|
||||
|
|
|
@ -71,7 +71,7 @@ ExecuteMasterEvaluableFunctionsAndParameters(Query *query, PlanState *planState)
|
|||
|
||||
|
||||
/*
|
||||
* ExecuteMasterEvaluableParameters evaluates external paramaters that can be
|
||||
* ExecuteMasterEvaluableParameters evaluates external parameters that can be
|
||||
* resolved to a constant.
|
||||
*/
|
||||
void
|
||||
|
@ -87,7 +87,7 @@ ExecuteMasterEvaluableParameters(Query *query, PlanState *planState)
|
|||
|
||||
|
||||
/*
|
||||
* PartiallyEvaluateExpression descend into an expression tree to evaluate
|
||||
* PartiallyEvaluateExpression descends into an expression tree to evaluate
|
||||
* expressions that can be resolved to a constant on the master. Expressions
|
||||
* containing a Var are skipped, since the value of the Var is not known
|
||||
* on the master.
|
||||
|
|
|
@ -120,7 +120,7 @@ CopyNodeDistributedPlan(COPYFUNC_ARGS)
|
|||
|
||||
COPY_SCALAR_FIELD(planId);
|
||||
COPY_SCALAR_FIELD(modLevel);
|
||||
COPY_SCALAR_FIELD(hasReturning);
|
||||
COPY_SCALAR_FIELD(expectResults);
|
||||
COPY_SCALAR_FIELD(routerExecutable);
|
||||
|
||||
COPY_NODE_FIELD(workerJob);
|
||||
|
|
|
@ -184,7 +184,7 @@ OutDistributedPlan(OUTFUNC_ARGS)
|
|||
|
||||
WRITE_UINT64_FIELD(planId);
|
||||
WRITE_ENUM_FIELD(modLevel, RowModifyLevel);
|
||||
WRITE_BOOL_FIELD(hasReturning);
|
||||
WRITE_BOOL_FIELD(expectResults);
|
||||
WRITE_BOOL_FIELD(routerExecutable);
|
||||
|
||||
WRITE_NODE_FIELD(workerJob);
|
||||
|
|
|
@ -198,7 +198,6 @@ extern void multi_join_restriction_hook(PlannerInfo *root,
|
|||
JoinPathExtraData *extra);
|
||||
extern bool HasUnresolvedExternParamsWalker(Node *expression, ParamListInfo boundParams);
|
||||
extern bool IsModifyCommand(Query *query);
|
||||
extern bool IsModifyDistributedPlan(struct DistributedPlan *distributedPlan);
|
||||
extern void EnsurePartitionTableNotReplicated(Oid relationId);
|
||||
extern Node * ResolveExternalParams(Node *inputNode, ParamListInfo boundParams);
|
||||
extern bool IsMultiTaskPlan(struct DistributedPlan *distributedPlan);
|
||||
|
|
|
@ -93,8 +93,8 @@ typedef struct ExecutionParams
|
|||
/* tupleStore is where the results will be stored for this execution */
|
||||
Tuplestorestate *tupleStore;
|
||||
|
||||
/* hasReturning is true if this execution will return some result. */
|
||||
bool hasReturning;
|
||||
/* expectResults is true if this execution will return some result. */
|
||||
bool expectResults;
|
||||
|
||||
/* targetPoolSize is the maximum amount of connections per worker */
|
||||
int targetPoolSize;
|
||||
|
|
|
@ -81,7 +81,7 @@ typedef enum
|
|||
typedef enum
|
||||
{
|
||||
TASK_TYPE_INVALID_FIRST,
|
||||
SELECT_TASK,
|
||||
READ_TASK,
|
||||
MAP_TASK,
|
||||
MERGE_TASK,
|
||||
MAP_OUTPUT_FETCH_TASK,
|
||||
|
@ -364,8 +364,11 @@ typedef struct DistributedPlan
|
|||
/* specifies nature of modifications in query */
|
||||
RowModifyLevel modLevel;
|
||||
|
||||
/* specifies whether a DML command has a RETURNING */
|
||||
bool hasReturning;
|
||||
/*
|
||||
* specifies whether plan returns results,
|
||||
* either as a SELECT or a DML which has RETURNING.
|
||||
*/
|
||||
bool expectResults;
|
||||
|
||||
/* a router executable query is executed entirely on a worker */
|
||||
bool routerExecutable;
|
||||
|
|
Loading…
Reference in New Issue