|
|
|
@ -186,11 +186,14 @@ typedef struct DistributedExecution
|
|
|
|
|
RowModifyLevel modLevel;
|
|
|
|
|
|
|
|
|
|
/*
|
|
|
|
|
* tasksToExecute contains all the tasks required to finish the execution, and
|
|
|
|
|
* it is the union of remoteTaskList and localTaskList. After (if any) local
|
|
|
|
|
* tasks are executed, remoteTaskList becomes equivalent of tasksToExecute.
|
|
|
|
|
* remoteAndLocalTaskList contains all the tasks required to finish the
|
|
|
|
|
* execution. remoteTaskList contains all the tasks required to
|
|
|
|
|
* finish the remote execution. localTaskList contains all the
|
|
|
|
|
* local tasks required to finish the local execution.
|
|
|
|
|
*
|
|
|
|
|
* remoteAndLocalTaskList is the union of remoteTaskList and localTaskList.
|
|
|
|
|
*/
|
|
|
|
|
List *tasksToExecute;
|
|
|
|
|
List *remoteAndLocalTaskList;
|
|
|
|
|
List *remoteTaskList;
|
|
|
|
|
List *localTaskList;
|
|
|
|
|
|
|
|
|
@ -568,7 +571,8 @@ static DistributedExecution * CreateDistributedExecution(RowModifyLevel modLevel
|
|
|
|
|
defaultTupleDest,
|
|
|
|
|
TransactionProperties *
|
|
|
|
|
xactProperties,
|
|
|
|
|
List *jobIdList);
|
|
|
|
|
List *jobIdList,
|
|
|
|
|
bool localExecutionSupported);
|
|
|
|
|
static TransactionProperties DecideTransactionPropertiesForTaskList(RowModifyLevel
|
|
|
|
|
modLevel,
|
|
|
|
|
List *taskList,
|
|
|
|
@ -585,8 +589,6 @@ static void CleanUpSessions(DistributedExecution *execution);
|
|
|
|
|
|
|
|
|
|
static void LockPartitionsForDistributedPlan(DistributedPlan *distributedPlan);
|
|
|
|
|
static void AcquireExecutorShardLocksForExecution(DistributedExecution *execution);
|
|
|
|
|
static void AdjustDistributedExecutionAfterLocalExecution(DistributedExecution *
|
|
|
|
|
execution);
|
|
|
|
|
static bool DistributedExecutionModifiesDatabase(DistributedExecution *execution);
|
|
|
|
|
static bool IsMultiShardModification(RowModifyLevel modLevel, List *taskList);
|
|
|
|
|
static bool TaskListModifiesDatabase(RowModifyLevel modLevel, List *taskList);
|
|
|
|
@ -746,7 +748,7 @@ AdaptiveExecutor(CitusScanState *scanState)
|
|
|
|
|
distributedPlan->modLevel, taskList,
|
|
|
|
|
hasDependentJobs);
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
bool localExecutionSupported = true;
|
|
|
|
|
DistributedExecution *execution = CreateDistributedExecution(
|
|
|
|
|
distributedPlan->modLevel,
|
|
|
|
|
taskList,
|
|
|
|
@ -754,7 +756,8 @@ AdaptiveExecutor(CitusScanState *scanState)
|
|
|
|
|
targetPoolSize,
|
|
|
|
|
defaultTupleDest,
|
|
|
|
|
&xactProperties,
|
|
|
|
|
jobIdList);
|
|
|
|
|
jobIdList,
|
|
|
|
|
localExecutionSupported);
|
|
|
|
|
|
|
|
|
|
/*
|
|
|
|
|
* Make sure that we acquire the appropriate locks even if the local tasks
|
|
|
|
@ -762,16 +765,7 @@ AdaptiveExecutor(CitusScanState *scanState)
|
|
|
|
|
*/
|
|
|
|
|
StartDistributedExecution(execution);
|
|
|
|
|
|
|
|
|
|
/* execute tasks local to the node (if any) */
|
|
|
|
|
if (list_length(execution->localTaskList) > 0)
|
|
|
|
|
{
|
|
|
|
|
RunLocalExecution(scanState, execution);
|
|
|
|
|
|
|
|
|
|
/* make sure that we only execute remoteTaskList afterwards */
|
|
|
|
|
AdjustDistributedExecutionAfterLocalExecution(execution);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if (ShouldRunTasksSequentially(execution->tasksToExecute))
|
|
|
|
|
if (ShouldRunTasksSequentially(execution->remoteTaskList))
|
|
|
|
|
{
|
|
|
|
|
SequentialRunDistributedExecution(execution);
|
|
|
|
|
}
|
|
|
|
@ -780,24 +774,17 @@ AdaptiveExecutor(CitusScanState *scanState)
|
|
|
|
|
RunDistributedExecution(execution);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if (job->jobQuery->commandType != CMD_SELECT)
|
|
|
|
|
/* execute tasks local to the node (if any) */
|
|
|
|
|
if (list_length(execution->localTaskList) > 0)
|
|
|
|
|
{
|
|
|
|
|
if (list_length(execution->localTaskList) == 0)
|
|
|
|
|
{
|
|
|
|
|
Assert(executorState->es_processed == 0);
|
|
|
|
|
/* now execute the local tasks */
|
|
|
|
|
RunLocalExecution(scanState, execution);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
executorState->es_processed = execution->rowsProcessed;
|
|
|
|
|
}
|
|
|
|
|
else if (distributedPlan->targetRelationId != InvalidOid &&
|
|
|
|
|
!IsCitusTableType(distributedPlan->targetRelationId, REFERENCE_TABLE))
|
|
|
|
|
CmdType commandType = job->jobQuery->commandType;
|
|
|
|
|
if (commandType != CMD_SELECT)
|
|
|
|
|
{
|
|
|
|
|
/*
|
|
|
|
|
* For reference tables we already add rowsProcessed on the local execution,
|
|
|
|
|
* this is required to ensure that mixed local/remote executions reports
|
|
|
|
|
* the accurate number of rowsProcessed to the user.
|
|
|
|
|
*/
|
|
|
|
|
executorState->es_processed += execution->rowsProcessed;
|
|
|
|
|
}
|
|
|
|
|
executorState->es_processed = execution->rowsProcessed;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
FinishDistributedExecution(execution);
|
|
|
|
@ -807,8 +794,7 @@ AdaptiveExecutor(CitusScanState *scanState)
|
|
|
|
|
DoRepartitionCleanup(jobIdList);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if (SortReturning && distributedPlan->expectResults &&
|
|
|
|
|
job->jobQuery->commandType != CMD_SELECT)
|
|
|
|
|
if (SortReturning && distributedPlan->expectResults && commandType != CMD_SELECT)
|
|
|
|
|
{
|
|
|
|
|
SortTupleStore(scanState);
|
|
|
|
|
}
|
|
|
|
@ -845,29 +831,7 @@ RunLocalExecution(CitusScanState *scanState, DistributedExecution *execution)
|
|
|
|
|
execution->defaultTupleDest,
|
|
|
|
|
isUtilityCommand);
|
|
|
|
|
|
|
|
|
|
/*
|
|
|
|
|
* We're deliberately not setting execution->rowsProcessed here. The main reason
|
|
|
|
|
* is that modifications to reference tables would end-up setting it both here
|
|
|
|
|
* and in AdaptiveExecutor. Instead, we set executorState here and skip updating it
|
|
|
|
|
* for reference table modifications in AdaptiveExecutor.
|
|
|
|
|
*/
|
|
|
|
|
EState *executorState = ScanStateGetExecutorState(scanState);
|
|
|
|
|
executorState->es_processed = rowsProcessed;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
/*
|
|
|
|
|
* AdjustDistributedExecutionAfterLocalExecution simply updates the necessary fields of
|
|
|
|
|
* the distributed execution.
|
|
|
|
|
*/
|
|
|
|
|
static void
|
|
|
|
|
AdjustDistributedExecutionAfterLocalExecution(DistributedExecution *execution)
|
|
|
|
|
{
|
|
|
|
|
/* we only need to execute the remote tasks */
|
|
|
|
|
execution->tasksToExecute = execution->remoteTaskList;
|
|
|
|
|
|
|
|
|
|
execution->totalTaskCount = list_length(execution->remoteTaskList);
|
|
|
|
|
execution->unfinishedTaskCount = list_length(execution->remoteTaskList);
|
|
|
|
|
execution->rowsProcessed += rowsProcessed;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@ -971,45 +935,9 @@ ExecuteTaskListExtended(ExecutionParams *executionParams)
|
|
|
|
|
{
|
|
|
|
|
ParamListInfo paramListInfo = NULL;
|
|
|
|
|
uint64 locallyProcessedRows = 0;
|
|
|
|
|
List *localTaskList = NIL;
|
|
|
|
|
List *remoteTaskList = NIL;
|
|
|
|
|
|
|
|
|
|
TupleDestination *defaultTupleDest = executionParams->tupleDestination;
|
|
|
|
|
|
|
|
|
|
if (executionParams->localExecutionSupported && ShouldExecuteTasksLocally(
|
|
|
|
|
executionParams->taskList))
|
|
|
|
|
{
|
|
|
|
|
bool readOnlyPlan = false;
|
|
|
|
|
ExtractLocalAndRemoteTasks(readOnlyPlan, executionParams->taskList,
|
|
|
|
|
&localTaskList,
|
|
|
|
|
&remoteTaskList);
|
|
|
|
|
}
|
|
|
|
|
else
|
|
|
|
|
{
|
|
|
|
|
remoteTaskList = executionParams->taskList;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/*
|
|
|
|
|
* If current transaction accessed local placements and task list includes
|
|
|
|
|
* tasks that should be executed locally (accessing any of the local placements),
|
|
|
|
|
* then we should error out as it would cause inconsistencies across the
|
|
|
|
|
* remote connection and local execution.
|
|
|
|
|
*/
|
|
|
|
|
if (GetCurrentLocalExecutionStatus() == LOCAL_EXECUTION_REQUIRED &&
|
|
|
|
|
AnyTaskAccessesLocalNode(remoteTaskList))
|
|
|
|
|
{
|
|
|
|
|
ErrorIfTransactionAccessedPlacementsLocally();
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if (executionParams->isUtilityCommand)
|
|
|
|
|
{
|
|
|
|
|
locallyProcessedRows += ExecuteLocalUtilityTaskList(localTaskList);
|
|
|
|
|
}
|
|
|
|
|
else
|
|
|
|
|
{
|
|
|
|
|
locallyProcessedRows += ExecuteLocalTaskList(localTaskList, defaultTupleDest);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if (MultiShardConnectionType == SEQUENTIAL_CONNECTION)
|
|
|
|
|
{
|
|
|
|
|
executionParams->targetPoolSize = 1;
|
|
|
|
@ -1017,15 +945,40 @@ ExecuteTaskListExtended(ExecutionParams *executionParams)
|
|
|
|
|
|
|
|
|
|
DistributedExecution *execution =
|
|
|
|
|
CreateDistributedExecution(
|
|
|
|
|
executionParams->modLevel, remoteTaskList,
|
|
|
|
|
executionParams->modLevel, executionParams->taskList,
|
|
|
|
|
paramListInfo, executionParams->targetPoolSize,
|
|
|
|
|
defaultTupleDest, &executionParams->xactProperties,
|
|
|
|
|
executionParams->jobIdList);
|
|
|
|
|
executionParams->jobIdList, executionParams->localExecutionSupported);
|
|
|
|
|
|
|
|
|
|
/*
|
|
|
|
|
* If current transaction accessed local placements and task list includes
|
|
|
|
|
* tasks that should be executed locally (accessing any of the local placements),
|
|
|
|
|
* then we should error out as it would cause inconsistencies across the
|
|
|
|
|
* remote connection and local execution.
|
|
|
|
|
*/
|
|
|
|
|
List *remoteTaskList = execution->remoteTaskList;
|
|
|
|
|
if (GetCurrentLocalExecutionStatus() == LOCAL_EXECUTION_REQUIRED &&
|
|
|
|
|
AnyTaskAccessesLocalNode(remoteTaskList))
|
|
|
|
|
{
|
|
|
|
|
ErrorIfTransactionAccessedPlacementsLocally();
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/* run the remote execution */
|
|
|
|
|
StartDistributedExecution(execution);
|
|
|
|
|
RunDistributedExecution(execution);
|
|
|
|
|
FinishDistributedExecution(execution);
|
|
|
|
|
|
|
|
|
|
/* now, switch back to the local execution */
|
|
|
|
|
if (executionParams->isUtilityCommand)
|
|
|
|
|
{
|
|
|
|
|
locallyProcessedRows += ExecuteLocalUtilityTaskList(execution->localTaskList);
|
|
|
|
|
}
|
|
|
|
|
else
|
|
|
|
|
{
|
|
|
|
|
locallyProcessedRows += ExecuteLocalTaskList(execution->localTaskList,
|
|
|
|
|
defaultTupleDest);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return execution->rowsProcessed + locallyProcessedRows;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
@ -1064,15 +1017,16 @@ CreateDistributedExecution(RowModifyLevel modLevel, List *taskList,
|
|
|
|
|
ParamListInfo paramListInfo,
|
|
|
|
|
int targetPoolSize, TupleDestination *defaultTupleDest,
|
|
|
|
|
TransactionProperties *xactProperties,
|
|
|
|
|
List *jobIdList)
|
|
|
|
|
List *jobIdList, bool localExecutionSupported)
|
|
|
|
|
{
|
|
|
|
|
DistributedExecution *execution =
|
|
|
|
|
(DistributedExecution *) palloc0(sizeof(DistributedExecution));
|
|
|
|
|
|
|
|
|
|
execution->modLevel = modLevel;
|
|
|
|
|
execution->tasksToExecute = taskList;
|
|
|
|
|
execution->remoteAndLocalTaskList = taskList;
|
|
|
|
|
execution->transactionProperties = xactProperties;
|
|
|
|
|
|
|
|
|
|
/* we are going to calculate this values below */
|
|
|
|
|
execution->localTaskList = NIL;
|
|
|
|
|
execution->remoteTaskList = NIL;
|
|
|
|
|
|
|
|
|
@ -1082,8 +1036,6 @@ CreateDistributedExecution(RowModifyLevel modLevel, List *taskList,
|
|
|
|
|
execution->targetPoolSize = targetPoolSize;
|
|
|
|
|
execution->defaultTupleDest = defaultTupleDest;
|
|
|
|
|
|
|
|
|
|
execution->totalTaskCount = list_length(taskList);
|
|
|
|
|
execution->unfinishedTaskCount = list_length(taskList);
|
|
|
|
|
execution->rowsProcessed = 0;
|
|
|
|
|
|
|
|
|
|
execution->raiseInterrupts = true;
|
|
|
|
@ -1121,13 +1073,19 @@ CreateDistributedExecution(RowModifyLevel modLevel, List *taskList,
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if (ShouldExecuteTasksLocally(taskList))
|
|
|
|
|
if (localExecutionSupported && ShouldExecuteTasksLocally(taskList))
|
|
|
|
|
{
|
|
|
|
|
bool readOnlyPlan = !TaskListModifiesDatabase(modLevel, taskList);
|
|
|
|
|
|
|
|
|
|
ExtractLocalAndRemoteTasks(readOnlyPlan, taskList, &execution->localTaskList,
|
|
|
|
|
&execution->remoteTaskList);
|
|
|
|
|
}
|
|
|
|
|
else
|
|
|
|
|
{
|
|
|
|
|
execution->remoteTaskList = execution->remoteAndLocalTaskList;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
execution->totalTaskCount = list_length(execution->remoteTaskList);
|
|
|
|
|
execution->unfinishedTaskCount = list_length(execution->remoteTaskList);
|
|
|
|
|
|
|
|
|
|
return execution;
|
|
|
|
|
}
|
|
|
|
@ -1277,7 +1235,12 @@ StartDistributedExecution(DistributedExecution *execution)
|
|
|
|
|
*/
|
|
|
|
|
if (execution->targetPoolSize > 1)
|
|
|
|
|
{
|
|
|
|
|
RecordParallelRelationAccessForTaskList(execution->tasksToExecute);
|
|
|
|
|
/*
|
|
|
|
|
* Record the access for both the local and remote tasks. The main goal
|
|
|
|
|
* is to make sure that Citus behaves consistently even if the local
|
|
|
|
|
* shards are moved away.
|
|
|
|
|
*/
|
|
|
|
|
RecordParallelRelationAccessForTaskList(execution->remoteAndLocalTaskList);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
@ -1289,7 +1252,8 @@ StartDistributedExecution(DistributedExecution *execution)
|
|
|
|
|
static bool
|
|
|
|
|
DistributedExecutionModifiesDatabase(DistributedExecution *execution)
|
|
|
|
|
{
|
|
|
|
|
return TaskListModifiesDatabase(execution->modLevel, execution->tasksToExecute);
|
|
|
|
|
return TaskListModifiesDatabase(execution->modLevel,
|
|
|
|
|
execution->remoteAndLocalTaskList);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@ -1581,7 +1545,9 @@ static void
|
|
|
|
|
AcquireExecutorShardLocksForExecution(DistributedExecution *execution)
|
|
|
|
|
{
|
|
|
|
|
RowModifyLevel modLevel = execution->modLevel;
|
|
|
|
|
List *taskList = execution->tasksToExecute;
|
|
|
|
|
|
|
|
|
|
/* acquire the locks for both the remote and local tasks */
|
|
|
|
|
List *taskList = execution->remoteAndLocalTaskList;
|
|
|
|
|
|
|
|
|
|
if (modLevel <= ROW_MODIFY_READONLY &&
|
|
|
|
|
!SelectForUpdateOnReferenceTable(taskList))
|
|
|
|
@ -1741,7 +1707,7 @@ static void
|
|
|
|
|
AssignTasksToConnectionsOrWorkerPool(DistributedExecution *execution)
|
|
|
|
|
{
|
|
|
|
|
RowModifyLevel modLevel = execution->modLevel;
|
|
|
|
|
List *taskList = execution->tasksToExecute;
|
|
|
|
|
List *taskList = execution->remoteTaskList;
|
|
|
|
|
|
|
|
|
|
int32 localGroupId = GetLocalGroupId();
|
|
|
|
|
|
|
|
|
@ -2169,7 +2135,7 @@ ShouldRunTasksSequentially(List *taskList)
|
|
|
|
|
static void
|
|
|
|
|
SequentialRunDistributedExecution(DistributedExecution *execution)
|
|
|
|
|
{
|
|
|
|
|
List *taskList = execution->tasksToExecute;
|
|
|
|
|
List *taskList = execution->remoteTaskList;
|
|
|
|
|
int connectionMode = MultiShardConnectionType;
|
|
|
|
|
|
|
|
|
|
/*
|
|
|
|
@ -2182,7 +2148,8 @@ SequentialRunDistributedExecution(DistributedExecution *execution)
|
|
|
|
|
foreach_ptr(taskToExecute, taskList)
|
|
|
|
|
{
|
|
|
|
|
/* execute each task one by one */
|
|
|
|
|
execution->tasksToExecute = list_make1(taskToExecute);
|
|
|
|
|
execution->remoteAndLocalTaskList = list_make1(taskToExecute);
|
|
|
|
|
execution->remoteTaskList = execution->remoteAndLocalTaskList;
|
|
|
|
|
execution->totalTaskCount = 1;
|
|
|
|
|
execution->unfinishedTaskCount = 1;
|
|
|
|
|
|
|
|
|
@ -3329,7 +3296,12 @@ TransactionStateMachine(WorkerSession *session)
|
|
|
|
|
}
|
|
|
|
|
else if (task->partiallyLocalOrRemote)
|
|
|
|
|
{
|
|
|
|
|
/* already received results from local execution */
|
|
|
|
|
/*
|
|
|
|
|
* For the tasks that involves placements from both
|
|
|
|
|
* remote and local placments, such as modifications
|
|
|
|
|
* to reference tables, we store the rows during the
|
|
|
|
|
* local placement/execution.
|
|
|
|
|
*/
|
|
|
|
|
storeRows = false;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
@ -3723,11 +3695,10 @@ ReceiveResults(WorkerSession *session, bool storeRows)
|
|
|
|
|
int64 currentAffectedTupleCount = 0;
|
|
|
|
|
|
|
|
|
|
/* if there are multiple replicas, make sure to consider only one */
|
|
|
|
|
if (!shardCommandExecution->gotResults && *currentAffectedTupleString != '\0')
|
|
|
|
|
if (storeRows && *currentAffectedTupleString != '\0')
|
|
|
|
|
{
|
|
|
|
|
scanint8(currentAffectedTupleString, false, ¤tAffectedTupleCount);
|
|
|
|
|
Assert(currentAffectedTupleCount >= 0);
|
|
|
|
|
|
|
|
|
|
execution->rowsProcessed += currentAffectedTupleCount;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|