Merge pull request #2807 from citusdata/2776_preparation

DistributedPlan: replace operation with modLevel
pull/2829/head
Hadi Moshayedi 2019-07-16 14:04:10 -07:00 committed by GitHub
commit 86b30ee094
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
21 changed files with 224 additions and 215 deletions

View File

@ -158,8 +158,8 @@
*/ */
typedef struct DistributedExecution typedef struct DistributedExecution
{ {
/* the corresponding distributed plan's operation */ /* the corresponding distributed plan's modLevel */
CmdType operation; RowModifyLevel modLevel;
List *tasksToExecute; List *tasksToExecute;
@ -504,7 +504,7 @@ int ExecutorSlowStartInterval = 10;
/* local functions */ /* local functions */
static DistributedExecution * CreateDistributedExecution(CmdType operation, static DistributedExecution * CreateDistributedExecution(RowModifyLevel modLevel,
List *taskList, List *taskList,
bool hasReturning, bool hasReturning,
ParamListInfo paramListInfo, ParamListInfo paramListInfo,
@ -520,16 +520,16 @@ static void FinishDistributedExecution(DistributedExecution *execution);
static void CleanUpSessions(DistributedExecution *execution); static void CleanUpSessions(DistributedExecution *execution);
static void LockPartitionsForDistributedPlan(DistributedPlan *distributedPlan); static void LockPartitionsForDistributedPlan(DistributedPlan *distributedPlan);
static void AcquireExecutorShardLocks(DistributedExecution *execution); static void AcquireExecutorShardLocksForExecution(DistributedExecution *execution);
static bool DistributedExecutionModifiesDatabase(DistributedExecution *execution); static bool DistributedExecutionModifiesDatabase(DistributedExecution *execution);
static bool DistributedPlanModifiesDatabase(DistributedPlan *plan); static bool DistributedPlanModifiesDatabase(DistributedPlan *plan);
static bool TaskListModifiesDatabase(CmdType operation, List *taskList); static bool TaskListModifiesDatabase(RowModifyLevel modLevel, List *taskList);
static bool DistributedExecutionRequiresRollback(DistributedExecution *execution); static bool DistributedExecutionRequiresRollback(DistributedExecution *execution);
static bool SelectForUpdateOnReferenceTable(CmdType operation, List *taskList); static bool SelectForUpdateOnReferenceTable(RowModifyLevel modLevel, List *taskList);
static void AssignTasksToConnections(DistributedExecution *execution); static void AssignTasksToConnections(DistributedExecution *execution);
static void UnclaimAllSessionConnections(List *sessionList); static void UnclaimAllSessionConnections(List *sessionList);
static bool UseConnectionPerPlacement(void); static bool UseConnectionPerPlacement(void);
static PlacementExecutionOrder ExecutionOrderForTask(CmdType operation, Task *task); static PlacementExecutionOrder ExecutionOrderForTask(RowModifyLevel modLevel, Task *task);
static WorkerPool * FindOrCreateWorkerPool(DistributedExecution *execution, static WorkerPool * FindOrCreateWorkerPool(DistributedExecution *execution,
WorkerNode *workerNode); WorkerNode *workerNode);
static WorkerSession * FindOrCreateWorkerSession(WorkerPool *workerPool, static WorkerSession * FindOrCreateWorkerSession(WorkerPool *workerPool,
@ -610,7 +610,7 @@ AdaptiveExecutor(CustomScanState *node)
targetPoolSize = 1; targetPoolSize = 1;
} }
execution = CreateDistributedExecution(distributedPlan->operation, taskList, execution = CreateDistributedExecution(distributedPlan->modLevel, taskList,
distributedPlan->hasReturning, distributedPlan->hasReturning,
paramListInfo, tupleDescriptor, paramListInfo, tupleDescriptor,
tupleStore, targetPoolSize); tupleStore, targetPoolSize);
@ -626,7 +626,7 @@ AdaptiveExecutor(CustomScanState *node)
RunDistributedExecution(execution); RunDistributedExecution(execution);
} }
if (distributedPlan->operation != CMD_SELECT) if (distributedPlan->modLevel != ROW_MODIFY_READONLY)
{ {
executorState->es_processed = execution->rowsProcessed; executorState->es_processed = execution->rowsProcessed;
} }
@ -654,14 +654,14 @@ ExecuteUtilityTaskListWithoutResults(List *taskList, int targetPoolSize,
{ {
if (TaskExecutorType == MULTI_EXECUTOR_ADAPTIVE) if (TaskExecutorType == MULTI_EXECUTOR_ADAPTIVE)
{ {
ExecuteTaskList(CMD_UTILITY, taskList, targetPoolSize); ExecuteTaskList(ROW_MODIFY_NONE, taskList, targetPoolSize);
} }
else else
{ {
if (MultiShardConnectionType == SEQUENTIAL_CONNECTION || if (MultiShardConnectionType == SEQUENTIAL_CONNECTION ||
forceSequentialExecution) forceSequentialExecution)
{ {
ExecuteModifyTasksSequentiallyWithoutResults(taskList, CMD_UTILITY); ExecuteModifyTasksSequentiallyWithoutResults(taskList, ROW_MODIFY_NONE);
} }
else else
{ {
@ -676,13 +676,13 @@ ExecuteUtilityTaskListWithoutResults(List *taskList, int targetPoolSize,
* for some of the arguments. * for some of the arguments.
*/ */
uint64 uint64
ExecuteTaskList(CmdType operation, List *taskList, int targetPoolSize) ExecuteTaskList(RowModifyLevel modLevel, List *taskList, int targetPoolSize)
{ {
TupleDesc tupleDescriptor = NULL; TupleDesc tupleDescriptor = NULL;
Tuplestorestate *tupleStore = NULL; Tuplestorestate *tupleStore = NULL;
bool hasReturning = false; bool hasReturning = false;
return ExecuteTaskListExtended(operation, taskList, tupleDescriptor, return ExecuteTaskListExtended(modLevel, taskList, tupleDescriptor,
tupleStore, hasReturning, targetPoolSize); tupleStore, hasReturning, targetPoolSize);
} }
@ -692,7 +692,7 @@ ExecuteTaskList(CmdType operation, List *taskList, int targetPoolSize)
* runs it. * runs it.
*/ */
uint64 uint64
ExecuteTaskListExtended(CmdType operation, List *taskList, ExecuteTaskListExtended(RowModifyLevel modLevel, List *taskList,
TupleDesc tupleDescriptor, Tuplestorestate *tupleStore, TupleDesc tupleDescriptor, Tuplestorestate *tupleStore,
bool hasReturning, int targetPoolSize) bool hasReturning, int targetPoolSize)
{ {
@ -705,7 +705,7 @@ ExecuteTaskListExtended(CmdType operation, List *taskList,
} }
execution = execution =
CreateDistributedExecution(operation, taskList, hasReturning, paramListInfo, CreateDistributedExecution(modLevel, taskList, hasReturning, paramListInfo,
tupleDescriptor, tupleStore, targetPoolSize); tupleDescriptor, tupleStore, targetPoolSize);
StartDistributedExecution(execution); StartDistributedExecution(execution);
@ -721,14 +721,14 @@ ExecuteTaskListExtended(CmdType operation, List *taskList,
* a distributed plan. * a distributed plan.
*/ */
DistributedExecution * DistributedExecution *
CreateDistributedExecution(CmdType operation, List *taskList, bool hasReturning, CreateDistributedExecution(RowModifyLevel modLevel, List *taskList, bool hasReturning,
ParamListInfo paramListInfo, TupleDesc tupleDescriptor, ParamListInfo paramListInfo, TupleDesc tupleDescriptor,
Tuplestorestate *tupleStore, int targetPoolSize) Tuplestorestate *tupleStore, int targetPoolSize)
{ {
DistributedExecution *execution = DistributedExecution *execution =
(DistributedExecution *) palloc0(sizeof(DistributedExecution)); (DistributedExecution *) palloc0(sizeof(DistributedExecution));
execution->operation = operation; execution->modLevel = modLevel;
execution->tasksToExecute = taskList; execution->tasksToExecute = taskList;
execution->hasReturning = hasReturning; execution->hasReturning = hasReturning;
@ -823,7 +823,7 @@ StartDistributedExecution(DistributedExecution *execution)
} }
/* prevent unsafe concurrent modifications */ /* prevent unsafe concurrent modifications */
AcquireExecutorShardLocks(execution); AcquireExecutorShardLocksForExecution(execution);
} }
@ -834,7 +834,7 @@ StartDistributedExecution(DistributedExecution *execution)
static bool static bool
DistributedExecutionModifiesDatabase(DistributedExecution *execution) DistributedExecutionModifiesDatabase(DistributedExecution *execution)
{ {
return TaskListModifiesDatabase(execution->operation, execution->tasksToExecute); return TaskListModifiesDatabase(execution->modLevel, execution->tasksToExecute);
} }
@ -845,7 +845,7 @@ DistributedExecutionModifiesDatabase(DistributedExecution *execution)
static bool static bool
DistributedPlanModifiesDatabase(DistributedPlan *plan) DistributedPlanModifiesDatabase(DistributedPlan *plan)
{ {
return TaskListModifiesDatabase(plan->operation, plan->workerJob->taskList); return TaskListModifiesDatabase(plan->modLevel, plan->workerJob->taskList);
} }
@ -854,22 +854,22 @@ DistributedPlanModifiesDatabase(DistributedPlan *plan)
* DistributedPlanModifiesDatabase. * DistributedPlanModifiesDatabase.
*/ */
static bool static bool
TaskListModifiesDatabase(CmdType operation, List *taskList) TaskListModifiesDatabase(RowModifyLevel modLevel, List *taskList)
{ {
Task *firstTask = NULL; Task *firstTask = NULL;
if (operation == CMD_INSERT || operation == CMD_UPDATE || operation == CMD_DELETE) if (modLevel > ROW_MODIFY_READONLY)
{ {
return true; return true;
} }
/* /*
* If we cannot decide by only checking the operation, we should look closer * If we cannot decide by only checking the row modify level,
* to the tasks. * we should look closer to the tasks.
*/ */
if (list_length(taskList) < 1) if (list_length(taskList) < 1)
{ {
/* does this ever possible? */ /* is this ever possible? */
return false; return false;
} }
@ -968,12 +968,12 @@ DistributedExecutionRequiresRollback(DistributedExecution *execution)
* that contains FOR UPDATE clause that locks any reference tables. * that contains FOR UPDATE clause that locks any reference tables.
*/ */
static bool static bool
SelectForUpdateOnReferenceTable(CmdType operation, List *taskList) SelectForUpdateOnReferenceTable(RowModifyLevel modLevel, List *taskList)
{ {
Task *task = NULL; Task *task = NULL;
ListCell *rtiLockCell = NULL; ListCell *rtiLockCell = NULL;
if (operation != CMD_SELECT) if (modLevel != ROW_MODIFY_READONLY)
{ {
return false; return false;
} }
@ -1025,8 +1025,8 @@ LockPartitionsForDistributedPlan(DistributedPlan *distributedPlan)
/* /*
* AcquireExecutorShardLocks acquires advisory lock on shard IDs to prevent * AcquireExecutorShardLocksForExecution acquires advisory lock on shard IDs
* unsafe concurrent modifications of shards. * to prevent unsafe concurrent modifications of shards.
* *
* We prevent concurrent modifications of shards in two cases: * We prevent concurrent modifications of shards in two cases:
* 1. Any non-commutative writes to a replicated table * 1. Any non-commutative writes to a replicated table
@ -1042,13 +1042,13 @@ LockPartitionsForDistributedPlan(DistributedPlan *distributedPlan)
* TRUNCATE because the table locks already prevent concurrent access. * TRUNCATE because the table locks already prevent concurrent access.
*/ */
static void static void
AcquireExecutorShardLocks(DistributedExecution *execution) AcquireExecutorShardLocksForExecution(DistributedExecution *execution)
{ {
CmdType operation = execution->operation; RowModifyLevel modLevel = execution->modLevel;
List *taskList = execution->tasksToExecute; List *taskList = execution->tasksToExecute;
if (!(operation == CMD_INSERT || operation == CMD_UPDATE || operation == CMD_DELETE || if (modLevel <= ROW_MODIFY_READONLY &&
SelectForUpdateOnReferenceTable(operation, taskList))) !SelectForUpdateOnReferenceTable(modLevel, taskList))
{ {
/* /*
* Executor locks only apply to DML commands and SELECT FOR UPDATE queries * Executor locks only apply to DML commands and SELECT FOR UPDATE queries
@ -1069,7 +1069,7 @@ AcquireExecutorShardLocks(DistributedExecution *execution)
{ {
Task *task = (Task *) lfirst(taskCell); Task *task = (Task *) lfirst(taskCell);
AcquireExecutorShardLock(task, operation); AcquireExecutorShardLocks(task, modLevel);
} }
} }
else if (list_length(taskList) > 1) else if (list_length(taskList) > 1)
@ -1211,7 +1211,7 @@ UnclaimAllSessionConnections(List *sessionList)
static void static void
AssignTasksToConnections(DistributedExecution *execution) AssignTasksToConnections(DistributedExecution *execution)
{ {
CmdType operation = execution->operation; RowModifyLevel modLevel = execution->modLevel;
List *taskList = execution->tasksToExecute; List *taskList = execution->tasksToExecute;
bool hasReturning = execution->hasReturning; bool hasReturning = execution->hasReturning;
@ -1233,13 +1233,15 @@ AssignTasksToConnections(DistributedExecution *execution)
shardCommandExecution = shardCommandExecution =
(ShardCommandExecution *) palloc0(sizeof(ShardCommandExecution)); (ShardCommandExecution *) palloc0(sizeof(ShardCommandExecution));
shardCommandExecution->task = task; shardCommandExecution->task = task;
shardCommandExecution->executionOrder = ExecutionOrderForTask(operation, task); shardCommandExecution->executionOrder = ExecutionOrderForTask(modLevel, task);
shardCommandExecution->executionState = TASK_EXECUTION_NOT_FINISHED; shardCommandExecution->executionState = TASK_EXECUTION_NOT_FINISHED;
shardCommandExecution->placementExecutions = shardCommandExecution->placementExecutions =
(TaskPlacementExecution **) palloc0(placementExecutionCount * (TaskPlacementExecution **) palloc0(placementExecutionCount *
sizeof(TaskPlacementExecution *)); sizeof(TaskPlacementExecution *));
shardCommandExecution->placementExecutionCount = placementExecutionCount; shardCommandExecution->placementExecutionCount = placementExecutionCount;
shardCommandExecution->expectResults = hasReturning || operation == CMD_SELECT;
shardCommandExecution->expectResults = hasReturning ||
modLevel == ROW_MODIFY_READONLY;
foreach(taskPlacementCell, task->taskPlacementList) foreach(taskPlacementCell, task->taskPlacementList)
@ -1390,7 +1392,7 @@ UseConnectionPerPlacement(void)
* ExecutionOrderForTask gives the appropriate execution order for a task. * ExecutionOrderForTask gives the appropriate execution order for a task.
*/ */
static PlacementExecutionOrder static PlacementExecutionOrder
ExecutionOrderForTask(CmdType operation, Task *task) ExecutionOrderForTask(RowModifyLevel modLevel, Task *task)
{ {
switch (task->taskType) switch (task->taskType)
{ {
@ -1402,7 +1404,14 @@ ExecutionOrderForTask(CmdType operation, Task *task)
case MODIFY_TASK: case MODIFY_TASK:
{ {
if (operation == CMD_INSERT && !task->upsertQuery) /*
* For non-commutative modifications we take aggressive locks, so
* there is no risk of deadlock and we can run them in parallel.
* When the modification is commutative, we take no additional
* locks, so we take a conservative approach and execute sequentially
* to avoid deadlocks.
*/
if (modLevel < ROW_MODIFY_NONCOMMUTATIVE)
{ {
return EXECUTION_ORDER_SEQUENTIAL; return EXECUTION_ORDER_SEQUENTIAL;
} }

View File

@ -163,7 +163,7 @@ CitusBeginScan(CustomScanState *node, EState *estate, int eflags)
scanState = (CitusScanState *) node; scanState = (CitusScanState *) node;
distributedPlan = scanState->distributedPlan; distributedPlan = scanState->distributedPlan;
if (distributedPlan->operation == CMD_SELECT || if (distributedPlan->modLevel == ROW_MODIFY_READONLY ||
distributedPlan->insertSelectSubquery != NULL) distributedPlan->insertSelectSubquery != NULL)
{ {
/* no more action required */ /* no more action required */

View File

@ -133,7 +133,8 @@ CoordinatorInsertSelectExecScan(CustomScanState *node)
if (MultiShardConnectionType == SEQUENTIAL_CONNECTION) if (MultiShardConnectionType == SEQUENTIAL_CONNECTION)
{ {
ExecuteModifyTasksSequentially(scanState, prunedTaskList, ExecuteModifyTasksSequentially(scanState, prunedTaskList,
CMD_INSERT, hasReturning); ROW_MODIFY_COMMUTATIVE,
hasReturning);
} }
else else
{ {
@ -151,7 +152,7 @@ CoordinatorInsertSelectExecScan(CustomScanState *node)
scanState->tuplestorestate = scanState->tuplestorestate =
tuplestore_begin_heap(randomAccess, interTransactions, work_mem); tuplestore_begin_heap(randomAccess, interTransactions, work_mem);
ExecuteTaskListExtended(CMD_INSERT, prunedTaskList, ExecuteTaskListExtended(ROW_MODIFY_COMMUTATIVE, prunedTaskList,
tupleDescriptor, scanState->tuplestorestate, tupleDescriptor, scanState->tuplestorestate,
hasReturning, MaxAdaptiveExecutorPoolSize); hasReturning, MaxAdaptiveExecutorPoolSize);
} }

View File

@ -91,15 +91,17 @@ bool SortReturning = false;
/* functions needed during run phase */ /* functions needed during run phase */
static void AcquireMetadataLocks(List *taskList); static void AcquireMetadataLocks(List *taskList);
static int64 ExecuteSingleModifyTask(CitusScanState *scanState, Task *task, CmdType static int64 ExecuteSingleModifyTask(CitusScanState *scanState, Task *task,
operation, bool alwaysThrowErrorOnFailure, bool RowModifyLevel modLevel,
expectResults); bool alwaysThrowErrorOnFailure, bool expectResults);
static void ExecuteSingleSelectTask(CitusScanState *scanState, Task *task); static void ExecuteSingleSelectTask(CitusScanState *scanState, Task *task);
static List * BuildPlacementAccessList(int32 groupId, List *relationShardList, static List * BuildPlacementAccessList(int32 groupId, List *relationShardList,
ShardPlacementAccessType accessType); ShardPlacementAccessType accessType);
static List * GetModifyConnections(Task *task, bool markCritical); static List * GetModifyConnections(Task *task, bool markCritical);
static int64 ExecuteModifyTasks(List *taskList, bool expectResults, static int64 ExecuteModifyTasks(List *taskList, bool expectResults,
ParamListInfo paramListInfo, CitusScanState *scanState); ParamListInfo paramListInfo, CitusScanState *scanState);
static void AcquireExecutorShardLockForRowModify(Task *task, RowModifyLevel modLevel);
static void AcquireExecutorShardLocksForRelationRowLockList(Task *task);
static bool RequiresConsistentSnapshot(Task *task); static bool RequiresConsistentSnapshot(Task *task);
static void RouterMultiModifyExecScan(CustomScanState *node); static void RouterMultiModifyExecScan(CustomScanState *node);
static void RouterSequentialModifyExecScan(CustomScanState *node); static void RouterSequentialModifyExecScan(CustomScanState *node);
@ -138,27 +140,18 @@ AcquireMetadataLocks(List *taskList)
} }
/* static void
* AcquireExecutorShardLock acquires a lock on the shard for the given task and AcquireExecutorShardLockForRowModify(Task *task, RowModifyLevel modLevel)
* command type if necessary to avoid divergence between multiple replicas of
* the same shard. No lock is obtained when there is only one replica.
*
* The function determines the appropriate lock mode based on the commutativity
* rule of the command. In each case, it uses a lock mode that enforces the
* commutativity rule.
*
* The mapping is overridden when all_modifications_commutative is set to true.
* In that case, all modifications are treated as commutative, which can be used
* to communicate that the application is only generating commutative
* UPDATE/DELETE/UPSERT commands and exclusive locks are unnecessary.
*/
void
AcquireExecutorShardLock(Task *task, CmdType commandType)
{ {
LOCKMODE lockMode = NoLock; LOCKMODE lockMode = NoLock;
int64 shardId = task->anchorShardId; int64 shardId = task->anchorShardId;
if (commandType == CMD_SELECT) if (shardId == INVALID_SHARD_ID)
{
return;
}
if (modLevel <= ROW_MODIFY_READONLY)
{ {
/* /*
* The executor shard lock is used to maintain consistency between * The executor shard lock is used to maintain consistency between
@ -205,24 +198,7 @@ AcquireExecutorShardLock(Task *task, CmdType commandType)
lockMode = RowExclusiveLock; lockMode = RowExclusiveLock;
} }
else if (task->upsertQuery || commandType == CMD_UPDATE || commandType == CMD_DELETE) else if (modLevel < ROW_MODIFY_NONCOMMUTATIVE)
{
/*
* UPDATE/DELETE/UPSERT commands do not commute with other modifications
* since the rows modified by one command may be affected by the outcome
* of another command.
*
* We need to handle upsert before INSERT, because PostgreSQL models
* upsert commands as INSERT with an ON CONFLICT section.
*
* ExclusiveLock conflicts with all lock types used by modifications
* and therefore prevents other modifications from running
* concurrently.
*/
lockMode = ExclusiveLock;
}
else if (commandType == CMD_INSERT)
{ {
/* /*
* An INSERT commutes with other INSERT commands, since performing them * An INSERT commutes with other INSERT commands, since performing them
@ -245,16 +221,34 @@ AcquireExecutorShardLock(Task *task, CmdType commandType)
} }
else else
{ {
ereport(ERROR, (errmsg("unrecognized operation code: %d", (int) commandType))); /*
* UPDATE/DELETE/UPSERT commands do not commute with other modifications
* since the rows modified by one command may be affected by the outcome
* of another command.
*
* We need to handle upsert before INSERT, because PostgreSQL models
* upsert commands as INSERT with an ON CONFLICT section.
*
* ExclusiveLock conflicts with all lock types used by modifications
* and therefore prevents other modifications from running
* concurrently.
*/
lockMode = ExclusiveLock;
} }
if (shardId != INVALID_SHARD_ID && lockMode != NoLock) if (lockMode != NoLock)
{ {
ShardInterval *shardInterval = LoadShardInterval(shardId); ShardInterval *shardInterval = LoadShardInterval(shardId);
SerializeNonCommutativeWrites(list_make1(shardInterval), lockMode); SerializeNonCommutativeWrites(list_make1(shardInterval), lockMode);
} }
}
static void
AcquireExecutorShardLocksForRelationRowLockList(Task *task)
{
/* /*
* If lock clause exists and it effects any reference table, we need to get * If lock clause exists and it effects any reference table, we need to get
* lock on shard resource. Type of lock is determined by the type of row lock * lock on shard resource. Type of lock is determined by the type of row lock
@ -301,6 +295,28 @@ AcquireExecutorShardLock(Task *task, CmdType commandType)
} }
} }
} }
}
/*
* AcquireExecutorShardLocks acquires locks on shards for the given task if
* necessary to avoid divergence between multiple replicas of the same shard.
* No lock is obtained when there is only one replica.
*
* The function determines the appropriate lock mode based on the commutativity
* rule of the command. In each case, it uses a lock mode that enforces the
* commutativity rule.
*
* The mapping is overridden when all_modifications_commutative is set to true.
* In that case, all modifications are treated as commutative, which can be used
* to communicate that the application is only generating commutative
* UPDATE/DELETE/UPSERT commands and exclusive locks are unnecessary.
*/
void
AcquireExecutorShardLocks(Task *task, RowModifyLevel modLevel)
{
AcquireExecutorShardLockForRowModify(task, modLevel);
AcquireExecutorShardLocksForRelationRowLockList(task);
/* /*
* If the task has a subselect, then we may need to lock the shards from which * If the task has a subselect, then we may need to lock the shards from which
@ -717,16 +733,16 @@ RouterSequentialModifyExecScan(CustomScanState *node)
{ {
CitusScanState *scanState = (CitusScanState *) node; CitusScanState *scanState = (CitusScanState *) node;
DistributedPlan *distributedPlan = scanState->distributedPlan; DistributedPlan *distributedPlan = scanState->distributedPlan;
RowModifyLevel modLevel = distributedPlan->modLevel;
bool hasReturning = distributedPlan->hasReturning; bool hasReturning = distributedPlan->hasReturning;
Job *workerJob = distributedPlan->workerJob; Job *workerJob = distributedPlan->workerJob;
List *taskList = workerJob->taskList; List *taskList = workerJob->taskList;
EState *executorState = ScanStateGetExecutorState(scanState); EState *executorState = ScanStateGetExecutorState(scanState);
CmdType operation = scanState->distributedPlan->operation;
Assert(!scanState->finishedRemoteScan); Assert(!scanState->finishedRemoteScan);
executorState->es_processed += executorState->es_processed +=
ExecuteModifyTasksSequentially(scanState, taskList, operation, hasReturning); ExecuteModifyTasksSequentially(scanState, taskList, modLevel, hasReturning);
} }
@ -1056,7 +1072,7 @@ CreatePlacementAccess(ShardPlacement *placement, ShardPlacementAccessType access
* The function returns affectedTupleCount if applicable. * The function returns affectedTupleCount if applicable.
*/ */
static int64 static int64
ExecuteSingleModifyTask(CitusScanState *scanState, Task *task, CmdType operation, ExecuteSingleModifyTask(CitusScanState *scanState, Task *task, RowModifyLevel modLevel,
bool alwaysThrowErrorOnFailure, bool expectResults) bool alwaysThrowErrorOnFailure, bool expectResults)
{ {
EState *executorState = NULL; EState *executorState = NULL;
@ -1107,10 +1123,9 @@ ExecuteSingleModifyTask(CitusScanState *scanState, Task *task, CmdType operation
* acquire the necessary locks on the relations, and blocks any * acquire the necessary locks on the relations, and blocks any
* unsafe concurrent operations. * unsafe concurrent operations.
*/ */
if (operation == CMD_INSERT || operation == CMD_UPDATE || if (modLevel > ROW_MODIFY_NONE)
operation == CMD_DELETE || operation == CMD_SELECT)
{ {
AcquireExecutorShardLock(task, operation); AcquireExecutorShardLocks(task, modLevel);
} }
/* try to execute modification on all placements */ /* try to execute modification on all placements */
@ -1356,9 +1371,9 @@ ExecuteModifyTasksWithoutResults(List *taskList)
* and ignores the results. * and ignores the results.
*/ */
int64 int64
ExecuteModifyTasksSequentiallyWithoutResults(List *taskList, CmdType operation) ExecuteModifyTasksSequentiallyWithoutResults(List *taskList, RowModifyLevel modLevel)
{ {
return ExecuteModifyTasksSequentially(NULL, taskList, operation, false); return ExecuteModifyTasksSequentially(NULL, taskList, modLevel, false);
} }
@ -1373,7 +1388,7 @@ ExecuteModifyTasksSequentiallyWithoutResults(List *taskList, CmdType operation)
*/ */
int64 int64
ExecuteModifyTasksSequentially(CitusScanState *scanState, List *taskList, ExecuteModifyTasksSequentially(CitusScanState *scanState, List *taskList,
CmdType operation, bool hasReturning) RowModifyLevel modLevel, bool hasReturning)
{ {
ListCell *taskCell = NULL; ListCell *taskCell = NULL;
bool multipleTasks = list_length(taskList) > 1; bool multipleTasks = list_length(taskList) > 1;
@ -1422,7 +1437,7 @@ ExecuteModifyTasksSequentially(CitusScanState *scanState, List *taskList,
bool expectResults = (hasReturning || task->relationRowLockList != NIL); bool expectResults = (hasReturning || task->relationRowLockList != NIL);
affectedTupleCount += affectedTupleCount +=
ExecuteSingleModifyTask(scanState, task, operation, alwaysThrowErrorOnFailure, ExecuteSingleModifyTask(scanState, task, modLevel, alwaysThrowErrorOnFailure,
expectResults); expectResults);
} }

View File

@ -95,7 +95,7 @@ JobExecutorType(DistributedPlan *distributedPlan)
return MULTI_EXECUTOR_COORDINATOR_INSERT_SELECT; return MULTI_EXECUTOR_COORDINATOR_INSERT_SELECT;
} }
Assert(distributedPlan->operation == CMD_SELECT); Assert(distributedPlan->modLevel == ROW_MODIFY_READONLY);
workerNodeList = ActiveReadableNodeList(); workerNodeList = ActiveReadableNodeList();
workerNodeCount = list_length(workerNodeList); workerNodeCount = list_length(workerNodeList);

View File

@ -592,7 +592,7 @@ CreateShardsOnWorkersViaExecutor(Oid distributedRelationId, List *shardPlacement
poolSize = MaxAdaptiveExecutorPoolSize; poolSize = MaxAdaptiveExecutorPoolSize;
} }
ExecuteTaskList(CMD_UTILITY, taskList, poolSize); ExecuteTaskList(ROW_MODIFY_NONE, taskList, poolSize);
} }

View File

@ -69,7 +69,7 @@ PG_FUNCTION_INFO_V1(stop_metadata_sync_to_node);
* start_metadata_sync_to_node function creates the metadata in a worker for preparing the * start_metadata_sync_to_node function creates the metadata in a worker for preparing the
* worker for accepting queries. The function first sets the localGroupId of the worker * worker for accepting queries. The function first sets the localGroupId of the worker
* so that the worker knows which tuple in pg_dist_node table represents itself. After * so that the worker knows which tuple in pg_dist_node table represents itself. After
* that, SQL statetemens for re-creating metadata of MX-eligible distributed tables are * that, SQL statements for re-creating metadata of MX-eligible distributed tables are
* sent to the worker. Finally, the hasmetadata column of the target node in pg_dist_node * sent to the worker. Finally, the hasmetadata column of the target node in pg_dist_node
* is marked as true. * is marked as true.
*/ */
@ -1188,7 +1188,7 @@ DetachPartitionCommandList(void)
/* /*
* We probably do not need this but as an extra precaution, we are enabling * We probably do not need this but as an extra precaution, we are enabling
* DDL propagation to swtich back to original state. * DDL propagation to switch back to original state.
*/ */
detachPartitionCommandList = lappend(detachPartitionCommandList, detachPartitionCommandList = lappend(detachPartitionCommandList,
ENABLE_DDL_PROPAGATION); ENABLE_DDL_PROPAGATION);

View File

@ -70,7 +70,7 @@ RebuildQueryStrings(Query *originalQuery, List *taskList)
query = copyObject(originalQuery); query = copyObject(originalQuery);
copiedInsertRte = ExtractInsertRangeTableEntry(query); copiedInsertRte = ExtractResultRelationRTE(query);
copiedSubqueryRte = ExtractSelectRangeTableEntry(query); copiedSubqueryRte = ExtractSelectRangeTableEntry(query);
copiedSubquery = copiedSubqueryRte->subquery; copiedSubquery = copiedSubqueryRte->subquery;
@ -92,7 +92,8 @@ RebuildQueryStrings(Query *originalQuery, List *taskList)
UpdateRelationToShardNames((Node *) copiedSubquery, relationShardList); UpdateRelationToShardNames((Node *) copiedSubquery, relationShardList);
} }
else if (task->upsertQuery || valuesRTE != NULL) else if (query->commandType == CMD_INSERT && (query->onConflict != NULL ||
valuesRTE != NULL))
{ {
RangeTblEntry *rangeTableEntry = NULL; RangeTblEntry *rangeTableEntry = NULL;

View File

@ -50,8 +50,8 @@ int MultiTaskQueryLogLevel = MULTI_TASK_QUERY_INFO_OFF; /* multi-task query log
static uint64 NextPlanId = 1; static uint64 NextPlanId = 1;
/* local function forward declarations */
static bool ListContainsDistributedTableRTE(List *rangeTableList); static bool ListContainsDistributedTableRTE(List *rangeTableList);
static bool IsUpdateOrDelete(Query *query);
static PlannedStmt * CreateDistributedPlannedStmt(uint64 planId, PlannedStmt *localPlan, static PlannedStmt * CreateDistributedPlannedStmt(uint64 planId, PlannedStmt *localPlan,
Query *originalQuery, Query *query, Query *originalQuery, Query *query,
ParamListInfo boundParams, ParamListInfo boundParams,
@ -423,22 +423,6 @@ IsModifyCommand(Query *query)
} }
/*
* IsMultiShardModifyPlan returns true if the given plan was generated for
* multi shard update or delete query.
*/
bool
IsMultiShardModifyPlan(DistributedPlan *distributedPlan)
{
if (IsUpdateOrDelete(distributedPlan) && IsMultiTaskPlan(distributedPlan))
{
return true;
}
return false;
}
/* /*
* IsMultiTaskPlan returns true if job contains multiple tasks. * IsMultiTaskPlan returns true if job contains multiple tasks.
*/ */
@ -457,19 +441,13 @@ IsMultiTaskPlan(DistributedPlan *distributedPlan)
/* /*
* IsUpdateOrDelete returns true if the query performs update or delete. * IsUpdateOrDelete returns true if the query performs an update or delete.
*/ */
bool bool
IsUpdateOrDelete(DistributedPlan *distributedPlan) IsUpdateOrDelete(Query *query)
{ {
CmdType commandType = distributedPlan->operation; return query->commandType == CMD_UPDATE ||
query->commandType == CMD_DELETE;
if (commandType == CMD_UPDATE || commandType == CMD_DELETE)
{
return true;
}
return false;
} }
@ -480,15 +458,7 @@ IsUpdateOrDelete(DistributedPlan *distributedPlan)
bool bool
IsModifyDistributedPlan(DistributedPlan *distributedPlan) IsModifyDistributedPlan(DistributedPlan *distributedPlan)
{ {
bool isModifyDistributedPlan = false; return distributedPlan->modLevel > ROW_MODIFY_READONLY;
CmdType operation = distributedPlan->operation;
if (operation == CMD_INSERT || operation == CMD_UPDATE || operation == CMD_DELETE)
{
isModifyDistributedPlan = true;
}
return isModifyDistributedPlan;
} }
@ -569,11 +539,12 @@ CreateDistributedPlannedStmt(uint64 planId, PlannedStmt *localPlan, Query *origi
* query planning failed (possibly) due to prepared statement parameters or * query planning failed (possibly) due to prepared statement parameters or
* if it is planned as a multi shard modify query. * if it is planned as a multi shard modify query.
*/ */
if ((distributedPlan->planningError || IsMultiShardModifyPlan(distributedPlan)) && if ((distributedPlan->planningError ||
(IsUpdateOrDelete(originalQuery) && IsMultiTaskPlan(distributedPlan))) &&
hasUnresolvedParams) hasUnresolvedParams)
{ {
/* /*
* Arbitraryly high cost, but low enough that it can be added up * Arbitrarily high cost, but low enough that it can be added up
* without overflowing by choose_custom_plan(). * without overflowing by choose_custom_plan().
*/ */
resultPlan->planTree->total_cost = FLT_MAX / 100000000; resultPlan->planTree->total_cost = FLT_MAX / 100000000;

View File

@ -85,7 +85,7 @@ InsertSelectIntoDistributedTable(Query *query)
if (insertSelectQuery) if (insertSelectQuery)
{ {
RangeTblEntry *insertRte = ExtractInsertRangeTableEntry(query); RangeTblEntry *insertRte = ExtractResultRelationRTE(query);
if (IsDistributedTable(insertRte->relid)) if (IsDistributedTable(insertRte->relid))
{ {
return true; return true;
@ -108,7 +108,7 @@ InsertSelectIntoLocalTable(Query *query)
if (insertSelectQuery) if (insertSelectQuery)
{ {
RangeTblEntry *insertRte = ExtractInsertRangeTableEntry(query); RangeTblEntry *insertRte = ExtractResultRelationRTE(query);
if (!IsDistributedTable(insertRte->relid)) if (!IsDistributedTable(insertRte->relid))
{ {
return true; return true;
@ -220,7 +220,7 @@ CreateDistributedInsertSelectPlan(Query *originalQuery,
Job *workerJob = NULL; Job *workerJob = NULL;
uint64 jobId = INVALID_JOB_ID; uint64 jobId = INVALID_JOB_ID;
DistributedPlan *distributedPlan = CitusMakeNode(DistributedPlan); DistributedPlan *distributedPlan = CitusMakeNode(DistributedPlan);
RangeTblEntry *insertRte = ExtractInsertRangeTableEntry(originalQuery); RangeTblEntry *insertRte = ExtractResultRelationRTE(originalQuery);
RangeTblEntry *subqueryRte = ExtractSelectRangeTableEntry(originalQuery); RangeTblEntry *subqueryRte = ExtractSelectRangeTableEntry(originalQuery);
Oid targetRelationId = insertRte->relid; Oid targetRelationId = insertRte->relid;
DistTableCacheEntry *targetCacheEntry = DistributedTableCacheEntry(targetRelationId); DistTableCacheEntry *targetCacheEntry = DistributedTableCacheEntry(targetRelationId);
@ -230,7 +230,7 @@ CreateDistributedInsertSelectPlan(Query *originalQuery,
bool allReferenceTables = relationRestrictionContext->allReferenceTables; bool allReferenceTables = relationRestrictionContext->allReferenceTables;
bool allDistributionKeysInQueryAreEqual = false; bool allDistributionKeysInQueryAreEqual = false;
distributedPlan->operation = originalQuery->commandType; distributedPlan->modLevel = RowModifyLevelForQuery(originalQuery);
/* /*
* Error semantics for INSERT ... SELECT queries are different than regular * Error semantics for INSERT ... SELECT queries are different than regular
@ -423,7 +423,7 @@ RouterModifyTaskForShardInterval(Query *originalQuery, ShardInterval *shardInter
bool safeToPushdownSubquery) bool safeToPushdownSubquery)
{ {
Query *copiedQuery = copyObject(originalQuery); Query *copiedQuery = copyObject(originalQuery);
RangeTblEntry *copiedInsertRte = ExtractInsertRangeTableEntry(copiedQuery); RangeTblEntry *copiedInsertRte = ExtractResultRelationRTE(copiedQuery);
RangeTblEntry *copiedSubqueryRte = ExtractSelectRangeTableEntry(copiedQuery); RangeTblEntry *copiedSubqueryRte = ExtractSelectRangeTableEntry(copiedQuery);
Query *copiedSubquery = (Query *) copiedSubqueryRte->subquery; Query *copiedSubquery = (Query *) copiedSubqueryRte->subquery;
@ -443,7 +443,6 @@ RouterModifyTaskForShardInterval(Query *originalQuery, ShardInterval *shardInter
uint64 jobId = INVALID_JOB_ID; uint64 jobId = INVALID_JOB_ID;
List *insertShardPlacementList = NULL; List *insertShardPlacementList = NULL;
List *intersectedPlacementList = NULL; List *intersectedPlacementList = NULL;
bool upsertQuery = false;
bool replacePrunedQueryWithDummy = false; bool replacePrunedQueryWithDummy = false;
bool allReferenceTables = bool allReferenceTables =
plannerRestrictionContext->relationRestrictionContext->allReferenceTables; plannerRestrictionContext->relationRestrictionContext->allReferenceTables;
@ -566,12 +565,6 @@ RouterModifyTaskForShardInterval(Query *originalQuery, ShardInterval *shardInter
/* this is required for correct deparsing of the query */ /* this is required for correct deparsing of the query */
ReorderInsertSelectTargetLists(copiedQuery, copiedInsertRte, copiedSubqueryRte); ReorderInsertSelectTargetLists(copiedQuery, copiedInsertRte, copiedSubqueryRte);
/* set the upsert flag */
if (originalQuery->onConflict != NULL)
{
upsertQuery = true;
}
/* setting an alias simplifies deparsing of RETURNING */ /* setting an alias simplifies deparsing of RETURNING */
if (copiedInsertRte->alias == NULL) if (copiedInsertRte->alias == NULL)
{ {
@ -589,7 +582,6 @@ RouterModifyTaskForShardInterval(Query *originalQuery, ShardInterval *shardInter
modifyTask->dependedTaskList = NULL; modifyTask->dependedTaskList = NULL;
modifyTask->anchorShardId = shardId; modifyTask->anchorShardId = shardId;
modifyTask->taskPlacementList = insertShardPlacementList; modifyTask->taskPlacementList = insertShardPlacementList;
modifyTask->upsertQuery = upsertQuery;
modifyTask->relationShardList = relationShardList; modifyTask->relationShardList = relationShardList;
modifyTask->replicationModel = cacheEntry->replicationModel; modifyTask->replicationModel = cacheEntry->replicationModel;
@ -1138,11 +1130,11 @@ CreateCoordinatorInsertSelectPlan(uint64 planId, Query *parse)
Query *selectQuery = NULL; Query *selectQuery = NULL;
RangeTblEntry *selectRte = ExtractSelectRangeTableEntry(insertSelectQuery); RangeTblEntry *selectRte = ExtractSelectRangeTableEntry(insertSelectQuery);
RangeTblEntry *insertRte = ExtractInsertRangeTableEntry(insertSelectQuery); RangeTblEntry *insertRte = ExtractResultRelationRTE(insertSelectQuery);
Oid targetRelationId = insertRte->relid; Oid targetRelationId = insertRte->relid;
DistributedPlan *distributedPlan = CitusMakeNode(DistributedPlan); DistributedPlan *distributedPlan = CitusMakeNode(DistributedPlan);
distributedPlan->operation = CMD_INSERT; distributedPlan->modLevel = RowModifyLevelForQuery(insertSelectQuery);
distributedPlan->planningError = distributedPlan->planningError =
CoordinatorInsertSelectSupported(insertSelectQuery); CoordinatorInsertSelectSupported(insertSelectQuery);
@ -1243,7 +1235,7 @@ CoordinatorInsertSelectSupported(Query *insertSelectQuery)
return deferredError; return deferredError;
} }
insertRte = ExtractInsertRangeTableEntry(insertSelectQuery); insertRte = ExtractResultRelationRTE(insertSelectQuery);
if (PartitionMethod(insertRte->relid) == DISTRIBUTE_BY_APPEND) if (PartitionMethod(insertRte->relid) == DISTRIBUTE_BY_APPEND)
{ {
return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
@ -1347,7 +1339,7 @@ TwoPhaseInsertSelectTaskList(Oid targetRelationId, Query *insertSelectQuery,
* then deparse it. * then deparse it.
*/ */
Query *insertResultQuery = copyObject(insertSelectQuery); Query *insertResultQuery = copyObject(insertSelectQuery);
RangeTblEntry *insertRte = ExtractInsertRangeTableEntry(insertResultQuery); RangeTblEntry *insertRte = ExtractResultRelationRTE(insertResultQuery);
RangeTblEntry *selectRte = ExtractSelectRangeTableEntry(insertResultQuery); RangeTblEntry *selectRte = ExtractSelectRangeTableEntry(insertResultQuery);
DistTableCacheEntry *targetCacheEntry = DistributedTableCacheEntry(targetRelationId); DistTableCacheEntry *targetCacheEntry = DistributedTableCacheEntry(targetRelationId);
@ -1441,7 +1433,6 @@ TwoPhaseInsertSelectTaskList(Oid targetRelationId, Query *insertSelectQuery,
modifyTask->dependedTaskList = NULL; modifyTask->dependedTaskList = NULL;
modifyTask->anchorShardId = shardId; modifyTask->anchorShardId = shardId;
modifyTask->taskPlacementList = insertShardPlacementList; modifyTask->taskPlacementList = insertShardPlacementList;
modifyTask->upsertQuery = insertResultQuery->onConflict != NULL;
modifyTask->relationShardList = list_make1(relationShard); modifyTask->relationShardList = list_make1(relationShard);
modifyTask->replicationModel = targetCacheEntry->replicationModel; modifyTask->replicationModel = targetCacheEntry->replicationModel;

View File

@ -228,7 +228,7 @@ CreatePhysicalDistributedPlan(MultiTreeRoot *multiTree,
distributedPlan->workerJob = workerJob; distributedPlan->workerJob = workerJob;
distributedPlan->masterQuery = masterQuery; distributedPlan->masterQuery = masterQuery;
distributedPlan->routerExecutable = DistributedPlanRouterExecutable(distributedPlan); distributedPlan->routerExecutable = DistributedPlanRouterExecutable(distributedPlan);
distributedPlan->operation = CMD_SELECT; distributedPlan->modLevel = ROW_MODIFY_READONLY;
return distributedPlan; return distributedPlan;
} }
@ -2430,7 +2430,6 @@ QueryPushdownTaskCreate(Query *originalQuery, int shardIndex,
subqueryTask->dependedTaskList = NULL; subqueryTask->dependedTaskList = NULL;
subqueryTask->anchorShardId = anchorShardId; subqueryTask->anchorShardId = anchorShardId;
subqueryTask->taskPlacementList = selectPlacementList; subqueryTask->taskPlacementList = selectPlacementList;
subqueryTask->upsertQuery = false;
subqueryTask->relationShardList = relationShardList; subqueryTask->relationShardList = relationShardList;
return subqueryTask; return subqueryTask;
@ -4310,6 +4309,41 @@ GenerateSyntheticShardIntervalArray(int partitionCount)
} }
/*
* Determine RowModifyLevel required for given query
*/
RowModifyLevel
RowModifyLevelForQuery(Query *query)
{
CmdType commandType = query->commandType;
if (commandType == CMD_SELECT)
{
return ROW_MODIFY_READONLY;
}
if (commandType == CMD_INSERT)
{
if (query->onConflict == NULL)
{
return ROW_MODIFY_COMMUTATIVE;
}
else
{
return ROW_MODIFY_NONCOMMUTATIVE;
}
}
if (commandType == CMD_UPDATE ||
commandType == CMD_DELETE)
{
return ROW_MODIFY_NONCOMMUTATIVE;
}
return ROW_MODIFY_NONE;
}
/* /*
* ColumnName resolves the given column's name. The given column could belong to * ColumnName resolves the given column's name. The given column could belong to
* a regular table or to an intermediate table formed to execute a distributed * a regular table or to an intermediate table formed to execute a distributed

View File

@ -186,7 +186,7 @@ CreateRouterPlan(Query *originalQuery, Query *query,
/* /*
* CreateModifyPlan attempts to create a plan the given modification * CreateModifyPlan attempts to create a plan for the given modification
* statement. If planning fails ->planningError is set to a description of * statement. If planning fails ->planningError is set to a description of
* the failure. * the failure.
*/ */
@ -198,7 +198,7 @@ CreateModifyPlan(Query *originalQuery, Query *query,
DistributedPlan *distributedPlan = CitusMakeNode(DistributedPlan); DistributedPlan *distributedPlan = CitusMakeNode(DistributedPlan);
bool multiShardQuery = false; bool multiShardQuery = false;
distributedPlan->operation = query->commandType; distributedPlan->modLevel = RowModifyLevelForQuery(query);
distributedPlan->planningError = ModifyQuerySupported(query, originalQuery, distributedPlan->planningError = ModifyQuerySupported(query, originalQuery,
multiShardQuery, multiShardQuery,
@ -244,8 +244,8 @@ CreateModifyPlan(Query *originalQuery, Query *query,
* CreateSingleTaskRouterPlan creates a physical plan for given query. The created plan is * CreateSingleTaskRouterPlan creates a physical plan for given query. The created plan is
* either a modify task that changes a single shard, or a router task that returns * either a modify task that changes a single shard, or a router task that returns
* query results from a single worker. Supported modify queries (insert/update/delete) * query results from a single worker. Supported modify queries (insert/update/delete)
* are router plannable by default. If query is not router plannable then either NULL is * are router plannable by default. If query is not router plannable the returned plan
* returned, or the returned plan has planningError set to a description of the problem. * has planningError set to a description of the problem.
*/ */
static void static void
CreateSingleTaskRouterPlan(DistributedPlan *distributedPlan, Query *originalQuery, CreateSingleTaskRouterPlan(DistributedPlan *distributedPlan, Query *originalQuery,
@ -254,7 +254,7 @@ CreateSingleTaskRouterPlan(DistributedPlan *distributedPlan, Query *originalQuer
{ {
Job *job = NULL; Job *job = NULL;
distributedPlan->operation = query->commandType; distributedPlan->modLevel = RowModifyLevelForQuery(query);
/* we cannot have multi shard update/delete query via this code path */ /* we cannot have multi shard update/delete query via this code path */
job = RouterJob(originalQuery, plannerRestrictionContext, job = RouterJob(originalQuery, plannerRestrictionContext,
@ -491,7 +491,7 @@ ModifyQueryResultRelationId(Query *query)
errmsg("input query is not a modification query"))); errmsg("input query is not a modification query")));
} }
resultRte = ExtractInsertRangeTableEntry(query); resultRte = ExtractResultRelationRTE(query);
Assert(OidIsValid(resultRte->relid)); Assert(OidIsValid(resultRte->relid));
return resultRte->relid; return resultRte->relid;
@ -512,20 +512,12 @@ ResultRelationOidForQuery(Query *query)
/* /*
* ExtractInsertRangeTableEntry returns the INSERT'ed table's range table entry. * ExtractResultRelationRTE returns the table's resultRelation range table entry.
* Note that the function expects and asserts that the input query be
* an INSERT...SELECT query.
*/ */
RangeTblEntry * RangeTblEntry *
ExtractInsertRangeTableEntry(Query *query) ExtractResultRelationRTE(Query *query)
{ {
int resultRelation = query->resultRelation; return rt_fetch(query->resultRelation, query->rtable);
List *rangeTableList = query->rtable;
RangeTblEntry *insertRTE = NULL;
insertRTE = rt_fetch(resultRelation, rangeTableList);
return insertRTE;
} }
@ -1129,14 +1121,8 @@ HasDangerousJoinUsing(List *rtableList, Node *joinTreeNode)
bool bool
UpdateOrDeleteQuery(Query *query) UpdateOrDeleteQuery(Query *query)
{ {
CmdType commandType = query->commandType; return query->commandType == CMD_UPDATE ||
query->commandType == CMD_DELETE;
if (commandType == CMD_UPDATE || commandType == CMD_DELETE)
{
return true;
}
return false;
} }
@ -1538,11 +1524,6 @@ RouterInsertTaskList(Query *query, DeferredErrorMessage **planningError)
modifyTask->replicationModel = cacheEntry->replicationModel; modifyTask->replicationModel = cacheEntry->replicationModel;
modifyTask->rowValuesLists = modifyRoute->rowValuesLists; modifyTask->rowValuesLists = modifyRoute->rowValuesLists;
if (query->onConflict != NULL)
{
modifyTask->upsertQuery = true;
}
insertTaskList = lappend(insertTaskList, modifyTask); insertTaskList = lappend(insertTaskList, modifyTask);
} }
@ -1572,7 +1553,6 @@ CreateTask(TaskType taskType)
task->shardInterval = NULL; task->shardInterval = NULL;
task->assignmentConstrained = false; task->assignmentConstrained = false;
task->taskExecution = NULL; task->taskExecution = NULL;
task->upsertQuery = false;
task->replicationModel = REPLICATION_MODEL_INVALID; task->replicationModel = REPLICATION_MODEL_INVALID;
task->relationRowLockList = NIL; task->relationRowLockList = NIL;
@ -1858,8 +1838,8 @@ SingleShardModifyTaskList(Query *query, uint64 jobId, List *relationShardList,
/* /*
* GetUpdateOrDeleteRTE checks query if it has an UPDATE or DELETE RTE. If it finds * GetUpdateOrDeleteRTE checks query if it has an UPDATE or DELETE RTE.
* it returns it. * Returns that RTE if found.
*/ */
static RangeTblEntry * static RangeTblEntry *
GetUpdateOrDeleteRTE(Query *query) GetUpdateOrDeleteRTE(Query *query)

View File

@ -103,12 +103,12 @@ CopyNodeDistributedPlan(COPYFUNC_ARGS)
DECLARE_FROM_AND_NEW_NODE(DistributedPlan); DECLARE_FROM_AND_NEW_NODE(DistributedPlan);
COPY_SCALAR_FIELD(planId); COPY_SCALAR_FIELD(planId);
COPY_SCALAR_FIELD(operation); COPY_SCALAR_FIELD(modLevel);
COPY_SCALAR_FIELD(hasReturning); COPY_SCALAR_FIELD(hasReturning);
COPY_SCALAR_FIELD(routerExecutable);
COPY_NODE_FIELD(workerJob); COPY_NODE_FIELD(workerJob);
COPY_NODE_FIELD(masterQuery); COPY_NODE_FIELD(masterQuery);
COPY_SCALAR_FIELD(routerExecutable);
COPY_SCALAR_FIELD(queryId); COPY_SCALAR_FIELD(queryId);
COPY_NODE_FIELD(relationIdList); COPY_NODE_FIELD(relationIdList);
@ -257,7 +257,6 @@ CopyNodeTask(COPYFUNC_ARGS)
COPY_NODE_FIELD(shardInterval); COPY_NODE_FIELD(shardInterval);
COPY_SCALAR_FIELD(assignmentConstrained); COPY_SCALAR_FIELD(assignmentConstrained);
COPY_NODE_FIELD(taskExecution); COPY_NODE_FIELD(taskExecution);
COPY_SCALAR_FIELD(upsertQuery);
COPY_SCALAR_FIELD(replicationModel); COPY_SCALAR_FIELD(replicationModel);
COPY_SCALAR_FIELD(modifyWithSubquery); COPY_SCALAR_FIELD(modifyWithSubquery);
COPY_NODE_FIELD(relationShardList); COPY_NODE_FIELD(relationShardList);

View File

@ -176,12 +176,12 @@ OutDistributedPlan(OUTFUNC_ARGS)
WRITE_NODE_TYPE("DISTRIBUTEDPLAN"); WRITE_NODE_TYPE("DISTRIBUTEDPLAN");
WRITE_UINT64_FIELD(planId); WRITE_UINT64_FIELD(planId);
WRITE_INT_FIELD(operation); WRITE_ENUM_FIELD(modLevel, RowModifyLevel);
WRITE_BOOL_FIELD(hasReturning); WRITE_BOOL_FIELD(hasReturning);
WRITE_BOOL_FIELD(routerExecutable);
WRITE_NODE_FIELD(workerJob); WRITE_NODE_FIELD(workerJob);
WRITE_NODE_FIELD(masterQuery); WRITE_NODE_FIELD(masterQuery);
WRITE_BOOL_FIELD(routerExecutable);
WRITE_UINT64_FIELD(queryId); WRITE_UINT64_FIELD(queryId);
WRITE_NODE_FIELD(relationIdList); WRITE_NODE_FIELD(relationIdList);
@ -467,7 +467,6 @@ OutTask(OUTFUNC_ARGS)
WRITE_NODE_FIELD(shardInterval); WRITE_NODE_FIELD(shardInterval);
WRITE_BOOL_FIELD(assignmentConstrained); WRITE_BOOL_FIELD(assignmentConstrained);
WRITE_NODE_FIELD(taskExecution); WRITE_NODE_FIELD(taskExecution);
WRITE_BOOL_FIELD(upsertQuery);
WRITE_CHAR_FIELD(replicationModel); WRITE_CHAR_FIELD(replicationModel);
WRITE_BOOL_FIELD(modifyWithSubquery); WRITE_BOOL_FIELD(modifyWithSubquery);
WRITE_NODE_FIELD(relationShardList); WRITE_NODE_FIELD(relationShardList);

View File

@ -202,12 +202,12 @@ ReadDistributedPlan(READFUNC_ARGS)
READ_LOCALS(DistributedPlan); READ_LOCALS(DistributedPlan);
READ_UINT64_FIELD(planId); READ_UINT64_FIELD(planId);
READ_INT_FIELD(operation); READ_ENUM_FIELD(modLevel, RowModifyLevel);
READ_BOOL_FIELD(hasReturning); READ_BOOL_FIELD(hasReturning);
READ_BOOL_FIELD(routerExecutable);
READ_NODE_FIELD(workerJob); READ_NODE_FIELD(workerJob);
READ_NODE_FIELD(masterQuery); READ_NODE_FIELD(masterQuery);
READ_BOOL_FIELD(routerExecutable);
READ_UINT64_FIELD(queryId); READ_UINT64_FIELD(queryId);
READ_NODE_FIELD(relationIdList); READ_NODE_FIELD(relationIdList);
@ -381,7 +381,6 @@ ReadTask(READFUNC_ARGS)
READ_NODE_FIELD(shardInterval); READ_NODE_FIELD(shardInterval);
READ_BOOL_FIELD(assignmentConstrained); READ_BOOL_FIELD(assignmentConstrained);
READ_NODE_FIELD(taskExecution); READ_NODE_FIELD(taskExecution);
READ_BOOL_FIELD(upsertQuery);
READ_CHAR_FIELD(replicationModel); READ_CHAR_FIELD(replicationModel);
READ_BOOL_FIELD(modifyWithSubquery); READ_BOOL_FIELD(modifyWithSubquery);
READ_NODE_FIELD(relationShardList); READ_NODE_FIELD(relationShardList);

View File

@ -108,12 +108,10 @@ extern void multi_join_restriction_hook(PlannerInfo *root,
JoinType jointype, JoinType jointype,
JoinPathExtraData *extra); JoinPathExtraData *extra);
extern bool IsModifyCommand(Query *query); extern bool IsModifyCommand(Query *query);
extern bool IsUpdateOrDelete(struct DistributedPlan *distributedPlan);
extern bool IsModifyDistributedPlan(struct DistributedPlan *distributedPlan); extern bool IsModifyDistributedPlan(struct DistributedPlan *distributedPlan);
extern void EnsurePartitionTableNotReplicated(Oid relationId); extern void EnsurePartitionTableNotReplicated(Oid relationId);
extern Node * ResolveExternalParams(Node *inputNode, ParamListInfo boundParams); extern Node * ResolveExternalParams(Node *inputNode, ParamListInfo boundParams);
extern bool IsMultiTaskPlan(struct DistributedPlan *distributedPlan); extern bool IsMultiTaskPlan(struct DistributedPlan *distributedPlan);
extern bool IsMultiShardModifyPlan(struct DistributedPlan *distributedPlan);
extern RangeTblEntry * RemoteScanRangeTableEntry(List *columnNameList); extern RangeTblEntry * RemoteScanRangeTableEntry(List *columnNameList);
extern int GetRTEIdentity(RangeTblEntry *rte); extern int GetRTEIdentity(RangeTblEntry *rte);

View File

@ -38,13 +38,14 @@ extern void CitusExecutorStart(QueryDesc *queryDesc, int eflags);
extern void CitusExecutorRun(QueryDesc *queryDesc, ScanDirection direction, uint64 count, extern void CitusExecutorRun(QueryDesc *queryDesc, ScanDirection direction, uint64 count,
bool execute_once); bool execute_once);
extern TupleTableSlot * AdaptiveExecutor(CustomScanState *node); extern TupleTableSlot * AdaptiveExecutor(CustomScanState *node);
extern uint64 ExecuteTaskListExtended(CmdType operation, List *taskList, extern uint64 ExecuteTaskListExtended(RowModifyLevel modLevel, List *taskList,
TupleDesc tupleDescriptor, TupleDesc tupleDescriptor,
Tuplestorestate *tupleStore, Tuplestorestate *tupleStore,
bool hasReturning, int targetPoolSize); bool hasReturning, int targetPoolSize);
extern void ExecuteUtilityTaskListWithoutResults(List *taskList, int targetPoolSize, extern void ExecuteUtilityTaskListWithoutResults(List *taskList, int targetPoolSize,
bool forceSequentialExecution); bool forceSequentialExecution);
extern uint64 ExecuteTaskList(CmdType operation, List *taskList, int targetPoolSize); extern uint64 ExecuteTaskList(RowModifyLevel modLevel, List *taskList, int
targetPoolSize);
extern TupleTableSlot * CitusExecScan(CustomScanState *node); extern TupleTableSlot * CitusExecScan(CustomScanState *node);
extern TupleTableSlot * ReturnTupleFromTuplestore(CitusScanState *scanState); extern TupleTableSlot * ReturnTupleFromTuplestore(CitusScanState *scanState);
extern void LoadTuplesIntoTupleStore(CitusScanState *citusScanState, Job *workerJob); extern void LoadTuplesIntoTupleStore(CitusScanState *citusScanState, Job *workerJob);

View File

@ -110,6 +110,15 @@ typedef enum
} BoundaryNodeJobType; } BoundaryNodeJobType;
/* Enumeration that specifies extent of DML modifications */
typedef enum RowModifyLevel
{
ROW_MODIFY_NONE = 0,
ROW_MODIFY_READONLY = 1,
ROW_MODIFY_COMMUTATIVE = 2,
ROW_MODIFY_NONCOMMUTATIVE = 3
} RowModifyLevel;
/* /*
* Job represents a logical unit of work that contains one set of data transfers * Job represents a logical unit of work that contains one set of data transfers
* in our physical plan. The physical planner maps each SQL query into one or * in our physical plan. The physical planner maps each SQL query into one or
@ -183,7 +192,6 @@ typedef struct Task
ShardInterval *shardInterval; /* only applies to merge tasks */ ShardInterval *shardInterval; /* only applies to merge tasks */
bool assignmentConstrained; /* only applies to merge tasks */ bool assignmentConstrained; /* only applies to merge tasks */
TaskExecution *taskExecution; /* used by task tracker executor */ TaskExecution *taskExecution; /* used by task tracker executor */
bool upsertQuery; /* only applies to modify tasks */
char replicationModel; /* only applies to modify tasks */ char replicationModel; /* only applies to modify tasks */
List *relationRowLockList; List *relationRowLockList;
@ -229,21 +237,21 @@ typedef struct DistributedPlan
/* unique identifier of the plan within the session */ /* unique identifier of the plan within the session */
uint64 planId; uint64 planId;
/* type of command to execute (SELECT/INSERT/...) */ /* specifies nature of modifications in query */
CmdType operation; RowModifyLevel modLevel;
/* specifies whether a DML command has a RETURNING */ /* specifies whether a DML command has a RETURNING */
bool hasReturning; bool hasReturning;
/* a router executable query is executed entirely on a worker */
bool routerExecutable;
/* job tree containing the tasks to be executed on workers */ /* job tree containing the tasks to be executed on workers */
Job *workerJob; Job *workerJob;
/* local query that merges results from the workers */ /* local query that merges results from the workers */
Query *masterQuery; Query *masterQuery;
/* a router executable query is executed entirely on a worker */
bool routerExecutable;
/* query identifier (copied from the top-level PlannedStmt) */ /* query identifier (copied from the top-level PlannedStmt) */
uint64 queryId; uint64 queryId;
@ -340,6 +348,7 @@ extern bool ShardIntervalsOverlap(ShardInterval *firstInterval,
ShardInterval *secondInterval); ShardInterval *secondInterval);
extern bool CoPartitionedTables(Oid firstRelationId, Oid secondRelationId); extern bool CoPartitionedTables(Oid firstRelationId, Oid secondRelationId);
extern ShardInterval ** GenerateSyntheticShardIntervalArray(int partitionCount); extern ShardInterval ** GenerateSyntheticShardIntervalArray(int partitionCount);
extern RowModifyLevel RowModifyLevelForQuery(Query *query);
/* function declarations for Task and Task list operations */ /* function declarations for Task and Task list operations */

View File

@ -46,15 +46,15 @@ extern void ExecuteMultipleTasks(CitusScanState *scanState, List *taskList,
bool isModificationQuery, bool expectResults); bool isModificationQuery, bool expectResults);
int64 ExecuteModifyTasksSequentially(CitusScanState *scanState, List *taskList, int64 ExecuteModifyTasksSequentially(CitusScanState *scanState, List *taskList,
CmdType operation, bool hasReturning); RowModifyLevel modLevel, bool hasReturning);
extern int64 ExecuteModifyTasksWithoutResults(List *taskList); extern int64 ExecuteModifyTasksWithoutResults(List *taskList);
extern int64 ExecuteModifyTasksSequentiallyWithoutResults(List *taskList, extern int64 ExecuteModifyTasksSequentiallyWithoutResults(List *taskList,
CmdType operation); RowModifyLevel modLevel);
extern ShardPlacementAccess * CreatePlacementAccess(ShardPlacement *placement, extern ShardPlacementAccess * CreatePlacementAccess(ShardPlacement *placement,
ShardPlacementAccessType accessType); ShardPlacementAccessType accessType);
/* helper functions */ /* helper functions */
extern void AcquireExecutorShardLock(Task *task, CmdType commandType); extern void AcquireExecutorShardLocks(Task *task, RowModifyLevel modLevel);
extern void AcquireExecutorMultiShardLocks(List *taskList); extern void AcquireExecutorMultiShardLocks(List *taskList);
extern ShardPlacementAccess * CreatePlacementAccess(ShardPlacement *placement, extern ShardPlacementAccess * CreatePlacementAccess(ShardPlacement *placement,
ShardPlacementAccessType accessType); ShardPlacementAccessType accessType);

View File

@ -61,7 +61,7 @@ extern RelationRestrictionContext * CopyRelationRestrictionContext(
extern Oid ExtractFirstDistributedTableId(Query *query); extern Oid ExtractFirstDistributedTableId(Query *query);
extern RangeTblEntry * ExtractSelectRangeTableEntry(Query *query); extern RangeTblEntry * ExtractSelectRangeTableEntry(Query *query);
extern Oid ModifyQueryResultRelationId(Query *query); extern Oid ModifyQueryResultRelationId(Query *query);
extern RangeTblEntry * ExtractInsertRangeTableEntry(Query *query); extern RangeTblEntry * ExtractResultRelationRTE(Query *query);
extern RangeTblEntry * ExtractDistributedInsertValuesRTE(Query *query); extern RangeTblEntry * ExtractDistributedInsertValuesRTE(Query *query);
extern bool IsMultiRowInsert(Query *query); extern bool IsMultiRowInsert(Query *query);
extern void AddShardIntervalRestrictionToSelect(Query *subqery, extern void AddShardIntervalRestrictionToSelect(Query *subqery,

View File

@ -17,6 +17,8 @@ multi_subtransactions
multi_modifying_xacts multi_modifying_xacts
multi_insert_select multi_insert_select
sql_procedure sql_procedure
multi_reference_table
multi_create_table_constraints
# the following tests' output are # the following tests' output are
# normalized for EXPLAIN outputs # normalized for EXPLAIN outputs