diff --git a/src/backend/distributed/executor/adaptive_executor.c b/src/backend/distributed/executor/adaptive_executor.c index 8c0567561..72b7e3ae4 100644 --- a/src/backend/distributed/executor/adaptive_executor.c +++ b/src/backend/distributed/executor/adaptive_executor.c @@ -158,8 +158,8 @@ */ typedef struct DistributedExecution { - /* the corresponding distributed plan's operation */ - CmdType operation; + /* the corresponding distributed plan's modLevel */ + RowModifyLevel modLevel; List *tasksToExecute; @@ -504,7 +504,7 @@ int ExecutorSlowStartInterval = 10; /* local functions */ -static DistributedExecution * CreateDistributedExecution(CmdType operation, +static DistributedExecution * CreateDistributedExecution(RowModifyLevel modLevel, List *taskList, bool hasReturning, ParamListInfo paramListInfo, @@ -520,16 +520,16 @@ static void FinishDistributedExecution(DistributedExecution *execution); static void CleanUpSessions(DistributedExecution *execution); static void LockPartitionsForDistributedPlan(DistributedPlan *distributedPlan); -static void AcquireExecutorShardLocks(DistributedExecution *execution); +static void AcquireExecutorShardLocksForExecution(DistributedExecution *execution); static bool DistributedExecutionModifiesDatabase(DistributedExecution *execution); static bool DistributedPlanModifiesDatabase(DistributedPlan *plan); -static bool TaskListModifiesDatabase(CmdType operation, List *taskList); +static bool TaskListModifiesDatabase(RowModifyLevel modLevel, List *taskList); static bool DistributedExecutionRequiresRollback(DistributedExecution *execution); -static bool SelectForUpdateOnReferenceTable(CmdType operation, List *taskList); +static bool SelectForUpdateOnReferenceTable(RowModifyLevel modLevel, List *taskList); static void AssignTasksToConnections(DistributedExecution *execution); static void UnclaimAllSessionConnections(List *sessionList); static bool UseConnectionPerPlacement(void); -static PlacementExecutionOrder 
ExecutionOrderForTask(CmdType operation, Task *task); +static PlacementExecutionOrder ExecutionOrderForTask(RowModifyLevel modLevel, Task *task); static WorkerPool * FindOrCreateWorkerPool(DistributedExecution *execution, WorkerNode *workerNode); static WorkerSession * FindOrCreateWorkerSession(WorkerPool *workerPool, @@ -610,7 +610,7 @@ AdaptiveExecutor(CustomScanState *node) targetPoolSize = 1; } - execution = CreateDistributedExecution(distributedPlan->operation, taskList, + execution = CreateDistributedExecution(distributedPlan->modLevel, taskList, distributedPlan->hasReturning, paramListInfo, tupleDescriptor, tupleStore, targetPoolSize); @@ -626,7 +626,7 @@ AdaptiveExecutor(CustomScanState *node) RunDistributedExecution(execution); } - if (distributedPlan->operation != CMD_SELECT) + if (distributedPlan->modLevel != ROW_MODIFY_READONLY) { executorState->es_processed = execution->rowsProcessed; } @@ -654,14 +654,14 @@ ExecuteUtilityTaskListWithoutResults(List *taskList, int targetPoolSize, { if (TaskExecutorType == MULTI_EXECUTOR_ADAPTIVE) { - ExecuteTaskList(CMD_UTILITY, taskList, targetPoolSize); + ExecuteTaskList(ROW_MODIFY_NONE, taskList, targetPoolSize); } else { if (MultiShardConnectionType == SEQUENTIAL_CONNECTION || forceSequentialExecution) { - ExecuteModifyTasksSequentiallyWithoutResults(taskList, CMD_UTILITY); + ExecuteModifyTasksSequentiallyWithoutResults(taskList, ROW_MODIFY_NONE); } else { @@ -676,13 +676,13 @@ ExecuteUtilityTaskListWithoutResults(List *taskList, int targetPoolSize, * for some of the arguments. 
*/ uint64 -ExecuteTaskList(CmdType operation, List *taskList, int targetPoolSize) +ExecuteTaskList(RowModifyLevel modLevel, List *taskList, int targetPoolSize) { TupleDesc tupleDescriptor = NULL; Tuplestorestate *tupleStore = NULL; bool hasReturning = false; - return ExecuteTaskListExtended(operation, taskList, tupleDescriptor, + return ExecuteTaskListExtended(modLevel, taskList, tupleDescriptor, tupleStore, hasReturning, targetPoolSize); } @@ -692,7 +692,7 @@ ExecuteTaskList(CmdType operation, List *taskList, int targetPoolSize) * runs it. */ uint64 -ExecuteTaskListExtended(CmdType operation, List *taskList, +ExecuteTaskListExtended(RowModifyLevel modLevel, List *taskList, TupleDesc tupleDescriptor, Tuplestorestate *tupleStore, bool hasReturning, int targetPoolSize) { @@ -705,7 +705,7 @@ ExecuteTaskListExtended(CmdType operation, List *taskList, } execution = - CreateDistributedExecution(operation, taskList, hasReturning, paramListInfo, + CreateDistributedExecution(modLevel, taskList, hasReturning, paramListInfo, tupleDescriptor, tupleStore, targetPoolSize); StartDistributedExecution(execution); @@ -721,14 +721,14 @@ ExecuteTaskListExtended(CmdType operation, List *taskList, * a distributed plan. 
*/ DistributedExecution * -CreateDistributedExecution(CmdType operation, List *taskList, bool hasReturning, +CreateDistributedExecution(RowModifyLevel modLevel, List *taskList, bool hasReturning, ParamListInfo paramListInfo, TupleDesc tupleDescriptor, Tuplestorestate *tupleStore, int targetPoolSize) { DistributedExecution *execution = (DistributedExecution *) palloc0(sizeof(DistributedExecution)); - execution->operation = operation; + execution->modLevel = modLevel; execution->tasksToExecute = taskList; execution->hasReturning = hasReturning; @@ -823,7 +823,7 @@ StartDistributedExecution(DistributedExecution *execution) } /* prevent unsafe concurrent modifications */ - AcquireExecutorShardLocks(execution); + AcquireExecutorShardLocksForExecution(execution); } @@ -834,7 +834,7 @@ StartDistributedExecution(DistributedExecution *execution) static bool DistributedExecutionModifiesDatabase(DistributedExecution *execution) { - return TaskListModifiesDatabase(execution->operation, execution->tasksToExecute); + return TaskListModifiesDatabase(execution->modLevel, execution->tasksToExecute); } @@ -845,7 +845,7 @@ DistributedExecutionModifiesDatabase(DistributedExecution *execution) static bool DistributedPlanModifiesDatabase(DistributedPlan *plan) { - return TaskListModifiesDatabase(plan->operation, plan->workerJob->taskList); + return TaskListModifiesDatabase(plan->modLevel, plan->workerJob->taskList); } @@ -854,22 +854,22 @@ DistributedPlanModifiesDatabase(DistributedPlan *plan) * DistributedPlanModifiesDatabase. */ static bool -TaskListModifiesDatabase(CmdType operation, List *taskList) +TaskListModifiesDatabase(RowModifyLevel modLevel, List *taskList) { Task *firstTask = NULL; - if (operation == CMD_INSERT || operation == CMD_UPDATE || operation == CMD_DELETE) + if (modLevel > ROW_MODIFY_READONLY) { return true; } /* - * If we cannot decide by only checking the operation, we should look closer - * to the tasks. 
+ * If we cannot decide by only checking the row modify level, + * we should look closer to the tasks. */ if (list_length(taskList) < 1) { - /* does this ever possible? */ + /* is this ever possible? */ return false; } @@ -968,12 +968,12 @@ DistributedExecutionRequiresRollback(DistributedExecution *execution) * that contains FOR UPDATE clause that locks any reference tables. */ static bool -SelectForUpdateOnReferenceTable(CmdType operation, List *taskList) +SelectForUpdateOnReferenceTable(RowModifyLevel modLevel, List *taskList) { Task *task = NULL; ListCell *rtiLockCell = NULL; - if (operation != CMD_SELECT) + if (modLevel != ROW_MODIFY_READONLY) { return false; } @@ -1025,8 +1025,8 @@ LockPartitionsForDistributedPlan(DistributedPlan *distributedPlan) /* - * AcquireExecutorShardLocks acquires advisory lock on shard IDs to prevent - * unsafe concurrent modifications of shards. + * AcquireExecutorShardLocksForExecution acquires advisory lock on shard IDs + * to prevent unsafe concurrent modifications of shards. * * We prevent concurrent modifications of shards in two cases: * 1. Any non-commutative writes to a replicated table @@ -1042,13 +1042,13 @@ LockPartitionsForDistributedPlan(DistributedPlan *distributedPlan) * TRUNCATE because the table locks already prevent concurrent access. 
*/ static void -AcquireExecutorShardLocks(DistributedExecution *execution) +AcquireExecutorShardLocksForExecution(DistributedExecution *execution) { - CmdType operation = execution->operation; + RowModifyLevel modLevel = execution->modLevel; List *taskList = execution->tasksToExecute; - if (!(operation == CMD_INSERT || operation == CMD_UPDATE || operation == CMD_DELETE || - SelectForUpdateOnReferenceTable(operation, taskList))) + if (modLevel <= ROW_MODIFY_READONLY && + !SelectForUpdateOnReferenceTable(modLevel, taskList)) { /* * Executor locks only apply to DML commands and SELECT FOR UPDATE queries @@ -1069,7 +1069,7 @@ AcquireExecutorShardLocks(DistributedExecution *execution) { Task *task = (Task *) lfirst(taskCell); - AcquireExecutorShardLock(task, operation); + AcquireExecutorShardLocks(task, modLevel); } } else if (list_length(taskList) > 1) @@ -1211,7 +1211,7 @@ UnclaimAllSessionConnections(List *sessionList) static void AssignTasksToConnections(DistributedExecution *execution) { - CmdType operation = execution->operation; + RowModifyLevel modLevel = execution->modLevel; List *taskList = execution->tasksToExecute; bool hasReturning = execution->hasReturning; @@ -1233,13 +1233,15 @@ AssignTasksToConnections(DistributedExecution *execution) shardCommandExecution = (ShardCommandExecution *) palloc0(sizeof(ShardCommandExecution)); shardCommandExecution->task = task; - shardCommandExecution->executionOrder = ExecutionOrderForTask(operation, task); + shardCommandExecution->executionOrder = ExecutionOrderForTask(modLevel, task); shardCommandExecution->executionState = TASK_EXECUTION_NOT_FINISHED; shardCommandExecution->placementExecutions = (TaskPlacementExecution **) palloc0(placementExecutionCount * sizeof(TaskPlacementExecution *)); shardCommandExecution->placementExecutionCount = placementExecutionCount; - shardCommandExecution->expectResults = hasReturning || operation == CMD_SELECT; + + shardCommandExecution->expectResults = hasReturning || + modLevel == 
ROW_MODIFY_READONLY; foreach(taskPlacementCell, task->taskPlacementList) @@ -1390,7 +1392,7 @@ UseConnectionPerPlacement(void) * ExecutionOrderForTask gives the appropriate execution order for a task. */ static PlacementExecutionOrder -ExecutionOrderForTask(CmdType operation, Task *task) +ExecutionOrderForTask(RowModifyLevel modLevel, Task *task) { switch (task->taskType) { @@ -1402,7 +1404,14 @@ ExecutionOrderForTask(CmdType operation, Task *task) case MODIFY_TASK: { - if (operation == CMD_INSERT && !task->upsertQuery) + /* + * For non-commutative modifications we take aggressive locks, so + * there is no risk of deadlock and we can run them in parallel. + * When the modification is commutative, we take no additional + * locks, so we take a conservative approach and execute sequentially + * to avoid deadlocks. + */ + if (modLevel < ROW_MODIFY_NONCOMMUTATIVE) { return EXECUTION_ORDER_SEQUENTIAL; } diff --git a/src/backend/distributed/executor/citus_custom_scan.c b/src/backend/distributed/executor/citus_custom_scan.c index 6399c3e57..e320dd2a3 100644 --- a/src/backend/distributed/executor/citus_custom_scan.c +++ b/src/backend/distributed/executor/citus_custom_scan.c @@ -163,7 +163,7 @@ CitusBeginScan(CustomScanState *node, EState *estate, int eflags) scanState = (CitusScanState *) node; distributedPlan = scanState->distributedPlan; - if (distributedPlan->operation == CMD_SELECT || + if (distributedPlan->modLevel == ROW_MODIFY_READONLY || distributedPlan->insertSelectSubquery != NULL) { /* no more action required */ diff --git a/src/backend/distributed/executor/insert_select_executor.c b/src/backend/distributed/executor/insert_select_executor.c index 4625ae6ab..829a96d81 100644 --- a/src/backend/distributed/executor/insert_select_executor.c +++ b/src/backend/distributed/executor/insert_select_executor.c @@ -133,7 +133,8 @@ CoordinatorInsertSelectExecScan(CustomScanState *node) if (MultiShardConnectionType == SEQUENTIAL_CONNECTION) { 
ExecuteModifyTasksSequentially(scanState, prunedTaskList, - CMD_INSERT, hasReturning); + ROW_MODIFY_COMMUTATIVE, + hasReturning); } else { @@ -151,7 +152,7 @@ CoordinatorInsertSelectExecScan(CustomScanState *node) scanState->tuplestorestate = tuplestore_begin_heap(randomAccess, interTransactions, work_mem); - ExecuteTaskListExtended(CMD_INSERT, prunedTaskList, + ExecuteTaskListExtended(ROW_MODIFY_COMMUTATIVE, prunedTaskList, tupleDescriptor, scanState->tuplestorestate, hasReturning, MaxAdaptiveExecutorPoolSize); } diff --git a/src/backend/distributed/executor/multi_router_executor.c b/src/backend/distributed/executor/multi_router_executor.c index 2e3176080..debc03ba3 100644 --- a/src/backend/distributed/executor/multi_router_executor.c +++ b/src/backend/distributed/executor/multi_router_executor.c @@ -91,15 +91,17 @@ bool SortReturning = false; /* functions needed during run phase */ static void AcquireMetadataLocks(List *taskList); -static int64 ExecuteSingleModifyTask(CitusScanState *scanState, Task *task, CmdType - operation, bool alwaysThrowErrorOnFailure, bool - expectResults); +static int64 ExecuteSingleModifyTask(CitusScanState *scanState, Task *task, + RowModifyLevel modLevel, + bool alwaysThrowErrorOnFailure, bool expectResults); static void ExecuteSingleSelectTask(CitusScanState *scanState, Task *task); static List * BuildPlacementAccessList(int32 groupId, List *relationShardList, ShardPlacementAccessType accessType); static List * GetModifyConnections(Task *task, bool markCritical); static int64 ExecuteModifyTasks(List *taskList, bool expectResults, ParamListInfo paramListInfo, CitusScanState *scanState); +static void AcquireExecutorShardLockForRowModify(Task *task, RowModifyLevel modLevel); +static void AcquireExecutorShardLocksForRelationRowLockList(Task *task); static bool RequiresConsistentSnapshot(Task *task); static void RouterMultiModifyExecScan(CustomScanState *node); static void RouterSequentialModifyExecScan(CustomScanState *node); @@ -138,27 
+140,18 @@ AcquireMetadataLocks(List *taskList) } -/* - * AcquireExecutorShardLock acquires a lock on the shard for the given task and - * command type if necessary to avoid divergence between multiple replicas of - * the same shard. No lock is obtained when there is only one replica. - * - * The function determines the appropriate lock mode based on the commutativity - * rule of the command. In each case, it uses a lock mode that enforces the - * commutativity rule. - * - * The mapping is overridden when all_modifications_commutative is set to true. - * In that case, all modifications are treated as commutative, which can be used - * to communicate that the application is only generating commutative - * UPDATE/DELETE/UPSERT commands and exclusive locks are unnecessary. - */ -void -AcquireExecutorShardLock(Task *task, CmdType commandType) +static void +AcquireExecutorShardLockForRowModify(Task *task, RowModifyLevel modLevel) { LOCKMODE lockMode = NoLock; int64 shardId = task->anchorShardId; - if (commandType == CMD_SELECT) + if (shardId == INVALID_SHARD_ID) + { + return; + } + + if (modLevel <= ROW_MODIFY_READONLY) { /* * The executor shard lock is used to maintain consistency between @@ -205,24 +198,7 @@ AcquireExecutorShardLock(Task *task, CmdType commandType) lockMode = RowExclusiveLock; } - else if (task->upsertQuery || commandType == CMD_UPDATE || commandType == CMD_DELETE) - { - /* - * UPDATE/DELETE/UPSERT commands do not commute with other modifications - * since the rows modified by one command may be affected by the outcome - * of another command. - * - * We need to handle upsert before INSERT, because PostgreSQL models - * upsert commands as INSERT with an ON CONFLICT section. - * - * ExclusiveLock conflicts with all lock types used by modifications - * and therefore prevents other modifications from running - * concurrently. 
- */ - - lockMode = ExclusiveLock; - } - else if (commandType == CMD_INSERT) + else if (modLevel < ROW_MODIFY_NONCOMMUTATIVE) { /* * An INSERT commutes with other INSERT commands, since performing them @@ -245,16 +221,34 @@ AcquireExecutorShardLock(Task *task, CmdType commandType) } else { - ereport(ERROR, (errmsg("unrecognized operation code: %d", (int) commandType))); + /* + * UPDATE/DELETE/UPSERT commands do not commute with other modifications + * since the rows modified by one command may be affected by the outcome + * of another command. + * + * UPSERT is non-commutative as well, even though PostgreSQL models + * upsert commands as INSERT with an ON CONFLICT section. + * + * ExclusiveLock conflicts with all lock types used by modifications + * and therefore prevents other modifications from running + * concurrently. + */ + + lockMode = ExclusiveLock; } - if (shardId != INVALID_SHARD_ID && lockMode != NoLock) + if (lockMode != NoLock) { ShardInterval *shardInterval = LoadShardInterval(shardId); SerializeNonCommutativeWrites(list_make1(shardInterval), lockMode); } +} + +static void +AcquireExecutorShardLocksForRelationRowLockList(Task *task) +{ /* * If lock clause exists and it effects any reference table, we need to get * lock on shard resource. Type of lock is determined by the type of row lock @@ -301,6 +295,28 @@ AcquireExecutorShardLock(Task *task, CmdType commandType) } } } +} + + +/* + * AcquireExecutorShardLocks acquires locks on shards for the given task if + * necessary to avoid divergence between multiple replicas of the same shard. + * No lock is obtained when there is only one replica. + * + * The function determines the appropriate lock mode based on the commutativity + * rule of the command. In each case, it uses a lock mode that enforces the + * commutativity rule. + * + * The mapping is overridden when all_modifications_commutative is set to true. 
+ * In that case, all modifications are treated as commutative, which can be used + * to communicate that the application is only generating commutative + * UPDATE/DELETE/UPSERT commands and exclusive locks are unnecessary. + */ +void +AcquireExecutorShardLocks(Task *task, RowModifyLevel modLevel) +{ + AcquireExecutorShardLockForRowModify(task, modLevel); + AcquireExecutorShardLocksForRelationRowLockList(task); /* * If the task has a subselect, then we may need to lock the shards from which @@ -717,16 +733,16 @@ RouterSequentialModifyExecScan(CustomScanState *node) { CitusScanState *scanState = (CitusScanState *) node; DistributedPlan *distributedPlan = scanState->distributedPlan; + RowModifyLevel modLevel = distributedPlan->modLevel; bool hasReturning = distributedPlan->hasReturning; Job *workerJob = distributedPlan->workerJob; List *taskList = workerJob->taskList; EState *executorState = ScanStateGetExecutorState(scanState); - CmdType operation = scanState->distributedPlan->operation; Assert(!scanState->finishedRemoteScan); executorState->es_processed += - ExecuteModifyTasksSequentially(scanState, taskList, operation, hasReturning); + ExecuteModifyTasksSequentially(scanState, taskList, modLevel, hasReturning); } @@ -1056,7 +1072,7 @@ CreatePlacementAccess(ShardPlacement *placement, ShardPlacementAccessType access * The function returns affectedTupleCount if applicable. */ static int64 -ExecuteSingleModifyTask(CitusScanState *scanState, Task *task, CmdType operation, +ExecuteSingleModifyTask(CitusScanState *scanState, Task *task, RowModifyLevel modLevel, bool alwaysThrowErrorOnFailure, bool expectResults) { EState *executorState = NULL; @@ -1107,10 +1123,9 @@ ExecuteSingleModifyTask(CitusScanState *scanState, Task *task, CmdType operation * acquire the necessary locks on the relations, and blocks any * unsafe concurrent operations. 
*/ - if (operation == CMD_INSERT || operation == CMD_UPDATE || - operation == CMD_DELETE || operation == CMD_SELECT) + if (modLevel > ROW_MODIFY_NONE) { - AcquireExecutorShardLock(task, operation); + AcquireExecutorShardLocks(task, modLevel); } /* try to execute modification on all placements */ @@ -1356,9 +1371,9 @@ ExecuteModifyTasksWithoutResults(List *taskList) * and ignores the results. */ int64 -ExecuteModifyTasksSequentiallyWithoutResults(List *taskList, CmdType operation) +ExecuteModifyTasksSequentiallyWithoutResults(List *taskList, RowModifyLevel modLevel) { - return ExecuteModifyTasksSequentially(NULL, taskList, operation, false); + return ExecuteModifyTasksSequentially(NULL, taskList, modLevel, false); } @@ -1373,7 +1388,7 @@ ExecuteModifyTasksSequentiallyWithoutResults(List *taskList, CmdType operation) */ int64 ExecuteModifyTasksSequentially(CitusScanState *scanState, List *taskList, - CmdType operation, bool hasReturning) + RowModifyLevel modLevel, bool hasReturning) { ListCell *taskCell = NULL; bool multipleTasks = list_length(taskList) > 1; @@ -1422,7 +1437,7 @@ ExecuteModifyTasksSequentially(CitusScanState *scanState, List *taskList, bool expectResults = (hasReturning || task->relationRowLockList != NIL); affectedTupleCount += - ExecuteSingleModifyTask(scanState, task, operation, alwaysThrowErrorOnFailure, + ExecuteSingleModifyTask(scanState, task, modLevel, alwaysThrowErrorOnFailure, expectResults); } diff --git a/src/backend/distributed/executor/multi_server_executor.c b/src/backend/distributed/executor/multi_server_executor.c index e54ca4358..fff0fa42f 100644 --- a/src/backend/distributed/executor/multi_server_executor.c +++ b/src/backend/distributed/executor/multi_server_executor.c @@ -95,7 +95,7 @@ JobExecutorType(DistributedPlan *distributedPlan) return MULTI_EXECUTOR_COORDINATOR_INSERT_SELECT; } - Assert(distributedPlan->operation == CMD_SELECT); + Assert(distributedPlan->modLevel == ROW_MODIFY_READONLY); workerNodeList = 
ActiveReadableNodeList(); workerNodeCount = list_length(workerNodeList); diff --git a/src/backend/distributed/master/master_stage_protocol.c b/src/backend/distributed/master/master_stage_protocol.c index f34cd5e1e..49d3f7bd4 100644 --- a/src/backend/distributed/master/master_stage_protocol.c +++ b/src/backend/distributed/master/master_stage_protocol.c @@ -592,7 +592,7 @@ CreateShardsOnWorkersViaExecutor(Oid distributedRelationId, List *shardPlacement poolSize = MaxAdaptiveExecutorPoolSize; } - ExecuteTaskList(CMD_UTILITY, taskList, poolSize); + ExecuteTaskList(ROW_MODIFY_NONE, taskList, poolSize); } diff --git a/src/backend/distributed/metadata/metadata_sync.c b/src/backend/distributed/metadata/metadata_sync.c index ef2b75d39..7792d6d07 100644 --- a/src/backend/distributed/metadata/metadata_sync.c +++ b/src/backend/distributed/metadata/metadata_sync.c @@ -69,7 +69,7 @@ PG_FUNCTION_INFO_V1(stop_metadata_sync_to_node); * start_metadata_sync_to_node function creates the metadata in a worker for preparing the * worker for accepting queries. The function first sets the localGroupId of the worker * so that the worker knows which tuple in pg_dist_node table represents itself. After - * that, SQL statetemens for re-creating metadata of MX-eligible distributed tables are + * that, SQL statements for re-creating metadata of MX-eligible distributed tables are * sent to the worker. Finally, the hasmetadata column of the target node in pg_dist_node * is marked as true. */ @@ -1188,7 +1188,7 @@ DetachPartitionCommandList(void) /* * We probably do not need this but as an extra precaution, we are enabling - * DDL propagation to swtich back to original state. + * DDL propagation to switch back to original state. 
*/ detachPartitionCommandList = lappend(detachPartitionCommandList, ENABLE_DDL_PROPAGATION); diff --git a/src/backend/distributed/planner/deparse_shard_query.c b/src/backend/distributed/planner/deparse_shard_query.c index e44ceeb04..1f906fb26 100644 --- a/src/backend/distributed/planner/deparse_shard_query.c +++ b/src/backend/distributed/planner/deparse_shard_query.c @@ -70,7 +70,7 @@ RebuildQueryStrings(Query *originalQuery, List *taskList) query = copyObject(originalQuery); - copiedInsertRte = ExtractInsertRangeTableEntry(query); + copiedInsertRte = ExtractResultRelationRTE(query); copiedSubqueryRte = ExtractSelectRangeTableEntry(query); copiedSubquery = copiedSubqueryRte->subquery; @@ -92,7 +92,8 @@ RebuildQueryStrings(Query *originalQuery, List *taskList) UpdateRelationToShardNames((Node *) copiedSubquery, relationShardList); } - else if (task->upsertQuery || valuesRTE != NULL) + else if (query->commandType == CMD_INSERT && (query->onConflict != NULL || + valuesRTE != NULL)) { RangeTblEntry *rangeTableEntry = NULL; diff --git a/src/backend/distributed/planner/distributed_planner.c b/src/backend/distributed/planner/distributed_planner.c index 88c705549..a769e9b78 100644 --- a/src/backend/distributed/planner/distributed_planner.c +++ b/src/backend/distributed/planner/distributed_planner.c @@ -50,8 +50,8 @@ int MultiTaskQueryLogLevel = MULTI_TASK_QUERY_INFO_OFF; /* multi-task query log static uint64 NextPlanId = 1; -/* local function forward declarations */ static bool ListContainsDistributedTableRTE(List *rangeTableList); +static bool IsUpdateOrDelete(Query *query); static PlannedStmt * CreateDistributedPlannedStmt(uint64 planId, PlannedStmt *localPlan, Query *originalQuery, Query *query, ParamListInfo boundParams, @@ -423,22 +423,6 @@ IsModifyCommand(Query *query) } -/* - * IsMultiShardModifyPlan returns true if the given plan was generated for - * multi shard update or delete query. 
- */ -bool -IsMultiShardModifyPlan(DistributedPlan *distributedPlan) -{ - if (IsUpdateOrDelete(distributedPlan) && IsMultiTaskPlan(distributedPlan)) - { - return true; - } - - return false; -} - - /* * IsMultiTaskPlan returns true if job contains multiple tasks. */ @@ -457,19 +441,13 @@ IsMultiTaskPlan(DistributedPlan *distributedPlan) /* - * IsUpdateOrDelete returns true if the query performs update or delete. + * IsUpdateOrDelete returns true if the query performs an update or delete. */ bool -IsUpdateOrDelete(DistributedPlan *distributedPlan) +IsUpdateOrDelete(Query *query) { - CmdType commandType = distributedPlan->operation; - - if (commandType == CMD_UPDATE || commandType == CMD_DELETE) - { - return true; - } - - return false; + return query->commandType == CMD_UPDATE || + query->commandType == CMD_DELETE; } @@ -480,15 +458,7 @@ IsUpdateOrDelete(DistributedPlan *distributedPlan) bool IsModifyDistributedPlan(DistributedPlan *distributedPlan) { - bool isModifyDistributedPlan = false; - CmdType operation = distributedPlan->operation; - - if (operation == CMD_INSERT || operation == CMD_UPDATE || operation == CMD_DELETE) - { - isModifyDistributedPlan = true; - } - - return isModifyDistributedPlan; + return distributedPlan->modLevel > ROW_MODIFY_READONLY; } @@ -569,11 +539,12 @@ CreateDistributedPlannedStmt(uint64 planId, PlannedStmt *localPlan, Query *origi * query planning failed (possibly) due to prepared statement parameters or * if it is planned as a multi shard modify query. */ - if ((distributedPlan->planningError || IsMultiShardModifyPlan(distributedPlan)) && + if ((distributedPlan->planningError || + (IsUpdateOrDelete(originalQuery) && IsMultiTaskPlan(distributedPlan))) && hasUnresolvedParams) { /* - * Arbitraryly high cost, but low enough that it can be added up + * Arbitrarily high cost, but low enough that it can be added up * without overflowing by choose_custom_plan(). 
*/ resultPlan->planTree->total_cost = FLT_MAX / 100000000; diff --git a/src/backend/distributed/planner/insert_select_planner.c b/src/backend/distributed/planner/insert_select_planner.c index 6f097fe30..96e1620a6 100644 --- a/src/backend/distributed/planner/insert_select_planner.c +++ b/src/backend/distributed/planner/insert_select_planner.c @@ -85,7 +85,7 @@ InsertSelectIntoDistributedTable(Query *query) if (insertSelectQuery) { - RangeTblEntry *insertRte = ExtractInsertRangeTableEntry(query); + RangeTblEntry *insertRte = ExtractResultRelationRTE(query); if (IsDistributedTable(insertRte->relid)) { return true; @@ -108,7 +108,7 @@ InsertSelectIntoLocalTable(Query *query) if (insertSelectQuery) { - RangeTblEntry *insertRte = ExtractInsertRangeTableEntry(query); + RangeTblEntry *insertRte = ExtractResultRelationRTE(query); if (!IsDistributedTable(insertRte->relid)) { return true; @@ -220,7 +220,7 @@ CreateDistributedInsertSelectPlan(Query *originalQuery, Job *workerJob = NULL; uint64 jobId = INVALID_JOB_ID; DistributedPlan *distributedPlan = CitusMakeNode(DistributedPlan); - RangeTblEntry *insertRte = ExtractInsertRangeTableEntry(originalQuery); + RangeTblEntry *insertRte = ExtractResultRelationRTE(originalQuery); RangeTblEntry *subqueryRte = ExtractSelectRangeTableEntry(originalQuery); Oid targetRelationId = insertRte->relid; DistTableCacheEntry *targetCacheEntry = DistributedTableCacheEntry(targetRelationId); @@ -230,7 +230,7 @@ CreateDistributedInsertSelectPlan(Query *originalQuery, bool allReferenceTables = relationRestrictionContext->allReferenceTables; bool allDistributionKeysInQueryAreEqual = false; - distributedPlan->operation = originalQuery->commandType; + distributedPlan->modLevel = RowModifyLevelForQuery(originalQuery); /* * Error semantics for INSERT ... 
SELECT queries are different than regular @@ -423,7 +423,7 @@ RouterModifyTaskForShardInterval(Query *originalQuery, ShardInterval *shardInter bool safeToPushdownSubquery) { Query *copiedQuery = copyObject(originalQuery); - RangeTblEntry *copiedInsertRte = ExtractInsertRangeTableEntry(copiedQuery); + RangeTblEntry *copiedInsertRte = ExtractResultRelationRTE(copiedQuery); RangeTblEntry *copiedSubqueryRte = ExtractSelectRangeTableEntry(copiedQuery); Query *copiedSubquery = (Query *) copiedSubqueryRte->subquery; @@ -443,7 +443,6 @@ RouterModifyTaskForShardInterval(Query *originalQuery, ShardInterval *shardInter uint64 jobId = INVALID_JOB_ID; List *insertShardPlacementList = NULL; List *intersectedPlacementList = NULL; - bool upsertQuery = false; bool replacePrunedQueryWithDummy = false; bool allReferenceTables = plannerRestrictionContext->relationRestrictionContext->allReferenceTables; @@ -566,12 +565,6 @@ RouterModifyTaskForShardInterval(Query *originalQuery, ShardInterval *shardInter /* this is required for correct deparsing of the query */ ReorderInsertSelectTargetLists(copiedQuery, copiedInsertRte, copiedSubqueryRte); - /* set the upsert flag */ - if (originalQuery->onConflict != NULL) - { - upsertQuery = true; - } - /* setting an alias simplifies deparsing of RETURNING */ if (copiedInsertRte->alias == NULL) { @@ -589,7 +582,6 @@ RouterModifyTaskForShardInterval(Query *originalQuery, ShardInterval *shardInter modifyTask->dependedTaskList = NULL; modifyTask->anchorShardId = shardId; modifyTask->taskPlacementList = insertShardPlacementList; - modifyTask->upsertQuery = upsertQuery; modifyTask->relationShardList = relationShardList; modifyTask->replicationModel = cacheEntry->replicationModel; @@ -1138,11 +1130,11 @@ CreateCoordinatorInsertSelectPlan(uint64 planId, Query *parse) Query *selectQuery = NULL; RangeTblEntry *selectRte = ExtractSelectRangeTableEntry(insertSelectQuery); - RangeTblEntry *insertRte = ExtractInsertRangeTableEntry(insertSelectQuery); + 
RangeTblEntry *insertRte = ExtractResultRelationRTE(insertSelectQuery); Oid targetRelationId = insertRte->relid; DistributedPlan *distributedPlan = CitusMakeNode(DistributedPlan); - distributedPlan->operation = CMD_INSERT; + distributedPlan->modLevel = RowModifyLevelForQuery(insertSelectQuery); distributedPlan->planningError = CoordinatorInsertSelectSupported(insertSelectQuery); @@ -1243,7 +1235,7 @@ CoordinatorInsertSelectSupported(Query *insertSelectQuery) return deferredError; } - insertRte = ExtractInsertRangeTableEntry(insertSelectQuery); + insertRte = ExtractResultRelationRTE(insertSelectQuery); if (PartitionMethod(insertRte->relid) == DISTRIBUTE_BY_APPEND) { return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, @@ -1347,7 +1339,7 @@ TwoPhaseInsertSelectTaskList(Oid targetRelationId, Query *insertSelectQuery, * then deparse it. */ Query *insertResultQuery = copyObject(insertSelectQuery); - RangeTblEntry *insertRte = ExtractInsertRangeTableEntry(insertResultQuery); + RangeTblEntry *insertRte = ExtractResultRelationRTE(insertResultQuery); RangeTblEntry *selectRte = ExtractSelectRangeTableEntry(insertResultQuery); DistTableCacheEntry *targetCacheEntry = DistributedTableCacheEntry(targetRelationId); @@ -1441,7 +1433,6 @@ TwoPhaseInsertSelectTaskList(Oid targetRelationId, Query *insertSelectQuery, modifyTask->dependedTaskList = NULL; modifyTask->anchorShardId = shardId; modifyTask->taskPlacementList = insertShardPlacementList; - modifyTask->upsertQuery = insertResultQuery->onConflict != NULL; modifyTask->relationShardList = list_make1(relationShard); modifyTask->replicationModel = targetCacheEntry->replicationModel; diff --git a/src/backend/distributed/planner/multi_physical_planner.c b/src/backend/distributed/planner/multi_physical_planner.c index c1f949760..66d380356 100644 --- a/src/backend/distributed/planner/multi_physical_planner.c +++ b/src/backend/distributed/planner/multi_physical_planner.c @@ -228,7 +228,7 @@ CreatePhysicalDistributedPlan(MultiTreeRoot 
*multiTree, distributedPlan->workerJob = workerJob; distributedPlan->masterQuery = masterQuery; distributedPlan->routerExecutable = DistributedPlanRouterExecutable(distributedPlan); - distributedPlan->operation = CMD_SELECT; + distributedPlan->modLevel = ROW_MODIFY_READONLY; return distributedPlan; } @@ -2430,7 +2430,6 @@ QueryPushdownTaskCreate(Query *originalQuery, int shardIndex, subqueryTask->dependedTaskList = NULL; subqueryTask->anchorShardId = anchorShardId; subqueryTask->taskPlacementList = selectPlacementList; - subqueryTask->upsertQuery = false; subqueryTask->relationShardList = relationShardList; return subqueryTask; @@ -4310,6 +4309,41 @@ GenerateSyntheticShardIntervalArray(int partitionCount) } +/* + * Determine RowModifyLevel required for given query + */ +RowModifyLevel +RowModifyLevelForQuery(Query *query) +{ + CmdType commandType = query->commandType; + + if (commandType == CMD_SELECT) + { + return ROW_MODIFY_READONLY; + } + + if (commandType == CMD_INSERT) + { + if (query->onConflict == NULL) + { + return ROW_MODIFY_COMMUTATIVE; + } + else + { + return ROW_MODIFY_NONCOMMUTATIVE; + } + } + + if (commandType == CMD_UPDATE || + commandType == CMD_DELETE) + { + return ROW_MODIFY_NONCOMMUTATIVE; + } + + return ROW_MODIFY_NONE; +} + + /* * ColumnName resolves the given column's name. The given column could belong to * a regular table or to an intermediate table formed to execute a distributed diff --git a/src/backend/distributed/planner/multi_router_planner.c b/src/backend/distributed/planner/multi_router_planner.c index 1bd4cfaf3..d6dfca007 100644 --- a/src/backend/distributed/planner/multi_router_planner.c +++ b/src/backend/distributed/planner/multi_router_planner.c @@ -165,7 +165,7 @@ static void ReorderTaskPlacementsByTaskAssignmentPolicy(Job *job, /* * CreateRouterPlan attempts to create a router executor plan for the given - * SELECT statement. ->planningError is set if planning fails. + * SELECT statement. 
->planningError is set if planning fails. */ DistributedPlan * CreateRouterPlan(Query *originalQuery, Query *query, @@ -186,8 +186,8 @@ CreateRouterPlan(Query *originalQuery, Query *query, /* - * CreateModifyPlan attempts to create a plan the given modification - * statement. If planning fails ->planningError is set to a description of + * CreateModifyPlan attempts to create a plan for the given modification + * statement. If planning fails ->planningError is set to a description of * the failure. */ DistributedPlan * @@ -198,7 +198,7 @@ CreateModifyPlan(Query *originalQuery, Query *query, DistributedPlan *distributedPlan = CitusMakeNode(DistributedPlan); bool multiShardQuery = false; - distributedPlan->operation = query->commandType; + distributedPlan->modLevel = RowModifyLevelForQuery(query); distributedPlan->planningError = ModifyQuerySupported(query, originalQuery, multiShardQuery, @@ -244,8 +244,8 @@ CreateModifyPlan(Query *originalQuery, Query *query, * CreateSingleTaskRouterPlan creates a physical plan for given query. The created plan is * either a modify task that changes a single shard, or a router task that returns * query results from a single worker. Supported modify queries (insert/update/delete) - * are router plannable by default. If query is not router plannable then either NULL is - * returned, or the returned plan has planningError set to a description of the problem. + * are router plannable by default. If query is not router plannable the returned plan + * has planningError set to a description of the problem. 
*/ static void CreateSingleTaskRouterPlan(DistributedPlan *distributedPlan, Query *originalQuery, @@ -254,7 +254,7 @@ CreateSingleTaskRouterPlan(DistributedPlan *distributedPlan, Query *originalQuer { Job *job = NULL; - distributedPlan->operation = query->commandType; + distributedPlan->modLevel = RowModifyLevelForQuery(query); /* we cannot have multi shard update/delete query via this code path */ job = RouterJob(originalQuery, plannerRestrictionContext, @@ -491,7 +491,7 @@ ModifyQueryResultRelationId(Query *query) errmsg("input query is not a modification query"))); } - resultRte = ExtractInsertRangeTableEntry(query); + resultRte = ExtractResultRelationRTE(query); Assert(OidIsValid(resultRte->relid)); return resultRte->relid; @@ -512,20 +512,12 @@ ResultRelationOidForQuery(Query *query) /* - * ExtractInsertRangeTableEntry returns the INSERT'ed table's range table entry. - * Note that the function expects and asserts that the input query be - * an INSERT...SELECT query. + * ExtractResultRelationRTE returns the table's resultRelation range table entry. */ RangeTblEntry * -ExtractInsertRangeTableEntry(Query *query) +ExtractResultRelationRTE(Query *query) { - int resultRelation = query->resultRelation; - List *rangeTableList = query->rtable; - RangeTblEntry *insertRTE = NULL; - - insertRTE = rt_fetch(resultRelation, rangeTableList); - - return insertRTE; + return rt_fetch(query->resultRelation, query->rtable); } @@ -1086,7 +1078,7 @@ HasDangerousJoinUsing(List *rtableList, Node *joinTreeNode) { /* * Yes, so check each join alias var to see if any of them are not - * simple references to underlying columns. If so, we have a + * simple references to underlying columns. If so, we have a * dangerous situation and must pick unique aliases. 
*/ RangeTblEntry *joinRTE = rt_fetch(joinExpr->rtindex, rtableList); @@ -1129,14 +1121,8 @@ HasDangerousJoinUsing(List *rtableList, Node *joinTreeNode) bool UpdateOrDeleteQuery(Query *query) { - CmdType commandType = query->commandType; - - if (commandType == CMD_UPDATE || commandType == CMD_DELETE) - { - return true; - } - - return false; + return query->commandType == CMD_UPDATE || + query->commandType == CMD_DELETE; } @@ -1538,11 +1524,6 @@ RouterInsertTaskList(Query *query, DeferredErrorMessage **planningError) modifyTask->replicationModel = cacheEntry->replicationModel; modifyTask->rowValuesLists = modifyRoute->rowValuesLists; - if (query->onConflict != NULL) - { - modifyTask->upsertQuery = true; - } - insertTaskList = lappend(insertTaskList, modifyTask); } @@ -1572,7 +1553,6 @@ CreateTask(TaskType taskType) task->shardInterval = NULL; task->assignmentConstrained = false; task->taskExecution = NULL; - task->upsertQuery = false; task->replicationModel = REPLICATION_MODEL_INVALID; task->relationRowLockList = NIL; @@ -1858,8 +1838,8 @@ SingleShardModifyTaskList(Query *query, uint64 jobId, List *relationShardList, /* - * GetUpdateOrDeleteRTE checks query if it has an UPDATE or DELETE RTE. If it finds - * it returns it. + * GetUpdateOrDeleteRTE checks query if it has an UPDATE or DELETE RTE. + * Returns that RTE if found. 
*/ static RangeTblEntry * GetUpdateOrDeleteRTE(Query *query) diff --git a/src/backend/distributed/utils/citus_copyfuncs.c b/src/backend/distributed/utils/citus_copyfuncs.c index 6d302e441..0f98934fb 100644 --- a/src/backend/distributed/utils/citus_copyfuncs.c +++ b/src/backend/distributed/utils/citus_copyfuncs.c @@ -103,12 +103,12 @@ CopyNodeDistributedPlan(COPYFUNC_ARGS) DECLARE_FROM_AND_NEW_NODE(DistributedPlan); COPY_SCALAR_FIELD(planId); - COPY_SCALAR_FIELD(operation); + COPY_SCALAR_FIELD(modLevel); COPY_SCALAR_FIELD(hasReturning); + COPY_SCALAR_FIELD(routerExecutable); COPY_NODE_FIELD(workerJob); COPY_NODE_FIELD(masterQuery); - COPY_SCALAR_FIELD(routerExecutable); COPY_SCALAR_FIELD(queryId); COPY_NODE_FIELD(relationIdList); @@ -257,7 +257,6 @@ CopyNodeTask(COPYFUNC_ARGS) COPY_NODE_FIELD(shardInterval); COPY_SCALAR_FIELD(assignmentConstrained); COPY_NODE_FIELD(taskExecution); - COPY_SCALAR_FIELD(upsertQuery); COPY_SCALAR_FIELD(replicationModel); COPY_SCALAR_FIELD(modifyWithSubquery); COPY_NODE_FIELD(relationShardList); diff --git a/src/backend/distributed/utils/citus_outfuncs.c b/src/backend/distributed/utils/citus_outfuncs.c index 94caafa0e..bc92940b9 100644 --- a/src/backend/distributed/utils/citus_outfuncs.c +++ b/src/backend/distributed/utils/citus_outfuncs.c @@ -176,12 +176,12 @@ OutDistributedPlan(OUTFUNC_ARGS) WRITE_NODE_TYPE("DISTRIBUTEDPLAN"); WRITE_UINT64_FIELD(planId); - WRITE_INT_FIELD(operation); + WRITE_ENUM_FIELD(modLevel, RowModifyLevel); WRITE_BOOL_FIELD(hasReturning); + WRITE_BOOL_FIELD(routerExecutable); WRITE_NODE_FIELD(workerJob); WRITE_NODE_FIELD(masterQuery); - WRITE_BOOL_FIELD(routerExecutable); WRITE_UINT64_FIELD(queryId); WRITE_NODE_FIELD(relationIdList); @@ -467,7 +467,6 @@ OutTask(OUTFUNC_ARGS) WRITE_NODE_FIELD(shardInterval); WRITE_BOOL_FIELD(assignmentConstrained); WRITE_NODE_FIELD(taskExecution); - WRITE_BOOL_FIELD(upsertQuery); WRITE_CHAR_FIELD(replicationModel); WRITE_BOOL_FIELD(modifyWithSubquery); 
WRITE_NODE_FIELD(relationShardList); diff --git a/src/backend/distributed/utils/citus_readfuncs.c b/src/backend/distributed/utils/citus_readfuncs.c index 93df55fab..a15c5d0f6 100644 --- a/src/backend/distributed/utils/citus_readfuncs.c +++ b/src/backend/distributed/utils/citus_readfuncs.c @@ -202,12 +202,12 @@ ReadDistributedPlan(READFUNC_ARGS) READ_LOCALS(DistributedPlan); READ_UINT64_FIELD(planId); - READ_INT_FIELD(operation); + READ_ENUM_FIELD(modLevel, RowModifyLevel); READ_BOOL_FIELD(hasReturning); + READ_BOOL_FIELD(routerExecutable); READ_NODE_FIELD(workerJob); READ_NODE_FIELD(masterQuery); - READ_BOOL_FIELD(routerExecutable); READ_UINT64_FIELD(queryId); READ_NODE_FIELD(relationIdList); @@ -381,7 +381,6 @@ ReadTask(READFUNC_ARGS) READ_NODE_FIELD(shardInterval); READ_BOOL_FIELD(assignmentConstrained); READ_NODE_FIELD(taskExecution); - READ_BOOL_FIELD(upsertQuery); READ_CHAR_FIELD(replicationModel); READ_BOOL_FIELD(modifyWithSubquery); READ_NODE_FIELD(relationShardList); diff --git a/src/include/distributed/distributed_planner.h b/src/include/distributed/distributed_planner.h index 50daeb93f..927cef48b 100644 --- a/src/include/distributed/distributed_planner.h +++ b/src/include/distributed/distributed_planner.h @@ -108,12 +108,10 @@ extern void multi_join_restriction_hook(PlannerInfo *root, JoinType jointype, JoinPathExtraData *extra); extern bool IsModifyCommand(Query *query); -extern bool IsUpdateOrDelete(struct DistributedPlan *distributedPlan); extern bool IsModifyDistributedPlan(struct DistributedPlan *distributedPlan); extern void EnsurePartitionTableNotReplicated(Oid relationId); extern Node * ResolveExternalParams(Node *inputNode, ParamListInfo boundParams); extern bool IsMultiTaskPlan(struct DistributedPlan *distributedPlan); -extern bool IsMultiShardModifyPlan(struct DistributedPlan *distributedPlan); extern RangeTblEntry * RemoteScanRangeTableEntry(List *columnNameList); extern int GetRTEIdentity(RangeTblEntry *rte); diff --git 
a/src/include/distributed/multi_executor.h b/src/include/distributed/multi_executor.h index 9d2358a5c..ed8989bd5 100644 --- a/src/include/distributed/multi_executor.h +++ b/src/include/distributed/multi_executor.h @@ -38,13 +38,14 @@ extern void CitusExecutorStart(QueryDesc *queryDesc, int eflags); extern void CitusExecutorRun(QueryDesc *queryDesc, ScanDirection direction, uint64 count, bool execute_once); extern TupleTableSlot * AdaptiveExecutor(CustomScanState *node); -extern uint64 ExecuteTaskListExtended(CmdType operation, List *taskList, +extern uint64 ExecuteTaskListExtended(RowModifyLevel modLevel, List *taskList, TupleDesc tupleDescriptor, Tuplestorestate *tupleStore, bool hasReturning, int targetPoolSize); extern void ExecuteUtilityTaskListWithoutResults(List *taskList, int targetPoolSize, bool forceSequentialExecution); -extern uint64 ExecuteTaskList(CmdType operation, List *taskList, int targetPoolSize); +extern uint64 ExecuteTaskList(RowModifyLevel modLevel, List *taskList, int + targetPoolSize); extern TupleTableSlot * CitusExecScan(CustomScanState *node); extern TupleTableSlot * ReturnTupleFromTuplestore(CitusScanState *scanState); extern void LoadTuplesIntoTupleStore(CitusScanState *citusScanState, Job *workerJob); diff --git a/src/include/distributed/multi_physical_planner.h b/src/include/distributed/multi_physical_planner.h index 8a573a079..cabb46df0 100644 --- a/src/include/distributed/multi_physical_planner.h +++ b/src/include/distributed/multi_physical_planner.h @@ -110,6 +110,15 @@ typedef enum } BoundaryNodeJobType; +/* Enumeration that specifies extent of DML modifications */ +typedef enum RowModifyLevel +{ + ROW_MODIFY_NONE = 0, + ROW_MODIFY_READONLY = 1, + ROW_MODIFY_COMMUTATIVE = 2, + ROW_MODIFY_NONCOMMUTATIVE = 3 +} RowModifyLevel; + /* * Job represents a logical unit of work that contains one set of data transfers * in our physical plan. 
The physical planner maps each SQL query into one or @@ -183,7 +192,6 @@ typedef struct Task ShardInterval *shardInterval; /* only applies to merge tasks */ bool assignmentConstrained; /* only applies to merge tasks */ TaskExecution *taskExecution; /* used by task tracker executor */ - bool upsertQuery; /* only applies to modify tasks */ char replicationModel; /* only applies to modify tasks */ List *relationRowLockList; @@ -229,21 +237,21 @@ typedef struct DistributedPlan /* unique identifier of the plan within the session */ uint64 planId; - /* type of command to execute (SELECT/INSERT/...) */ - CmdType operation; + /* specifies nature of modifications in query */ + RowModifyLevel modLevel; /* specifies whether a DML command has a RETURNING */ bool hasReturning; + /* a router executable query is executed entirely on a worker */ + bool routerExecutable; + /* job tree containing the tasks to be executed on workers */ Job *workerJob; /* local query that merges results from the workers */ Query *masterQuery; - /* a router executable query is executed entirely on a worker */ - bool routerExecutable; - /* query identifier (copied from the top-level PlannedStmt) */ uint64 queryId; @@ -340,6 +348,7 @@ extern bool ShardIntervalsOverlap(ShardInterval *firstInterval, ShardInterval *secondInterval); extern bool CoPartitionedTables(Oid firstRelationId, Oid secondRelationId); extern ShardInterval ** GenerateSyntheticShardIntervalArray(int partitionCount); +extern RowModifyLevel RowModifyLevelForQuery(Query *query); /* function declarations for Task and Task list operations */ diff --git a/src/include/distributed/multi_router_executor.h b/src/include/distributed/multi_router_executor.h index b92c46d30..c0d7e80e7 100644 --- a/src/include/distributed/multi_router_executor.h +++ b/src/include/distributed/multi_router_executor.h @@ -46,15 +46,15 @@ extern void ExecuteMultipleTasks(CitusScanState *scanState, List *taskList, bool isModificationQuery, bool expectResults); int64 
ExecuteModifyTasksSequentially(CitusScanState *scanState, List *taskList, - CmdType operation, bool hasReturning); + RowModifyLevel modLevel, bool hasReturning); extern int64 ExecuteModifyTasksWithoutResults(List *taskList); extern int64 ExecuteModifyTasksSequentiallyWithoutResults(List *taskList, - CmdType operation); + RowModifyLevel modLevel); extern ShardPlacementAccess * CreatePlacementAccess(ShardPlacement *placement, ShardPlacementAccessType accessType); /* helper functions */ -extern void AcquireExecutorShardLock(Task *task, CmdType commandType); +extern void AcquireExecutorShardLocks(Task *task, RowModifyLevel modLevel); extern void AcquireExecutorMultiShardLocks(List *taskList); extern ShardPlacementAccess * CreatePlacementAccess(ShardPlacement *placement, ShardPlacementAccessType accessType); diff --git a/src/include/distributed/multi_router_planner.h b/src/include/distributed/multi_router_planner.h index 4ba110e2d..8a868dfca 100644 --- a/src/include/distributed/multi_router_planner.h +++ b/src/include/distributed/multi_router_planner.h @@ -61,7 +61,7 @@ extern RelationRestrictionContext * CopyRelationRestrictionContext( extern Oid ExtractFirstDistributedTableId(Query *query); extern RangeTblEntry * ExtractSelectRangeTableEntry(Query *query); extern Oid ModifyQueryResultRelationId(Query *query); -extern RangeTblEntry * ExtractInsertRangeTableEntry(Query *query); +extern RangeTblEntry * ExtractResultRelationRTE(Query *query); extern RangeTblEntry * ExtractDistributedInsertValuesRTE(Query *query); extern bool IsMultiRowInsert(Query *query); extern void AddShardIntervalRestrictionToSelect(Query *subqery, diff --git a/src/test/regress/bin/normalized_tests.lst b/src/test/regress/bin/normalized_tests.lst index b1408edb7..39cf8803b 100644 --- a/src/test/regress/bin/normalized_tests.lst +++ b/src/test/regress/bin/normalized_tests.lst @@ -17,6 +17,8 @@ multi_subtransactions multi_modifying_xacts multi_insert_select sql_procedure +multi_reference_table 
+multi_create_table_constraints # the following tests' output are # normalized for EXPLAIN outputs