diff --git a/src/backend/distributed/executor/multi_utility.c b/src/backend/distributed/executor/multi_utility.c index 0e0a60dab..91f02c3a6 100644 --- a/src/backend/distributed/executor/multi_utility.c +++ b/src/backend/distributed/executor/multi_utility.c @@ -36,6 +36,7 @@ #include "distributed/colocation_utils.h" #include "distributed/foreign_constraint.h" #include "distributed/intermediate_results.h" +#include "distributed/listutils.h" #include "distributed/maintenanced.h" #include "distributed/master_metadata_utility.h" #include "distributed/master_protocol.h" @@ -50,6 +51,7 @@ #include "distributed/multi_utility.h" /* IWYU pragma: keep */ #include "distributed/pg_dist_partition.h" #include "distributed/policy.h" +#include "distributed/reference_table_utils.h" #include "distributed/relation_access_tracking.h" #include "distributed/resource_lock.h" #include "distributed/transaction_management.h" @@ -87,6 +89,9 @@ #include "utils/syscache.h" +#define LOCK_RELATION_IF_EXISTS "SELECT lock_relation_if_exists('%s', '%s');" + + bool EnableDDLPropagation = true; /* ddl propagation is enabled */ @@ -146,6 +151,9 @@ static void ErrorIfAlterDropsPartitionColumn(AlterTableStmt *alterTableStatement static void ErrorIfUnsupportedSeqStmt(CreateSeqStmt *createSeqStmt); static void ErrorIfDistributedAlterSeqOwnedBy(AlterSeqStmt *alterSeqStmt); static void ErrorIfUnsupportedTruncateStmt(TruncateStmt *truncateStatement); +static void ProcessTruncateStatement(TruncateStmt *truncateStatement); +static void LockTruncatedRelationMetadataInWorkers(TruncateStmt *truncateStatement); +static void AcquireDistributedLockOnRelations(List *relationIdList, LOCKMODE lockMode); static bool OptionsSpecifyOwnedBy(List *optionList, Oid *ownedByTableId); static void ErrorIfUnsupportedRenameStmt(RenameStmt *renameStmt); static void ErrorIfUnsupportedAlterAddConstraintStmt(AlterTableStmt *alterTableStatement); @@ -386,6 +394,7 @@ multi_ProcessUtility(PlannedStmt *pstmt, if (IsA(parsetree, 
TruncateStmt)) { ErrorIfUnsupportedTruncateStmt((TruncateStmt *) parsetree); + ProcessTruncateStatement((TruncateStmt *) parsetree); } /* only generate worker DDLJobs if propagation is enabled */ @@ -2921,6 +2930,147 @@ ErrorIfUnsupportedTruncateStmt(TruncateStmt *truncateStatement) } +/* + * ProcessTruncateStatement handles distributed locking + * of truncated tables before standard utility takes over. + * + * Actual distributed truncation occurs inside truncate trigger. + * + * This is only for distributed serialization of truncate commands. + * The function assumes that there is no foreign key relation between + * non-distributed and distributed relations. + */ +static void +ProcessTruncateStatement(TruncateStmt *truncateStatement) +{ + LockTruncatedRelationMetadataInWorkers(truncateStatement); +} + + +/* + * LockTruncatedRelationMetadataInWorkers determines if distributed + * lock is necessary for truncated relations, and acquires locks. + */ +static void +LockTruncatedRelationMetadataInWorkers(TruncateStmt *truncateStatement) +{ + List *distributedRelationList = NIL; + ListCell *relationCell = NULL; + + /* nothing to do if there is no metadata at worker nodes */ + if (!ClusterHasKnownMetadataWorkers()) + { + return; + } + + foreach(relationCell, truncateStatement->relations) + { + RangeVar *relationRV = (RangeVar *) lfirst(relationCell); + Relation relation = heap_openrv(relationRV, NoLock); + Oid relationId = RelationGetRelid(relation); + DistTableCacheEntry *cacheEntry = NULL; + List *referencingTableList = NIL; + ListCell *referencingTableCell = NULL; + + if (!IsDistributedTable(relationId)) + { + heap_close(relation, NoLock); + continue; + } + + if (list_member_oid(distributedRelationList, relationId)) + { + heap_close(relation, NoLock); + continue; + } + + distributedRelationList = lappend_oid(distributedRelationList, relationId); + + cacheEntry = DistributedTableCacheEntry(relationId); + Assert(cacheEntry != NULL); + + referencingTableList = 
cacheEntry->referencingRelationsViaForeignKey; + foreach(referencingTableCell, referencingTableList) + { + Oid referencingRelationId = lfirst_oid(referencingTableCell); + distributedRelationList = list_append_unique_oid(distributedRelationList, + referencingRelationId); + } + + heap_close(relation, NoLock); + } + + if (distributedRelationList != NIL) + { + AcquireDistributedLockOnRelations(distributedRelationList, AccessExclusiveLock); + } +} + + +/* + * AcquireDistributedLockOnRelations acquires a distributed lock on worker nodes + * for the given list of relation ids. The relation id list and worker node list are + * sorted so that the lock is acquired in the same order regardless of which + * node it was run on. Notice that no lock is acquired on coordinator node. + * + * Notice that the locking function is sent to all workers regardless of whether + * they have metadata or not. This is because a worker node only knows itself + * and previous workers that have metadata sync turned on. The node does not + * know about other nodes that have metadata sync turned on afterwards. + */ +static void +AcquireDistributedLockOnRelations(List *relationIdList, LOCKMODE lockMode) +{ + ListCell *relationIdCell = NULL; + List *workerNodeList = ActivePrimaryNodeList(); + const char *lockModeText = LockModeToLockModeText(lockMode); + + /* + * We want to acquire locks in the same order across the nodes. + * Although relation ids may change, their ordering will not. + */ + relationIdList = SortList(relationIdList, CompareOids); + workerNodeList = SortList(workerNodeList, CompareWorkerNodes); + + BeginOrContinueCoordinatedTransaction(); + + foreach(relationIdCell, relationIdList) + { + Oid relationId = lfirst_oid(relationIdCell); + + /* + * We only acquire distributed lock on relation if + * the relation is sync'ed between mx nodes. 
+ */ + if (ShouldSyncTableMetadata(relationId)) + { + char *qualifiedRelationName = generate_qualified_relation_name(relationId); + StringInfo lockRelationCommand = makeStringInfo(); + ListCell *workerNodeCell = NULL; + + appendStringInfo(lockRelationCommand, LOCK_RELATION_IF_EXISTS, + qualifiedRelationName, lockModeText); + + foreach(workerNodeCell, workerNodeList) + { + WorkerNode *workerNode = (WorkerNode *) lfirst(workerNodeCell); + char *nodeName = workerNode->workerName; + int nodePort = workerNode->workerPort; + + /* if local node is one of the targets, acquire the lock locally */ + if (workerNode->groupId == GetLocalGroupId()) + { + LockRelationOid(relationId, lockMode); + continue; + } + + SendCommandToWorker(nodeName, nodePort, lockRelationCommand->data); + } + } + } +} + + /* * OptionsSpecifyOwnedBy processes the options list of either a CREATE or ALTER * SEQUENCE command, extracting the first OWNED BY option it encounters. The diff --git a/src/backend/distributed/master/master_modify_multiple_shards.c b/src/backend/distributed/master/master_modify_multiple_shards.c index 885d8a10e..162bfb0a6 100644 --- a/src/backend/distributed/master/master_modify_multiple_shards.c +++ b/src/backend/distributed/master/master_modify_multiple_shards.c @@ -19,8 +19,7 @@ #include "libpq-fe.h" #include "miscadmin.h" -#include "access/xact.h" -#include "catalog/namespace.h" + #include "catalog/pg_class.h" #include "commands/dbcommands.h" #include "commands/event_trigger.h" @@ -57,22 +56,14 @@ #include "utils/inval.h" #include "utils/lsyscache.h" #include "utils/memutils.h" -#if (PG_VERSION_NUM >= 100000) -#include "utils/varlena.h" -#endif - -#define LOCK_RELATION_IF_EXISTS "SELECT lock_relation_if_exists('%s', '%s');" -#define REMOTE_LOCK_MODE_FOR_TRUNCATE "ACCESS EXCLUSIVE" static List * ModifyMultipleShardsTaskList(Query *query, List *shardIntervalList, TaskType taskType); static bool ShouldExecuteTruncateStmtSequential(TruncateStmt *command); -static LOCKMODE 
LockModeTextToLockMode(const char *lockModeName); PG_FUNCTION_INFO_V1(master_modify_multiple_shards); -PG_FUNCTION_INFO_V1(lock_relation_if_exists); /* @@ -212,26 +203,6 @@ master_modify_multiple_shards(PG_FUNCTION_ARGS) taskList = ModifyMultipleShardsTaskList(modifyQuery, prunedShardIntervalList, taskType); - /* - * We lock the relation we're TRUNCATING on the other worker nodes before - * executing the truncate commands on the shards. This is necessary to prevent - * distributed deadlocks where a concurrent operation on the same table (or a - * cascading table) is executed on the other nodes. - * - * Note that we should skip the current node to prevent a self-deadlock that's why - * we use OTHER_WORKERS tag. - */ - if (truncateOperation && ShouldSyncTableMetadata(relationId)) - { - char *qualifiedRelationName = generate_qualified_relation_name(relationId); - StringInfo lockRelation = makeStringInfo(); - - appendStringInfo(lockRelation, LOCK_RELATION_IF_EXISTS, qualifiedRelationName, - REMOTE_LOCK_MODE_FOR_TRUNCATE); - - SendCommandToWorkers(OTHER_WORKERS, lockRelation->data); - } - if (MultiShardConnectionType == SEQUENTIAL_CONNECTION) { affectedTupleCount = @@ -317,105 +288,3 @@ ShouldExecuteTruncateStmtSequential(TruncateStmt *command) return false; } - - -/* - * lock_relation_if_exists gets a relation name and lock mode - * and returns true if the relation exists and can be locked with - * the given lock mode. If the relation doesn't exists, the function - * return false. - * - * The relation name should be qualified with the schema name. - * - * The function errors out of the lockmode isn't defined in the PostgreSQL's - * explicit locking table. 
- */ -Datum -lock_relation_if_exists(PG_FUNCTION_ARGS) -{ - text *relationName = PG_GETARG_TEXT_P(0); - text *lockModeText = PG_GETARG_TEXT_P(1); - - Oid relationId = InvalidOid; - char *lockModeCString = text_to_cstring(lockModeText); - List *relationNameList = NIL; - RangeVar *relation = NULL; - LOCKMODE lockMode = NoLock; - - /* ensure that we're in a transaction block */ - RequireTransactionBlock(true, "lock_relation_if_exists"); - - relationId = ResolveRelationId(relationName, true); - if (!OidIsValid(relationId)) - { - PG_RETURN_BOOL(false); - } - - /* get the lock mode */ - lockMode = LockModeTextToLockMode(lockModeCString); - - /* resolve relationId from passed in schema and relation name */ - relationNameList = textToQualifiedNameList(relationName); - relation = makeRangeVarFromNameList(relationNameList); - - /* lock the relation with the lock mode */ - RangeVarGetRelid(relation, lockMode, false); - - PG_RETURN_BOOL(true); -} - - -/* - * LockModeTextToLockMode gets a lockMode name and returns its corresponding LOCKMODE. - * The function errors out if the input lock mode isn't defined in the PostgreSQL's - * explicit locking table. 
- */ -static LOCKMODE -LockModeTextToLockMode(const char *lockModeName) -{ - if (pg_strncasecmp("NoLock", lockModeName, NAMEDATALEN) == 0) - { - /* there is no explict call for NoLock, but keeping it here for convinience */ - return NoLock; - } - else if (pg_strncasecmp("ACCESS SHARE", lockModeName, NAMEDATALEN) == 0) - { - return AccessShareLock; - } - else if (pg_strncasecmp("ROW SHARE", lockModeName, NAMEDATALEN) == 0) - { - return RowShareLock; - } - else if (pg_strncasecmp("ROW EXCLUSIVE", lockModeName, NAMEDATALEN) == 0) - { - return RowExclusiveLock; - } - else if (pg_strncasecmp("SHARE UPDATE EXCLUSIVE", lockModeName, NAMEDATALEN) == 0) - { - return ShareUpdateExclusiveLock; - } - else if (pg_strncasecmp("SHARE", lockModeName, NAMEDATALEN) == 0) - { - return ShareLock; - } - else if (pg_strncasecmp("SHARE ROW EXCLUSIVE", lockModeName, NAMEDATALEN) == 0) - { - return ShareRowExclusiveLock; - } - else if (pg_strncasecmp("EXCLUSIVE", lockModeName, NAMEDATALEN) == 0) - { - return ExclusiveLock; - } - else if (pg_strncasecmp("ACCESS EXCLUSIVE", lockModeName, NAMEDATALEN) == 0) - { - return AccessExclusiveLock; - } - else - { - ereport(ERROR, - (errcode(ERRCODE_LOCK_NOT_AVAILABLE), - errmsg("unknown lock mode: %s", lockModeName))); - } - - return NoLock; -} diff --git a/src/backend/distributed/utils/listutils.c b/src/backend/distributed/utils/listutils.c index 9d680920b..3643109b8 100644 --- a/src/backend/distributed/utils/listutils.c +++ b/src/backend/distributed/utils/listutils.c @@ -59,6 +59,11 @@ SortList(List *pointerList, int (*comparisonFunction)(const void *, const void * pfree(array); + if (sortedList != NIL) + { + sortedList->type = pointerList->type; + } + return sortedList; } diff --git a/src/backend/distributed/utils/resource_lock.c b/src/backend/distributed/utils/resource_lock.c index aa6dc2018..fce2de7a0 100644 --- a/src/backend/distributed/utils/resource_lock.c +++ b/src/backend/distributed/utils/resource_lock.c @@ -17,6 +17,8 @@ #include "c.h" 
#include "miscadmin.h" +#include "access/xact.h" +#include "catalog/namespace.h" #include "distributed/colocation_utils.h" #include "distributed/listutils.h" #include "distributed/master_metadata_utility.h" @@ -31,7 +33,38 @@ #include "distributed/resource_lock.h" #include "distributed/shardinterval_utils.h" #include "distributed/worker_protocol.h" +#include "distributed/version_compat.h" #include "storage/lmgr.h" +#include "utils/builtins.h" +#if (PG_VERSION_NUM >= 100000) +#include "utils/varlena.h" +#endif + + +/* static definition and declarations */ +struct LockModeToStringType +{ + LOCKMODE lockMode; + const char *name; +}; + +/* + * list of lock mode mappings; the number of items is derived with sizeof + * into lock_mode_to_string_map_count, so it needs no manual syncing. + */ +static const struct LockModeToStringType lockmode_to_string_map[] = { + { NoLock, "NoLock" }, + { AccessShareLock, "ACCESS SHARE" }, + { RowShareLock, "ROW SHARE" }, + { RowExclusiveLock, "ROW EXCLUSIVE" }, + { ShareUpdateExclusiveLock, "SHARE UPDATE EXCLUSIVE" }, + { ShareLock, "SHARE" }, + { ShareRowExclusiveLock, "SHARE ROW EXCLUSIVE" }, + { ExclusiveLock, "EXCLUSIVE" }, + { AccessExclusiveLock, "ACCESS EXCLUSIVE" } +}; +static const int lock_mode_to_string_map_count = sizeof(lockmode_to_string_map) / + sizeof(lockmode_to_string_map[0]); /* local function forward declarations */ @@ -45,6 +78,7 @@ static bool IsFirstWorkerNode(); /* exports for SQL callable functions */ PG_FUNCTION_INFO_V1(lock_shard_metadata); PG_FUNCTION_INFO_V1(lock_shard_resources); +PG_FUNCTION_INFO_V1(lock_relation_if_exists); /* @@ -70,7 +104,7 @@ lock_shard_metadata(PG_FUNCTION_ARGS) ereport(ERROR, (errmsg("no locks specified"))); } - /* we don't want random users to block writes */ + /* we don't want random users to block writes */ EnsureSuperUser(); shardIdCount = ArrayObjectCount(shardIdArrayObject); @@ -110,7 +144,7 @@ lock_shard_resources(PG_FUNCTION_ARGS) ereport(ERROR, (errmsg("no locks 
specified"))); } - /* we don't want random users 
to block writes */ + /* we don't want random users to block writes */ EnsureSuperUser(); shardIdCount = ArrayObjectCount(shardIdArrayObject); @@ -438,7 +472,7 @@ LockShardListMetadata(List *shardIntervalList, LOCKMODE lockMode) { ListCell *shardIntervalCell = NULL; - /* lock shards in order of shard id to prevent deadlock */ + /* lock shards in order of shard id to prevent deadlock */ shardIntervalList = SortList(shardIntervalList, CompareShardIntervalsById); foreach(shardIntervalCell, shardIntervalList) @@ -460,7 +494,7 @@ LockShardsInPlacementListMetadata(List *shardPlacementList, LOCKMODE lockMode) { ListCell *shardPlacementCell = NULL; - /* lock shards in order of shard id to prevent deadlock */ + /* lock shards in order of shard id to prevent deadlock */ shardPlacementList = SortList(shardPlacementList, CompareShardPlacementsByShardId); @@ -516,7 +550,7 @@ LockShardListResources(List *shardIntervalList, LOCKMODE lockMode) { ListCell *shardIntervalCell = NULL; - /* lock shards in order of shard id to prevent deadlock */ + /* lock shards in order of shard id to prevent deadlock */ shardIntervalList = SortList(shardIntervalList, CompareShardIntervalsById); foreach(shardIntervalCell, shardIntervalList) @@ -538,7 +572,7 @@ LockRelationShardResources(List *relationShardList, LOCKMODE lockMode) { ListCell *relationShardCell = NULL; - /* lock shards in a consistent order to prevent deadlock */ + /* lock shards in a consistent order to prevent deadlock */ relationShardList = SortList(relationShardList, CompareRelationShards); foreach(relationShardCell, relationShardList) @@ -604,11 +638,11 @@ LockPartitionsInRelationList(List *relationIdList, LOCKMODE lockmode) void LockPartitionRelations(Oid relationId, LOCKMODE lockMode) { - /* - * PartitionList function generates partition list in the same order - * as PostgreSQL. Therefore we do not need to sort it before acquiring - * locks. 
- */ + /* + * PartitionList function generates partition list in the same order + * as PostgreSQL. Therefore we do not need to sort it before acquiring + * locks. + */ List *partitionList = PartitionList(relationId); ListCell *partitionCell = NULL; @@ -618,3 +652,115 @@ LockPartitionRelations(Oid relationId, LOCKMODE lockMode) LockRelationOid(partitionRelationId, lockMode); } } + + +/* + * LockModeTextToLockMode gets a lockMode name and returns its corresponding LOCKMODE. + * The function errors out if the input lock mode isn't defined in the PostgreSQL's + * explicit locking table. + */ +LOCKMODE +LockModeTextToLockMode(const char *lockModeName) +{ + LOCKMODE lockMode = -1; + + int lockIndex = 0; + for (lockIndex = 0; lockIndex < lock_mode_to_string_map_count; lockIndex++) + { + const struct LockModeToStringType *lockMap = lockmode_to_string_map + lockIndex; + if (pg_strncasecmp(lockMap->name, lockModeName, NAMEDATALEN) == 0) + { + lockMode = lockMap->lockMode; + break; + } + } + + /* we could not find the lock mode we are looking for */ + if (lockMode == -1) + { + ereport(ERROR, + (errcode(ERRCODE_LOCK_NOT_AVAILABLE), + errmsg("unknown lock mode: %s", lockModeName))); + } + + return lockMode; +} + + +/* + * LockModeToLockModeText gets a lockMode enum and returns its corresponding text + * representation. + * The function errors out if the input lock mode isn't defined in the PostgreSQL's + * explicit locking table. 
+ */ +const char * +LockModeToLockModeText(LOCKMODE lockMode) +{ + const char *lockModeText = NULL; + + int lockIndex = 0; + for (lockIndex = 0; lockIndex < lock_mode_to_string_map_count; lockIndex++) + { + const struct LockModeToStringType *lockMap = lockmode_to_string_map + lockIndex; + if (lockMode == lockMap->lockMode) + { + lockModeText = lockMap->name; + break; + } + } + + /* we could not find the lock mode we are looking for */ + if (lockModeText == NULL) + { + ereport(ERROR, + (errcode(ERRCODE_LOCK_NOT_AVAILABLE), + errmsg("unknown lock mode enum: %d", (int) lockMode))); + } + + return lockModeText; +} + + +/* + * lock_relation_if_exists gets a relation name and lock mode + * and returns true if the relation exists and can be locked with + * the given lock mode. If the relation doesn't exist, the function + * returns false. + * + * The relation name should be qualified with the schema name. + * + * The function errors out if the lock mode isn't defined in PostgreSQL's + * explicit locking table. 
+ */ +Datum +lock_relation_if_exists(PG_FUNCTION_ARGS) +{ + text *relationName = PG_GETARG_TEXT_P(0); + text *lockModeText = PG_GETARG_TEXT_P(1); + Oid relationId = InvalidOid; + char *lockModeCString = text_to_cstring(lockModeText); + List *relationNameList = NIL; + RangeVar *relation = NULL; + LOCKMODE lockMode = NoLock; + + /* ensure that we're in a transaction block */ + RequireTransactionBlock(true, "lock_relation_if_exists"); + + relationId = ResolveRelationId(relationName, true); + if (!OidIsValid(relationId)) + { + PG_RETURN_BOOL(false); + } + + /* get the lock mode */ + lockMode = LockModeTextToLockMode(lockModeCString); + + /* resolve relationId from passed in schema and relation name */ + relationNameList = textToQualifiedNameList(relationName); + relation = makeRangeVarFromNameList(relationNameList); + + /* lock the relation with the lock mode */ + RangeVarGetRelid(relation, lockMode, false); + + PG_RETURN_BOOL(true); +} diff --git a/src/include/distributed/resource_lock.h b/src/include/distributed/resource_lock.h index 4d78966c0..b38c3ec67 100644 --- a/src/include/distributed/resource_lock.h +++ b/src/include/distributed/resource_lock.h @@ -97,4 +97,8 @@ extern void LockPartitionRelations(Oid relationId, LOCKMODE lockMode); /* Lock parent table's colocated shard resource */ extern void LockParentShardResourceIfPartition(uint64 shardId, LOCKMODE lockMode); +/* Lock mode translation between text and enum */ +extern LOCKMODE LockModeTextToLockMode(const char *lockModeName); +extern const char * LockModeToLockModeText(LOCKMODE lockMode); + #endif /* RESOURCE_LOCK_H */