From d8d86629e57b1b9f564e8f8fd933266f40d60417 Mon Sep 17 00:00:00 2001 From: Sait Talha Nisanci Date: Fri, 13 Aug 2021 17:15:18 +0300 Subject: [PATCH] Lock parent shard resources if partition --- .../commands/create_distributed_table.c | 1 + src/backend/distributed/commands/table.c | 7 +++ .../distributed/executor/adaptive_executor.c | 48 ------------------- .../executor/distributed_execution_locks.c | 17 +++++++ .../distributed/operations/delete_protocol.c | 2 + .../distributed/distributed_execution_locks.h | 1 + 6 files changed, 28 insertions(+), 48 deletions(-) diff --git a/src/backend/distributed/commands/create_distributed_table.c b/src/backend/distributed/commands/create_distributed_table.c index 111181a0c..a3a4fe5d4 100644 --- a/src/backend/distributed/commands/create_distributed_table.c +++ b/src/backend/distributed/commands/create_distributed_table.c @@ -58,6 +58,7 @@ #include "distributed/reference_table_utils.h" #include "distributed/relation_access_tracking.h" #include "distributed/remote_commands.h" +#include "distributed/resource_lock.h" #include "distributed/shared_library_init.h" #include "distributed/worker_protocol.h" #include "distributed/worker_transaction.h" diff --git a/src/backend/distributed/commands/table.c b/src/backend/distributed/commands/table.c index 0b6b2c767..aa4fc9494 100644 --- a/src/backend/distributed/commands/table.c +++ b/src/backend/distributed/commands/table.c @@ -26,6 +26,7 @@ #include "distributed/commands/utility_hook.h" #include "distributed/deparser.h" #include "distributed/deparse_shard_query.h" +#include "distributed/distributed_execution_locks.h" #include "distributed/listutils.h" #include "distributed/coordinator_protocol.h" #include "distributed/metadata_sync.h" @@ -138,6 +139,8 @@ PreprocessDropTableStmt(Node *node, const char *queryString, continue; } + LockParentShardResourcesForShardsOfPartition(relationId); + if (IsCitusTableType(relationId, REFERENCE_TABLE)) { /* prevent concurrent 
EnsureReferenceTablesExistOnAllNodes */ @@ -367,6 +370,8 @@ PostprocessCreateTableStmtPartitionOf(CreateStmt *createStatement, const parentDistributionMethod, ShardCount, false, parentRelationName, viaDeprecatedAPI); } + + LockParentShardResourcesForShardsOfPartition(relationId); } @@ -444,6 +449,7 @@ PostprocessAlterTableStmtAttachPartition(AlterTableStmt *alterTableStatement, distributionMethod, ShardCount, false, parentRelationName, viaDeprecatedAPI); } + LockParentShardResourcesForShardsOfPartition(partitionRelationId); } } @@ -849,6 +855,7 @@ PreprocessAlterTableStmt(Node *node, const char *alterTableCommand, Assert(list_length(commandList) <= 1); rightRelationId = RangeVarGetRelid(partitionCommand->name, NoLock, false); + LockParentShardResourcesForShardsOfPartition(rightRelationId); } else if (AlterTableCommandTypeIsTrigger(alterTableType)) { diff --git a/src/backend/distributed/executor/adaptive_executor.c b/src/backend/distributed/executor/adaptive_executor.c index 9eba7e6e9..6bfcbe753 100644 --- a/src/backend/distributed/executor/adaptive_executor.c +++ b/src/backend/distributed/executor/adaptive_executor.c @@ -704,7 +704,6 @@ static void SetAttributeInputMetadata(DistributedExecution *execution, static void LookupTaskPlacementHostAndPort(ShardPlacement *taskPlacement, char **nodeName, int *nodePort); static bool IsDummyPlacement(ShardPlacement *taskPlacement); -static void LockParentShardResouceIfPartitionTaskList(List *taskList); /* * AdaptiveExecutorPreExecutorRun gets called right before postgres starts its executor @@ -1560,20 +1559,6 @@ LockPartitionsForDistributedPlan(DistributedPlan *distributedPlan) Oid targetRelationId = distributedPlan->targetRelationId; LockPartitionsInRelationList(list_make1_oid(targetRelationId), RowExclusiveLock); - - if (PartitionTable(targetRelationId)) - { - Oid parentRelationId = PartitionParentOid(targetRelationId); - - /* - * Postgres only takes the lock on parent when the session accesses the - * partition for the 
first time. So it should be okay to get this lock from - * PG perspective. Even though we diverge from PG behavior for concurrent - * modifications on partitions vs CREATE/DROP partitions, we consider this as - * a reasonable trade-off to avoid distributed deadlocks. - */ - LockRelationOid(parentRelationId, AccessShareLock); - } } /* @@ -1610,11 +1595,6 @@ AcquireExecutorShardLocksForExecution(DistributedExecution *execution) /* acquire the locks for both the remote and local tasks */ List *taskList = execution->remoteAndLocalTaskList; - if (modLevel == ROW_MODIFY_NONE) - { - LockParentShardResouceIfPartitionTaskList(taskList); - } - if (modLevel <= ROW_MODIFY_READONLY && !SelectForUpdateOnReferenceTable(taskList)) { @@ -1644,34 +1624,6 @@ AcquireExecutorShardLocksForExecution(DistributedExecution *execution) } -/* - * LockParentShardResouceIfPartitionTaskList locks the parent shard - * resource if the given taskList is on a partition table. - */ -static void -LockParentShardResouceIfPartitionTaskList(List *taskList) -{ - if (list_length(taskList) < 1) - { - return; - } - - Task *task = (Task *) linitial(taskList); - uint64 shardId = task->anchorShardId; - if (shardId == INVALID_SHARD_ID) - { - return; - } - - ShardInterval *shardInterval = LoadShardInterval(shardId); - Oid relationId = shardInterval->relationId; - if (PartitionTable(relationId)) - { - LockParentShardResourceIfPartition(shardId, AccessExclusiveLock); - } -} - - /* * FinishDistributedExecution cleans up resources associated with a * distributed execution. 
diff --git a/src/backend/distributed/executor/distributed_execution_locks.c b/src/backend/distributed/executor/distributed_execution_locks.c
index bf581f26a..fc1f6b703 100644
--- a/src/backend/distributed/executor/distributed_execution_locks.c
+++ b/src/backend/distributed/executor/distributed_execution_locks.c
@@ -168,6 +168,29 @@ AcquireExecutorMultiShardLocks(List *taskList)
 }
 
 
+/*
+ * LockParentShardResourcesForShardsOfPartition calls
+ * LockParentShardResourceIfPartition with AccessExclusiveLock for every
+ * shard interval of the given relation. It returns without taking any lock
+ * when the relation is not a partition or is not a Citus table.
+ */
+void
+LockParentShardResourcesForShardsOfPartition(Oid relationId)
+{
+	if (!PartitionTable(relationId) || !IsCitusTable(relationId))
+	{
+		return;
+	}
+
+	List *shardIntervalList = LoadShardIntervalList(relationId);
+	ShardInterval *shardInterval = NULL;
+	foreach_ptr(shardInterval, shardIntervalList)
+	{
+		LockParentShardResourceIfPartition(shardInterval->shardId, AccessExclusiveLock);
+	}
+}
+
+
 /*
  * RequiresConsistentSnapshot returns true if the given task need to take
  * the necessary locks to ensure that a subquery in the modify query
diff --git a/src/backend/distributed/operations/delete_protocol.c b/src/backend/distributed/operations/delete_protocol.c
index ab5ff7be6..04355d04f 100644
--- a/src/backend/distributed/operations/delete_protocol.c
+++ b/src/backend/distributed/operations/delete_protocol.c
@@ -45,6 +45,7 @@
 #include "distributed/placement_connection.h"
 #include "distributed/relay_utility.h"
 #include "distributed/remote_commands.h"
+#include "distributed/resource_lock.h"
 #include "distributed/worker_protocol.h"
 #include "distributed/worker_transaction.h"
 #include "lib/stringinfo.h"
@@ -332,6 +333,7 @@ DropShards(Oid relationId, char *schemaName, char *relationName,
 									  deletableShardIntervalList);
 	bool shouldExecuteTasksLocally = ShouldExecuteTasksLocally(dropTaskList);
 
+
 	Task *task = NULL;
 	foreach_ptr(task, dropTaskList)
 	{
diff --git a/src/include/distributed/distributed_execution_locks.h b/src/include/distributed/distributed_execution_locks.h
index eaa51676a..90ab709cf 100644
--- a/src/include/distributed/distributed_execution_locks.h
+++
b/src/include/distributed/distributed_execution_locks.h @@ -19,6 +19,7 @@ extern void AcquireExecutorShardLocks(Task *task, RowModifyLevel modLevel); extern void AcquireExecutorMultiShardLocks(List *taskList); extern void AcquireMetadataLocks(List *taskList); +extern void LockParentShardResourcesForShardsOfPartition(Oid relationId); extern void LockPartitionsInRelationList(List *relationIdList, LOCKMODE lockmode); extern void LockPartitionRelations(Oid relationId, LOCKMODE lockMode);