From 2639149bd850542373eeeb03aae482da2342cd72 Mon Sep 17 00:00:00 2001 From: velioglu Date: Wed, 22 Aug 2018 20:25:25 +0300 Subject: [PATCH] Enterprise functions about metadata/resource locks --- .../distributed/master/master_repair_shards.c | 49 +++++++++++++++++++ src/backend/distributed/utils/resource_lock.c | 40 +++++++++++++++ src/include/distributed/master_protocol.h | 1 + src/include/distributed/resource_lock.h | 1 + 4 files changed, 91 insertions(+) diff --git a/src/backend/distributed/master/master_repair_shards.c b/src/backend/distributed/master/master_repair_shards.c index 40ecce5a4..666505e61 100644 --- a/src/backend/distributed/master/master_repair_shards.c +++ b/src/backend/distributed/master/master_repair_shards.c @@ -24,6 +24,7 @@ #include "distributed/listutils.h" #include "distributed/master_protocol.h" #include "distributed/metadata_cache.h" +#include "distributed/metadata_sync.h" #include "distributed/multi_router_executor.h" #include "distributed/resource_lock.h" #include "distributed/worker_manager.h" @@ -123,6 +124,54 @@ master_move_shard_placement(PG_FUNCTION_ARGS) } +/* + * BlockWritesToShardList blocks writes to all shards in the given shard + * list. The function assumes that all the shards in the list are colocated. + */ +void +BlockWritesToShardList(List *shardList) +{ + ListCell *shardCell = NULL; + + bool shouldSyncMetadata = false; + ShardInterval *firstShardInterval = NULL; + Oid firstDistributedTableId = InvalidOid; + + foreach(shardCell, shardList) + { + ShardInterval *shard = (ShardInterval *) lfirst(shardCell); + + /* + * We need to lock the referenced reference table metadata to avoid + * asynchronous shard copy in case of cascading DML operations. + */ + LockReferencedReferenceShardDistributionMetadata(shard->shardId, + ExclusiveLock); + + LockShardDistributionMetadata(shard->shardId, ExclusiveLock); + } + + /* following code relies on the list to have at least one shard */ + if (list_length(shardList) == 0) + { + return; + } + + /* + * Since the function assumes that the input shards are colocated, + * calculating shouldSyncMetadata for a single table is sufficient. + */ + firstShardInterval = (ShardInterval *) linitial(shardList); + firstDistributedTableId = firstShardInterval->relationId; + + shouldSyncMetadata = ShouldSyncTableMetadata(firstDistributedTableId); + if (shouldSyncMetadata) + { + LockShardListMetadataOnWorkers(ExclusiveLock, shardList); + } +} + + /* * LookupShardTransferMode maps the oids of citus.shard_transfer_mode enum * values to a char. diff --git a/src/backend/distributed/utils/resource_lock.c b/src/backend/distributed/utils/resource_lock.c index 06fd61a99..7a8c18421 100644 --- a/src/backend/distributed/utils/resource_lock.c +++ b/src/backend/distributed/utils/resource_lock.c @@ -122,6 +122,46 @@ lock_shard_resources(PG_FUNCTION_ARGS) } +/* + * LockShardListMetadataOnWorkers acquires the matadata locks for the specified shards on + * metadata workers. Note that the function does not sort the shard list, therefore the + * caller should sort the shard list in order to avoid deadlocks. + */ +void +LockShardListMetadataOnWorkers(LOCKMODE lockmode, List *shardIntervalList) +{ + StringInfo lockCommand = makeStringInfo(); + ListCell *shardIntervalCell = NULL; + int processedShardIntervalCount = 0; + int totalShardIntervalCount = list_length(shardIntervalList); + + if (list_length(shardIntervalList) == 0) + { + return; + } + + appendStringInfo(lockCommand, "SELECT lock_shard_metadata(%d, ARRAY[", lockmode); + + foreach(shardIntervalCell, shardIntervalList) + { + ShardInterval *shardInterval = (ShardInterval *) lfirst(shardIntervalCell); + int64 shardId = shardInterval->shardId; + + appendStringInfo(lockCommand, "%lu", shardId); + + processedShardIntervalCount++; + if (processedShardIntervalCount != totalShardIntervalCount) + { + appendStringInfo(lockCommand, ", "); + } + } + + appendStringInfo(lockCommand, "])"); + + SendCommandToWorkers(WORKERS_WITH_METADATA, lockCommand->data); +} + + /* * IntToLockMode verifies whether the specified integer is an accepted lock mode * and returns it as a LOCKMODE enum. diff --git a/src/include/distributed/master_protocol.h b/src/include/distributed/master_protocol.h index a9d2b4cb9..37a2349a4 100644 --- a/src/include/distributed/master_protocol.h +++ b/src/include/distributed/master_protocol.h @@ -172,5 +172,6 @@ extern void CopyShardForeignConstraintCommandListGrouped(ShardInterval *shardInt extern ShardPlacement * SearchShardPlacementInList(List *shardPlacementList, char *nodeName, uint32 nodePort, bool missingOk); +extern void BlockWritesToShardList(List *shardList); #endif /* MASTER_PROTOCOL_H */ diff --git a/src/include/distributed/resource_lock.h b/src/include/distributed/resource_lock.h index a83519088..1726c9c33 100644 --- a/src/include/distributed/resource_lock.h +++ b/src/include/distributed/resource_lock.h @@ -66,6 +66,7 @@ typedef enum AdvisoryLocktagClass /* Lock shard/relation metadata for safe modifications */ extern void LockShardDistributionMetadata(int64 shardId, LOCKMODE lockMode); extern bool TryLockShardDistributionMetadata(int64 shardId, LOCKMODE lockMode); +extern void LockShardListMetadataOnWorkers(LOCKMODE lockmode, List *shardIntervalList); /* Lock shard/relation metadata of the referenced reference table if exists */ extern void LockReferencedReferenceShardDistributionMetadata(uint64 shardId, LOCKMODE