Lock parent shard resources if the relation being modified is a partition

pull/5159/head
Sait Talha Nisanci 2021-08-13 17:15:18 +03:00
parent d3faf974ad
commit d8d86629e5
6 changed files with 28 additions and 48 deletions

View File

@ -58,6 +58,7 @@
#include "distributed/reference_table_utils.h"
#include "distributed/relation_access_tracking.h"
#include "distributed/remote_commands.h"
#include "distributed/resource_lock.h"
#include "distributed/shared_library_init.h"
#include "distributed/worker_protocol.h"
#include "distributed/worker_transaction.h"

View File

@ -26,6 +26,7 @@
#include "distributed/commands/utility_hook.h"
#include "distributed/deparser.h"
#include "distributed/deparse_shard_query.h"
#include "distributed/distributed_execution_locks.h"
#include "distributed/listutils.h"
#include "distributed/coordinator_protocol.h"
#include "distributed/metadata_sync.h"
@ -138,6 +139,8 @@ PreprocessDropTableStmt(Node *node, const char *queryString,
continue;
}
LockParentShardResourcesForShardsOfPartition(relationId);
if (IsCitusTableType(relationId, REFERENCE_TABLE))
{
/* prevent concurrent EnsureReferenceTablesExistOnAllNodes */
@ -367,6 +370,8 @@ PostprocessCreateTableStmtPartitionOf(CreateStmt *createStatement, const
parentDistributionMethod, ShardCount, false,
parentRelationName, viaDeprecatedAPI);
}
LockParentShardResourcesForShardsOfPartition(relationId);
}
@ -444,6 +449,7 @@ PostprocessAlterTableStmtAttachPartition(AlterTableStmt *alterTableStatement,
distributionMethod, ShardCount, false,
parentRelationName, viaDeprecatedAPI);
}
LockParentShardResourcesForShardsOfPartition(partitionRelationId);
}
}
@ -849,6 +855,7 @@ PreprocessAlterTableStmt(Node *node, const char *alterTableCommand,
Assert(list_length(commandList) <= 1);
rightRelationId = RangeVarGetRelid(partitionCommand->name, NoLock, false);
LockParentShardResourcesForShardsOfPartition(rightRelationId);
}
else if (AlterTableCommandTypeIsTrigger(alterTableType))
{

View File

@ -704,7 +704,6 @@ static void SetAttributeInputMetadata(DistributedExecution *execution,
static void LookupTaskPlacementHostAndPort(ShardPlacement *taskPlacement, char **nodeName,
int *nodePort);
static bool IsDummyPlacement(ShardPlacement *taskPlacement);
static void LockParentShardResouceIfPartitionTaskList(List *taskList);
/*
* AdaptiveExecutorPreExecutorRun gets called right before postgres starts its executor
@ -1560,20 +1559,6 @@ LockPartitionsForDistributedPlan(DistributedPlan *distributedPlan)
Oid targetRelationId = distributedPlan->targetRelationId;
LockPartitionsInRelationList(list_make1_oid(targetRelationId), RowExclusiveLock);
if (PartitionTable(targetRelationId))
{
Oid parentRelationId = PartitionParentOid(targetRelationId);
/*
* Postgres only takes the lock on parent when the session accesses the
* partition for the first time. So it should be okay to get this lock from
* PG perspective. Even though we diverge from PG behavior for concurrent
* modifications on partitions vs CREATE/DROP partitions, we consider this as
* a reasonable trade-off to avoid distributed deadlocks.
*/
LockRelationOid(parentRelationId, AccessShareLock);
}
}
/*
@ -1610,11 +1595,6 @@ AcquireExecutorShardLocksForExecution(DistributedExecution *execution)
/* acquire the locks for both the remote and local tasks */
List *taskList = execution->remoteAndLocalTaskList;
if (modLevel == ROW_MODIFY_NONE)
{
LockParentShardResouceIfPartitionTaskList(taskList);
}
if (modLevel <= ROW_MODIFY_READONLY &&
!SelectForUpdateOnReferenceTable(taskList))
{
@ -1644,34 +1624,6 @@ AcquireExecutorShardLocksForExecution(DistributedExecution *execution)
}
/*
* LockParentShardResouceIfPartitionTaskList locks the parent shard
* resource if the given taskList is on a partition table.
*/
static void
LockParentShardResouceIfPartitionTaskList(List *taskList)
{
if (list_length(taskList) < 1)
{
return;
}
Task *task = (Task *) linitial(taskList);
uint64 shardId = task->anchorShardId;
if (shardId == INVALID_SHARD_ID)
{
return;
}
ShardInterval *shardInterval = LoadShardInterval(shardId);
Oid relationId = shardInterval->relationId;
if (PartitionTable(relationId))
{
LockParentShardResourceIfPartition(shardId, AccessExclusiveLock);
}
}
/*
* FinishDistributedExecution cleans up resources associated with a
* distributed execution.

View File

@ -168,6 +168,23 @@ AcquireExecutorMultiShardLocks(List *taskList)
}
void
LockParentShardResourcesForShardsOfPartition(Oid relationId)
{
if (!PartitionTable(relationId) || !IsCitusTable(relationId))
{
return;
}
List *shardIntervalList = LoadShardIntervalList(relationId);
ShardInterval *shardInterval = NULL;
foreach_ptr(shardInterval, shardIntervalList)
{
LockParentShardResourceIfPartition(shardInterval->shardId, AccessExclusiveLock);
}
}
/*
* RequiresConsistentSnapshot returns true if the given task needs to take
* the necessary locks to ensure that a subquery in the modify query

View File

@ -45,6 +45,7 @@
#include "distributed/placement_connection.h"
#include "distributed/relay_utility.h"
#include "distributed/remote_commands.h"
#include "distributed/resource_lock.h"
#include "distributed/worker_protocol.h"
#include "distributed/worker_transaction.h"
#include "lib/stringinfo.h"
@ -332,6 +333,7 @@ DropShards(Oid relationId, char *schemaName, char *relationName,
deletableShardIntervalList);
bool shouldExecuteTasksLocally = ShouldExecuteTasksLocally(dropTaskList);
Task *task = NULL;
foreach_ptr(task, dropTaskList)
{

View File

@ -19,6 +19,7 @@
extern void AcquireExecutorShardLocks(Task *task, RowModifyLevel modLevel);
extern void AcquireExecutorMultiShardLocks(List *taskList);
extern void AcquireMetadataLocks(List *taskList);
extern void LockParentShardResourcesForShardsOfPartition(Oid relationId);
extern void LockPartitionsInRelationList(List *relationIdList, LOCKMODE lockmode);
extern void LockPartitionRelations(Oid relationId, LOCKMODE lockMode);