Test Fix + Comments + Renames

pull/5899/head
gledis69 2022-04-11 14:42:25 +03:00
parent e50ad9e8f9
commit a7fa46ea63
2 changed files with 48 additions and 29 deletions

View File

@ -893,12 +893,12 @@ LockParentShardResourceIfPartition(List *shardIntervalList, LOCKMODE lockMode)
/*
* LockModeTextToLockMode gets a lockMode name and returns its corresponding LOCKMODE.
* LockModeCStringToLockMode gets a lockMode name and returns its corresponding LOCKMODE.
* The function errors out if the input lock mode isn't defined in the PostgreSQL's
* explicit locking table.
*/
LOCKMODE
LockModeTextToLockMode(const char *lockModeName)
LockModeCStringToLockMode(const char *lockModeName)
{
LOCKMODE lockMode = -1;
@ -925,13 +925,13 @@ LockModeTextToLockMode(const char *lockModeName)
/*
* LockModeToLockModeText gets a lockMode enum and returns its corresponding text
* LockModeToLockModeCString gets a lockMode enum and returns its corresponding text
* representation.
* The function errors out if the input lock mode isn't defined in the PostgreSQL's
* explicit locking table.
*/
const char *
LockModeToLockModeText(LOCKMODE lockMode)
LockModeToLockModeCString(LOCKMODE lockMode)
{
const char *lockModeText = NULL;
@ -958,16 +958,46 @@ LockModeToLockModeText(LOCKMODE lockMode)
/*
* lock_relation_if_exists gets a relation name and lock mode
* LockRelationIfExists gets a relation name, lock mode, a nowait flag
* and returns true if the relation exists and can be locked with
* the given lock mode. If the relation doesn't exists, the function
* return false.
*
* If nowait is true, and the lock is already acquired an error will be
* thrown.
*
* The relation name should be qualified with the schema name.
*
* The function errors out if the lockmode isn't defined in the PostgreSQL's
* explicit locking table.
*/
static bool
LockRelationIfExists(text *relationName, const char *lockModeCString, bool nowait)
{
/* get the lock mode */
LOCKMODE lockMode = LockModeCStringToLockMode(lockModeCString);
/* resolve relationId from passed in schema and relation name */
List *relationNameList = textToQualifiedNameList(relationName);
RangeVar *relation = makeRangeVarFromNameList(relationNameList);
uint32 nowaitFlag = nowait ? RVR_NOWAIT : 0;
/* lock the relation with the lock mode */
Oid relationId = RangeVarGetRelidExtended(relation, lockMode, RVR_MISSING_OK |
nowaitFlag,
CitusRangeVarCallbackForLockTable,
(void *) &lockMode);
bool relationExists = OidIsValid(relationId);
return relationExists;
}
/*
* lock_relation_if_exists is a wrapper for LockRelationIfExists.
* Because lock_relation_if_exists is a udf and exposed to the user,
* this function also checks that it must be executed in a transaction block.
*/
Datum
lock_relation_if_exists(PG_FUNCTION_ARGS)
{
@ -984,21 +1014,7 @@ lock_relation_if_exists(PG_FUNCTION_ARGS)
/* ensure that we're in a transaction block */
RequireTransactionBlock(true, "lock_relation_if_exists");
/* get the lock mode */
LOCKMODE lockMode = LockModeTextToLockMode(lockModeCString);
/* resolve relationId from passed in schema and relation name */
List *relationNameList = textToQualifiedNameList(relationName);
RangeVar *relation = makeRangeVarFromNameList(relationNameList);
uint32 nowaitFlag = nowait ? RVR_NOWAIT : 0;
/* lock the relation with the lock mode */
Oid relationId = RangeVarGetRelidExtended(relation, lockMode, RVR_MISSING_OK |
nowaitFlag,
CitusRangeVarCallbackForLockTable,
(void *) &lockMode);
bool relationExists = OidIsValid(relationId);
bool relationExists = LockRelationIfExists(relationName, lockModeCString, nowait);
PG_RETURN_BOOL(relationExists);
}
@ -1081,6 +1097,9 @@ CitusLockTableAclCheck(Oid relationId, LOCKMODE lockmode, Oid userId)
* it has metadata or not. This is because a worker node only knows itself
* and previous workers that has metadata sync turned on. The node does not
* know about other nodes that have metadata sync turned on afterwards.
*
* A nowait flag is used to require the locks to be available immediately
* and if that is not the case, an error will be thrown
*/
void
AcquireDistributedLockOnRelations(List *relationIdList, LOCKMODE lockMode, bool nowait)
@ -1088,7 +1107,7 @@ AcquireDistributedLockOnRelations(List *relationIdList, LOCKMODE lockMode, bool
Oid relationId = InvalidOid;
List *workerNodeList = ActivePrimaryNodeList(NoLock);
const char *lockModeText = LockModeToLockModeText(lockMode);
const char *lockModeCString = LockModeToLockModeCString(lockMode);
/*
* We want to acquire locks in the same order across the nodes.
@ -1120,7 +1139,7 @@ AcquireDistributedLockOnRelations(List *relationIdList, LOCKMODE lockMode, bool
LOCK_RELATION_IF_EXISTS;
appendStringInfo(lockRelationCommand, lockCommand,
quote_literal_cstr(qualifiedRelationName),
lockModeText);
lockModeCString);
WorkerNode *workerNode = NULL;
foreach_ptr(workerNode, workerNodeList)
@ -1131,11 +1150,11 @@ AcquireDistributedLockOnRelations(List *relationIdList, LOCKMODE lockMode, bool
/* if local node is one of the targets, acquire the lock locally */
if (workerNode->groupId == localGroupId)
{
DirectFunctionCall3(
lock_relation_if_exists,
(Datum) cstring_to_text(qualifiedRelationName),
(Datum) cstring_to_text(lockModeText),
(Datum) nowait);
LockRelationIfExists(
cstring_to_text(qualifiedRelationName),
lockModeCString,
nowait
);
continue;
}

View File

@ -153,8 +153,8 @@ extern void LockParentShardResourceIfPartition(List *shardIntervalList,
LOCKMODE lockMode);
/* Lock mode translation between text and enum */
extern LOCKMODE LockModeTextToLockMode(const char *lockModeName);
extern const char * LockModeToLockModeText(LOCKMODE lockMode);
extern LOCKMODE LockModeCStringToLockMode(const char *lockModeName);
extern const char * LockModeToLockModeCString(LOCKMODE lockMode);
extern void AcquireDistributedLockOnRelations(List *relationIdList, LOCKMODE lockMode,
bool nowait);
#endif /* RESOURCE_LOCK_H */