DistributedPlan: replace operation with modLevel

This causes no behavioral changes; it only reorganizes the code to make it easier to implement modifying CTEs

Also rename ExtractInsertRangeTableEntry to ExtractResultRelationRTE,
as the implementation of this function didn't match its documentation

Remove Task's upsertQuery in favor of ROW_MODIFY_NONCOMMUTATIVE

Split up AcquireExecutorShardLock into more internal functions

Tests: Normalize multi_reference_table and multi_create_table_constraints
pull/2807/head
Philip Dubé 2019-06-27 13:22:32 +02:00 committed by Hadi Moshayedi
parent 0bdec52761
commit 0915027389
21 changed files with 224 additions and 215 deletions

View File

@ -158,8 +158,8 @@
*/ */
typedef struct DistributedExecution typedef struct DistributedExecution
{ {
/* the corresponding distributed plan's operation */ /* the corresponding distributed plan's modLevel */
CmdType operation; RowModifyLevel modLevel;
List *tasksToExecute; List *tasksToExecute;
@ -504,7 +504,7 @@ int ExecutorSlowStartInterval = 10;
/* local functions */ /* local functions */
static DistributedExecution * CreateDistributedExecution(CmdType operation, static DistributedExecution * CreateDistributedExecution(RowModifyLevel modLevel,
List *taskList, List *taskList,
bool hasReturning, bool hasReturning,
ParamListInfo paramListInfo, ParamListInfo paramListInfo,
@ -520,16 +520,16 @@ static void FinishDistributedExecution(DistributedExecution *execution);
static void CleanUpSessions(DistributedExecution *execution); static void CleanUpSessions(DistributedExecution *execution);
static void LockPartitionsForDistributedPlan(DistributedPlan *distributedPlan); static void LockPartitionsForDistributedPlan(DistributedPlan *distributedPlan);
static void AcquireExecutorShardLocks(DistributedExecution *execution); static void AcquireExecutorShardLocksForExecution(DistributedExecution *execution);
static bool DistributedExecutionModifiesDatabase(DistributedExecution *execution); static bool DistributedExecutionModifiesDatabase(DistributedExecution *execution);
static bool DistributedPlanModifiesDatabase(DistributedPlan *plan); static bool DistributedPlanModifiesDatabase(DistributedPlan *plan);
static bool TaskListModifiesDatabase(CmdType operation, List *taskList); static bool TaskListModifiesDatabase(RowModifyLevel modLevel, List *taskList);
static bool DistributedExecutionRequiresRollback(DistributedExecution *execution); static bool DistributedExecutionRequiresRollback(DistributedExecution *execution);
static bool SelectForUpdateOnReferenceTable(CmdType operation, List *taskList); static bool SelectForUpdateOnReferenceTable(RowModifyLevel modLevel, List *taskList);
static void AssignTasksToConnections(DistributedExecution *execution); static void AssignTasksToConnections(DistributedExecution *execution);
static void UnclaimAllSessionConnections(List *sessionList); static void UnclaimAllSessionConnections(List *sessionList);
static bool UseConnectionPerPlacement(void); static bool UseConnectionPerPlacement(void);
static PlacementExecutionOrder ExecutionOrderForTask(CmdType operation, Task *task); static PlacementExecutionOrder ExecutionOrderForTask(RowModifyLevel modLevel, Task *task);
static WorkerPool * FindOrCreateWorkerPool(DistributedExecution *execution, static WorkerPool * FindOrCreateWorkerPool(DistributedExecution *execution,
WorkerNode *workerNode); WorkerNode *workerNode);
static WorkerSession * FindOrCreateWorkerSession(WorkerPool *workerPool, static WorkerSession * FindOrCreateWorkerSession(WorkerPool *workerPool,
@ -610,7 +610,7 @@ AdaptiveExecutor(CustomScanState *node)
targetPoolSize = 1; targetPoolSize = 1;
} }
execution = CreateDistributedExecution(distributedPlan->operation, taskList, execution = CreateDistributedExecution(distributedPlan->modLevel, taskList,
distributedPlan->hasReturning, distributedPlan->hasReturning,
paramListInfo, tupleDescriptor, paramListInfo, tupleDescriptor,
tupleStore, targetPoolSize); tupleStore, targetPoolSize);
@ -626,7 +626,7 @@ AdaptiveExecutor(CustomScanState *node)
RunDistributedExecution(execution); RunDistributedExecution(execution);
} }
if (distributedPlan->operation != CMD_SELECT) if (distributedPlan->modLevel != ROW_MODIFY_READONLY)
{ {
executorState->es_processed = execution->rowsProcessed; executorState->es_processed = execution->rowsProcessed;
} }
@ -654,14 +654,14 @@ ExecuteUtilityTaskListWithoutResults(List *taskList, int targetPoolSize,
{ {
if (TaskExecutorType == MULTI_EXECUTOR_ADAPTIVE) if (TaskExecutorType == MULTI_EXECUTOR_ADAPTIVE)
{ {
ExecuteTaskList(CMD_UTILITY, taskList, targetPoolSize); ExecuteTaskList(ROW_MODIFY_NONE, taskList, targetPoolSize);
} }
else else
{ {
if (MultiShardConnectionType == SEQUENTIAL_CONNECTION || if (MultiShardConnectionType == SEQUENTIAL_CONNECTION ||
forceSequentialExecution) forceSequentialExecution)
{ {
ExecuteModifyTasksSequentiallyWithoutResults(taskList, CMD_UTILITY); ExecuteModifyTasksSequentiallyWithoutResults(taskList, ROW_MODIFY_NONE);
} }
else else
{ {
@ -676,13 +676,13 @@ ExecuteUtilityTaskListWithoutResults(List *taskList, int targetPoolSize,
* for some of the arguments. * for some of the arguments.
*/ */
uint64 uint64
ExecuteTaskList(CmdType operation, List *taskList, int targetPoolSize) ExecuteTaskList(RowModifyLevel modLevel, List *taskList, int targetPoolSize)
{ {
TupleDesc tupleDescriptor = NULL; TupleDesc tupleDescriptor = NULL;
Tuplestorestate *tupleStore = NULL; Tuplestorestate *tupleStore = NULL;
bool hasReturning = false; bool hasReturning = false;
return ExecuteTaskListExtended(operation, taskList, tupleDescriptor, return ExecuteTaskListExtended(modLevel, taskList, tupleDescriptor,
tupleStore, hasReturning, targetPoolSize); tupleStore, hasReturning, targetPoolSize);
} }
@ -692,7 +692,7 @@ ExecuteTaskList(CmdType operation, List *taskList, int targetPoolSize)
* runs it. * runs it.
*/ */
uint64 uint64
ExecuteTaskListExtended(CmdType operation, List *taskList, ExecuteTaskListExtended(RowModifyLevel modLevel, List *taskList,
TupleDesc tupleDescriptor, Tuplestorestate *tupleStore, TupleDesc tupleDescriptor, Tuplestorestate *tupleStore,
bool hasReturning, int targetPoolSize) bool hasReturning, int targetPoolSize)
{ {
@ -705,7 +705,7 @@ ExecuteTaskListExtended(CmdType operation, List *taskList,
} }
execution = execution =
CreateDistributedExecution(operation, taskList, hasReturning, paramListInfo, CreateDistributedExecution(modLevel, taskList, hasReturning, paramListInfo,
tupleDescriptor, tupleStore, targetPoolSize); tupleDescriptor, tupleStore, targetPoolSize);
StartDistributedExecution(execution); StartDistributedExecution(execution);
@ -721,14 +721,14 @@ ExecuteTaskListExtended(CmdType operation, List *taskList,
* a distributed plan. * a distributed plan.
*/ */
DistributedExecution * DistributedExecution *
CreateDistributedExecution(CmdType operation, List *taskList, bool hasReturning, CreateDistributedExecution(RowModifyLevel modLevel, List *taskList, bool hasReturning,
ParamListInfo paramListInfo, TupleDesc tupleDescriptor, ParamListInfo paramListInfo, TupleDesc tupleDescriptor,
Tuplestorestate *tupleStore, int targetPoolSize) Tuplestorestate *tupleStore, int targetPoolSize)
{ {
DistributedExecution *execution = DistributedExecution *execution =
(DistributedExecution *) palloc0(sizeof(DistributedExecution)); (DistributedExecution *) palloc0(sizeof(DistributedExecution));
execution->operation = operation; execution->modLevel = modLevel;
execution->tasksToExecute = taskList; execution->tasksToExecute = taskList;
execution->hasReturning = hasReturning; execution->hasReturning = hasReturning;
@ -823,7 +823,7 @@ StartDistributedExecution(DistributedExecution *execution)
} }
/* prevent unsafe concurrent modifications */ /* prevent unsafe concurrent modifications */
AcquireExecutorShardLocks(execution); AcquireExecutorShardLocksForExecution(execution);
} }
@ -834,7 +834,7 @@ StartDistributedExecution(DistributedExecution *execution)
static bool static bool
DistributedExecutionModifiesDatabase(DistributedExecution *execution) DistributedExecutionModifiesDatabase(DistributedExecution *execution)
{ {
return TaskListModifiesDatabase(execution->operation, execution->tasksToExecute); return TaskListModifiesDatabase(execution->modLevel, execution->tasksToExecute);
} }
@ -845,7 +845,7 @@ DistributedExecutionModifiesDatabase(DistributedExecution *execution)
static bool static bool
DistributedPlanModifiesDatabase(DistributedPlan *plan) DistributedPlanModifiesDatabase(DistributedPlan *plan)
{ {
return TaskListModifiesDatabase(plan->operation, plan->workerJob->taskList); return TaskListModifiesDatabase(plan->modLevel, plan->workerJob->taskList);
} }
@ -854,22 +854,22 @@ DistributedPlanModifiesDatabase(DistributedPlan *plan)
* DistributedPlanModifiesDatabase. * DistributedPlanModifiesDatabase.
*/ */
static bool static bool
TaskListModifiesDatabase(CmdType operation, List *taskList) TaskListModifiesDatabase(RowModifyLevel modLevel, List *taskList)
{ {
Task *firstTask = NULL; Task *firstTask = NULL;
if (operation == CMD_INSERT || operation == CMD_UPDATE || operation == CMD_DELETE) if (modLevel > ROW_MODIFY_READONLY)
{ {
return true; return true;
} }
/* /*
* If we cannot decide by only checking the operation, we should look closer * If we cannot decide by only checking the row modify level,
* to the tasks. * we should look closer to the tasks.
*/ */
if (list_length(taskList) < 1) if (list_length(taskList) < 1)
{ {
/* does this ever possible? */ /* is this ever possible? */
return false; return false;
} }
@ -968,12 +968,12 @@ DistributedExecutionRequiresRollback(DistributedExecution *execution)
* that contains FOR UPDATE clause that locks any reference tables. * that contains FOR UPDATE clause that locks any reference tables.
*/ */
static bool static bool
SelectForUpdateOnReferenceTable(CmdType operation, List *taskList) SelectForUpdateOnReferenceTable(RowModifyLevel modLevel, List *taskList)
{ {
Task *task = NULL; Task *task = NULL;
ListCell *rtiLockCell = NULL; ListCell *rtiLockCell = NULL;
if (operation != CMD_SELECT) if (modLevel != ROW_MODIFY_READONLY)
{ {
return false; return false;
} }
@ -1025,8 +1025,8 @@ LockPartitionsForDistributedPlan(DistributedPlan *distributedPlan)
/* /*
* AcquireExecutorShardLocks acquires advisory lock on shard IDs to prevent * AcquireExecutorShardLocksForExecution acquires advisory lock on shard IDs
* unsafe concurrent modifications of shards. * to prevent unsafe concurrent modifications of shards.
* *
* We prevent concurrent modifications of shards in two cases: * We prevent concurrent modifications of shards in two cases:
* 1. Any non-commutative writes to a replicated table * 1. Any non-commutative writes to a replicated table
@ -1042,13 +1042,13 @@ LockPartitionsForDistributedPlan(DistributedPlan *distributedPlan)
* TRUNCATE because the table locks already prevent concurrent access. * TRUNCATE because the table locks already prevent concurrent access.
*/ */
static void static void
AcquireExecutorShardLocks(DistributedExecution *execution) AcquireExecutorShardLocksForExecution(DistributedExecution *execution)
{ {
CmdType operation = execution->operation; RowModifyLevel modLevel = execution->modLevel;
List *taskList = execution->tasksToExecute; List *taskList = execution->tasksToExecute;
if (!(operation == CMD_INSERT || operation == CMD_UPDATE || operation == CMD_DELETE || if (modLevel <= ROW_MODIFY_READONLY &&
SelectForUpdateOnReferenceTable(operation, taskList))) !SelectForUpdateOnReferenceTable(modLevel, taskList))
{ {
/* /*
* Executor locks only apply to DML commands and SELECT FOR UPDATE queries * Executor locks only apply to DML commands and SELECT FOR UPDATE queries
@ -1069,7 +1069,7 @@ AcquireExecutorShardLocks(DistributedExecution *execution)
{ {
Task *task = (Task *) lfirst(taskCell); Task *task = (Task *) lfirst(taskCell);
AcquireExecutorShardLock(task, operation); AcquireExecutorShardLocks(task, modLevel);
} }
} }
else if (list_length(taskList) > 1) else if (list_length(taskList) > 1)
@ -1211,7 +1211,7 @@ UnclaimAllSessionConnections(List *sessionList)
static void static void
AssignTasksToConnections(DistributedExecution *execution) AssignTasksToConnections(DistributedExecution *execution)
{ {
CmdType operation = execution->operation; RowModifyLevel modLevel = execution->modLevel;
List *taskList = execution->tasksToExecute; List *taskList = execution->tasksToExecute;
bool hasReturning = execution->hasReturning; bool hasReturning = execution->hasReturning;
@ -1233,13 +1233,15 @@ AssignTasksToConnections(DistributedExecution *execution)
shardCommandExecution = shardCommandExecution =
(ShardCommandExecution *) palloc0(sizeof(ShardCommandExecution)); (ShardCommandExecution *) palloc0(sizeof(ShardCommandExecution));
shardCommandExecution->task = task; shardCommandExecution->task = task;
shardCommandExecution->executionOrder = ExecutionOrderForTask(operation, task); shardCommandExecution->executionOrder = ExecutionOrderForTask(modLevel, task);
shardCommandExecution->executionState = TASK_EXECUTION_NOT_FINISHED; shardCommandExecution->executionState = TASK_EXECUTION_NOT_FINISHED;
shardCommandExecution->placementExecutions = shardCommandExecution->placementExecutions =
(TaskPlacementExecution **) palloc0(placementExecutionCount * (TaskPlacementExecution **) palloc0(placementExecutionCount *
sizeof(TaskPlacementExecution *)); sizeof(TaskPlacementExecution *));
shardCommandExecution->placementExecutionCount = placementExecutionCount; shardCommandExecution->placementExecutionCount = placementExecutionCount;
shardCommandExecution->expectResults = hasReturning || operation == CMD_SELECT;
shardCommandExecution->expectResults = hasReturning ||
modLevel == ROW_MODIFY_READONLY;
foreach(taskPlacementCell, task->taskPlacementList) foreach(taskPlacementCell, task->taskPlacementList)
@ -1390,7 +1392,7 @@ UseConnectionPerPlacement(void)
* ExecutionOrderForTask gives the appropriate execution order for a task. * ExecutionOrderForTask gives the appropriate execution order for a task.
*/ */
static PlacementExecutionOrder static PlacementExecutionOrder
ExecutionOrderForTask(CmdType operation, Task *task) ExecutionOrderForTask(RowModifyLevel modLevel, Task *task)
{ {
switch (task->taskType) switch (task->taskType)
{ {
@ -1402,7 +1404,14 @@ ExecutionOrderForTask(CmdType operation, Task *task)
case MODIFY_TASK: case MODIFY_TASK:
{ {
if (operation == CMD_INSERT && !task->upsertQuery) /*
* For non-commutative modifications we take aggressive locks, so
* there is no risk of deadlock and we can run them in parallel.
* When the modification is commutative, we take no additional
* locks, so we take a conservative approach and execute sequentially
* to avoid deadlocks.
*/
if (modLevel < ROW_MODIFY_NONCOMMUTATIVE)
{ {
return EXECUTION_ORDER_SEQUENTIAL; return EXECUTION_ORDER_SEQUENTIAL;
} }

View File

@ -163,7 +163,7 @@ CitusBeginScan(CustomScanState *node, EState *estate, int eflags)
scanState = (CitusScanState *) node; scanState = (CitusScanState *) node;
distributedPlan = scanState->distributedPlan; distributedPlan = scanState->distributedPlan;
if (distributedPlan->operation == CMD_SELECT || if (distributedPlan->modLevel == ROW_MODIFY_READONLY ||
distributedPlan->insertSelectSubquery != NULL) distributedPlan->insertSelectSubquery != NULL)
{ {
/* no more action required */ /* no more action required */

View File

@ -133,7 +133,8 @@ CoordinatorInsertSelectExecScan(CustomScanState *node)
if (MultiShardConnectionType == SEQUENTIAL_CONNECTION) if (MultiShardConnectionType == SEQUENTIAL_CONNECTION)
{ {
ExecuteModifyTasksSequentially(scanState, prunedTaskList, ExecuteModifyTasksSequentially(scanState, prunedTaskList,
CMD_INSERT, hasReturning); ROW_MODIFY_COMMUTATIVE,
hasReturning);
} }
else else
{ {
@ -151,7 +152,7 @@ CoordinatorInsertSelectExecScan(CustomScanState *node)
scanState->tuplestorestate = scanState->tuplestorestate =
tuplestore_begin_heap(randomAccess, interTransactions, work_mem); tuplestore_begin_heap(randomAccess, interTransactions, work_mem);
ExecuteTaskListExtended(CMD_INSERT, prunedTaskList, ExecuteTaskListExtended(ROW_MODIFY_COMMUTATIVE, prunedTaskList,
tupleDescriptor, scanState->tuplestorestate, tupleDescriptor, scanState->tuplestorestate,
hasReturning, MaxAdaptiveExecutorPoolSize); hasReturning, MaxAdaptiveExecutorPoolSize);
} }

View File

@ -91,15 +91,17 @@ bool SortReturning = false;
/* functions needed during run phase */ /* functions needed during run phase */
static void AcquireMetadataLocks(List *taskList); static void AcquireMetadataLocks(List *taskList);
static int64 ExecuteSingleModifyTask(CitusScanState *scanState, Task *task, CmdType static int64 ExecuteSingleModifyTask(CitusScanState *scanState, Task *task,
operation, bool alwaysThrowErrorOnFailure, bool RowModifyLevel modLevel,
expectResults); bool alwaysThrowErrorOnFailure, bool expectResults);
static void ExecuteSingleSelectTask(CitusScanState *scanState, Task *task); static void ExecuteSingleSelectTask(CitusScanState *scanState, Task *task);
static List * BuildPlacementAccessList(int32 groupId, List *relationShardList, static List * BuildPlacementAccessList(int32 groupId, List *relationShardList,
ShardPlacementAccessType accessType); ShardPlacementAccessType accessType);
static List * GetModifyConnections(Task *task, bool markCritical); static List * GetModifyConnections(Task *task, bool markCritical);
static int64 ExecuteModifyTasks(List *taskList, bool expectResults, static int64 ExecuteModifyTasks(List *taskList, bool expectResults,
ParamListInfo paramListInfo, CitusScanState *scanState); ParamListInfo paramListInfo, CitusScanState *scanState);
static void AcquireExecutorShardLockForRowModify(Task *task, RowModifyLevel modLevel);
static void AcquireExecutorShardLocksForRelationRowLockList(Task *task);
static bool RequiresConsistentSnapshot(Task *task); static bool RequiresConsistentSnapshot(Task *task);
static void RouterMultiModifyExecScan(CustomScanState *node); static void RouterMultiModifyExecScan(CustomScanState *node);
static void RouterSequentialModifyExecScan(CustomScanState *node); static void RouterSequentialModifyExecScan(CustomScanState *node);
@ -138,27 +140,18 @@ AcquireMetadataLocks(List *taskList)
} }
/* static void
* AcquireExecutorShardLock acquires a lock on the shard for the given task and AcquireExecutorShardLockForRowModify(Task *task, RowModifyLevel modLevel)
* command type if necessary to avoid divergence between multiple replicas of
* the same shard. No lock is obtained when there is only one replica.
*
* The function determines the appropriate lock mode based on the commutativity
* rule of the command. In each case, it uses a lock mode that enforces the
* commutativity rule.
*
* The mapping is overridden when all_modifications_commutative is set to true.
* In that case, all modifications are treated as commutative, which can be used
* to communicate that the application is only generating commutative
* UPDATE/DELETE/UPSERT commands and exclusive locks are unnecessary.
*/
void
AcquireExecutorShardLock(Task *task, CmdType commandType)
{ {
LOCKMODE lockMode = NoLock; LOCKMODE lockMode = NoLock;
int64 shardId = task->anchorShardId; int64 shardId = task->anchorShardId;
if (commandType == CMD_SELECT) if (shardId == INVALID_SHARD_ID)
{
return;
}
if (modLevel <= ROW_MODIFY_READONLY)
{ {
/* /*
* The executor shard lock is used to maintain consistency between * The executor shard lock is used to maintain consistency between
@ -205,24 +198,7 @@ AcquireExecutorShardLock(Task *task, CmdType commandType)
lockMode = RowExclusiveLock; lockMode = RowExclusiveLock;
} }
else if (task->upsertQuery || commandType == CMD_UPDATE || commandType == CMD_DELETE) else if (modLevel < ROW_MODIFY_NONCOMMUTATIVE)
{
/*
* UPDATE/DELETE/UPSERT commands do not commute with other modifications
* since the rows modified by one command may be affected by the outcome
* of another command.
*
* We need to handle upsert before INSERT, because PostgreSQL models
* upsert commands as INSERT with an ON CONFLICT section.
*
* ExclusiveLock conflicts with all lock types used by modifications
* and therefore prevents other modifications from running
* concurrently.
*/
lockMode = ExclusiveLock;
}
else if (commandType == CMD_INSERT)
{ {
/* /*
* An INSERT commutes with other INSERT commands, since performing them * An INSERT commutes with other INSERT commands, since performing them
@ -245,16 +221,34 @@ AcquireExecutorShardLock(Task *task, CmdType commandType)
} }
else else
{ {
ereport(ERROR, (errmsg("unrecognized operation code: %d", (int) commandType))); /*
* UPDATE/DELETE/UPSERT commands do not commute with other modifications
* since the rows modified by one command may be affected by the outcome
* of another command.
*
* We need to handle upsert before INSERT, because PostgreSQL models
* upsert commands as INSERT with an ON CONFLICT section.
*
* ExclusiveLock conflicts with all lock types used by modifications
* and therefore prevents other modifications from running
* concurrently.
*/
lockMode = ExclusiveLock;
} }
if (shardId != INVALID_SHARD_ID && lockMode != NoLock) if (lockMode != NoLock)
{ {
ShardInterval *shardInterval = LoadShardInterval(shardId); ShardInterval *shardInterval = LoadShardInterval(shardId);
SerializeNonCommutativeWrites(list_make1(shardInterval), lockMode); SerializeNonCommutativeWrites(list_make1(shardInterval), lockMode);
} }
}
static void
AcquireExecutorShardLocksForRelationRowLockList(Task *task)
{
/* /*
* If lock clause exists and it effects any reference table, we need to get * If lock clause exists and it effects any reference table, we need to get
* lock on shard resource. Type of lock is determined by the type of row lock * lock on shard resource. Type of lock is determined by the type of row lock
@ -301,6 +295,28 @@ AcquireExecutorShardLock(Task *task, CmdType commandType)
} }
} }
} }
}
/*
* AcquireExecutorShardLocks acquires locks on shards for the given task if
* necessary to avoid divergence between multiple replicas of the same shard.
* No lock is obtained when there is only one replica.
*
* The function determines the appropriate lock mode based on the commutativity
* rule of the command. In each case, it uses a lock mode that enforces the
* commutativity rule.
*
* The mapping is overridden when all_modifications_commutative is set to true.
* In that case, all modifications are treated as commutative, which can be used
* to communicate that the application is only generating commutative
* UPDATE/DELETE/UPSERT commands and exclusive locks are unnecessary.
*/
void
AcquireExecutorShardLocks(Task *task, RowModifyLevel modLevel)
{
AcquireExecutorShardLockForRowModify(task, modLevel);
AcquireExecutorShardLocksForRelationRowLockList(task);
/* /*
* If the task has a subselect, then we may need to lock the shards from which * If the task has a subselect, then we may need to lock the shards from which
@ -717,16 +733,16 @@ RouterSequentialModifyExecScan(CustomScanState *node)
{ {
CitusScanState *scanState = (CitusScanState *) node; CitusScanState *scanState = (CitusScanState *) node;
DistributedPlan *distributedPlan = scanState->distributedPlan; DistributedPlan *distributedPlan = scanState->distributedPlan;
RowModifyLevel modLevel = distributedPlan->modLevel;
bool hasReturning = distributedPlan->hasReturning; bool hasReturning = distributedPlan->hasReturning;
Job *workerJob = distributedPlan->workerJob; Job *workerJob = distributedPlan->workerJob;
List *taskList = workerJob->taskList; List *taskList = workerJob->taskList;
EState *executorState = ScanStateGetExecutorState(scanState); EState *executorState = ScanStateGetExecutorState(scanState);
CmdType operation = scanState->distributedPlan->operation;
Assert(!scanState->finishedRemoteScan); Assert(!scanState->finishedRemoteScan);
executorState->es_processed += executorState->es_processed +=
ExecuteModifyTasksSequentially(scanState, taskList, operation, hasReturning); ExecuteModifyTasksSequentially(scanState, taskList, modLevel, hasReturning);
} }
@ -1056,7 +1072,7 @@ CreatePlacementAccess(ShardPlacement *placement, ShardPlacementAccessType access
* The function returns affectedTupleCount if applicable. * The function returns affectedTupleCount if applicable.
*/ */
static int64 static int64
ExecuteSingleModifyTask(CitusScanState *scanState, Task *task, CmdType operation, ExecuteSingleModifyTask(CitusScanState *scanState, Task *task, RowModifyLevel modLevel,
bool alwaysThrowErrorOnFailure, bool expectResults) bool alwaysThrowErrorOnFailure, bool expectResults)
{ {
EState *executorState = NULL; EState *executorState = NULL;
@ -1107,10 +1123,9 @@ ExecuteSingleModifyTask(CitusScanState *scanState, Task *task, CmdType operation
* acquire the necessary locks on the relations, and blocks any * acquire the necessary locks on the relations, and blocks any
* unsafe concurrent operations. * unsafe concurrent operations.
*/ */
if (operation == CMD_INSERT || operation == CMD_UPDATE || if (modLevel > ROW_MODIFY_NONE)
operation == CMD_DELETE || operation == CMD_SELECT)
{ {
AcquireExecutorShardLock(task, operation); AcquireExecutorShardLocks(task, modLevel);
} }
/* try to execute modification on all placements */ /* try to execute modification on all placements */
@ -1356,9 +1371,9 @@ ExecuteModifyTasksWithoutResults(List *taskList)
* and ignores the results. * and ignores the results.
*/ */
int64 int64
ExecuteModifyTasksSequentiallyWithoutResults(List *taskList, CmdType operation) ExecuteModifyTasksSequentiallyWithoutResults(List *taskList, RowModifyLevel modLevel)
{ {
return ExecuteModifyTasksSequentially(NULL, taskList, operation, false); return ExecuteModifyTasksSequentially(NULL, taskList, modLevel, false);
} }
@ -1373,7 +1388,7 @@ ExecuteModifyTasksSequentiallyWithoutResults(List *taskList, CmdType operation)
*/ */
int64 int64
ExecuteModifyTasksSequentially(CitusScanState *scanState, List *taskList, ExecuteModifyTasksSequentially(CitusScanState *scanState, List *taskList,
CmdType operation, bool hasReturning) RowModifyLevel modLevel, bool hasReturning)
{ {
ListCell *taskCell = NULL; ListCell *taskCell = NULL;
bool multipleTasks = list_length(taskList) > 1; bool multipleTasks = list_length(taskList) > 1;
@ -1422,7 +1437,7 @@ ExecuteModifyTasksSequentially(CitusScanState *scanState, List *taskList,
bool expectResults = (hasReturning || task->relationRowLockList != NIL); bool expectResults = (hasReturning || task->relationRowLockList != NIL);
affectedTupleCount += affectedTupleCount +=
ExecuteSingleModifyTask(scanState, task, operation, alwaysThrowErrorOnFailure, ExecuteSingleModifyTask(scanState, task, modLevel, alwaysThrowErrorOnFailure,
expectResults); expectResults);
} }

View File

@ -95,7 +95,7 @@ JobExecutorType(DistributedPlan *distributedPlan)
return MULTI_EXECUTOR_COORDINATOR_INSERT_SELECT; return MULTI_EXECUTOR_COORDINATOR_INSERT_SELECT;
} }
Assert(distributedPlan->operation == CMD_SELECT); Assert(distributedPlan->modLevel == ROW_MODIFY_READONLY);
workerNodeList = ActiveReadableNodeList(); workerNodeList = ActiveReadableNodeList();
workerNodeCount = list_length(workerNodeList); workerNodeCount = list_length(workerNodeList);

View File

@ -592,7 +592,7 @@ CreateShardsOnWorkersViaExecutor(Oid distributedRelationId, List *shardPlacement
poolSize = MaxAdaptiveExecutorPoolSize; poolSize = MaxAdaptiveExecutorPoolSize;
} }
ExecuteTaskList(CMD_UTILITY, taskList, poolSize); ExecuteTaskList(ROW_MODIFY_NONE, taskList, poolSize);
} }

View File

@ -69,7 +69,7 @@ PG_FUNCTION_INFO_V1(stop_metadata_sync_to_node);
* start_metadata_sync_to_node function creates the metadata in a worker for preparing the * start_metadata_sync_to_node function creates the metadata in a worker for preparing the
* worker for accepting queries. The function first sets the localGroupId of the worker * worker for accepting queries. The function first sets the localGroupId of the worker
* so that the worker knows which tuple in pg_dist_node table represents itself. After * so that the worker knows which tuple in pg_dist_node table represents itself. After
* that, SQL statetemens for re-creating metadata of MX-eligible distributed tables are * that, SQL statements for re-creating metadata of MX-eligible distributed tables are
* sent to the worker. Finally, the hasmetadata column of the target node in pg_dist_node * sent to the worker. Finally, the hasmetadata column of the target node in pg_dist_node
* is marked as true. * is marked as true.
*/ */
@ -1188,7 +1188,7 @@ DetachPartitionCommandList(void)
/* /*
* We probably do not need this but as an extra precaution, we are enabling * We probably do not need this but as an extra precaution, we are enabling
* DDL propagation to swtich back to original state. * DDL propagation to switch back to original state.
*/ */
detachPartitionCommandList = lappend(detachPartitionCommandList, detachPartitionCommandList = lappend(detachPartitionCommandList,
ENABLE_DDL_PROPAGATION); ENABLE_DDL_PROPAGATION);

View File

@ -70,7 +70,7 @@ RebuildQueryStrings(Query *originalQuery, List *taskList)
query = copyObject(originalQuery); query = copyObject(originalQuery);
copiedInsertRte = ExtractInsertRangeTableEntry(query); copiedInsertRte = ExtractResultRelationRTE(query);
copiedSubqueryRte = ExtractSelectRangeTableEntry(query); copiedSubqueryRte = ExtractSelectRangeTableEntry(query);
copiedSubquery = copiedSubqueryRte->subquery; copiedSubquery = copiedSubqueryRte->subquery;
@ -92,7 +92,8 @@ RebuildQueryStrings(Query *originalQuery, List *taskList)
UpdateRelationToShardNames((Node *) copiedSubquery, relationShardList); UpdateRelationToShardNames((Node *) copiedSubquery, relationShardList);
} }
else if (task->upsertQuery || valuesRTE != NULL) else if (query->commandType == CMD_INSERT && (query->onConflict != NULL ||
valuesRTE != NULL))
{ {
RangeTblEntry *rangeTableEntry = NULL; RangeTblEntry *rangeTableEntry = NULL;

View File

@ -50,8 +50,8 @@ int MultiTaskQueryLogLevel = MULTI_TASK_QUERY_INFO_OFF; /* multi-task query log
static uint64 NextPlanId = 1; static uint64 NextPlanId = 1;
/* local function forward declarations */
static bool ListContainsDistributedTableRTE(List *rangeTableList); static bool ListContainsDistributedTableRTE(List *rangeTableList);
static bool IsUpdateOrDelete(Query *query);
static PlannedStmt * CreateDistributedPlannedStmt(uint64 planId, PlannedStmt *localPlan, static PlannedStmt * CreateDistributedPlannedStmt(uint64 planId, PlannedStmt *localPlan,
Query *originalQuery, Query *query, Query *originalQuery, Query *query,
ParamListInfo boundParams, ParamListInfo boundParams,
@ -423,22 +423,6 @@ IsModifyCommand(Query *query)
} }
/*
* IsMultiShardModifyPlan returns true if the given plan was generated for
* multi shard update or delete query.
*/
bool
IsMultiShardModifyPlan(DistributedPlan *distributedPlan)
{
if (IsUpdateOrDelete(distributedPlan) && IsMultiTaskPlan(distributedPlan))
{
return true;
}
return false;
}
/* /*
* IsMultiTaskPlan returns true if job contains multiple tasks. * IsMultiTaskPlan returns true if job contains multiple tasks.
*/ */
@ -457,19 +441,13 @@ IsMultiTaskPlan(DistributedPlan *distributedPlan)
/* /*
* IsUpdateOrDelete returns true if the query performs update or delete. * IsUpdateOrDelete returns true if the query performs an update or delete.
*/ */
bool bool
IsUpdateOrDelete(DistributedPlan *distributedPlan) IsUpdateOrDelete(Query *query)
{ {
CmdType commandType = distributedPlan->operation; return query->commandType == CMD_UPDATE ||
query->commandType == CMD_DELETE;
if (commandType == CMD_UPDATE || commandType == CMD_DELETE)
{
return true;
}
return false;
} }
@ -480,15 +458,7 @@ IsUpdateOrDelete(DistributedPlan *distributedPlan)
bool bool
IsModifyDistributedPlan(DistributedPlan *distributedPlan) IsModifyDistributedPlan(DistributedPlan *distributedPlan)
{ {
bool isModifyDistributedPlan = false; return distributedPlan->modLevel > ROW_MODIFY_READONLY;
CmdType operation = distributedPlan->operation;
if (operation == CMD_INSERT || operation == CMD_UPDATE || operation == CMD_DELETE)
{
isModifyDistributedPlan = true;
}
return isModifyDistributedPlan;
} }
@ -569,11 +539,12 @@ CreateDistributedPlannedStmt(uint64 planId, PlannedStmt *localPlan, Query *origi
* query planning failed (possibly) due to prepared statement parameters or * query planning failed (possibly) due to prepared statement parameters or
* if it is planned as a multi shard modify query. * if it is planned as a multi shard modify query.
*/ */
if ((distributedPlan->planningError || IsMultiShardModifyPlan(distributedPlan)) && if ((distributedPlan->planningError ||
(IsUpdateOrDelete(originalQuery) && IsMultiTaskPlan(distributedPlan))) &&
hasUnresolvedParams) hasUnresolvedParams)
{ {
/* /*
* Arbitraryly high cost, but low enough that it can be added up * Arbitrarily high cost, but low enough that it can be added up
* without overflowing by choose_custom_plan(). * without overflowing by choose_custom_plan().
*/ */
resultPlan->planTree->total_cost = FLT_MAX / 100000000; resultPlan->planTree->total_cost = FLT_MAX / 100000000;

View File

@ -85,7 +85,7 @@ InsertSelectIntoDistributedTable(Query *query)
if (insertSelectQuery) if (insertSelectQuery)
{ {
RangeTblEntry *insertRte = ExtractInsertRangeTableEntry(query); RangeTblEntry *insertRte = ExtractResultRelationRTE(query);
if (IsDistributedTable(insertRte->relid)) if (IsDistributedTable(insertRte->relid))
{ {
return true; return true;
@ -108,7 +108,7 @@ InsertSelectIntoLocalTable(Query *query)
if (insertSelectQuery) if (insertSelectQuery)
{ {
RangeTblEntry *insertRte = ExtractInsertRangeTableEntry(query); RangeTblEntry *insertRte = ExtractResultRelationRTE(query);
if (!IsDistributedTable(insertRte->relid)) if (!IsDistributedTable(insertRte->relid))
{ {
return true; return true;
@ -220,7 +220,7 @@ CreateDistributedInsertSelectPlan(Query *originalQuery,
Job *workerJob = NULL; Job *workerJob = NULL;
uint64 jobId = INVALID_JOB_ID; uint64 jobId = INVALID_JOB_ID;
DistributedPlan *distributedPlan = CitusMakeNode(DistributedPlan); DistributedPlan *distributedPlan = CitusMakeNode(DistributedPlan);
RangeTblEntry *insertRte = ExtractInsertRangeTableEntry(originalQuery); RangeTblEntry *insertRte = ExtractResultRelationRTE(originalQuery);
RangeTblEntry *subqueryRte = ExtractSelectRangeTableEntry(originalQuery); RangeTblEntry *subqueryRte = ExtractSelectRangeTableEntry(originalQuery);
Oid targetRelationId = insertRte->relid; Oid targetRelationId = insertRte->relid;
DistTableCacheEntry *targetCacheEntry = DistributedTableCacheEntry(targetRelationId); DistTableCacheEntry *targetCacheEntry = DistributedTableCacheEntry(targetRelationId);
@ -230,7 +230,7 @@ CreateDistributedInsertSelectPlan(Query *originalQuery,
bool allReferenceTables = relationRestrictionContext->allReferenceTables; bool allReferenceTables = relationRestrictionContext->allReferenceTables;
bool allDistributionKeysInQueryAreEqual = false; bool allDistributionKeysInQueryAreEqual = false;
distributedPlan->operation = originalQuery->commandType; distributedPlan->modLevel = RowModifyLevelForQuery(originalQuery);
/* /*
* Error semantics for INSERT ... SELECT queries are different than regular * Error semantics for INSERT ... SELECT queries are different than regular
@ -423,7 +423,7 @@ RouterModifyTaskForShardInterval(Query *originalQuery, ShardInterval *shardInter
bool safeToPushdownSubquery) bool safeToPushdownSubquery)
{ {
Query *copiedQuery = copyObject(originalQuery); Query *copiedQuery = copyObject(originalQuery);
RangeTblEntry *copiedInsertRte = ExtractInsertRangeTableEntry(copiedQuery); RangeTblEntry *copiedInsertRte = ExtractResultRelationRTE(copiedQuery);
RangeTblEntry *copiedSubqueryRte = ExtractSelectRangeTableEntry(copiedQuery); RangeTblEntry *copiedSubqueryRte = ExtractSelectRangeTableEntry(copiedQuery);
Query *copiedSubquery = (Query *) copiedSubqueryRte->subquery; Query *copiedSubquery = (Query *) copiedSubqueryRte->subquery;
@ -443,7 +443,6 @@ RouterModifyTaskForShardInterval(Query *originalQuery, ShardInterval *shardInter
uint64 jobId = INVALID_JOB_ID; uint64 jobId = INVALID_JOB_ID;
List *insertShardPlacementList = NULL; List *insertShardPlacementList = NULL;
List *intersectedPlacementList = NULL; List *intersectedPlacementList = NULL;
bool upsertQuery = false;
bool replacePrunedQueryWithDummy = false; bool replacePrunedQueryWithDummy = false;
bool allReferenceTables = bool allReferenceTables =
plannerRestrictionContext->relationRestrictionContext->allReferenceTables; plannerRestrictionContext->relationRestrictionContext->allReferenceTables;
@ -566,12 +565,6 @@ RouterModifyTaskForShardInterval(Query *originalQuery, ShardInterval *shardInter
/* this is required for correct deparsing of the query */ /* this is required for correct deparsing of the query */
ReorderInsertSelectTargetLists(copiedQuery, copiedInsertRte, copiedSubqueryRte); ReorderInsertSelectTargetLists(copiedQuery, copiedInsertRte, copiedSubqueryRte);
/* set the upsert flag */
if (originalQuery->onConflict != NULL)
{
upsertQuery = true;
}
/* setting an alias simplifies deparsing of RETURNING */ /* setting an alias simplifies deparsing of RETURNING */
if (copiedInsertRte->alias == NULL) if (copiedInsertRte->alias == NULL)
{ {
@ -589,7 +582,6 @@ RouterModifyTaskForShardInterval(Query *originalQuery, ShardInterval *shardInter
modifyTask->dependedTaskList = NULL; modifyTask->dependedTaskList = NULL;
modifyTask->anchorShardId = shardId; modifyTask->anchorShardId = shardId;
modifyTask->taskPlacementList = insertShardPlacementList; modifyTask->taskPlacementList = insertShardPlacementList;
modifyTask->upsertQuery = upsertQuery;
modifyTask->relationShardList = relationShardList; modifyTask->relationShardList = relationShardList;
modifyTask->replicationModel = cacheEntry->replicationModel; modifyTask->replicationModel = cacheEntry->replicationModel;
@ -1138,11 +1130,11 @@ CreateCoordinatorInsertSelectPlan(uint64 planId, Query *parse)
Query *selectQuery = NULL; Query *selectQuery = NULL;
RangeTblEntry *selectRte = ExtractSelectRangeTableEntry(insertSelectQuery); RangeTblEntry *selectRte = ExtractSelectRangeTableEntry(insertSelectQuery);
RangeTblEntry *insertRte = ExtractInsertRangeTableEntry(insertSelectQuery); RangeTblEntry *insertRte = ExtractResultRelationRTE(insertSelectQuery);
Oid targetRelationId = insertRte->relid; Oid targetRelationId = insertRte->relid;
DistributedPlan *distributedPlan = CitusMakeNode(DistributedPlan); DistributedPlan *distributedPlan = CitusMakeNode(DistributedPlan);
distributedPlan->operation = CMD_INSERT; distributedPlan->modLevel = RowModifyLevelForQuery(insertSelectQuery);
distributedPlan->planningError = distributedPlan->planningError =
CoordinatorInsertSelectSupported(insertSelectQuery); CoordinatorInsertSelectSupported(insertSelectQuery);
@ -1243,7 +1235,7 @@ CoordinatorInsertSelectSupported(Query *insertSelectQuery)
return deferredError; return deferredError;
} }
insertRte = ExtractInsertRangeTableEntry(insertSelectQuery); insertRte = ExtractResultRelationRTE(insertSelectQuery);
if (PartitionMethod(insertRte->relid) == DISTRIBUTE_BY_APPEND) if (PartitionMethod(insertRte->relid) == DISTRIBUTE_BY_APPEND)
{ {
return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
@ -1347,7 +1339,7 @@ TwoPhaseInsertSelectTaskList(Oid targetRelationId, Query *insertSelectQuery,
* then deparse it. * then deparse it.
*/ */
Query *insertResultQuery = copyObject(insertSelectQuery); Query *insertResultQuery = copyObject(insertSelectQuery);
RangeTblEntry *insertRte = ExtractInsertRangeTableEntry(insertResultQuery); RangeTblEntry *insertRte = ExtractResultRelationRTE(insertResultQuery);
RangeTblEntry *selectRte = ExtractSelectRangeTableEntry(insertResultQuery); RangeTblEntry *selectRte = ExtractSelectRangeTableEntry(insertResultQuery);
DistTableCacheEntry *targetCacheEntry = DistributedTableCacheEntry(targetRelationId); DistTableCacheEntry *targetCacheEntry = DistributedTableCacheEntry(targetRelationId);
@ -1441,7 +1433,6 @@ TwoPhaseInsertSelectTaskList(Oid targetRelationId, Query *insertSelectQuery,
modifyTask->dependedTaskList = NULL; modifyTask->dependedTaskList = NULL;
modifyTask->anchorShardId = shardId; modifyTask->anchorShardId = shardId;
modifyTask->taskPlacementList = insertShardPlacementList; modifyTask->taskPlacementList = insertShardPlacementList;
modifyTask->upsertQuery = insertResultQuery->onConflict != NULL;
modifyTask->relationShardList = list_make1(relationShard); modifyTask->relationShardList = list_make1(relationShard);
modifyTask->replicationModel = targetCacheEntry->replicationModel; modifyTask->replicationModel = targetCacheEntry->replicationModel;

View File

@ -228,7 +228,7 @@ CreatePhysicalDistributedPlan(MultiTreeRoot *multiTree,
distributedPlan->workerJob = workerJob; distributedPlan->workerJob = workerJob;
distributedPlan->masterQuery = masterQuery; distributedPlan->masterQuery = masterQuery;
distributedPlan->routerExecutable = DistributedPlanRouterExecutable(distributedPlan); distributedPlan->routerExecutable = DistributedPlanRouterExecutable(distributedPlan);
distributedPlan->operation = CMD_SELECT; distributedPlan->modLevel = ROW_MODIFY_READONLY;
return distributedPlan; return distributedPlan;
} }
@ -2430,7 +2430,6 @@ QueryPushdownTaskCreate(Query *originalQuery, int shardIndex,
subqueryTask->dependedTaskList = NULL; subqueryTask->dependedTaskList = NULL;
subqueryTask->anchorShardId = anchorShardId; subqueryTask->anchorShardId = anchorShardId;
subqueryTask->taskPlacementList = selectPlacementList; subqueryTask->taskPlacementList = selectPlacementList;
subqueryTask->upsertQuery = false;
subqueryTask->relationShardList = relationShardList; subqueryTask->relationShardList = relationShardList;
return subqueryTask; return subqueryTask;
@ -4310,6 +4309,41 @@ GenerateSyntheticShardIntervalArray(int partitionCount)
} }
/*
 * RowModifyLevelForQuery maps the command type of the given query to the
 * RowModifyLevel it requires:
 *
 *   SELECT                          -> ROW_MODIFY_READONLY
 *   INSERT without ON CONFLICT      -> ROW_MODIFY_COMMUTATIVE
 *   INSERT with ON CONFLICT         -> ROW_MODIFY_NONCOMMUTATIVE
 *   UPDATE, DELETE                  -> ROW_MODIFY_NONCOMMUTATIVE
 *   any other command type          -> ROW_MODIFY_NONE
 */
RowModifyLevel
RowModifyLevelForQuery(Query *query)
{
	switch (query->commandType)
	{
		case CMD_SELECT:
		{
			return ROW_MODIFY_READONLY;
		}

		case CMD_INSERT:
		{
			/* the presence of an ON CONFLICT clause decides the level */
			bool hasOnConflict = (query->onConflict != NULL);

			return hasOnConflict ? ROW_MODIFY_NONCOMMUTATIVE : ROW_MODIFY_COMMUTATIVE;
		}

		case CMD_UPDATE:
		case CMD_DELETE:
		{
			return ROW_MODIFY_NONCOMMUTATIVE;
		}

		default:
		{
			return ROW_MODIFY_NONE;
		}
	}
}
/* /*
* ColumnName resolves the given column's name. The given column could belong to * ColumnName resolves the given column's name. The given column could belong to
* a regular table or to an intermediate table formed to execute a distributed * a regular table or to an intermediate table formed to execute a distributed

View File

@ -165,7 +165,7 @@ static void ReorderTaskPlacementsByTaskAssignmentPolicy(Job *job,
/* /*
* CreateRouterPlan attempts to create a router executor plan for the given * CreateRouterPlan attempts to create a router executor plan for the given
* SELECT statement. ->planningError is set if planning fails. * SELECT statement. ->planningError is set if planning fails.
*/ */
DistributedPlan * DistributedPlan *
CreateRouterPlan(Query *originalQuery, Query *query, CreateRouterPlan(Query *originalQuery, Query *query,
@ -186,8 +186,8 @@ CreateRouterPlan(Query *originalQuery, Query *query,
/* /*
* CreateModifyPlan attempts to create a plan the given modification * CreateModifyPlan attempts to create a plan for the given modification
* statement. If planning fails ->planningError is set to a description of * statement. If planning fails ->planningError is set to a description of
* the failure. * the failure.
*/ */
DistributedPlan * DistributedPlan *
@ -198,7 +198,7 @@ CreateModifyPlan(Query *originalQuery, Query *query,
DistributedPlan *distributedPlan = CitusMakeNode(DistributedPlan); DistributedPlan *distributedPlan = CitusMakeNode(DistributedPlan);
bool multiShardQuery = false; bool multiShardQuery = false;
distributedPlan->operation = query->commandType; distributedPlan->modLevel = RowModifyLevelForQuery(query);
distributedPlan->planningError = ModifyQuerySupported(query, originalQuery, distributedPlan->planningError = ModifyQuerySupported(query, originalQuery,
multiShardQuery, multiShardQuery,
@ -244,8 +244,8 @@ CreateModifyPlan(Query *originalQuery, Query *query,
* CreateSingleTaskRouterPlan creates a physical plan for given query. The created plan is * CreateSingleTaskRouterPlan creates a physical plan for given query. The created plan is
* either a modify task that changes a single shard, or a router task that returns * either a modify task that changes a single shard, or a router task that returns
* query results from a single worker. Supported modify queries (insert/update/delete) * query results from a single worker. Supported modify queries (insert/update/delete)
* are router plannable by default. If query is not router plannable then either NULL is * are router plannable by default. If query is not router plannable the returned plan
* returned, or the returned plan has planningError set to a description of the problem. * has planningError set to a description of the problem.
*/ */
static void static void
CreateSingleTaskRouterPlan(DistributedPlan *distributedPlan, Query *originalQuery, CreateSingleTaskRouterPlan(DistributedPlan *distributedPlan, Query *originalQuery,
@ -254,7 +254,7 @@ CreateSingleTaskRouterPlan(DistributedPlan *distributedPlan, Query *originalQuer
{ {
Job *job = NULL; Job *job = NULL;
distributedPlan->operation = query->commandType; distributedPlan->modLevel = RowModifyLevelForQuery(query);
/* we cannot have multi shard update/delete query via this code path */ /* we cannot have multi shard update/delete query via this code path */
job = RouterJob(originalQuery, plannerRestrictionContext, job = RouterJob(originalQuery, plannerRestrictionContext,
@ -491,7 +491,7 @@ ModifyQueryResultRelationId(Query *query)
errmsg("input query is not a modification query"))); errmsg("input query is not a modification query")));
} }
resultRte = ExtractInsertRangeTableEntry(query); resultRte = ExtractResultRelationRTE(query);
Assert(OidIsValid(resultRte->relid)); Assert(OidIsValid(resultRte->relid));
return resultRte->relid; return resultRte->relid;
@ -512,20 +512,12 @@ ResultRelationOidForQuery(Query *query)
/* /*
* ExtractInsertRangeTableEntry returns the INSERT'ed table's range table entry. * ExtractResultRelationRTE returns the table's resultRelation range table entry.
* Note that the function expects and asserts that the input query be
* an INSERT...SELECT query.
*/ */
RangeTblEntry * RangeTblEntry *
ExtractInsertRangeTableEntry(Query *query) ExtractResultRelationRTE(Query *query)
{ {
int resultRelation = query->resultRelation; return rt_fetch(query->resultRelation, query->rtable);
List *rangeTableList = query->rtable;
RangeTblEntry *insertRTE = NULL;
insertRTE = rt_fetch(resultRelation, rangeTableList);
return insertRTE;
} }
@ -1086,7 +1078,7 @@ HasDangerousJoinUsing(List *rtableList, Node *joinTreeNode)
{ {
/* /*
* Yes, so check each join alias var to see if any of them are not * Yes, so check each join alias var to see if any of them are not
* simple references to underlying columns. If so, we have a * simple references to underlying columns. If so, we have a
* dangerous situation and must pick unique aliases. * dangerous situation and must pick unique aliases.
*/ */
RangeTblEntry *joinRTE = rt_fetch(joinExpr->rtindex, rtableList); RangeTblEntry *joinRTE = rt_fetch(joinExpr->rtindex, rtableList);
@ -1129,14 +1121,8 @@ HasDangerousJoinUsing(List *rtableList, Node *joinTreeNode)
bool bool
UpdateOrDeleteQuery(Query *query) UpdateOrDeleteQuery(Query *query)
{ {
CmdType commandType = query->commandType; return query->commandType == CMD_UPDATE ||
query->commandType == CMD_DELETE;
if (commandType == CMD_UPDATE || commandType == CMD_DELETE)
{
return true;
}
return false;
} }
@ -1538,11 +1524,6 @@ RouterInsertTaskList(Query *query, DeferredErrorMessage **planningError)
modifyTask->replicationModel = cacheEntry->replicationModel; modifyTask->replicationModel = cacheEntry->replicationModel;
modifyTask->rowValuesLists = modifyRoute->rowValuesLists; modifyTask->rowValuesLists = modifyRoute->rowValuesLists;
if (query->onConflict != NULL)
{
modifyTask->upsertQuery = true;
}
insertTaskList = lappend(insertTaskList, modifyTask); insertTaskList = lappend(insertTaskList, modifyTask);
} }
@ -1572,7 +1553,6 @@ CreateTask(TaskType taskType)
task->shardInterval = NULL; task->shardInterval = NULL;
task->assignmentConstrained = false; task->assignmentConstrained = false;
task->taskExecution = NULL; task->taskExecution = NULL;
task->upsertQuery = false;
task->replicationModel = REPLICATION_MODEL_INVALID; task->replicationModel = REPLICATION_MODEL_INVALID;
task->relationRowLockList = NIL; task->relationRowLockList = NIL;
@ -1858,8 +1838,8 @@ SingleShardModifyTaskList(Query *query, uint64 jobId, List *relationShardList,
/* /*
* GetUpdateOrDeleteRTE checks query if it has an UPDATE or DELETE RTE. If it finds * GetUpdateOrDeleteRTE checks query if it has an UPDATE or DELETE RTE.
* it returns it. * Returns that RTE if found.
*/ */
static RangeTblEntry * static RangeTblEntry *
GetUpdateOrDeleteRTE(Query *query) GetUpdateOrDeleteRTE(Query *query)

View File

@ -103,12 +103,12 @@ CopyNodeDistributedPlan(COPYFUNC_ARGS)
DECLARE_FROM_AND_NEW_NODE(DistributedPlan); DECLARE_FROM_AND_NEW_NODE(DistributedPlan);
COPY_SCALAR_FIELD(planId); COPY_SCALAR_FIELD(planId);
COPY_SCALAR_FIELD(operation); COPY_SCALAR_FIELD(modLevel);
COPY_SCALAR_FIELD(hasReturning); COPY_SCALAR_FIELD(hasReturning);
COPY_SCALAR_FIELD(routerExecutable);
COPY_NODE_FIELD(workerJob); COPY_NODE_FIELD(workerJob);
COPY_NODE_FIELD(masterQuery); COPY_NODE_FIELD(masterQuery);
COPY_SCALAR_FIELD(routerExecutable);
COPY_SCALAR_FIELD(queryId); COPY_SCALAR_FIELD(queryId);
COPY_NODE_FIELD(relationIdList); COPY_NODE_FIELD(relationIdList);
@ -257,7 +257,6 @@ CopyNodeTask(COPYFUNC_ARGS)
COPY_NODE_FIELD(shardInterval); COPY_NODE_FIELD(shardInterval);
COPY_SCALAR_FIELD(assignmentConstrained); COPY_SCALAR_FIELD(assignmentConstrained);
COPY_NODE_FIELD(taskExecution); COPY_NODE_FIELD(taskExecution);
COPY_SCALAR_FIELD(upsertQuery);
COPY_SCALAR_FIELD(replicationModel); COPY_SCALAR_FIELD(replicationModel);
COPY_SCALAR_FIELD(modifyWithSubquery); COPY_SCALAR_FIELD(modifyWithSubquery);
COPY_NODE_FIELD(relationShardList); COPY_NODE_FIELD(relationShardList);

View File

@ -176,12 +176,12 @@ OutDistributedPlan(OUTFUNC_ARGS)
WRITE_NODE_TYPE("DISTRIBUTEDPLAN"); WRITE_NODE_TYPE("DISTRIBUTEDPLAN");
WRITE_UINT64_FIELD(planId); WRITE_UINT64_FIELD(planId);
WRITE_INT_FIELD(operation); WRITE_ENUM_FIELD(modLevel, RowModifyLevel);
WRITE_BOOL_FIELD(hasReturning); WRITE_BOOL_FIELD(hasReturning);
WRITE_BOOL_FIELD(routerExecutable);
WRITE_NODE_FIELD(workerJob); WRITE_NODE_FIELD(workerJob);
WRITE_NODE_FIELD(masterQuery); WRITE_NODE_FIELD(masterQuery);
WRITE_BOOL_FIELD(routerExecutable);
WRITE_UINT64_FIELD(queryId); WRITE_UINT64_FIELD(queryId);
WRITE_NODE_FIELD(relationIdList); WRITE_NODE_FIELD(relationIdList);
@ -467,7 +467,6 @@ OutTask(OUTFUNC_ARGS)
WRITE_NODE_FIELD(shardInterval); WRITE_NODE_FIELD(shardInterval);
WRITE_BOOL_FIELD(assignmentConstrained); WRITE_BOOL_FIELD(assignmentConstrained);
WRITE_NODE_FIELD(taskExecution); WRITE_NODE_FIELD(taskExecution);
WRITE_BOOL_FIELD(upsertQuery);
WRITE_CHAR_FIELD(replicationModel); WRITE_CHAR_FIELD(replicationModel);
WRITE_BOOL_FIELD(modifyWithSubquery); WRITE_BOOL_FIELD(modifyWithSubquery);
WRITE_NODE_FIELD(relationShardList); WRITE_NODE_FIELD(relationShardList);

View File

@ -202,12 +202,12 @@ ReadDistributedPlan(READFUNC_ARGS)
READ_LOCALS(DistributedPlan); READ_LOCALS(DistributedPlan);
READ_UINT64_FIELD(planId); READ_UINT64_FIELD(planId);
READ_INT_FIELD(operation); READ_ENUM_FIELD(modLevel, RowModifyLevel);
READ_BOOL_FIELD(hasReturning); READ_BOOL_FIELD(hasReturning);
READ_BOOL_FIELD(routerExecutable);
READ_NODE_FIELD(workerJob); READ_NODE_FIELD(workerJob);
READ_NODE_FIELD(masterQuery); READ_NODE_FIELD(masterQuery);
READ_BOOL_FIELD(routerExecutable);
READ_UINT64_FIELD(queryId); READ_UINT64_FIELD(queryId);
READ_NODE_FIELD(relationIdList); READ_NODE_FIELD(relationIdList);
@ -381,7 +381,6 @@ ReadTask(READFUNC_ARGS)
READ_NODE_FIELD(shardInterval); READ_NODE_FIELD(shardInterval);
READ_BOOL_FIELD(assignmentConstrained); READ_BOOL_FIELD(assignmentConstrained);
READ_NODE_FIELD(taskExecution); READ_NODE_FIELD(taskExecution);
READ_BOOL_FIELD(upsertQuery);
READ_CHAR_FIELD(replicationModel); READ_CHAR_FIELD(replicationModel);
READ_BOOL_FIELD(modifyWithSubquery); READ_BOOL_FIELD(modifyWithSubquery);
READ_NODE_FIELD(relationShardList); READ_NODE_FIELD(relationShardList);

View File

@ -108,12 +108,10 @@ extern void multi_join_restriction_hook(PlannerInfo *root,
JoinType jointype, JoinType jointype,
JoinPathExtraData *extra); JoinPathExtraData *extra);
extern bool IsModifyCommand(Query *query); extern bool IsModifyCommand(Query *query);
extern bool IsUpdateOrDelete(struct DistributedPlan *distributedPlan);
extern bool IsModifyDistributedPlan(struct DistributedPlan *distributedPlan); extern bool IsModifyDistributedPlan(struct DistributedPlan *distributedPlan);
extern void EnsurePartitionTableNotReplicated(Oid relationId); extern void EnsurePartitionTableNotReplicated(Oid relationId);
extern Node * ResolveExternalParams(Node *inputNode, ParamListInfo boundParams); extern Node * ResolveExternalParams(Node *inputNode, ParamListInfo boundParams);
extern bool IsMultiTaskPlan(struct DistributedPlan *distributedPlan); extern bool IsMultiTaskPlan(struct DistributedPlan *distributedPlan);
extern bool IsMultiShardModifyPlan(struct DistributedPlan *distributedPlan);
extern RangeTblEntry * RemoteScanRangeTableEntry(List *columnNameList); extern RangeTblEntry * RemoteScanRangeTableEntry(List *columnNameList);
extern int GetRTEIdentity(RangeTblEntry *rte); extern int GetRTEIdentity(RangeTblEntry *rte);

View File

@ -38,13 +38,14 @@ extern void CitusExecutorStart(QueryDesc *queryDesc, int eflags);
extern void CitusExecutorRun(QueryDesc *queryDesc, ScanDirection direction, uint64 count, extern void CitusExecutorRun(QueryDesc *queryDesc, ScanDirection direction, uint64 count,
bool execute_once); bool execute_once);
extern TupleTableSlot * AdaptiveExecutor(CustomScanState *node); extern TupleTableSlot * AdaptiveExecutor(CustomScanState *node);
extern uint64 ExecuteTaskListExtended(CmdType operation, List *taskList, extern uint64 ExecuteTaskListExtended(RowModifyLevel modLevel, List *taskList,
TupleDesc tupleDescriptor, TupleDesc tupleDescriptor,
Tuplestorestate *tupleStore, Tuplestorestate *tupleStore,
bool hasReturning, int targetPoolSize); bool hasReturning, int targetPoolSize);
extern void ExecuteUtilityTaskListWithoutResults(List *taskList, int targetPoolSize, extern void ExecuteUtilityTaskListWithoutResults(List *taskList, int targetPoolSize,
bool forceSequentialExecution); bool forceSequentialExecution);
extern uint64 ExecuteTaskList(CmdType operation, List *taskList, int targetPoolSize); extern uint64 ExecuteTaskList(RowModifyLevel modLevel, List *taskList, int
targetPoolSize);
extern TupleTableSlot * CitusExecScan(CustomScanState *node); extern TupleTableSlot * CitusExecScan(CustomScanState *node);
extern TupleTableSlot * ReturnTupleFromTuplestore(CitusScanState *scanState); extern TupleTableSlot * ReturnTupleFromTuplestore(CitusScanState *scanState);
extern void LoadTuplesIntoTupleStore(CitusScanState *citusScanState, Job *workerJob); extern void LoadTuplesIntoTupleStore(CitusScanState *citusScanState, Job *workerJob);

View File

@ -110,6 +110,15 @@ typedef enum
} BoundaryNodeJobType; } BoundaryNodeJobType;
/*
 * Enumeration that specifies extent of DML modifications.
 *
 * The numeric order is meaningful: IsModifyDistributedPlan treats every level
 * above ROW_MODIFY_READONLY as a modification. RowModifyLevelForQuery derives
 * the level from a query's command type as noted on each member below.
 */
typedef enum RowModifyLevel
{
/* command types other than SELECT, INSERT, UPDATE and DELETE */
ROW_MODIFY_NONE = 0,
/* SELECT */
ROW_MODIFY_READONLY = 1,
/* INSERT without an ON CONFLICT clause */
ROW_MODIFY_COMMUTATIVE = 2,
/* INSERT with an ON CONFLICT clause, UPDATE and DELETE */
ROW_MODIFY_NONCOMMUTATIVE = 3
} RowModifyLevel;
/* /*
* Job represents a logical unit of work that contains one set of data transfers * Job represents a logical unit of work that contains one set of data transfers
* in our physical plan. The physical planner maps each SQL query into one or * in our physical plan. The physical planner maps each SQL query into one or
@ -183,7 +192,6 @@ typedef struct Task
ShardInterval *shardInterval; /* only applies to merge tasks */ ShardInterval *shardInterval; /* only applies to merge tasks */
bool assignmentConstrained; /* only applies to merge tasks */ bool assignmentConstrained; /* only applies to merge tasks */
TaskExecution *taskExecution; /* used by task tracker executor */ TaskExecution *taskExecution; /* used by task tracker executor */
bool upsertQuery; /* only applies to modify tasks */
char replicationModel; /* only applies to modify tasks */ char replicationModel; /* only applies to modify tasks */
List *relationRowLockList; List *relationRowLockList;
@ -229,21 +237,21 @@ typedef struct DistributedPlan
/* unique identifier of the plan within the session */ /* unique identifier of the plan within the session */
uint64 planId; uint64 planId;
/* type of command to execute (SELECT/INSERT/...) */ /* specifies nature of modifications in query */
CmdType operation; RowModifyLevel modLevel;
/* specifies whether a DML command has a RETURNING */ /* specifies whether a DML command has a RETURNING */
bool hasReturning; bool hasReturning;
/* a router executable query is executed entirely on a worker */
bool routerExecutable;
/* job tree containing the tasks to be executed on workers */ /* job tree containing the tasks to be executed on workers */
Job *workerJob; Job *workerJob;
/* local query that merges results from the workers */ /* local query that merges results from the workers */
Query *masterQuery; Query *masterQuery;
/* a router executable query is executed entirely on a worker */
bool routerExecutable;
/* query identifier (copied from the top-level PlannedStmt) */ /* query identifier (copied from the top-level PlannedStmt) */
uint64 queryId; uint64 queryId;
@ -340,6 +348,7 @@ extern bool ShardIntervalsOverlap(ShardInterval *firstInterval,
ShardInterval *secondInterval); ShardInterval *secondInterval);
extern bool CoPartitionedTables(Oid firstRelationId, Oid secondRelationId); extern bool CoPartitionedTables(Oid firstRelationId, Oid secondRelationId);
extern ShardInterval ** GenerateSyntheticShardIntervalArray(int partitionCount); extern ShardInterval ** GenerateSyntheticShardIntervalArray(int partitionCount);
extern RowModifyLevel RowModifyLevelForQuery(Query *query);
/* function declarations for Task and Task list operations */ /* function declarations for Task and Task list operations */

View File

@ -46,15 +46,15 @@ extern void ExecuteMultipleTasks(CitusScanState *scanState, List *taskList,
bool isModificationQuery, bool expectResults); bool isModificationQuery, bool expectResults);
int64 ExecuteModifyTasksSequentially(CitusScanState *scanState, List *taskList, int64 ExecuteModifyTasksSequentially(CitusScanState *scanState, List *taskList,
CmdType operation, bool hasReturning); RowModifyLevel modLevel, bool hasReturning);
extern int64 ExecuteModifyTasksWithoutResults(List *taskList); extern int64 ExecuteModifyTasksWithoutResults(List *taskList);
extern int64 ExecuteModifyTasksSequentiallyWithoutResults(List *taskList, extern int64 ExecuteModifyTasksSequentiallyWithoutResults(List *taskList,
CmdType operation); RowModifyLevel modLevel);
extern ShardPlacementAccess * CreatePlacementAccess(ShardPlacement *placement, extern ShardPlacementAccess * CreatePlacementAccess(ShardPlacement *placement,
ShardPlacementAccessType accessType); ShardPlacementAccessType accessType);
/* helper functions */ /* helper functions */
extern void AcquireExecutorShardLock(Task *task, CmdType commandType); extern void AcquireExecutorShardLocks(Task *task, RowModifyLevel modLevel);
extern void AcquireExecutorMultiShardLocks(List *taskList); extern void AcquireExecutorMultiShardLocks(List *taskList);
extern ShardPlacementAccess * CreatePlacementAccess(ShardPlacement *placement, extern ShardPlacementAccess * CreatePlacementAccess(ShardPlacement *placement,
ShardPlacementAccessType accessType); ShardPlacementAccessType accessType);

View File

@ -61,7 +61,7 @@ extern RelationRestrictionContext * CopyRelationRestrictionContext(
extern Oid ExtractFirstDistributedTableId(Query *query); extern Oid ExtractFirstDistributedTableId(Query *query);
extern RangeTblEntry * ExtractSelectRangeTableEntry(Query *query); extern RangeTblEntry * ExtractSelectRangeTableEntry(Query *query);
extern Oid ModifyQueryResultRelationId(Query *query); extern Oid ModifyQueryResultRelationId(Query *query);
extern RangeTblEntry * ExtractInsertRangeTableEntry(Query *query); extern RangeTblEntry * ExtractResultRelationRTE(Query *query);
extern RangeTblEntry * ExtractDistributedInsertValuesRTE(Query *query); extern RangeTblEntry * ExtractDistributedInsertValuesRTE(Query *query);
extern bool IsMultiRowInsert(Query *query); extern bool IsMultiRowInsert(Query *query);
extern void AddShardIntervalRestrictionToSelect(Query *subqery, extern void AddShardIntervalRestrictionToSelect(Query *subqery,

View File

@ -17,6 +17,8 @@ multi_subtransactions
multi_modifying_xacts multi_modifying_xacts
multi_insert_select multi_insert_select
sql_procedure sql_procedure
multi_reference_table
multi_create_table_constraints
# the following tests' output are # the following tests' output are
# normalized for EXPLAIN outputs # normalized for EXPLAIN outputs