From b6930e3db98a3b4284174531f0f59c2f5b8645df Mon Sep 17 00:00:00 2001 From: Murat Tuncer Date: Wed, 19 Sep 2018 21:16:02 +0300 Subject: [PATCH] Add distributed locking to truncated mx tables We acquire distributed lock on all mx nodes for truncated tables before actually doing truncate operation. This is needed for distributed serialization of the truncate command without causing a deadlock. --- .../distributed/executor/multi_utility.c | 150 ++++++++++++++++ .../master/master_modify_multiple_shards.c | 133 +------------- src/backend/distributed/utils/listutils.c | 5 + src/backend/distributed/utils/resource_lock.c | 168 ++++++++++++++++-- src/include/distributed/resource_lock.h | 4 + 5 files changed, 317 insertions(+), 143 deletions(-) diff --git a/src/backend/distributed/executor/multi_utility.c b/src/backend/distributed/executor/multi_utility.c index 0e0a60dab..91f02c3a6 100644 --- a/src/backend/distributed/executor/multi_utility.c +++ b/src/backend/distributed/executor/multi_utility.c @@ -36,6 +36,7 @@ #include "distributed/colocation_utils.h" #include "distributed/foreign_constraint.h" #include "distributed/intermediate_results.h" +#include "distributed/listutils.h" #include "distributed/maintenanced.h" #include "distributed/master_metadata_utility.h" #include "distributed/master_protocol.h" @@ -50,6 +51,7 @@ #include "distributed/multi_utility.h" /* IWYU pragma: keep */ #include "distributed/pg_dist_partition.h" #include "distributed/policy.h" +#include "distributed/reference_table_utils.h" #include "distributed/relation_access_tracking.h" #include "distributed/resource_lock.h" #include "distributed/transaction_management.h" @@ -87,6 +89,9 @@ #include "utils/syscache.h" +#define LOCK_RELATION_IF_EXISTS "SELECT lock_relation_if_exists('%s', '%s');" + + bool EnableDDLPropagation = true; /* ddl propagation is enabled */ @@ -146,6 +151,9 @@ static void ErrorIfAlterDropsPartitionColumn(AlterTableStmt *alterTableStatement static void ErrorIfUnsupportedSeqStmt(CreateSeqStmt *createSeqStmt); static void ErrorIfDistributedAlterSeqOwnedBy(AlterSeqStmt *alterSeqStmt); static void ErrorIfUnsupportedTruncateStmt(TruncateStmt *truncateStatement); +static void ProcessTruncateStatement(TruncateStmt *truncateStatement); +static void LockTruncatedRelationMetadataInWorkers(TruncateStmt *truncateStatement); +static void AcquireDistributedLockOnRelations(List *relationIdList, LOCKMODE lockMode); static bool OptionsSpecifyOwnedBy(List *optionList, Oid *ownedByTableId); static void ErrorIfUnsupportedRenameStmt(RenameStmt *renameStmt); static void ErrorIfUnsupportedAlterAddConstraintStmt(AlterTableStmt *alterTableStatement); @@ -386,6 +394,7 @@ multi_ProcessUtility(PlannedStmt *pstmt, if (IsA(parsetree, TruncateStmt)) { ErrorIfUnsupportedTruncateStmt((TruncateStmt *) parsetree); + ProcessTruncateStatement((TruncateStmt *) parsetree); } /* only generate worker DDLJobs if propagation is enabled */ @@ -2921,6 +2930,147 @@ ErrorIfUnsupportedTruncateStmt(TruncateStmt *truncateStatement) } +/* + * ProcessTruncateStatement handles distributed locking + * of truncated tables before standard utility takes over. + * + * Actual distributed truncation occurs inside truncate trigger. + * + * This is only for distributed serialization of truncate commands. + * The function assumes that there is no foreign key relation between + * non-distributed and distributed relations. + */ +static void +ProcessTruncateStatement(TruncateStmt *truncateStatement) +{ + LockTruncatedRelationMetadataInWorkers(truncateStatement); +} + + +/* + * LockTruncatedRelationMetadataInWorkers determines if distributed + * lock is necessary for truncated relations, and acquire locks. + */ +static void +LockTruncatedRelationMetadataInWorkers(TruncateStmt *truncateStatement) +{ + List *distributedRelationList = NIL; + ListCell *relationCell = NULL; + + /* nothing to do if there is no metadata at worker nodes */ + if (!ClusterHasKnownMetadataWorkers()) + { + return; + } + + foreach(relationCell, truncateStatement->relations) + { + RangeVar *relationRV = (RangeVar *) lfirst(relationCell); + Relation relation = heap_openrv(relationRV, NoLock); + Oid relationId = RelationGetRelid(relation); + DistTableCacheEntry *cacheEntry = NULL; + List *referencingTableList = NIL; + ListCell *referencingTableCell = NULL; + + if (!IsDistributedTable(relationId)) + { + heap_close(relation, NoLock); + continue; + } + + if (list_member_oid(distributedRelationList, relationId)) + { + heap_close(relation, NoLock); + continue; + } + + distributedRelationList = lappend_oid(distributedRelationList, relationId); + + cacheEntry = DistributedTableCacheEntry(relationId); + Assert(cacheEntry != NULL); + + referencingTableList = cacheEntry->referencingRelationsViaForeignKey; + foreach(referencingTableCell, referencingTableList) + { + Oid referencingRelationId = lfirst_oid(referencingTableCell); + distributedRelationList = list_append_unique_oid(distributedRelationList, + referencingRelationId); + } + + heap_close(relation, NoLock); + } + + if (distributedRelationList != NIL) + { + AcquireDistributedLockOnRelations(distributedRelationList, AccessExclusiveLock); + } +} + + +/* + * AcquireDistributedLockOnRelations acquire a distributed lock on worker nodes + * for given list of relations ids. Relation id list and worker node list + * sorted so that the lock is acquired in the same order regardless of which + * node it was run on. Notice that no lock is acquired on coordinator node. + * + * Notice that the locking functions is sent to all workers regardless of if + * it has metadata or not. This is because a worker node only knows itself + * and previous workers that has metadata sync turned on. The node does not + * know about other nodes that have metadata sync turned on afterwards. + */ +static void +AcquireDistributedLockOnRelations(List *relationIdList, LOCKMODE lockMode) +{ + ListCell *relationIdCell = NULL; + List *workerNodeList = ActivePrimaryNodeList(); + const char *lockModeText = LockModeToLockModeText(lockMode); + + /* + * We want to acquire locks in the same order accross the nodes. + * Although relation ids may change, their ordering will not. + */ + relationIdList = SortList(relationIdList, CompareOids); + workerNodeList = SortList(workerNodeList, CompareWorkerNodes); + + BeginOrContinueCoordinatedTransaction(); + + foreach(relationIdCell, relationIdList) + { + Oid relationId = lfirst_oid(relationIdCell); + + /* + * We only acquire distributed lock on relation if + * the relation is sync'ed between mx nodes. + */ + if (ShouldSyncTableMetadata(relationId)) + { + char *qualifiedRelationName = generate_qualified_relation_name(relationId); + StringInfo lockRelationCommand = makeStringInfo(); + ListCell *workerNodeCell = NULL; + + appendStringInfo(lockRelationCommand, LOCK_RELATION_IF_EXISTS, + qualifiedRelationName, lockModeText); + + foreach(workerNodeCell, workerNodeList) + { + WorkerNode *workerNode = (WorkerNode *) lfirst(workerNodeCell); + char *nodeName = workerNode->workerName; + int nodePort = workerNode->workerPort; + + /* if local node is one of the targets, acquire the lock locally */ + if (workerNode->groupId == GetLocalGroupId()) + { + LockRelationOid(relationId, lockMode); + continue; + } + + SendCommandToWorker(nodeName, nodePort, lockRelationCommand->data); + } + } + } +} + + /* * OptionsSpecifyOwnedBy processes the options list of either a CREATE or ALTER * SEQUENCE command, extracting the first OWNED BY option it encounters. The diff --git a/src/backend/distributed/master/master_modify_multiple_shards.c b/src/backend/distributed/master/master_modify_multiple_shards.c index 885d8a10e..162bfb0a6 100644 --- a/src/backend/distributed/master/master_modify_multiple_shards.c +++ b/src/backend/distributed/master/master_modify_multiple_shards.c @@ -19,8 +19,7 @@ #include "libpq-fe.h" #include "miscadmin.h" -#include "access/xact.h" -#include "catalog/namespace.h" + #include "catalog/pg_class.h" #include "commands/dbcommands.h" #include "commands/event_trigger.h" @@ -57,22 +56,14 @@ #include "utils/inval.h" #include "utils/lsyscache.h" #include "utils/memutils.h" -#if (PG_VERSION_NUM >= 100000) -#include "utils/varlena.h" -#endif - -#define LOCK_RELATION_IF_EXISTS "SELECT lock_relation_if_exists('%s', '%s');" -#define REMOTE_LOCK_MODE_FOR_TRUNCATE "ACCESS EXCLUSIVE" static List * ModifyMultipleShardsTaskList(Query *query, List *shardIntervalList, TaskType taskType); static bool ShouldExecuteTruncateStmtSequential(TruncateStmt *command); -static LOCKMODE LockModeTextToLockMode(const char *lockModeName); PG_FUNCTION_INFO_V1(master_modify_multiple_shards); -PG_FUNCTION_INFO_V1(lock_relation_if_exists); /* @@ -212,26 +203,6 @@ master_modify_multiple_shards(PG_FUNCTION_ARGS) taskList = ModifyMultipleShardsTaskList(modifyQuery, prunedShardIntervalList, taskType); - /* - * We lock the relation we're TRUNCATING on the other worker nodes before - * executing the truncate commands on the shards. This is necessary to prevent - * distributed deadlocks where a concurrent operation on the same table (or a - * cascading table) is executed on the other nodes. - * - * Note that we should skip the current node to prevent a self-deadlock that's why - * we use OTHER_WORKERS tag. - */ - if (truncateOperation && ShouldSyncTableMetadata(relationId)) - { - char *qualifiedRelationName = generate_qualified_relation_name(relationId); - StringInfo lockRelation = makeStringInfo(); - - appendStringInfo(lockRelation, LOCK_RELATION_IF_EXISTS, qualifiedRelationName, - REMOTE_LOCK_MODE_FOR_TRUNCATE); - - SendCommandToWorkers(OTHER_WORKERS, lockRelation->data); - } - if (MultiShardConnectionType == SEQUENTIAL_CONNECTION) { affectedTupleCount = @@ -317,105 +288,3 @@ ShouldExecuteTruncateStmtSequential(TruncateStmt *command) return false; } - - -/* - * lock_relation_if_exists gets a relation name and lock mode - * and returns true if the relation exists and can be locked with - * the given lock mode. If the relation doesn't exists, the function - * return false. - * - * The relation name should be qualified with the schema name. - * - * The function errors out of the lockmode isn't defined in the PostgreSQL's - * explicit locking table. - */ -Datum -lock_relation_if_exists(PG_FUNCTION_ARGS) -{ - text *relationName = PG_GETARG_TEXT_P(0); - text *lockModeText = PG_GETARG_TEXT_P(1); - - Oid relationId = InvalidOid; - char *lockModeCString = text_to_cstring(lockModeText); - List *relationNameList = NIL; - RangeVar *relation = NULL; - LOCKMODE lockMode = NoLock; - - /* ensure that we're in a transaction block */ - RequireTransactionBlock(true, "lock_relation_if_exists"); - - relationId = ResolveRelationId(relationName, true); - if (!OidIsValid(relationId)) - { - PG_RETURN_BOOL(false); - } - - /* get the lock mode */ - lockMode = LockModeTextToLockMode(lockModeCString); - - /* resolve relationId from passed in schema and relation name */ - relationNameList = textToQualifiedNameList(relationName); - relation = makeRangeVarFromNameList(relationNameList); - - /* lock the relation with the lock mode */ - RangeVarGetRelid(relation, lockMode, false); - - PG_RETURN_BOOL(true); -} - - -/* - * LockModeTextToLockMode gets a lockMode name and returns its corresponding LOCKMODE. - * The function errors out if the input lock mode isn't defined in the PostgreSQL's - * explicit locking table. - */ -static LOCKMODE -LockModeTextToLockMode(const char *lockModeName) -{ - if (pg_strncasecmp("NoLock", lockModeName, NAMEDATALEN) == 0) - { - /* there is no explict call for NoLock, but keeping it here for convinience */ - return NoLock; - } - else if (pg_strncasecmp("ACCESS SHARE", lockModeName, NAMEDATALEN) == 0) - { - return AccessShareLock; - } - else if (pg_strncasecmp("ROW SHARE", lockModeName, NAMEDATALEN) == 0) - { - return RowShareLock; - } - else if (pg_strncasecmp("ROW EXCLUSIVE", lockModeName, NAMEDATALEN) == 0) - { - return RowExclusiveLock; - } - else if (pg_strncasecmp("SHARE UPDATE EXCLUSIVE", lockModeName, NAMEDATALEN) == 0) - { - return ShareUpdateExclusiveLock; - } - else if (pg_strncasecmp("SHARE", lockModeName, NAMEDATALEN) == 0) - { - return ShareLock; - } - else if (pg_strncasecmp("SHARE ROW EXCLUSIVE", lockModeName, NAMEDATALEN) == 0) - { - return ShareRowExclusiveLock; - } - else if (pg_strncasecmp("EXCLUSIVE", lockModeName, NAMEDATALEN) == 0) - { - return ExclusiveLock; - } - else if (pg_strncasecmp("ACCESS EXCLUSIVE", lockModeName, NAMEDATALEN) == 0) - { - return AccessExclusiveLock; - } - else - { - ereport(ERROR, - (errcode(ERRCODE_LOCK_NOT_AVAILABLE), - errmsg("unknown lock mode: %s", lockModeName))); - } - - return NoLock; -} diff --git a/src/backend/distributed/utils/listutils.c b/src/backend/distributed/utils/listutils.c index 9d680920b..3643109b8 100644 --- a/src/backend/distributed/utils/listutils.c +++ b/src/backend/distributed/utils/listutils.c @@ -59,6 +59,11 @@ SortList(List *pointerList, int (*comparisonFunction)(const void *, const void * pfree(array); + if (sortedList != NIL) + { + sortedList->type = pointerList->type; + } + return sortedList; } diff --git a/src/backend/distributed/utils/resource_lock.c b/src/backend/distributed/utils/resource_lock.c index aa6dc2018..fce2de7a0 100644 --- a/src/backend/distributed/utils/resource_lock.c +++ b/src/backend/distributed/utils/resource_lock.c @@ -17,6 +17,8 @@ #include "c.h" #include "miscadmin.h" +#include "access/xact.h" +#include "catalog/namespace.h" #include "distributed/colocation_utils.h" #include "distributed/listutils.h" #include "distributed/master_metadata_utility.h" @@ -31,7 +33,38 @@ #include "distributed/resource_lock.h" #include "distributed/shardinterval_utils.h" #include "distributed/worker_protocol.h" +#include "distributed/version_compat.h" #include "storage/lmgr.h" +#include "utils/builtins.h" +#if (PG_VERSION_NUM >= 100000) +#include "utils/varlena.h" +#endif + + +/* static definition and declarations */ +struct LockModeToStringType +{ + LOCKMODE lockMode; + const char *name; +}; + +/* + * list of lock mode mappings, number of items need to be kept in sync + * with lock_mode_to_string_map_count. + */ +static const struct LockModeToStringType lockmode_to_string_map[] = { + { NoLock, "NoLock" }, + { AccessShareLock, "ACCESS SHARE" }, + { RowShareLock, "ROW SHARE" }, + { RowExclusiveLock, "ROW EXCLUSIVE" }, + { ShareUpdateExclusiveLock, "SHARE UPDATE EXCLUSIVE" }, + { ShareLock, "SHARE" }, + { ShareRowExclusiveLock, "SHARE ROW EXCLUSIVE" }, + { ExclusiveLock, "EXCLUSIVE" }, + { AccessExclusiveLock, "ACCESS EXCLUSIVE" } +}; +static const int lock_mode_to_string_map_count = sizeof(lockmode_to_string_map) / + sizeof(lockmode_to_string_map[0]); /* local function forward declarations */ @@ -45,6 +78,7 @@ static bool IsFirstWorkerNode(); /* exports for SQL callable functions */ PG_FUNCTION_INFO_V1(lock_shard_metadata); PG_FUNCTION_INFO_V1(lock_shard_resources); +PG_FUNCTION_INFO_V1(lock_relation_if_exists); /* @@ -70,7 +104,7 @@ lock_shard_metadata(PG_FUNCTION_ARGS) ereport(ERROR, (errmsg("no locks specified"))); } - /* we don't want random users to block writes */ + /* we don't want random users to block writes */ EnsureSuperUser(); shardIdCount = ArrayObjectCount(shardIdArrayObject); @@ -110,7 +144,7 @@ lock_shard_resources(PG_FUNCTION_ARGS) ereport(ERROR, (errmsg("no locks specified"))); } - /* we don't want random users to block writes */ + /* we don't want random users to block writes */ EnsureSuperUser(); shardIdCount = ArrayObjectCount(shardIdArrayObject); @@ -438,7 +472,7 @@ LockShardListMetadata(List *shardIntervalList, LOCKMODE lockMode) { ListCell *shardIntervalCell = NULL; - /* lock shards in order of shard id to prevent deadlock */ + /* lock shards in order of shard id to prevent deadlock */ shardIntervalList = SortList(shardIntervalList, CompareShardIntervalsById); foreach(shardIntervalCell, shardIntervalList) @@ -460,7 +494,7 @@ LockShardsInPlacementListMetadata(List *shardPlacementList, LOCKMODE lockMode) { ListCell *shardPlacementCell = NULL; - /* lock shards in order of shard id to prevent deadlock */ + /* lock shards in order of shard id to prevent deadlock */ shardPlacementList = SortList(shardPlacementList, CompareShardPlacementsByShardId); @@ -516,7 +550,7 @@ LockShardListResources(List *shardIntervalList, LOCKMODE lockMode) { ListCell *shardIntervalCell = NULL; - /* lock shards in order of shard id to prevent deadlock */ + /* lock shards in order of shard id to prevent deadlock */ shardIntervalList = SortList(shardIntervalList, CompareShardIntervalsById); foreach(shardIntervalCell, shardIntervalList) @@ -538,7 +572,7 @@ LockRelationShardResources(List *relationShardList, LOCKMODE lockMode) { ListCell *relationShardCell = NULL; - /* lock shards in a consistent order to prevent deadlock */ + /* lock shards in a consistent order to prevent deadlock */ relationShardList = SortList(relationShardList, CompareRelationShards); foreach(relationShardCell, relationShardList) @@ -604,11 +638,11 @@ LockPartitionsInRelationList(List *relationIdList, LOCKMODE lockmode) void LockPartitionRelations(Oid relationId, LOCKMODE lockMode) { - /* - * PartitionList function generates partition list in the same order - * as PostgreSQL. Therefore we do not need to sort it before acquiring - * locks. - */ + /* + * PartitionList function generates partition list in the same order + * as PostgreSQL. Therefore we do not need to sort it before acquiring + * locks. + */ List *partitionList = PartitionList(relationId); ListCell *partitionCell = NULL; @@ -618,3 +652,115 @@ LockPartitionRelations(Oid relationId, LOCKMODE lockMode) LockRelationOid(partitionRelationId, lockMode); } } + + +/* + * LockModeTextToLockMode gets a lockMode name and returns its corresponding LOCKMODE. + * The function errors out if the input lock mode isn't defined in the PostgreSQL's + * explicit locking table. + */ +LOCKMODE +LockModeTextToLockMode(const char *lockModeName) +{ + LOCKMODE lockMode = -1; + + int lockIndex = 0; + for (lockIndex = 0; lockIndex < lock_mode_to_string_map_count; lockIndex++) + { + const struct LockModeToStringType *lockMap = lockmode_to_string_map + lockIndex; + if (pg_strncasecmp(lockMap->name, lockModeName, NAMEDATALEN) == 0) + { + lockMode = lockMap->lockMode; + break; + } + } + + /* we could not find the lock mode we are looking for */ + if (lockMode == -1) + { + ereport(ERROR, + (errcode(ERRCODE_LOCK_NOT_AVAILABLE), + errmsg("unknown lock mode: %s", lockModeName))); + } + + return lockMode; +} + + +/* + * LockModeToLockModeText gets a lockMode enum and returns its corresponding text + * representation. + * The function errors out if the input lock mode isn't defined in the PostgreSQL's + * explicit locking table. + */ +const char * +LockModeToLockModeText(LOCKMODE lockMode) +{ + const char *lockModeText = NULL; + + int lockIndex = 0; + for (lockIndex = 0; lockIndex < lock_mode_to_string_map_count; lockIndex++) + { + const struct LockModeToStringType *lockMap = lockmode_to_string_map + lockIndex; + if (lockMode == lockMap->lockMode) + { + lockModeText = lockMap->name; + break; + } + } + + /* we could not find the lock mode we are looking for */ + if (lockModeText == NULL) + { + ereport(ERROR, + (errcode(ERRCODE_LOCK_NOT_AVAILABLE), + errmsg("unknown lock mode enum: %d", (int) lockMode))); + } + + return lockModeText; +} + + +/* + * lock_relation_if_exists gets a relation name and lock mode + * and returns true if the relation exists and can be locked with + * the given lock mode. If the relation doesn't exists, the function + * return false. + * + * The relation name should be qualified with the schema name. + * + * The function errors out of the lockmode isn't defined in the PostgreSQL's + * explicit locking table. + */ +Datum +lock_relation_if_exists(PG_FUNCTION_ARGS) +{ + text *relationName = PG_GETARG_TEXT_P(0); + text *lockModeText = PG_GETARG_TEXT_P(1); + Oid relationId = InvalidOid; + char *lockModeCString = text_to_cstring(lockModeText); + List *relationNameList = NIL; + RangeVar *relation = NULL; + LOCKMODE lockMode = NoLock; + + /* ensure that we're in a transaction block */ + RequireTransactionBlock(true, "lock_relation_if_exists"); + + relationId = ResolveRelationId(relationName, true); + if (!OidIsValid(relationId)) + { + PG_RETURN_BOOL(false); + } + + /* get the lock mode */ + lockMode = LockModeTextToLockMode(lockModeCString); + + /* resolve relationId from passed in schema and relation name */ + relationNameList = textToQualifiedNameList(relationName); + relation = makeRangeVarFromNameList(relationNameList); + + /* lock the relation with the lock mode */ + RangeVarGetRelid(relation, lockMode, false); + + PG_RETURN_BOOL(true); +} diff --git a/src/include/distributed/resource_lock.h b/src/include/distributed/resource_lock.h index 4d78966c0..b38c3ec67 100644 --- a/src/include/distributed/resource_lock.h +++ b/src/include/distributed/resource_lock.h @@ -97,4 +97,8 @@ extern void LockPartitionRelations(Oid relationId, LOCKMODE lockMode); /* Lock parent table's colocated shard resource */ extern void LockParentShardResourceIfPartition(uint64 shardId, LOCKMODE lockMode); +/* Lock mode translation between text and enum */ +extern LOCKMODE LockModeTextToLockMode(const char *lockModeName); +extern const char * LockModeToLockModeText(LOCKMODE lockMode); + #endif /* RESOURCE_LOCK_H */