mirror of https://github.com/citusdata/citus.git
Concurrent shard move/copy and colocated table creation fix
It turns out that create_distributed_table and citus_move/copy_shard_placement do not work well concurrently. To fix that, we need to acquire a lock, which sounds like a good use of the colocation lock. However, the current usage of the colocation lock is limited to higher-level UDFs like rebalance_table_shards. That usage of the lock is still useful, but we cannot acquire the same lock in citus_move_shard_placement and friends, because the coordinator connects to itself to acquire the lock; hence the higher-level UDF would block itself. To fix that, we introduce one more colocation lock, with the placements as the main objects to consider.
parent c085ac026a
commit 360fd790c6
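
Why two lock classes (a simplified sketch, not the literal call chain): the rebalancer acquires the colocation lock in its own backend, then drives the move by running citus_move_shard_placement over a connection back to the coordinator, so the move executes in a second backend. If that backend took the same lock tag, it would wait on the first backend forever. The function names below are the ones introduced in this diff; the control flow is condensed for illustration.

    /* backend 1: rebalance_table_shards() */
    AcquireRelationColocationLock(relationId, "rebalance");    /* advisory class 7 */
    /* ... runs "SELECT citus_move_shard_placement(...)" over a connection
     * back to the coordinator; that statement is served by backend 2 ... */

    /* backend 2: citus_move_shard_placement() */
    /* Taking the class-7 tag here would block on backend 1 forever.
     * The fix takes a different tag (advisory class 13) that backend 1
     * does not hold, so the self-connection cannot block itself: */
    AcquirePlacementColocationLock(relationId, ExclusiveLock, "move");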
@@ -60,6 +60,7 @@
 #include "distributed/relation_access_tracking.h"
 #include "distributed/remote_commands.h"
 #include "distributed/shared_library_init.h"
+#include "distributed/shard_rebalancer.h"
 #include "distributed/worker_protocol.h"
 #include "distributed/worker_shard_visibility.h"
 #include "distributed/worker_transaction.h"
@@ -850,6 +851,12 @@ CreateHashDistributedTableShards(Oid relationId, int shardCount,
 
     if (colocatedTableId != InvalidOid)
     {
+        /*
+         * Block concurrent shard moves/copies on this colocation group.
+         */
+        AcquirePlacementColocationLock(colocatedTableId, ShareLock,
+                                       "colocate distributed table");
+
         CreateColocatedShards(relationId, colocatedTableId, useExclusiveConnection);
     }
     else
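
The ShareLock mode in the hunk above is deliberate: ShareLock is self-compatible, so several create_distributed_table calls can join the same colocation group concurrently, while the ExclusiveLock used by move/copy/repair below conflicts with both. A side-by-side sketch of the two call sites from this diff:

    /* colocated table creation: ShareLock, so concurrent creations
     * into the same colocation group do not block each other */
    AcquirePlacementColocationLock(colocatedTableId, ShareLock,
                                   "colocate distributed table");

    /* shard move/copy/repair: ExclusiveLock, so it conflicts with
     * concurrent creations and with any other concurrent move/copy */
    AcquirePlacementColocationLock(relationId, ExclusiveLock, "move");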
@@ -37,6 +37,7 @@
 #include "distributed/reference_table_utils.h"
 #include "distributed/remote_commands.h"
 #include "distributed/resource_lock.h"
+#include "distributed/shard_rebalancer.h"
 #include "distributed/worker_manager.h"
 #include "distributed/worker_protocol.h"
 #include "distributed/worker_transaction.h"
@@ -165,6 +166,9 @@ citus_copy_shard_placement(PG_FUNCTION_ARGS)
     ShardInterval *shardInterval = LoadShardInterval(shardId);
     ErrorIfTableCannotBeReplicated(shardInterval->relationId);
 
+    AcquirePlacementColocationLock(shardInterval->relationId, ExclusiveLock,
+                                   doRepair ? "repair" : "copy");
+
     if (doRepair)
     {
         RepairShardPlacement(shardId, sourceNodeName, sourceNodePort, targetNodeName,
@@ -318,6 +322,8 @@ citus_move_shard_placement(PG_FUNCTION_ARGS)
     ErrorIfMoveUnsupportedTableType(relationId);
     ErrorIfTargetNodeIsNotSafeToMove(targetNodeName, targetNodePort);
 
+    AcquirePlacementColocationLock(relationId, ExclusiveLock, "move");
+
     ShardInterval *shardInterval = LoadShardInterval(shardId);
     Oid distributedTableId = shardInterval->relationId;
 
@@ -230,7 +230,7 @@ static float4 NodeCapacity(WorkerNode *workerNode, void *context);
 static ShardCost GetShardCost(uint64 shardId, void *context);
 static List * NonColocatedDistRelationIdList(void);
 static void RebalanceTableShards(RebalanceOptions *options, Oid shardReplicationModeOid);
-static void AcquireColocationLock(Oid relationId, const char *operationName);
+static void AcquireRelationColocationLock(Oid relationId, const char *operationName);
 static void ExecutePlacementUpdates(List *placementUpdateList, Oid
                                     shardReplicationModeOid, char *noticeOperation);
 static float4 CalculateUtilization(float4 totalCost, float4 capacity);
@@ -619,13 +619,13 @@ GetColocatedRebalanceSteps(List *placementUpdateList)
 
 
 /*
- * AcquireColocationLock tries to acquire a lock for rebalance/replication. If
- * this is it not possible it fails instantly because this means another
- * rebalance/replication is currently happening. This would really mess up
- * planning.
+ * AcquireRelationColocationLock tries to acquire a lock for
+ * rebalance/replication. If this is not possible, it fails
+ * instantly because another rebalance/replication is currently
+ * happening, which would really mess up planning.
  */
 static void
-AcquireColocationLock(Oid relationId, const char *operationName)
+AcquireRelationColocationLock(Oid relationId, const char *operationName)
 {
     uint32 lockId = relationId;
     LOCKTAG tag;
@@ -636,7 +636,7 @@ AcquireColocationLock(Oid relationId, const char *operationName)
         lockId = citusTableCacheEntry->colocationId;
     }
 
-    SET_LOCKTAG_REBALANCE_COLOCATION(tag, (int64) lockId);
+    SET_LOCKTAG_REBALANCE_TABLE_COLOCATION(tag, (int64) lockId);
 
     LockAcquireResult lockAcquired = LockAcquire(&tag, ExclusiveLock, false, true);
     if (!lockAcquired)
@@ -648,6 +648,37 @@ AcquireColocationLock(Oid relationId, const char *operationName)
 }
 
 
+/*
+ * AcquirePlacementColocationLock tries to acquire a lock for
+ * rebalance/replication while moving/copying the placement. If this
+ * is not possible, it fails instantly because another move/copy
+ * is currently happening, which would really mess up planning.
+ */
+void
+AcquirePlacementColocationLock(Oid relationId, int lockMode,
+                               const char *operationName)
+{
+    uint32 lockId = relationId;
+    LOCKTAG tag;
+
+    CitusTableCacheEntry *citusTableCacheEntry = GetCitusTableCacheEntry(relationId);
+    if (citusTableCacheEntry->colocationId != INVALID_COLOCATION_ID)
+    {
+        lockId = citusTableCacheEntry->colocationId;
+    }
+
+    SET_LOCKTAG_REBALANCE_PLACEMENT_COLOCATION(tag, (int64) lockId);
+
+    LockAcquireResult lockAcquired = LockAcquire(&tag, lockMode, false, true);
+    if (!lockAcquired)
+    {
+        ereport(ERROR, (errmsg("could not acquire the lock required to %s %s",
+                               operationName, generate_qualified_relation_name(
+                                   relationId))));
+    }
+}
+
+
 /*
  * GetResponsiveWorkerList returns a List of workers that respond to new
  * connection requests.
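
Because LockAcquire is called with dontWait = true, a conflicting holder makes it return LOCKACQUIRE_NOT_AVAIL immediately instead of queueing; the `if (!lockAcquired)` check works because LOCKACQUIRE_NOT_AVAIL is 0 in PostgreSQL's LockAcquireResult enum. That is why the expected isolation outputs further down change from a blocked `<waiting ...>` step to an instant error. A condensed excerpt of the function above, with the check spelled out explicitly:

    /* equivalent to the "if (!lockAcquired)" check in the function above */
    LockAcquireResult lockAcquired = LockAcquire(&tag, lockMode,
                                                 false, /* sessionLock */
                                                 true); /* dontWait: fail fast */
    if (lockAcquired == LOCKACQUIRE_NOT_AVAIL)
    {
        ereport(ERROR, (errmsg("could not acquire the lock required to %s %s",
                               operationName,
                               generate_qualified_relation_name(relationId))));
    }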
@@ -945,7 +976,7 @@ replicate_table_shards(PG_FUNCTION_ARGS)
     char transferMode = LookupShardTransferMode(shardReplicationModeOid);
     EnsureReferenceTablesExistOnAllNodesExtended(transferMode);
 
-    AcquireColocationLock(relationId, "replicate");
+    AcquireRelationColocationLock(relationId, "replicate");
 
     List *activeWorkerList = SortedActiveWorkers();
     List *shardPlacementList = FullShardPlacementList(relationId, excludedShardArray);
@@ -1558,7 +1589,7 @@ RebalanceTableShards(RebalanceOptions *options, Oid shardReplicationModeOid)
 
     foreach_oid(relationId, options->relationIdList)
     {
-        AcquireColocationLock(relationId, operationName);
+        AcquireRelationColocationLock(relationId, operationName);
     }
 
     List *placementUpdateList = GetRebalanceSteps(options);
@@ -441,7 +441,7 @@ LockColocationId(int colocationId, LOCKMODE lockMode)
     const bool sessionLock = false;
     const bool dontWait = false;
 
-    SET_LOCKTAG_REBALANCE_COLOCATION(tag, (int64) colocationId);
+    SET_LOCKTAG_REBALANCE_TABLE_COLOCATION(tag, (int64) colocationId);
     (void) LockAcquire(&tag, lockMode, sessionLock, dontWait);
 }
@@ -455,7 +455,7 @@ UnlockColocationId(int colocationId, LOCKMODE lockMode)
     LOCKTAG tag;
     const bool sessionLock = false;
 
-    SET_LOCKTAG_REBALANCE_COLOCATION(tag, (int64) colocationId);
+    SET_LOCKTAG_REBALANCE_TABLE_COLOCATION(tag, (int64) colocationId);
     LockRelease(&tag, lockMode, sessionLock);
 }
@@ -37,11 +37,12 @@ typedef enum AdvisoryLocktagClass
     ADV_LOCKTAG_CLASS_CITUS_SHARD_METADATA = 4,
     ADV_LOCKTAG_CLASS_CITUS_SHARD = 5,
     ADV_LOCKTAG_CLASS_CITUS_JOB = 6,
-    ADV_LOCKTAG_CLASS_CITUS_REBALANCE_COLOCATION = 7,
+    ADV_LOCKTAG_CLASS_CITUS_REBALANCE_RELATION_COLOCATION = 7,
     ADV_LOCKTAG_CLASS_CITUS_COLOCATED_SHARDS_METADATA = 8,
     ADV_LOCKTAG_CLASS_CITUS_OPERATIONS = 9,
     ADV_LOCKTAG_CLASS_CITUS_PLACEMENT_CLEANUP = 10,
-    ADV_LOCKTAG_CLASS_CITUS_LOGICAL_REPLICATION = 12
+    ADV_LOCKTAG_CLASS_CITUS_LOGICAL_REPLICATION = 12,
+    ADV_LOCKTAG_CLASS_CITUS_REBALANCE_PLACEMENT_COLOCATION = 13
 } AdvisoryLocktagClass;
 
 /* CitusOperations has constants for citus operations */
@@ -77,12 +78,23 @@ typedef enum CitusOperations
 /* reuse advisory lock, but with different, unused field 4 (7)
  * Also it has the database hardcoded to MyDatabaseId, to ensure the locks
  * are local to each database */
-#define SET_LOCKTAG_REBALANCE_COLOCATION(tag, colocationOrTableId) \
+#define SET_LOCKTAG_REBALANCE_TABLE_COLOCATION(tag, colocationOrTableId) \
     SET_LOCKTAG_ADVISORY(tag, \
                          MyDatabaseId, \
                          (uint32) ((colocationOrTableId) >> 32), \
                          (uint32) (colocationOrTableId), \
-                         ADV_LOCKTAG_CLASS_CITUS_REBALANCE_COLOCATION)
+                         ADV_LOCKTAG_CLASS_CITUS_REBALANCE_RELATION_COLOCATION)
+
+/* reuse advisory lock, but with different, unused field 4 (13)
+ * Also it has the database hardcoded to MyDatabaseId, to ensure the locks
+ * are local to each database */
+#define SET_LOCKTAG_REBALANCE_PLACEMENT_COLOCATION(tag, colocationOrTableId) \
+    SET_LOCKTAG_ADVISORY(tag, \
+                         MyDatabaseId, \
+                         (uint32) ((colocationOrTableId) >> 32), \
+                         (uint32) (colocationOrTableId), \
+                         ADV_LOCKTAG_CLASS_CITUS_REBALANCE_PLACEMENT_COLOCATION)
 
 
 /* advisory lock for citus operations, also it has the database hardcoded to MyDatabaseId,
  * to ensure the locks are local to each database */
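
Both macros pack the same 64-bit colocation (or table) id into an advisory lock tag and differ only in field 4 (class 7 vs class 13), so the relation-level and placement-level locks live in disjoint lock spaces: holding one never conflicts with taking the other, even for the same colocation group. A minimal sketch:

    LOCKTAG relationTag;
    LOCKTAG placementTag;
    int64 colocationId = 42;    /* hypothetical id, for illustration */

    /* same database, same 64-bit id split across tag fields 2 and 3 ... */
    SET_LOCKTAG_REBALANCE_TABLE_COLOCATION(relationTag, colocationId);
    SET_LOCKTAG_REBALANCE_PLACEMENT_COLOCATION(placementTag, colocationId);

    /* ... but different field 4, so a backend holding relationTag never
     * blocks a backend acquiring placementTag, and vice versa */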
@@ -194,6 +194,7 @@ extern List * RebalancePlacementUpdates(List *workerNodeList,
 extern List * ReplicationPlacementUpdates(List *workerNodeList, List *shardPlacementList,
                                           int shardReplicationFactor);
 extern void ExecuteRebalancerCommandInSeparateTransaction(char *command);
 
+extern void AcquirePlacementColocationLock(Oid relationId, int lockMode,
+                                           const char *operationName);
+
 #endif /* SHARD_REBALANCER_H */
@@ -23,12 +23,11 @@ master_copy_shard_placement
 
 step s1-repair-placement:
     SELECT master_copy_shard_placement((SELECT * FROM selected_shard_for_test_table), 'localhost', 57637, 'localhost', 57638);
- <waiting ...>
 
+ERROR: could not acquire the lock required to repair public.test_hash_table
 step s2-commit:
     COMMIT;
 
-step s1-repair-placement: <... completed>
-ERROR: target placement must be in inactive state
-
 starting permutation: s2-set-placement-inactive s2-begin s2-repair-placement s1-repair-placement s2-commit
 step s2-set-placement-inactive:
@@ -47,9 +46,8 @@ master_copy_shard_placement
 
 step s1-repair-placement:
     SELECT master_copy_shard_placement((SELECT * FROM selected_shard_for_test_table), 'localhost', 57637, 'localhost', 57638);
- <waiting ...>
 
+ERROR: could not acquire the lock required to repair public.test_hash_table
 step s2-commit:
     COMMIT;
 
-step s1-repair-placement: <... completed>
-ERROR: target placement must be in inactive state
-
@@ -594,16 +594,11 @@ step s2-move-placement:
     SELECT master_move_shard_placement(
         get_shard_id_for_distribution_column('logical_replicate_placement', 4),
         'localhost', 57637, 'localhost', 57638);
- <waiting ...>
 
+ERROR: could not acquire the lock required to move public.logical_replicate_placement
 step s1-end:
     COMMIT;
 
-step s2-move-placement: <... completed>
-master_move_shard_placement
----------------------------------------------------------------------
-
-(1 row)
-
 step s2-end:
     COMMIT;
@@ -17,12 +17,11 @@ master_move_shard_placement
 
 step s1-move-placement:
     SELECT master_move_shard_placement((SELECT * FROM selected_shard_for_test_table), 'localhost', 57637, 'localhost', 57638, 'force_logical');
- <waiting ...>
 
+ERROR: could not acquire the lock required to move public.test_move_table
 step s2-commit:
     COMMIT;
 
-step s1-move-placement: <... completed>
-ERROR: source placement must be in active state
 step s2-print-placements:
     SELECT
         nodename, nodeport, count(*)
@@ -55,12 +54,11 @@ master_move_shard_placement
 
 step s1-move-placement:
     SELECT master_move_shard_placement((SELECT * FROM selected_shard_for_test_table), 'localhost', 57637, 'localhost', 57638, 'force_logical');
- <waiting ...>
 
+ERROR: could not acquire the lock required to move public.test_move_table
 step s2-commit:
     COMMIT;
 
-step s1-move-placement: <... completed>
-ERROR: source placement must be in active state
 step s2-print-placements:
     SELECT
         nodename, nodeport, count(*)