mirror of https://github.com/citusdata/citus.git
wip
parent
f745b3fae8
commit
aeb644be5a
|
@ -60,6 +60,7 @@
|
||||||
#include "distributed/relation_access_tracking.h"
|
#include "distributed/relation_access_tracking.h"
|
||||||
#include "distributed/remote_commands.h"
|
#include "distributed/remote_commands.h"
|
||||||
#include "distributed/shared_library_init.h"
|
#include "distributed/shared_library_init.h"
|
||||||
|
#include "distributed/shard_rebalancer.h"
|
||||||
#include "distributed/worker_protocol.h"
|
#include "distributed/worker_protocol.h"
|
||||||
#include "distributed/worker_shard_visibility.h"
|
#include "distributed/worker_shard_visibility.h"
|
||||||
#include "distributed/worker_transaction.h"
|
#include "distributed/worker_transaction.h"
|
||||||
|
@ -851,6 +852,13 @@ CreateHashDistributedTableShards(Oid relationId, int shardCount,
|
||||||
|
|
||||||
if (colocatedTableId != InvalidOid)
|
if (colocatedTableId != InvalidOid)
|
||||||
{
|
{
|
||||||
|
/*
|
||||||
|
* If there is a concurrent shard move on this
|
||||||
|
* colocation group fail this backend. But, allow
|
||||||
|
* concurrent colocated table creation.
|
||||||
|
*/
|
||||||
|
AcquireColocationLock(colocatedTableId, ShareLock, "colocate distributed table");
|
||||||
|
|
||||||
CreateColocatedShards(relationId, colocatedTableId, useExclusiveConnection);
|
CreateColocatedShards(relationId, colocatedTableId, useExclusiveConnection);
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
|
|
|
@ -37,6 +37,7 @@
|
||||||
#include "distributed/reference_table_utils.h"
|
#include "distributed/reference_table_utils.h"
|
||||||
#include "distributed/remote_commands.h"
|
#include "distributed/remote_commands.h"
|
||||||
#include "distributed/resource_lock.h"
|
#include "distributed/resource_lock.h"
|
||||||
|
#include "distributed/shard_rebalancer.h"
|
||||||
#include "distributed/worker_manager.h"
|
#include "distributed/worker_manager.h"
|
||||||
#include "distributed/worker_protocol.h"
|
#include "distributed/worker_protocol.h"
|
||||||
#include "distributed/worker_transaction.h"
|
#include "distributed/worker_transaction.h"
|
||||||
|
@ -318,11 +319,12 @@ citus_move_shard_placement(PG_FUNCTION_ARGS)
|
||||||
ErrorIfMoveUnsupportedTableType(relationId);
|
ErrorIfMoveUnsupportedTableType(relationId);
|
||||||
ErrorIfTargetNodeIsNotSafeToMove(targetNodeName, targetNodePort);
|
ErrorIfTargetNodeIsNotSafeToMove(targetNodeName, targetNodePort);
|
||||||
|
|
||||||
|
AcquireColocationLock(relationId, ExclusiveLock, "move placement");
|
||||||
|
|
||||||
ShardInterval *shardInterval = LoadShardInterval(shardId);
|
ShardInterval *shardInterval = LoadShardInterval(shardId);
|
||||||
Oid distributedTableId = shardInterval->relationId;
|
Oid distributedTableId = shardInterval->relationId;
|
||||||
|
|
||||||
List *colocatedTableList = ColocatedTableList(distributedTableId);
|
List *colocatedTableList = ColocatedTableList(distributedTableId);
|
||||||
List *colocatedShardList = ColocatedShardIntervalList(shardInterval);
|
|
||||||
|
|
||||||
foreach(colocatedTableCell, colocatedTableList)
|
foreach(colocatedTableCell, colocatedTableList)
|
||||||
{
|
{
|
||||||
|
@ -351,6 +353,7 @@ citus_move_shard_placement(PG_FUNCTION_ARGS)
|
||||||
}
|
}
|
||||||
|
|
||||||
/* we sort colocatedShardList so that lock operations will not cause any deadlocks */
|
/* we sort colocatedShardList so that lock operations will not cause any deadlocks */
|
||||||
|
List *colocatedShardList = ColocatedShardIntervalList(shardInterval);
|
||||||
colocatedShardList = SortList(colocatedShardList, CompareShardIntervalsById);
|
colocatedShardList = SortList(colocatedShardList, CompareShardIntervalsById);
|
||||||
foreach(colocatedShardCell, colocatedShardList)
|
foreach(colocatedShardCell, colocatedShardList)
|
||||||
{
|
{
|
||||||
|
|
|
@ -230,7 +230,6 @@ static float4 NodeCapacity(WorkerNode *workerNode, void *context);
|
||||||
static ShardCost GetShardCost(uint64 shardId, void *context);
|
static ShardCost GetShardCost(uint64 shardId, void *context);
|
||||||
static List * NonColocatedDistRelationIdList(void);
|
static List * NonColocatedDistRelationIdList(void);
|
||||||
static void RebalanceTableShards(RebalanceOptions *options, Oid shardReplicationModeOid);
|
static void RebalanceTableShards(RebalanceOptions *options, Oid shardReplicationModeOid);
|
||||||
static void AcquireColocationLock(Oid relationId, const char *operationName);
|
|
||||||
static void ExecutePlacementUpdates(List *placementUpdateList, Oid
|
static void ExecutePlacementUpdates(List *placementUpdateList, Oid
|
||||||
shardReplicationModeOid, char *noticeOperation);
|
shardReplicationModeOid, char *noticeOperation);
|
||||||
static float4 CalculateUtilization(float4 totalCost, float4 capacity);
|
static float4 CalculateUtilization(float4 totalCost, float4 capacity);
|
||||||
|
@ -621,11 +620,11 @@ GetColocatedRebalanceSteps(List *placementUpdateList)
|
||||||
/*
|
/*
|
||||||
* AcquireColocationLock tries to acquire a lock for rebalance/replication. If
|
* AcquireColocationLock tries to acquire a lock for rebalance/replication. If
|
||||||
* this is it not possible it fails instantly because this means another
|
* this is it not possible it fails instantly because this means another
|
||||||
* rebalance/replication is currently happening. This would really mess up
|
* rebalance/replication/colocated table creation is currently happening.
|
||||||
* planning.
|
* This would really mess up planning, could even cause lost placements.
|
||||||
*/
|
*/
|
||||||
static void
|
void
|
||||||
AcquireColocationLock(Oid relationId, const char *operationName)
|
AcquireColocationLock(Oid relationId, int lockmode, const char *operationName)
|
||||||
{
|
{
|
||||||
uint32 lockId = relationId;
|
uint32 lockId = relationId;
|
||||||
LOCKTAG tag;
|
LOCKTAG tag;
|
||||||
|
@ -638,12 +637,17 @@ AcquireColocationLock(Oid relationId, const char *operationName)
|
||||||
|
|
||||||
SET_LOCKTAG_REBALANCE_COLOCATION(tag, (int64) lockId);
|
SET_LOCKTAG_REBALANCE_COLOCATION(tag, (int64) lockId);
|
||||||
|
|
||||||
LockAcquireResult lockAcquired = LockAcquire(&tag, ExclusiveLock, false, true);
|
LockAcquireResult lockAcquired = LockAcquire(&tag, lockmode, false, true);
|
||||||
if (!lockAcquired)
|
if (!lockAcquired)
|
||||||
{
|
{
|
||||||
ereport(ERROR, (errmsg("could not acquire the lock required to %s %s",
|
ereport(ERROR, (errmsg("could not acquire the lock required to %s %s",
|
||||||
operationName, generate_qualified_relation_name(
|
operationName, generate_qualified_relation_name(
|
||||||
relationId))));
|
relationId)),
|
||||||
|
errdetail("It means that either a concurrent shard move "
|
||||||
|
"or colocated distributed table creation is "
|
||||||
|
"happening."),
|
||||||
|
errhint("Make sure that the concurrent operation has "
|
||||||
|
"finished and re-run the command")));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -945,7 +949,7 @@ replicate_table_shards(PG_FUNCTION_ARGS)
|
||||||
char transferMode = LookupShardTransferMode(shardReplicationModeOid);
|
char transferMode = LookupShardTransferMode(shardReplicationModeOid);
|
||||||
EnsureReferenceTablesExistOnAllNodesExtended(transferMode);
|
EnsureReferenceTablesExistOnAllNodesExtended(transferMode);
|
||||||
|
|
||||||
AcquireColocationLock(relationId, "replicate");
|
AcquireColocationLock(relationId, ExclusiveLock, "replicate");
|
||||||
|
|
||||||
List *activeWorkerList = SortedActiveWorkers();
|
List *activeWorkerList = SortedActiveWorkers();
|
||||||
List *shardPlacementList = FullShardPlacementList(relationId, excludedShardArray);
|
List *shardPlacementList = FullShardPlacementList(relationId, excludedShardArray);
|
||||||
|
@ -1558,7 +1562,7 @@ RebalanceTableShards(RebalanceOptions *options, Oid shardReplicationModeOid)
|
||||||
|
|
||||||
foreach_oid(relationId, options->relationIdList)
|
foreach_oid(relationId, options->relationIdList)
|
||||||
{
|
{
|
||||||
AcquireColocationLock(relationId, operationName);
|
AcquireColocationLock(relationId, ExclusiveLock, operationName);
|
||||||
}
|
}
|
||||||
|
|
||||||
List *placementUpdateList = GetRebalanceSteps(options);
|
List *placementUpdateList = GetRebalanceSteps(options);
|
||||||
|
|
|
@ -194,6 +194,8 @@ extern List * RebalancePlacementUpdates(List *workerNodeList,
|
||||||
extern List * ReplicationPlacementUpdates(List *workerNodeList, List *shardPlacementList,
|
extern List * ReplicationPlacementUpdates(List *workerNodeList, List *shardPlacementList,
|
||||||
int shardReplicationFactor);
|
int shardReplicationFactor);
|
||||||
extern void ExecuteRebalancerCommandInSeparateTransaction(char *command);
|
extern void ExecuteRebalancerCommandInSeparateTransaction(char *command);
|
||||||
|
extern void AcquireColocationLock(Oid relationId, int lockmode,
|
||||||
|
const char *operationName);
|
||||||
|
|
||||||
|
|
||||||
#endif /* SHARD_REBALANCER_H */
|
#endif /* SHARD_REBALANCER_H */
|
||||||
|
|
Loading…
Reference in New Issue