mirror of https://github.com/citusdata/citus.git
Acquire relation locks on partitions while operation on parent table
parent
52b9e35d50
commit
a321e750c0
|
@ -14,8 +14,10 @@
|
|||
#include "distributed/insert_select_planner.h"
|
||||
#include "distributed/multi_copy.h"
|
||||
#include "distributed/multi_executor.h"
|
||||
#include "distributed/multi_partitioning_utils.h"
|
||||
#include "distributed/multi_physical_planner.h"
|
||||
#include "distributed/multi_planner.h"
|
||||
#include "distributed/resource_lock.h"
|
||||
#include "distributed/transaction_management.h"
|
||||
#include "executor/executor.h"
|
||||
#include "nodes/execnodes.h"
|
||||
|
@ -60,6 +62,16 @@ CoordinatorInsertSelectExecScan(CustomScanState *node)
|
|||
|
||||
ereport(DEBUG1, (errmsg("Collecting INSERT ... SELECT results on coordinator")));
|
||||
|
||||
/*
|
||||
* If we are dealing with a partitioned table, we also need to lock its
|
||||
* partitions. Here we only lock targetRelation, we acquire necessary
|
||||
* locks on selected tables during execution of those select queries.
|
||||
*/
|
||||
if (PartitionedTable(targetRelationId))
|
||||
{
|
||||
LockPartitionRelations(targetRelationId, RowExclusiveLock);
|
||||
}
|
||||
|
||||
ExecuteSelectIntoRelation(targetRelationId, insertTargetList, selectQuery,
|
||||
executorState);
|
||||
|
||||
|
|
|
@ -25,6 +25,7 @@
|
|||
#include "distributed/multi_resowner.h"
|
||||
#include "distributed/multi_server_executor.h"
|
||||
#include "distributed/multi_utility.h"
|
||||
#include "distributed/resource_lock.h"
|
||||
#include "distributed/worker_protocol.h"
|
||||
#include "executor/execdebug.h"
|
||||
#include "commands/copy.h"
|
||||
|
@ -248,6 +249,9 @@ RealTimeExecScan(CustomScanState *node)
|
|||
MultiPlan *multiPlan = scanState->multiPlan;
|
||||
Job *workerJob = multiPlan->workerJob;
|
||||
|
||||
/* we are taking locks on partitions of partitioned tables */
|
||||
LockPartitionsInRelationList(multiPlan->relationIdList, AccessShareLock);
|
||||
|
||||
PrepareMasterJobDirectory(workerJob);
|
||||
MultiRealTimeExecute(workerJob);
|
||||
|
||||
|
@ -445,6 +449,9 @@ TaskTrackerExecScan(CustomScanState *node)
|
|||
MultiPlan *multiPlan = scanState->multiPlan;
|
||||
Job *workerJob = multiPlan->workerJob;
|
||||
|
||||
/* we are taking locks on partitions of partitioned tables */
|
||||
LockPartitionsInRelationList(multiPlan->relationIdList, AccessShareLock);
|
||||
|
||||
PrepareMasterJobDirectory(workerJob);
|
||||
MultiTaskTrackerExecute(workerJob);
|
||||
|
||||
|
|
|
@ -34,6 +34,7 @@
|
|||
#include "distributed/master_metadata_utility.h"
|
||||
#include "distributed/metadata_cache.h"
|
||||
#include "distributed/multi_executor.h"
|
||||
#include "distributed/multi_partitioning_utils.h"
|
||||
#include "distributed/multi_physical_planner.h"
|
||||
#include "distributed/multi_planner.h"
|
||||
#include "distributed/multi_router_executor.h"
|
||||
|
@ -98,6 +99,7 @@ static bool StoreQueryResult(CitusScanState *scanState, MultiConnection *connect
|
|||
bool failOnError, int64 *rows);
|
||||
static bool ConsumeQueryResult(MultiConnection *connection, bool failOnError,
|
||||
int64 *rows);
|
||||
static LOCKMODE LockModeForModifyTask(Task *task);
|
||||
|
||||
|
||||
/*
|
||||
|
@ -434,6 +436,12 @@ CitusModifyBeginScan(CustomScanState *node, EState *estate, int eflags)
|
|||
/* prevent concurrent placement changes */
|
||||
AcquireMetadataLocks(taskList);
|
||||
|
||||
/*
|
||||
* We are taking locks on partitions of partitioned tables. These locks are
|
||||
* necessary for locking tables that appear in the SELECT part of the query.
|
||||
*/
|
||||
LockPartitionsInRelationList(multiPlan->relationIdList, AccessShareLock);
|
||||
|
||||
/* modify tasks are always assigned using first-replica policy */
|
||||
workerJob->taskList = FirstReplicaAssignTaskList(taskList);
|
||||
}
|
||||
|
@ -519,6 +527,9 @@ RouterSelectExecScan(CustomScanState *node)
|
|||
Job *workerJob = multiPlan->workerJob;
|
||||
List *taskList = workerJob->taskList;
|
||||
|
||||
/* we are taking locks on partitions of partitioned tables */
|
||||
LockPartitionsInRelationList(multiPlan->relationIdList, AccessShareLock);
|
||||
|
||||
if (list_length(taskList) > 0)
|
||||
{
|
||||
Task *task = (Task *) linitial(taskList);
|
||||
|
@ -682,6 +693,9 @@ ExecuteSingleModifyTask(CitusScanState *scanState, Task *task, bool expectResult
|
|||
char *queryString = task->queryString;
|
||||
bool taskRequiresTwoPhaseCommit = (task->replicationModel == REPLICATION_MODEL_2PC);
|
||||
|
||||
ShardInterval *shardInterval = LoadShardInterval(task->anchorShardId);
|
||||
Oid relationId = shardInterval->relationId;
|
||||
|
||||
/*
|
||||
* Modifications for reference tables are always done using 2PC. First
|
||||
* ensure that distributed transaction is started. Then force the
|
||||
|
@ -712,6 +726,17 @@ ExecuteSingleModifyTask(CitusScanState *scanState, Task *task, bool expectResult
|
|||
*/
|
||||
connectionList = GetModifyConnections(task, taskRequiresTwoPhaseCommit);
|
||||
|
||||
/*
|
||||
* If we are dealing with a partitioned table, we also need to lock its
|
||||
* partitions.
|
||||
*/
|
||||
if (PartitionedTable(relationId))
|
||||
{
|
||||
LOCKMODE lockMode = LockModeForModifyTask(task);
|
||||
|
||||
LockPartitionRelations(relationId, lockMode);
|
||||
}
|
||||
|
||||
/* prevent replicas of the same shard from diverging */
|
||||
AcquireExecutorShardLock(task, operation);
|
||||
|
||||
|
@ -947,6 +972,7 @@ ExecuteModifyTasks(List *taskList, bool expectResults, ParamListInfo paramListIn
|
|||
int64 totalAffectedTupleCount = 0;
|
||||
ListCell *taskCell = NULL;
|
||||
Task *firstTask = NULL;
|
||||
ShardInterval *firstShardInterval = NULL;
|
||||
int connectionFlags = 0;
|
||||
List *affectedTupleCountList = NIL;
|
||||
HTAB *shardConnectionHash = NULL;
|
||||
|
@ -958,13 +984,25 @@ ExecuteModifyTasks(List *taskList, bool expectResults, ParamListInfo paramListIn
|
|||
return 0;
|
||||
}
|
||||
|
||||
/*
|
||||
* In a multi-shard modification, we expect that all tasks operate on the
|
||||
* same relation, so it is enough to acquire a lock on the first task's
|
||||
* anchor relation's partitions.
|
||||
*/
|
||||
firstTask = (Task *) linitial(taskList);
|
||||
firstShardInterval = LoadShardInterval(firstTask->anchorShardId);
|
||||
if (PartitionedTable(firstShardInterval->relationId))
|
||||
{
|
||||
LOCKMODE lockMode = LockModeForModifyTask(firstTask);
|
||||
|
||||
LockPartitionRelations(firstShardInterval->relationId, lockMode);
|
||||
}
|
||||
|
||||
/* ensure that there are no concurrent modifications on the same shards */
|
||||
AcquireExecutorMultiShardLocks(taskList);
|
||||
|
||||
BeginOrContinueCoordinatedTransaction();
|
||||
|
||||
firstTask = (Task *) linitial(taskList);
|
||||
|
||||
if (MultiShardCommitProtocol == COMMIT_PROTOCOL_2PC ||
|
||||
firstTask->replicationModel == REPLICATION_MODEL_2PC)
|
||||
{
|
||||
|
@ -1464,3 +1502,29 @@ ConsumeQueryResult(MultiConnection *connection, bool failOnError, int64 *rows)
|
|||
|
||||
return gotResponse && !commandFailed;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* LockModeForRouterModifyTask returns appropriate LOCKMODE for given router
|
||||
* modify task.
|
||||
*/
|
||||
static LOCKMODE
|
||||
LockModeForModifyTask(Task *task)
|
||||
{
|
||||
LOCKMODE lockMode = NoLock;
|
||||
if (task->taskType == DDL_TASK)
|
||||
{
|
||||
lockMode = AccessExclusiveLock;
|
||||
}
|
||||
else if (task->taskType == MODIFY_TASK)
|
||||
{
|
||||
lockMode = RowExclusiveLock;
|
||||
}
|
||||
else
|
||||
{
|
||||
/* we do not allow any other task type in these code path */
|
||||
Assert(false);
|
||||
}
|
||||
|
||||
return lockMode;
|
||||
}
|
||||
|
|
|
@ -17,11 +17,13 @@
|
|||
#include "c.h"
|
||||
#include "miscadmin.h"
|
||||
|
||||
#include "distributed/colocation_utils.h"
|
||||
#include "distributed/listutils.h"
|
||||
#include "distributed/master_metadata_utility.h"
|
||||
#include "distributed/metadata_cache.h"
|
||||
#include "distributed/multi_router_executor.h"
|
||||
#include "distributed/multi_partitioning_utils.h"
|
||||
#include "distributed/multi_planner.h"
|
||||
#include "distributed/multi_router_executor.h"
|
||||
#include "distributed/relay_utility.h"
|
||||
#include "distributed/resource_lock.h"
|
||||
#include "distributed/shardinterval_utils.h"
|
||||
|
@ -316,3 +318,69 @@ LockRelationShardResources(List *relationShardList, LOCKMODE lockMode)
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* LockParentShardResourceIfPartition checks whether given shard belongs to a
|
||||
* partition. If it is, LockParentShardResourceIfPartition acquires a shard
|
||||
* resource lock on the colocated shard of the parent table.
|
||||
*/
|
||||
void
|
||||
LockParentShardResourceIfPartition(uint64 shardId, LOCKMODE lockMode)
|
||||
{
|
||||
ShardInterval *shardInterval = LoadShardInterval(shardId);
|
||||
Oid relationId = shardInterval->relationId;
|
||||
|
||||
if (PartitionTable(relationId))
|
||||
{
|
||||
int shardIndex = ShardIndex(shardInterval);
|
||||
Oid parentRelationId = PartitionParentOid(relationId);
|
||||
uint64 parentShardId = ColocatedShardIdInRelation(parentRelationId, shardIndex);
|
||||
|
||||
LockShardResource(parentShardId, lockMode);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* LockPartitionsInRelationList iterates over given list and acquires locks on
|
||||
* partitions of each partitioned table. It does nothing for non-partitioned tables.
|
||||
*/
|
||||
void
|
||||
LockPartitionsInRelationList(List *relationIdList, LOCKMODE lockmode)
|
||||
{
|
||||
ListCell *relationIdCell = NULL;
|
||||
|
||||
foreach(relationIdCell, relationIdList)
|
||||
{
|
||||
Oid relationId = lfirst_oid(relationIdCell);
|
||||
if (PartitionedTable(relationId))
|
||||
{
|
||||
LockPartitionRelations(relationId, lockmode);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* LockPartitionRelations acquires relation lock on all partitions of given
|
||||
* partitioned relation. This function expects that given relation is a
|
||||
* partitioned relation.
|
||||
*/
|
||||
void
|
||||
LockPartitionRelations(Oid relationId, LOCKMODE lockMode)
|
||||
{
|
||||
/*
|
||||
* PartitionList function generates partition list in the same order
|
||||
* as PostgreSQL. Therefore we do not need to sort it before acquiring
|
||||
* locks.
|
||||
*/
|
||||
List *partitionList = PartitionList(relationId);
|
||||
ListCell *partitionCell = NULL;
|
||||
|
||||
foreach(partitionCell, partitionList)
|
||||
{
|
||||
Oid partitionRelationId = lfirst_oid(partitionCell);
|
||||
LockRelationOid(partitionRelationId, lockMode);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -80,5 +80,8 @@ extern void LockShardListMetadata(List *shardIntervalList, LOCKMODE lockMode);
|
|||
extern void LockShardListResources(List *shardIntervalList, LOCKMODE lockMode);
|
||||
extern void LockRelationShardResources(List *relationShardList, LOCKMODE lockMode);
|
||||
|
||||
/* Lock partitions of partitioned table */
|
||||
extern void LockPartitionsInRelationList(List *relationIdList, LOCKMODE lockmode);
|
||||
extern void LockPartitionRelations(Oid relationId, LOCKMODE lockMode);
|
||||
|
||||
#endif /* RESOURCE_LOCK_H */
|
||||
|
|
Loading…
Reference in New Issue