diff --git a/src/backend/distributed/executor/insert_select_executor.c b/src/backend/distributed/executor/insert_select_executor.c index 6516930e7..f26cf7443 100644 --- a/src/backend/distributed/executor/insert_select_executor.c +++ b/src/backend/distributed/executor/insert_select_executor.c @@ -14,8 +14,10 @@ #include "distributed/insert_select_planner.h" #include "distributed/multi_copy.h" #include "distributed/multi_executor.h" +#include "distributed/multi_partitioning_utils.h" #include "distributed/multi_physical_planner.h" #include "distributed/multi_planner.h" +#include "distributed/resource_lock.h" #include "distributed/transaction_management.h" #include "executor/executor.h" #include "nodes/execnodes.h" @@ -60,6 +62,16 @@ CoordinatorInsertSelectExecScan(CustomScanState *node) ereport(DEBUG1, (errmsg("Collecting INSERT ... SELECT results on coordinator"))); + /* + * If we are dealing with a partitioned table, we also need to lock its + * partitions. Here we only lock targetRelation; we acquire necessary + * locks on selected tables during execution of those select queries. 
+ */ + if (PartitionedTable(targetRelationId)) + { + LockPartitionRelations(targetRelationId, RowExclusiveLock); + } + ExecuteSelectIntoRelation(targetRelationId, insertTargetList, selectQuery, executorState); diff --git a/src/backend/distributed/executor/multi_executor.c b/src/backend/distributed/executor/multi_executor.c index 2a210ee5f..2492356b9 100644 --- a/src/backend/distributed/executor/multi_executor.c +++ b/src/backend/distributed/executor/multi_executor.c @@ -25,6 +25,7 @@ #include "distributed/multi_resowner.h" #include "distributed/multi_server_executor.h" #include "distributed/multi_utility.h" +#include "distributed/resource_lock.h" #include "distributed/worker_protocol.h" #include "executor/execdebug.h" #include "commands/copy.h" @@ -248,6 +249,9 @@ RealTimeExecScan(CustomScanState *node) MultiPlan *multiPlan = scanState->multiPlan; Job *workerJob = multiPlan->workerJob; + /* we are taking locks on partitions of partitioned tables */ + LockPartitionsInRelationList(multiPlan->relationIdList, AccessShareLock); + PrepareMasterJobDirectory(workerJob); MultiRealTimeExecute(workerJob); @@ -445,6 +449,9 @@ TaskTrackerExecScan(CustomScanState *node) MultiPlan *multiPlan = scanState->multiPlan; Job *workerJob = multiPlan->workerJob; + /* we are taking locks on partitions of partitioned tables */ + LockPartitionsInRelationList(multiPlan->relationIdList, AccessShareLock); + PrepareMasterJobDirectory(workerJob); MultiTaskTrackerExecute(workerJob); diff --git a/src/backend/distributed/executor/multi_router_executor.c b/src/backend/distributed/executor/multi_router_executor.c index 5b6def0fc..5e5138f6c 100644 --- a/src/backend/distributed/executor/multi_router_executor.c +++ b/src/backend/distributed/executor/multi_router_executor.c @@ -34,6 +34,7 @@ #include "distributed/master_metadata_utility.h" #include "distributed/metadata_cache.h" #include "distributed/multi_executor.h" +#include "distributed/multi_partitioning_utils.h" #include 
"distributed/multi_physical_planner.h" #include "distributed/multi_planner.h" #include "distributed/multi_router_executor.h" @@ -98,6 +99,7 @@ static bool StoreQueryResult(CitusScanState *scanState, MultiConnection *connect bool failOnError, int64 *rows); static bool ConsumeQueryResult(MultiConnection *connection, bool failOnError, int64 *rows); +static LOCKMODE LockModeForModifyTask(Task *task); /* @@ -434,6 +436,12 @@ CitusModifyBeginScan(CustomScanState *node, EState *estate, int eflags) /* prevent concurrent placement changes */ AcquireMetadataLocks(taskList); + /* + * We are taking locks on partitions of partitioned tables. These locks are + * necessary for locking tables that appear in the SELECT part of the query. + */ + LockPartitionsInRelationList(multiPlan->relationIdList, AccessShareLock); + /* modify tasks are always assigned using first-replica policy */ workerJob->taskList = FirstReplicaAssignTaskList(taskList); } @@ -519,6 +527,9 @@ RouterSelectExecScan(CustomScanState *node) Job *workerJob = multiPlan->workerJob; List *taskList = workerJob->taskList; + /* we are taking locks on partitions of partitioned tables */ + LockPartitionsInRelationList(multiPlan->relationIdList, AccessShareLock); + if (list_length(taskList) > 0) { Task *task = (Task *) linitial(taskList); @@ -682,6 +693,9 @@ ExecuteSingleModifyTask(CitusScanState *scanState, Task *task, bool expectResult char *queryString = task->queryString; bool taskRequiresTwoPhaseCommit = (task->replicationModel == REPLICATION_MODEL_2PC); + ShardInterval *shardInterval = LoadShardInterval(task->anchorShardId); + Oid relationId = shardInterval->relationId; + /* * Modifications for reference tables are always done using 2PC. First * ensure that distributed transaction is started. 
Then force the @@ -712,6 +726,17 @@ ExecuteSingleModifyTask(CitusScanState *scanState, Task *task, bool expectResult */ connectionList = GetModifyConnections(task, taskRequiresTwoPhaseCommit); + /* + * If we are dealing with a partitioned table, we also need to lock its + * partitions. + */ + if (PartitionedTable(relationId)) + { + LOCKMODE lockMode = LockModeForModifyTask(task); + + LockPartitionRelations(relationId, lockMode); + } + /* prevent replicas of the same shard from diverging */ AcquireExecutorShardLock(task, operation); @@ -947,6 +972,7 @@ ExecuteModifyTasks(List *taskList, bool expectResults, ParamListInfo paramListIn int64 totalAffectedTupleCount = 0; ListCell *taskCell = NULL; Task *firstTask = NULL; + ShardInterval *firstShardInterval = NULL; int connectionFlags = 0; List *affectedTupleCountList = NIL; HTAB *shardConnectionHash = NULL; @@ -958,13 +984,25 @@ ExecuteModifyTasks(List *taskList, bool expectResults, ParamListInfo paramListIn return 0; } + /* + * In multi shard modification, we expect that all tasks operate on the + * same relation, so it is enough to acquire a lock on the first task's + * anchor relation's partitions. 
+ */ + firstTask = (Task *) linitial(taskList); + firstShardInterval = LoadShardInterval(firstTask->anchorShardId); + if (PartitionedTable(firstShardInterval->relationId)) + { + LOCKMODE lockMode = LockModeForModifyTask(firstTask); + + LockPartitionRelations(firstShardInterval->relationId, lockMode); + } + /* ensure that there are no concurrent modifications on the same shards */ AcquireExecutorMultiShardLocks(taskList); BeginOrContinueCoordinatedTransaction(); - firstTask = (Task *) linitial(taskList); - if (MultiShardCommitProtocol == COMMIT_PROTOCOL_2PC || firstTask->replicationModel == REPLICATION_MODEL_2PC) { @@ -1464,3 +1502,29 @@ ConsumeQueryResult(MultiConnection *connection, bool failOnError, int64 *rows) return gotResponse && !commandFailed; } + + +/* + * LockModeForModifyTask returns the LOCKMODE to take on partitions for the + * given task: AccessExclusiveLock for DDL, RowExclusiveLock for modify tasks. + */ +static LOCKMODE +LockModeForModifyTask(Task *task) +{ + LOCKMODE lockMode = NoLock; + if (task->taskType == DDL_TASK) + { + lockMode = AccessExclusiveLock; + } + else if (task->taskType == MODIFY_TASK) + { + lockMode = RowExclusiveLock; + } + else + { + /* we do not allow any other task type in this code path */ + Assert(false); + } + + return lockMode; +} diff --git a/src/backend/distributed/utils/resource_lock.c b/src/backend/distributed/utils/resource_lock.c index d4da891ae..0a085fd74 100644 --- a/src/backend/distributed/utils/resource_lock.c +++ b/src/backend/distributed/utils/resource_lock.c @@ -17,11 +17,13 @@ #include "c.h" #include "miscadmin.h" +#include "distributed/colocation_utils.h" #include "distributed/listutils.h" #include "distributed/master_metadata_utility.h" #include "distributed/metadata_cache.h" -#include "distributed/multi_router_executor.h" +#include "distributed/multi_partitioning_utils.h" #include "distributed/multi_planner.h" +#include "distributed/multi_router_executor.h" #include "distributed/relay_utility.h" #include "distributed/resource_lock.h" #include 
"distributed/shardinterval_utils.h" @@ -316,3 +318,69 @@ LockRelationShardResources(List *relationShardList, LOCKMODE lockMode) } } } + + +/* + * LockParentShardResourceIfPartition checks whether the given shard belongs to + * a partition. If it does, LockParentShardResourceIfPartition acquires a shard + * resource lock on the colocated shard of the parent table. + */ +void +LockParentShardResourceIfPartition(uint64 shardId, LOCKMODE lockMode) +{ + ShardInterval *shardInterval = LoadShardInterval(shardId); + Oid relationId = shardInterval->relationId; + + if (PartitionTable(relationId)) + { + int shardIndex = ShardIndex(shardInterval); + Oid parentRelationId = PartitionParentOid(relationId); + uint64 parentShardId = ColocatedShardIdInRelation(parentRelationId, shardIndex); + + LockShardResource(parentShardId, lockMode); + } +} + + +/* + * LockPartitionsInRelationList iterates over the given list and acquires locks on + * partitions of each partitioned table. It does nothing for non-partitioned tables. + */ +void +LockPartitionsInRelationList(List *relationIdList, LOCKMODE lockmode) +{ + ListCell *relationIdCell = NULL; + + foreach(relationIdCell, relationIdList) + { + Oid relationId = lfirst_oid(relationIdCell); + if (PartitionedTable(relationId)) + { + LockPartitionRelations(relationId, lockmode); + } + } +} + + +/* + * LockPartitionRelations acquires a relation lock on all partitions of the given + * partitioned relation. This function expects that the given relation is a + * partitioned relation. + */ +void +LockPartitionRelations(Oid relationId, LOCKMODE lockMode) +{ + /* + * The PartitionList function generates the partition list in the same order + * as PostgreSQL. Therefore we do not need to sort it before acquiring + * locks. 
+ */ + List *partitionList = PartitionList(relationId); + ListCell *partitionCell = NULL; + + foreach(partitionCell, partitionList) + { + Oid partitionRelationId = lfirst_oid(partitionCell); + LockRelationOid(partitionRelationId, lockMode); + } +} diff --git a/src/include/distributed/resource_lock.h b/src/include/distributed/resource_lock.h index 896d1fbb0..95a64c7be 100644 --- a/src/include/distributed/resource_lock.h +++ b/src/include/distributed/resource_lock.h @@ -80,5 +80,8 @@ extern void LockShardListMetadata(List *shardIntervalList, LOCKMODE lockMode); extern void LockShardListResources(List *shardIntervalList, LOCKMODE lockMode); extern void LockRelationShardResources(List *relationShardList, LOCKMODE lockMode); +/* Lock partitions of a partitioned table */ +extern void LockPartitionsInRelationList(List *relationIdList, LOCKMODE lockmode); +extern void LockPartitionRelations(Oid relationId, LOCKMODE lockMode); #endif /* RESOURCE_LOCK_H */