mirror of https://github.com/citusdata/citus.git
Enterprise functions about metadata/resource locks
parent
6cea01620a
commit
2639149bd8
|
@ -24,6 +24,7 @@
|
|||
#include "distributed/listutils.h"
|
||||
#include "distributed/master_protocol.h"
|
||||
#include "distributed/metadata_cache.h"
|
||||
#include "distributed/metadata_sync.h"
|
||||
#include "distributed/multi_router_executor.h"
|
||||
#include "distributed/resource_lock.h"
|
||||
#include "distributed/worker_manager.h"
|
||||
|
@ -123,6 +124,54 @@ master_move_shard_placement(PG_FUNCTION_ARGS)
|
|||
}
|
||||
|
||||
|
||||
/*
|
||||
* BlockWritesToShardList blocks writes to all shards in the given shard
|
||||
* list. The function assumes that all the shards in the list are colocated.
|
||||
*/
|
||||
void
|
||||
BlockWritesToShardList(List *shardList)
|
||||
{
|
||||
ListCell *shardCell = NULL;
|
||||
|
||||
bool shouldSyncMetadata = false;
|
||||
ShardInterval *firstShardInterval = NULL;
|
||||
Oid firstDistributedTableId = InvalidOid;
|
||||
|
||||
foreach(shardCell, shardList)
|
||||
{
|
||||
ShardInterval *shard = (ShardInterval *) lfirst(shardCell);
|
||||
|
||||
/*
|
||||
* We need to lock the referenced reference table metadata to avoid
|
||||
* asynchronous shard copy in case of cascading DML operations.
|
||||
*/
|
||||
LockReferencedReferenceShardDistributionMetadata(shard->shardId,
|
||||
ExclusiveLock);
|
||||
|
||||
LockShardDistributionMetadata(shard->shardId, ExclusiveLock);
|
||||
}
|
||||
|
||||
/* following code relies on the list to have at least one shard */
|
||||
if (list_length(shardList) == 0)
|
||||
{
|
||||
return;
|
||||
}
|
||||
|
||||
/*
|
||||
* Since the function assumes that the input shards are colocated,
|
||||
* calculating shouldSyncMetadata for a single table is sufficient.
|
||||
*/
|
||||
firstShardInterval = (ShardInterval *) linitial(shardList);
|
||||
firstDistributedTableId = firstShardInterval->relationId;
|
||||
|
||||
shouldSyncMetadata = ShouldSyncTableMetadata(firstDistributedTableId);
|
||||
if (shouldSyncMetadata)
|
||||
{
|
||||
LockShardListMetadataOnWorkers(ExclusiveLock, shardList);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* LookupShardTransferMode maps the oids of citus.shard_transfer_mode enum
|
||||
* values to a char.
|
||||
|
|
|
@ -122,6 +122,46 @@ lock_shard_resources(PG_FUNCTION_ARGS)
|
|||
}
|
||||
|
||||
|
||||
/*
|
||||
* LockShardListMetadataOnWorkers acquires the matadata locks for the specified shards on
|
||||
* metadata workers. Note that the function does not sort the shard list, therefore the
|
||||
* caller should sort the shard list in order to avoid deadlocks.
|
||||
*/
|
||||
void
|
||||
LockShardListMetadataOnWorkers(LOCKMODE lockmode, List *shardIntervalList)
|
||||
{
|
||||
StringInfo lockCommand = makeStringInfo();
|
||||
ListCell *shardIntervalCell = NULL;
|
||||
int processedShardIntervalCount = 0;
|
||||
int totalShardIntervalCount = list_length(shardIntervalList);
|
||||
|
||||
if (list_length(shardIntervalList) == 0)
|
||||
{
|
||||
return;
|
||||
}
|
||||
|
||||
appendStringInfo(lockCommand, "SELECT lock_shard_metadata(%d, ARRAY[", lockmode);
|
||||
|
||||
foreach(shardIntervalCell, shardIntervalList)
|
||||
{
|
||||
ShardInterval *shardInterval = (ShardInterval *) lfirst(shardIntervalCell);
|
||||
int64 shardId = shardInterval->shardId;
|
||||
|
||||
appendStringInfo(lockCommand, "%lu", shardId);
|
||||
|
||||
processedShardIntervalCount++;
|
||||
if (processedShardIntervalCount != totalShardIntervalCount)
|
||||
{
|
||||
appendStringInfo(lockCommand, ", ");
|
||||
}
|
||||
}
|
||||
|
||||
appendStringInfo(lockCommand, "])");
|
||||
|
||||
SendCommandToWorkers(WORKERS_WITH_METADATA, lockCommand->data);
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* IntToLockMode verifies whether the specified integer is an accepted lock mode
|
||||
* and returns it as a LOCKMODE enum.
|
||||
|
|
|
@ -172,5 +172,6 @@ extern void CopyShardForeignConstraintCommandListGrouped(ShardInterval *shardInt
|
|||
extern ShardPlacement * SearchShardPlacementInList(List *shardPlacementList,
|
||||
char *nodeName, uint32 nodePort,
|
||||
bool missingOk);
|
||||
extern void BlockWritesToShardList(List *shardList);
|
||||
|
||||
#endif /* MASTER_PROTOCOL_H */
|
||||
|
|
|
@ -66,6 +66,7 @@ typedef enum AdvisoryLocktagClass
|
|||
/* Lock shard/relation metadata for safe modifications */
|
||||
extern void LockShardDistributionMetadata(int64 shardId, LOCKMODE lockMode);
|
||||
extern bool TryLockShardDistributionMetadata(int64 shardId, LOCKMODE lockMode);
|
||||
extern void LockShardListMetadataOnWorkers(LOCKMODE lockmode, List *shardIntervalList);
|
||||
|
||||
/* Lock shard/relation metadata of the referenced reference table if exists */
|
||||
extern void LockReferencedReferenceShardDistributionMetadata(uint64 shardId, LOCKMODE
|
||||
|
|
Loading…
Reference in New Issue