diff --git a/src/backend/distributed/commands/truncate.c b/src/backend/distributed/commands/truncate.c index e8ea461b4..92433f1fa 100644 --- a/src/backend/distributed/commands/truncate.c +++ b/src/backend/distributed/commands/truncate.c @@ -39,16 +39,11 @@ #include "utils/regproc.h" #include "utils/rel.h" - -#define LOCK_RELATION_IF_EXISTS "SELECT lock_relation_if_exists(%s, '%s');" - - /* Local functions forward declarations for unsupported command checks */ static void ErrorIfUnsupportedTruncateStmt(TruncateStmt *truncateStatement); static void ExecuteTruncateStmtSequentialIfNecessary(TruncateStmt *command); static void EnsurePartitionTableNotReplicatedForTruncate(TruncateStmt *truncateStatement); static void LockTruncatedRelationMetadataInWorkers(TruncateStmt *truncateStatement); -static void AcquireDistributedLockOnRelations(List *relationIdList, LOCKMODE lockMode); static List * TruncateTaskList(Oid relationId); @@ -402,74 +397,8 @@ LockTruncatedRelationMetadataInWorkers(TruncateStmt *truncateStatement) if (distributedRelationList != NIL) { - AcquireDistributedLockOnRelations(distributedRelationList, AccessExclusiveLock); - } -} - - -/* - * AcquireDistributedLockOnRelations acquire a distributed lock on worker nodes - * for given list of relations ids. Relation id list and worker node list - * sorted so that the lock is acquired in the same order regardless of which - * node it was run on. Notice that no lock is acquired on coordinator node. - * - * Notice that the locking functions is sent to all workers regardless of if - * it has metadata or not. This is because a worker node only knows itself - * and previous workers that has metadata sync turned on. The node does not - * know about other nodes that have metadata sync turned on afterwards. - */ -static void -AcquireDistributedLockOnRelations(List *relationIdList, LOCKMODE lockMode) -{ - Oid relationId = InvalidOid; - List *workerNodeList = ActivePrimaryNodeList(NoLock); - const char *lockModeText = LockModeToLockModeText(lockMode); - - /* - * We want to acquire locks in the same order across the nodes. - * Although relation ids may change, their ordering will not. - */ - relationIdList = SortList(relationIdList, CompareOids); - workerNodeList = SortList(workerNodeList, CompareWorkerNodes); - - UseCoordinatedTransaction(); - - int32 localGroupId = GetLocalGroupId(); - - foreach_oid(relationId, relationIdList) - { - /* - * We only acquire distributed lock on relation if - * the relation is sync'ed between mx nodes. - * - * Even if users disable metadata sync, we cannot - * allow them not to acquire the remote locks. - * Hence, we have !IsCoordinator() check. - */ - if (ShouldSyncTableMetadata(relationId) || !IsCoordinator()) - { - char *qualifiedRelationName = generate_qualified_relation_name(relationId); - StringInfo lockRelationCommand = makeStringInfo(); - - appendStringInfo(lockRelationCommand, LOCK_RELATION_IF_EXISTS, - quote_literal_cstr(qualifiedRelationName), - lockModeText); - - WorkerNode *workerNode = NULL; - foreach_ptr(workerNode, workerNodeList) - { - const char *nodeName = workerNode->workerName; - int nodePort = workerNode->workerPort; - - /* if local node is one of the targets, acquire the lock locally */ - if (workerNode->groupId == localGroupId) - { - LockRelationOid(relationId, lockMode); - continue; - } - - SendCommandToWorker(nodeName, nodePort, lockRelationCommand->data); - } - } + bool nowait = false; + AcquireDistributedLockOnRelations(distributedRelationList, AccessExclusiveLock, + nowait); } } diff --git a/src/backend/distributed/commands/utility_hook.c b/src/backend/distributed/commands/utility_hook.c index 9c93f0737..06f4aa9fe 100644 --- a/src/backend/distributed/commands/utility_hook.c +++ b/src/backend/distributed/commands/utility_hook.c @@ -75,6 +75,7 @@ #include "utils/builtins.h" #include "utils/lsyscache.h" #include "utils/syscache.h" +#include "distributed/listutils.h" bool EnableDDLPropagation = true; /* ddl propagation is enabled */ int CreateObjectPropagationMode = CREATE_OBJECT_PROPAGATION_IMMEDIATE; @@ -163,7 +164,6 @@ multi_ProcessUtility(PlannedStmt *pstmt, parsetree = pstmt->utilityStmt; if (IsA(parsetree, TransactionStmt) || - IsA(parsetree, LockStmt) || IsA(parsetree, ListenStmt) || IsA(parsetree, NotifyStmt) || IsA(parsetree, ExecuteStmt) || @@ -468,6 +468,36 @@ ProcessUtilityInternal(PlannedStmt *pstmt, PreprocessTruncateStatement((TruncateStmt *) parsetree); } + if (IsA(parsetree, LockStmt)) + { + LockStmt *stmt = (LockStmt *) parsetree; + List *distributedRelationList = NIL; + + RangeVar *rangeVar = NULL; + foreach_ptr(rangeVar, stmt->relations) + { + Oid relationId = RangeVarGetRelid(rangeVar, NoLock, false); + + if (!IsCitusTable(relationId)) + { + continue; + } + + if (list_member_oid(distributedRelationList, relationId)) + { + continue; + } + + distributedRelationList = lappend_oid(distributedRelationList, relationId); + } + + if (distributedRelationList != NIL) + { + AcquireDistributedLockOnRelations(distributedRelationList, stmt->mode, + stmt->nowait); + } + } + /* * We only process ALTER TABLE ... ATTACH PARTITION commands in the function below * and distribute the partition if necessary. diff --git a/src/backend/distributed/sql/citus--11.0-2--11.1-1.sql b/src/backend/distributed/sql/citus--11.0-2--11.1-1.sql index 374350d56..2d439a76b 100644 --- a/src/backend/distributed/sql/citus--11.0-2--11.1-1.sql +++ b/src/backend/distributed/sql/citus--11.0-2--11.1-1.sql @@ -6,3 +6,14 @@ DROP FUNCTION pg_catalog.worker_hash_partition_table(bigint, integer, text, text DROP FUNCTION pg_catalog.worker_merge_files_into_table(bigint, integer, text[], text[]); DROP FUNCTION pg_catalog.worker_range_partition_table(bigint, integer, text, text, oid, anyarray); DROP FUNCTION pg_catalog.worker_repartition_cleanup(bigint); + +SET search_path = 'pg_catalog'; + +CREATE OR REPLACE FUNCTION lock_relation_if_exists(table_name text, lock_mode text, nowait boolean) +RETURNS BOOL +LANGUAGE C STRICT as 'MODULE_PATHNAME', +$$lock_relation_if_exists$$; +COMMENT ON FUNCTION lock_relation_if_exists(table_name text, lock_mode text, nowait boolean) +IS 'locks relation in the lock_mode if the relation exists'; + +RESET search_path; diff --git a/src/backend/distributed/utils/resource_lock.c b/src/backend/distributed/utils/resource_lock.c index a92426f5b..1cee4a5a2 100644 --- a/src/backend/distributed/utils/resource_lock.c +++ b/src/backend/distributed/utils/resource_lock.c @@ -975,6 +975,15 @@ lock_relation_if_exists(PG_FUNCTION_ARGS) text *lockModeText = PG_GETARG_TEXT_P(1); char *lockModeCString = text_to_cstring(lockModeText); + bool nowait = false; + if (!PG_ARGISNULL(2)) + { + nowait = PG_GETARG_BOOL(2); + } + + ereport(NOTICE, errmsg("%s %s %s", text_to_cstring(relationName), text_to_cstring( + lockModeText), nowait ? "nowait" : "wait")); + /* ensure that we're in a transaction block */ RequireTransactionBlock(true, "lock_relation_if_exists"); @@ -985,8 +994,11 @@ lock_relation_if_exists(PG_FUNCTION_ARGS) List *relationNameList = textToQualifiedNameList(relationName); RangeVar *relation = makeRangeVarFromNameList(relationNameList); + uint32 nowaitFlag = nowait ? RVR_NOWAIT : 0; + /* lock the relation with the lock mode */ - Oid relationId = RangeVarGetRelidExtended(relation, lockMode, RVR_MISSING_OK, + Oid relationId = RangeVarGetRelidExtended(relation, lockMode, RVR_MISSING_OK | + nowaitFlag, CitusRangeVarCallbackForLockTable, (void *) &lockMode); bool relationExists = OidIsValid(relationId); @@ -1060,3 +1072,78 @@ CitusLockTableAclCheck(Oid relationId, LOCKMODE lockmode, Oid userId) return aclResult; } + + +/* + * AcquireDistributedLockOnRelations acquire a distributed lock on worker nodes + * for given list of relations ids. Relation id list and worker node list + * sorted so that the lock is acquired in the same order regardless of which + * node it was run on. Notice that no lock is acquired on coordinator node. + * + * Notice that the locking functions is sent to all workers regardless of if + * it has metadata or not. This is because a worker node only knows itself + * and previous workers that has metadata sync turned on. The node does not + * know about other nodes that have metadata sync turned on afterwards. + */ +void +AcquireDistributedLockOnRelations(List *relationIdList, LOCKMODE lockMode, bool nowait) +{ + Oid relationId = InvalidOid; + List *workerNodeList = ActivePrimaryNodeList(NoLock); + + const char *lockModeText = LockModeToLockModeText(lockMode); + + /* + * We want to acquire locks in the same order across the nodes. + * Although relation ids may change, their ordering will not. + */ + relationIdList = SortList(relationIdList, CompareOids); + workerNodeList = SortList(workerNodeList, CompareWorkerNodes); + + UseCoordinatedTransaction(); + + int32 localGroupId = GetLocalGroupId(); + + foreach_oid(relationId, relationIdList) + { + /* + * We only acquire distributed lock on relation if + * the relation is sync'ed between mx nodes. + * + * Even if users disable metadata sync, we cannot + * allow them not to acquire the remote locks. + * Hence, we have !IsCoordinator() check. + */ + if (ShouldSyncTableMetadata(relationId) || !IsCoordinator()) + { + char *qualifiedRelationName = generate_qualified_relation_name(relationId); + StringInfo lockRelationCommand = makeStringInfo(); + + char *lockCommand = nowait ? LOCK_RELATION_IF_EXISTS_NOWAIT : + LOCK_RELATION_IF_EXISTS; + appendStringInfo(lockRelationCommand, lockCommand, + quote_literal_cstr(qualifiedRelationName), + lockModeText); + + WorkerNode *workerNode = NULL; + foreach_ptr(workerNode, workerNodeList) + { + const char *nodeName = workerNode->workerName; + int nodePort = workerNode->workerPort; + + /* if local node is one of the targets, acquire the lock locally */ + if (workerNode->groupId == localGroupId) + { + DirectFunctionCall3( + lock_relation_if_exists, + (Datum) cstring_to_text(qualifiedRelationName), + (Datum) cstring_to_text(lockModeText), + (Datum) nowait); + continue; + } + + SendCommandToWorker(nodeName, nodePort, lockRelationCommand->data); + } + } + } +} diff --git a/src/include/distributed/resource_lock.h b/src/include/distributed/resource_lock.h index 4fa53144c..16b8aaed9 100644 --- a/src/include/distributed/resource_lock.h +++ b/src/include/distributed/resource_lock.h @@ -110,6 +110,10 @@ typedef enum CitusOperations ADV_LOCKTAG_CLASS_CITUS_PLACEMENT_CLEANUP) +#define LOCK_RELATION_IF_EXISTS "SELECT lock_relation_if_exists(%s, '%s');" +#define LOCK_RELATION_IF_EXISTS_NOWAIT \ + "SELECT lock_relation_if_exists(%s, '%s', nowait => true);" + /* Lock shard/relation metadata for safe modifications */ extern void LockShardDistributionMetadata(int64 shardId, LOCKMODE lockMode); extern void LockPlacementCleanup(void); @@ -151,5 +155,6 @@ extern void LockParentShardResourceIfPartition(List *shardIntervalList, /* Lock mode translation between text and enum */ extern LOCKMODE LockModeTextToLockMode(const char *lockModeName); extern const char * LockModeToLockModeText(LOCKMODE lockMode); - +extern void AcquireDistributedLockOnRelations(List *relationIdList, LOCKMODE lockMode, + bool nowait); #endif /* RESOURCE_LOCK_H */