From 09150273895cd05359f71f9aa058982c509c139e Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Philip=20Dub=C3=A9?=
Date: Thu, 27 Jun 2019 13:22:32 +0200
Subject: [PATCH] DistributedPlan: replace operation with modLevel

This causes no behavioral changes; it only reorganizes the code to
prepare for implementing modifying CTEs

Also rename ExtractInsertRangeTableEntry to ExtractResultRelationRTE,
as the function's implementation didn't match its documentation

Remove Task's upsertQuery in favor of ROW_MODIFY_NONCOMMUTATIVE

Split up AcquireExecutorShardLock into smaller internal functions

Tests: normalize multi_reference_table and multi_create_table_constraints
---
 .../distributed/executor/adaptive_executor.c | 87 +++++++------
 .../distributed/executor/citus_custom_scan.c | 2 +-
 .../executor/insert_select_executor.c | 5 +-
 .../executor/multi_router_executor.c | 115 ++++++++++--------
 .../executor/multi_server_executor.c | 2 +-
 .../master/master_stage_protocol.c | 2 +-
 .../distributed/metadata/metadata_sync.c | 4 +-
 .../distributed/planner/deparse_shard_query.c | 5 +-
 .../distributed/planner/distributed_planner.c | 47 ++-----
 .../planner/insert_select_planner.c | 27 ++--
 .../planner/multi_physical_planner.c | 38 +++++-
 .../planner/multi_router_planner.c | 52 +++-----
 .../distributed/utils/citus_copyfuncs.c | 5 +-
 .../distributed/utils/citus_outfuncs.c | 5 +-
 .../distributed/utils/citus_readfuncs.c | 5 +-
 src/include/distributed/distributed_planner.h | 2 -
 src/include/distributed/multi_executor.h | 5 +-
 .../distributed/multi_physical_planner.h | 21 +++-
 .../distributed/multi_router_executor.h | 6 +-
 .../distributed/multi_router_planner.h | 2 +-
 src/test/regress/bin/normalized_tests.lst | 2 +
 21 files changed, 224 insertions(+), 215 deletions(-)

diff --git a/src/backend/distributed/executor/adaptive_executor.c b/src/backend/distributed/executor/adaptive_executor.c
index 8c0567561..72b7e3ae4 100644
--- a/src/backend/distributed/executor/adaptive_executor.c
+++ b/src/backend/distributed/executor/adaptive_executor.c
@@ -158,8 +158,8 @@
  */
 typedef struct DistributedExecution
 {
-	/* the corresponding distributed plan's operation */
-	CmdType operation;
+	/* the corresponding distributed plan's modLevel */
+	RowModifyLevel modLevel;
 
 	List *tasksToExecute;
 
@@ -504,7 +504,7 @@ int ExecutorSlowStartInterval = 10;
 
 
 /* local functions */
-static DistributedExecution * CreateDistributedExecution(CmdType operation,
+static DistributedExecution * CreateDistributedExecution(RowModifyLevel modLevel,
 List *taskList,
 bool hasReturning,
 ParamListInfo paramListInfo,
@@ -520,16 +520,16 @@ static void FinishDistributedExecution(DistributedExecution *execution);
 static void CleanUpSessions(DistributedExecution *execution);
 
 static void LockPartitionsForDistributedPlan(DistributedPlan *distributedPlan);
-static void AcquireExecutorShardLocks(DistributedExecution *execution);
+static void AcquireExecutorShardLocksForExecution(DistributedExecution *execution);
 static bool DistributedExecutionModifiesDatabase(DistributedExecution *execution);
 static bool DistributedPlanModifiesDatabase(DistributedPlan *plan);
-static bool TaskListModifiesDatabase(CmdType operation, List *taskList);
+static bool TaskListModifiesDatabase(RowModifyLevel modLevel, List *taskList);
 static bool DistributedExecutionRequiresRollback(DistributedExecution *execution);
-static bool SelectForUpdateOnReferenceTable(CmdType operation, List *taskList);
+static bool SelectForUpdateOnReferenceTable(RowModifyLevel modLevel, List *taskList);
 static void AssignTasksToConnections(DistributedExecution
*execution); static void UnclaimAllSessionConnections(List *sessionList); static bool UseConnectionPerPlacement(void); -static PlacementExecutionOrder ExecutionOrderForTask(CmdType operation, Task *task); +static PlacementExecutionOrder ExecutionOrderForTask(RowModifyLevel modLevel, Task *task); static WorkerPool * FindOrCreateWorkerPool(DistributedExecution *execution, WorkerNode *workerNode); static WorkerSession * FindOrCreateWorkerSession(WorkerPool *workerPool, @@ -610,7 +610,7 @@ AdaptiveExecutor(CustomScanState *node) targetPoolSize = 1; } - execution = CreateDistributedExecution(distributedPlan->operation, taskList, + execution = CreateDistributedExecution(distributedPlan->modLevel, taskList, distributedPlan->hasReturning, paramListInfo, tupleDescriptor, tupleStore, targetPoolSize); @@ -626,7 +626,7 @@ AdaptiveExecutor(CustomScanState *node) RunDistributedExecution(execution); } - if (distributedPlan->operation != CMD_SELECT) + if (distributedPlan->modLevel != ROW_MODIFY_READONLY) { executorState->es_processed = execution->rowsProcessed; } @@ -654,14 +654,14 @@ ExecuteUtilityTaskListWithoutResults(List *taskList, int targetPoolSize, { if (TaskExecutorType == MULTI_EXECUTOR_ADAPTIVE) { - ExecuteTaskList(CMD_UTILITY, taskList, targetPoolSize); + ExecuteTaskList(ROW_MODIFY_NONE, taskList, targetPoolSize); } else { if (MultiShardConnectionType == SEQUENTIAL_CONNECTION || forceSequentialExecution) { - ExecuteModifyTasksSequentiallyWithoutResults(taskList, CMD_UTILITY); + ExecuteModifyTasksSequentiallyWithoutResults(taskList, ROW_MODIFY_NONE); } else { @@ -676,13 +676,13 @@ ExecuteUtilityTaskListWithoutResults(List *taskList, int targetPoolSize, * for some of the arguments. */ uint64 -ExecuteTaskList(CmdType operation, List *taskList, int targetPoolSize) +ExecuteTaskList(RowModifyLevel modLevel, List *taskList, int targetPoolSize) { TupleDesc tupleDescriptor = NULL; Tuplestorestate *tupleStore = NULL; bool hasReturning = false; - return ExecuteTaskListExtended(operation, taskList, tupleDescriptor, + return ExecuteTaskListExtended(modLevel, taskList, tupleDescriptor, tupleStore, hasReturning, targetPoolSize); } @@ -692,7 +692,7 @@ ExecuteTaskList(CmdType operation, List *taskList, int targetPoolSize) * runs it. */ uint64 -ExecuteTaskListExtended(CmdType operation, List *taskList, +ExecuteTaskListExtended(RowModifyLevel modLevel, List *taskList, TupleDesc tupleDescriptor, Tuplestorestate *tupleStore, bool hasReturning, int targetPoolSize) { @@ -705,7 +705,7 @@ ExecuteTaskListExtended(CmdType operation, List *taskList, } execution = - CreateDistributedExecution(operation, taskList, hasReturning, paramListInfo, + CreateDistributedExecution(modLevel, taskList, hasReturning, paramListInfo, tupleDescriptor, tupleStore, targetPoolSize); StartDistributedExecution(execution); @@ -721,14 +721,14 @@ ExecuteTaskListExtended(CmdType operation, List *taskList, * a distributed plan. 
*/ DistributedExecution * -CreateDistributedExecution(CmdType operation, List *taskList, bool hasReturning, +CreateDistributedExecution(RowModifyLevel modLevel, List *taskList, bool hasReturning, ParamListInfo paramListInfo, TupleDesc tupleDescriptor, Tuplestorestate *tupleStore, int targetPoolSize) { DistributedExecution *execution = (DistributedExecution *) palloc0(sizeof(DistributedExecution)); - execution->operation = operation; + execution->modLevel = modLevel; execution->tasksToExecute = taskList; execution->hasReturning = hasReturning; @@ -823,7 +823,7 @@ StartDistributedExecution(DistributedExecution *execution) } /* prevent unsafe concurrent modifications */ - AcquireExecutorShardLocks(execution); + AcquireExecutorShardLocksForExecution(execution); } @@ -834,7 +834,7 @@ StartDistributedExecution(DistributedExecution *execution) static bool DistributedExecutionModifiesDatabase(DistributedExecution *execution) { - return TaskListModifiesDatabase(execution->operation, execution->tasksToExecute); + return TaskListModifiesDatabase(execution->modLevel, execution->tasksToExecute); } @@ -845,7 +845,7 @@ DistributedExecutionModifiesDatabase(DistributedExecution *execution) static bool DistributedPlanModifiesDatabase(DistributedPlan *plan) { - return TaskListModifiesDatabase(plan->operation, plan->workerJob->taskList); + return TaskListModifiesDatabase(plan->modLevel, plan->workerJob->taskList); } @@ -854,22 +854,22 @@ DistributedPlanModifiesDatabase(DistributedPlan *plan) * DistributedPlanModifiesDatabase. */ static bool -TaskListModifiesDatabase(CmdType operation, List *taskList) +TaskListModifiesDatabase(RowModifyLevel modLevel, List *taskList) { Task *firstTask = NULL; - if (operation == CMD_INSERT || operation == CMD_UPDATE || operation == CMD_DELETE) + if (modLevel > ROW_MODIFY_READONLY) { return true; } /* - * If we cannot decide by only checking the operation, we should look closer - * to the tasks. + * If we cannot decide by only checking the row modify level, + * we should look closer to the tasks. */ if (list_length(taskList) < 1) { - /* does this ever possible? */ + /* is this ever possible? */ return false; } @@ -968,12 +968,12 @@ DistributedExecutionRequiresRollback(DistributedExecution *execution) * that contains FOR UPDATE clause that locks any reference tables. */ static bool -SelectForUpdateOnReferenceTable(CmdType operation, List *taskList) +SelectForUpdateOnReferenceTable(RowModifyLevel modLevel, List *taskList) { Task *task = NULL; ListCell *rtiLockCell = NULL; - if (operation != CMD_SELECT) + if (modLevel != ROW_MODIFY_READONLY) { return false; } @@ -1025,8 +1025,8 @@ LockPartitionsForDistributedPlan(DistributedPlan *distributedPlan) /* - * AcquireExecutorShardLocks acquires advisory lock on shard IDs to prevent - * unsafe concurrent modifications of shards. + * AcquireExecutorShardLocksForExecution acquires advisory lock on shard IDs + * to prevent unsafe concurrent modifications of shards. * * We prevent concurrent modifications of shards in two cases: * 1. Any non-commutative writes to a replicated table @@ -1042,13 +1042,13 @@ LockPartitionsForDistributedPlan(DistributedPlan *distributedPlan) * TRUNCATE because the table locks already prevent concurrent access. 
*/ static void -AcquireExecutorShardLocks(DistributedExecution *execution) +AcquireExecutorShardLocksForExecution(DistributedExecution *execution) { - CmdType operation = execution->operation; + RowModifyLevel modLevel = execution->modLevel; List *taskList = execution->tasksToExecute; - if (!(operation == CMD_INSERT || operation == CMD_UPDATE || operation == CMD_DELETE || - SelectForUpdateOnReferenceTable(operation, taskList))) + if (modLevel <= ROW_MODIFY_READONLY && + !SelectForUpdateOnReferenceTable(modLevel, taskList)) { /* * Executor locks only apply to DML commands and SELECT FOR UPDATE queries @@ -1069,7 +1069,7 @@ AcquireExecutorShardLocks(DistributedExecution *execution) { Task *task = (Task *) lfirst(taskCell); - AcquireExecutorShardLock(task, operation); + AcquireExecutorShardLocks(task, modLevel); } } else if (list_length(taskList) > 1) @@ -1211,7 +1211,7 @@ UnclaimAllSessionConnections(List *sessionList) static void AssignTasksToConnections(DistributedExecution *execution) { - CmdType operation = execution->operation; + RowModifyLevel modLevel = execution->modLevel; List *taskList = execution->tasksToExecute; bool hasReturning = execution->hasReturning; @@ -1233,13 +1233,15 @@ AssignTasksToConnections(DistributedExecution *execution) shardCommandExecution = (ShardCommandExecution *) palloc0(sizeof(ShardCommandExecution)); shardCommandExecution->task = task; - shardCommandExecution->executionOrder = ExecutionOrderForTask(operation, task); + shardCommandExecution->executionOrder = ExecutionOrderForTask(modLevel, task); shardCommandExecution->executionState = TASK_EXECUTION_NOT_FINISHED; shardCommandExecution->placementExecutions = (TaskPlacementExecution **) palloc0(placementExecutionCount * sizeof(TaskPlacementExecution *)); shardCommandExecution->placementExecutionCount = placementExecutionCount; - shardCommandExecution->expectResults = hasReturning || operation == CMD_SELECT; + + shardCommandExecution->expectResults = hasReturning || + modLevel == ROW_MODIFY_READONLY; foreach(taskPlacementCell, task->taskPlacementList) @@ -1390,7 +1392,7 @@ UseConnectionPerPlacement(void) * ExecutionOrderForTask gives the appropriate execution order for a task. */ static PlacementExecutionOrder -ExecutionOrderForTask(CmdType operation, Task *task) +ExecutionOrderForTask(RowModifyLevel modLevel, Task *task) { switch (task->taskType) { @@ -1402,7 +1404,14 @@ ExecutionOrderForTask(CmdType operation, Task *task) case MODIFY_TASK: { - if (operation == CMD_INSERT && !task->upsertQuery) + /* + * For non-commutative modifications we take aggressive locks, so + * there is no risk of deadlock and we can run them in parallel. + * When the modification is commutative, we take no additional + * locks, so we take a conservative approach and execute sequentially + * to avoid deadlocks. 
+ */ + if (modLevel < ROW_MODIFY_NONCOMMUTATIVE) { return EXECUTION_ORDER_SEQUENTIAL; } diff --git a/src/backend/distributed/executor/citus_custom_scan.c b/src/backend/distributed/executor/citus_custom_scan.c index 6399c3e57..e320dd2a3 100644 --- a/src/backend/distributed/executor/citus_custom_scan.c +++ b/src/backend/distributed/executor/citus_custom_scan.c @@ -163,7 +163,7 @@ CitusBeginScan(CustomScanState *node, EState *estate, int eflags) scanState = (CitusScanState *) node; distributedPlan = scanState->distributedPlan; - if (distributedPlan->operation == CMD_SELECT || + if (distributedPlan->modLevel == ROW_MODIFY_READONLY || distributedPlan->insertSelectSubquery != NULL) { /* no more action required */ diff --git a/src/backend/distributed/executor/insert_select_executor.c b/src/backend/distributed/executor/insert_select_executor.c index 4625ae6ab..829a96d81 100644 --- a/src/backend/distributed/executor/insert_select_executor.c +++ b/src/backend/distributed/executor/insert_select_executor.c @@ -133,7 +133,8 @@ CoordinatorInsertSelectExecScan(CustomScanState *node) if (MultiShardConnectionType == SEQUENTIAL_CONNECTION) { ExecuteModifyTasksSequentially(scanState, prunedTaskList, - CMD_INSERT, hasReturning); + ROW_MODIFY_COMMUTATIVE, + hasReturning); } else { @@ -151,7 +152,7 @@ CoordinatorInsertSelectExecScan(CustomScanState *node) scanState->tuplestorestate = tuplestore_begin_heap(randomAccess, interTransactions, work_mem); - ExecuteTaskListExtended(CMD_INSERT, prunedTaskList, + ExecuteTaskListExtended(ROW_MODIFY_COMMUTATIVE, prunedTaskList, tupleDescriptor, scanState->tuplestorestate, hasReturning, MaxAdaptiveExecutorPoolSize); } diff --git a/src/backend/distributed/executor/multi_router_executor.c b/src/backend/distributed/executor/multi_router_executor.c index 2e3176080..debc03ba3 100644 --- a/src/backend/distributed/executor/multi_router_executor.c +++ b/src/backend/distributed/executor/multi_router_executor.c @@ -91,15 +91,17 @@ bool SortReturning = false; /* functions needed during run phase */ static void AcquireMetadataLocks(List *taskList); -static int64 ExecuteSingleModifyTask(CitusScanState *scanState, Task *task, CmdType - operation, bool alwaysThrowErrorOnFailure, bool - expectResults); +static int64 ExecuteSingleModifyTask(CitusScanState *scanState, Task *task, + RowModifyLevel modLevel, + bool alwaysThrowErrorOnFailure, bool expectResults); static void ExecuteSingleSelectTask(CitusScanState *scanState, Task *task); static List * BuildPlacementAccessList(int32 groupId, List *relationShardList, ShardPlacementAccessType accessType); static List * GetModifyConnections(Task *task, bool markCritical); static int64 ExecuteModifyTasks(List *taskList, bool expectResults, ParamListInfo paramListInfo, CitusScanState *scanState); +static void AcquireExecutorShardLockForRowModify(Task *task, RowModifyLevel modLevel); +static void AcquireExecutorShardLocksForRelationRowLockList(Task *task); static bool RequiresConsistentSnapshot(Task *task); static void RouterMultiModifyExecScan(CustomScanState *node); static void RouterSequentialModifyExecScan(CustomScanState *node); @@ -138,27 +140,18 @@ AcquireMetadataLocks(List *taskList) } -/* - * AcquireExecutorShardLock acquires a lock on the shard for the given task and - * command type if necessary to avoid divergence between multiple replicas of - * the same shard. No lock is obtained when there is only one replica. - * - * The function determines the appropriate lock mode based on the commutativity - * rule of the command. 
In each case, it uses a lock mode that enforces the
- * commutativity rule.
- *
- * The mapping is overridden when all_modifications_commutative is set to true.
- * In that case, all modifications are treated as commutative, which can be used
- * to communicate that the application is only generating commutative
- * UPDATE/DELETE/UPSERT commands and exclusive locks are unnecessary.
- */
-void
-AcquireExecutorShardLock(Task *task, CmdType commandType)
+/*
+ * AcquireExecutorShardLockForRowModify acquires the lock, if any, that the
+ * task's row modify level requires on its anchor shard.
+ */
+static void
+AcquireExecutorShardLockForRowModify(Task *task, RowModifyLevel modLevel)
 {
 LOCKMODE lockMode = NoLock;
 int64 shardId = task->anchorShardId;
 
- if (commandType == CMD_SELECT)
+ if (shardId == INVALID_SHARD_ID)
+ {
+ return;
+ }
+
+ if (modLevel <= ROW_MODIFY_READONLY)
 {
 /*
 * The executor shard lock is used to maintain consistency between
@@ -205,24 +198,7 @@
 lockMode = RowExclusiveLock;
 }
- else if (task->upsertQuery || commandType == CMD_UPDATE || commandType == CMD_DELETE)
- {
- /*
- * UPDATE/DELETE/UPSERT commands do not commute with other modifications
- * since the rows modified by one command may be affected by the outcome
- * of another command.
- *
- * We need to handle upsert before INSERT, because PostgreSQL models
- * upsert commands as INSERT with an ON CONFLICT section.
- *
- * ExclusiveLock conflicts with all lock types used by modifications
- * and therefore prevents other modifications from running
- * concurrently.
- */
-
- lockMode = ExclusiveLock;
- }
- else if (commandType == CMD_INSERT)
+ else if (modLevel < ROW_MODIFY_NONCOMMUTATIVE)
 {
 /*
 * An INSERT commutes with other INSERT commands, since performing them
@@ -245,16 +221,38 @@
 }
 else
 {
- ereport(ERROR, (errmsg("unrecognized operation code: %d", (int) commandType)));
+ /*
+ * UPDATE/DELETE/UPSERT commands do not commute with other modifications
+ * since the rows modified by one command may be affected by the outcome
+ * of another command.
+ *
+ * Upserts (INSERT with an ON CONFLICT section) are classified as
+ * non-commutative by RowModifyLevelForQuery and take this branch too.
+ *
+ * ExclusiveLock conflicts with all lock types used by modifications
+ * and therefore prevents other modifications from running
+ * concurrently.
+ */
+
+ lockMode = ExclusiveLock;
 }
 
- if (shardId != INVALID_SHARD_ID && lockMode != NoLock)
+ if (lockMode != NoLock)
 {
 ShardInterval *shardInterval = LoadShardInterval(shardId);
 
 SerializeNonCommutativeWrites(list_make1(shardInterval), lockMode);
 }
+}
+
+
+/*
+ * AcquireExecutorShardLocksForRelationRowLockList acquires locks on the
+ * shards of any reference tables locked by the task's row lock clauses.
+ */
+static void
+AcquireExecutorShardLocksForRelationRowLockList(Task *task)
+{
 /*
 * If lock clause exists and it effects any reference table, we need to get
 * lock on shard resource. Type of lock is determined by the type of row lock
@@ -301,6 +295,28 @@
 }
 }
 }
+}
+
+
+/*
+ * AcquireExecutorShardLocks acquires locks on shards for the given task if
+ * necessary to avoid divergence between multiple replicas of the same shard.
+ * No lock is obtained when there is only one replica.
+ *
+ * The function determines the appropriate lock mode based on the commutativity
+ * rule of the command. In each case, it uses a lock mode that enforces the
+ * commutativity rule.
+ *
+ * The mapping is overridden when all_modifications_commutative is set to true.
+ * In that case, all modifications are treated as commutative, which can be used + * to communicate that the application is only generating commutative + * UPDATE/DELETE/UPSERT commands and exclusive locks are unnecessary. + */ +void +AcquireExecutorShardLocks(Task *task, RowModifyLevel modLevel) +{ + AcquireExecutorShardLockForRowModify(task, modLevel); + AcquireExecutorShardLocksForRelationRowLockList(task); /* * If the task has a subselect, then we may need to lock the shards from which @@ -717,16 +733,16 @@ RouterSequentialModifyExecScan(CustomScanState *node) { CitusScanState *scanState = (CitusScanState *) node; DistributedPlan *distributedPlan = scanState->distributedPlan; + RowModifyLevel modLevel = distributedPlan->modLevel; bool hasReturning = distributedPlan->hasReturning; Job *workerJob = distributedPlan->workerJob; List *taskList = workerJob->taskList; EState *executorState = ScanStateGetExecutorState(scanState); - CmdType operation = scanState->distributedPlan->operation; Assert(!scanState->finishedRemoteScan); executorState->es_processed += - ExecuteModifyTasksSequentially(scanState, taskList, operation, hasReturning); + ExecuteModifyTasksSequentially(scanState, taskList, modLevel, hasReturning); } @@ -1056,7 +1072,7 @@ CreatePlacementAccess(ShardPlacement *placement, ShardPlacementAccessType access * The function returns affectedTupleCount if applicable. */ static int64 -ExecuteSingleModifyTask(CitusScanState *scanState, Task *task, CmdType operation, +ExecuteSingleModifyTask(CitusScanState *scanState, Task *task, RowModifyLevel modLevel, bool alwaysThrowErrorOnFailure, bool expectResults) { EState *executorState = NULL; @@ -1107,10 +1123,9 @@ ExecuteSingleModifyTask(CitusScanState *scanState, Task *task, CmdType operation * acquire the necessary locks on the relations, and blocks any * unsafe concurrent operations. */ - if (operation == CMD_INSERT || operation == CMD_UPDATE || - operation == CMD_DELETE || operation == CMD_SELECT) + if (modLevel > ROW_MODIFY_NONE) { - AcquireExecutorShardLock(task, operation); + AcquireExecutorShardLocks(task, modLevel); } /* try to execute modification on all placements */ @@ -1356,9 +1371,9 @@ ExecuteModifyTasksWithoutResults(List *taskList) * and ignores the results. 
*/ int64 -ExecuteModifyTasksSequentiallyWithoutResults(List *taskList, CmdType operation) +ExecuteModifyTasksSequentiallyWithoutResults(List *taskList, RowModifyLevel modLevel) { - return ExecuteModifyTasksSequentially(NULL, taskList, operation, false); + return ExecuteModifyTasksSequentially(NULL, taskList, modLevel, false); } @@ -1373,7 +1388,7 @@ ExecuteModifyTasksSequentiallyWithoutResults(List *taskList, CmdType operation) */ int64 ExecuteModifyTasksSequentially(CitusScanState *scanState, List *taskList, - CmdType operation, bool hasReturning) + RowModifyLevel modLevel, bool hasReturning) { ListCell *taskCell = NULL; bool multipleTasks = list_length(taskList) > 1; @@ -1422,7 +1437,7 @@ ExecuteModifyTasksSequentially(CitusScanState *scanState, List *taskList, bool expectResults = (hasReturning || task->relationRowLockList != NIL); affectedTupleCount += - ExecuteSingleModifyTask(scanState, task, operation, alwaysThrowErrorOnFailure, + ExecuteSingleModifyTask(scanState, task, modLevel, alwaysThrowErrorOnFailure, expectResults); } diff --git a/src/backend/distributed/executor/multi_server_executor.c b/src/backend/distributed/executor/multi_server_executor.c index e54ca4358..fff0fa42f 100644 --- a/src/backend/distributed/executor/multi_server_executor.c +++ b/src/backend/distributed/executor/multi_server_executor.c @@ -95,7 +95,7 @@ JobExecutorType(DistributedPlan *distributedPlan) return MULTI_EXECUTOR_COORDINATOR_INSERT_SELECT; } - Assert(distributedPlan->operation == CMD_SELECT); + Assert(distributedPlan->modLevel == ROW_MODIFY_READONLY); workerNodeList = ActiveReadableNodeList(); workerNodeCount = list_length(workerNodeList); diff --git a/src/backend/distributed/master/master_stage_protocol.c b/src/backend/distributed/master/master_stage_protocol.c index f34cd5e1e..49d3f7bd4 100644 --- a/src/backend/distributed/master/master_stage_protocol.c +++ b/src/backend/distributed/master/master_stage_protocol.c @@ -592,7 +592,7 @@ CreateShardsOnWorkersViaExecutor(Oid distributedRelationId, List *shardPlacement poolSize = MaxAdaptiveExecutorPoolSize; } - ExecuteTaskList(CMD_UTILITY, taskList, poolSize); + ExecuteTaskList(ROW_MODIFY_NONE, taskList, poolSize); } diff --git a/src/backend/distributed/metadata/metadata_sync.c b/src/backend/distributed/metadata/metadata_sync.c index ef2b75d39..7792d6d07 100644 --- a/src/backend/distributed/metadata/metadata_sync.c +++ b/src/backend/distributed/metadata/metadata_sync.c @@ -69,7 +69,7 @@ PG_FUNCTION_INFO_V1(stop_metadata_sync_to_node); * start_metadata_sync_to_node function creates the metadata in a worker for preparing the * worker for accepting queries. The function first sets the localGroupId of the worker * so that the worker knows which tuple in pg_dist_node table represents itself. After - * that, SQL statetemens for re-creating metadata of MX-eligible distributed tables are + * that, SQL statements for re-creating metadata of MX-eligible distributed tables are * sent to the worker. Finally, the hasmetadata column of the target node in pg_dist_node * is marked as true. */ @@ -1188,7 +1188,7 @@ DetachPartitionCommandList(void) /* * We probably do not need this but as an extra precaution, we are enabling - * DDL propagation to swtich back to original state. + * DDL propagation to switch back to original state. 
*/ detachPartitionCommandList = lappend(detachPartitionCommandList, ENABLE_DDL_PROPAGATION); diff --git a/src/backend/distributed/planner/deparse_shard_query.c b/src/backend/distributed/planner/deparse_shard_query.c index e44ceeb04..1f906fb26 100644 --- a/src/backend/distributed/planner/deparse_shard_query.c +++ b/src/backend/distributed/planner/deparse_shard_query.c @@ -70,7 +70,7 @@ RebuildQueryStrings(Query *originalQuery, List *taskList) query = copyObject(originalQuery); - copiedInsertRte = ExtractInsertRangeTableEntry(query); + copiedInsertRte = ExtractResultRelationRTE(query); copiedSubqueryRte = ExtractSelectRangeTableEntry(query); copiedSubquery = copiedSubqueryRte->subquery; @@ -92,7 +92,8 @@ RebuildQueryStrings(Query *originalQuery, List *taskList) UpdateRelationToShardNames((Node *) copiedSubquery, relationShardList); } - else if (task->upsertQuery || valuesRTE != NULL) + else if (query->commandType == CMD_INSERT && (query->onConflict != NULL || + valuesRTE != NULL)) { RangeTblEntry *rangeTableEntry = NULL; diff --git a/src/backend/distributed/planner/distributed_planner.c b/src/backend/distributed/planner/distributed_planner.c index 88c705549..a769e9b78 100644 --- a/src/backend/distributed/planner/distributed_planner.c +++ b/src/backend/distributed/planner/distributed_planner.c @@ -50,8 +50,8 @@ int MultiTaskQueryLogLevel = MULTI_TASK_QUERY_INFO_OFF; /* multi-task query log static uint64 NextPlanId = 1; -/* local function forward declarations */ static bool ListContainsDistributedTableRTE(List *rangeTableList); +static bool IsUpdateOrDelete(Query *query); static PlannedStmt * CreateDistributedPlannedStmt(uint64 planId, PlannedStmt *localPlan, Query *originalQuery, Query *query, ParamListInfo boundParams, @@ -423,22 +423,6 @@ IsModifyCommand(Query *query) } -/* - * IsMultiShardModifyPlan returns true if the given plan was generated for - * multi shard update or delete query. - */ -bool -IsMultiShardModifyPlan(DistributedPlan *distributedPlan) -{ - if (IsUpdateOrDelete(distributedPlan) && IsMultiTaskPlan(distributedPlan)) - { - return true; - } - - return false; -} - - /* * IsMultiTaskPlan returns true if job contains multiple tasks. */ @@ -457,19 +441,13 @@ IsMultiTaskPlan(DistributedPlan *distributedPlan) /* - * IsUpdateOrDelete returns true if the query performs update or delete. + * IsUpdateOrDelete returns true if the query performs an update or delete. */ bool -IsUpdateOrDelete(DistributedPlan *distributedPlan) +IsUpdateOrDelete(Query *query) { - CmdType commandType = distributedPlan->operation; - - if (commandType == CMD_UPDATE || commandType == CMD_DELETE) - { - return true; - } - - return false; + return query->commandType == CMD_UPDATE || + query->commandType == CMD_DELETE; } @@ -480,15 +458,7 @@ IsUpdateOrDelete(DistributedPlan *distributedPlan) bool IsModifyDistributedPlan(DistributedPlan *distributedPlan) { - bool isModifyDistributedPlan = false; - CmdType operation = distributedPlan->operation; - - if (operation == CMD_INSERT || operation == CMD_UPDATE || operation == CMD_DELETE) - { - isModifyDistributedPlan = true; - } - - return isModifyDistributedPlan; + return distributedPlan->modLevel > ROW_MODIFY_READONLY; } @@ -569,11 +539,12 @@ CreateDistributedPlannedStmt(uint64 planId, PlannedStmt *localPlan, Query *origi * query planning failed (possibly) due to prepared statement parameters or * if it is planned as a multi shard modify query. 
*/ - if ((distributedPlan->planningError || IsMultiShardModifyPlan(distributedPlan)) && + if ((distributedPlan->planningError || + (IsUpdateOrDelete(originalQuery) && IsMultiTaskPlan(distributedPlan))) && hasUnresolvedParams) { /* - * Arbitraryly high cost, but low enough that it can be added up + * Arbitrarily high cost, but low enough that it can be added up * without overflowing by choose_custom_plan(). */ resultPlan->planTree->total_cost = FLT_MAX / 100000000; diff --git a/src/backend/distributed/planner/insert_select_planner.c b/src/backend/distributed/planner/insert_select_planner.c index 6f097fe30..96e1620a6 100644 --- a/src/backend/distributed/planner/insert_select_planner.c +++ b/src/backend/distributed/planner/insert_select_planner.c @@ -85,7 +85,7 @@ InsertSelectIntoDistributedTable(Query *query) if (insertSelectQuery) { - RangeTblEntry *insertRte = ExtractInsertRangeTableEntry(query); + RangeTblEntry *insertRte = ExtractResultRelationRTE(query); if (IsDistributedTable(insertRte->relid)) { return true; @@ -108,7 +108,7 @@ InsertSelectIntoLocalTable(Query *query) if (insertSelectQuery) { - RangeTblEntry *insertRte = ExtractInsertRangeTableEntry(query); + RangeTblEntry *insertRte = ExtractResultRelationRTE(query); if (!IsDistributedTable(insertRte->relid)) { return true; @@ -220,7 +220,7 @@ CreateDistributedInsertSelectPlan(Query *originalQuery, Job *workerJob = NULL; uint64 jobId = INVALID_JOB_ID; DistributedPlan *distributedPlan = CitusMakeNode(DistributedPlan); - RangeTblEntry *insertRte = ExtractInsertRangeTableEntry(originalQuery); + RangeTblEntry *insertRte = ExtractResultRelationRTE(originalQuery); RangeTblEntry *subqueryRte = ExtractSelectRangeTableEntry(originalQuery); Oid targetRelationId = insertRte->relid; DistTableCacheEntry *targetCacheEntry = DistributedTableCacheEntry(targetRelationId); @@ -230,7 +230,7 @@ CreateDistributedInsertSelectPlan(Query *originalQuery, bool allReferenceTables = relationRestrictionContext->allReferenceTables; bool allDistributionKeysInQueryAreEqual = false; - distributedPlan->operation = originalQuery->commandType; + distributedPlan->modLevel = RowModifyLevelForQuery(originalQuery); /* * Error semantics for INSERT ... 
SELECT queries are different than regular @@ -423,7 +423,7 @@ RouterModifyTaskForShardInterval(Query *originalQuery, ShardInterval *shardInter bool safeToPushdownSubquery) { Query *copiedQuery = copyObject(originalQuery); - RangeTblEntry *copiedInsertRte = ExtractInsertRangeTableEntry(copiedQuery); + RangeTblEntry *copiedInsertRte = ExtractResultRelationRTE(copiedQuery); RangeTblEntry *copiedSubqueryRte = ExtractSelectRangeTableEntry(copiedQuery); Query *copiedSubquery = (Query *) copiedSubqueryRte->subquery; @@ -443,7 +443,6 @@ RouterModifyTaskForShardInterval(Query *originalQuery, ShardInterval *shardInter uint64 jobId = INVALID_JOB_ID; List *insertShardPlacementList = NULL; List *intersectedPlacementList = NULL; - bool upsertQuery = false; bool replacePrunedQueryWithDummy = false; bool allReferenceTables = plannerRestrictionContext->relationRestrictionContext->allReferenceTables; @@ -566,12 +565,6 @@ RouterModifyTaskForShardInterval(Query *originalQuery, ShardInterval *shardInter /* this is required for correct deparsing of the query */ ReorderInsertSelectTargetLists(copiedQuery, copiedInsertRte, copiedSubqueryRte); - /* set the upsert flag */ - if (originalQuery->onConflict != NULL) - { - upsertQuery = true; - } - /* setting an alias simplifies deparsing of RETURNING */ if (copiedInsertRte->alias == NULL) { @@ -589,7 +582,6 @@ RouterModifyTaskForShardInterval(Query *originalQuery, ShardInterval *shardInter modifyTask->dependedTaskList = NULL; modifyTask->anchorShardId = shardId; modifyTask->taskPlacementList = insertShardPlacementList; - modifyTask->upsertQuery = upsertQuery; modifyTask->relationShardList = relationShardList; modifyTask->replicationModel = cacheEntry->replicationModel; @@ -1138,11 +1130,11 @@ CreateCoordinatorInsertSelectPlan(uint64 planId, Query *parse) Query *selectQuery = NULL; RangeTblEntry *selectRte = ExtractSelectRangeTableEntry(insertSelectQuery); - RangeTblEntry *insertRte = ExtractInsertRangeTableEntry(insertSelectQuery); + RangeTblEntry *insertRte = ExtractResultRelationRTE(insertSelectQuery); Oid targetRelationId = insertRte->relid; DistributedPlan *distributedPlan = CitusMakeNode(DistributedPlan); - distributedPlan->operation = CMD_INSERT; + distributedPlan->modLevel = RowModifyLevelForQuery(insertSelectQuery); distributedPlan->planningError = CoordinatorInsertSelectSupported(insertSelectQuery); @@ -1243,7 +1235,7 @@ CoordinatorInsertSelectSupported(Query *insertSelectQuery) return deferredError; } - insertRte = ExtractInsertRangeTableEntry(insertSelectQuery); + insertRte = ExtractResultRelationRTE(insertSelectQuery); if (PartitionMethod(insertRte->relid) == DISTRIBUTE_BY_APPEND) { return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, @@ -1347,7 +1339,7 @@ TwoPhaseInsertSelectTaskList(Oid targetRelationId, Query *insertSelectQuery, * then deparse it. 
*/
 Query *insertResultQuery = copyObject(insertSelectQuery);
- RangeTblEntry *insertRte = ExtractInsertRangeTableEntry(insertResultQuery);
+ RangeTblEntry *insertRte = ExtractResultRelationRTE(insertResultQuery);
 RangeTblEntry *selectRte = ExtractSelectRangeTableEntry(insertResultQuery);
 
 DistTableCacheEntry *targetCacheEntry = DistributedTableCacheEntry(targetRelationId);
@@ -1441,7 +1433,6 @@ TwoPhaseInsertSelectTaskList(Oid targetRelationId, Query *insertSelectQuery,
 modifyTask->dependedTaskList = NULL;
 modifyTask->anchorShardId = shardId;
 modifyTask->taskPlacementList = insertShardPlacementList;
- modifyTask->upsertQuery = insertResultQuery->onConflict != NULL;
 modifyTask->relationShardList = list_make1(relationShard);
 modifyTask->replicationModel = targetCacheEntry->replicationModel;
 
diff --git a/src/backend/distributed/planner/multi_physical_planner.c b/src/backend/distributed/planner/multi_physical_planner.c
index c1f949760..66d380356 100644
--- a/src/backend/distributed/planner/multi_physical_planner.c
+++ b/src/backend/distributed/planner/multi_physical_planner.c
@@ -228,7 +228,7 @@ CreatePhysicalDistributedPlan(MultiTreeRoot *multiTree,
 distributedPlan->workerJob = workerJob;
 distributedPlan->masterQuery = masterQuery;
 distributedPlan->routerExecutable = DistributedPlanRouterExecutable(distributedPlan);
- distributedPlan->operation = CMD_SELECT;
+ distributedPlan->modLevel = ROW_MODIFY_READONLY;
 
 return distributedPlan;
 }
@@ -2430,7 +2430,6 @@ QueryPushdownTaskCreate(Query *originalQuery, int shardIndex,
 subqueryTask->dependedTaskList = NULL;
 subqueryTask->anchorShardId = anchorShardId;
 subqueryTask->taskPlacementList = selectPlacementList;
- subqueryTask->upsertQuery = false;
 subqueryTask->relationShardList = relationShardList;
 
 return subqueryTask;
@@ -4310,6 +4309,41 @@ GenerateSyntheticShardIntervalArray(int partitionCount)
 }
 
 
+/*
+ * RowModifyLevelForQuery returns the RowModifyLevel for the given query.
+ */
+RowModifyLevel
+RowModifyLevelForQuery(Query *query)
+{
+ CmdType commandType = query->commandType;
+
+ if (commandType == CMD_SELECT)
+ {
+ return ROW_MODIFY_READONLY;
+ }
+
+ if (commandType == CMD_INSERT)
+ {
+ if (query->onConflict == NULL)
+ {
+ return ROW_MODIFY_COMMUTATIVE;
+ }
+ else
+ {
+ return ROW_MODIFY_NONCOMMUTATIVE;
+ }
+ }
+
+ if (commandType == CMD_UPDATE ||
+ commandType == CMD_DELETE)
+ {
+ return ROW_MODIFY_NONCOMMUTATIVE;
+ }
+
+ return ROW_MODIFY_NONE;
+}
+
+
 /*
 * ColumnName resolves the given column's name. The given column could belong to
 * a regular table or to an intermediate table formed to execute a distributed
diff --git a/src/backend/distributed/planner/multi_router_planner.c b/src/backend/distributed/planner/multi_router_planner.c
index 1bd4cfaf3..d6dfca007 100644
--- a/src/backend/distributed/planner/multi_router_planner.c
+++ b/src/backend/distributed/planner/multi_router_planner.c
@@ -165,7 +165,7 @@ static void ReorderTaskPlacementsByTaskAssignmentPolicy(Job *job,
 
 /*
 * CreateRouterPlan attempts to create a router executor plan for the given
- * SELECT statement. ->planningError is set if planning fails.
+ * SELECT statement.  ->planningError is set if planning fails.
 */
 DistributedPlan *
 CreateRouterPlan(Query *originalQuery, Query *query,
@@ -186,8 +186,8 @@ CreateRouterPlan(Query *originalQuery, Query *query,
 
 
 /*
- * CreateModifyPlan attempts to create a plan the given modification
- * statement. If planning fails ->planningError is set to a description of
+ * CreateModifyPlan attempts to create a plan for the given modification
+ * statement.
If planning fails ->planningError is set to a description of * the failure. */ DistributedPlan * @@ -198,7 +198,7 @@ CreateModifyPlan(Query *originalQuery, Query *query, DistributedPlan *distributedPlan = CitusMakeNode(DistributedPlan); bool multiShardQuery = false; - distributedPlan->operation = query->commandType; + distributedPlan->modLevel = RowModifyLevelForQuery(query); distributedPlan->planningError = ModifyQuerySupported(query, originalQuery, multiShardQuery, @@ -244,8 +244,8 @@ CreateModifyPlan(Query *originalQuery, Query *query, * CreateSingleTaskRouterPlan creates a physical plan for given query. The created plan is * either a modify task that changes a single shard, or a router task that returns * query results from a single worker. Supported modify queries (insert/update/delete) - * are router plannable by default. If query is not router plannable then either NULL is - * returned, or the returned plan has planningError set to a description of the problem. + * are router plannable by default. If query is not router plannable the returned plan + * has planningError set to a description of the problem. */ static void CreateSingleTaskRouterPlan(DistributedPlan *distributedPlan, Query *originalQuery, @@ -254,7 +254,7 @@ CreateSingleTaskRouterPlan(DistributedPlan *distributedPlan, Query *originalQuer { Job *job = NULL; - distributedPlan->operation = query->commandType; + distributedPlan->modLevel = RowModifyLevelForQuery(query); /* we cannot have multi shard update/delete query via this code path */ job = RouterJob(originalQuery, plannerRestrictionContext, @@ -491,7 +491,7 @@ ModifyQueryResultRelationId(Query *query) errmsg("input query is not a modification query"))); } - resultRte = ExtractInsertRangeTableEntry(query); + resultRte = ExtractResultRelationRTE(query); Assert(OidIsValid(resultRte->relid)); return resultRte->relid; @@ -512,20 +512,12 @@ ResultRelationOidForQuery(Query *query) /* - * ExtractInsertRangeTableEntry returns the INSERT'ed table's range table entry. - * Note that the function expects and asserts that the input query be - * an INSERT...SELECT query. + * ExtractResultRelationRTE returns the table's resultRelation range table entry. */ RangeTblEntry * -ExtractInsertRangeTableEntry(Query *query) +ExtractResultRelationRTE(Query *query) { - int resultRelation = query->resultRelation; - List *rangeTableList = query->rtable; - RangeTblEntry *insertRTE = NULL; - - insertRTE = rt_fetch(resultRelation, rangeTableList); - - return insertRTE; + return rt_fetch(query->resultRelation, query->rtable); } @@ -1086,7 +1078,7 @@ HasDangerousJoinUsing(List *rtableList, Node *joinTreeNode) { /* * Yes, so check each join alias var to see if any of them are not - * simple references to underlying columns. If so, we have a + * simple references to underlying columns. If so, we have a * dangerous situation and must pick unique aliases. 
*/ RangeTblEntry *joinRTE = rt_fetch(joinExpr->rtindex, rtableList); @@ -1129,14 +1121,8 @@ HasDangerousJoinUsing(List *rtableList, Node *joinTreeNode) bool UpdateOrDeleteQuery(Query *query) { - CmdType commandType = query->commandType; - - if (commandType == CMD_UPDATE || commandType == CMD_DELETE) - { - return true; - } - - return false; + return query->commandType == CMD_UPDATE || + query->commandType == CMD_DELETE; } @@ -1538,11 +1524,6 @@ RouterInsertTaskList(Query *query, DeferredErrorMessage **planningError) modifyTask->replicationModel = cacheEntry->replicationModel; modifyTask->rowValuesLists = modifyRoute->rowValuesLists; - if (query->onConflict != NULL) - { - modifyTask->upsertQuery = true; - } - insertTaskList = lappend(insertTaskList, modifyTask); } @@ -1572,7 +1553,6 @@ CreateTask(TaskType taskType) task->shardInterval = NULL; task->assignmentConstrained = false; task->taskExecution = NULL; - task->upsertQuery = false; task->replicationModel = REPLICATION_MODEL_INVALID; task->relationRowLockList = NIL; @@ -1858,8 +1838,8 @@ SingleShardModifyTaskList(Query *query, uint64 jobId, List *relationShardList, /* - * GetUpdateOrDeleteRTE checks query if it has an UPDATE or DELETE RTE. If it finds - * it returns it. + * GetUpdateOrDeleteRTE checks query if it has an UPDATE or DELETE RTE. + * Returns that RTE if found. */ static RangeTblEntry * GetUpdateOrDeleteRTE(Query *query) diff --git a/src/backend/distributed/utils/citus_copyfuncs.c b/src/backend/distributed/utils/citus_copyfuncs.c index 6d302e441..0f98934fb 100644 --- a/src/backend/distributed/utils/citus_copyfuncs.c +++ b/src/backend/distributed/utils/citus_copyfuncs.c @@ -103,12 +103,12 @@ CopyNodeDistributedPlan(COPYFUNC_ARGS) DECLARE_FROM_AND_NEW_NODE(DistributedPlan); COPY_SCALAR_FIELD(planId); - COPY_SCALAR_FIELD(operation); + COPY_SCALAR_FIELD(modLevel); COPY_SCALAR_FIELD(hasReturning); + COPY_SCALAR_FIELD(routerExecutable); COPY_NODE_FIELD(workerJob); COPY_NODE_FIELD(masterQuery); - COPY_SCALAR_FIELD(routerExecutable); COPY_SCALAR_FIELD(queryId); COPY_NODE_FIELD(relationIdList); @@ -257,7 +257,6 @@ CopyNodeTask(COPYFUNC_ARGS) COPY_NODE_FIELD(shardInterval); COPY_SCALAR_FIELD(assignmentConstrained); COPY_NODE_FIELD(taskExecution); - COPY_SCALAR_FIELD(upsertQuery); COPY_SCALAR_FIELD(replicationModel); COPY_SCALAR_FIELD(modifyWithSubquery); COPY_NODE_FIELD(relationShardList); diff --git a/src/backend/distributed/utils/citus_outfuncs.c b/src/backend/distributed/utils/citus_outfuncs.c index 94caafa0e..bc92940b9 100644 --- a/src/backend/distributed/utils/citus_outfuncs.c +++ b/src/backend/distributed/utils/citus_outfuncs.c @@ -176,12 +176,12 @@ OutDistributedPlan(OUTFUNC_ARGS) WRITE_NODE_TYPE("DISTRIBUTEDPLAN"); WRITE_UINT64_FIELD(planId); - WRITE_INT_FIELD(operation); + WRITE_ENUM_FIELD(modLevel, RowModifyLevel); WRITE_BOOL_FIELD(hasReturning); + WRITE_BOOL_FIELD(routerExecutable); WRITE_NODE_FIELD(workerJob); WRITE_NODE_FIELD(masterQuery); - WRITE_BOOL_FIELD(routerExecutable); WRITE_UINT64_FIELD(queryId); WRITE_NODE_FIELD(relationIdList); @@ -467,7 +467,6 @@ OutTask(OUTFUNC_ARGS) WRITE_NODE_FIELD(shardInterval); WRITE_BOOL_FIELD(assignmentConstrained); WRITE_NODE_FIELD(taskExecution); - WRITE_BOOL_FIELD(upsertQuery); WRITE_CHAR_FIELD(replicationModel); WRITE_BOOL_FIELD(modifyWithSubquery); WRITE_NODE_FIELD(relationShardList); diff --git a/src/backend/distributed/utils/citus_readfuncs.c b/src/backend/distributed/utils/citus_readfuncs.c index 93df55fab..a15c5d0f6 100644 --- a/src/backend/distributed/utils/citus_readfuncs.c +++ 
b/src/backend/distributed/utils/citus_readfuncs.c @@ -202,12 +202,12 @@ ReadDistributedPlan(READFUNC_ARGS) READ_LOCALS(DistributedPlan); READ_UINT64_FIELD(planId); - READ_INT_FIELD(operation); + READ_ENUM_FIELD(modLevel, RowModifyLevel); READ_BOOL_FIELD(hasReturning); + READ_BOOL_FIELD(routerExecutable); READ_NODE_FIELD(workerJob); READ_NODE_FIELD(masterQuery); - READ_BOOL_FIELD(routerExecutable); READ_UINT64_FIELD(queryId); READ_NODE_FIELD(relationIdList); @@ -381,7 +381,6 @@ ReadTask(READFUNC_ARGS) READ_NODE_FIELD(shardInterval); READ_BOOL_FIELD(assignmentConstrained); READ_NODE_FIELD(taskExecution); - READ_BOOL_FIELD(upsertQuery); READ_CHAR_FIELD(replicationModel); READ_BOOL_FIELD(modifyWithSubquery); READ_NODE_FIELD(relationShardList); diff --git a/src/include/distributed/distributed_planner.h b/src/include/distributed/distributed_planner.h index 50daeb93f..927cef48b 100644 --- a/src/include/distributed/distributed_planner.h +++ b/src/include/distributed/distributed_planner.h @@ -108,12 +108,10 @@ extern void multi_join_restriction_hook(PlannerInfo *root, JoinType jointype, JoinPathExtraData *extra); extern bool IsModifyCommand(Query *query); -extern bool IsUpdateOrDelete(struct DistributedPlan *distributedPlan); extern bool IsModifyDistributedPlan(struct DistributedPlan *distributedPlan); extern void EnsurePartitionTableNotReplicated(Oid relationId); extern Node * ResolveExternalParams(Node *inputNode, ParamListInfo boundParams); extern bool IsMultiTaskPlan(struct DistributedPlan *distributedPlan); -extern bool IsMultiShardModifyPlan(struct DistributedPlan *distributedPlan); extern RangeTblEntry * RemoteScanRangeTableEntry(List *columnNameList); extern int GetRTEIdentity(RangeTblEntry *rte); diff --git a/src/include/distributed/multi_executor.h b/src/include/distributed/multi_executor.h index 9d2358a5c..ed8989bd5 100644 --- a/src/include/distributed/multi_executor.h +++ b/src/include/distributed/multi_executor.h @@ -38,13 +38,14 @@ extern void CitusExecutorStart(QueryDesc *queryDesc, int eflags); extern void CitusExecutorRun(QueryDesc *queryDesc, ScanDirection direction, uint64 count, bool execute_once); extern TupleTableSlot * AdaptiveExecutor(CustomScanState *node); -extern uint64 ExecuteTaskListExtended(CmdType operation, List *taskList, +extern uint64 ExecuteTaskListExtended(RowModifyLevel modLevel, List *taskList, TupleDesc tupleDescriptor, Tuplestorestate *tupleStore, bool hasReturning, int targetPoolSize); extern void ExecuteUtilityTaskListWithoutResults(List *taskList, int targetPoolSize, bool forceSequentialExecution); -extern uint64 ExecuteTaskList(CmdType operation, List *taskList, int targetPoolSize); +extern uint64 ExecuteTaskList(RowModifyLevel modLevel, List *taskList, int + targetPoolSize); extern TupleTableSlot * CitusExecScan(CustomScanState *node); extern TupleTableSlot * ReturnTupleFromTuplestore(CitusScanState *scanState); extern void LoadTuplesIntoTupleStore(CitusScanState *citusScanState, Job *workerJob); diff --git a/src/include/distributed/multi_physical_planner.h b/src/include/distributed/multi_physical_planner.h index 8a573a079..cabb46df0 100644 --- a/src/include/distributed/multi_physical_planner.h +++ b/src/include/distributed/multi_physical_planner.h @@ -110,6 +110,15 @@ typedef enum } BoundaryNodeJobType; +/* Enumeration that specifies extent of DML modifications */ +typedef enum RowModifyLevel +{ + ROW_MODIFY_NONE = 0, + ROW_MODIFY_READONLY = 1, + ROW_MODIFY_COMMUTATIVE = 2, + ROW_MODIFY_NONCOMMUTATIVE = 3 +} RowModifyLevel; + /* * Job represents a 
logical unit of work that contains one set of data transfers * in our physical plan. The physical planner maps each SQL query into one or @@ -183,7 +192,6 @@ typedef struct Task ShardInterval *shardInterval; /* only applies to merge tasks */ bool assignmentConstrained; /* only applies to merge tasks */ TaskExecution *taskExecution; /* used by task tracker executor */ - bool upsertQuery; /* only applies to modify tasks */ char replicationModel; /* only applies to modify tasks */ List *relationRowLockList; @@ -229,21 +237,21 @@ typedef struct DistributedPlan /* unique identifier of the plan within the session */ uint64 planId; - /* type of command to execute (SELECT/INSERT/...) */ - CmdType operation; + /* specifies nature of modifications in query */ + RowModifyLevel modLevel; /* specifies whether a DML command has a RETURNING */ bool hasReturning; + /* a router executable query is executed entirely on a worker */ + bool routerExecutable; + /* job tree containing the tasks to be executed on workers */ Job *workerJob; /* local query that merges results from the workers */ Query *masterQuery; - /* a router executable query is executed entirely on a worker */ - bool routerExecutable; - /* query identifier (copied from the top-level PlannedStmt) */ uint64 queryId; @@ -340,6 +348,7 @@ extern bool ShardIntervalsOverlap(ShardInterval *firstInterval, ShardInterval *secondInterval); extern bool CoPartitionedTables(Oid firstRelationId, Oid secondRelationId); extern ShardInterval ** GenerateSyntheticShardIntervalArray(int partitionCount); +extern RowModifyLevel RowModifyLevelForQuery(Query *query); /* function declarations for Task and Task list operations */ diff --git a/src/include/distributed/multi_router_executor.h b/src/include/distributed/multi_router_executor.h index b92c46d30..c0d7e80e7 100644 --- a/src/include/distributed/multi_router_executor.h +++ b/src/include/distributed/multi_router_executor.h @@ -46,15 +46,15 @@ extern void ExecuteMultipleTasks(CitusScanState *scanState, List *taskList, bool isModificationQuery, bool expectResults); int64 ExecuteModifyTasksSequentially(CitusScanState *scanState, List *taskList, - CmdType operation, bool hasReturning); + RowModifyLevel modLevel, bool hasReturning); extern int64 ExecuteModifyTasksWithoutResults(List *taskList); extern int64 ExecuteModifyTasksSequentiallyWithoutResults(List *taskList, - CmdType operation); + RowModifyLevel modLevel); extern ShardPlacementAccess * CreatePlacementAccess(ShardPlacement *placement, ShardPlacementAccessType accessType); /* helper functions */ -extern void AcquireExecutorShardLock(Task *task, CmdType commandType); +extern void AcquireExecutorShardLocks(Task *task, RowModifyLevel modLevel); extern void AcquireExecutorMultiShardLocks(List *taskList); extern ShardPlacementAccess * CreatePlacementAccess(ShardPlacement *placement, ShardPlacementAccessType accessType); diff --git a/src/include/distributed/multi_router_planner.h b/src/include/distributed/multi_router_planner.h index 4ba110e2d..8a868dfca 100644 --- a/src/include/distributed/multi_router_planner.h +++ b/src/include/distributed/multi_router_planner.h @@ -61,7 +61,7 @@ extern RelationRestrictionContext * CopyRelationRestrictionContext( extern Oid ExtractFirstDistributedTableId(Query *query); extern RangeTblEntry * ExtractSelectRangeTableEntry(Query *query); extern Oid ModifyQueryResultRelationId(Query *query); -extern RangeTblEntry * ExtractInsertRangeTableEntry(Query *query); +extern RangeTblEntry * ExtractResultRelationRTE(Query *query); extern 
RangeTblEntry * ExtractDistributedInsertValuesRTE(Query *query); extern bool IsMultiRowInsert(Query *query); extern void AddShardIntervalRestrictionToSelect(Query *subqery, diff --git a/src/test/regress/bin/normalized_tests.lst b/src/test/regress/bin/normalized_tests.lst index b1408edb7..39cf8803b 100644 --- a/src/test/regress/bin/normalized_tests.lst +++ b/src/test/regress/bin/normalized_tests.lst @@ -17,6 +17,8 @@ multi_subtransactions multi_modifying_xacts multi_insert_select sql_procedure +multi_reference_table +multi_create_table_constraints # the following tests' output are # normalized for EXPLAIN outputs
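
For illustration only, the following standalone sketch mirrors the classification this patch introduces and the lock modes that follow from it. It is not part of the patch: Query, CmdType, and the lock-mode strings are simplified stand-ins for the PostgreSQL definitions, so the file compiles on its own (cc row_modify_level_sketch.c && ./a.out).

/*
 * row_modify_level_sketch.c: illustrative mirror of RowModifyLevelForQuery
 * and the executor's lock choice; not Citus source code.
 */
#include <stdbool.h>
#include <stdio.h>

/* simplified stand-in for PostgreSQL's CmdType */
typedef enum CmdType
{
	CMD_UNKNOWN,
	CMD_SELECT,
	CMD_UPDATE,
	CMD_INSERT,
	CMD_DELETE,
	CMD_UTILITY
} CmdType;

/* mirrors the enum added to multi_physical_planner.h */
typedef enum RowModifyLevel
{
	ROW_MODIFY_NONE = 0,
	ROW_MODIFY_READONLY = 1,
	ROW_MODIFY_COMMUTATIVE = 2,
	ROW_MODIFY_NONCOMMUTATIVE = 3
} RowModifyLevel;

/* simplified stand-in for a parsed query */
typedef struct Query
{
	CmdType commandType;
	bool hasOnConflict;		/* stand-in for query->onConflict != NULL */
} Query;

/* same decision tree as the RowModifyLevelForQuery added by the patch */
static RowModifyLevel
RowModifyLevelForQuery(Query *query)
{
	if (query->commandType == CMD_SELECT)
	{
		return ROW_MODIFY_READONLY;
	}

	if (query->commandType == CMD_INSERT)
	{
		/* an upsert (INSERT ... ON CONFLICT) does not commute */
		return query->hasOnConflict ?
			   ROW_MODIFY_NONCOMMUTATIVE : ROW_MODIFY_COMMUTATIVE;
	}

	if (query->commandType == CMD_UPDATE || query->commandType == CMD_DELETE)
	{
		return ROW_MODIFY_NONCOMMUTATIVE;
	}

	return ROW_MODIFY_NONE;	/* e.g. utility commands */
}

/*
 * Rough shape of AcquireExecutorShardLockForRowModify's lock choice; the
 * real function also skips tasks without an anchor shard and honors
 * citus.all_modifications_commutative.
 */
static const char *
LockModeForLevel(RowModifyLevel modLevel)
{
	if (modLevel <= ROW_MODIFY_READONLY)
	{
		return "RowExclusiveLock (SELECT FOR UPDATE on a reference table)";
	}
	if (modLevel < ROW_MODIFY_NONCOMMUTATIVE)
	{
		return "RowExclusiveLock (commutative INSERTs may run concurrently)";
	}
	return "ExclusiveLock (UPDATE/DELETE/UPSERT must be serialized)";
}

int
main(void)
{
	Query examples[] = {
		{ CMD_SELECT, false },
		{ CMD_INSERT, false },
		{ CMD_INSERT, true },	/* upsert */
		{ CMD_UPDATE, false },
		{ CMD_UTILITY, false }
	};
	int exampleCount = (int) (sizeof(examples) / sizeof(examples[0]));

	for (int i = 0; i < exampleCount; i++)
	{
		RowModifyLevel modLevel = RowModifyLevelForQuery(&examples[i]);

		printf("commandType=%d -> modLevel=%d: %s\n",
			   (int) examples[i].commandType, (int) modLevel,
			   LockModeForLevel(modLevel));
	}

	return 0;
}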